Remove AVRO message converters in 4.0.x
Since we are migrating Schema Registry to Spring Cloud Stream, the AVRO message converters in Spring Cloud Function could reside in Spring Cloud Stream as part of its Schema Registry. Resolves https://github.com/spring-cloud/spring-cloud-function/issues/921 Resolves #922
This commit is contained in:
committed by
Oleg Zhurakousky
parent
e83b0dfabe
commit
c8109270d2
@@ -40,9 +40,6 @@ import org.springframework.cloud.function.context.FunctionProperties;
|
||||
import org.springframework.cloud.function.context.FunctionRegistry;
|
||||
import org.springframework.cloud.function.context.MessageRoutingCallback;
|
||||
import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry;
|
||||
import org.springframework.cloud.function.context.converter.avro.AvroSchemaMessageConverter;
|
||||
import org.springframework.cloud.function.context.converter.avro.AvroSchemaServiceManager;
|
||||
import org.springframework.cloud.function.context.converter.avro.AvroSchemaServiceManagerImpl;
|
||||
import org.springframework.cloud.function.core.FunctionInvocationHelper;
|
||||
import org.springframework.cloud.function.json.GsonMapper;
|
||||
import org.springframework.cloud.function.json.JacksonMapper;
|
||||
@@ -161,23 +158,6 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
}
|
||||
}
|
||||
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@ConditionalOnClass(name = "org.apache.avro.Schema")
|
||||
static class AvroSchemaMessageConverterConfiguration {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public AvroSchemaServiceManager avroSchemaServiceManager() {
|
||||
return new AvroSchemaServiceManagerImpl();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public AvroSchemaMessageConverter avroSchemaMessageConverter(AvroSchemaServiceManager avroSchemaServiceManager) {
|
||||
return new AvroSchemaMessageConverter(avroSchemaServiceManager);
|
||||
}
|
||||
}
|
||||
|
||||
@ComponentScan(basePackages = "${spring.cloud.function.scan.packages:functions}",
|
||||
includeFilters = @Filter(type = FilterType.ASSIGNABLE_TYPE, classes = { Supplier.class, Function.class, Consumer.class }),
|
||||
excludeFilters = @Filter(type = FilterType.ANNOTATION, classes = { Configuration.class, Component.class}))
|
||||
|
||||
@@ -1,132 +0,0 @@
|
||||
/*
|
||||
* Copyright 2016-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.function.context.converter.avro;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.io.DatumWriter;
|
||||
import org.apache.avro.io.Encoder;
|
||||
import org.apache.avro.io.EncoderFactory;
|
||||
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.converter.AbstractMessageConverter;
|
||||
import org.springframework.messaging.converter.MessageConversionException;
|
||||
import org.springframework.util.MimeType;
|
||||
|
||||
/**
|
||||
* Base class for Apache Avro
|
||||
* {@link org.springframework.messaging.converter.MessageConverter} implementations.
|
||||
*
|
||||
* @author Marius Bogoevici
|
||||
* @author Vinicius Carvalho
|
||||
* @author Sercan Karaoglu
|
||||
* @author Ish Mahajan
|
||||
*
|
||||
* @since 3.2.0
|
||||
*/
|
||||
public abstract class AbstractAvroMessageConverter extends AbstractMessageConverter {
|
||||
|
||||
/**
|
||||
* common parser will let user to import external schemas.
|
||||
*/
|
||||
private Schema.Parser schemaParser = new Schema.Parser();
|
||||
private AvroSchemaServiceManager avroSchemaServiceManager;
|
||||
|
||||
protected AbstractAvroMessageConverter(MimeType supportedMimeType, AvroSchemaServiceManager avroSchemaServiceManager) {
|
||||
this(Collections.singletonList(supportedMimeType), avroSchemaServiceManager);
|
||||
}
|
||||
|
||||
protected AbstractAvroMessageConverter(Collection<MimeType> supportedMimeTypes, AvroSchemaServiceManager manager) {
|
||||
super(supportedMimeTypes);
|
||||
this.avroSchemaServiceManager = manager;
|
||||
}
|
||||
|
||||
protected AvroSchemaServiceManager avroSchemaServiceManager() {
|
||||
return this.avroSchemaServiceManager;
|
||||
}
|
||||
|
||||
protected Schema parseSchema(Resource r) throws IOException {
|
||||
return this.schemaParser.parse(r.getInputStream());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean canConvertFrom(Message<?> message, Class<?> targetClass) {
|
||||
return super.canConvertFrom(message, targetClass)
|
||||
&& (message.getPayload() instanceof byte[]);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
|
||||
Object result;
|
||||
try {
|
||||
byte[] payload = (byte[]) message.getPayload();
|
||||
|
||||
MimeType mimeType = getContentTypeResolver().resolve(message.getHeaders());
|
||||
if (mimeType == null) {
|
||||
if (conversionHint instanceof MimeType) {
|
||||
mimeType = (MimeType) conversionHint;
|
||||
}
|
||||
else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
Schema writerSchema = resolveWriterSchemaForDeserialization(mimeType);
|
||||
Schema readerSchema = resolveReaderSchemaForDeserialization(targetClass);
|
||||
|
||||
result = avroSchemaServiceManager().readData(targetClass, payload, readerSchema, writerSchema);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new MessageConversionException(message, "Failed to read payload", e);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
try {
|
||||
MimeType hintedContentType = null;
|
||||
if (conversionHint instanceof MimeType) {
|
||||
hintedContentType = (MimeType) conversionHint;
|
||||
}
|
||||
Schema schema = resolveSchemaForWriting(payload, headers, hintedContentType);
|
||||
@SuppressWarnings("unchecked")
|
||||
DatumWriter<Object> writer = avroSchemaServiceManager().getDatumWriter(payload.getClass(), schema);
|
||||
Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
|
||||
writer.write(payload, encoder);
|
||||
encoder.flush();
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new MessageConversionException("Failed to write payload", e);
|
||||
}
|
||||
return baos.toByteArray();
|
||||
}
|
||||
|
||||
protected abstract Schema resolveSchemaForWriting(Object payload, MessageHeaders headers, MimeType hintedContentType);
|
||||
|
||||
protected abstract Schema resolveWriterSchemaForDeserialization(MimeType mimeType);
|
||||
|
||||
protected abstract Schema resolveReaderSchemaForDeserialization(Class<?> targetClass);
|
||||
|
||||
}
|
||||
@@ -1,126 +0,0 @@
|
||||
/*
|
||||
* Copyright 2016-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.function.context.converter.avro;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.MimeType;
|
||||
|
||||
/**
|
||||
* A {@link org.springframework.messaging.converter.MessageConverter} using Apache Avro.
|
||||
* The schema for serializing and deserializing will be automatically inferred from the
|
||||
* class for {@link org.apache.avro.specific.SpecificRecord} and regular classes, unless a
|
||||
* specific schema is set, case in which that schema will be used instead. For converting
|
||||
* to {@link org.apache.avro.generic.GenericRecord} targets, a schema must be set.s
|
||||
*
|
||||
* @author Marius Bogoevici
|
||||
* @author Ish Mahajan
|
||||
* @author Soby Chacko
|
||||
*
|
||||
* @since 3.2.0
|
||||
*/
|
||||
|
||||
public class AvroSchemaMessageConverter extends AbstractAvroMessageConverter {
|
||||
|
||||
private Schema schema;
|
||||
|
||||
/**
|
||||
* Create a {@link AvroSchemaMessageConverter}. Uses the default {@link MimeType} of
|
||||
* {@code "application/avro"}.
|
||||
* @param manager for schema management
|
||||
*/
|
||||
public AvroSchemaMessageConverter(AvroSchemaServiceManager manager) {
|
||||
super(new MimeType("application", "avro"), manager);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link AvroSchemaMessageConverter}. The converter will be used for the
|
||||
* provided {@link MimeType}.
|
||||
* @param supportedMimeType mime type to be supported by
|
||||
* {@link AvroSchemaMessageConverter}
|
||||
* @param manager for schema management
|
||||
*/
|
||||
public AvroSchemaMessageConverter(MimeType supportedMimeType, AvroSchemaServiceManager manager) {
|
||||
super(supportedMimeType, manager);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link AvroSchemaMessageConverter}. The converter will be used for the
|
||||
* provided {@link MimeType}s.
|
||||
* @param supportedMimeTypes the mime types supported by this converter
|
||||
* @param manager for schema management
|
||||
*/
|
||||
public AvroSchemaMessageConverter(Collection<MimeType> supportedMimeTypes, AvroSchemaServiceManager manager) {
|
||||
super(supportedMimeTypes, manager);
|
||||
}
|
||||
|
||||
public Schema getSchema() {
|
||||
return this.schema;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the Apache Avro schema to be used by this converter.
|
||||
* @param schema schema to be used by this converter
|
||||
*/
|
||||
public void setSchema(Schema schema) {
|
||||
Assert.notNull(schema, "schema cannot be null");
|
||||
this.schema = schema;
|
||||
}
|
||||
|
||||
/**
|
||||
* The location of the Apache Avro schema to be used by this converter.
|
||||
* @param schemaLocation the location of the schema used by this converter.
|
||||
*/
|
||||
public void setSchemaLocation(Resource schemaLocation) {
|
||||
Assert.notNull(schemaLocation, "schema cannot be null");
|
||||
try {
|
||||
this.schema = parseSchema(schemaLocation);
|
||||
}
|
||||
catch (IOException e) {
|
||||
throw new IllegalStateException("Schema cannot be parsed:", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean supports(Class<?> clazz) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Schema resolveWriterSchemaForDeserialization(MimeType mimeType) {
|
||||
return this.schema;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Schema resolveReaderSchemaForDeserialization(Class<?> targetClass) {
|
||||
return this.schema;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Schema resolveSchemaForWriting(Object payload, MessageHeaders headers,
|
||||
MimeType hintedContentType) {
|
||||
return this.schema;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,78 +0,0 @@
|
||||
/*
|
||||
* Copyright 2016-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.function.context.converter.avro;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.io.DatumReader;
|
||||
import org.apache.avro.io.DatumWriter;
|
||||
|
||||
/**
|
||||
* Manage a {@link Schema} together with its String representation.
|
||||
*
|
||||
* Helps to substitute the default implementation of {@link org.apache.avro.Schema}
|
||||
* Generation using Custom Avro schema generator
|
||||
*
|
||||
* Provide a custom bean definition of {@link AvroSchemaServiceManager} to override the default implementation.
|
||||
*
|
||||
* Migrating this interface from the original Spring Cloud Schema Registry project.
|
||||
*
|
||||
* @author Ish Mahajan
|
||||
* @author Soby Chacko
|
||||
*
|
||||
* @since 3.2.0
|
||||
*
|
||||
*/
|
||||
public interface AvroSchemaServiceManager {
|
||||
|
||||
/**
|
||||
* get {@link Schema}.
|
||||
* @param clazz {@link Class} for which schema generation is required
|
||||
* @return returns avro schema for given class
|
||||
*/
|
||||
Schema getSchema(Class<?> clazz);
|
||||
|
||||
/**
|
||||
* get {@link DatumWriter}.
|
||||
* @param type {@link Class} of java object which needs to be serialized
|
||||
* @param schema {@link Schema} of object which needs to be serialized
|
||||
* @return datum writer which can be used to write Avro payload
|
||||
*/
|
||||
DatumWriter<Object> getDatumWriter(Class<? extends Object> type, Schema schema);
|
||||
|
||||
/**
|
||||
* get {@link DatumReader}.
|
||||
* @param type {@link Class} of java object which needs to be serialized
|
||||
* @param schema {@link Schema} default schema of object which needs to be de-serialized
|
||||
* @param writerSchema {@link Schema} writerSchema provided at run time
|
||||
* @return datum reader which can be used to read Avro payload
|
||||
*/
|
||||
DatumReader<Object> getDatumReader(Class<? extends Object> type, Schema schema, Schema writerSchema);
|
||||
|
||||
/**
|
||||
* read data from avro type payload {@link DatumReader}.
|
||||
* @param targetClass {@link Class} of java object which needs to be serialized
|
||||
* @param payload {@link byte} serialized payload of object which needs to be de-serialized
|
||||
* @param readerSchema {@link Schema} readerSchema of object which needs to be de-serialized
|
||||
* @param writerSchema {@link Schema} writerSchema used to while serializing payload
|
||||
* @return java object after reading Avro Payload
|
||||
* @throws IOException in case of error
|
||||
*/
|
||||
Object readData(Class<? extends Object> targetClass, byte[] payload, Schema readerSchema, Schema writerSchema)
|
||||
throws IOException;
|
||||
}
|
||||
@@ -1,172 +0,0 @@
|
||||
/*
|
||||
* Copyright 2016-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.function.context.converter.avro;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericDatumReader;
|
||||
import org.apache.avro.generic.GenericDatumWriter;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.io.DatumReader;
|
||||
import org.apache.avro.io.DatumWriter;
|
||||
import org.apache.avro.io.Decoder;
|
||||
import org.apache.avro.io.DecoderFactory;
|
||||
import org.apache.avro.reflect.ReflectData;
|
||||
import org.apache.avro.reflect.ReflectDatumReader;
|
||||
import org.apache.avro.reflect.ReflectDatumWriter;
|
||||
import org.apache.avro.specific.SpecificDatumReader;
|
||||
import org.apache.avro.specific.SpecificDatumWriter;
|
||||
import org.apache.avro.specific.SpecificRecord;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.springframework.messaging.converter.MessageConversionException;
|
||||
|
||||
/**
|
||||
* Default Concrete implementation of {@link AvroSchemaServiceManager}.
|
||||
*
|
||||
* Helps to substitute the default implementation of {@link org.apache.avro.Schema} Generation using Custom Avro
|
||||
* schema generator
|
||||
*
|
||||
* Migrating this class from the original Spring Cloud Schema Registry project.
|
||||
*
|
||||
* @author Ish Mahajan
|
||||
* @author Soby Chacko
|
||||
*
|
||||
* @since 3.2.0
|
||||
*
|
||||
*/
|
||||
public class AvroSchemaServiceManagerImpl implements AvroSchemaServiceManager {
|
||||
|
||||
protected final Log logger = LogFactory.getLog(this.getClass());
|
||||
|
||||
/**
|
||||
* get {@link Schema}.
|
||||
* @param clazz {@link Class} for which schema generation is required
|
||||
* @return returns avro schema for given class
|
||||
*/
|
||||
@Override
|
||||
public Schema getSchema(Class<?> clazz) {
|
||||
return ReflectData.get().getSchema(clazz);
|
||||
}
|
||||
|
||||
/**
|
||||
* get {@link DatumWriter}.
|
||||
* @param type {@link Class} of java object which needs to be serialized
|
||||
* @param schema {@link Schema} of object which needs to be serialized
|
||||
* @return datum writer which can be used to write Avro payload
|
||||
*/
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Override
|
||||
public DatumWriter<Object> getDatumWriter(Class<?> type, Schema schema) {
|
||||
DatumWriter<Object> writer;
|
||||
this.logger.debug("Finding correct DatumWriter for type " + type.getName());
|
||||
if (SpecificRecord.class.isAssignableFrom(type)) {
|
||||
if (schema != null) {
|
||||
writer = new SpecificDatumWriter<>(schema);
|
||||
}
|
||||
else {
|
||||
writer = new SpecificDatumWriter(type);
|
||||
}
|
||||
}
|
||||
else if (GenericRecord.class.isAssignableFrom(type)) {
|
||||
writer = new GenericDatumWriter<>(schema);
|
||||
}
|
||||
else {
|
||||
if (schema != null) {
|
||||
writer = new ReflectDatumWriter<>(schema);
|
||||
}
|
||||
else {
|
||||
writer = new ReflectDatumWriter(type);
|
||||
}
|
||||
}
|
||||
return writer;
|
||||
}
|
||||
|
||||
/**
|
||||
* get {@link DatumReader}.
|
||||
* @param type {@link Class} of java object which needs to be serialized
|
||||
* @param readerSchema {@link Schema} default schema of object which needs to be de-serialized
|
||||
* @param writerSchema {@link Schema} writerSchema provided at run time
|
||||
* @return datum reader which can be used to read Avro payload
|
||||
*/
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@Override
|
||||
public DatumReader<Object> getDatumReader(Class<?> type, Schema readerSchema, Schema writerSchema) {
|
||||
DatumReader<Object> reader = null;
|
||||
if (SpecificRecord.class.isAssignableFrom(type)) {
|
||||
if (readerSchema != null) {
|
||||
if (writerSchema != null) {
|
||||
reader = new SpecificDatumReader<>(writerSchema, readerSchema);
|
||||
}
|
||||
else {
|
||||
reader = new SpecificDatumReader<>(readerSchema);
|
||||
}
|
||||
}
|
||||
else {
|
||||
reader = new SpecificDatumReader(type);
|
||||
if (writerSchema != null) {
|
||||
reader.setSchema(writerSchema);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (GenericRecord.class.isAssignableFrom(type)) {
|
||||
if (readerSchema != null) {
|
||||
if (writerSchema != null) {
|
||||
reader = new GenericDatumReader<>(writerSchema, readerSchema);
|
||||
}
|
||||
else {
|
||||
reader = new GenericDatumReader<>(readerSchema);
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (writerSchema != null) {
|
||||
reader = new GenericDatumReader(writerSchema);
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
reader = new ReflectDatumReader(type);
|
||||
if (writerSchema != null) {
|
||||
reader.setSchema(writerSchema);
|
||||
}
|
||||
}
|
||||
if (reader == null) {
|
||||
throw new MessageConversionException("No schema can be inferred from type "
|
||||
+ type.getName() + " and no schema has been explicitly configured.");
|
||||
}
|
||||
return reader;
|
||||
}
|
||||
|
||||
/**
|
||||
* read data from avro type payload {@link DatumReader}.
|
||||
* @param clazz {@link Class} of java object which needs to be serialized
|
||||
* @param payload {@link byte} serialized payload of object which needs to be de-serialized
|
||||
* @param readerSchema {@link Schema} readerSchema of object which needs to be de-serialized
|
||||
* @param writerSchema {@link Schema} writerSchema used to while serializing payload
|
||||
* @return java object after reading Avro Payload
|
||||
* @throws IOException is thrown in case of error
|
||||
*/
|
||||
@Override
|
||||
public Object readData(Class<? extends Object> clazz, byte[] payload, Schema readerSchema, Schema writerSchema)
|
||||
throws IOException {
|
||||
DatumReader<Object> reader = this.getDatumReader(clazz, readerSchema, writerSchema);
|
||||
Decoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
|
||||
return reader.read(null, decoder);
|
||||
}
|
||||
}
|
||||
@@ -17,7 +17,6 @@
|
||||
package org.springframework.cloud.function.context.config;
|
||||
|
||||
import io.cloudevents.spring.messaging.CloudEventMessageConverter;
|
||||
import org.apache.avro.Schema;
|
||||
import org.junit.jupiter.api.Nested;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -26,8 +25,6 @@ import org.springframework.boot.test.context.FilteredClassLoader;
|
||||
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.FunctionRegistry;
|
||||
import org.springframework.cloud.function.context.converter.avro.AvroSchemaMessageConverter;
|
||||
import org.springframework.cloud.function.context.converter.avro.AvroSchemaServiceManager;
|
||||
import org.springframework.cloud.function.context.scan.TestFunction;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
@@ -49,37 +46,6 @@ public class ContextFunctionCatalogAutoConfigurationConditionalLoadingTests {
|
||||
.run((context) -> assertThat(context).doesNotHaveBean(FunctionRegistry.class));
|
||||
}
|
||||
|
||||
@Nested
|
||||
class AvroSchemaMessageConverterConfig {
|
||||
|
||||
@Test
|
||||
void avroSchemaMessageConverterBeansLoadedWhenAvroOnClasspath() {
|
||||
contextRunner.run((context) -> assertThat(context).hasSingleBean(AvroSchemaServiceManager.class)
|
||||
.hasSingleBean(AvroSchemaMessageConverter.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void avroSchemaMessageConverterBeansNotLoadedWhenAvroNotOnClasspath() {
|
||||
contextRunner.withClassLoader(new FilteredClassLoader(Schema.class)).run((context) ->
|
||||
assertThat(context).doesNotHaveBean(AvroSchemaServiceManager.class)
|
||||
.doesNotHaveBean(AvroSchemaMessageConverter.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void customAvroSchemaServiceManagerIsRespected() {
|
||||
AvroSchemaServiceManager customManager = mock(AvroSchemaServiceManager.class);
|
||||
contextRunner.withBean(AvroSchemaServiceManager.class, () -> customManager)
|
||||
.run((context) -> assertThat(context).getBean(AvroSchemaServiceManager.class).isSameAs(customManager));
|
||||
}
|
||||
|
||||
@Test
|
||||
void customAvroSchemaMessageConverterIsRespected() {
|
||||
AvroSchemaMessageConverter customConverter = mock(AvroSchemaMessageConverter.class);
|
||||
contextRunner.withBean(AvroSchemaMessageConverter.class, () -> customConverter)
|
||||
.run((context) -> assertThat(context).getBean(AvroSchemaMessageConverter.class).isSameAs(customConverter));
|
||||
}
|
||||
}
|
||||
|
||||
@Nested
|
||||
class CloudEventsMessageConverterConfig {
|
||||
|
||||
|
||||
@@ -1,83 +0,0 @@
|
||||
/*
|
||||
* Copyright 2021-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.function.context.converter.avro;
|
||||
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Function;
|
||||
|
||||
import com.example.Sensor;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class AvroSchemaMessageConverterTests {
|
||||
|
||||
@Test
|
||||
public void testAvroSchemaMessageConverter() {
|
||||
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
|
||||
SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--spring.main.lazy-initialization=true")) {
|
||||
|
||||
FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class);
|
||||
SimpleFunctionRegistry.FunctionInvocationWrapper function = functionCatalog.lookup("avroSensor");
|
||||
|
||||
Random random = new Random();
|
||||
Sensor sensor = new Sensor();
|
||||
sensor.setId(UUID.randomUUID().toString() + "-v1");
|
||||
sensor.setAcceleration(random.nextFloat() * 10);
|
||||
sensor.setVelocity(random.nextFloat() * 100);
|
||||
sensor.setTemperature(random.nextFloat() * 50);
|
||||
|
||||
final AvroSchemaMessageConverter bean = context.getBean(AvroSchemaMessageConverter.class);
|
||||
// Explicitly convert the Sensor to byte[]
|
||||
final Message<?> message = bean.toMessage(sensor, new MessageHeaders(null));
|
||||
// Now send with the sensor->byte[] converted payload.
|
||||
final Sensor enrichedSensor = (Sensor) function.apply(message);
|
||||
|
||||
assertThat(enrichedSensor.getTemperature()).isEqualTo(sensor.getTemperature() + 5);
|
||||
}
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
protected static class SampleFunctionConfiguration {
|
||||
|
||||
@Bean
|
||||
public Function<Sensor, Sensor> avroSensor() {
|
||||
return s -> {
|
||||
s.setTemperature(s.getTemperature() + 5);
|
||||
return s;
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user