From 11b21d9eecf97d52eed4458d80d4bdd76bc89888 Mon Sep 17 00:00:00 2001 From: Vinicius Carvalho Date: Mon, 11 Sep 2017 09:55:07 -0400 Subject: [PATCH] Fixes issue with originalContentTypeHeader - Fixes #1072 - Added a new ContentTypeResolver that searches for originalContentType as well as contentType headers --- .../avro/AbstractAvroMessageConverter.java | 29 ++------ ...AvroMessageConverterAutoConfiguration.java | 2 - ...vroMessageConverterSerializationTests.java | 68 +++++++++++++++---- .../binder/OriginalContentTypeResolver.java | 57 ++++++++++++++++ 4 files changed, 114 insertions(+), 42 deletions(-) create mode 100644 spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/OriginalContentTypeResolver.java diff --git a/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/avro/AbstractAvroMessageConverter.java b/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/avro/AbstractAvroMessageConverter.java index af544935e..f00a045e7 100644 --- a/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/avro/AbstractAvroMessageConverter.java +++ b/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/avro/AbstractAvroMessageConverter.java @@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.Collections; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; @@ -37,7 +38,7 @@ import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificRecord; -import org.springframework.cloud.stream.binder.BinderHeaders; +import org.springframework.cloud.stream.binder.OriginalContentTypeResolver; import org.springframework.core.io.Resource; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; @@ -54,11 +55,12 @@ import org.springframework.util.MimeType; public abstract class AbstractAvroMessageConverter extends AbstractMessageConverter { protected AbstractAvroMessageConverter(MimeType supportedMimeType) { - super(supportedMimeType); + this(Collections.singletonList(supportedMimeType)); } protected AbstractAvroMessageConverter(Collection supportedMimeTypes) { super(supportedMimeTypes); + setContentTypeResolver(new OriginalContentTypeResolver()); } protected static Schema parseSchema(Resource r) throws IOException { @@ -70,29 +72,6 @@ public abstract class AbstractAvroMessageConverter extends AbstractMessageConver return super.canConvertFrom(message, targetClass) && (message.getPayload() instanceof byte[]); } - @Override - protected boolean supportsMimeType(MessageHeaders headers) { - - for (MimeType current : getSupportedMimeTypes()) { - if(mimeTypeMatches(current,headers.get(MessageHeaders.CONTENT_TYPE)) || mimeTypeMatches(current,headers.get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)) ){ - return true; - } - } - - return false; - } - - private boolean mimeTypeMatches(MimeType reference, Object target){ - if(target == null){ - return !isStrictContentTypeMatch(); - }else if(target instanceof MimeType){ - return reference.equals((MimeType)target); - }else if(target instanceof String){ - return reference.equals(MimeType.valueOf((String)target)); - } - return false; - } - @Override protected Object convertFromInternal(Message message, Class targetClass, Object conversionHint) { Object result = null; diff --git a/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/avro/AvroMessageConverterAutoConfiguration.java b/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/avro/AvroMessageConverterAutoConfiguration.java index 947323270..30acd316d 100644 --- a/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/avro/AvroMessageConverterAutoConfiguration.java +++ b/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/avro/AvroMessageConverterAutoConfiguration.java @@ -24,7 +24,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cache.CacheManager; import org.springframework.cache.concurrent.ConcurrentMapCacheManager; -import org.springframework.cloud.stream.binder.StringConvertingContentTypeResolver; import org.springframework.cloud.stream.schema.client.SchemaRegistryClient; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -52,7 +51,6 @@ public class AvroMessageConverterAutoConfiguration { schemaRegistryClient); avroSchemaRegistryClientMessageConverter.setDynamicSchemaGenerationEnabled( this.avroMessageConverterProperties.isDynamicSchemaGenerationEnabled()); - avroSchemaRegistryClientMessageConverter.setContentTypeResolver(new StringConvertingContentTypeResolver()); if (this.avroMessageConverterProperties.getReaderSchema() != null) { avroSchemaRegistryClientMessageConverter.setReaderSchema( this.avroMessageConverterProperties.getReaderSchema()); diff --git a/spring-cloud-stream-schema/src/test/java/org/springframework/cloud/schema/avro/AvroMessageConverterSerializationTests.java b/spring-cloud-stream-schema/src/test/java/org/springframework/cloud/schema/avro/AvroMessageConverterSerializationTests.java index 9c921b896..ac624b5b8 100644 --- a/spring-cloud-stream-schema/src/test/java/org/springframework/cloud/schema/avro/AvroMessageConverterSerializationTests.java +++ b/spring-cloud-stream-schema/src/test/java/org/springframework/cloud/schema/avro/AvroMessageConverterSerializationTests.java @@ -16,6 +16,7 @@ package org.springframework.cloud.schema.avro; +import java.io.ByteArrayOutputStream; import java.util.Collections; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -24,6 +25,10 @@ import example.avro.User; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumWriter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.After; @@ -32,16 +37,18 @@ import org.junit.Before; import org.junit.Test; import org.springframework.boot.SpringApplication; -import org.springframework.cache.concurrent.ConcurrentMapCacheManager; -import org.springframework.cloud.stream.binder.StringConvertingContentTypeResolver; +import org.springframework.cache.support.NoOpCacheManager; +import org.springframework.cloud.stream.binder.BinderHeaders; import org.springframework.cloud.stream.schema.SchemaReference; import org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter; import org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient; import org.springframework.cloud.stream.schema.client.SchemaRegistryClient; import org.springframework.cloud.stream.schema.server.SchemaRegistryServerApplication; import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.support.MutableMessageHeaders; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; @@ -59,8 +66,8 @@ public class AvroMessageConverterSerializationTests { @Before public void setup() { - schemaRegistryServerContext = SpringApplication.run( - SchemaRegistryServerApplication.class); + schemaRegistryServerContext = SpringApplication + .run(SchemaRegistryServerApplication.class); } @After @@ -72,33 +79,64 @@ public class AvroMessageConverterSerializationTests { public void sourceWriteSameVersion() throws Exception { User specificRecord = new User(); specificRecord.setName("joe"); - Schema v1 = new Schema.Parser().parse( - AvroMessageConverterSerializationTests.class.getClassLoader().getResourceAsStream("schemas/user.avsc")); + Schema v1 = new Schema.Parser().parse(AvroMessageConverterSerializationTests.class + .getClassLoader().getResourceAsStream("schemas/user.avsc")); GenericRecord genericRecord = new GenericData.Record(v1); genericRecord.put("name", "joe"); SchemaRegistryClient client = new DefaultSchemaRegistryClient(); - AvroSchemaRegistryClientMessageConverter converter = new AvroSchemaRegistryClientMessageConverter(client); + AvroSchemaRegistryClientMessageConverter converter = new AvroSchemaRegistryClientMessageConverter( + client, new NoOpCacheManager()); converter.setDynamicSchemaGenerationEnabled(false); - converter.setContentTypeResolver(new StringConvertingContentTypeResolver()); - converter.setCacheManager(new ConcurrentMapCacheManager()); converter.afterPropertiesSet(); Message specificMessage = converter.toMessage(specificRecord, - new MutableMessageHeaders(Collections.emptyMap()), + new MutableMessageHeaders(Collections. emptyMap()), MimeTypeUtils.parseMimeType("application/*+avro")); - SchemaReference specificRef = extractSchemaReference( - MimeTypeUtils.parseMimeType(specificMessage.getHeaders().get("contentType").toString())); + SchemaReference specificRef = extractSchemaReference(MimeTypeUtils.parseMimeType( + specificMessage.getHeaders().get("contentType").toString())); Message genericMessage = converter.toMessage(genericRecord, - new MutableMessageHeaders(Collections.emptyMap()), + new MutableMessageHeaders(Collections. emptyMap()), MimeTypeUtils.parseMimeType("application/*+avro")); - SchemaReference genericRef = extractSchemaReference( - MimeTypeUtils.parseMimeType(genericMessage.getHeaders().get("contentType").toString())); + SchemaReference genericRef = extractSchemaReference(MimeTypeUtils.parseMimeType( + genericMessage.getHeaders().get("contentType").toString())); Assert.assertEquals(genericRef, specificRef); Assert.assertEquals(1, genericRef.getVersion()); } + @Test + public void testOriginalContentTypeHeaderOnly() throws Exception { + User specificRecord = new User(); + specificRecord.setName("joe"); + Schema v1 = new Schema.Parser().parse(AvroMessageConverterSerializationTests.class + .getClassLoader().getResourceAsStream("schemas/user.avsc")); + GenericRecord genericRecord = new GenericData.Record(v1); + genericRecord.put("name", "joe"); + SchemaRegistryClient client = new DefaultSchemaRegistryClient(); + client.register("user", "avro", v1.toString()); + AvroSchemaRegistryClientMessageConverter converter = new AvroSchemaRegistryClientMessageConverter( + client, new NoOpCacheManager()); + converter.setDynamicSchemaGenerationEnabled(false); + converter.afterPropertiesSet(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DatumWriter writer = new SpecificDatumWriter<>(User.class); + Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null); + writer.write(specificRecord, encoder); + encoder.flush(); + Message source = MessageBuilder.withPayload(baos.toByteArray()) + .setHeader(MessageHeaders.CONTENT_TYPE, + MimeTypeUtils.APPLICATION_OCTET_STREAM) + .setHeader(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE, + "application/vnd.user.v1+avro") + .build(); + Object converted = converter.fromMessage(source, User.class); + Assert.assertNotNull(converted); + Assert.assertEquals(specificRecord.getName().toString(), + ((User) converted).getName().toString()); + + } + private SchemaReference extractSchemaReference(MimeType mimeType) { SchemaReference schemaReference = null; Matcher schemaMatcher = this.versionedSchema.matcher(mimeType.toString()); diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/OriginalContentTypeResolver.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/OriginalContentTypeResolver.java new file mode 100644 index 000000000..66b3e56af --- /dev/null +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/OriginalContentTypeResolver.java @@ -0,0 +1,57 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.ContentTypeResolver; +import org.springframework.util.MimeType; + +/** + * @author Vinicius Carvalho + * + * Resolves contentType looking for a originalContentType header first. If not found + * returns the contentType + * + */ +public class OriginalContentTypeResolver implements ContentTypeResolver { + + private ConcurrentMap mimeTypeCache = new ConcurrentHashMap<>(); + + @Override + public MimeType resolve(MessageHeaders headers) { + Object contentType = headers + .get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE) != null + ? headers.get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE) + : headers.get(MessageHeaders.CONTENT_TYPE); + MimeType mimeType = null; + if (contentType instanceof MimeType) { + mimeType = (MimeType) contentType; + } + else if (contentType instanceof String) { + mimeType = mimeTypeCache.get(contentType); + if (mimeType == null) { + String valueAsString = (String) contentType; + mimeType = MimeType.valueOf(valueAsString); + mimeTypeCache.put(valueAsString, mimeType); + } + } + return mimeType; + } +}