diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 1e3611d78..2d0c11041 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -40,9 +40,6 @@ import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionRegistry; import org.springframework.cloud.function.context.MessageRoutingCallback; import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry; -import org.springframework.cloud.function.context.converter.avro.AvroSchemaMessageConverter; -import org.springframework.cloud.function.context.converter.avro.AvroSchemaServiceManager; -import org.springframework.cloud.function.context.converter.avro.AvroSchemaServiceManagerImpl; import org.springframework.cloud.function.core.FunctionInvocationHelper; import org.springframework.cloud.function.json.GsonMapper; import org.springframework.cloud.function.json.JacksonMapper; @@ -161,23 +158,6 @@ public class ContextFunctionCatalogAutoConfiguration { } } - @Configuration(proxyBeanMethods = false) - @ConditionalOnClass(name = "org.apache.avro.Schema") - static class AvroSchemaMessageConverterConfiguration { - - @Bean - @ConditionalOnMissingBean - public AvroSchemaServiceManager avroSchemaServiceManager() { - return new AvroSchemaServiceManagerImpl(); - } - - @Bean - @ConditionalOnMissingBean - public AvroSchemaMessageConverter avroSchemaMessageConverter(AvroSchemaServiceManager avroSchemaServiceManager) { - return new AvroSchemaMessageConverter(avroSchemaServiceManager); - } - } - @ComponentScan(basePackages = "${spring.cloud.function.scan.packages:functions}", includeFilters = @Filter(type = FilterType.ASSIGNABLE_TYPE, classes = { Supplier.class, Function.class, Consumer.class }), excludeFilters = @Filter(type = FilterType.ANNOTATION, classes = { Configuration.class, Component.class})) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/converter/avro/AbstractAvroMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/converter/avro/AbstractAvroMessageConverter.java deleted file mode 100644 index b91cab1bd..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/converter/avro/AbstractAvroMessageConverter.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Copyright 2016-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.converter.avro; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; - -import org.apache.avro.Schema; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.io.Encoder; -import org.apache.avro.io.EncoderFactory; - -import org.springframework.core.io.Resource; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.converter.AbstractMessageConverter; -import org.springframework.messaging.converter.MessageConversionException; -import org.springframework.util.MimeType; - -/** - * Base class for Apache Avro - * {@link org.springframework.messaging.converter.MessageConverter} implementations. - * - * @author Marius Bogoevici - * @author Vinicius Carvalho - * @author Sercan Karaoglu - * @author Ish Mahajan - * - * @since 3.2.0 - */ -public abstract class AbstractAvroMessageConverter extends AbstractMessageConverter { - - /** - * common parser will let user to import external schemas. - */ - private Schema.Parser schemaParser = new Schema.Parser(); - private AvroSchemaServiceManager avroSchemaServiceManager; - - protected AbstractAvroMessageConverter(MimeType supportedMimeType, AvroSchemaServiceManager avroSchemaServiceManager) { - this(Collections.singletonList(supportedMimeType), avroSchemaServiceManager); - } - - protected AbstractAvroMessageConverter(Collection supportedMimeTypes, AvroSchemaServiceManager manager) { - super(supportedMimeTypes); - this.avroSchemaServiceManager = manager; - } - - protected AvroSchemaServiceManager avroSchemaServiceManager() { - return this.avroSchemaServiceManager; - } - - protected Schema parseSchema(Resource r) throws IOException { - return this.schemaParser.parse(r.getInputStream()); - } - - @Override - protected boolean canConvertFrom(Message message, Class targetClass) { - return super.canConvertFrom(message, targetClass) - && (message.getPayload() instanceof byte[]); - } - - @Override - protected Object convertFromInternal(Message message, Class targetClass, Object conversionHint) { - Object result; - try { - byte[] payload = (byte[]) message.getPayload(); - - MimeType mimeType = getContentTypeResolver().resolve(message.getHeaders()); - if (mimeType == null) { - if (conversionHint instanceof MimeType) { - mimeType = (MimeType) conversionHint; - } - else { - return null; - } - } - - Schema writerSchema = resolveWriterSchemaForDeserialization(mimeType); - Schema readerSchema = resolveReaderSchemaForDeserialization(targetClass); - - result = avroSchemaServiceManager().readData(targetClass, payload, readerSchema, writerSchema); - } - catch (IOException e) { - throw new MessageConversionException(message, "Failed to read payload", e); - } - return result; - } - - @Override - protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try { - MimeType hintedContentType = null; - if (conversionHint instanceof MimeType) { - hintedContentType = (MimeType) conversionHint; - } - Schema schema = resolveSchemaForWriting(payload, headers, hintedContentType); - @SuppressWarnings("unchecked") - DatumWriter writer = avroSchemaServiceManager().getDatumWriter(payload.getClass(), schema); - Encoder encoder = EncoderFactory.get().binaryEncoder(baos, null); - writer.write(payload, encoder); - encoder.flush(); - } - catch (IOException e) { - throw new MessageConversionException("Failed to write payload", e); - } - return baos.toByteArray(); - } - - protected abstract Schema resolveSchemaForWriting(Object payload, MessageHeaders headers, MimeType hintedContentType); - - protected abstract Schema resolveWriterSchemaForDeserialization(MimeType mimeType); - - protected abstract Schema resolveReaderSchemaForDeserialization(Class targetClass); - -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/converter/avro/AvroSchemaMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/converter/avro/AvroSchemaMessageConverter.java deleted file mode 100644 index 6d9129e53..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/converter/avro/AvroSchemaMessageConverter.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright 2016-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.converter.avro; - -import java.io.IOException; -import java.util.Collection; - -import org.apache.avro.Schema; - -import org.springframework.core.io.Resource; -import org.springframework.messaging.MessageHeaders; -import org.springframework.util.Assert; -import org.springframework.util.MimeType; - -/** - * A {@link org.springframework.messaging.converter.MessageConverter} using Apache Avro. - * The schema for serializing and deserializing will be automatically inferred from the - * class for {@link org.apache.avro.specific.SpecificRecord} and regular classes, unless a - * specific schema is set, case in which that schema will be used instead. For converting - * to {@link org.apache.avro.generic.GenericRecord} targets, a schema must be set.s - * - * @author Marius Bogoevici - * @author Ish Mahajan - * @author Soby Chacko - * - * @since 3.2.0 - */ - -public class AvroSchemaMessageConverter extends AbstractAvroMessageConverter { - - private Schema schema; - - /** - * Create a {@link AvroSchemaMessageConverter}. Uses the default {@link MimeType} of - * {@code "application/avro"}. - * @param manager for schema management - */ - public AvroSchemaMessageConverter(AvroSchemaServiceManager manager) { - super(new MimeType("application", "avro"), manager); - } - - /** - * Create a {@link AvroSchemaMessageConverter}. The converter will be used for the - * provided {@link MimeType}. - * @param supportedMimeType mime type to be supported by - * {@link AvroSchemaMessageConverter} - * @param manager for schema management - */ - public AvroSchemaMessageConverter(MimeType supportedMimeType, AvroSchemaServiceManager manager) { - super(supportedMimeType, manager); - } - - /** - * Create a {@link AvroSchemaMessageConverter}. The converter will be used for the - * provided {@link MimeType}s. - * @param supportedMimeTypes the mime types supported by this converter - * @param manager for schema management - */ - public AvroSchemaMessageConverter(Collection supportedMimeTypes, AvroSchemaServiceManager manager) { - super(supportedMimeTypes, manager); - } - - public Schema getSchema() { - return this.schema; - } - - /** - * Sets the Apache Avro schema to be used by this converter. - * @param schema schema to be used by this converter - */ - public void setSchema(Schema schema) { - Assert.notNull(schema, "schema cannot be null"); - this.schema = schema; - } - - /** - * The location of the Apache Avro schema to be used by this converter. - * @param schemaLocation the location of the schema used by this converter. - */ - public void setSchemaLocation(Resource schemaLocation) { - Assert.notNull(schemaLocation, "schema cannot be null"); - try { - this.schema = parseSchema(schemaLocation); - } - catch (IOException e) { - throw new IllegalStateException("Schema cannot be parsed:", e); - } - } - - @Override - protected boolean supports(Class clazz) { - return true; - } - - @Override - protected Schema resolveWriterSchemaForDeserialization(MimeType mimeType) { - return this.schema; - } - - @Override - protected Schema resolveReaderSchemaForDeserialization(Class targetClass) { - return this.schema; - } - - @Override - protected Schema resolveSchemaForWriting(Object payload, MessageHeaders headers, - MimeType hintedContentType) { - return this.schema; - } - -} - diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/converter/avro/AvroSchemaServiceManager.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/converter/avro/AvroSchemaServiceManager.java deleted file mode 100644 index 7927a43dc..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/converter/avro/AvroSchemaServiceManager.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2016-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.converter.avro; - -import java.io.IOException; - -import org.apache.avro.Schema; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DatumWriter; - -/** - * Manage a {@link Schema} together with its String representation. - * - * Helps to substitute the default implementation of {@link org.apache.avro.Schema} - * Generation using Custom Avro schema generator - * - * Provide a custom bean definition of {@link AvroSchemaServiceManager} to override the default implementation. - * - * Migrating this interface from the original Spring Cloud Schema Registry project. - * - * @author Ish Mahajan - * @author Soby Chacko - * - * @since 3.2.0 - * - */ -public interface AvroSchemaServiceManager { - - /** - * get {@link Schema}. - * @param clazz {@link Class} for which schema generation is required - * @return returns avro schema for given class - */ - Schema getSchema(Class clazz); - - /** - * get {@link DatumWriter}. - * @param type {@link Class} of java object which needs to be serialized - * @param schema {@link Schema} of object which needs to be serialized - * @return datum writer which can be used to write Avro payload - */ - DatumWriter getDatumWriter(Class type, Schema schema); - - /** - * get {@link DatumReader}. - * @param type {@link Class} of java object which needs to be serialized - * @param schema {@link Schema} default schema of object which needs to be de-serialized - * @param writerSchema {@link Schema} writerSchema provided at run time - * @return datum reader which can be used to read Avro payload - */ - DatumReader getDatumReader(Class type, Schema schema, Schema writerSchema); - - /** - * read data from avro type payload {@link DatumReader}. - * @param targetClass {@link Class} of java object which needs to be serialized - * @param payload {@link byte} serialized payload of object which needs to be de-serialized - * @param readerSchema {@link Schema} readerSchema of object which needs to be de-serialized - * @param writerSchema {@link Schema} writerSchema used to while serializing payload - * @return java object after reading Avro Payload - * @throws IOException in case of error - */ - Object readData(Class targetClass, byte[] payload, Schema readerSchema, Schema writerSchema) - throws IOException; -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/converter/avro/AvroSchemaServiceManagerImpl.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/converter/avro/AvroSchemaServiceManagerImpl.java deleted file mode 100644 index eee848412..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/converter/avro/AvroSchemaServiceManagerImpl.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Copyright 2016-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.converter.avro; - -import java.io.IOException; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.io.Decoder; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.reflect.ReflectData; -import org.apache.avro.reflect.ReflectDatumReader; -import org.apache.avro.reflect.ReflectDatumWriter; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.avro.specific.SpecificRecord; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.springframework.messaging.converter.MessageConversionException; - -/** - * Default Concrete implementation of {@link AvroSchemaServiceManager}. - * - * Helps to substitute the default implementation of {@link org.apache.avro.Schema} Generation using Custom Avro - * schema generator - * - * Migrating this class from the original Spring Cloud Schema Registry project. - * - * @author Ish Mahajan - * @author Soby Chacko - * - * @since 3.2.0 - * - */ -public class AvroSchemaServiceManagerImpl implements AvroSchemaServiceManager { - - protected final Log logger = LogFactory.getLog(this.getClass()); - - /** - * get {@link Schema}. - * @param clazz {@link Class} for which schema generation is required - * @return returns avro schema for given class - */ - @Override - public Schema getSchema(Class clazz) { - return ReflectData.get().getSchema(clazz); - } - - /** - * get {@link DatumWriter}. - * @param type {@link Class} of java object which needs to be serialized - * @param schema {@link Schema} of object which needs to be serialized - * @return datum writer which can be used to write Avro payload - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - public DatumWriter getDatumWriter(Class type, Schema schema) { - DatumWriter writer; - this.logger.debug("Finding correct DatumWriter for type " + type.getName()); - if (SpecificRecord.class.isAssignableFrom(type)) { - if (schema != null) { - writer = new SpecificDatumWriter<>(schema); - } - else { - writer = new SpecificDatumWriter(type); - } - } - else if (GenericRecord.class.isAssignableFrom(type)) { - writer = new GenericDatumWriter<>(schema); - } - else { - if (schema != null) { - writer = new ReflectDatumWriter<>(schema); - } - else { - writer = new ReflectDatumWriter(type); - } - } - return writer; - } - - /** - * get {@link DatumReader}. - * @param type {@link Class} of java object which needs to be serialized - * @param readerSchema {@link Schema} default schema of object which needs to be de-serialized - * @param writerSchema {@link Schema} writerSchema provided at run time - * @return datum reader which can be used to read Avro payload - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - public DatumReader getDatumReader(Class type, Schema readerSchema, Schema writerSchema) { - DatumReader reader = null; - if (SpecificRecord.class.isAssignableFrom(type)) { - if (readerSchema != null) { - if (writerSchema != null) { - reader = new SpecificDatumReader<>(writerSchema, readerSchema); - } - else { - reader = new SpecificDatumReader<>(readerSchema); - } - } - else { - reader = new SpecificDatumReader(type); - if (writerSchema != null) { - reader.setSchema(writerSchema); - } - } - } - else if (GenericRecord.class.isAssignableFrom(type)) { - if (readerSchema != null) { - if (writerSchema != null) { - reader = new GenericDatumReader<>(writerSchema, readerSchema); - } - else { - reader = new GenericDatumReader<>(readerSchema); - } - } - else { - if (writerSchema != null) { - reader = new GenericDatumReader(writerSchema); - } - } - } - else { - reader = new ReflectDatumReader(type); - if (writerSchema != null) { - reader.setSchema(writerSchema); - } - } - if (reader == null) { - throw new MessageConversionException("No schema can be inferred from type " - + type.getName() + " and no schema has been explicitly configured."); - } - return reader; - } - - /** - * read data from avro type payload {@link DatumReader}. - * @param clazz {@link Class} of java object which needs to be serialized - * @param payload {@link byte} serialized payload of object which needs to be de-serialized - * @param readerSchema {@link Schema} readerSchema of object which needs to be de-serialized - * @param writerSchema {@link Schema} writerSchema used to while serializing payload - * @return java object after reading Avro Payload - * @throws IOException is thrown in case of error - */ - @Override - public Object readData(Class clazz, byte[] payload, Schema readerSchema, Schema writerSchema) - throws IOException { - DatumReader reader = this.getDatumReader(clazz, readerSchema, writerSchema); - Decoder decoder = DecoderFactory.get().binaryDecoder(payload, null); - return reader.read(null, decoder); - } -} diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfigurationConditionalLoadingTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfigurationConditionalLoadingTests.java index 768ef643d..62b8d06fe 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfigurationConditionalLoadingTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfigurationConditionalLoadingTests.java @@ -17,7 +17,6 @@ package org.springframework.cloud.function.context.config; import io.cloudevents.spring.messaging.CloudEventMessageConverter; -import org.apache.avro.Schema; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -26,8 +25,6 @@ import org.springframework.boot.test.context.FilteredClassLoader; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionRegistry; -import org.springframework.cloud.function.context.converter.avro.AvroSchemaMessageConverter; -import org.springframework.cloud.function.context.converter.avro.AvroSchemaServiceManager; import org.springframework.cloud.function.context.scan.TestFunction; import static org.assertj.core.api.Assertions.assertThat; @@ -49,37 +46,6 @@ public class ContextFunctionCatalogAutoConfigurationConditionalLoadingTests { .run((context) -> assertThat(context).doesNotHaveBean(FunctionRegistry.class)); } - @Nested - class AvroSchemaMessageConverterConfig { - - @Test - void avroSchemaMessageConverterBeansLoadedWhenAvroOnClasspath() { - contextRunner.run((context) -> assertThat(context).hasSingleBean(AvroSchemaServiceManager.class) - .hasSingleBean(AvroSchemaMessageConverter.class)); - } - - @Test - void avroSchemaMessageConverterBeansNotLoadedWhenAvroNotOnClasspath() { - contextRunner.withClassLoader(new FilteredClassLoader(Schema.class)).run((context) -> - assertThat(context).doesNotHaveBean(AvroSchemaServiceManager.class) - .doesNotHaveBean(AvroSchemaMessageConverter.class)); - } - - @Test - void customAvroSchemaServiceManagerIsRespected() { - AvroSchemaServiceManager customManager = mock(AvroSchemaServiceManager.class); - contextRunner.withBean(AvroSchemaServiceManager.class, () -> customManager) - .run((context) -> assertThat(context).getBean(AvroSchemaServiceManager.class).isSameAs(customManager)); - } - - @Test - void customAvroSchemaMessageConverterIsRespected() { - AvroSchemaMessageConverter customConverter = mock(AvroSchemaMessageConverter.class); - contextRunner.withBean(AvroSchemaMessageConverter.class, () -> customConverter) - .run((context) -> assertThat(context).getBean(AvroSchemaMessageConverter.class).isSameAs(customConverter)); - } - } - @Nested class CloudEventsMessageConverterConfig { diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/converter/avro/AvroSchemaMessageConverterTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/converter/avro/AvroSchemaMessageConverterTests.java deleted file mode 100644 index 91e5b5ca9..000000000 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/converter/avro/AvroSchemaMessageConverterTests.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright 2021-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.converter.avro; - -import java.util.Random; -import java.util.UUID; -import java.util.function.Function; - -import com.example.Sensor; -import org.junit.jupiter.api.Test; - -import org.springframework.boot.WebApplicationType; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.builder.SpringApplicationBuilder; -import org.springframework.cloud.function.context.FunctionCatalog; -import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; - -import static org.assertj.core.api.Assertions.assertThat; - -/** - * @author Soby Chacko - */ -public class AvroSchemaMessageConverterTests { - - @Test - public void testAvroSchemaMessageConverter() { - try (ConfigurableApplicationContext context = new SpringApplicationBuilder( - SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run( - "--spring.main.lazy-initialization=true")) { - - FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class); - SimpleFunctionRegistry.FunctionInvocationWrapper function = functionCatalog.lookup("avroSensor"); - - Random random = new Random(); - Sensor sensor = new Sensor(); - sensor.setId(UUID.randomUUID().toString() + "-v1"); - sensor.setAcceleration(random.nextFloat() * 10); - sensor.setVelocity(random.nextFloat() * 100); - sensor.setTemperature(random.nextFloat() * 50); - - final AvroSchemaMessageConverter bean = context.getBean(AvroSchemaMessageConverter.class); - // Explicitly convert the Sensor to byte[] - final Message message = bean.toMessage(sensor, new MessageHeaders(null)); - // Now send with the sensor->byte[] converted payload. - final Sensor enrichedSensor = (Sensor) function.apply(message); - - assertThat(enrichedSensor.getTemperature()).isEqualTo(sensor.getTemperature() + 5); - } - } - - @EnableAutoConfiguration - @Configuration - protected static class SampleFunctionConfiguration { - - @Bean - public Function avroSensor() { - return s -> { - s.setTemperature(s.getTemperature() + 5); - return s; - }; - } - } - -}