From 54669c51c927cda27a80fb4a3e56a3207942bb66 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Fri, 17 Jan 2020 13:43:29 +0100 Subject: [PATCH] Use Jackson SequenceWriter for streaming Before this commit, the AbstractJackson2Encoder instantiated a ObjectWriter per value. This is not an issue for single values or non-streaming scenarios (which effectively are the same, because in the latter values are collected into a list until offered to Jackson). However, this does create a problem for SMILE, because it allows for shared references that do not match up when writing each value with a new ObjectWriter, resulting in errors parsing the result. This commit uses Jackson's SequenceWriter for streaming scenarios, allowing Jackson to reuse the same context for writing multiple values, fixing the issue described above. Closes gh-24198 --- .../codec/json/AbstractJackson2Encoder.java | 171 ++++++++++++------ .../codec/json/Jackson2SmileEncoderTests.java | 78 ++++---- 2 files changed, 146 insertions(+), 103 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java b/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java index f0d2ad1b59..b195c9acd6 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,6 @@ package org.springframework.http.codec.json; import java.io.IOException; -import java.io.OutputStream; import java.lang.annotation.Annotation; import java.nio.charset.Charset; import java.util.ArrayList; @@ -29,9 +28,11 @@ import java.util.Map; import com.fasterxml.jackson.core.JsonEncoding; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.util.ByteArrayBuilder; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.SequenceWriter; import com.fasterxml.jackson.databind.exc.InvalidDefinitionException; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -44,7 +45,6 @@ import org.springframework.core.codec.EncodingException; import org.springframework.core.codec.Hints; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.log.LogFormatUtils; import org.springframework.http.MediaType; import org.springframework.http.codec.HttpMessageEncoder; @@ -115,32 +115,37 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple Assert.notNull(bufferFactory, "'bufferFactory' must not be null"); Assert.notNull(elementType, "'elementType' must not be null"); - JsonEncoding encoding = getJsonEncoding(mimeType); - if (inputStream instanceof Mono) { - return Mono.from(inputStream).map(value -> - encodeValue(value, bufferFactory, elementType, mimeType, hints, encoding)).flux(); + return Mono.from(inputStream) + .map(value -> encodeValue(value, bufferFactory, elementType, mimeType, hints)) + .flux(); } else { - return this.streamingMediaTypes.stream() - .filter(mediaType -> mediaType.isCompatibleWith(mimeType)) - .findFirst() - .map(mediaType -> { - byte[] separator = STREAM_SEPARATORS.getOrDefault(mediaType, NEWLINE_SEPARATOR); - return Flux.from(inputStream).map(value -> { - DataBuffer buffer = encodeValue( - value, bufferFactory, elementType, mimeType, hints, encoding); - if (separator != null) { - buffer.write(separator); - } - return buffer; - }); - }) - .orElseGet(() -> { - ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType); - return Flux.from(inputStream).collectList().map(list -> - encodeValue(list, bufferFactory, listType, mimeType, hints, encoding)).flux(); - }); + byte[] separator = streamSeparator(mimeType); + if (separator != null) { // streaming + try { + ObjectWriter writer = createObjectWriter(elementType, mimeType, hints); + ByteArrayBuilder byteBuilder = new ByteArrayBuilder(writer.getFactory()._getBufferRecycler()); + JsonEncoding encoding = getJsonEncoding(mimeType); + JsonGenerator generator = getObjectMapper().getFactory().createGenerator(byteBuilder, encoding); + SequenceWriter sequenceWriter = writer.writeValues(generator); + + return Flux.from(inputStream) + .map(value -> encodeStreamingValue(value, bufferFactory, hints, sequenceWriter, byteBuilder, + separator)); + } + catch (IOException ex) { + return Flux.error(ex); + } + } + else { // non-streaming + ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType); + return Flux.from(inputStream) + .collectList() + .map(list -> encodeValue(list, bufferFactory, listType, mimeType, hints)) + .flux(); + } + } } @@ -148,19 +153,87 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple public DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory, ResolvableType valueType, @Nullable MimeType mimeType, @Nullable Map hints) { - return encodeValue(value, bufferFactory, valueType, mimeType, hints, getJsonEncoding(mimeType)); + ObjectWriter writer = createObjectWriter(valueType, mimeType, hints); + ByteArrayBuilder byteBuilder = new ByteArrayBuilder(writer.getFactory()._getBufferRecycler()); + JsonEncoding encoding = getJsonEncoding(mimeType); + + logValue(hints, value); + + try { + JsonGenerator generator = getObjectMapper().getFactory().createGenerator(byteBuilder, encoding); + writer.writeValue(generator, value); + generator.flush(); + } + catch (InvalidDefinitionException ex) { + throw new CodecException("Type definition error: " + ex.getType(), ex); + } + catch (JsonProcessingException ex) { + throw new EncodingException("JSON encoding error: " + ex.getOriginalMessage(), ex); + } + catch (IOException ex) { + throw new IllegalStateException("Unexpected I/O error while writing to byte array builder", + ex); + } + + byte[] bytes = byteBuilder.toByteArray(); + DataBuffer buffer = bufferFactory.allocateBuffer(bytes.length); + buffer.write(bytes); + + return buffer; } - private DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory, ResolvableType valueType, - @Nullable MimeType mimeType, @Nullable Map hints, JsonEncoding encoding) { + private DataBuffer encodeStreamingValue(Object value, DataBufferFactory bufferFactory, @Nullable Map hints, + SequenceWriter sequenceWriter, ByteArrayBuilder byteArrayBuilder, byte[] separator) { + logValue(hints, value); + + try { + sequenceWriter.write(value); + sequenceWriter.flush(); + } + catch (InvalidDefinitionException ex) { + throw new CodecException("Type definition error: " + ex.getType(), ex); + } + catch (JsonProcessingException ex) { + throw new EncodingException("JSON encoding error: " + ex.getOriginalMessage(), ex); + } + catch (IOException ex) { + throw new IllegalStateException("Unexpected I/O error while writing to byte array builder", + ex); + } + + byte[] bytes = byteArrayBuilder.toByteArray(); + byteArrayBuilder.reset(); + + int offset; + int length; + if (bytes.length > 0 && bytes[0] == ' ') { + // SequenceWriter writes an unnecessary space in between values + offset = 1; + length = bytes.length - 1; + } + else { + offset = 0; + length = bytes.length; + } + DataBuffer buffer = bufferFactory.allocateBuffer(length + separator.length); + buffer.write(bytes, offset, length); + buffer.write(separator); + + return buffer; + } + + private void logValue(@Nullable Map hints, Object value) { if (!Hints.isLoggingSuppressed(hints)) { LogFormatUtils.traceDebug(logger, traceOn -> { String formatted = LogFormatUtils.formatValue(value, !traceOn); return Hints.getLogPrefix(hints) + "Encoding [" + formatted + "]"; }); } + } + private ObjectWriter createObjectWriter(ResolvableType valueType, @Nullable MimeType mimeType, + @Nullable Map hints) { JavaType javaType = getJavaType(valueType.getType(), null); Class jsonView = (hints != null ? (Class) hints.get(Jackson2CodecSupport.JSON_VIEW_HINT) : null); ObjectWriter writer = (jsonView != null ? @@ -170,35 +243,7 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple writer = writer.forType(javaType); } - writer = customizeWriter(writer, mimeType, valueType, hints); - - DataBuffer buffer = bufferFactory.allocateBuffer(); - boolean release = true; - OutputStream outputStream = buffer.asOutputStream(); - - try { - JsonGenerator generator = getObjectMapper().getFactory().createGenerator(outputStream, encoding); - writer.writeValue(generator, value); - generator.flush(); - release = false; - } - catch (InvalidDefinitionException ex) { - throw new CodecException("Type definition error: " + ex.getType(), ex); - } - catch (JsonProcessingException ex) { - throw new EncodingException("JSON encoding error: " + ex.getOriginalMessage(), ex); - } - catch (IOException ex) { - throw new IllegalStateException("Unexpected I/O error while writing to data buffer", - ex); - } - finally { - if (release) { - DataBufferUtils.release(buffer); - } - } - - return buffer; + return customizeWriter(writer, mimeType, valueType, hints); } protected ObjectWriter customizeWriter(ObjectWriter writer, @Nullable MimeType mimeType, @@ -207,6 +252,16 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple return writer; } + @Nullable + private byte[] streamSeparator(@Nullable MimeType mimeType) { + for (MediaType streamingMediaType : this.streamingMediaTypes) { + if (streamingMediaType.isCompatibleWith(mimeType)) { + return STREAM_SEPARATORS.getOrDefault(streamingMediaType, NEWLINE_SEPARATOR); + } + } + return null; + } + /** * Determine the JSON encoding to use for the given mime type. * @param mimeType the mime type as requested by the caller diff --git a/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2SmileEncoderTests.java b/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2SmileEncoderTests.java index ef2a9d8488..32b15ba0b5 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2SmileEncoderTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/json/Jackson2SmileEncoderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,17 +20,18 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Arrays; import java.util.List; -import java.util.function.Consumer; +import com.fasterxml.jackson.databind.MappingIterator; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.testfixture.codec.AbstractEncoderTests; -import org.springframework.core.testfixture.io.buffer.DataBufferTestUtils; import org.springframework.http.codec.ServerSentEvent; import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; import org.springframework.util.MimeType; @@ -59,21 +60,6 @@ public class Jackson2SmileEncoderTests extends AbstractEncoderTests pojoConsumer(Pojo expected) { - return dataBuffer -> { - try { - Pojo actual = this.mapper.reader().forType(Pojo.class) - .readValue(DataBufferTestUtils.dumpBytes(dataBuffer)); - assertThat(actual).isEqualTo(expected); - release(dataBuffer); - } - catch (IOException ex) { - throw new UncheckedIOException(ex); - } - }; - } - - @Override @Test public void canEncode() { @@ -106,7 +92,19 @@ public class Jackson2SmileEncoderTests extends AbstractEncoderTests input = Flux.fromIterable(list); testEncode(input, Pojo.class, step -> step - .consumeNextWith(expect(list, List.class))); + .consumeNextWith(dataBuffer -> { + try { + Object actual = this.mapper.reader().forType(List.class) + .readValue(dataBuffer.asInputStream()); + assertThat(actual).isEqualTo(list); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + finally { + release(dataBuffer); + } + })); } @Test @@ -127,32 +125,22 @@ public class Jackson2SmileEncoderTests extends AbstractEncoderTests input = Flux.just(pojo1, pojo2, pojo3); ResolvableType type = ResolvableType.forClass(Pojo.class); - testEncodeAll(input, type, step -> step - .consumeNextWith(expect(pojo1, Pojo.class)) - .consumeNextWith(expect(pojo2, Pojo.class)) - .consumeNextWith(expect(pojo3, Pojo.class)) - .verifyComplete(), - STREAM_SMILE_MIME_TYPE, null); + Flux result = this.encoder + .encode(input, bufferFactory, type, STREAM_SMILE_MIME_TYPE, null); + + Mono> joined = DataBufferUtils.join(result) + .map(buffer -> { + try { + return this.mapper.reader().forType(Pojo.class).readValues(buffer.asInputStream(true)); + } + catch (IOException ex) { + throw new UncheckedIOException(ex); + } + }); + + StepVerifier.create(joined) + .assertNext(iter -> assertThat(iter).toIterable().contains(pojo1, pojo2, pojo3)) + .verifyComplete(); } - - private Consumer expect(T expected, Class expectedType) { - return dataBuffer -> { - try { - Object actual = this.mapper.reader().forType(expectedType) - .readValue(dataBuffer.asInputStream()); - assertThat(actual).isEqualTo(expected); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - finally { - release(dataBuffer); - } - }; - - } - - - }