diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/AzureSpringBootHttpRequestHandler.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/AzureSpringBootHttpRequestHandler.java index 8dc3ae7ea..f8fc6240d 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/AzureSpringBootHttpRequestHandler.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/AzureSpringBootHttpRequestHandler.java @@ -109,7 +109,9 @@ public class AzureSpringBootHttpRequestHandler extends .createResponseBuilder(com.microsoft.azure.functions.HttpStatus.OK) .body(message.getPayload()); for (Map.Entry entry : message.getHeaders().entrySet()) { - builder = builder.header(entry.getKey(), entry.getValue().toString()); + if (entry.getValue() != null) { + builder = builder.header(entry.getKey(), entry.getValue().toString()); + } } return builder.build(); } diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/AzureSpringBootRequestHandler.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/AzureSpringBootRequestHandler.java index 1574f2906..a6c9f8893 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/AzureSpringBootRequestHandler.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/AzureSpringBootRequestHandler.java @@ -21,6 +21,7 @@ import java.util.function.Function; import java.util.logging.Logger; import com.microsoft.azure.functions.ExecutionContext; +import com.microsoft.azure.functions.HttpRequestMessage; import com.microsoft.azure.functions.OutputBinding; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -29,6 +30,8 @@ import reactor.core.publisher.Mono; import org.springframework.cloud.function.context.AbstractSpringFunctionAdapterInitializer; import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; /** * @param input type @@ -36,6 +39,7 @@ import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry * @author Soby Chacko * @author Oleg Zhurakousky */ +@SuppressWarnings("deprecation") public class AzureSpringBootRequestHandler extends AbstractSpringFunctionAdapterInitializer { @SuppressWarnings("rawtypes") @@ -86,6 +90,9 @@ public class AzureSpringBootRequestHandler extends AbstractSpringFunctionA } else { Publisher events = input == null ? Mono.empty() : extract(convertEvent(input)); + if (events instanceof Flux) { + events = Flux.from(events).map(v -> this.toMessage(v, context)); + } Publisher output = thisInitializer.apply(events); O result = result(input, output); if (context != null) { @@ -102,6 +109,23 @@ public class AzureSpringBootRequestHandler extends AbstractSpringFunctionA } } + private Message toMessage(Object value, ExecutionContext context) { + if (value instanceof Message) { + return (Message) value; + } + else { + Object payload = value; + if (value instanceof HttpRequestMessage) { + payload = ((HttpRequestMessage) value).getBody(); + if (payload == null) { + payload = ((HttpRequestMessage) value).getQueryParameters(); + } + } + return MessageBuilder.withPayload(payload) + .setHeader(AbstractSpringFunctionAdapterInitializer.TARGET_EXECUTION_CTX_NAME, context).build(); + } + } + @Override protected String doResolveName(Object targetContext) { return ((ExecutionContext) targetContext).getFunctionName(); diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/test/java/org/springframework/cloud/function/adapter/azure/AzureSpringBootRequestHandlerTests.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/test/java/org/springframework/cloud/function/adapter/azure/AzureSpringBootRequestHandlerTests.java index e91d7125a..d57fafb95 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/test/java/org/springframework/cloud/function/adapter/azure/AzureSpringBootRequestHandlerTests.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/test/java/org/springframework/cloud/function/adapter/azure/AzureSpringBootRequestHandlerTests.java @@ -32,6 +32,7 @@ import reactor.core.publisher.Flux; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.Message; import static org.assertj.core.api.Assertions.assertThat; @@ -194,8 +195,10 @@ public class AzureSpringBootRequestHandlerTests { protected static class AutoConfig { @Bean - public Function uppercase(ExecutionContext targetContext) { - return foo -> { + public Function, Bar> uppercase() { + return message -> { + Foo foo = message.getPayload(); + ExecutionContext targetContext = (ExecutionContext) message.getHeaders().get("executionContext"); targetContext.getLogger().info("Invoking 'uppercase' on " + foo.getValue()); return new Bar(foo.getValue().toUpperCase()); }; @@ -232,17 +235,21 @@ public class AzureSpringBootRequestHandlerTests { protected static class MultiConfig { @Bean - public Function uppercase(ExecutionContext context) { + public Function, Bar> uppercase() { - return foo -> { + return message -> { + ExecutionContext context = (ExecutionContext) message.getHeaders().get("executionContext"); + Foo foo = message.getPayload(); context.getLogger().info("Executing uppercase function"); return new Bar(foo.getValue().toUpperCase()); }; } @Bean - public Function lowercase(ExecutionContext context) { - return bar -> { + public Function, Foo> lowercase() { + return message -> { + ExecutionContext context = (ExecutionContext) message.getHeaders().get("executionContext"); + Bar bar = message.getPayload(); context.getLogger().info("Executing lowercase function"); return new Foo(bar.getValue().toLowerCase()); }; diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/AbstractSpringFunctionAdapterInitializer.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/AbstractSpringFunctionAdapterInitializer.java index bcef91468..f8df8d180 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/AbstractSpringFunctionAdapterInitializer.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/AbstractSpringFunctionAdapterInitializer.java @@ -20,12 +20,14 @@ import java.io.Closeable; import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; @@ -36,14 +38,17 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.WebApplicationType; import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; +import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.cloud.function.context.config.FunctionContextUtils; +import org.springframework.cloud.function.context.config.JsonMessageConverter; import org.springframework.cloud.function.context.config.RoutingFunction; +import org.springframework.cloud.function.context.config.SmartCompositeMessageConverter; +import org.springframework.cloud.function.json.JacksonMapper; import org.springframework.cloud.function.json.JsonMapper; import org.springframework.cloud.function.utils.FunctionClassUtils; -import org.springframework.context.ApplicationContextInitializer; import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.support.GenericApplicationContext; +import org.springframework.core.convert.support.GenericConversionService; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; @@ -65,7 +70,7 @@ public abstract class AbstractSpringFunctionAdapterInitializer implements Clo /** * Name of the bean for registering the target execution context passed to `initialize(context)` operation. */ - public static final String TARGET_EXECUTION_CTX_BEAN_NAME = "targetExecutionContext"; + public static final String TARGET_EXECUTION_CTX_NAME = "executionContext"; private final Class configurationClass; @@ -113,12 +118,14 @@ public abstract class AbstractSpringFunctionAdapterInitializer implements Clo } logger.info("Initializing: " + this.configurationClass); SpringApplication builder = springApplication(); - - this.registerTargetContext(targetContext, builder); ConfigurableApplicationContext context = builder.run(); context.getAutowireCapableBeanFactory().autowireBean(this); this.context = context; if (this.catalog == null) { + SmartCompositeMessageConverter messageConverter = + new SmartCompositeMessageConverter(Collections.singletonList(new JsonMessageConverter(new JacksonMapper(new ObjectMapper())))); + this.catalog = new SimpleFunctionRegistry(new GenericConversionService(), + messageConverter, new JacksonMapper(new ObjectMapper())); initFunctionConsumerOrSupplierFromContext(targetContext); } else { @@ -126,19 +133,6 @@ public abstract class AbstractSpringFunctionAdapterInitializer implements Clo } } - private void registerTargetContext(C targetContext, SpringApplication builder) { - if (targetContext != null) { - builder.addInitializers(new ApplicationContextInitializer() { - @SuppressWarnings("unchecked") - @Override - public void initialize(ConfigurableApplicationContext applicationContext) { - ((GenericApplicationContext) applicationContext).registerBean(TARGET_EXECUTION_CTX_BEAN_NAME, - (Class) targetContext.getClass(), (Supplier) () -> targetContext); - } - }); - } - } - protected Class getInputType() { Object func = function(); @@ -170,7 +164,6 @@ public abstract class AbstractSpringFunctionAdapterInitializer implements Clo protected Publisher apply(Publisher input) { if (this.function != null) { - //return Flux.from(this.function.apply(input)); Object result = this.function.apply(input); if (result instanceof Publisher) { return Flux.from((Publisher) result); @@ -278,9 +271,10 @@ public abstract class AbstractSpringFunctionAdapterInitializer implements Clo Type type = FunctionContextUtils. findType(name, this.context.getBeanFactory()); - this.functionRegistration = functionRegistration.type(new FunctionType(type)).wrap(); + this.functionRegistration = functionRegistration.type(new FunctionType(type)); - return (T) functionRegistration.getTarget(); + ((FunctionRegistry) this.catalog).register(functionRegistration); + return this.catalog.lookup(name); } private void initFunctionConsumerOrSupplierFromContext(Object targetContext) { diff --git a/spring-cloud-function-samples/function-sample-azure/src/main/java/example/Config.java b/spring-cloud-function-samples/function-sample-azure/src/main/java/example/Config.java index 6aaaad99d..7f33999e7 100644 --- a/spring-cloud-function-samples/function-sample-azure/src/main/java/example/Config.java +++ b/spring-cloud-function-samples/function-sample-azure/src/main/java/example/Config.java @@ -16,14 +16,14 @@ package example; -import java.io.IOException; import java.util.Map; import java.util.function.Function; -import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.function.json.JsonMapper; import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; import com.microsoft.azure.functions.ExecutionContext; @@ -35,12 +35,12 @@ public class Config { } @Bean - public Function uppercase(ExecutionContext context) { - return value -> { - ObjectMapper mapper = new ObjectMapper(); - + public Function, String> uppercase(JsonMapper mapper) { + return message -> { + String value = message.getPayload(); + ExecutionContext context = (ExecutionContext) message.getHeaders().get("executionContext"); try { - Map map = mapper.readValue(value, Map.class); + Map map = mapper.fromJson(value, Map.class); if(map != null) map.forEach((k, v) -> map.put(k, v != null ? v.toUpperCase() : null)); @@ -48,8 +48,9 @@ public class Config { if(context != null) context.getLogger().info(new StringBuilder().append("Function: ").append(context.getFunctionName()).append(" is uppercasing ").append(value.toString()).toString()); - return mapper.writeValueAsString(map); - } catch (IOException e) { + return mapper.toString(map); + } catch (Exception e) { + e.printStackTrace(); if(context != null) context.getLogger().severe("Function could not parse incoming request");