Fixes issue with originalContentTypeHeader

- Fixes #1072
- Added a new ContentTypeResolver that searches for originalContentType
  as well as contentType headers
This commit is contained in:
Vinicius Carvalho
2017-09-11 09:55:07 -04:00
parent 4d8d3ebca5
commit 11b21d9eec
4 changed files with 114 additions and 42 deletions

View File

@@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
@@ -37,7 +38,7 @@ import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.binder.OriginalContentTypeResolver;
import org.springframework.core.io.Resource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@@ -54,11 +55,12 @@ import org.springframework.util.MimeType;
public abstract class AbstractAvroMessageConverter extends AbstractMessageConverter {
protected AbstractAvroMessageConverter(MimeType supportedMimeType) {
super(supportedMimeType);
this(Collections.singletonList(supportedMimeType));
}
protected AbstractAvroMessageConverter(Collection<MimeType> supportedMimeTypes) {
super(supportedMimeTypes);
setContentTypeResolver(new OriginalContentTypeResolver());
}
protected static Schema parseSchema(Resource r) throws IOException {
@@ -70,29 +72,6 @@ public abstract class AbstractAvroMessageConverter extends AbstractMessageConver
return super.canConvertFrom(message, targetClass) && (message.getPayload() instanceof byte[]);
}
@Override
protected boolean supportsMimeType(MessageHeaders headers) {
for (MimeType current : getSupportedMimeTypes()) {
if(mimeTypeMatches(current,headers.get(MessageHeaders.CONTENT_TYPE)) || mimeTypeMatches(current,headers.get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)) ){
return true;
}
}
return false;
}
private boolean mimeTypeMatches(MimeType reference, Object target){
if(target == null){
return !isStrictContentTypeMatch();
}else if(target instanceof MimeType){
return reference.equals((MimeType)target);
}else if(target instanceof String){
return reference.equals(MimeType.valueOf((String)target));
}
return false;
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object result = null;

View File

@@ -24,7 +24,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cache.CacheManager;
import org.springframework.cache.concurrent.ConcurrentMapCacheManager;
import org.springframework.cloud.stream.binder.StringConvertingContentTypeResolver;
import org.springframework.cloud.stream.schema.client.SchemaRegistryClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -52,7 +51,6 @@ public class AvroMessageConverterAutoConfiguration {
schemaRegistryClient);
avroSchemaRegistryClientMessageConverter.setDynamicSchemaGenerationEnabled(
this.avroMessageConverterProperties.isDynamicSchemaGenerationEnabled());
avroSchemaRegistryClientMessageConverter.setContentTypeResolver(new StringConvertingContentTypeResolver());
if (this.avroMessageConverterProperties.getReaderSchema() != null) {
avroSchemaRegistryClientMessageConverter.setReaderSchema(
this.avroMessageConverterProperties.getReaderSchema());

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.schema.avro;
import java.io.ByteArrayOutputStream;
import java.util.Collections;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -24,6 +25,10 @@ import example.avro.User;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
@@ -32,16 +37,18 @@ import org.junit.Before;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.cache.concurrent.ConcurrentMapCacheManager;
import org.springframework.cloud.stream.binder.StringConvertingContentTypeResolver;
import org.springframework.cache.support.NoOpCacheManager;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.cloud.stream.schema.SchemaReference;
import org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter;
import org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient;
import org.springframework.cloud.stream.schema.client.SchemaRegistryClient;
import org.springframework.cloud.stream.schema.server.SchemaRegistryServerApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.support.MutableMessageHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
@@ -59,8 +66,8 @@ public class AvroMessageConverterSerializationTests {
@Before
public void setup() {
schemaRegistryServerContext = SpringApplication.run(
SchemaRegistryServerApplication.class);
schemaRegistryServerContext = SpringApplication
.run(SchemaRegistryServerApplication.class);
}
@After
@@ -72,33 +79,64 @@ public class AvroMessageConverterSerializationTests {
public void sourceWriteSameVersion() throws Exception {
User specificRecord = new User();
specificRecord.setName("joe");
Schema v1 = new Schema.Parser().parse(
AvroMessageConverterSerializationTests.class.getClassLoader().getResourceAsStream("schemas/user.avsc"));
Schema v1 = new Schema.Parser().parse(AvroMessageConverterSerializationTests.class
.getClassLoader().getResourceAsStream("schemas/user.avsc"));
GenericRecord genericRecord = new GenericData.Record(v1);
genericRecord.put("name", "joe");
SchemaRegistryClient client = new DefaultSchemaRegistryClient();
AvroSchemaRegistryClientMessageConverter converter = new AvroSchemaRegistryClientMessageConverter(client);
AvroSchemaRegistryClientMessageConverter converter = new AvroSchemaRegistryClientMessageConverter(
client, new NoOpCacheManager());
converter.setDynamicSchemaGenerationEnabled(false);
converter.setContentTypeResolver(new StringConvertingContentTypeResolver());
converter.setCacheManager(new ConcurrentMapCacheManager());
converter.afterPropertiesSet();
Message specificMessage = converter.toMessage(specificRecord,
new MutableMessageHeaders(Collections.<String, Object>emptyMap()),
new MutableMessageHeaders(Collections.<String, Object> emptyMap()),
MimeTypeUtils.parseMimeType("application/*+avro"));
SchemaReference specificRef = extractSchemaReference(
MimeTypeUtils.parseMimeType(specificMessage.getHeaders().get("contentType").toString()));
SchemaReference specificRef = extractSchemaReference(MimeTypeUtils.parseMimeType(
specificMessage.getHeaders().get("contentType").toString()));
Message genericMessage = converter.toMessage(genericRecord,
new MutableMessageHeaders(Collections.<String, Object>emptyMap()),
new MutableMessageHeaders(Collections.<String, Object> emptyMap()),
MimeTypeUtils.parseMimeType("application/*+avro"));
SchemaReference genericRef = extractSchemaReference(
MimeTypeUtils.parseMimeType(genericMessage.getHeaders().get("contentType").toString()));
SchemaReference genericRef = extractSchemaReference(MimeTypeUtils.parseMimeType(
genericMessage.getHeaders().get("contentType").toString()));
Assert.assertEquals(genericRef, specificRef);
Assert.assertEquals(1, genericRef.getVersion());
}
@Test
public void testOriginalContentTypeHeaderOnly() throws Exception {
User specificRecord = new User();
specificRecord.setName("joe");
Schema v1 = new Schema.Parser().parse(AvroMessageConverterSerializationTests.class
.getClassLoader().getResourceAsStream("schemas/user.avsc"));
GenericRecord genericRecord = new GenericData.Record(v1);
genericRecord.put("name", "joe");
SchemaRegistryClient client = new DefaultSchemaRegistryClient();
client.register("user", "avro", v1.toString());
AvroSchemaRegistryClientMessageConverter converter = new AvroSchemaRegistryClientMessageConverter(
client, new NoOpCacheManager());
converter.setDynamicSchemaGenerationEnabled(false);
converter.afterPropertiesSet();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DatumWriter<User> writer = new SpecificDatumWriter<>(User.class);
Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
writer.write(specificRecord, encoder);
encoder.flush();
Message source = MessageBuilder.withPayload(baos.toByteArray())
.setHeader(MessageHeaders.CONTENT_TYPE,
MimeTypeUtils.APPLICATION_OCTET_STREAM)
.setHeader(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE,
"application/vnd.user.v1+avro")
.build();
Object converted = converter.fromMessage(source, User.class);
Assert.assertNotNull(converted);
Assert.assertEquals(specificRecord.getName().toString(),
((User) converted).getName().toString());
}
private SchemaReference extractSchemaReference(MimeType mimeType) {
SchemaReference schemaReference = null;
Matcher schemaMatcher = this.versionedSchema.matcher(mimeType.toString());

View File

@@ -0,0 +1,57 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.ContentTypeResolver;
import org.springframework.util.MimeType;
/**
* @author Vinicius Carvalho
*
* Resolves contentType looking for a originalContentType header first. If not found
* returns the contentType
*
*/
public class OriginalContentTypeResolver implements ContentTypeResolver {
private ConcurrentMap<String, MimeType> mimeTypeCache = new ConcurrentHashMap<>();
@Override
public MimeType resolve(MessageHeaders headers) {
Object contentType = headers
.get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE) != null
? headers.get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)
: headers.get(MessageHeaders.CONTENT_TYPE);
MimeType mimeType = null;
if (contentType instanceof MimeType) {
mimeType = (MimeType) contentType;
}
else if (contentType instanceof String) {
mimeType = mimeTypeCache.get(contentType);
if (mimeType == null) {
String valueAsString = (String) contentType;
mimeType = MimeType.valueOf(valueAsString);
mimeTypeCache.put(valueAsString, mimeType);
}
}
return mimeType;
}
}