From c8646d64d841477be4cd365740e4009a9a044bb0 Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Wed, 12 Jul 2017 14:29:44 +0100 Subject: [PATCH] Strangle old inspector methods Fixes gh-81 --- .../aws/SpringFunctionInitializer.java | 12 +-- .../openwhisk/FunctionInitializer.java | 8 +- ...ntextFunctionCatalogAutoConfiguration.java | 85 +++---------------- .../function/context/FunctionInspector.java | 17 +--- ...FunctionCatalogAutoConfigurationTests.java | 74 ++++++++++------ .../FunctionExtractingFunctionCatalog.java | 29 +------ .../StreamListeningConsumerInvoker.java | 8 +- .../StreamListeningFunctionInvoker.java | 46 +++++----- .../function/web/flux/FunctionController.java | 2 +- .../web/flux/request/DelegateHandler.java | 13 ++- .../FluxHandlerMethodArgumentResolver.java | 8 +- .../flux/response/FluxReturnValueHandler.java | 7 +- 12 files changed, 123 insertions(+), 186 deletions(-) diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializer.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializer.java index 8072439f4..a1418e879 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializer.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializer.java @@ -62,8 +62,6 @@ public class SpringFunctionInitializer implements Closeable { @Autowired(required = false) private FunctionCatalog catalog; - private String name; - private ConfigurableApplicationContext context; public SpringFunctionInitializer(Class configurationClass) { @@ -102,19 +100,16 @@ public class SpringFunctionInitializer implements Closeable { } else { this.function = this.catalog.lookupFunction(name); - this.name = name; if (this.function == null) { if (defaultName) { name = "consumer"; } this.consumer = this.catalog.lookupConsumer(name); - this.name = name; if (this.consumer == null) { if (defaultName) { name = "supplier"; } this.supplier = this.catalog.lookupSupplier(name); - this.name = name; } } } @@ -123,11 +118,16 @@ public class SpringFunctionInitializer implements Closeable { protected Class getInputType() { if (inspector != null) { - return inspector.getInputType(this.name); + return inspector.getInputType(function()); } return Object.class; } + private Object function() { + return this.function != null ? this.function + : (this.consumer != null ? this.consumer : this.supplier); + } + protected Flux apply(Flux input) { if (this.function != null) { return function.apply(input); diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-openwhisk/src/main/java/org/springframework/cloud/function/adapter/openwhisk/FunctionInitializer.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-openwhisk/src/main/java/org/springframework/cloud/function/adapter/openwhisk/FunctionInitializer.java index 8e6fd09b8..80daf9664 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-openwhisk/src/main/java/org/springframework/cloud/function/adapter/openwhisk/FunctionInitializer.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-openwhisk/src/main/java/org/springframework/cloud/function/adapter/openwhisk/FunctionInitializer.java @@ -62,6 +62,7 @@ public class FunctionInitializer { if ("function".equals(type)) { this.function = this.catalog.lookupFunction(name); if (this.function != null && !FunctionUtils.isFluxFunction(this.function)) { + // TODO: this shouldn't be necessary this.function = new FluxFunction(this.function); } } @@ -75,11 +76,16 @@ public class FunctionInitializer { protected Class getInputType() { if (inspector != null) { - return inspector.getInputType(this.properties.getName()); + return inspector.getInputType(function()); } return Object.class; } + private Object function() { + return this.function != null ? this.function + : (this.consumer != null ? this.consumer : this.supplier); + } + protected Flux apply(Flux input) { if (this.function != null) { return function.apply(input); diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java index 639f9ec77..a3ebc25ac 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfiguration.java @@ -110,33 +110,8 @@ public class ContextFunctionCatalogAutoConfiguration { } @Override - public boolean isMessage(String name) { - return processor.isMessage(name); - } - - @Override - public Class getInputWrapper(String name) { - return processor.findInputWrapper(name); - } - - @Override - public Class getOutputWrapper(String name) { - return processor.findOutputWrapper(name); - } - - @Override - public Class getInputType(String name) { - return processor.findInputType(name); - } - - @Override - public Class getOutputType(String name) { - return processor.findOutputType(name); - } - - @Override - public Object convert(String name, String value) { - return processor.convert(name, value); + public boolean isMessage(Object function) { + return processor.isMessage(function); } @Override @@ -178,13 +153,10 @@ public class ContextFunctionCatalogAutoConfiguration { private Set suppliers = new HashSet<>(); private Set functions = new HashSet<>(); private Set consumers = new HashSet<>(); - private Map beans = new HashMap<>(); private BeanDefinitionRegistry registry; private ConversionService conversionService; private Map registrations = new HashMap<>(); - // TODO: keys are not unique - private Map reverse = new HashMap<>(); @Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) { @@ -210,10 +182,6 @@ public class ContextFunctionCatalogAutoConfiguration { : value; } - public Object convert(String name, String value) { - return convert(reverse.get(name), value); - } - public Set> merge( Map> initial, Map> consumers, Map> suppliers, @@ -281,11 +249,7 @@ public class ContextFunctionCatalogAutoConfiguration { private void wrap(FunctionRegistration registration, String key) { Object target = registration.getTarget(); - this.registrations.put(registration.getTarget(), key); - for (String name : registration.getNames()) { - beans.put(name, key); - this.reverse.put(name, registration.getTarget()); - } + this.registrations.put(target, key); if (target instanceof Supplier) { registration.target(target((Supplier) target, key)); } @@ -295,9 +259,7 @@ public class ContextFunctionCatalogAutoConfiguration { else if (target instanceof Function) { registration.target(target((Function) target, key)); } - for (String name : registration.getNames()) { - this.reverse.put(name, registration.getTarget()); - } + registrations.remove(target); this.registrations.put(registration.getTarget(), key); } @@ -384,23 +346,23 @@ public class ContextFunctionCatalogAutoConfiguration { } private boolean isFluxFunction(String name, Function function) { - boolean fluxTypes = this.hasFluxTypes(name); + boolean fluxTypes = this.hasFluxTypes(function); return fluxTypes || FunctionUtils.isFluxFunction(function); } private boolean isFluxConsumer(String name, Consumer consumer) { - boolean fluxTypes = this.hasFluxTypes(name); + boolean fluxTypes = this.hasFluxTypes(consumer); return fluxTypes || FunctionUtils.isFluxConsumer(consumer); } private boolean isFluxSupplier(String name, Supplier supplier) { - boolean fluxTypes = this.hasFluxTypes(name); + boolean fluxTypes = this.hasFluxTypes(supplier); return fluxTypes || FunctionUtils.isFluxSupplier(supplier); } - private boolean hasFluxTypes(String name) { - return FunctionInspector.isWrapper(findInputWrapper(name)) - || FunctionInspector.isWrapper(findOutputWrapper(name)); + private boolean hasFluxTypes(Object function) { + return FunctionInspector.isWrapper(findInputWrapper(function)) + || FunctionInspector.isWrapper(findOutputWrapper(function)); } private boolean isGenericSupplier(ConfigurableListableBeanFactory factory, @@ -569,10 +531,8 @@ public class ContextFunctionCatalogAutoConfiguration { return ReflectionUtils.getField(field, target); } - private boolean isMessage(String name) { - if (name != null) { - name = beans.get(name); - } + private boolean isMessage(Object function) { + String name = registrations.get(function); if (name == null || !registry.containsBeanDefinition(name)) { return false; } @@ -582,7 +542,6 @@ public class ContextFunctionCatalogAutoConfiguration { || Message.class.isAssignableFrom(findType(name, (AbstractBeanDefinition) registry.getBeanDefinition(name), ParamType.OUTPUT_INNER_WRAPPER)); - } private Class findInputWrapper(Object function) { @@ -595,14 +554,6 @@ public class ContextFunctionCatalogAutoConfiguration { ParamType.INPUT_WRAPPER); } - private Class findInputWrapper(String name) { - return findInputWrapper(function(name)); - } - - private Object function(String name) { - return reverse.containsKey(name) ? reverse.get(name) : null; - } - private Class findOutputWrapper(Object function) { String name = registrations.get(function); if (name == null || !registry.containsBeanDefinition(name)) { @@ -613,10 +564,6 @@ public class ContextFunctionCatalogAutoConfiguration { ParamType.OUTPUT_WRAPPER); } - private Class findOutputWrapper(String name) { - return findOutputWrapper(reverse.get(name)); - } - private Class findInputType(Object function) { String name = registrations.get(function); if (name == null || !registry.containsBeanDefinition(name)) { @@ -627,10 +574,6 @@ public class ContextFunctionCatalogAutoConfiguration { ParamType.INPUT); } - private Class findInputType(String name) { - return findInputType(reverse.get(name)); - } - private Class findOutputType(Object function) { String name = registrations.get(function); if (name == null || !registry.containsBeanDefinition(name)) { @@ -641,10 +584,6 @@ public class ContextFunctionCatalogAutoConfiguration { ParamType.OUTPUT); } - private Class findOutputType(String name) { - return findOutputType(reverse.get(name)); - } - static enum ParamType { INPUT, OUTPUT, INPUT_WRAPPER, OUTPUT_WRAPPER, INPUT_INNER_WRAPPER, OUTPUT_INNER_WRAPPER; diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionInspector.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionInspector.java index ee5214601..bbb2dd2b7 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionInspector.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionInspector.java @@ -28,19 +28,7 @@ import reactor.core.publisher.Mono; */ public interface FunctionInspector { - boolean isMessage(String name); - - @Deprecated - Class getInputType(String name); - - @Deprecated - Class getOutputType(String name); - - @Deprecated - Class getInputWrapper(String name); - - @Deprecated - Class getOutputWrapper(String name); + boolean isMessage(Object function); Class getInputType(Object function); @@ -50,9 +38,6 @@ public interface FunctionInspector { Class getOutputWrapper(Object function); - @Deprecated - Object convert(String name, String value); - Object convert(Object function, String value); String getName(Object function); diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfigurationTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfigurationTests.java index 1abfe89fb..00c18cd0f 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfigurationTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/ContextFunctionCatalogAutoConfigurationTests.java @@ -27,7 +27,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import org.junit.After; -import org.junit.Ignore; import org.junit.Test; import org.springframework.beans.factory.annotation.Qualifier; @@ -81,14 +80,15 @@ public class ContextFunctionCatalogAutoConfigurationTests { } @Test - @Ignore("see https://github.com/spring-cloud/spring-cloud-function/issues/81") public void ambiguousFunction() { create(AmbiguousConfiguration.class); assertThat(context.getBean("foos")).isInstanceOf(Function.class); assertThat(catalog.lookupFunction("foos")).isInstanceOf(Function.class); assertThat(catalog.lookupConsumer("foos")).isInstanceOf(Consumer.class); - // Could be String or Foo - assertThat(inspector.getInputType("foos")).isEqualTo(Foo.class); + assertThat(inspector.getInputType(catalog.lookupFunction("foos"))) + .isEqualTo(String.class); + assertThat(inspector.getInputType(catalog.lookupConsumer("foos"))) + .isEqualTo(Foo.class); } @@ -97,8 +97,10 @@ public class ContextFunctionCatalogAutoConfigurationTests { create(GenericConfiguration.class); assertThat(context.getBean("function")).isInstanceOf(Function.class); assertThat(catalog.lookupFunction("function")).isInstanceOf(Function.class); - assertThat(inspector.getInputType("function")).isAssignableFrom(Map.class); - assertThat(inspector.getInputWrapper("function")).isAssignableFrom(Map.class); + assertThat(inspector.getInputType(catalog.lookupFunction("function"))) + .isAssignableFrom(Map.class); + assertThat(inspector.getInputWrapper(catalog.lookupFunction("function"))) + .isAssignableFrom(Map.class); } @Test @@ -106,9 +108,11 @@ public class ContextFunctionCatalogAutoConfigurationTests { create(FluxMessageConfiguration.class); assertThat(context.getBean("function")).isInstanceOf(Function.class); assertThat(catalog.lookupFunction("function")).isInstanceOf(Function.class); - assertThat(inspector.isMessage("function")).isTrue(); - assertThat(inspector.getInputType("function")).isAssignableFrom(String.class); - assertThat(inspector.getInputWrapper("function")).isAssignableFrom(Flux.class); + assertThat(inspector.isMessage(catalog.lookupFunction("function"))).isTrue(); + assertThat(inspector.getInputType(catalog.lookupFunction("function"))) + .isAssignableFrom(String.class); + assertThat(inspector.getInputWrapper(catalog.lookupFunction("function"))) + .isAssignableFrom(Flux.class); } @Test @@ -116,9 +120,11 @@ public class ContextFunctionCatalogAutoConfigurationTests { create(MessageConfiguration.class); assertThat(context.getBean("function")).isInstanceOf(Function.class); assertThat(catalog.lookupFunction("function")).isInstanceOf(Function.class); - assertThat(inspector.isMessage("function")).isTrue(); - assertThat(inspector.getInputType("function")).isAssignableFrom(String.class); - assertThat(inspector.getInputWrapper("function")).isAssignableFrom(String.class); + assertThat(inspector.isMessage(catalog.lookupFunction("function"))).isTrue(); + assertThat(inspector.getInputType(catalog.lookupFunction("function"))) + .isAssignableFrom(String.class); + assertThat(inspector.getInputWrapper(catalog.lookupFunction("function"))) + .isAssignableFrom(String.class); } @Test @@ -126,8 +132,10 @@ public class ContextFunctionCatalogAutoConfigurationTests { create(GenericFluxConfiguration.class); assertThat(context.getBean("function")).isInstanceOf(Function.class); assertThat(catalog.lookupFunction("function")).isInstanceOf(Function.class); - assertThat(inspector.getInputType("function")).isAssignableFrom(Map.class); - assertThat(inspector.getInputWrapper("function")).isAssignableFrom(Flux.class); + assertThat(inspector.getInputType(catalog.lookupFunction("function"))) + .isAssignableFrom(Map.class); + assertThat(inspector.getInputWrapper(catalog.lookupFunction("function"))) + .isAssignableFrom(Flux.class); } @Test @@ -135,8 +143,10 @@ public class ContextFunctionCatalogAutoConfigurationTests { create(ExternalConfiguration.class); assertThat(context.getBean("function")).isInstanceOf(Function.class); assertThat(catalog.lookupFunction("function")).isInstanceOf(Function.class); - assertThat(inspector.getInputType("function")).isAssignableFrom(Map.class); - assertThat(inspector.getInputWrapper("function")).isAssignableFrom(Map.class); + assertThat(inspector.getInputType(catalog.lookupFunction("function"))) + .isAssignableFrom(Map.class); + assertThat(inspector.getInputWrapper(catalog.lookupFunction("function"))) + .isAssignableFrom(Map.class); } @Test @@ -144,8 +154,10 @@ public class ContextFunctionCatalogAutoConfigurationTests { create(ComponentScanBeanConfiguration.class); assertThat(context.getBean("function")).isInstanceOf(Function.class); assertThat(catalog.lookupFunction("function")).isInstanceOf(Function.class); - assertThat(inspector.getInputType("function")).isAssignableFrom(Map.class); - assertThat(inspector.getInputWrapper("function")).isAssignableFrom(Map.class); + assertThat(inspector.getInputType(catalog.lookupFunction("function"))) + .isAssignableFrom(Map.class); + assertThat(inspector.getInputWrapper(catalog.lookupFunction("function"))) + .isAssignableFrom(Map.class); } @Test @@ -153,8 +165,10 @@ public class ContextFunctionCatalogAutoConfigurationTests { create(ComponentScanConfiguration.class); assertThat(context.getBean("function")).isInstanceOf(Function.class); assertThat(catalog.lookupFunction("function")).isInstanceOf(Function.class); - assertThat(inspector.getInputType("function")).isAssignableFrom(Map.class); - assertThat(inspector.getInputWrapper("function")).isAssignableFrom(Map.class); + assertThat(inspector.getInputType(catalog.lookupFunction("function"))) + .isAssignableFrom(Map.class); + assertThat(inspector.getInputWrapper(catalog.lookupFunction("function"))) + .isAssignableFrom(Map.class); } @Test @@ -163,8 +177,9 @@ public class ContextFunctionCatalogAutoConfigurationTests { create("greeter.jar", ComponentScanJarConfiguration.class); assertThat(context.getBean("greeter")).isInstanceOf(Function.class); assertThat(catalog.lookupFunction("greeter")).isInstanceOf(Function.class); - assertThat(inspector.getInputType("greeter")).isAssignableFrom(String.class); - assertThat(inspector.getInputWrapper("greeter")) + assertThat(inspector.getInputType(catalog.lookupFunction("greeter"))) + .isAssignableFrom(String.class); + assertThat(inspector.getInputWrapper(catalog.lookupFunction("greeter"))) .isAssignableFrom(String.class); } finally { @@ -207,7 +222,8 @@ public class ContextFunctionCatalogAutoConfigurationTests { assertThat(context.getBean("function")).isInstanceOf(Function.class); assertThat(catalog.lookupFunction("function")).isNull(); assertThat(catalog.lookupFunction("other")).isInstanceOf(Function.class); - assertThat(inspector.getInputType("other")).isEqualTo(String.class); + assertThat(inspector.getInputType(catalog.lookupFunction("other"))) + .isEqualTo(String.class); } @Test @@ -235,7 +251,8 @@ public class ContextFunctionCatalogAutoConfigurationTests { "spring.cloud.function.compile.foos.outputType=String"); assertThat(context.getBean("foos")).isInstanceOf(Function.class); assertThat(catalog.lookupFunction("foos")).isInstanceOf(Function.class); - assertThat(inspector.getInputWrapper("foos")).isEqualTo(String.class); + assertThat(inspector.getInputWrapper(catalog.lookupFunction("foos"))) + .isEqualTo(String.class); } @Test @@ -249,7 +266,8 @@ public class ContextFunctionCatalogAutoConfigurationTests { "spring.cloud.function.import.foos.location=file:./target/foos.fun"); assertThat(context.getBean("foos")).isInstanceOf(Function.class); assertThat(catalog.lookupFunction("foos")).isInstanceOf(Function.class); - assertThat(inspector.getInputWrapper("foos")).isEqualTo(String.class); + assertThat(inspector.getInputWrapper(catalog.lookupFunction("foos"))) + .isEqualTo(String.class); } @Test @@ -260,7 +278,8 @@ public class ContextFunctionCatalogAutoConfigurationTests { "spring.cloud.function.compile.foos.type=consumer", "spring.cloud.function.compile.foos.inputType=String"); assertThat(catalog.lookupConsumer("foos")).isInstanceOf(Consumer.class); - assertThat(inspector.getInputWrapper("foos")).isEqualTo(String.class); + assertThat(inspector.getInputWrapper(catalog.lookupConsumer("foos"))) + .isEqualTo(String.class); @SuppressWarnings("unchecked") Consumer consumer = (Consumer) context.getBean("foos"); consumer.accept("hello"); @@ -274,7 +293,8 @@ public class ContextFunctionCatalogAutoConfigurationTests { + getClass().getName() + "::set)", "spring.cloud.function.compile.foos.type=consumer"); assertThat(catalog.lookupConsumer("foos")).isInstanceOf(Consumer.class); - assertThat(inspector.getInputWrapper("foos")).isEqualTo(Flux.class); + assertThat(inspector.getInputWrapper(catalog.lookupConsumer("foos"))) + .isEqualTo(Flux.class); @SuppressWarnings("unchecked") Consumer> consumer = (Consumer>) context .getBean("foos"); diff --git a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionExtractingFunctionCatalog.java b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionExtractingFunctionCatalog.java index 9eb199be8..41f5df27d 100644 --- a/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionExtractingFunctionCatalog.java +++ b/spring-cloud-function-deployer/src/main/java/org/springframework/cloud/function/deployer/FunctionExtractingFunctionCatalog.java @@ -109,8 +109,8 @@ public class FunctionExtractingFunctionCatalog } @Override - public boolean isMessage(String name) { - return (Boolean) inspect(name, "isMessage"); + public boolean isMessage(Object function) { + return (Boolean) inspect(function, "isMessage"); } @Override @@ -138,31 +138,6 @@ public class FunctionExtractingFunctionCatalog return inspect(function, "convert"); } - @Override - public Class getInputType(String name) { - return (Class) inspect(name, "getInputType"); - } - - @Override - public Class getOutputType(String name) { - return (Class) inspect(name, "getOutputType"); - } - - @Override - public Class getInputWrapper(String name) { - return (Class) inspect(name, "getInputWrapper"); - } - - @Override - public Class getOutputWrapper(String name) { - return (Class) inspect(name, "getOutputWrapper"); - } - - @Override - public Object convert(String name, String value) { - return inspect(name, "convert"); - } - @Override public String getName(Object function) { return (String) inspect(function, "getName"); diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java index ccd24680a..d28a5e586 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java @@ -87,13 +87,14 @@ public class StreamListeningConsumerInvoker implements SmartInitializingSingleto .containsKey(StreamConfigurationProperties.ROUTE_KEY)) { String key = (String) input.getHeaders() .get(StreamConfigurationProperties.ROUTE_KEY); - if (functionCatalog.lookupFunction(key) != null) { + if (functionCatalog.lookupConsumer(key) != null) { return key; } } else { for (String candidate : names) { - Class inputType = functionInspector.getInputType(candidate); + Class inputType = functionInspector + .getInputType(functionCatalog.lookupConsumer(candidate)); Object value = this.converter.fromMessage(input, inputType); if (value != null && inputType.isInstance(value)) { name = candidate; @@ -110,7 +111,8 @@ public class StreamListeningConsumerInvoker implements SmartInitializingSingleto } private Function, Object> convertInput(String name) { - Class inputType = functionInspector.getInputType(name); + Class inputType = functionInspector + .getInputType(functionCatalog.lookupConsumer(name)); return m -> { if (Message.class.isAssignableFrom(inputType)) { return m; diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java index 2e6657cf0..a40d7b574 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java @@ -18,12 +18,11 @@ package org.springframework.cloud.function.stream; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Function; import org.springframework.beans.factory.SmartInitializingSingleton; @@ -85,17 +84,21 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto input.groupBy(this::select).flatMap(group -> group.key().process(group))); } + // TODO: the routing key could be added here, but really it should be added in + // Spring Cloud Stream + // (https://github.com/spring-cloud/spring-cloud-stream/issues/1010) private Flux> function(String name, Flux> flux) { - // TODO: the routing key could be added here, but really it should be added in - // Spring Cloud Stream - // (https://github.com/spring-cloud/spring-cloud-stream/issues/1010) - AtomicReference> headers = new AtomicReference>( - new LinkedHashMap<>()); - return ((Flux) functionCatalog.lookupFunction(name).apply(flux.map(message -> { - Object applied = convertInput(name).apply(message); - headers.set(message.getHeaders()); - return applied; - }))).map(result -> message(result, headers.get())); + Function> function = functionCatalog.lookupFunction(name); + return flux.publish(values -> { + Flux result = function + .apply(values.map(message -> convertInput(function).apply(message))); + Flux> aggregate = headers(values); + return result.withLatestFrom(aggregate, (p, m) -> message(p, m)); + }); + } + + private Flux> headers(Flux> flux) { + return flux.map(message -> message.getHeaders()); } private Message message(Object result, Map headers) { @@ -104,8 +107,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto } private Flux> consumer(String name, Flux> flux) { - functionCatalog.lookupConsumer(name) - .accept(flux.map(message -> convertInput(name).apply(message))); + Consumer consumer = functionCatalog.lookupConsumer(name); + consumer.accept(flux.map(message -> convertInput(consumer).apply(message))); return Flux.empty(); } @@ -149,7 +152,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto } else { for (String candidate : names) { - Class inputType = functionInspector.getInputType(candidate); + Class inputType = functionInspector + .getInputType(functionCatalog.lookupFunction(candidate)); Object value = this.converter.fromMessage(input, inputType); if (value != null && inputType.isInstance(value)) { matches.add(candidate); @@ -185,20 +189,20 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto return null; } - private Function, Object> convertInput(String name) { - Class inputType = functionInspector.getInputType(name); + private Function, Object> convertInput(Object function) { + Class inputType = functionInspector.getInputType(function); return m -> { - if (functionInspector.isMessage(name)) { - return MessageBuilder.withPayload(convertPayload(name, inputType, m)) + if (functionInspector.isMessage(function)) { + return MessageBuilder.withPayload(convertPayload(inputType, m)) .copyHeaders(m.getHeaders()).build(); } else { - return convertPayload(name, inputType, m); + return convertPayload(inputType, m); } }; } - private Object convertPayload(String name, Class inputType, Message m) { + private Object convertPayload(Class inputType, Message m) { if (inputType.isAssignableFrom(m.getPayload().getClass())) { return m.getPayload(); } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java index 8328264f0..4dda62439 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java @@ -113,7 +113,7 @@ public class FunctionController { private Mono value(Function, Flux> function, @PathVariable String value) { - Object input = inspector.convert(inspector.getName(function), value); + Object input = inspector.convert(function, value); Mono result = Mono.from(function.apply(Flux.just(input))); if (logger.isDebugEnabled()) { logger.debug("Handled GET with function"); diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/DelegateHandler.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/DelegateHandler.java index 462525800..14a9cd07a 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/DelegateHandler.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/DelegateHandler.java @@ -23,6 +23,7 @@ public abstract class DelegateHandler { private final ListableBeanFactory factory; private FunctionInspector processor; + private Object handler; private final Object source; public DelegateHandler(ListableBeanFactory factory, Object source) { @@ -31,9 +32,15 @@ public abstract class DelegateHandler { } public Class type() { - String name = source instanceof String ? (String) source - : processor().getName(source); - return (Class) processor().getInputType(name); + return processor().getInputType(handler()); + } + + private Object handler() { + if (handler == null) { + handler = source instanceof String ? factory.getBean((String) source) + : source; + } + return handler; } private FunctionInspector processor() { diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java index 6db85765b..3a0cebba0 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/request/FluxHandlerMethodArgumentResolver.java @@ -77,11 +77,11 @@ public class FluxHandlerMethodArgumentResolver WebDataBinderFactory binderFactory) throws Exception { Object handler = webRequest.getAttribute(WebRequestConstants.HANDLER, NativeWebRequest.SCOPE_REQUEST); - Class type = inspector.getInputType(inspector.getName(handler)); + Class type = inspector.getInputType(handler); if (type == null) { type = Object.class; } - boolean message = inspector.isMessage(inspector.getName(handler)); + boolean message = inspector.isMessage(handler); List body; ContentCachingRequestWrapper nativeRequest = new ContentCachingRequestWrapper( webRequest.getNativeRequest(HttpServletRequest.class)); @@ -107,8 +107,8 @@ public class FluxHandlerMethodArgumentResolver if (message) { List messages = new ArrayList<>(); for (Object payload : body) { - messages.add(MessageBuilder.withPayload(payload).copyHeaders( - HeaderUtils.fromHttp(new ServletServerHttpRequest( + messages.add(MessageBuilder.withPayload(payload) + .copyHeaders(HeaderUtils.fromHttp(new ServletServerHttpRequest( webRequest.getNativeRequest(HttpServletRequest.class)) .getHeaders())) .build()); diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxReturnValueHandler.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxReturnValueHandler.java index 3bd3572df..2749837eb 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxReturnValueHandler.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/response/FluxReturnValueHandler.java @@ -150,7 +150,7 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand Object handler = webRequest.getAttribute(WebRequestConstants.HANDLER, NativeWebRequest.SCOPE_REQUEST); - Class type = inspector.getOutputType(inspector.getName(handler)); + Class type = inspector.getOutputType(handler); boolean inputSingle = isInputSingle(webRequest, handler); if (inputSingle && isOutputSingle(handler)) { @@ -206,9 +206,8 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand } private boolean isOutputSingle(Object handler) { - String name = inspector.getName(handler); - Class type = inspector.getOutputType(name); - Class wrapper = inspector.getOutputWrapper(name); + Class type = inspector.getOutputType(handler); + Class wrapper = inspector.getOutputWrapper(handler); if (Stream.class.isAssignableFrom(type)) { return false; }