Introducing caching

Fix #832 #752

Adds caching capabilities to the converter. Remote calls to the
registry, Schema Parsing and toString were very expensive operations
that are now being cached

Adds a decorator to the registryClient that allows caching of remote
invocations.

Fixing review comments
This commit is contained in:
Vinicius Carvalho
2017-02-23 08:55:51 -05:00
committed by Marius Bogoevici
parent a67fe02fd7
commit 4cd9b0c3ec
15 changed files with 423 additions and 74 deletions

1
.gitignore vendored
View File

@@ -22,3 +22,4 @@ _site/
dump.rdb
.apt_generated
artifacts
**/dependency-reduced-pom.xml

View File

@@ -1577,6 +1577,13 @@ A client for the Spring Cloud Stream schema registry can be configured using the
}
----
[NOTE]
====
The default converter is optimized to cache not only the schemas from the remote server but also the `parse()` and `toString()` methods that are quite expensive.
Because of this, it uses a `DefaultSchemaRegistryClient` that does not caches responses. If you intend to use the client directly on your code, you can request a bean that also caches responses to be created.
To do that, just add the property `spring.cloud.stream.schemaRegistryClient.cached=true` to your application properties.
====
==== Avro Schema Registry Client Message Converters
For Spring Boot applications that have a `SchemaRegistryClient` bean registered with the application context, Spring Cloud Stream will auto-configure an Apache Avro message converter that uses the schema registry client for schema management.

View File

@@ -8,7 +8,9 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-schema</artifactId>
<properties>
<avro.version>1.8.1</avro.version>
</properties>
<dependencies>
<dependency>
@@ -28,7 +30,7 @@
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.1</version>
<version>${avro.version}</version>
<optional>true</optional>
</dependency>
<dependency>
@@ -47,4 +49,26 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-test-sources</phase>
<goals>
<goal>schema</goal>
</goals>
</execution>
</executions>
<configuration>
<testOutputDirectory>${project.basedir}/target/generated-test-sources</testOutputDirectory>
<testSourceDirectory>${project.basedir}/src/test/resources/schemas</testSourceDirectory>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,60 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.schema;
import org.apache.avro.Schema;
/**
* This class exists to avoid unnecessary parsing of schema textual representation,
* as well as calls to {@link org.apache.avro.Schema} toString method which is very expensive
* due the utilization of {@link com.fasterxml.jackson.databind.ObjectMapper} to output a JSON representation of the schema.
*
* Once a schema is found for any Class, be it a POJO or a {@link org.apache.avro.generic.GenericContainer},
* both textual representation as well as the {@link org.apache.avro.Schema} will be stored within this class
*
* @author Vinicius Carvalho
*
*/
public class ParsedSchema {
private final Schema schema;
private final String representation;
private SchemaRegistrationResponse registration;
public ParsedSchema(Schema schema){
this.schema = schema;
this.representation = schema.toString();
}
public Schema getSchema() {
return schema;
}
public String getRepresentation() {
return representation;
}
public SchemaRegistrationResponse getRegistration() {
return registration;
}
public void setRegistration(SchemaRegistrationResponse registration) {
this.registration = registration;
}
}

View File

@@ -22,6 +22,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cache.CacheManager;
import org.springframework.cache.concurrent.ConcurrentMapCacheManager;
import org.springframework.cloud.stream.binder.StringConvertingContentTypeResolver;
import org.springframework.cloud.stream.schema.client.SchemaRegistryClient;
import org.springframework.context.annotation.Bean;
@@ -36,12 +38,13 @@ import org.springframework.util.ObjectUtils;
@ConditionalOnClass(name = "org.apache.avro.Schema")
@ConditionalOnProperty(value = "spring.cloud.stream.schemaRegistryClient.enabled", matchIfMissing = true)
@ConditionalOnBean(type = "org.springframework.cloud.stream.schema.client.SchemaRegistryClient")
@EnableConfigurationProperties(AvroMessageConverterProperties.class)
@EnableConfigurationProperties({AvroMessageConverterProperties.class})
public class AvroMessageConverterAutoConfiguration {
@Autowired
private AvroMessageConverterProperties avroMessageConverterProperties;
@Bean
@ConditionalOnMissingBean(AvroSchemaRegistryClientMessageConverter.class)
public AvroSchemaRegistryClientMessageConverter avroSchemaMessageConverter(
@@ -61,6 +64,13 @@ public class AvroMessageConverterAutoConfiguration {
this.avroMessageConverterProperties.getSchemaLocations());
}
avroSchemaRegistryClientMessageConverter.setPrefix(this.avroMessageConverterProperties.getPrefix());
avroSchemaRegistryClientMessageConverter.setCacheManager(cacheManager());
return avroSchemaRegistryClientMessageConverter;
}
@Bean
@ConditionalOnMissingBean
public CacheManager cacheManager(){
return new ConcurrentMapCacheManager();
}
}

View File

@@ -18,8 +18,6 @@ package org.springframework.cloud.stream.schema.avro;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -29,6 +27,8 @@ import org.apache.avro.reflect.ReflectData;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cache.CacheManager;
import org.springframework.cloud.stream.schema.ParsedSchema;
import org.springframework.cloud.stream.schema.SchemaNotFoundException;
import org.springframework.cloud.stream.schema.SchemaReference;
import org.springframework.cloud.stream.schema.SchemaRegistrationResponse;
@@ -41,37 +41,53 @@ import org.springframework.util.MimeType;
import org.springframework.util.ObjectUtils;
/**
* A {@link org.springframework.messaging.converter.MessageConverter}
* for Apache Avro, with the ability to publish and retrieve schemas
* stored in a schema server, allowing for schema evolution in applications.
* The supported content types are in the form `application/*+avro`.
* A {@link org.springframework.messaging.converter.MessageConverter} for Apache Avro,
* with the ability to publish and retrieve schemas stored in a schema server, allowing
* for schema evolution in applications. The supported content types are in the form
* `application/*+avro`.
*
* During the conversion to a message, the converter will set the 'contentType'
* header to 'application/[prefix].[subject].v[version]+avro', where:
* During the conversion to a message, the converter will set the 'contentType' header to
* 'application/[prefix].[subject].v[version]+avro', where:
*
* <li>
* <ul><i>prefix</i> is a configurable prefix (default 'vnd');</ul>
* <ul><i>subject</i> is a subject derived from the type of the outgoing object - typically the class name;</ul>
* <ul><i>version</i> is the schema version for the given subject;</ul>
* <ul>
* <i>prefix</i> is a configurable prefix (default 'vnd');
* </ul>
* <ul>
* <i>subject</i> is a subject derived from the type of the outgoing object - typically
* the class name;
* </ul>
* <ul>
* <i>version</i> is the schema version for the given subject;
* </ul>
* </li>
*
* When converting from a message, the converter will parse the content-type
* and use it to fetch and cache the writer schema using the provided
* {@link SchemaRegistryClient}.
* When converting from a message, the converter will parse the content-type and use it to
* fetch and cache the writer schema using the provided {@link SchemaRegistryClient}.
* @author Marius Bogoevici
* @author Vinicius Carvalho
*/
public class AvroSchemaRegistryClientMessageConverter extends AbstractAvroMessageConverter implements InitializingBean {
public class AvroSchemaRegistryClientMessageConverter extends AbstractAvroMessageConverter
implements InitializingBean {
public static final String AVRO_FORMAT = "avro";
public static final Pattern PREFIX_VALIDATION_PATTERN = Pattern.compile("[\\p{Alnum}]");
public static final Pattern PREFIX_VALIDATION_PATTERN = Pattern
.compile("[\\p{Alnum}]");
public static final String CACHE_PREFIX = "org.springframework.cloud.stream.schema";
public static final String REFLECTION_CACHE_NAME = CACHE_PREFIX + ".reflectionCache";
public static final String SCHEMA_CACHE_NAME = CACHE_PREFIX + ".schemaCache";
public static final String REFERENCE_CACHE_NAME = CACHE_PREFIX + ".referenceCache";
private Pattern versionedSchema;
private boolean dynamicSchemaGenerationEnabled;
private Map<String, Schema> localSchemaMap = new HashMap<>();
private CacheManager cacheManager;
private Schema readerSchema;
@@ -83,31 +99,33 @@ public class AvroSchemaRegistryClientMessageConverter extends AbstractAvroMessag
/**
* Creates a new instance, configuring it with a {@link SchemaRegistryClient}.
* @param schemaRegistryClient the {@link SchemaRegistryClient} used to interact with the schema registry server.
* @param schemaRegistryClient the {@link SchemaRegistryClient} used to interact with
* the schema registry server.
*/
public AvroSchemaRegistryClientMessageConverter(SchemaRegistryClient schemaRegistryClient) {
public AvroSchemaRegistryClientMessageConverter(
SchemaRegistryClient schemaRegistryClient) {
super(Arrays.asList(new MimeType("application", "*+avro")));
Assert.notNull(schemaRegistryClient, "cannot be null");
this.schemaRegistryClient = schemaRegistryClient;
}
/**
* Allows the converter to generate and register schemas automatically.
* If set to false, it only allows the converter to use pre-registered schemas.
* Default 'true'.
* @param dynamicSchemaGenerationEnabled true if dynamic schema generation is enabled
*/
public void setDynamicSchemaGenerationEnabled(boolean dynamicSchemaGenerationEnabled) {
this.dynamicSchemaGenerationEnabled = dynamicSchemaGenerationEnabled;
}
public boolean isDynamicSchemaGenerationEnabled() {
return this.dynamicSchemaGenerationEnabled;
}
/**
* A set of locations where the converter can load schemas from.
* Schemas provided at these locations will be registered automatically.
* Allows the converter to generate and register schemas automatically. If set to
* false, it only allows the converter to use pre-registered schemas. Default 'true'.
* @param dynamicSchemaGenerationEnabled true if dynamic schema generation is enabled
*/
public void setDynamicSchemaGenerationEnabled(
boolean dynamicSchemaGenerationEnabled) {
this.dynamicSchemaGenerationEnabled = dynamicSchemaGenerationEnabled;
}
/**
* A set of locations where the converter can load schemas from. Schemas provided at
* these locations will be registered automatically.
*
* @param schemaLocations
*/
@@ -122,14 +140,15 @@ public class AvroSchemaRegistryClientMessageConverter extends AbstractAvroMessag
*/
public void setPrefix(String prefix) {
Assert.hasText(prefix, "Prefix cannot be empty");
Assert.isTrue(!PREFIX_VALIDATION_PATTERN.matcher(this.prefix).matches(), "Invalid prefix:" + this.prefix);
Assert.isTrue(!PREFIX_VALIDATION_PATTERN.matcher(this.prefix).matches(),
"Invalid prefix:" + this.prefix);
this.prefix = prefix;
}
@Override
public void afterPropertiesSet() throws Exception {
this.versionedSchema = Pattern.compile(
"application/" + this.prefix + "\\.([\\p{Alnum}\\$\\.]+)\\.v(\\p{Digit}+)\\+avro");
this.versionedSchema = Pattern.compile("application/" + this.prefix
+ "\\.([\\p{Alnum}\\$\\.]+)\\.v(\\p{Digit}+)\\+avro");
if (!ObjectUtils.isEmpty(this.schemaLocations)) {
this.logger.info("Scanning avro schema resources on classpath");
if (this.logger.isInfoEnabled()) {
@@ -139,18 +158,23 @@ public class AvroSchemaRegistryClientMessageConverter extends AbstractAvroMessag
try {
Schema schema = parseSchema(schemaLocation);
if (this.logger.isInfoEnabled()) {
this.logger.info("Resource " + schemaLocation.getFilename() + " parsed into schema " + schema
.getNamespace() + "." + schema.getName());
this.logger.info("Resource " + schemaLocation.getFilename()
+ " parsed into schema " + schema.getNamespace() + "."
+ schema.getName());
}
this.schemaRegistryClient.register(toSubject(schema), AVRO_FORMAT, schema.toString(true));
this.schemaRegistryClient.register(toSubject(schema), AVRO_FORMAT,
schema.toString(true));
if (this.logger.isInfoEnabled()) {
this.logger.info("Schema " + schema.getName() + " registered with id " + schema);
this.logger.info("Schema " + schema.getName()
+ " registered with id " + schema);
}
this.localSchemaMap.put(schema.getNamespace() + "." + schema.getName(), schema);
this.cacheManager.getCache(REFLECTION_CACHE_NAME)
.put(schema.getNamespace() + "." + schema.getName(), schema);
}
catch (IOException e) {
if (this.logger.isWarnEnabled()) {
this.logger.warn("Failed to parse schema at " + schemaLocation.getFilename(), e);
this.logger.warn("Failed to parse schema at "
+ schemaLocation.getFilename(), e);
}
}
}
@@ -179,25 +203,34 @@ public class AvroSchemaRegistryClientMessageConverter extends AbstractAvroMessag
@Override
protected Schema resolveSchemaForWriting(Object payload, MessageHeaders headers,
MimeType hintedContentType) {
Schema schema;
SchemaReference schemaReference = extractSchemaReference(hintedContentType);
// the mimeType does not contain a schema reference
if (schemaReference == null) {
schema = extractSchemaForWriting(payload);
SchemaRegistrationResponse schemaRegistrationResponse = this.schemaRegistryClient.register(
toSubject(schema), AVRO_FORMAT, schema.toString(true));
schemaReference = schemaRegistrationResponse.getSchemaReference();
schema = extractSchemaForWriting(payload);
ParsedSchema parsedSchema = this.cacheManager.getCache(REFERENCE_CACHE_NAME)
.get(schema, ParsedSchema.class);
if (parsedSchema == null) {
parsedSchema = new ParsedSchema(schema);
this.cacheManager.getCache(REFERENCE_CACHE_NAME).putIfAbsent(schema,
parsedSchema);
}
else {
Schema.Parser parser = new Schema.Parser();
String schemaContents = this.schemaRegistryClient.fetch(schemaReference);
schema = parser.parse(schemaContents);
if (parsedSchema.getRegistration() == null) {
SchemaRegistrationResponse response = this.schemaRegistryClient.register(
toSubject(schema), AVRO_FORMAT, parsedSchema.getRepresentation());
parsedSchema.setRegistration(response);
}
SchemaReference schemaReference = parsedSchema.getRegistration()
.getSchemaReference();
if (headers instanceof MutableMessageHeaders) {
headers.put(MessageHeaders.CONTENT_TYPE,
"application/vnd." + schemaReference.getSubject() + ".v" + schemaReference
.getVersion() + "+avro");
"application/vnd." + schemaReference.getSubject() + ".v"
+ schemaReference.getVersion() + "+avro");
}
return schema;
}
@@ -216,12 +249,22 @@ public class AvroSchemaRegistryClientMessageConverter extends AbstractAvroMessag
protected Schema resolveWriterSchemaForDeserialization(MimeType mimeType) {
if (this.readerSchema == null) {
Schema schema = null;
ParsedSchema parsedSchema = null;
SchemaReference schemaReference = extractSchemaReference(mimeType);
if (schemaReference != null) {
String schemaContent = this.schemaRegistryClient.fetch(schemaReference);
schema = new Schema.Parser().parse(schemaContent);
parsedSchema = cacheManager.getCache(REFERENCE_CACHE_NAME)
.get(schemaReference, ParsedSchema.class);
if (parsedSchema == null) {
String schemaContent = this.schemaRegistryClient
.fetch(schemaReference);
schema = new Schema.Parser().parse(schemaContent);
parsedSchema = new ParsedSchema(schema);
cacheManager.getCache(REFERENCE_CACHE_NAME)
.putIfAbsent(schemaReference, parsedSchema);
}
}
return schema;
return parsedSchema.getSchema();
}
else {
return this.readerSchema;
@@ -255,20 +298,25 @@ public class AvroSchemaRegistryClientMessageConverter extends AbstractAvroMessag
}
}
else {
schema = this.localSchemaMap.get(payload.getClass().getName());
schema = this.cacheManager.getCache(REFLECTION_CACHE_NAME)
.get(payload.getClass().getName(), Schema.class);
if (schema == null) {
if (!isDynamicSchemaGenerationEnabled()) {
throw new SchemaNotFoundException(
String.format("No schema found in the local cache for %s, and dynamic schema generation " +
"is not enabled", payload.getClass()));
throw new SchemaNotFoundException(String
.format("No schema found in the local cache for %s, and dynamic schema generation "
+ "is not enabled", payload.getClass()));
}
else {
schema = ReflectData.get().getSchema(payload.getClass());
this.schemaRegistryClient.register(toSubject(schema), AVRO_FORMAT, schema.toString(true));
}
this.localSchemaMap.put(payload.getClass().getName(), schema);
this.cacheManager.getCache(REFLECTION_CACHE_NAME)
.put(payload.getClass().getName(), schema);
}
}
return schema;
}
public void setCacheManager(CacheManager cacheManager) {
this.cacheManager = cacheManager;
}
}

View File

@@ -0,0 +1,68 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.schema.client;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.cloud.stream.schema.SchemaReference;
import org.springframework.cloud.stream.schema.SchemaRegistrationResponse;
/**
* @author Vinicius Carvalho
*/
public class CachingRegistryClient implements SchemaRegistryClient {
protected static final String CACHE_PREFIX = "org.springframework.cloud.stream.schema.client";
protected static final String ID_CACHE = CACHE_PREFIX + ".schemaByIdCache";
protected static final String REF_CACHE = CACHE_PREFIX + ".schemaByReferenceCache";
private SchemaRegistryClient delegate;
@Autowired
private CacheManager cacheManager;
public CachingRegistryClient(SchemaRegistryClient delegate) {
this.delegate = delegate;
}
@Override
public SchemaRegistrationResponse register(String subject, String format, String schema) {
SchemaRegistrationResponse response = delegate.register(subject,format,schema);
cacheManager.getCache(ID_CACHE).put(response.getSchemaReference(),schema);
cacheManager.getCache(REF_CACHE).put(response.getId(),schema);
return response;
}
@Override
@Cacheable(cacheNames = REF_CACHE)
public String fetch(SchemaReference schemaReference) {
return delegate.fetch(schemaReference);
}
@Override
@Cacheable(cacheNames = ID_CACHE)
public String fetch(int id) {
return delegate.fetch(id);
}
public void setDelegate(SchemaRegistryClient delegate) {
this.delegate = delegate;
}
}

View File

@@ -27,6 +27,7 @@ import org.springframework.web.client.RestTemplate;
/**
* @author Marius Bogoevici
* @author Vinicius Carvalho
*/
public class DefaultSchemaRegistryClient implements SchemaRegistryClient {

View File

@@ -16,7 +16,9 @@
package org.springframework.cloud.stream.schema.client.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.schema.client.CachingRegistryClient;
import org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient;
import org.springframework.cloud.stream.schema.client.SchemaRegistryClient;
import org.springframework.context.annotation.Bean;
@@ -25,17 +27,26 @@ import org.springframework.util.StringUtils;
/**
* @author Marius Bogoevici
* @author Vinicius Carvalho
*/
@Configuration
@EnableConfigurationProperties(SchemaRegistryClientProperties.class)
public class SchemaRegistryClientConfiguration {
@Autowired
private SchemaRegistryClientProperties schemaRegistryClientProperties;
@Bean
public SchemaRegistryClient schemaRegistryClient(SchemaRegistryClientProperties schemaRegistryClientProperties) {
public SchemaRegistryClient schemaRegistryClient() {
DefaultSchemaRegistryClient defaultSchemaRegistryClient = new DefaultSchemaRegistryClient();
if (StringUtils.hasText(schemaRegistryClientProperties.getEndpoint())) {
defaultSchemaRegistryClient.setEndpoint(schemaRegistryClientProperties.getEndpoint());
}
return defaultSchemaRegistryClient;
SchemaRegistryClient client = (schemaRegistryClientProperties.isCached()) ? new CachingRegistryClient(defaultSchemaRegistryClient) : defaultSchemaRegistryClient;
return client;
}
}

View File

@@ -20,13 +20,14 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @author Marius Bogoevici
* @author Vinicius Carvalho
*/
@ConfigurationProperties(prefix = "spring.cloud.stream.schemaRegistryClient")
public class SchemaRegistryClientProperties {
private String endpoint;
private boolean enabled = true;
private boolean cached = false;
public String getEndpoint() {
return this.endpoint;
@@ -36,11 +37,11 @@ public class SchemaRegistryClientProperties {
this.endpoint = endpoint;
}
public boolean isEnabled() {
return enabled;
public boolean isCached() {
return cached;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
public void setCached(boolean cached) {
this.cached = cached;
}
}

View File

@@ -0,0 +1,106 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.schema.avro;
import java.util.Collections;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import example.avro.User;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.cache.concurrent.ConcurrentMapCacheManager;
import org.springframework.cloud.stream.binder.StringConvertingContentTypeResolver;
import org.springframework.cloud.stream.schema.SchemaReference;
import org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter;
import org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient;
import org.springframework.cloud.stream.schema.client.SchemaRegistryClient;
import org.springframework.cloud.stream.schema.server.SchemaRegistryServerApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.integration.support.MutableMessageHeaders;
import org.springframework.messaging.Message;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
/**
* @author Vinicius Carvalho
*/
public class AvroMessageConverterSerializationTests {
Pattern versionedSchema = Pattern.compile(
"application/" + "vnd" + "\\.([\\p{Alnum}\\$\\.]+)\\.v(\\p{Digit}+)\\+avro");
private ConfigurableApplicationContext schemaRegistryServerContext;
Log logger = LogFactory.getLog(getClass());
@Before
public void setup(){
schemaRegistryServerContext = SpringApplication.run(
SchemaRegistryServerApplication.class);
}
@After
public void tearDown() {
schemaRegistryServerContext.close();
}
@Test
public void sourceWriteSameVersion() throws Exception {
User specificRecord = new User();
specificRecord.setName("joe");
Schema v1 = new Schema.Parser().parse(AvroMessageConverterSerializationTests.class.getClassLoader().getResourceAsStream("schemas/user.avsc"));
GenericRecord genericRecord = new GenericData.Record(v1);
genericRecord.put("name","joe");
SchemaRegistryClient client = new DefaultSchemaRegistryClient();
AvroSchemaRegistryClientMessageConverter converter = new AvroSchemaRegistryClientMessageConverter(client);
converter.setDynamicSchemaGenerationEnabled(false);
converter.setContentTypeResolver(new StringConvertingContentTypeResolver());
converter.setCacheManager(new ConcurrentMapCacheManager());
converter.afterPropertiesSet();
Message specificMessage = converter.toMessage(specificRecord,new MutableMessageHeaders(Collections.<String,Object>emptyMap()), MimeTypeUtils.parseMimeType("application/*+avro"));
SchemaReference specificRef = extractSchemaReference( MimeTypeUtils.parseMimeType(specificMessage.getHeaders().get("contentType").toString()));
Message genericMessage = converter.toMessage(genericRecord,new MutableMessageHeaders(Collections.<String,Object>emptyMap()), MimeTypeUtils.parseMimeType("application/*+avro"));
SchemaReference genericRef = extractSchemaReference( MimeTypeUtils.parseMimeType(genericMessage.getHeaders().get("contentType").toString()));
Assert.assertEquals(genericRef,specificRef);
Assert.assertEquals(1, genericRef.getVersion());
}
private SchemaReference extractSchemaReference(MimeType mimeType) {
SchemaReference schemaReference = null;
Matcher schemaMatcher = this.versionedSchema.matcher(mimeType.toString());
if (schemaMatcher.find()) {
String subject = schemaMatcher.group(1);
Integer version = Integer.parseInt(schemaMatcher.group(2));
schemaReference = new SchemaReference(subject, version, AvroSchemaRegistryClientMessageConverter.AVRO_FORMAT);
}
return schemaReference;
}
}

View File

@@ -108,7 +108,7 @@ public class AvroSchemaRegistryClientMessageConverterTests {
assertThat(receivedPojos.get(1)).isNotSameAs(firstOutboundUser2);
assertThat(receivedPojos.get(1).getFavoriteColor()).isEqualTo(firstOutboundUser2.getFavoriteColor());
assertThat(receivedPojos.get(1).getName()).isEqualTo(firstOutboundUser2.getName());
assertThat(receivedPojos.get(1).getFavoritePlace()).isEqualTo("NYC");
assertThat(receivedPojos.get(1).getFavoritePlace()).isEqualTo("Boston");
assertThat(receivedPojos.get(2)).isNotSameAs(secondBarOutboundPojo);

View File

@@ -49,6 +49,7 @@ public class AvroStubSchemaRegistryClientMessageConverterTests {
public void testSendMessage() throws Exception {
ConfigurableApplicationContext sourceContext = SpringApplication.run(AvroSourceApplication.class,
"--server.port=0",
"--debug",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.output.contentType=application/*+avro",
"--spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true");
@@ -103,7 +104,7 @@ public class AvroStubSchemaRegistryClientMessageConverterTests {
assertThat(receivedPojos.get(1)).isNotSameAs(firstOutboundUser2);
assertThat(receivedPojos.get(1).getFavoriteColor()).isEqualTo(firstOutboundUser2.getFavoriteColor());
assertThat(receivedPojos.get(1).getName()).isEqualTo(firstOutboundUser2.getName());
assertThat(receivedPojos.get(1).getFavoritePlace()).isEqualTo("NYC");
assertThat(receivedPojos.get(1).getFavoritePlace()).isEqualTo("Boston");
assertThat(receivedPojos.get(2)).isNotSameAs(secondBarOutboundPojo);

View File

@@ -0,0 +1,10 @@
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favoriteNumber", "type": ["int", "null"]},
{"name": "favoriteColor", "type": ["string", "null"]}
]
}

View File

@@ -5,4 +5,5 @@
<suppressions>
<suppress files="[\\/]src[\\/]main[\\/]java[\\/]org[\\/]springframework[\\/]cloud[\\/]stream[\\/]reactive[\\/]reactor[\\/]core[\\/]scheduler[\\/]" checks=".*" />
<suppress files="[\\/]src[\\/]test[\\/]java[\\/]" checks="AvoidStaticImport" />
<suppress files="[\\/]target[\\/]generated-test-sources[\\/]" checks="[a-zA-Z0-9]*"/>
</suppressions>