diff --git a/spring-cloud-function-context/pom.xml b/spring-cloud-function-context/pom.xml
index 1e480bb43..6d02e289e 100644
--- a/spring-cloud-function-context/pom.xml
+++ b/spring-cloud-function-context/pom.xml
@@ -15,6 +15,10 @@
3.2.0-SNAPSHOT
+
+ 1.10.2
+
+
net.jodah
@@ -86,6 +90,14 @@
kotlinx-coroutines-reactor
true
+
+
+
+ org.apache.avro
+ avro
+ ${avro.version}
+ true
+
@@ -165,6 +177,24 @@
+
+ org.apache.avro
+ avro-maven-plugin
+ ${avro.version}
+
+
+ generate-test-sources
+
+ schema
+
+
+ ${project.basedir}/target/generated-test-sources
+ ${project.basedir}/target/generated-test-sources
+ ${project.basedir}/src/test/resources/avro
+
+
+
+
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java
index b1bbba6ee..4d57459dd 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java
@@ -29,6 +29,7 @@ import com.google.gson.Gson;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -38,6 +39,8 @@ import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.MessageRoutingCallback;
import org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry;
+import org.springframework.cloud.function.context.converter.avro.AvroSchemaMessageConverter;
+import org.springframework.cloud.function.context.converter.avro.AvroSchemaServiceManagerImpl;
import org.springframework.cloud.function.core.FunctionInvocationHelper;
import org.springframework.cloud.function.json.GsonMapper;
import org.springframework.cloud.function.json.JacksonMapper;
@@ -71,6 +74,7 @@ import org.springframework.util.StringUtils;
* @author Oleg Zhurakousky
* @author Artem Bilan
* @author Anshul Mehra
+ * @author Soby Chacko
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingBean(FunctionCatalog.class)
@@ -88,7 +92,7 @@ public class ContextFunctionCatalogAutoConfiguration {
@Bean
public FunctionRegistry functionCatalog(List messageConverters, JsonMapper jsonMapper,
- ConfigurableApplicationContext context, @Nullable FunctionInvocationHelper> functionInvocationHelper) {
+ ConfigurableApplicationContext context, @Nullable FunctionInvocationHelper> functionInvocationHelper) {
ConfigurableConversionService conversionService = (ConfigurableConversionService) context.getBeanFactory().getConversionService();
if (conversionService == null) {
conversionService = new DefaultConversionService();
@@ -113,8 +117,8 @@ public class ContextFunctionCatalogAutoConfiguration {
}
mcList = mcList.stream()
- .filter(c -> isConverterEligible(c))
- .collect(Collectors.toList());
+ .filter(c -> isConverterEligible(c))
+ .collect(Collectors.toList());
mcList.add(new JsonMessageConverter(jsonMapper));
mcList.add(new ByteArrayMessageConverter());
@@ -131,9 +135,16 @@ public class ContextFunctionCatalogAutoConfiguration {
return new BeanFactoryAwareFunctionRegistry(conversionService, messageConverter, jsonMapper, functionProperties, functionInvocationHelper);
}
+ @Bean
+ @ConditionalOnMissingBean
+ @ConditionalOnClass(name = "org.apache.avro.Schema")
+ public MessageConverter avroSchemaMessageConverter() {
+ return new AvroSchemaMessageConverter(new AvroSchemaServiceManagerImpl());
+ }
+
@Bean(RoutingFunction.FUNCTION_NAME)
RoutingFunction functionRouter(FunctionCatalog functionCatalog, FunctionProperties functionProperties,
- BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
+ BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
return new RoutingFunction(functionCatalog, functionProperties, new BeanFactoryResolver(beanFactory), routingCallback);
}
@@ -150,10 +161,10 @@ public class ContextFunctionCatalogAutoConfiguration {
@Configuration(proxyBeanMethods = false)
@ComponentScan(basePackages = "${spring.cloud.function.scan.packages:functions}", //
- includeFilters = @Filter(type = FilterType.ASSIGNABLE_TYPE,
- classes = { Supplier.class, Function.class, Consumer.class }))
+ includeFilters = @Filter(type = FilterType.ASSIGNABLE_TYPE,
+ classes = {Supplier.class, Function.class, Consumer.class}))
@ConditionalOnProperty(prefix = "spring.cloud.function.scan", name = "enabled", havingValue = "true",
- matchIfMissing = true)
+ matchIfMissing = true)
protected static class PlainFunctionScanConfiguration {
}
@@ -163,8 +174,8 @@ public class ContextFunctionCatalogAutoConfiguration {
@Bean
public JsonMapper jsonMapper(ApplicationContext context) {
String preferredMapper = context.getEnvironment().containsProperty(JSON_MAPPER_PROPERTY)
- ? context.getEnvironment().getProperty(JSON_MAPPER_PROPERTY)
- : context.getEnvironment().getProperty(PREFERRED_MAPPER_PROPERTY);
+ ? context.getEnvironment().getProperty(JSON_MAPPER_PROPERTY)
+ : context.getEnvironment().getProperty(PREFERRED_MAPPER_PROPERTY);
if (StringUtils.hasText(preferredMapper)) {
if ("gson".equals(preferredMapper) && ClassUtils.isPresent("com.google.gson.Gson", null)) {
return gson(context);
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/converter/avro/AbstractAvroMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/converter/avro/AbstractAvroMessageConverter.java
new file mode 100644
index 000000000..b91cab1bd
--- /dev/null
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/converter/avro/AbstractAvroMessageConverter.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2016-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.context.converter.avro;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+
+import org.springframework.core.io.Resource;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.converter.AbstractMessageConverter;
+import org.springframework.messaging.converter.MessageConversionException;
+import org.springframework.util.MimeType;
+
+/**
+ * Base class for Apache Avro
+ * {@link org.springframework.messaging.converter.MessageConverter} implementations.
+ *
+ * @author Marius Bogoevici
+ * @author Vinicius Carvalho
+ * @author Sercan Karaoglu
+ * @author Ish Mahajan
+ *
+ * @since 3.2.0
+ */
+public abstract class AbstractAvroMessageConverter extends AbstractMessageConverter {
+
+ /**
+ * common parser will let user to import external schemas.
+ */
+ private Schema.Parser schemaParser = new Schema.Parser();
+ private AvroSchemaServiceManager avroSchemaServiceManager;
+
+ protected AbstractAvroMessageConverter(MimeType supportedMimeType, AvroSchemaServiceManager avroSchemaServiceManager) {
+ this(Collections.singletonList(supportedMimeType), avroSchemaServiceManager);
+ }
+
+ protected AbstractAvroMessageConverter(Collection supportedMimeTypes, AvroSchemaServiceManager manager) {
+ super(supportedMimeTypes);
+ this.avroSchemaServiceManager = manager;
+ }
+
+ protected AvroSchemaServiceManager avroSchemaServiceManager() {
+ return this.avroSchemaServiceManager;
+ }
+
+ protected Schema parseSchema(Resource r) throws IOException {
+ return this.schemaParser.parse(r.getInputStream());
+ }
+
+ @Override
+ protected boolean canConvertFrom(Message> message, Class> targetClass) {
+ return super.canConvertFrom(message, targetClass)
+ && (message.getPayload() instanceof byte[]);
+ }
+
+ @Override
+ protected Object convertFromInternal(Message> message, Class> targetClass, Object conversionHint) {
+ Object result;
+ try {
+ byte[] payload = (byte[]) message.getPayload();
+
+ MimeType mimeType = getContentTypeResolver().resolve(message.getHeaders());
+ if (mimeType == null) {
+ if (conversionHint instanceof MimeType) {
+ mimeType = (MimeType) conversionHint;
+ }
+ else {
+ return null;
+ }
+ }
+
+ Schema writerSchema = resolveWriterSchemaForDeserialization(mimeType);
+ Schema readerSchema = resolveReaderSchemaForDeserialization(targetClass);
+
+ result = avroSchemaServiceManager().readData(targetClass, payload, readerSchema, writerSchema);
+ }
+ catch (IOException e) {
+ throw new MessageConversionException(message, "Failed to read payload", e);
+ }
+ return result;
+ }
+
+ @Override
+ protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ MimeType hintedContentType = null;
+ if (conversionHint instanceof MimeType) {
+ hintedContentType = (MimeType) conversionHint;
+ }
+ Schema schema = resolveSchemaForWriting(payload, headers, hintedContentType);
+ @SuppressWarnings("unchecked")
+ DatumWriter