diff --git a/.gitignore b/.gitignore index 42bdaef55..0dd7fdbd6 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,4 @@ _site/ dump.rdb .apt_generated artifacts +**/dependency-reduced-pom.xml diff --git a/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc b/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc index ff74e0a00..df3c6f674 100644 --- a/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc +++ b/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc @@ -1577,6 +1577,13 @@ A client for the Spring Cloud Stream schema registry can be configured using the } ---- +[NOTE] +==== +The default converter is optimized to cache not only the schemas from the remote server but also the `parse()` and `toString()` methods that are quite expensive. +Because of this, it uses a `DefaultSchemaRegistryClient` that does not caches responses. If you intend to use the client directly on your code, you can request a bean that also caches responses to be created. +To do that, just add the property `spring.cloud.stream.schemaRegistryClient.cached=true` to your application properties. +==== + ==== Avro Schema Registry Client Message Converters For Spring Boot applications that have a `SchemaRegistryClient` bean registered with the application context, Spring Cloud Stream will auto-configure an Apache Avro message converter that uses the schema registry client for schema management. diff --git a/spring-cloud-stream-schema/pom.xml b/spring-cloud-stream-schema/pom.xml index 6c3d7432f..397cb42a3 100644 --- a/spring-cloud-stream-schema/pom.xml +++ b/spring-cloud-stream-schema/pom.xml @@ -8,7 +8,9 @@ 4.0.0 spring-cloud-stream-schema - + + 1.8.1 + @@ -28,7 +30,7 @@ org.apache.avro avro - 1.8.1 + ${avro.version} true @@ -47,4 +49,26 @@ test + + + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-test-sources + + schema + + + + + + ${project.basedir}/target/generated-test-sources + ${project.basedir}/src/test/resources/schemas + + + + diff --git a/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/ParsedSchema.java b/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/ParsedSchema.java new file mode 100644 index 000000000..c20cac7da --- /dev/null +++ b/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/ParsedSchema.java @@ -0,0 +1,60 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.schema; + +import org.apache.avro.Schema; + +/** + * This class exists to avoid unnecessary parsing of schema textual representation, + * as well as calls to {@link org.apache.avro.Schema} toString method which is very expensive + * due the utilization of {@link com.fasterxml.jackson.databind.ObjectMapper} to output a JSON representation of the schema. + * + * Once a schema is found for any Class, be it a POJO or a {@link org.apache.avro.generic.GenericContainer}, + * both textual representation as well as the {@link org.apache.avro.Schema} will be stored within this class + * + * @author Vinicius Carvalho + * + */ +public class ParsedSchema { + + private final Schema schema; + + private final String representation; + + private SchemaRegistrationResponse registration; + + public ParsedSchema(Schema schema){ + this.schema = schema; + this.representation = schema.toString(); + } + + public Schema getSchema() { + return schema; + } + + public String getRepresentation() { + return representation; + } + + public SchemaRegistrationResponse getRegistration() { + return registration; + } + + public void setRegistration(SchemaRegistrationResponse registration) { + this.registration = registration; + } +} diff --git a/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/avro/AvroMessageConverterAutoConfiguration.java b/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/avro/AvroMessageConverterAutoConfiguration.java index 753ec7371..b4436fcb5 100644 --- a/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/avro/AvroMessageConverterAutoConfiguration.java +++ b/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/avro/AvroMessageConverterAutoConfiguration.java @@ -22,6 +22,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cache.CacheManager; +import org.springframework.cache.concurrent.ConcurrentMapCacheManager; import org.springframework.cloud.stream.binder.StringConvertingContentTypeResolver; import org.springframework.cloud.stream.schema.client.SchemaRegistryClient; import org.springframework.context.annotation.Bean; @@ -36,12 +38,13 @@ import org.springframework.util.ObjectUtils; @ConditionalOnClass(name = "org.apache.avro.Schema") @ConditionalOnProperty(value = "spring.cloud.stream.schemaRegistryClient.enabled", matchIfMissing = true) @ConditionalOnBean(type = "org.springframework.cloud.stream.schema.client.SchemaRegistryClient") -@EnableConfigurationProperties(AvroMessageConverterProperties.class) +@EnableConfigurationProperties({AvroMessageConverterProperties.class}) public class AvroMessageConverterAutoConfiguration { @Autowired private AvroMessageConverterProperties avroMessageConverterProperties; + @Bean @ConditionalOnMissingBean(AvroSchemaRegistryClientMessageConverter.class) public AvroSchemaRegistryClientMessageConverter avroSchemaMessageConverter( @@ -61,6 +64,13 @@ public class AvroMessageConverterAutoConfiguration { this.avroMessageConverterProperties.getSchemaLocations()); } avroSchemaRegistryClientMessageConverter.setPrefix(this.avroMessageConverterProperties.getPrefix()); + avroSchemaRegistryClientMessageConverter.setCacheManager(cacheManager()); return avroSchemaRegistryClientMessageConverter; } + + @Bean + @ConditionalOnMissingBean + public CacheManager cacheManager(){ + return new ConcurrentMapCacheManager(); + } } diff --git a/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/avro/AvroSchemaRegistryClientMessageConverter.java b/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/avro/AvroSchemaRegistryClientMessageConverter.java index 56ad431ce..26e27cebd 100644 --- a/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/avro/AvroSchemaRegistryClientMessageConverter.java +++ b/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/avro/AvroSchemaRegistryClientMessageConverter.java @@ -18,8 +18,6 @@ package org.springframework.cloud.stream.schema.avro; import java.io.IOException; import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -29,6 +27,8 @@ import org.apache.avro.reflect.ReflectData; import org.springframework.beans.factory.BeanInitializationException; import org.springframework.beans.factory.InitializingBean; +import org.springframework.cache.CacheManager; +import org.springframework.cloud.stream.schema.ParsedSchema; import org.springframework.cloud.stream.schema.SchemaNotFoundException; import org.springframework.cloud.stream.schema.SchemaReference; import org.springframework.cloud.stream.schema.SchemaRegistrationResponse; @@ -41,37 +41,53 @@ import org.springframework.util.MimeType; import org.springframework.util.ObjectUtils; /** - * A {@link org.springframework.messaging.converter.MessageConverter} - * for Apache Avro, with the ability to publish and retrieve schemas - * stored in a schema server, allowing for schema evolution in applications. - * The supported content types are in the form `application/*+avro`. + * A {@link org.springframework.messaging.converter.MessageConverter} for Apache Avro, + * with the ability to publish and retrieve schemas stored in a schema server, allowing + * for schema evolution in applications. The supported content types are in the form + * `application/*+avro`. * - * During the conversion to a message, the converter will set the 'contentType' - * header to 'application/[prefix].[subject].v[version]+avro', where: + * During the conversion to a message, the converter will set the 'contentType' header to + * 'application/[prefix].[subject].v[version]+avro', where: * *
  • - * - * - * + * + * + * *
  • * - * When converting from a message, the converter will parse the content-type - * and use it to fetch and cache the writer schema using the provided - * {@link SchemaRegistryClient}. + * When converting from a message, the converter will parse the content-type and use it to + * fetch and cache the writer schema using the provided {@link SchemaRegistryClient}. * @author Marius Bogoevici * @author Vinicius Carvalho */ -public class AvroSchemaRegistryClientMessageConverter extends AbstractAvroMessageConverter implements InitializingBean { +public class AvroSchemaRegistryClientMessageConverter extends AbstractAvroMessageConverter + implements InitializingBean { public static final String AVRO_FORMAT = "avro"; - public static final Pattern PREFIX_VALIDATION_PATTERN = Pattern.compile("[\\p{Alnum}]"); + public static final Pattern PREFIX_VALIDATION_PATTERN = Pattern + .compile("[\\p{Alnum}]"); + + public static final String CACHE_PREFIX = "org.springframework.cloud.stream.schema"; + + public static final String REFLECTION_CACHE_NAME = CACHE_PREFIX + ".reflectionCache"; + + public static final String SCHEMA_CACHE_NAME = CACHE_PREFIX + ".schemaCache"; + + public static final String REFERENCE_CACHE_NAME = CACHE_PREFIX + ".referenceCache"; private Pattern versionedSchema; private boolean dynamicSchemaGenerationEnabled; - private Map localSchemaMap = new HashMap<>(); + private CacheManager cacheManager; private Schema readerSchema; @@ -83,31 +99,33 @@ public class AvroSchemaRegistryClientMessageConverter extends AbstractAvroMessag /** * Creates a new instance, configuring it with a {@link SchemaRegistryClient}. - * @param schemaRegistryClient the {@link SchemaRegistryClient} used to interact with the schema registry server. + * @param schemaRegistryClient the {@link SchemaRegistryClient} used to interact with + * the schema registry server. */ - public AvroSchemaRegistryClientMessageConverter(SchemaRegistryClient schemaRegistryClient) { + public AvroSchemaRegistryClientMessageConverter( + SchemaRegistryClient schemaRegistryClient) { super(Arrays.asList(new MimeType("application", "*+avro"))); Assert.notNull(schemaRegistryClient, "cannot be null"); this.schemaRegistryClient = schemaRegistryClient; } - /** - * Allows the converter to generate and register schemas automatically. - * If set to false, it only allows the converter to use pre-registered schemas. - * Default 'true'. - * @param dynamicSchemaGenerationEnabled true if dynamic schema generation is enabled - */ - public void setDynamicSchemaGenerationEnabled(boolean dynamicSchemaGenerationEnabled) { - this.dynamicSchemaGenerationEnabled = dynamicSchemaGenerationEnabled; - } - public boolean isDynamicSchemaGenerationEnabled() { return this.dynamicSchemaGenerationEnabled; } /** - * A set of locations where the converter can load schemas from. - * Schemas provided at these locations will be registered automatically. + * Allows the converter to generate and register schemas automatically. If set to + * false, it only allows the converter to use pre-registered schemas. Default 'true'. + * @param dynamicSchemaGenerationEnabled true if dynamic schema generation is enabled + */ + public void setDynamicSchemaGenerationEnabled( + boolean dynamicSchemaGenerationEnabled) { + this.dynamicSchemaGenerationEnabled = dynamicSchemaGenerationEnabled; + } + + /** + * A set of locations where the converter can load schemas from. Schemas provided at + * these locations will be registered automatically. * * @param schemaLocations */ @@ -122,14 +140,15 @@ public class AvroSchemaRegistryClientMessageConverter extends AbstractAvroMessag */ public void setPrefix(String prefix) { Assert.hasText(prefix, "Prefix cannot be empty"); - Assert.isTrue(!PREFIX_VALIDATION_PATTERN.matcher(this.prefix).matches(), "Invalid prefix:" + this.prefix); + Assert.isTrue(!PREFIX_VALIDATION_PATTERN.matcher(this.prefix).matches(), + "Invalid prefix:" + this.prefix); this.prefix = prefix; } @Override public void afterPropertiesSet() throws Exception { - this.versionedSchema = Pattern.compile( - "application/" + this.prefix + "\\.([\\p{Alnum}\\$\\.]+)\\.v(\\p{Digit}+)\\+avro"); + this.versionedSchema = Pattern.compile("application/" + this.prefix + + "\\.([\\p{Alnum}\\$\\.]+)\\.v(\\p{Digit}+)\\+avro"); if (!ObjectUtils.isEmpty(this.schemaLocations)) { this.logger.info("Scanning avro schema resources on classpath"); if (this.logger.isInfoEnabled()) { @@ -139,18 +158,23 @@ public class AvroSchemaRegistryClientMessageConverter extends AbstractAvroMessag try { Schema schema = parseSchema(schemaLocation); if (this.logger.isInfoEnabled()) { - this.logger.info("Resource " + schemaLocation.getFilename() + " parsed into schema " + schema - .getNamespace() + "." + schema.getName()); + this.logger.info("Resource " + schemaLocation.getFilename() + + " parsed into schema " + schema.getNamespace() + "." + + schema.getName()); } - this.schemaRegistryClient.register(toSubject(schema), AVRO_FORMAT, schema.toString(true)); + this.schemaRegistryClient.register(toSubject(schema), AVRO_FORMAT, + schema.toString(true)); if (this.logger.isInfoEnabled()) { - this.logger.info("Schema " + schema.getName() + " registered with id " + schema); + this.logger.info("Schema " + schema.getName() + + " registered with id " + schema); } - this.localSchemaMap.put(schema.getNamespace() + "." + schema.getName(), schema); + this.cacheManager.getCache(REFLECTION_CACHE_NAME) + .put(schema.getNamespace() + "." + schema.getName(), schema); } catch (IOException e) { if (this.logger.isWarnEnabled()) { - this.logger.warn("Failed to parse schema at " + schemaLocation.getFilename(), e); + this.logger.warn("Failed to parse schema at " + + schemaLocation.getFilename(), e); } } } @@ -179,25 +203,34 @@ public class AvroSchemaRegistryClientMessageConverter extends AbstractAvroMessag @Override protected Schema resolveSchemaForWriting(Object payload, MessageHeaders headers, MimeType hintedContentType) { + Schema schema; - SchemaReference schemaReference = extractSchemaReference(hintedContentType); - // the mimeType does not contain a schema reference - if (schemaReference == null) { - schema = extractSchemaForWriting(payload); - SchemaRegistrationResponse schemaRegistrationResponse = this.schemaRegistryClient.register( - toSubject(schema), AVRO_FORMAT, schema.toString(true)); - schemaReference = schemaRegistrationResponse.getSchemaReference(); + schema = extractSchemaForWriting(payload); + ParsedSchema parsedSchema = this.cacheManager.getCache(REFERENCE_CACHE_NAME) + .get(schema, ParsedSchema.class); + + if (parsedSchema == null) { + parsedSchema = new ParsedSchema(schema); + this.cacheManager.getCache(REFERENCE_CACHE_NAME).putIfAbsent(schema, + parsedSchema); } - else { - Schema.Parser parser = new Schema.Parser(); - String schemaContents = this.schemaRegistryClient.fetch(schemaReference); - schema = parser.parse(schemaContents); + + if (parsedSchema.getRegistration() == null) { + SchemaRegistrationResponse response = this.schemaRegistryClient.register( + toSubject(schema), AVRO_FORMAT, parsedSchema.getRepresentation()); + parsedSchema.setRegistration(response); + } + + SchemaReference schemaReference = parsedSchema.getRegistration() + .getSchemaReference(); + if (headers instanceof MutableMessageHeaders) { headers.put(MessageHeaders.CONTENT_TYPE, - "application/vnd." + schemaReference.getSubject() + ".v" + schemaReference - .getVersion() + "+avro"); + "application/vnd." + schemaReference.getSubject() + ".v" + + schemaReference.getVersion() + "+avro"); } + return schema; } @@ -216,12 +249,22 @@ public class AvroSchemaRegistryClientMessageConverter extends AbstractAvroMessag protected Schema resolveWriterSchemaForDeserialization(MimeType mimeType) { if (this.readerSchema == null) { Schema schema = null; + ParsedSchema parsedSchema = null; SchemaReference schemaReference = extractSchemaReference(mimeType); if (schemaReference != null) { - String schemaContent = this.schemaRegistryClient.fetch(schemaReference); - schema = new Schema.Parser().parse(schemaContent); + parsedSchema = cacheManager.getCache(REFERENCE_CACHE_NAME) + .get(schemaReference, ParsedSchema.class); + if (parsedSchema == null) { + String schemaContent = this.schemaRegistryClient + .fetch(schemaReference); + schema = new Schema.Parser().parse(schemaContent); + parsedSchema = new ParsedSchema(schema); + cacheManager.getCache(REFERENCE_CACHE_NAME) + .putIfAbsent(schemaReference, parsedSchema); + } + } - return schema; + return parsedSchema.getSchema(); } else { return this.readerSchema; @@ -255,20 +298,25 @@ public class AvroSchemaRegistryClientMessageConverter extends AbstractAvroMessag } } else { - schema = this.localSchemaMap.get(payload.getClass().getName()); + schema = this.cacheManager.getCache(REFLECTION_CACHE_NAME) + .get(payload.getClass().getName(), Schema.class); if (schema == null) { if (!isDynamicSchemaGenerationEnabled()) { - throw new SchemaNotFoundException( - String.format("No schema found in the local cache for %s, and dynamic schema generation " + - "is not enabled", payload.getClass())); + throw new SchemaNotFoundException(String + .format("No schema found in the local cache for %s, and dynamic schema generation " + + "is not enabled", payload.getClass())); } else { schema = ReflectData.get().getSchema(payload.getClass()); - this.schemaRegistryClient.register(toSubject(schema), AVRO_FORMAT, schema.toString(true)); } - this.localSchemaMap.put(payload.getClass().getName(), schema); + this.cacheManager.getCache(REFLECTION_CACHE_NAME) + .put(payload.getClass().getName(), schema); } } return schema; } + + public void setCacheManager(CacheManager cacheManager) { + this.cacheManager = cacheManager; + } } diff --git a/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/CachingRegistryClient.java b/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/CachingRegistryClient.java new file mode 100644 index 000000000..1f2a93a99 --- /dev/null +++ b/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/CachingRegistryClient.java @@ -0,0 +1,68 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.schema.client; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cache.CacheManager; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.cloud.stream.schema.SchemaReference; +import org.springframework.cloud.stream.schema.SchemaRegistrationResponse; + +/** + * @author Vinicius Carvalho + */ +public class CachingRegistryClient implements SchemaRegistryClient { + + protected static final String CACHE_PREFIX = "org.springframework.cloud.stream.schema.client"; + + protected static final String ID_CACHE = CACHE_PREFIX + ".schemaByIdCache"; + + protected static final String REF_CACHE = CACHE_PREFIX + ".schemaByReferenceCache"; + + private SchemaRegistryClient delegate; + + @Autowired + private CacheManager cacheManager; + + public CachingRegistryClient(SchemaRegistryClient delegate) { + this.delegate = delegate; + } + + @Override + public SchemaRegistrationResponse register(String subject, String format, String schema) { + SchemaRegistrationResponse response = delegate.register(subject,format,schema); + cacheManager.getCache(ID_CACHE).put(response.getSchemaReference(),schema); + cacheManager.getCache(REF_CACHE).put(response.getId(),schema); + return response; + } + + @Override + @Cacheable(cacheNames = REF_CACHE) + public String fetch(SchemaReference schemaReference) { + return delegate.fetch(schemaReference); + } + + @Override + @Cacheable(cacheNames = ID_CACHE) + public String fetch(int id) { + return delegate.fetch(id); + } + + public void setDelegate(SchemaRegistryClient delegate) { + this.delegate = delegate; + } +} diff --git a/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/DefaultSchemaRegistryClient.java b/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/DefaultSchemaRegistryClient.java index fa0f75419..92d82eb0b 100644 --- a/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/DefaultSchemaRegistryClient.java +++ b/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/DefaultSchemaRegistryClient.java @@ -27,6 +27,7 @@ import org.springframework.web.client.RestTemplate; /** * @author Marius Bogoevici + * @author Vinicius Carvalho */ public class DefaultSchemaRegistryClient implements SchemaRegistryClient { diff --git a/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/config/SchemaRegistryClientConfiguration.java b/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/config/SchemaRegistryClientConfiguration.java index f0fa71393..5eceec876 100644 --- a/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/config/SchemaRegistryClientConfiguration.java +++ b/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/config/SchemaRegistryClientConfiguration.java @@ -16,7 +16,9 @@ package org.springframework.cloud.stream.schema.client.config; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.stream.schema.client.CachingRegistryClient; import org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient; import org.springframework.cloud.stream.schema.client.SchemaRegistryClient; import org.springframework.context.annotation.Bean; @@ -25,17 +27,26 @@ import org.springframework.util.StringUtils; /** * @author Marius Bogoevici + * @author Vinicius Carvalho */ @Configuration @EnableConfigurationProperties(SchemaRegistryClientProperties.class) public class SchemaRegistryClientConfiguration { + @Autowired + private SchemaRegistryClientProperties schemaRegistryClientProperties; + @Bean - public SchemaRegistryClient schemaRegistryClient(SchemaRegistryClientProperties schemaRegistryClientProperties) { + public SchemaRegistryClient schemaRegistryClient() { DefaultSchemaRegistryClient defaultSchemaRegistryClient = new DefaultSchemaRegistryClient(); + if (StringUtils.hasText(schemaRegistryClientProperties.getEndpoint())) { defaultSchemaRegistryClient.setEndpoint(schemaRegistryClientProperties.getEndpoint()); } - return defaultSchemaRegistryClient; + + SchemaRegistryClient client = (schemaRegistryClientProperties.isCached()) ? new CachingRegistryClient(defaultSchemaRegistryClient) : defaultSchemaRegistryClient; + + return client; } + } diff --git a/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/config/SchemaRegistryClientProperties.java b/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/config/SchemaRegistryClientProperties.java index 94416ce0d..00c296b25 100644 --- a/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/config/SchemaRegistryClientProperties.java +++ b/spring-cloud-stream-schema/src/main/java/org/springframework/cloud/stream/schema/client/config/SchemaRegistryClientProperties.java @@ -20,13 +20,14 @@ import org.springframework.boot.context.properties.ConfigurationProperties; /** * @author Marius Bogoevici + * @author Vinicius Carvalho */ @ConfigurationProperties(prefix = "spring.cloud.stream.schemaRegistryClient") public class SchemaRegistryClientProperties { private String endpoint; - private boolean enabled = true; + private boolean cached = false; public String getEndpoint() { return this.endpoint; @@ -36,11 +37,11 @@ public class SchemaRegistryClientProperties { this.endpoint = endpoint; } - public boolean isEnabled() { - return enabled; + public boolean isCached() { + return cached; } - public void setEnabled(boolean enabled) { - this.enabled = enabled; + public void setCached(boolean cached) { + this.cached = cached; } } diff --git a/spring-cloud-stream-schema/src/test/java/org/springframework/cloud/schema/avro/AvroMessageConverterSerializationTests.java b/spring-cloud-stream-schema/src/test/java/org/springframework/cloud/schema/avro/AvroMessageConverterSerializationTests.java new file mode 100644 index 000000000..bf542a154 --- /dev/null +++ b/spring-cloud-stream-schema/src/test/java/org/springframework/cloud/schema/avro/AvroMessageConverterSerializationTests.java @@ -0,0 +1,106 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.schema.avro; + +import java.util.Collections; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import example.avro.User; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.springframework.boot.SpringApplication; +import org.springframework.cache.concurrent.ConcurrentMapCacheManager; +import org.springframework.cloud.stream.binder.StringConvertingContentTypeResolver; +import org.springframework.cloud.stream.schema.SchemaReference; +import org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter; +import org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient; +import org.springframework.cloud.stream.schema.client.SchemaRegistryClient; +import org.springframework.cloud.stream.schema.server.SchemaRegistryServerApplication; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.integration.support.MutableMessageHeaders; +import org.springframework.messaging.Message; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; + +/** + * @author Vinicius Carvalho + */ +public class AvroMessageConverterSerializationTests { + + Pattern versionedSchema = Pattern.compile( + "application/" + "vnd" + "\\.([\\p{Alnum}\\$\\.]+)\\.v(\\p{Digit}+)\\+avro"); + + private ConfigurableApplicationContext schemaRegistryServerContext; + Log logger = LogFactory.getLog(getClass()); + + + @Before + public void setup(){ + schemaRegistryServerContext = SpringApplication.run( + SchemaRegistryServerApplication.class); + } + + @After + public void tearDown() { + schemaRegistryServerContext.close(); + } + + @Test + public void sourceWriteSameVersion() throws Exception { + User specificRecord = new User(); + specificRecord.setName("joe"); + Schema v1 = new Schema.Parser().parse(AvroMessageConverterSerializationTests.class.getClassLoader().getResourceAsStream("schemas/user.avsc")); + GenericRecord genericRecord = new GenericData.Record(v1); + genericRecord.put("name","joe"); + SchemaRegistryClient client = new DefaultSchemaRegistryClient(); + AvroSchemaRegistryClientMessageConverter converter = new AvroSchemaRegistryClientMessageConverter(client); + converter.setDynamicSchemaGenerationEnabled(false); + converter.setContentTypeResolver(new StringConvertingContentTypeResolver()); + converter.setCacheManager(new ConcurrentMapCacheManager()); + converter.afterPropertiesSet(); + + Message specificMessage = converter.toMessage(specificRecord,new MutableMessageHeaders(Collections.emptyMap()), MimeTypeUtils.parseMimeType("application/*+avro")); + SchemaReference specificRef = extractSchemaReference( MimeTypeUtils.parseMimeType(specificMessage.getHeaders().get("contentType").toString())); + + Message genericMessage = converter.toMessage(genericRecord,new MutableMessageHeaders(Collections.emptyMap()), MimeTypeUtils.parseMimeType("application/*+avro")); + SchemaReference genericRef = extractSchemaReference( MimeTypeUtils.parseMimeType(genericMessage.getHeaders().get("contentType").toString())); + + Assert.assertEquals(genericRef,specificRef); + Assert.assertEquals(1, genericRef.getVersion()); + } + + private SchemaReference extractSchemaReference(MimeType mimeType) { + SchemaReference schemaReference = null; + Matcher schemaMatcher = this.versionedSchema.matcher(mimeType.toString()); + if (schemaMatcher.find()) { + String subject = schemaMatcher.group(1); + Integer version = Integer.parseInt(schemaMatcher.group(2)); + schemaReference = new SchemaReference(subject, version, AvroSchemaRegistryClientMessageConverter.AVRO_FORMAT); + } + return schemaReference; + } + +} diff --git a/spring-cloud-stream-schema/src/test/java/org/springframework/cloud/schema/avro/AvroSchemaRegistryClientMessageConverterTests.java b/spring-cloud-stream-schema/src/test/java/org/springframework/cloud/schema/avro/AvroSchemaRegistryClientMessageConverterTests.java index 016ddef2f..666a5d118 100644 --- a/spring-cloud-stream-schema/src/test/java/org/springframework/cloud/schema/avro/AvroSchemaRegistryClientMessageConverterTests.java +++ b/spring-cloud-stream-schema/src/test/java/org/springframework/cloud/schema/avro/AvroSchemaRegistryClientMessageConverterTests.java @@ -108,7 +108,7 @@ public class AvroSchemaRegistryClientMessageConverterTests { assertThat(receivedPojos.get(1)).isNotSameAs(firstOutboundUser2); assertThat(receivedPojos.get(1).getFavoriteColor()).isEqualTo(firstOutboundUser2.getFavoriteColor()); assertThat(receivedPojos.get(1).getName()).isEqualTo(firstOutboundUser2.getName()); - assertThat(receivedPojos.get(1).getFavoritePlace()).isEqualTo("NYC"); + assertThat(receivedPojos.get(1).getFavoritePlace()).isEqualTo("Boston"); assertThat(receivedPojos.get(2)).isNotSameAs(secondBarOutboundPojo); diff --git a/spring-cloud-stream-schema/src/test/java/org/springframework/cloud/schema/avro/AvroStubSchemaRegistryClientMessageConverterTests.java b/spring-cloud-stream-schema/src/test/java/org/springframework/cloud/schema/avro/AvroStubSchemaRegistryClientMessageConverterTests.java index c21e50c9d..5427e7f38 100644 --- a/spring-cloud-stream-schema/src/test/java/org/springframework/cloud/schema/avro/AvroStubSchemaRegistryClientMessageConverterTests.java +++ b/spring-cloud-stream-schema/src/test/java/org/springframework/cloud/schema/avro/AvroStubSchemaRegistryClientMessageConverterTests.java @@ -49,6 +49,7 @@ public class AvroStubSchemaRegistryClientMessageConverterTests { public void testSendMessage() throws Exception { ConfigurableApplicationContext sourceContext = SpringApplication.run(AvroSourceApplication.class, "--server.port=0", + "--debug", "--spring.jmx.enabled=false", "--spring.cloud.stream.bindings.output.contentType=application/*+avro", "--spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true"); @@ -103,7 +104,7 @@ public class AvroStubSchemaRegistryClientMessageConverterTests { assertThat(receivedPojos.get(1)).isNotSameAs(firstOutboundUser2); assertThat(receivedPojos.get(1).getFavoriteColor()).isEqualTo(firstOutboundUser2.getFavoriteColor()); assertThat(receivedPojos.get(1).getName()).isEqualTo(firstOutboundUser2.getName()); - assertThat(receivedPojos.get(1).getFavoritePlace()).isEqualTo("NYC"); + assertThat(receivedPojos.get(1).getFavoritePlace()).isEqualTo("Boston"); assertThat(receivedPojos.get(2)).isNotSameAs(secondBarOutboundPojo); diff --git a/spring-cloud-stream-schema/src/test/resources/schemas/user.avsc b/spring-cloud-stream-schema/src/test/resources/schemas/user.avsc new file mode 100644 index 000000000..f5ef8c98d --- /dev/null +++ b/spring-cloud-stream-schema/src/test/resources/schemas/user.avsc @@ -0,0 +1,10 @@ +{"namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favoriteNumber", "type": ["int", "null"]}, + {"name": "favoriteColor", "type": ["string", "null"]} + + ] +} \ No newline at end of file diff --git a/spring-cloud-stream-tools/src/main/resources/checkstyle-suppressions.xml b/spring-cloud-stream-tools/src/main/resources/checkstyle-suppressions.xml index bdcd8fc68..a5be3bd96 100644 --- a/spring-cloud-stream-tools/src/main/resources/checkstyle-suppressions.xml +++ b/spring-cloud-stream-tools/src/main/resources/checkstyle-suppressions.xml @@ -5,4 +5,5 @@ + \ No newline at end of file