Use Jackson SequenceWriter for streaming

Before this commit, the AbstractJackson2Encoder instantiated a
ObjectWriter per value. This is not an issue for single values or
non-streaming scenarios (which effectively are the same, because in the
latter values are collected into a list until offered to Jackson).
However, this does create a problem for SMILE, because it allows for
shared references that do not match up when writing each value with a
new ObjectWriter, resulting in errors parsing the result.

This commit uses Jackson's SequenceWriter for streaming scenarios,
allowing Jackson to reuse the same context for writing multiple values,
fixing the issue described above.

Closes gh-24198
This commit is contained in:
Arjen Poutsma
2020-01-17 13:43:29 +01:00
parent 5e9d29d813
commit 54669c51c9
2 changed files with 146 additions and 103 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,17 +20,18 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.testfixture.codec.AbstractEncoderTests;
import org.springframework.core.testfixture.io.buffer.DataBufferTestUtils;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.util.MimeType;
@@ -59,21 +60,6 @@ public class Jackson2SmileEncoderTests extends AbstractEncoderTests<Jackson2Smil
}
public Consumer<DataBuffer> pojoConsumer(Pojo expected) {
return dataBuffer -> {
try {
Pojo actual = this.mapper.reader().forType(Pojo.class)
.readValue(DataBufferTestUtils.dumpBytes(dataBuffer));
assertThat(actual).isEqualTo(expected);
release(dataBuffer);
}
catch (IOException ex) {
throw new UncheckedIOException(ex);
}
};
}
@Override
@Test
public void canEncode() {
@@ -106,7 +92,19 @@ public class Jackson2SmileEncoderTests extends AbstractEncoderTests<Jackson2Smil
Flux<Pojo> input = Flux.fromIterable(list);
testEncode(input, Pojo.class, step -> step
.consumeNextWith(expect(list, List.class)));
.consumeNextWith(dataBuffer -> {
try {
Object actual = this.mapper.reader().forType(List.class)
.readValue(dataBuffer.asInputStream());
assertThat(actual).isEqualTo(list);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
finally {
release(dataBuffer);
}
}));
}
@Test
@@ -127,32 +125,22 @@ public class Jackson2SmileEncoderTests extends AbstractEncoderTests<Jackson2Smil
Flux<Pojo> input = Flux.just(pojo1, pojo2, pojo3);
ResolvableType type = ResolvableType.forClass(Pojo.class);
testEncodeAll(input, type, step -> step
.consumeNextWith(expect(pojo1, Pojo.class))
.consumeNextWith(expect(pojo2, Pojo.class))
.consumeNextWith(expect(pojo3, Pojo.class))
.verifyComplete(),
STREAM_SMILE_MIME_TYPE, null);
Flux<DataBuffer> result = this.encoder
.encode(input, bufferFactory, type, STREAM_SMILE_MIME_TYPE, null);
Mono<MappingIterator<Pojo>> joined = DataBufferUtils.join(result)
.map(buffer -> {
try {
return this.mapper.reader().forType(Pojo.class).readValues(buffer.asInputStream(true));
}
catch (IOException ex) {
throw new UncheckedIOException(ex);
}
});
StepVerifier.create(joined)
.assertNext(iter -> assertThat(iter).toIterable().contains(pojo1, pojo2, pojo3))
.verifyComplete();
}
private <T> Consumer<DataBuffer> expect(T expected, Class<T> expectedType) {
return dataBuffer -> {
try {
Object actual = this.mapper.reader().forType(expectedType)
.readValue(dataBuffer.asInputStream());
assertThat(actual).isEqualTo(expected);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
finally {
release(dataBuffer);
}
};
}
}