diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractBinder.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractBinder.java index 3f02d67bb..418eac37e 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractBinder.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractBinder.java @@ -165,7 +165,7 @@ public abstract class AbstractBinder message) { + protected final MessageValues serializePayloadIfNecessary(Message message) { Object originalPayload = message.getPayload(); Object originalContentType = message.getHeaders().get(MessageHeaders.CONTENT_TYPE); @@ -183,7 +183,7 @@ public abstract class AbstractBinder) requestMessage, true); + messageValues = EmbeddedHeaderUtils.extractHeaders((Message) requestMessage, + true); } catch (Exception e) { AbstractMessageChannelBinder.this.logger.error( - EmbeddedHeadersMessageConverter.decodeExceptionMessage( + EmbeddedHeaderUtils.decodeExceptionMessage( requestMessage), e); messageValues = new MessageValues(requestMessage); } @@ -342,8 +339,7 @@ public abstract class AbstractMessageChannelBinder - * Previously, there was no leading 0xff; the value length was 1 byte and only - * String header values were supported (no JSON conversion). + * Previously, there was no leading 0xff; the value length was 1 byte and only String + * header values were supported (no JSON conversion). * * @author Eric Bottard * @author Gary Russell * @author Ilayaperumal Gopinathan + * @author Marius Bogoevici + * + * @since 1.2 */ -public class EmbeddedHeadersMessageConverter { +public abstract class EmbeddedHeaderUtils { - private final Jackson2JsonObjectMapper objectMapper = new Jackson2JsonObjectMapper(); + private static final Jackson2JsonObjectMapper objectMapper = new Jackson2JsonObjectMapper(); public static String decodeExceptionMessage(Message requestMessage) { return "Could not convert message: " + DatatypeConverter.printHexBinary((byte[]) requestMessage.getPayload()); } - /** - * Return a new message where some of the original headers of {@code original} - * have been embedded into the new message payload. + * Return a new message where some of the original headers of {@code original} have + * been embedded into the new message payload. */ - public byte[] embedHeaders(MessageValues original, String... headers) throws Exception { + public static byte[] embedHeaders(MessageValues original, String... headers) throws Exception { byte[][] headerValues = new byte[headers.length][]; int n = 0; int headerCount = 0; @@ -63,7 +67,7 @@ public class EmbeddedHeadersMessageConverter { for (String header : headers) { Object value = original.get(header) == null ? null : original.get(header); if (value != null) { - String json = this.objectMapper.toJson(value); + String json = objectMapper.toJson(value); headerValues[n] = json.getBytes("UTF-8"); headerCount++; headersLength += header.length() + headerValues[n++].length; @@ -95,15 +99,16 @@ public class EmbeddedHeadersMessageConverter { * have been promoted back to actual headers. The new payload is now the original * payload. * - * @param message the message to extract headers + * @param message the message to extract headers * @param copyRequestHeaders boolean value to specify if the request headers should be - * copied + * copied */ - public MessageValues extractHeaders(Message message, boolean copyRequestHeaders) throws Exception { + public static MessageValues extractHeaders(Message message, boolean copyRequestHeaders) throws Exception { return extractHeaders(message.getPayload(), copyRequestHeaders, message.getHeaders()); } - private MessageValues extractHeaders(byte[] payload, boolean copyRequestHeaders, MessageHeaders requestHeaders) throws Exception { + private static MessageValues extractHeaders(byte[] payload, boolean copyRequestHeaders, + MessageHeaders requestHeaders) throws Exception { ByteBuffer byteBuffer = ByteBuffer.wrap(payload); int headerCount = byteBuffer.get() & 0xff; if (headerCount < 255) { @@ -118,7 +123,7 @@ public class EmbeddedHeadersMessageConverter { byteBuffer.position(byteBuffer.position() + len); len = byteBuffer.getInt(); String headerValue = new String(payload, byteBuffer.position(), len, "UTF-8"); - Object headerContent = this.objectMapper.fromJson(headerValue, Object.class); + Object headerContent = objectMapper.fromJson(headerValue, Object.class); headers.put(headerName, headerContent); byteBuffer.position(byteBuffer.position() + len); } @@ -133,17 +138,16 @@ public class EmbeddedHeadersMessageConverter { * have been promoted back to actual headers. The new payload is now the original * payload. * - * @param payload the message payload + * @param payload the message payload * @return the message with extracted headers * @throws Exception */ - public MessageValues extractHeaders(byte[] payload) throws Exception { + public static MessageValues extractHeaders(byte[] payload) throws Exception { return extractHeaders(payload, false, null); } - private MessageValues oldExtractHeaders(ByteBuffer byteBuffer, byte[] bytes, int headerCount, - boolean copyRequestHeaders, MessageHeaders requestHeaders) - throws UnsupportedEncodingException { + private static MessageValues oldExtractHeaders(ByteBuffer byteBuffer, byte[] bytes, int headerCount, + boolean copyRequestHeaders, MessageHeaders requestHeaders) throws UnsupportedEncodingException { Map headers = new HashMap(); for (int i = 0; i < headerCount; i++) { int len = byteBuffer.get(); @@ -165,7 +169,7 @@ public class EmbeddedHeadersMessageConverter { return buildMessageValues(newPayload, headers, copyRequestHeaders, requestHeaders); } - private MessageValues buildMessageValues(byte[] payload, Map headers, + private static MessageValues buildMessageValues(byte[] payload, Map headers, boolean copyRequestHeaders, MessageHeaders requestHeaders) { MessageValues messageValues = new MessageValues(payload, headers); if (copyRequestHeaders && requestHeaders != null) { @@ -174,4 +178,19 @@ public class EmbeddedHeadersMessageConverter { return messageValues; } + public static String[] headersToEmbed(String[] configuredHeaders) { + String[] headersToMap; + if (ObjectUtils.isEmpty(configuredHeaders)) { + headersToMap = BinderHeaders.STANDARD_HEADERS; + } + else { + String[] combinedHeadersToMap = Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0, + BinderHeaders.STANDARD_HEADERS.length + configuredHeaders.length); + System.arraycopy(configuredHeaders, 0, combinedHeadersToMap, BinderHeaders.STANDARD_HEADERS.length, + configuredHeaders.length); + headersToMap = combinedHeadersToMap; + } + return headersToMap; + } + } diff --git a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/MessageConverterTests.java b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/MessageConverterTests.java index 66306b175..ef577edae 100644 --- a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/MessageConverterTests.java +++ b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/MessageConverterTests.java @@ -35,15 +35,14 @@ public class MessageConverterTests { @Test public void testHeaderEmbedding() throws Exception { - EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter(); Message message = MessageBuilder.withPayload("Hello".getBytes()).setHeader("foo", "bar") .setHeader("baz", "quxx").build(); - byte[] embedded = converter.embedHeaders(new MessageValues(message), "foo", "baz"); + byte[] embedded = EmbeddedHeaderUtils.embedHeaders(new MessageValues(message), "foo", "baz"); assertThat(embedded[0] & 0xff).isEqualTo(0xff); assertThat(new String(embedded).substring(1)).isEqualTo( "\u0002\u0003foo\u0000\u0000\u0000\u0005\"bar\"\u0003baz\u0000\u0000\u0000\u0006\"quxx\"Hello"); - MessageValues extracted = converter.extractHeaders(MessageBuilder.withPayload(embedded).build(), false); + MessageValues extracted = EmbeddedHeaderUtils.extractHeaders(MessageBuilder.withPayload(embedded).build(), false); assertThat(new String((byte[]) extracted.getPayload())).isEqualTo("Hello"); assertThat(extracted.get("foo")).isEqualTo("bar"); assertThat(extracted.get("baz")).isEqualTo("quxx"); @@ -51,15 +50,14 @@ public class MessageConverterTests { @Test public void testHeaderExtractionWithDirectPayload() throws Exception { - EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter(); Message message = MessageBuilder.withPayload("Hello".getBytes()).setHeader("foo", "bar") .setHeader("baz", "quxx").build(); - byte[] embedded = converter.embedHeaders(new MessageValues(message), "foo", "baz"); + byte[] embedded = EmbeddedHeaderUtils.embedHeaders(new MessageValues(message), "foo", "baz"); assertThat(embedded[0] & 0xff).isEqualTo(0xff); assertThat(new String(embedded).substring(1)).isEqualTo( "\u0002\u0003foo\u0000\u0000\u0000\u0005\"bar\"\u0003baz\u0000\u0000\u0000\u0006\"quxx\"Hello"); - MessageValues extracted = converter.extractHeaders(embedded); + MessageValues extracted = EmbeddedHeaderUtils.extractHeaders(embedded); assertThat(new String((byte[]) extracted.getPayload())).isEqualTo("Hello"); assertThat(extracted.get("foo")).isEqualTo("bar"); assertThat(extracted.get("baz")).isEqualTo("quxx"); @@ -68,15 +66,14 @@ public class MessageConverterTests { @Test public void testUnicodeHeader() throws Exception { - EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter(); Message message = MessageBuilder.withPayload("Hello".getBytes()).setHeader("foo", "bar") .setHeader("baz", "ØØØØØØØØ").build(); - byte[] embedded = converter.embedHeaders(new MessageValues(message), "foo", "baz"); + byte[] embedded = EmbeddedHeaderUtils.embedHeaders(new MessageValues(message), "foo", "baz"); assertThat(embedded[0] & 0xff).isEqualTo(0xff); assertThat(new String(embedded, "UTF-8").substring(1)).isEqualTo( "\u0002\u0003foo\u0000\u0000\u0000\u0005\"bar\"\u0003baz\u0000\u0000\u0000\u0012\"ØØØØØØØØ\"Hello"); - MessageValues extracted = converter.extractHeaders(MessageBuilder.withPayload(embedded).build(), false); + MessageValues extracted = EmbeddedHeaderUtils.extractHeaders(MessageBuilder.withPayload(embedded).build(), false); assertThat(new String((byte[]) extracted.getPayload())).isEqualTo("Hello"); assertThat(extracted.get("foo")).isEqualTo("bar"); assertThat(extracted.get("baz")).isEqualTo("ØØØØØØØØ"); @@ -84,19 +81,17 @@ public class MessageConverterTests { @Test public void testHeaderEmbeddingMissingHeader() throws Exception { - EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter(); Message message = MessageBuilder.withPayload("Hello".getBytes()).setHeader("foo", "bar").build(); - byte[] embedded = converter.embedHeaders(new MessageValues(message), "foo", "baz"); + byte[] embedded = EmbeddedHeaderUtils.embedHeaders(new MessageValues(message), "foo", "baz"); assertThat(embedded[0] & 0xff).isEqualTo(0xff); assertThat(new String(embedded).substring(1)).isEqualTo("\u0001\u0003foo\u0000\u0000\u0000\u0005\"bar\"Hello"); } @Test public void testCanDecodeOldFormat() throws Exception { - EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter(); byte[] bytes = "\u0002\u0003foo\u0003bar\u0003baz\u0004quxxHello".getBytes("UTF-8"); Message message = new GenericMessage<>(bytes); - MessageValues extracted = converter.extractHeaders(message, false); + MessageValues extracted = EmbeddedHeaderUtils.extractHeaders(message, false); assertThat(new String((byte[]) extracted.getPayload())).isEqualTo("Hello"); assertThat(extracted.get("foo")).isEqualTo("bar"); assertThat(extracted.get("baz")).isEqualTo("quxx"); @@ -104,15 +99,14 @@ public class MessageConverterTests { @Test public void testBadDecode() throws Exception { - EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter(); byte[] bytes = "\u0002\u0003foo\u0020bar\u0003baz\u0004quxxHello".getBytes("UTF-8"); - Message message = new GenericMessage(bytes); + Message message = new GenericMessage<>(bytes); try { - converter.extractHeaders(message, false); + EmbeddedHeaderUtils.extractHeaders(message, false); Assert.fail("Exception expected"); } catch (Exception e) { - String s = EmbeddedHeadersMessageConverter.decodeExceptionMessage(message); + String s = EmbeddedHeaderUtils.decodeExceptionMessage(message); assertThat(e).isInstanceOf(StringIndexOutOfBoundsException.class); assertThat(s).startsWith("Could not convert message: 0203666F6F"); }