diff --git a/spring-cloud-stream-binder-test/src/test/java/org/springframework/cloud/stream/binder/MessageChannelBinderSupportTests.java b/spring-cloud-stream-binder-test/src/test/java/org/springframework/cloud/stream/binder/MessageChannelBinderSupportTests.java index 0f10b2d1a..4bde23c9a 100644 --- a/spring-cloud-stream-binder-test/src/test/java/org/springframework/cloud/stream/binder/MessageChannelBinderSupportTests.java +++ b/spring-cloud-stream-binder-test/src/test/java/org/springframework/cloud/stream/binder/MessageChannelBinderSupportTests.java @@ -25,7 +25,6 @@ import com.esotericsoftware.kryo.Registration; import org.junit.Before; import org.junit.Test; -import org.springframework.cloud.stream.binder.AbstractBinder.JavaClassMimeTypeConversion; import org.springframework.integration.codec.kryo.KryoRegistrar; import org.springframework.integration.codec.kryo.PojoCodec; import org.springframework.integration.support.MessageBuilder; @@ -176,36 +175,36 @@ public class MessageChannelBinderSupportTests { @Test public void mimeTypeIsSimpleObject() throws ClassNotFoundException { - MimeType mt = JavaClassMimeTypeConversion.mimeTypeFromObject(new Object(), null); - String className = JavaClassMimeTypeConversion.classNameFromMimeType(mt); + MimeType mt = JavaClassMimeTypeUtils.mimeTypeFromObject(new Object(), null); + String className = JavaClassMimeTypeUtils.classNameFromMimeType(mt); assertThat(Class.forName(className)).isEqualTo(Object.class); } @Test public void mimeTypeIsObjectArray() throws ClassNotFoundException { - MimeType mt = JavaClassMimeTypeConversion.mimeTypeFromObject(new String[0], null); - String className = JavaClassMimeTypeConversion.classNameFromMimeType(mt); + MimeType mt = JavaClassMimeTypeUtils.mimeTypeFromObject(new String[0], null); + String className = JavaClassMimeTypeUtils.classNameFromMimeType(mt); assertThat(Class.forName(className)).isEqualTo(String[].class); } @Test public void mimeTypeIsMultiDimensionalObjectArray() throws ClassNotFoundException { - MimeType mt = JavaClassMimeTypeConversion.mimeTypeFromObject(new String[0][0][0], null); - String className = JavaClassMimeTypeConversion.classNameFromMimeType(mt); + MimeType mt = JavaClassMimeTypeUtils.mimeTypeFromObject(new String[0][0][0], null); + String className = JavaClassMimeTypeUtils.classNameFromMimeType(mt); assertThat(Class.forName(className)).isEqualTo(String[][][].class); } @Test public void mimeTypeIsPrimitiveArray() throws ClassNotFoundException { - MimeType mt = JavaClassMimeTypeConversion.mimeTypeFromObject(new int[0], null); - String className = JavaClassMimeTypeConversion.classNameFromMimeType(mt); + MimeType mt = JavaClassMimeTypeUtils.mimeTypeFromObject(new int[0], null); + String className = JavaClassMimeTypeUtils.classNameFromMimeType(mt); assertThat(Class.forName(className)).isEqualTo(int[].class); } @Test public void mimeTypeIsMultiDimensionalPrimitiveArray() throws ClassNotFoundException { - MimeType mt = JavaClassMimeTypeConversion.mimeTypeFromObject(new int[0][0][0], null); - String className = JavaClassMimeTypeConversion.classNameFromMimeType(mt); + MimeType mt = JavaClassMimeTypeUtils.mimeTypeFromObject(new int[0][0][0], null); + String className = JavaClassMimeTypeUtils.classNameFromMimeType(mt); assertThat(Class.forName(className)).isEqualTo(int[][][].class); } diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractBinder.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractBinder.java index abba4c64b..cb4342f35 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractBinder.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractBinder.java @@ -16,13 +16,6 @@ package org.springframework.cloud.stream.binder; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,20 +25,14 @@ import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.support.AbstractApplicationContext; -import org.springframework.core.serializer.support.SerializationFailedException; import org.springframework.expression.EvaluationContext; import org.springframework.integration.codec.Codec; import org.springframework.integration.expression.ExpressionUtils; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.policy.SimpleRetryPolicy; import org.springframework.retry.support.RetryTemplate; import org.springframework.util.Assert; -import org.springframework.util.ClassUtils; -import org.springframework.util.MimeType; -import org.springframework.util.MimeTypeUtils; -import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; /** @@ -56,6 +43,7 @@ import org.springframework.util.StringUtils; * @author Ilayaperumal Gopinathan * @author Mark Fisher * @author Marius Bogoevici + * @author Soby Chacko */ public abstract class AbstractBinder implements ApplicationContextAware, InitializingBean, Binder { @@ -76,8 +64,6 @@ public abstract class AbstractBinder> payloadTypeCache = new ConcurrentHashMap<>(); - /** * For binder implementations that support a prefix, apply the prefix to the name. * @@ -166,107 +152,19 @@ public abstract class AbstractBinder message) { - Object originalPayload = message.getPayload(); - Object originalContentType = message.getHeaders().get(MessageHeaders.CONTENT_TYPE); - - // Pass content type as String since some transport adapters will exclude - // CONTENT_TYPE Header otherwise - Object contentType = JavaClassMimeTypeConversion - .mimeTypeFromObject(originalPayload, ObjectUtils.nullSafeToString(originalContentType)).toString(); - Object payload = serializePayloadIfNecessary(originalPayload); - MessageValues messageValues = new MessageValues(message); - messageValues.setPayload(payload); - messageValues.put(MessageHeaders.CONTENT_TYPE, contentType); - if (originalContentType != null && !originalContentType.toString().equals(contentType.toString())) { - messageValues.put(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE, originalContentType.toString()); - } - return messageValues; + return MessageSerializationUtils.serializePayload(message, this.codec); } protected final byte[] serializePayloadIfNecessary(Object originalPayload) { - if (originalPayload instanceof byte[]) { - return (byte[]) originalPayload; - } - else { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - try { - if (originalPayload instanceof String) { - return ((String) originalPayload).getBytes("UTF-8"); - } - this.codec.encode(originalPayload, bos); - return bos.toByteArray(); - } - catch (IOException e) { - throw new SerializationFailedException( - "unable to serialize payload [" + originalPayload.getClass().getName() + "]", e); - } - } + return MessageSerializationUtils.serializePayload(originalPayload, this.codec); } protected final MessageValues deserializePayloadIfNecessary(Message message) { - return deserializePayloadIfNecessary(new MessageValues(message)); + return MessageSerializationUtils.deserializePayload(new MessageValues(message), this.contentTypeResolver, this.codec); } protected final MessageValues deserializePayloadIfNecessary(MessageValues messageValues) { - Object originalPayload = messageValues.getPayload(); - MimeType contentType = this.contentTypeResolver.resolve(messageValues); - Object payload = deserializePayload(originalPayload, contentType); - if (payload != null) { - messageValues.setPayload(payload); - Object originalContentType = messageValues.get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE); - // Reset content-type only if the original content type is not null (when - // receiving messages from - // non-SCSt applications). - if (originalContentType != null) { - messageValues.put(MessageHeaders.CONTENT_TYPE, originalContentType); - messageValues.remove(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE); - } - } - return messageValues; - } - - private Object deserializePayload(Object payload, MimeType contentType) { - if (payload instanceof byte[]) { - if (contentType == null || MimeTypeUtils.APPLICATION_OCTET_STREAM.equals(contentType)) { - return payload; - } - else { - return deserializePayload((byte[]) payload, contentType); - } - } - return payload; - } - - private Object deserializePayload(byte[] bytes, MimeType contentType) { - if ("text".equalsIgnoreCase(contentType.getType()) || MimeTypeUtils.APPLICATION_JSON.equals(contentType)) { - try { - return new String(bytes, "UTF-8"); - } - catch (UnsupportedEncodingException e) { - String errorMessage = "unable to deserialize [java.lang.String]. Encoding not supported. " - + e.getMessage(); - logger.error(errorMessage); - throw new SerializationFailedException(errorMessage, e); - } - } - else { - String className = JavaClassMimeTypeConversion.classNameFromMimeType(contentType); - try { - // Cache types to avoid unnecessary ClassUtils.forName calls. - Class targetType = this.payloadTypeCache.get(className); - if (targetType == null) { - targetType = ClassUtils.forName(className, null); - this.payloadTypeCache.put(className, targetType); - } - return this.codec.decode(bytes, targetType); - } // catch all exceptions that could occur during de-serialization - catch (Exception e) { - String errorMessage = "Unable to deserialize [" + className + "] using the contentType [" + contentType - + "] " + e.getMessage(); - logger.error(errorMessage); - throw new SerializationFailedException(errorMessage, e); - } - } + return MessageSerializationUtils.deserializePayload(messageValues, this.contentTypeResolver, this.codec); } protected String buildPartitionRoutingExpression(String expressionRoot) { @@ -291,61 +189,4 @@ public abstract class AbstractBinder mimeTypesCache = new ConcurrentHashMap<>(); - - static MimeType mimeTypeFromObject(Object payload, String originalContentType) { - Assert.notNull(payload, "payload object cannot be null."); - if (payload instanceof byte[]) { - return MimeTypeUtils.APPLICATION_OCTET_STREAM; - } - if (payload instanceof String) { - return MimeTypeUtils.APPLICATION_JSON_VALUE.equals(originalContentType) ? MimeTypeUtils.APPLICATION_JSON - : MimeTypeUtils.TEXT_PLAIN; - } - String className = payload.getClass().getName(); - MimeType mimeType = mimeTypesCache.get(className); - if (mimeType == null) { - String modifiedClassName = className; - if (payload.getClass().isArray()) { - // Need to remove trailing ';' for an object array, e.g. - // "[Ljava.lang.String;" or multi-dimensional - // "[[[Ljava.lang.String;" - if (modifiedClassName.endsWith(";")) { - modifiedClassName = modifiedClassName.substring(0, modifiedClassName.length() - 1); - } - // Wrap in quotes to handle the illegal '[' character - modifiedClassName = "\"" + modifiedClassName + "\""; - } - mimeType = MimeType.valueOf("application/x-java-object;type=" + modifiedClassName); - mimeTypesCache.put(className, mimeType); - } - return mimeType; - } - - static String classNameFromMimeType(MimeType mimeType) { - Assert.notNull(mimeType, "mimeType cannot be null."); - String className = mimeType.getParameter("type"); - if (className == null) { - return null; - } - // unwrap quotes if any - className = className.replace("\"", ""); - - // restore trailing ';' - if (className.contains("[L")) { - className += ";"; - } - return className; - } - - } } diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/JavaClassMimeTypeUtils.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/JavaClassMimeTypeUtils.java new file mode 100644 index 000000000..cad16565b --- /dev/null +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/JavaClassMimeTypeUtils.java @@ -0,0 +1,94 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.springframework.util.Assert; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; + +/** + * Handles representing any java class as a {@link MimeType}. + * + * @author David Turanski + * @author Ilayaperumal Gopinathan + * @author Soby Chacko + */ +public abstract class JavaClassMimeTypeUtils { + + private static ConcurrentMap mimeTypesCache = new ConcurrentHashMap<>(); + + /** + * Convert payload to {@link MimeType} based on the content type on the message. + * + * @param payload the payload to convert + * @param originalContentType content type on the message + * @return converted {@link MimeType} + */ + public static MimeType mimeTypeFromObject(Object payload, String originalContentType) { + Assert.notNull(payload, "payload object cannot be null."); + if (payload instanceof byte[]) { + return MimeTypeUtils.APPLICATION_OCTET_STREAM; + } + if (payload instanceof String) { + return MimeTypeUtils.APPLICATION_JSON_VALUE.equals(originalContentType) ? MimeTypeUtils.APPLICATION_JSON + : MimeTypeUtils.TEXT_PLAIN; + } + String className = payload.getClass().getName(); + MimeType mimeType = mimeTypesCache.get(className); + if (mimeType == null) { + String modifiedClassName = className; + if (payload.getClass().isArray()) { + // Need to remove trailing ';' for an object array, e.g. + // "[Ljava.lang.String;" or multi-dimensional + // "[[[Ljava.lang.String;" + if (modifiedClassName.endsWith(";")) { + modifiedClassName = modifiedClassName.substring(0, modifiedClassName.length() - 1); + } + // Wrap in quotes to handle the illegal '[' character + modifiedClassName = "\"" + modifiedClassName + "\""; + } + mimeType = MimeType.valueOf("application/x-java-object;type=" + modifiedClassName); + mimeTypesCache.put(className, mimeType); + } + return mimeType; + } + + /** + * Retrieve the class name from the type parameter in {@link MimeType}. + * + * @param mimeType {@link MimeType} to retrieve class name from + * @return class name from the type parameter in MimeType and null if the class name cannot be determined + */ + public static String classNameFromMimeType(MimeType mimeType) { + Assert.notNull(mimeType, "mimeType cannot be null."); + String className = mimeType.getParameter("type"); + if (className == null) { + return null; + } + // unwrap quotes if any + className = className.replace("\"", ""); + + // restore trailing ';' + if (className.contains("[L")) { + className += ";"; + } + return className; + } +} diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/MessageSerializationUtils.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/MessageSerializationUtils.java new file mode 100644 index 000000000..0eb0157f6 --- /dev/null +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/MessageSerializationUtils.java @@ -0,0 +1,166 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.springframework.core.serializer.support.SerializationFailedException; +import org.springframework.integration.codec.Codec; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.ContentTypeResolver; +import org.springframework.util.ClassUtils; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; +import org.springframework.util.ObjectUtils; + +/** + * Utility class for serializing and de-serializing the message payload. + * + * @author Soby Chacko + */ +public abstract class MessageSerializationUtils { + + private static final Map> payloadTypeCache = new ConcurrentHashMap<>(); + + /** + * Serialize the message payload unless it is a byte array. + * + * @param message the message with the payload to serialize + * @param codec the codec used for serialization + * @return the Message with teh serialized payload + */ + public static MessageValues serializePayload(Message message, Codec codec) { + Object originalPayload = message.getPayload(); + Object originalContentType = message.getHeaders().get(MessageHeaders.CONTENT_TYPE); + + // Pass content type as String since some transport adapters will exclude + // CONTENT_TYPE Header otherwise + Object contentType = JavaClassMimeTypeUtils + .mimeTypeFromObject(originalPayload, ObjectUtils.nullSafeToString(originalContentType)).toString(); + Object payload = serializePayload(originalPayload, codec); + MessageValues messageValues = new MessageValues(message); + messageValues.setPayload(payload); + messageValues.put(MessageHeaders.CONTENT_TYPE, contentType); + if (originalContentType != null && !originalContentType.toString().equals(contentType.toString())) { + messageValues.put(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE, originalContentType.toString()); + } + return messageValues; + } + + /** + * Serialize the payload object if it is not a byte array. + * + * @param originalPayload the payload to serialize + * @param codec the codec used for serialization + * @return the serialized byte array or the original payload if it is already a byte array + * @throws SerializationFailedException thrown when serialization failed + */ + public static byte[] serializePayload(Object originalPayload, Codec codec) { + if (originalPayload instanceof byte[]) { + return (byte[]) originalPayload; + } + else { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + try { + if (originalPayload instanceof String) { + return ((String) originalPayload).getBytes("UTF-8"); + } + codec.encode(originalPayload, bos); + return bos.toByteArray(); + } + catch (IOException e) { + throw new SerializationFailedException( + "unable to serialize payload [" + originalPayload.getClass().getName() + "]", e); + } + } + } + + /** + * De-serialize the message payload if necessary. + * + * @param messageValues message with the payload to deserialize + * @param contentTypeResolver used for resolving the mime type. + * @param codec used for deserialization + * @return Deserialized Message. + */ + public static MessageValues deserializePayload(MessageValues messageValues, ContentTypeResolver contentTypeResolver, + Codec codec) { + Object originalPayload = messageValues.getPayload(); + MimeType contentType = contentTypeResolver.resolve(new MessageHeaders(messageValues.getHeaders())); + Object payload = deserializePayload(originalPayload, contentType, codec); + if (payload != null) { + messageValues.setPayload(payload); + Object originalContentType = messageValues.get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE); + // Reset content-type only if the original content type is not null (when + // receiving messages from + // non-SCSt applications). + if (originalContentType != null) { + messageValues.put(MessageHeaders.CONTENT_TYPE, originalContentType); + messageValues.remove(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE); + } + } + return messageValues; + } + + private static Object deserializePayload(Object payload, MimeType contentType, Codec codec) { + if (payload instanceof byte[]) { + if (contentType == null || MimeTypeUtils.APPLICATION_OCTET_STREAM.equals(contentType)) { + return payload; + } + else { + return deserializePayload((byte[]) payload, contentType, payloadTypeCache, codec); + } + } + return payload; + } + + private static Object deserializePayload(byte[] bytes, MimeType contentType, + Map> payloadTypeCache, Codec codec) { + if ("text".equalsIgnoreCase(contentType.getType()) || MimeTypeUtils.APPLICATION_JSON.equals(contentType)) { + try { + return new String(bytes, "UTF-8"); + } + catch (UnsupportedEncodingException e) { + String errorMessage = "unable to deserialize [java.lang.String]. Encoding not supported. " + + e.getMessage(); + throw new SerializationFailedException(errorMessage, e); + } + } + else { + String className = JavaClassMimeTypeUtils.classNameFromMimeType(contentType); + try { + // Cache types to avoid unnecessary ClassUtils.forName calls. + Class targetType = payloadTypeCache.get(className); + if (targetType == null) { + targetType = ClassUtils.forName(className, null); + payloadTypeCache.put(className, targetType); + } + return codec.decode(bytes, targetType); + } // catch all exceptions that could occur during de-serialization + catch (Exception e) { + String errorMessage = "Unable to deserialize [" + className + "] using the contentType [" + contentType + + "] " + e.getMessage(); + throw new SerializationFailedException(errorMessage, e); + } + } + } +}