Extracting payload serde code from AbstractBinder into a utility class

Fix #1030
This commit is contained in:
Soby Chacko
2017-07-25 16:29:01 -04:00
committed by Marius Bogoevici
parent d06fd64348
commit 6f1bc59ee2
4 changed files with 275 additions and 175 deletions

View File

@@ -25,7 +25,6 @@ import com.esotericsoftware.kryo.Registration;
import org.junit.Before;
import org.junit.Test;
import org.springframework.cloud.stream.binder.AbstractBinder.JavaClassMimeTypeConversion;
import org.springframework.integration.codec.kryo.KryoRegistrar;
import org.springframework.integration.codec.kryo.PojoCodec;
import org.springframework.integration.support.MessageBuilder;
@@ -176,36 +175,36 @@ public class MessageChannelBinderSupportTests {
@Test
public void mimeTypeIsSimpleObject() throws ClassNotFoundException {
MimeType mt = JavaClassMimeTypeConversion.mimeTypeFromObject(new Object(), null);
String className = JavaClassMimeTypeConversion.classNameFromMimeType(mt);
MimeType mt = JavaClassMimeTypeUtils.mimeTypeFromObject(new Object(), null);
String className = JavaClassMimeTypeUtils.classNameFromMimeType(mt);
assertThat(Class.forName(className)).isEqualTo(Object.class);
}
@Test
public void mimeTypeIsObjectArray() throws ClassNotFoundException {
MimeType mt = JavaClassMimeTypeConversion.mimeTypeFromObject(new String[0], null);
String className = JavaClassMimeTypeConversion.classNameFromMimeType(mt);
MimeType mt = JavaClassMimeTypeUtils.mimeTypeFromObject(new String[0], null);
String className = JavaClassMimeTypeUtils.classNameFromMimeType(mt);
assertThat(Class.forName(className)).isEqualTo(String[].class);
}
@Test
public void mimeTypeIsMultiDimensionalObjectArray() throws ClassNotFoundException {
MimeType mt = JavaClassMimeTypeConversion.mimeTypeFromObject(new String[0][0][0], null);
String className = JavaClassMimeTypeConversion.classNameFromMimeType(mt);
MimeType mt = JavaClassMimeTypeUtils.mimeTypeFromObject(new String[0][0][0], null);
String className = JavaClassMimeTypeUtils.classNameFromMimeType(mt);
assertThat(Class.forName(className)).isEqualTo(String[][][].class);
}
@Test
public void mimeTypeIsPrimitiveArray() throws ClassNotFoundException {
MimeType mt = JavaClassMimeTypeConversion.mimeTypeFromObject(new int[0], null);
String className = JavaClassMimeTypeConversion.classNameFromMimeType(mt);
MimeType mt = JavaClassMimeTypeUtils.mimeTypeFromObject(new int[0], null);
String className = JavaClassMimeTypeUtils.classNameFromMimeType(mt);
assertThat(Class.forName(className)).isEqualTo(int[].class);
}
@Test
public void mimeTypeIsMultiDimensionalPrimitiveArray() throws ClassNotFoundException {
MimeType mt = JavaClassMimeTypeConversion.mimeTypeFromObject(new int[0][0][0], null);
String className = JavaClassMimeTypeConversion.classNameFromMimeType(mt);
MimeType mt = JavaClassMimeTypeUtils.mimeTypeFromObject(new int[0][0][0], null);
String className = JavaClassMimeTypeUtils.classNameFromMimeType(mt);
assertThat(Class.forName(className)).isEqualTo(int[][][].class);
}

View File

@@ -16,13 +16,6 @@
package org.springframework.cloud.stream.binder;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,20 +25,14 @@ import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.core.serializer.support.SerializationFailedException;
import org.springframework.expression.EvaluationContext;
import org.springframework.integration.codec.Codec;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/**
@@ -56,6 +43,7 @@ import org.springframework.util.StringUtils;
* @author Ilayaperumal Gopinathan
* @author Mark Fisher
* @author Marius Bogoevici
* @author Soby Chacko
*/
public abstract class AbstractBinder<T, C extends ConsumerProperties, P extends ProducerProperties>
implements ApplicationContextAware, InitializingBean, Binder<T, C, P> {
@@ -76,8 +64,6 @@ public abstract class AbstractBinder<T, C extends ConsumerProperties, P extends
private volatile EvaluationContext evaluationContext;
private volatile Map<String, Class<?>> payloadTypeCache = new ConcurrentHashMap<>();
/**
* For binder implementations that support a prefix, apply the prefix to the name.
*
@@ -166,107 +152,19 @@ public abstract class AbstractBinder<T, C extends ConsumerProperties, P extends
}
protected final MessageValues serializePayloadIfNecessary(Message<?> message) {
Object originalPayload = message.getPayload();
Object originalContentType = message.getHeaders().get(MessageHeaders.CONTENT_TYPE);
// Pass content type as String since some transport adapters will exclude
// CONTENT_TYPE Header otherwise
Object contentType = JavaClassMimeTypeConversion
.mimeTypeFromObject(originalPayload, ObjectUtils.nullSafeToString(originalContentType)).toString();
Object payload = serializePayloadIfNecessary(originalPayload);
MessageValues messageValues = new MessageValues(message);
messageValues.setPayload(payload);
messageValues.put(MessageHeaders.CONTENT_TYPE, contentType);
if (originalContentType != null && !originalContentType.toString().equals(contentType.toString())) {
messageValues.put(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE, originalContentType.toString());
}
return messageValues;
return MessageSerializationUtils.serializePayload(message, this.codec);
}
protected final byte[] serializePayloadIfNecessary(Object originalPayload) {
if (originalPayload instanceof byte[]) {
return (byte[]) originalPayload;
}
else {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
if (originalPayload instanceof String) {
return ((String) originalPayload).getBytes("UTF-8");
}
this.codec.encode(originalPayload, bos);
return bos.toByteArray();
}
catch (IOException e) {
throw new SerializationFailedException(
"unable to serialize payload [" + originalPayload.getClass().getName() + "]", e);
}
}
return MessageSerializationUtils.serializePayload(originalPayload, this.codec);
}
protected final MessageValues deserializePayloadIfNecessary(Message<?> message) {
return deserializePayloadIfNecessary(new MessageValues(message));
return MessageSerializationUtils.deserializePayload(new MessageValues(message), this.contentTypeResolver, this.codec);
}
protected final MessageValues deserializePayloadIfNecessary(MessageValues messageValues) {
Object originalPayload = messageValues.getPayload();
MimeType contentType = this.contentTypeResolver.resolve(messageValues);
Object payload = deserializePayload(originalPayload, contentType);
if (payload != null) {
messageValues.setPayload(payload);
Object originalContentType = messageValues.get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE);
// Reset content-type only if the original content type is not null (when
// receiving messages from
// non-SCSt applications).
if (originalContentType != null) {
messageValues.put(MessageHeaders.CONTENT_TYPE, originalContentType);
messageValues.remove(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE);
}
}
return messageValues;
}
private Object deserializePayload(Object payload, MimeType contentType) {
if (payload instanceof byte[]) {
if (contentType == null || MimeTypeUtils.APPLICATION_OCTET_STREAM.equals(contentType)) {
return payload;
}
else {
return deserializePayload((byte[]) payload, contentType);
}
}
return payload;
}
private Object deserializePayload(byte[] bytes, MimeType contentType) {
if ("text".equalsIgnoreCase(contentType.getType()) || MimeTypeUtils.APPLICATION_JSON.equals(contentType)) {
try {
return new String(bytes, "UTF-8");
}
catch (UnsupportedEncodingException e) {
String errorMessage = "unable to deserialize [java.lang.String]. Encoding not supported. "
+ e.getMessage();
logger.error(errorMessage);
throw new SerializationFailedException(errorMessage, e);
}
}
else {
String className = JavaClassMimeTypeConversion.classNameFromMimeType(contentType);
try {
// Cache types to avoid unnecessary ClassUtils.forName calls.
Class<?> targetType = this.payloadTypeCache.get(className);
if (targetType == null) {
targetType = ClassUtils.forName(className, null);
this.payloadTypeCache.put(className, targetType);
}
return this.codec.decode(bytes, targetType);
} // catch all exceptions that could occur during de-serialization
catch (Exception e) {
String errorMessage = "Unable to deserialize [" + className + "] using the contentType [" + contentType
+ "] " + e.getMessage();
logger.error(errorMessage);
throw new SerializationFailedException(errorMessage, e);
}
}
return MessageSerializationUtils.deserializePayload(messageValues, this.contentTypeResolver, this.codec);
}
protected String buildPartitionRoutingExpression(String expressionRoot) {
@@ -291,61 +189,4 @@ public abstract class AbstractBinder<T, C extends ConsumerProperties, P extends
template.setBackOffPolicy(backOffPolicy);
return template;
}
/**
* Handles representing any java class as a {@link MimeType}.
*
* @author David Turanski
* @author Ilayaperumal Gopinathan
*/
public abstract static class JavaClassMimeTypeConversion {
private static ConcurrentMap<String, MimeType> mimeTypesCache = new ConcurrentHashMap<>();
static MimeType mimeTypeFromObject(Object payload, String originalContentType) {
Assert.notNull(payload, "payload object cannot be null.");
if (payload instanceof byte[]) {
return MimeTypeUtils.APPLICATION_OCTET_STREAM;
}
if (payload instanceof String) {
return MimeTypeUtils.APPLICATION_JSON_VALUE.equals(originalContentType) ? MimeTypeUtils.APPLICATION_JSON
: MimeTypeUtils.TEXT_PLAIN;
}
String className = payload.getClass().getName();
MimeType mimeType = mimeTypesCache.get(className);
if (mimeType == null) {
String modifiedClassName = className;
if (payload.getClass().isArray()) {
// Need to remove trailing ';' for an object array, e.g.
// "[Ljava.lang.String;" or multi-dimensional
// "[[[Ljava.lang.String;"
if (modifiedClassName.endsWith(";")) {
modifiedClassName = modifiedClassName.substring(0, modifiedClassName.length() - 1);
}
// Wrap in quotes to handle the illegal '[' character
modifiedClassName = "\"" + modifiedClassName + "\"";
}
mimeType = MimeType.valueOf("application/x-java-object;type=" + modifiedClassName);
mimeTypesCache.put(className, mimeType);
}
return mimeType;
}
static String classNameFromMimeType(MimeType mimeType) {
Assert.notNull(mimeType, "mimeType cannot be null.");
String className = mimeType.getParameter("type");
if (className == null) {
return null;
}
// unwrap quotes if any
className = className.replace("\"", "");
// restore trailing ';'
if (className.contains("[L")) {
className += ";";
}
return className;
}
}
}

View File

@@ -0,0 +1,94 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
/**
* Handles representing any java class as a {@link MimeType}.
*
* @author David Turanski
* @author Ilayaperumal Gopinathan
* @author Soby Chacko
*/
public abstract class JavaClassMimeTypeUtils {
private static ConcurrentMap<String, MimeType> mimeTypesCache = new ConcurrentHashMap<>();
/**
* Convert payload to {@link MimeType} based on the content type on the message.
*
* @param payload the payload to convert
* @param originalContentType content type on the message
* @return converted {@link MimeType}
*/
public static MimeType mimeTypeFromObject(Object payload, String originalContentType) {
Assert.notNull(payload, "payload object cannot be null.");
if (payload instanceof byte[]) {
return MimeTypeUtils.APPLICATION_OCTET_STREAM;
}
if (payload instanceof String) {
return MimeTypeUtils.APPLICATION_JSON_VALUE.equals(originalContentType) ? MimeTypeUtils.APPLICATION_JSON
: MimeTypeUtils.TEXT_PLAIN;
}
String className = payload.getClass().getName();
MimeType mimeType = mimeTypesCache.get(className);
if (mimeType == null) {
String modifiedClassName = className;
if (payload.getClass().isArray()) {
// Need to remove trailing ';' for an object array, e.g.
// "[Ljava.lang.String;" or multi-dimensional
// "[[[Ljava.lang.String;"
if (modifiedClassName.endsWith(";")) {
modifiedClassName = modifiedClassName.substring(0, modifiedClassName.length() - 1);
}
// Wrap in quotes to handle the illegal '[' character
modifiedClassName = "\"" + modifiedClassName + "\"";
}
mimeType = MimeType.valueOf("application/x-java-object;type=" + modifiedClassName);
mimeTypesCache.put(className, mimeType);
}
return mimeType;
}
/**
* Retrieve the class name from the type parameter in {@link MimeType}.
*
* @param mimeType {@link MimeType} to retrieve class name from
* @return class name from the type parameter in MimeType and null if the class name cannot be determined
*/
public static String classNameFromMimeType(MimeType mimeType) {
Assert.notNull(mimeType, "mimeType cannot be null.");
String className = mimeType.getParameter("type");
if (className == null) {
return null;
}
// unwrap quotes if any
className = className.replace("\"", "");
// restore trailing ';'
if (className.contains("[L")) {
className += ";";
}
return className;
}
}

View File

@@ -0,0 +1,166 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.core.serializer.support.SerializationFailedException;
import org.springframework.integration.codec.Codec;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.ContentTypeResolver;
import org.springframework.util.ClassUtils;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ObjectUtils;
/**
* Utility class for serializing and de-serializing the message payload.
*
* @author Soby Chacko
*/
public abstract class MessageSerializationUtils {
private static final Map<String, Class<?>> payloadTypeCache = new ConcurrentHashMap<>();
/**
* Serialize the message payload unless it is a byte array.
*
* @param message the message with the payload to serialize
* @param codec the codec used for serialization
* @return the Message with teh serialized payload
*/
public static MessageValues serializePayload(Message<?> message, Codec codec) {
Object originalPayload = message.getPayload();
Object originalContentType = message.getHeaders().get(MessageHeaders.CONTENT_TYPE);
// Pass content type as String since some transport adapters will exclude
// CONTENT_TYPE Header otherwise
Object contentType = JavaClassMimeTypeUtils
.mimeTypeFromObject(originalPayload, ObjectUtils.nullSafeToString(originalContentType)).toString();
Object payload = serializePayload(originalPayload, codec);
MessageValues messageValues = new MessageValues(message);
messageValues.setPayload(payload);
messageValues.put(MessageHeaders.CONTENT_TYPE, contentType);
if (originalContentType != null && !originalContentType.toString().equals(contentType.toString())) {
messageValues.put(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE, originalContentType.toString());
}
return messageValues;
}
/**
* Serialize the payload object if it is not a byte array.
*
* @param originalPayload the payload to serialize
* @param codec the codec used for serialization
* @return the serialized byte array or the original payload if it is already a byte array
* @throws SerializationFailedException thrown when serialization failed
*/
public static byte[] serializePayload(Object originalPayload, Codec codec) {
if (originalPayload instanceof byte[]) {
return (byte[]) originalPayload;
}
else {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
if (originalPayload instanceof String) {
return ((String) originalPayload).getBytes("UTF-8");
}
codec.encode(originalPayload, bos);
return bos.toByteArray();
}
catch (IOException e) {
throw new SerializationFailedException(
"unable to serialize payload [" + originalPayload.getClass().getName() + "]", e);
}
}
}
/**
* De-serialize the message payload if necessary.
*
* @param messageValues message with the payload to deserialize
* @param contentTypeResolver used for resolving the mime type.
* @param codec used for deserialization
* @return Deserialized Message.
*/
public static MessageValues deserializePayload(MessageValues messageValues, ContentTypeResolver contentTypeResolver,
Codec codec) {
Object originalPayload = messageValues.getPayload();
MimeType contentType = contentTypeResolver.resolve(new MessageHeaders(messageValues.getHeaders()));
Object payload = deserializePayload(originalPayload, contentType, codec);
if (payload != null) {
messageValues.setPayload(payload);
Object originalContentType = messageValues.get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE);
// Reset content-type only if the original content type is not null (when
// receiving messages from
// non-SCSt applications).
if (originalContentType != null) {
messageValues.put(MessageHeaders.CONTENT_TYPE, originalContentType);
messageValues.remove(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE);
}
}
return messageValues;
}
private static Object deserializePayload(Object payload, MimeType contentType, Codec codec) {
if (payload instanceof byte[]) {
if (contentType == null || MimeTypeUtils.APPLICATION_OCTET_STREAM.equals(contentType)) {
return payload;
}
else {
return deserializePayload((byte[]) payload, contentType, payloadTypeCache, codec);
}
}
return payload;
}
private static Object deserializePayload(byte[] bytes, MimeType contentType,
Map<String, Class<?>> payloadTypeCache, Codec codec) {
if ("text".equalsIgnoreCase(contentType.getType()) || MimeTypeUtils.APPLICATION_JSON.equals(contentType)) {
try {
return new String(bytes, "UTF-8");
}
catch (UnsupportedEncodingException e) {
String errorMessage = "unable to deserialize [java.lang.String]. Encoding not supported. "
+ e.getMessage();
throw new SerializationFailedException(errorMessage, e);
}
}
else {
String className = JavaClassMimeTypeUtils.classNameFromMimeType(contentType);
try {
// Cache types to avoid unnecessary ClassUtils.forName calls.
Class<?> targetType = payloadTypeCache.get(className);
if (targetType == null) {
targetType = ClassUtils.forName(className, null);
payloadTypeCache.put(className, targetType);
}
return codec.decode(bytes, targetType);
} // catch all exceptions that could occur during de-serialization
catch (Exception e) {
String errorMessage = "Unable to deserialize [" + className + "] using the contentType [" + contentType
+ "] " + e.getMessage();
throw new SerializationFailedException(errorMessage, e);
}
}
}
}