Internal refactoring - expose header embedding and serialization
Fix #888 - Rename `EmbeddedHeaderMessageConverter` to `EmbeddedHeaderUtils`; - Add method to populate default header set; - Make `serializePayloadIfNecessary` protected so it can be accessed by subclasses;
This commit is contained in:
committed by
Ilayaperumal Gopinathan
parent
6592c1a8cd
commit
27184a4cfc
@@ -165,7 +165,7 @@ public abstract class AbstractBinder<T, C extends ConsumerProperties, P extends
|
||||
return name + GROUP_INDEX_DELIMITER + (StringUtils.hasText(group) ? group : "default");
|
||||
}
|
||||
|
||||
final MessageValues serializePayloadIfNecessary(Message<?> message) {
|
||||
protected final MessageValues serializePayloadIfNecessary(Message<?> message) {
|
||||
Object originalPayload = message.getPayload();
|
||||
Object originalContentType = message.getHeaders().get(MessageHeaders.CONTENT_TYPE);
|
||||
|
||||
@@ -183,7 +183,7 @@ public abstract class AbstractBinder<T, C extends ConsumerProperties, P extends
|
||||
return messageValues;
|
||||
}
|
||||
|
||||
private byte[] serializePayloadIfNecessary(Object originalPayload) {
|
||||
protected final byte[] serializePayloadIfNecessary(Object originalPayload) {
|
||||
if (originalPayload instanceof byte[]) {
|
||||
return (byte[]) originalPayload;
|
||||
}
|
||||
|
||||
@@ -57,9 +57,6 @@ public abstract class AbstractMessageChannelBinder<C extends ConsumerProperties,
|
||||
|
||||
protected static final ExpressionParser EXPRESSION_PARSER = new SpelExpressionParser();
|
||||
|
||||
private final EmbeddedHeadersMessageConverter embeddedHeadersMessageConverter = new
|
||||
EmbeddedHeadersMessageConverter();
|
||||
|
||||
/**
|
||||
* Indicates whether the implementation and the message broker have
|
||||
* native support for message headers. If false, headers will be
|
||||
@@ -279,12 +276,12 @@ public abstract class AbstractMessageChannelBinder<C extends ConsumerProperties,
|
||||
MessageValues messageValues;
|
||||
if (this.extractEmbeddedHeaders) {
|
||||
try {
|
||||
messageValues = AbstractMessageChannelBinder.this.embeddedHeadersMessageConverter.extractHeaders(
|
||||
(Message<byte[]>) requestMessage, true);
|
||||
messageValues = EmbeddedHeaderUtils.extractHeaders((Message<byte[]>) requestMessage,
|
||||
true);
|
||||
}
|
||||
catch (Exception e) {
|
||||
AbstractMessageChannelBinder.this.logger.error(
|
||||
EmbeddedHeadersMessageConverter.decodeExceptionMessage(
|
||||
EmbeddedHeaderUtils.decodeExceptionMessage(
|
||||
requestMessage), e);
|
||||
messageValues = new MessageValues(requestMessage);
|
||||
}
|
||||
@@ -342,8 +339,7 @@ public abstract class AbstractMessageChannelBinder<C extends ConsumerProperties,
|
||||
if (originalContentType instanceof MimeType) {
|
||||
transformed.put(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE, originalContentType.toString());
|
||||
}
|
||||
payload = AbstractMessageChannelBinder.this.embeddedHeadersMessageConverter.embedHeaders(transformed,
|
||||
this.embeddedHeaders);
|
||||
payload = EmbeddedHeaderUtils.embedHeaders(transformed, this.embeddedHeaders);
|
||||
}
|
||||
else {
|
||||
payload = (byte[]) transformed.getPayload();
|
||||
|
||||
@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@@ -27,35 +28,38 @@ import org.springframework.integration.IntegrationMessageHeaderAccessor;
|
||||
import org.springframework.integration.support.json.Jackson2JsonObjectMapper;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
/**
|
||||
* Encodes requested headers into payload with format
|
||||
* {@code 0xff, n(1), [ [lenHdr(1), hdr, lenValue(4), value] ... ]}.
|
||||
* The 0xff indicates this new format; n is number of headers (max 255); for
|
||||
* each header, the name length (1 byte) is followed by the name, followed by
|
||||
* the value length (int) followed by the value (json).
|
||||
* {@code 0xff, n(1), [ [lenHdr(1), hdr, lenValue(4), value] ... ]}. The 0xff indicates
|
||||
* this new format; n is number of headers (max 255); for each header, the name length (1
|
||||
* byte) is followed by the name, followed by the value length (int) followed by the value
|
||||
* (json).
|
||||
* <p>
|
||||
* Previously, there was no leading 0xff; the value length was 1 byte and only
|
||||
* String header values were supported (no JSON conversion).
|
||||
* Previously, there was no leading 0xff; the value length was 1 byte and only String
|
||||
* header values were supported (no JSON conversion).
|
||||
*
|
||||
* @author Eric Bottard
|
||||
* @author Gary Russell
|
||||
* @author Ilayaperumal Gopinathan
|
||||
* @author Marius Bogoevici
|
||||
*
|
||||
* @since 1.2
|
||||
*/
|
||||
public class EmbeddedHeadersMessageConverter {
|
||||
public abstract class EmbeddedHeaderUtils {
|
||||
|
||||
private final Jackson2JsonObjectMapper objectMapper = new Jackson2JsonObjectMapper();
|
||||
private static final Jackson2JsonObjectMapper objectMapper = new Jackson2JsonObjectMapper();
|
||||
|
||||
public static String decodeExceptionMessage(Message<?> requestMessage) {
|
||||
return "Could not convert message: " + DatatypeConverter.printHexBinary((byte[]) requestMessage.getPayload());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return a new message where some of the original headers of {@code original}
|
||||
* have been embedded into the new message payload.
|
||||
* Return a new message where some of the original headers of {@code original} have
|
||||
* been embedded into the new message payload.
|
||||
*/
|
||||
public byte[] embedHeaders(MessageValues original, String... headers) throws Exception {
|
||||
public static byte[] embedHeaders(MessageValues original, String... headers) throws Exception {
|
||||
byte[][] headerValues = new byte[headers.length][];
|
||||
int n = 0;
|
||||
int headerCount = 0;
|
||||
@@ -63,7 +67,7 @@ public class EmbeddedHeadersMessageConverter {
|
||||
for (String header : headers) {
|
||||
Object value = original.get(header) == null ? null : original.get(header);
|
||||
if (value != null) {
|
||||
String json = this.objectMapper.toJson(value);
|
||||
String json = objectMapper.toJson(value);
|
||||
headerValues[n] = json.getBytes("UTF-8");
|
||||
headerCount++;
|
||||
headersLength += header.length() + headerValues[n++].length;
|
||||
@@ -95,15 +99,16 @@ public class EmbeddedHeadersMessageConverter {
|
||||
* have been promoted back to actual headers. The new payload is now the original
|
||||
* payload.
|
||||
*
|
||||
* @param message the message to extract headers
|
||||
* @param message the message to extract headers
|
||||
* @param copyRequestHeaders boolean value to specify if the request headers should be
|
||||
* copied
|
||||
* copied
|
||||
*/
|
||||
public MessageValues extractHeaders(Message<byte[]> message, boolean copyRequestHeaders) throws Exception {
|
||||
public static MessageValues extractHeaders(Message<byte[]> message, boolean copyRequestHeaders) throws Exception {
|
||||
return extractHeaders(message.getPayload(), copyRequestHeaders, message.getHeaders());
|
||||
}
|
||||
|
||||
private MessageValues extractHeaders(byte[] payload, boolean copyRequestHeaders, MessageHeaders requestHeaders) throws Exception {
|
||||
private static MessageValues extractHeaders(byte[] payload, boolean copyRequestHeaders,
|
||||
MessageHeaders requestHeaders) throws Exception {
|
||||
ByteBuffer byteBuffer = ByteBuffer.wrap(payload);
|
||||
int headerCount = byteBuffer.get() & 0xff;
|
||||
if (headerCount < 255) {
|
||||
@@ -118,7 +123,7 @@ public class EmbeddedHeadersMessageConverter {
|
||||
byteBuffer.position(byteBuffer.position() + len);
|
||||
len = byteBuffer.getInt();
|
||||
String headerValue = new String(payload, byteBuffer.position(), len, "UTF-8");
|
||||
Object headerContent = this.objectMapper.fromJson(headerValue, Object.class);
|
||||
Object headerContent = objectMapper.fromJson(headerValue, Object.class);
|
||||
headers.put(headerName, headerContent);
|
||||
byteBuffer.position(byteBuffer.position() + len);
|
||||
}
|
||||
@@ -133,17 +138,16 @@ public class EmbeddedHeadersMessageConverter {
|
||||
* have been promoted back to actual headers. The new payload is now the original
|
||||
* payload.
|
||||
*
|
||||
* @param payload the message payload
|
||||
* @param payload the message payload
|
||||
* @return the message with extracted headers
|
||||
* @throws Exception
|
||||
*/
|
||||
public MessageValues extractHeaders(byte[] payload) throws Exception {
|
||||
public static MessageValues extractHeaders(byte[] payload) throws Exception {
|
||||
return extractHeaders(payload, false, null);
|
||||
}
|
||||
|
||||
private MessageValues oldExtractHeaders(ByteBuffer byteBuffer, byte[] bytes, int headerCount,
|
||||
boolean copyRequestHeaders, MessageHeaders requestHeaders)
|
||||
throws UnsupportedEncodingException {
|
||||
private static MessageValues oldExtractHeaders(ByteBuffer byteBuffer, byte[] bytes, int headerCount,
|
||||
boolean copyRequestHeaders, MessageHeaders requestHeaders) throws UnsupportedEncodingException {
|
||||
Map<String, Object> headers = new HashMap<String, Object>();
|
||||
for (int i = 0; i < headerCount; i++) {
|
||||
int len = byteBuffer.get();
|
||||
@@ -165,7 +169,7 @@ public class EmbeddedHeadersMessageConverter {
|
||||
return buildMessageValues(newPayload, headers, copyRequestHeaders, requestHeaders);
|
||||
}
|
||||
|
||||
private MessageValues buildMessageValues(byte[] payload, Map<String, Object> headers,
|
||||
private static MessageValues buildMessageValues(byte[] payload, Map<String, Object> headers,
|
||||
boolean copyRequestHeaders, MessageHeaders requestHeaders) {
|
||||
MessageValues messageValues = new MessageValues(payload, headers);
|
||||
if (copyRequestHeaders && requestHeaders != null) {
|
||||
@@ -174,4 +178,19 @@ public class EmbeddedHeadersMessageConverter {
|
||||
return messageValues;
|
||||
}
|
||||
|
||||
public static String[] headersToEmbed(String[] configuredHeaders) {
|
||||
String[] headersToMap;
|
||||
if (ObjectUtils.isEmpty(configuredHeaders)) {
|
||||
headersToMap = BinderHeaders.STANDARD_HEADERS;
|
||||
}
|
||||
else {
|
||||
String[] combinedHeadersToMap = Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0,
|
||||
BinderHeaders.STANDARD_HEADERS.length + configuredHeaders.length);
|
||||
System.arraycopy(configuredHeaders, 0, combinedHeadersToMap, BinderHeaders.STANDARD_HEADERS.length,
|
||||
configuredHeaders.length);
|
||||
headersToMap = combinedHeadersToMap;
|
||||
}
|
||||
return headersToMap;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -35,15 +35,14 @@ public class MessageConverterTests {
|
||||
|
||||
@Test
|
||||
public void testHeaderEmbedding() throws Exception {
|
||||
EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter();
|
||||
Message<byte[]> message = MessageBuilder.withPayload("Hello".getBytes()).setHeader("foo", "bar")
|
||||
.setHeader("baz", "quxx").build();
|
||||
byte[] embedded = converter.embedHeaders(new MessageValues(message), "foo", "baz");
|
||||
byte[] embedded = EmbeddedHeaderUtils.embedHeaders(new MessageValues(message), "foo", "baz");
|
||||
assertThat(embedded[0] & 0xff).isEqualTo(0xff);
|
||||
assertThat(new String(embedded).substring(1)).isEqualTo(
|
||||
"\u0002\u0003foo\u0000\u0000\u0000\u0005\"bar\"\u0003baz\u0000\u0000\u0000\u0006\"quxx\"Hello");
|
||||
|
||||
MessageValues extracted = converter.extractHeaders(MessageBuilder.withPayload(embedded).build(), false);
|
||||
MessageValues extracted = EmbeddedHeaderUtils.extractHeaders(MessageBuilder.withPayload(embedded).build(), false);
|
||||
assertThat(new String((byte[]) extracted.getPayload())).isEqualTo("Hello");
|
||||
assertThat(extracted.get("foo")).isEqualTo("bar");
|
||||
assertThat(extracted.get("baz")).isEqualTo("quxx");
|
||||
@@ -51,15 +50,14 @@ public class MessageConverterTests {
|
||||
|
||||
@Test
|
||||
public void testHeaderExtractionWithDirectPayload() throws Exception {
|
||||
EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter();
|
||||
Message<byte[]> message = MessageBuilder.withPayload("Hello".getBytes()).setHeader("foo", "bar")
|
||||
.setHeader("baz", "quxx").build();
|
||||
byte[] embedded = converter.embedHeaders(new MessageValues(message), "foo", "baz");
|
||||
byte[] embedded = EmbeddedHeaderUtils.embedHeaders(new MessageValues(message), "foo", "baz");
|
||||
assertThat(embedded[0] & 0xff).isEqualTo(0xff);
|
||||
assertThat(new String(embedded).substring(1)).isEqualTo(
|
||||
"\u0002\u0003foo\u0000\u0000\u0000\u0005\"bar\"\u0003baz\u0000\u0000\u0000\u0006\"quxx\"Hello");
|
||||
|
||||
MessageValues extracted = converter.extractHeaders(embedded);
|
||||
MessageValues extracted = EmbeddedHeaderUtils.extractHeaders(embedded);
|
||||
assertThat(new String((byte[]) extracted.getPayload())).isEqualTo("Hello");
|
||||
assertThat(extracted.get("foo")).isEqualTo("bar");
|
||||
assertThat(extracted.get("baz")).isEqualTo("quxx");
|
||||
@@ -68,15 +66,14 @@ public class MessageConverterTests {
|
||||
|
||||
@Test
|
||||
public void testUnicodeHeader() throws Exception {
|
||||
EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter();
|
||||
Message<byte[]> message = MessageBuilder.withPayload("Hello".getBytes()).setHeader("foo", "bar")
|
||||
.setHeader("baz", "ØØØØØØØØ").build();
|
||||
byte[] embedded = converter.embedHeaders(new MessageValues(message), "foo", "baz");
|
||||
byte[] embedded = EmbeddedHeaderUtils.embedHeaders(new MessageValues(message), "foo", "baz");
|
||||
assertThat(embedded[0] & 0xff).isEqualTo(0xff);
|
||||
assertThat(new String(embedded, "UTF-8").substring(1)).isEqualTo(
|
||||
"\u0002\u0003foo\u0000\u0000\u0000\u0005\"bar\"\u0003baz\u0000\u0000\u0000\u0012\"ØØØØØØØØ\"Hello");
|
||||
|
||||
MessageValues extracted = converter.extractHeaders(MessageBuilder.withPayload(embedded).build(), false);
|
||||
MessageValues extracted = EmbeddedHeaderUtils.extractHeaders(MessageBuilder.withPayload(embedded).build(), false);
|
||||
assertThat(new String((byte[]) extracted.getPayload())).isEqualTo("Hello");
|
||||
assertThat(extracted.get("foo")).isEqualTo("bar");
|
||||
assertThat(extracted.get("baz")).isEqualTo("ØØØØØØØØ");
|
||||
@@ -84,19 +81,17 @@ public class MessageConverterTests {
|
||||
|
||||
@Test
|
||||
public void testHeaderEmbeddingMissingHeader() throws Exception {
|
||||
EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter();
|
||||
Message<byte[]> message = MessageBuilder.withPayload("Hello".getBytes()).setHeader("foo", "bar").build();
|
||||
byte[] embedded = converter.embedHeaders(new MessageValues(message), "foo", "baz");
|
||||
byte[] embedded = EmbeddedHeaderUtils.embedHeaders(new MessageValues(message), "foo", "baz");
|
||||
assertThat(embedded[0] & 0xff).isEqualTo(0xff);
|
||||
assertThat(new String(embedded).substring(1)).isEqualTo("\u0001\u0003foo\u0000\u0000\u0000\u0005\"bar\"Hello");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCanDecodeOldFormat() throws Exception {
|
||||
EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter();
|
||||
byte[] bytes = "\u0002\u0003foo\u0003bar\u0003baz\u0004quxxHello".getBytes("UTF-8");
|
||||
Message<byte[]> message = new GenericMessage<>(bytes);
|
||||
MessageValues extracted = converter.extractHeaders(message, false);
|
||||
MessageValues extracted = EmbeddedHeaderUtils.extractHeaders(message, false);
|
||||
assertThat(new String((byte[]) extracted.getPayload())).isEqualTo("Hello");
|
||||
assertThat(extracted.get("foo")).isEqualTo("bar");
|
||||
assertThat(extracted.get("baz")).isEqualTo("quxx");
|
||||
@@ -104,15 +99,14 @@ public class MessageConverterTests {
|
||||
|
||||
@Test
|
||||
public void testBadDecode() throws Exception {
|
||||
EmbeddedHeadersMessageConverter converter = new EmbeddedHeadersMessageConverter();
|
||||
byte[] bytes = "\u0002\u0003foo\u0020bar\u0003baz\u0004quxxHello".getBytes("UTF-8");
|
||||
Message<byte[]> message = new GenericMessage<byte[]>(bytes);
|
||||
Message<byte[]> message = new GenericMessage<>(bytes);
|
||||
try {
|
||||
converter.extractHeaders(message, false);
|
||||
EmbeddedHeaderUtils.extractHeaders(message, false);
|
||||
Assert.fail("Exception expected");
|
||||
}
|
||||
catch (Exception e) {
|
||||
String s = EmbeddedHeadersMessageConverter.decodeExceptionMessage(message);
|
||||
String s = EmbeddedHeaderUtils.decodeExceptionMessage(message);
|
||||
assertThat(e).isInstanceOf(StringIndexOutOfBoundsException.class);
|
||||
assertThat(s).startsWith("Could not convert message: 0203666F6F");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user