Migrate Avro message converter artifacts
Migrate Avro message converter (non-schema-registry) artifacts from Spring Cloud Schema Registry to Spring Cloud Function. Resolves https://github.com/spring-cloud/spring-cloud-function/issues/732 Resolves #733
This commit is contained in:
committed by
Oleg Zhurakousky
parent
953f332b1e
commit
e5c335dc5f
@@ -29,6 +29,7 @@ import com.google.gson.Gson;
|
||||
|
||||
import org.springframework.beans.factory.BeanFactory;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
@@ -38,6 +39,8 @@ import org.springframework.cloud.function.context.FunctionProperties;
|
||||
import org.springframework.cloud.function.context.FunctionRegistry;
|
||||
import org.springframework.cloud.function.context.MessageRoutingCallback;
|
||||
import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry;
|
||||
import org.springframework.cloud.function.context.converter.avro.AvroSchemaMessageConverter;
|
||||
import org.springframework.cloud.function.context.converter.avro.AvroSchemaServiceManagerImpl;
|
||||
import org.springframework.cloud.function.core.FunctionInvocationHelper;
|
||||
import org.springframework.cloud.function.json.GsonMapper;
|
||||
import org.springframework.cloud.function.json.JacksonMapper;
|
||||
@@ -71,6 +74,7 @@ import org.springframework.util.StringUtils;
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Artem Bilan
|
||||
* @author Anshul Mehra
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@ConditionalOnMissingBean(FunctionCatalog.class)
|
||||
@@ -88,7 +92,7 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public FunctionRegistry functionCatalog(List<MessageConverter> messageConverters, JsonMapper jsonMapper,
|
||||
ConfigurableApplicationContext context, @Nullable FunctionInvocationHelper<Message<?>> functionInvocationHelper) {
|
||||
ConfigurableApplicationContext context, @Nullable FunctionInvocationHelper<Message<?>> functionInvocationHelper) {
|
||||
ConfigurableConversionService conversionService = (ConfigurableConversionService) context.getBeanFactory().getConversionService();
|
||||
if (conversionService == null) {
|
||||
conversionService = new DefaultConversionService();
|
||||
@@ -113,8 +117,8 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
}
|
||||
|
||||
mcList = mcList.stream()
|
||||
.filter(c -> isConverterEligible(c))
|
||||
.collect(Collectors.toList());
|
||||
.filter(c -> isConverterEligible(c))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
mcList.add(new JsonMessageConverter(jsonMapper));
|
||||
mcList.add(new ByteArrayMessageConverter());
|
||||
@@ -131,9 +135,16 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
return new BeanFactoryAwareFunctionRegistry(conversionService, messageConverter, jsonMapper, functionProperties, functionInvocationHelper);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
@ConditionalOnClass(name = "org.apache.avro.Schema")
|
||||
public MessageConverter avroSchemaMessageConverter() {
|
||||
return new AvroSchemaMessageConverter(new AvroSchemaServiceManagerImpl());
|
||||
}
|
||||
|
||||
@Bean(RoutingFunction.FUNCTION_NAME)
|
||||
RoutingFunction functionRouter(FunctionCatalog functionCatalog, FunctionProperties functionProperties,
|
||||
BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
|
||||
BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
|
||||
return new RoutingFunction(functionCatalog, functionProperties, new BeanFactoryResolver(beanFactory), routingCallback);
|
||||
}
|
||||
|
||||
@@ -150,10 +161,10 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@ComponentScan(basePackages = "${spring.cloud.function.scan.packages:functions}", //
|
||||
includeFilters = @Filter(type = FilterType.ASSIGNABLE_TYPE,
|
||||
classes = { Supplier.class, Function.class, Consumer.class }))
|
||||
includeFilters = @Filter(type = FilterType.ASSIGNABLE_TYPE,
|
||||
classes = {Supplier.class, Function.class, Consumer.class}))
|
||||
@ConditionalOnProperty(prefix = "spring.cloud.function.scan", name = "enabled", havingValue = "true",
|
||||
matchIfMissing = true)
|
||||
matchIfMissing = true)
|
||||
protected static class PlainFunctionScanConfiguration {
|
||||
|
||||
}
|
||||
@@ -163,8 +174,8 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
@Bean
|
||||
public JsonMapper jsonMapper(ApplicationContext context) {
|
||||
String preferredMapper = context.getEnvironment().containsProperty(JSON_MAPPER_PROPERTY)
|
||||
? context.getEnvironment().getProperty(JSON_MAPPER_PROPERTY)
|
||||
: context.getEnvironment().getProperty(PREFERRED_MAPPER_PROPERTY);
|
||||
? context.getEnvironment().getProperty(JSON_MAPPER_PROPERTY)
|
||||
: context.getEnvironment().getProperty(PREFERRED_MAPPER_PROPERTY);
|
||||
if (StringUtils.hasText(preferredMapper)) {
|
||||
if ("gson".equals(preferredMapper) && ClassUtils.isPresent("com.google.gson.Gson", null)) {
|
||||
return gson(context);
|
||||
|
||||
@@ -0,0 +1,132 @@
|
||||
/*
|
||||
* Copyright 2016-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.function.context.converter.avro;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.io.DatumWriter;
|
||||
import org.apache.avro.io.Encoder;
|
||||
import org.apache.avro.io.EncoderFactory;
|
||||
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.converter.AbstractMessageConverter;
|
||||
import org.springframework.messaging.converter.MessageConversionException;
|
||||
import org.springframework.util.MimeType;
|
||||
|
||||
/**
|
||||
* Base class for Apache Avro
|
||||
* {@link org.springframework.messaging.converter.MessageConverter} implementations.
|
||||
*
|
||||
* @author Marius Bogoevici
|
||||
* @author Vinicius Carvalho
|
||||
* @author Sercan Karaoglu
|
||||
* @author Ish Mahajan
|
||||
*
|
||||
* @since 3.2.0
|
||||
*/
|
||||
public abstract class AbstractAvroMessageConverter extends AbstractMessageConverter {
|
||||
|
||||
/**
|
||||
* common parser will let user to import external schemas.
|
||||
*/
|
||||
private Schema.Parser schemaParser = new Schema.Parser();
|
||||
private AvroSchemaServiceManager avroSchemaServiceManager;
|
||||
|
||||
protected AbstractAvroMessageConverter(MimeType supportedMimeType, AvroSchemaServiceManager avroSchemaServiceManager) {
|
||||
this(Collections.singletonList(supportedMimeType), avroSchemaServiceManager);
|
||||
}
|
||||
|
||||
protected AbstractAvroMessageConverter(Collection<MimeType> supportedMimeTypes, AvroSchemaServiceManager manager) {
|
||||
super(supportedMimeTypes);
|
||||
this.avroSchemaServiceManager = manager;
|
||||
}
|
||||
|
||||
protected AvroSchemaServiceManager avroSchemaServiceManager() {
|
||||
return this.avroSchemaServiceManager;
|
||||
}
|
||||
|
||||
protected Schema parseSchema(Resource r) throws IOException {
|
||||
return this.schemaParser.parse(r.getInputStream());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean canConvertFrom(Message<?> message, Class<?> targetClass) {
|
||||
return super.canConvertFrom(message, targetClass)
|
||||
&& (message.getPayload() instanceof byte[]);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
|
||||
Object result;
|
||||
try {
|
||||
byte[] payload = (byte[]) message.getPayload();
|
||||
|
||||
MimeType mimeType = getContentTypeResolver().resolve(message.getHeaders());
|
||||
if (mimeType == null) {
|
||||
if (conversionHint instanceof MimeType) {
|
||||
mimeType = (MimeType) conversionHint;
|
||||
}
|
||||
else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
Schema writerSchema = resolveWriterSchemaForDeserialization(mimeType);
|
||||
Schema readerSchema = resolveReaderSchemaForDeserialization(targetClass);
|
||||
|
||||
result = avroSchemaServiceManager().readData(targetClass, payload, readerSchema, writerSchema);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new MessageConversionException(message, "Failed to read payload", e);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
try {
|
||||
MimeType hintedContentType = null;
|
||||
if (conversionHint instanceof MimeType) {
|
||||
hintedContentType = (MimeType) conversionHint;
|
||||
}
|
||||
Schema schema = resolveSchemaForWriting(payload, headers, hintedContentType);
|
||||
@SuppressWarnings("unchecked")
|
||||
DatumWriter<Object> writer = avroSchemaServiceManager().getDatumWriter(payload.getClass(), schema);
|
||||
Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
|
||||
writer.write(payload, encoder);
|
||||
encoder.flush();
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new MessageConversionException("Failed to write payload", e);
|
||||
}
|
||||
return baos.toByteArray();
|
||||
}
|
||||
|
||||
protected abstract Schema resolveSchemaForWriting(Object payload, MessageHeaders headers, MimeType hintedContentType);
|
||||
|
||||
protected abstract Schema resolveWriterSchemaForDeserialization(MimeType mimeType);
|
||||
|
||||
protected abstract Schema resolveReaderSchemaForDeserialization(Class<?> targetClass);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,126 @@
|
||||
/*
|
||||
* Copyright 2016-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.function.context.converter.avro;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.MimeType;
|
||||
|
||||
/**
|
||||
* A {@link org.springframework.messaging.converter.MessageConverter} using Apache Avro.
|
||||
* The schema for serializing and deserializing will be automatically inferred from the
|
||||
* class for {@link org.apache.avro.specific.SpecificRecord} and regular classes, unless a
|
||||
* specific schema is set, case in which that schema will be used instead. For converting
|
||||
* to {@link org.apache.avro.generic.GenericRecord} targets, a schema must be set.s
|
||||
*
|
||||
* @author Marius Bogoevici
|
||||
* @author Ish Mahajan
|
||||
* @author Soby Chacko
|
||||
*
|
||||
* @since 3.2.0
|
||||
*/
|
||||
|
||||
public class AvroSchemaMessageConverter extends AbstractAvroMessageConverter {
|
||||
|
||||
private Schema schema;
|
||||
|
||||
/**
|
||||
* Create a {@link AvroSchemaMessageConverter}. Uses the default {@link MimeType} of
|
||||
* {@code "application/avro"}.
|
||||
* @param manager for schema management
|
||||
*/
|
||||
public AvroSchemaMessageConverter(AvroSchemaServiceManager manager) {
|
||||
super(new MimeType("application", "avro"), manager);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link AvroSchemaMessageConverter}. The converter will be used for the
|
||||
* provided {@link MimeType}.
|
||||
* @param supportedMimeType mime type to be supported by
|
||||
* {@link AvroSchemaMessageConverter}
|
||||
* @param manager for schema management
|
||||
*/
|
||||
public AvroSchemaMessageConverter(MimeType supportedMimeType, AvroSchemaServiceManager manager) {
|
||||
super(supportedMimeType, manager);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link AvroSchemaMessageConverter}. The converter will be used for the
|
||||
* provided {@link MimeType}s.
|
||||
* @param supportedMimeTypes the mime types supported by this converter
|
||||
* @param manager for schema management
|
||||
*/
|
||||
public AvroSchemaMessageConverter(Collection<MimeType> supportedMimeTypes, AvroSchemaServiceManager manager) {
|
||||
super(supportedMimeTypes, manager);
|
||||
}
|
||||
|
||||
public Schema getSchema() {
|
||||
return this.schema;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the Apache Avro schema to be used by this converter.
|
||||
* @param schema schema to be used by this converter
|
||||
*/
|
||||
public void setSchema(Schema schema) {
|
||||
Assert.notNull(schema, "schema cannot be null");
|
||||
this.schema = schema;
|
||||
}
|
||||
|
||||
/**
|
||||
* The location of the Apache Avro schema to be used by this converter.
|
||||
* @param schemaLocation the location of the schema used by this converter.
|
||||
*/
|
||||
public void setSchemaLocation(Resource schemaLocation) {
|
||||
Assert.notNull(schemaLocation, "schema cannot be null");
|
||||
try {
|
||||
this.schema = parseSchema(schemaLocation);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new IllegalStateException("Schema cannot be parsed:", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean supports(Class<?> clazz) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Schema resolveWriterSchemaForDeserialization(MimeType mimeType) {
|
||||
return this.schema;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Schema resolveReaderSchemaForDeserialization(Class<?> targetClass) {
|
||||
return this.schema;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Schema resolveSchemaForWriting(Object payload, MessageHeaders headers,
|
||||
MimeType hintedContentType) {
|
||||
return this.schema;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Copyright 2016-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.function.context.converter.avro;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.io.DatumReader;
|
||||
import org.apache.avro.io.DatumWriter;
|
||||
|
||||
/**
|
||||
* Manage a {@link Schema} together with its String representation.
|
||||
*
|
||||
* Helps to substitute the default implementation of {@link org.apache.avro.Schema}
|
||||
* Generation using Custom Avro schema generator
|
||||
*
|
||||
* Provide a custom bean definition of {@link AvroSchemaServiceManager} and mark
|
||||
* it as @Primary to override the default implementation
|
||||
*
|
||||
* Migrating this interface from the original Spring Cloud Schema Registry project.
|
||||
*
|
||||
* @author Ish Mahajan
|
||||
* @author Soby Chacko
|
||||
*
|
||||
* @since 3.2.0
|
||||
*
|
||||
*/
|
||||
public interface AvroSchemaServiceManager {
|
||||
|
||||
/**
|
||||
* get {@link Schema}.
|
||||
* @param clazz {@link Class} for which schema generation is required
|
||||
* @return returns avro schema for given class
|
||||
*/
|
||||
Schema getSchema(Class<?> clazz);
|
||||
|
||||
/**
|
||||
* get {@link DatumWriter}.
|
||||
* @param type {@link Class} of java object which needs to be serialized
|
||||
* @param schema {@link Schema} of object which needs to be serialized
|
||||
* @return datum writer which can be used to write Avro payload
|
||||
*/
|
||||
DatumWriter<Object> getDatumWriter(Class<? extends Object> type, Schema schema);
|
||||
|
||||
/**
|
||||
* get {@link DatumReader}.
|
||||
* @param type {@link Class} of java object which needs to be serialized
|
||||
* @param schema {@link Schema} default schema of object which needs to be de-serialized
|
||||
* @param writerSchema {@link Schema} writerSchema provided at run time
|
||||
* @return datum reader which can be used to read Avro payload
|
||||
*/
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
DatumReader<Object> getDatumReader(Class<? extends Object> type, Schema schema, Schema writerSchema);
|
||||
|
||||
/**
|
||||
* read data from avro type payload {@link DatumReader}.
|
||||
* @param targetClass {@link Class} of java object which needs to be serialized
|
||||
* @param payload {@link byte} serialized payload of object which needs to be de-serialized
|
||||
* @param readerSchema {@link Schema} readerSchema of object which needs to be de-serialized
|
||||
* @param writerSchema {@link Schema} writerSchema used to while serializing payload
|
||||
* @return java object after reading Avro Payload
|
||||
* @throws IOException in case of error
|
||||
*/
|
||||
Object readData(Class<? extends Object> targetClass, byte[] payload, Schema readerSchema, Schema writerSchema)
|
||||
throws IOException;
|
||||
}
|
||||
@@ -0,0 +1,171 @@
|
||||
/*
|
||||
* Copyright 2016-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.function.context.converter.avro;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericDatumReader;
|
||||
import org.apache.avro.generic.GenericDatumWriter;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.io.DatumReader;
|
||||
import org.apache.avro.io.DatumWriter;
|
||||
import org.apache.avro.io.Decoder;
|
||||
import org.apache.avro.io.DecoderFactory;
|
||||
import org.apache.avro.reflect.ReflectData;
|
||||
import org.apache.avro.reflect.ReflectDatumReader;
|
||||
import org.apache.avro.reflect.ReflectDatumWriter;
|
||||
import org.apache.avro.specific.SpecificDatumReader;
|
||||
import org.apache.avro.specific.SpecificDatumWriter;
|
||||
import org.apache.avro.specific.SpecificRecord;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.springframework.messaging.converter.MessageConversionException;
|
||||
|
||||
/**
|
||||
* Default Concrete implementation of {@link AvroSchemaServiceManager}.
|
||||
*
|
||||
* Helps to substitute the default implementation of {@link org.apache.avro.Schema} Generation using Custom Avro
|
||||
* schema generator
|
||||
*
|
||||
* Migrating this class from the original Spring Cloud Schema Registry project.
|
||||
*
|
||||
* @author Ish Mahajan
|
||||
* @author Soby Chacko
|
||||
*
|
||||
* @since 3.2.0
|
||||
*
|
||||
*/
|
||||
public class AvroSchemaServiceManagerImpl implements AvroSchemaServiceManager {
|
||||
|
||||
protected final Log logger = LogFactory.getLog(this.getClass());
|
||||
|
||||
/**
|
||||
* get {@link Schema}.
|
||||
* @param clazz {@link Class} for which schema generation is required
|
||||
* @return returns avro schema for given class
|
||||
*/
|
||||
@Override
|
||||
public Schema getSchema(Class<?> clazz) {
|
||||
return ReflectData.get().getSchema(clazz);
|
||||
}
|
||||
|
||||
/**
|
||||
* get {@link DatumWriter}.
|
||||
* @param type {@link Class} of java object which needs to be serialized
|
||||
* @param schema {@link Schema} of object which needs to be serialized
|
||||
* @return datum writer which can be used to write Avro payload
|
||||
*/
|
||||
@Override
|
||||
public DatumWriter<Object> getDatumWriter(Class<?> type, Schema schema) {
|
||||
DatumWriter<Object> writer;
|
||||
this.logger.debug("Finding correct DatumWriter for type " + type.getName());
|
||||
if (SpecificRecord.class.isAssignableFrom(type)) {
|
||||
if (schema != null) {
|
||||
writer = new SpecificDatumWriter<>(schema);
|
||||
}
|
||||
else {
|
||||
writer = new SpecificDatumWriter(type);
|
||||
}
|
||||
}
|
||||
else if (GenericRecord.class.isAssignableFrom(type)) {
|
||||
writer = new GenericDatumWriter<>(schema);
|
||||
}
|
||||
else {
|
||||
if (schema != null) {
|
||||
writer = new ReflectDatumWriter<>(schema);
|
||||
}
|
||||
else {
|
||||
writer = new ReflectDatumWriter(type);
|
||||
}
|
||||
}
|
||||
return writer;
|
||||
}
|
||||
|
||||
/**
|
||||
* get {@link DatumReader}.
|
||||
* @param type {@link Class} of java object which needs to be serialized
|
||||
* @param readerSchema {@link Schema} default schema of object which needs to be de-serialized
|
||||
* @param writerSchema {@link Schema} writerSchema provided at run time
|
||||
* @return datum reader which can be used to read Avro payload
|
||||
*/
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Override
|
||||
public DatumReader<Object> getDatumReader(Class<?> type, Schema readerSchema, Schema writerSchema) {
|
||||
DatumReader<Object> reader = null;
|
||||
if (SpecificRecord.class.isAssignableFrom(type)) {
|
||||
if (readerSchema != null) {
|
||||
if (writerSchema != null) {
|
||||
reader = new SpecificDatumReader<>(writerSchema, readerSchema);
|
||||
}
|
||||
else {
|
||||
reader = new SpecificDatumReader<>(readerSchema);
|
||||
}
|
||||
}
|
||||
else {
|
||||
reader = new SpecificDatumReader(type);
|
||||
if (writerSchema != null) {
|
||||
reader.setSchema(writerSchema);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (GenericRecord.class.isAssignableFrom(type)) {
|
||||
if (readerSchema != null) {
|
||||
if (writerSchema != null) {
|
||||
reader = new GenericDatumReader<>(writerSchema, readerSchema);
|
||||
}
|
||||
else {
|
||||
reader = new GenericDatumReader<>(readerSchema);
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (writerSchema != null) {
|
||||
reader = new GenericDatumReader(writerSchema);
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
reader = new ReflectDatumReader(type);
|
||||
if (writerSchema != null) {
|
||||
reader.setSchema(writerSchema);
|
||||
}
|
||||
}
|
||||
if (reader == null) {
|
||||
throw new MessageConversionException("No schema can be inferred from type "
|
||||
+ type.getName() + " and no schema has been explicitly configured.");
|
||||
}
|
||||
return reader;
|
||||
}
|
||||
|
||||
/**
|
||||
* read data from avro type payload {@link DatumReader}.
|
||||
* @param clazz {@link Class} of java object which needs to be serialized
|
||||
* @param payload {@link byte} serialized payload of object which needs to be de-serialized
|
||||
* @param readerSchema {@link Schema} readerSchema of object which needs to be de-serialized
|
||||
* @param writerSchema {@link Schema} writerSchema used to while serializing payload
|
||||
* @return java object after reading Avro Payload
|
||||
* @throws IOException is thrown in case of error
|
||||
*/
|
||||
@Override
|
||||
public Object readData(Class<? extends Object> clazz, byte[] payload, Schema readerSchema, Schema writerSchema)
|
||||
throws IOException {
|
||||
DatumReader<Object> reader = this.getDatumReader(clazz, readerSchema, writerSchema);
|
||||
Decoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
|
||||
return reader.read(null, decoder);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
/*
|
||||
* Copyright 2021-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.function.context.converter.avro;
|
||||
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Function;
|
||||
|
||||
import com.example.Sensor;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class AvroSchemaMessageConverterTests {
|
||||
|
||||
@Test
|
||||
public void testAvroSchemaMessageConverter() {
|
||||
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
|
||||
SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--spring.main.lazy-initialization=true")) {
|
||||
|
||||
FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class);
|
||||
SimpleFunctionRegistry.FunctionInvocationWrapper function = functionCatalog.lookup("avroSensor");
|
||||
|
||||
Random random = new Random();
|
||||
Sensor sensor = new Sensor();
|
||||
sensor.setId(UUID.randomUUID().toString() + "-v1");
|
||||
sensor.setAcceleration(random.nextFloat() * 10);
|
||||
sensor.setVelocity(random.nextFloat() * 100);
|
||||
sensor.setTemperature(random.nextFloat() * 50);
|
||||
|
||||
final AvroSchemaMessageConverter bean = context.getBean(AvroSchemaMessageConverter.class);
|
||||
// Explicitly convert the Sensor to byte[]
|
||||
final Message<?> message = bean.toMessage(sensor, new MessageHeaders(null));
|
||||
// Now send with the sensor->byte[] converted payload.
|
||||
final Sensor enrichedSensor = (Sensor) function.apply(message);
|
||||
|
||||
assertThat(enrichedSensor.getTemperature()).isEqualTo(sensor.getTemperature() + 5);
|
||||
}
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
protected static class SampleFunctionConfiguration {
|
||||
|
||||
@Bean
|
||||
public Function<Sensor, Sensor> avroSensor() {
|
||||
return s -> {
|
||||
s.setTemperature(s.getTemperature() + 5);
|
||||
return s;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"namespace" : "com.example",
|
||||
"type" : "record",
|
||||
"name" : "Sensor",
|
||||
"fields" : [
|
||||
{"name":"id","type":"string"},
|
||||
{"name":"temperature", "type":"float", "default":0.0},
|
||||
{"name":"acceleration", "type":"float","default":0.0},
|
||||
{"name":"velocity","type":"float","default":0.0}
|
||||
]
|
||||
}
|
||||
Reference in New Issue
Block a user