diff --git a/docs/src/main/asciidoc/spring-cloud-function.adoc b/docs/src/main/asciidoc/spring-cloud-function.adoc index e096a2096..25eea5582 100644 --- a/docs/src/main/asciidoc/spring-cloud-function.adoc +++ b/docs/src/main/asciidoc/spring-cloud-function.adoc @@ -70,6 +70,38 @@ of a non publisher type (which is normal), it will be converted to a function that returns a publisher, so that it can be subscribed to in a controlled way. +=== Function Routing + +Since version 2.2 Spring Cloud Function provides routing feature allowing +you to invoke a single function which acts as a router to an actual function you wish to invoke +This feature is very useful in certain FAAS environments where maintaining configurations +for several functions could be cumbersome or exposing more then one function is not possible. + +You enable this feature via `spring.cloud.function.routing.enabled` property setting it +to `true` (default is `false`). +This enables `RoutingFunction` under the name `router` which is loaded in FunctionCatalog. + +This function has the following signature: + +[source, java] +---- +public class RoutingFunction implements Function>, Publisher>, Consumer>> { +. . . +} +---- + +This allows the above function to act as both `Function` and `Consumer`. +As you can see it takes `Message` as an input argument. This allows you to communicate +the name of the actual function you want to invoke by providing `function.name` Message header. + +In specific execution environments/models the adapters are responsible to translate and communicate `function.name` +via Message header. For example, when using _spring-cloud-function-web_ you can provide `function.name` as an HTTP +header and the framework will propagate it as well as other HTTP headers as Message headers. + +Using Message also allows us to benefit from `MessageConverter`s to convert incoming request to the actual input type +of the target function + + === Kotlin Lambda support We also provide support for Kotlin lambdas (since v2.0). diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/AbstractSpringFunctionAdapterInitializer.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/AbstractSpringFunctionAdapterInitializer.java index ee5251210..a0b8702f0 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/AbstractSpringFunctionAdapterInitializer.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/AbstractSpringFunctionAdapterInitializer.java @@ -23,7 +23,6 @@ import java.net.URL; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -42,6 +41,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.WebApplicationType; import org.springframework.cloud.function.context.catalog.FunctionInspector; import org.springframework.cloud.function.context.config.FunctionContextUtils; +import org.springframework.cloud.function.context.config.RoutingFunction; import org.springframework.context.ApplicationContextInitializer; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.support.GenericApplicationContext; @@ -196,7 +196,6 @@ public abstract class AbstractSpringFunctionAdapterInitializer implements Clo return ""; } - //@SuppressWarnings("unchecked") protected Object convertOutput(Object input, Object output) { return output; } @@ -368,22 +367,19 @@ public abstract class AbstractSpringFunctionAdapterInitializer implements Clo return; } - if (this.catalog.size() == 1) { - Iterator names = this.catalog.getNames(Function.class).iterator(); - if (names.hasNext()) { - this.function = this.catalog.lookup(Function.class, names.next()); + + if (this.catalog.size() >= 1 && this.catalog.size() <= 2) { // we may have RoutingFunction function + String functionName = this.catalog.getNames(Function.class).stream() + .filter(n -> !n.equals(RoutingFunction.FUNCTION_NAME)) + .findFirst().orElseGet(() -> null); + if (functionName != null) { + this.function = this.catalog.lookup(Function.class, functionName); return; } - - names = this.catalog.getNames(Consumer.class).iterator(); - if (names.hasNext()) { - this.consumer = this.catalog.lookup(Consumer.class, names.next()); - return; - } - - names = this.catalog.getNames(Supplier.class).iterator(); - if (names.hasNext()) { - this.supplier = this.catalog.lookup(Supplier.class, names.next()); + functionName = this.catalog.getNames(Supplier.class).stream() + .findFirst().orElseGet(() -> null); + if (functionName != null) { + this.supplier = this.catalog.lookup(Supplier.class, functionName); return; } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java index f2f88a16c..af865f080 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java @@ -31,6 +31,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.beans.factory.BeanNameAware; +import org.springframework.cloud.function.context.config.RoutingFunction; import org.springframework.cloud.function.core.FluxConsumer; import org.springframework.cloud.function.core.FluxFunction; import org.springframework.cloud.function.core.FluxSupplier; @@ -152,6 +153,13 @@ public class FunctionRegistration implements BeanNameAware { if (this.type == null) { result = (FunctionRegistration) this; } + else if (this.target instanceof RoutingFunction) { + S target = (S) this.target; + result = new FunctionRegistration(target); + result.type(this.type.getType()); + result = result.target(target).names(this.names) + .type(result.type.wrap(Flux.class)).properties(this.properties); + } else { S target = (S) this.target; result = new FunctionRegistration(target); @@ -178,7 +186,6 @@ public class FunctionRegistration implements BeanNameAware { else if (target instanceof Function) { target = (S) new FluxedFunction((Function) target); } - result = result.target(target).names(this.names) .type(result.type.wrap(Flux.class)).properties(this.properties); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java index e26b7f3a0..b944a6814 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java @@ -35,6 +35,7 @@ import reactor.core.publisher.Mono; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; import org.springframework.cloud.function.context.FunctionType; +import org.springframework.cloud.function.context.config.RoutingFunction; import org.springframework.cloud.function.core.FluxConsumer; import org.springframework.cloud.function.core.FluxSupplier; import org.springframework.cloud.function.core.FluxToMonoFunction; @@ -292,8 +293,11 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi if (lookup.containsKey(name)) { composedFunction = lookup.get(name); } - else if (name.equals("") && lookup.size() == 1) { - composedFunction = lookup.values().iterator().next(); + else if (name.equals("") && lookup.size() >= 1 && lookup.size() <= 2) { // we may have RoutingFunction function + String functionName = lookup.keySet().stream() + .filter(fName -> !fName.equals(RoutingFunction.FUNCTION_NAME)) + .findFirst().orElseGet(() -> null); + composedFunction = lookup.get(functionName); } else { String[] stages = StringUtils.delimitedListToStringArray(name, "|"); diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java index 357d79713..eb7992f30 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java @@ -33,11 +33,12 @@ import org.springframework.messaging.support.MessageBuilder; /** * @author Dave Syer + * @since 2.1 */ public class MessageFunction implements Function>, Publisher>> { - private Function delegate; + private final Function delegate; public MessageFunction(Function delegate) { this.delegate = delegate; @@ -91,5 +92,4 @@ public class MessageFunction value -> MessageBuilder.withPayload(function.apply(value.getPayload())) .copyHeaders(value.getHeaders()).build()); } - } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 43fafdf8d..6398ec22b 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -16,6 +16,7 @@ package org.springframework.cloud.function.context.config; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -50,6 +51,7 @@ import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; import org.springframework.cloud.function.context.FunctionType; import org.springframework.cloud.function.context.catalog.AbstractComposableFunctionRegistry; +import org.springframework.cloud.function.context.catalog.FunctionInspector; import org.springframework.cloud.function.context.catalog.FunctionUnregistrationEvent; import org.springframework.cloud.function.json.GsonMapper; import org.springframework.cloud.function.json.JacksonMapper; @@ -62,6 +64,11 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.FilterType; import org.springframework.core.annotation.AnnotatedElementUtils; import org.springframework.core.type.StandardMethodMetadata; +import org.springframework.messaging.converter.ByteArrayMessageConverter; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.MappingJackson2MessageConverter; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.converter.StringMessageConverter; /** * @author Dave Syer @@ -84,6 +91,17 @@ public class ContextFunctionCatalogAutoConfiguration { return new BeanFactoryFunctionCatalog(); } + @Bean(RoutingFunction.FUNCTION_NAME) + @ConditionalOnProperty(name = "spring.cloud.function.routing.enabled", havingValue = "true") + RoutingFunction gateway(FunctionCatalog functionCatalog, FunctionInspector functionInspector) { + Collection messageConverters = new ArrayList(); + messageConverters.add(new MappingJackson2MessageConverter()); + messageConverters.add(new StringMessageConverter()); + messageConverters.add(new ByteArrayMessageConverter()); + CompositeMessageConverter messageConverter = new CompositeMessageConverter(messageConverters); + return new RoutingFunction(functionCatalog, functionInspector, messageConverter); + } + protected static class BeanFactoryFunctionCatalog extends AbstractComposableFunctionRegistry implements SmartInitializingSingleton, BeanFactoryAware { @@ -96,6 +114,7 @@ public class ContextFunctionCatalogAutoConfiguration { * Will collect all suppliers, functions, consumers and function registration as * late as possible in the lifecycle. */ + @SuppressWarnings("rawtypes") @Override public void afterSingletonsInstantiated() { Map supplierBeans = this.beanFactory diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java new file mode 100644 index 000000000..3bfdc686e --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java @@ -0,0 +1,105 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.config; + +import java.util.function.Function; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.SignalType; + +import org.springframework.cloud.function.context.FunctionCatalog; +import org.springframework.cloud.function.context.catalog.FunctionInspector; +import org.springframework.cloud.function.core.WrappedFunction; +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.MessageConverter; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.Assert; + +/** + * An implementation of Function which acts as a gateway/router by actually + * delegating incoming invocation to a function specified via `function.name` + * message header.
+ * {@link Message} is used as a canonical representation of a request which + * contains some metadata and it is the responsibility of the higher level + * framework to convert the incoming request into a Message. For example; + * spring-cloud-function-web will create Message from HttpRequest copying all + * HTTP headers as message headers. + * + * @author Oleg Zhurakousky + * @since 2.1 + * + */ +public class RoutingFunction implements Function>, Publisher> { + + /** + * The name of this function use by BeanFactory. + */ + public static final String FUNCTION_NAME = "router"; + + private final FunctionCatalog functionCatalog; + + private final FunctionInspector functionInspector; + + private final MessageConverter messageConverter; + + RoutingFunction(FunctionCatalog functionCatalog, FunctionInspector functionInspector, + MessageConverter messageConverter) { + this.functionCatalog = functionCatalog; + this.functionInspector = functionInspector; + this.messageConverter = messageConverter; + } + + @SuppressWarnings("unchecked") + @Override + public Publisher apply(Publisher> input) { + return Flux.from(input) + .switchOnFirst((signal, flux) -> { + Assert.isTrue(signal.hasValue() + && signal.getType() == SignalType.ON_NEXT, "Signal has no value or wrong type " + signal); + Function, Publisher> function = this.getRouteToFunction(signal.get()); + return flux.map(message -> { + Object inputValue = this.convertInput(message, function); + return inputValue; + }).transform(function); + }); + } + + @SuppressWarnings("rawtypes") + private WrappedFunction getRouteToFunction(Message message) { + String routeToFunctionName = (String) message.getHeaders().get("function.name"); + WrappedFunction function = functionCatalog.lookup(routeToFunctionName); + Assert.notNull(function, "Failed to locate function specified with 'function.name':" + + message.getHeaders().get("function.name")); + return function; + } + + private Object convertInput(Message message, Object function) { + Class inputType = functionInspector.getInputType(function); + Object inputValue = message.getPayload(); + if (!inputValue.getClass().isAssignableFrom(inputType)) { + inputValue = this.messageConverter.fromMessage(message, functionInspector.getInputType(function)); + } + if (this.functionInspector.isMessage(function)) { + inputValue = MessageBuilder.createMessage(inputValue, message.getHeaders()); + } + Assert.notNull(inputValue, "Failed to determine input value of type " + + inputType + " from Message '" + + message + "'. No suitable Message Converter found."); + return inputValue; + } +} diff --git a/spring-cloud-function-context/src/main/resources/META-INF/spring-configuration-metadata.json b/spring-cloud-function-context/src/main/resources/META-INF/spring-configuration-metadata.json index 7a5ce76a5..8815cd3bc 100644 --- a/spring-cloud-function-context/src/main/resources/META-INF/spring-configuration-metadata.json +++ b/spring-cloud-function-context/src/main/resources/META-INF/spring-configuration-metadata.json @@ -13,6 +13,12 @@ "type": "java.lang.String", "description": "Name (e.g., 'foo') or composition instruction (e.g., 'foo|bar') used to resolve default function especially for cases where there is more then once function available in catalog.", "defaultValue": "" + }, + { + "name": "spring.cloud.function.routing.enabled", + "type": "java.lang.Boolean", + "description": "Enables RoutingFunction which delegates incoming request to a function named via function.name header", + "defaultValue": false } ] } diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/WrappedFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/WrappedFunction.java index a6b21042c..83aa190ce 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/WrappedFunction.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/WrappedFunction.java @@ -35,7 +35,7 @@ import org.reactivestreams.Publisher; * @author Oleg Zhurakousky * @since 2.0.1 */ -abstract class WrappedFunction, OP extends Publisher, T> +public abstract class WrappedFunction, OP extends Publisher, T> implements Function, FluxWrapper { private final T target; diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java index 178e03e17..d433a6f02 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java @@ -38,7 +38,9 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.beans.factory.ObjectProvider; +import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.catalog.FunctionInspector; +import org.springframework.cloud.function.context.config.RoutingFunction; import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.function.core.FluxConsumer; import org.springframework.cloud.function.core.FluxedConsumer; @@ -77,6 +79,8 @@ public class RequestProcessor { private final FunctionInspector inspector; + private final FunctionCatalog functionCatalog; + private final StringConverter converter; private final JsonMapper mapper; @@ -84,10 +88,12 @@ public class RequestProcessor { private final List> messageReaders; public RequestProcessor(FunctionInspector inspector, + FunctionCatalog functionCatalog, ObjectProvider mapper, StringConverter converter, ObjectProvider codecs) { this.mapper = mapper.getIfAvailable(); this.inspector = inspector; + this.functionCatalog = functionCatalog; this.converter = converter; ServerCodecConfigurer source = codecs.getIfAvailable(); this.messageReaders = source == null ? null : source.getReaders(); @@ -97,23 +103,23 @@ public class RequestProcessor { Function, ? extends Publisher> function, Consumer> consumer, Supplier> supplier) { - return new FunctionWrapper(function, consumer, supplier); + return new FunctionWrapper(function, supplier); } public static FunctionWrapper wrapper( Function, ? extends Publisher> function) { - return new FunctionWrapper(function, null, null); + return new FunctionWrapper(function, null); } public Mono> get(FunctionWrapper wrapper) { if (wrapper.function() != null) { - return response(wrapper, wrapper.function(), - value(wrapper.function(), wrapper.argument()), true, true); + return response(wrapper, wrapper.function(), value(wrapper), true, true); } else { return response(wrapper, wrapper.supplier(), wrapper.supplier().get(), null, true); } + } public Mono> post(FunctionWrapper wrapper, @@ -130,7 +136,8 @@ public class RequestProcessor { Object input = body == null && inputType.isAssignableFrom(String.class) ? "" : body; - if (input != null) { + if ((isInputMultiple(this.getTargetIfRouting(wrapper, function)) || !(function instanceof RoutingFunction)) + && input != null) { // TODO rework. . . pretty ugly if (this.shouldUseJsonConversion((String) input, wrapper.headers.getContentType())) { Type jsonType = body.startsWith("[") && Collection.class.isAssignableFrom(inputType) @@ -149,28 +156,53 @@ public class RequestProcessor { return response(wrapper, input, stream); } + public Mono> stream(FunctionWrapper request) { + Publisher result = request.function() != null + ? value(request) + : request.supplier().get(); + return stream(request, result); + } + private boolean shouldUseJsonConversion(String body, MediaType contentType) { return (body.startsWith("[") || body.startsWith("{")) && (contentType == null || (contentType != null && !"text".equalsIgnoreCase(contentType.getType()))); } - public Mono> stream(FunctionWrapper request) { - Publisher result = request.function() != null - ? value(request.function(), request.argument()) - : request.supplier().get(); - return stream(request, result); - } - private List> getMessageReaders() { return this.messageReaders; } + private Mono> response(FunctionWrapper request, Object handler, + Publisher result, Boolean single, boolean getter) { + + BodyBuilder builder = ResponseEntity.ok(); + if (this.inspector.isMessage(handler)) { + result = Flux.from(result) + .map(message -> MessageUtils.unpack(handler, message)) + .doOnNext(value -> addHeaders(builder, value)) + .map(message -> message.getPayload()); + } + else { + builder.headers(HeaderUtils.sanitize(request.headers())); + } + + if (isOutputSingle(handler) + && (single != null && single || getter || isInputMultiple(handler))) { + result = Mono.from(result); + } + + if (result instanceof Flux) { + result = Flux.from(result).collectList(); + } + return Mono.from(result).flatMap(body -> Mono.just(builder.body(body))); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) private Mono> response(FunctionWrapper wrapper, Object body, boolean stream) { - Function, Publisher> function = wrapper.function(); - Consumer> consumer = wrapper.consumer(); + Function function = wrapper.function(); Flux flux; if (body != null) { @@ -197,7 +229,7 @@ public class RequestProcessor { } if (this.inspector.isMessage(function)) { - flux = messages(wrapper, function == null ? consumer : function, flux); + flux = messages(wrapper, function, flux); } Mono> responseEntityMono = null; @@ -208,19 +240,33 @@ public class RequestProcessor { .just(ResponseEntity.status(HttpStatus.ACCEPTED).build()); } else { - Flux result = Flux.from(function.apply(flux)); + Flux result = Flux.from((Publisher) function.apply(flux)); logger.debug("Handled POST with function"); if (stream) { responseEntityMono = stream(wrapper, result); } else { - responseEntityMono = response(wrapper, function, result, + responseEntityMono = response(wrapper, getTargetIfRouting(wrapper, function), result, body == null ? null : !(body instanceof Collection), false); } } return responseEntityMono; } + /* + * Called when building response and returns the actual + * target function in case the current function is RoutingFunction. + * This is necessary to determine the type of the output (e.g., Flux = + * multiple or Mono = single etc). See isOutputSingle(..). + */ + private Object getTargetIfRouting(FunctionWrapper wrapper, Object function) { + if (function instanceof RoutingFunction) { + String name = wrapper.headers.get("function.name").iterator().next(); + function = this.functionCatalog.lookup(name); + } + return function; + } + private Flux messages(FunctionWrapper request, Object function, Flux flux) { Map headers = HeaderUtils.fromHttp(request.headers()); return flux.map(payload -> MessageUtils.create(function, payload, headers)); @@ -246,30 +292,7 @@ public class RequestProcessor { return Flux.from(output).then(Mono.fromSupplier(() -> builder.body(output))); } - private Mono> response(FunctionWrapper request, Object handler, - Publisher result, Boolean single, boolean getter) { - BodyBuilder builder = ResponseEntity.ok(); - if (this.inspector.isMessage(handler)) { - result = Flux.from(result) - .map(message -> MessageUtils.unpack(handler, message)) - .doOnNext(value -> addHeaders(builder, value)) - .map(message -> message.getPayload()); - } - else { - builder.headers(HeaderUtils.sanitize(request.headers())); - } - - if (isOutputSingle(handler) - && (single != null && single || getter || isInputMultiple(handler))) { - result = Mono.from(result); - } - - if (result instanceof Flux) { - result = Flux.from(result).collectList(); - } - return Mono.from(result).flatMap(body -> Mono.just(builder.body(body))); - } private boolean isInputMultiple(Object handler) { Class type = this.inspector.getInputType(handler); @@ -373,11 +396,13 @@ public class RequestProcessor { return ReactiveAdapterRegistry.getSharedInstance(); } - private Publisher value(Function, Publisher> function, - Publisher value) { - Flux input = Flux.from(value) - .map(body -> this.converter.convert(function, body)); - return Mono.from(function.apply(input)); + private Publisher value(FunctionWrapper wrapper) { + Flux input = Flux.from(wrapper.argument) + .map(body -> this.converter.convert(wrapper.function, body)); + if (this.inspector.isMessage(wrapper.function)) { + input = messages(wrapper, wrapper.function, input); + } + return Mono.from(wrapper.function.apply(input)); } private Type getItemType(Object function) { @@ -413,8 +438,6 @@ public class RequestProcessor { private final Function, Publisher> function; - private final Consumer> consumer; - private final Supplier> supplier; private final MultiValueMap params = new LinkedMultiValueMap<>(); @@ -426,26 +449,21 @@ public class RequestProcessor { @SuppressWarnings("unchecked") public FunctionWrapper( Function, ? extends Publisher> function, - Consumer> consumer, Supplier> supplier) { this.function = (Function, Publisher>) function; - this.consumer = (Consumer>) consumer; this.supplier = (Supplier>) supplier; } public Object handler() { - return this.function != null ? this.function - : this.consumer != null ? this.consumer : this.supplier; + return this.function != null + ? this.function + : this.supplier; } public Function, Publisher> function() { return this.function; } - public Consumer> consumer() { - return this.consumer; - } - public Supplier> supplier() { return this.supplier; } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java index 36127b08b..3912b2c98 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java @@ -101,6 +101,7 @@ class FunctionEndpointInitializer implements ApplicationContextInitializer new BasicStringConverter(context.getBean(FunctionInspector.class), context.getBeanFactory())); context.registerBean(RequestProcessor.class, () -> new RequestProcessor(context.getBean(FunctionInspector.class), + context.getBean(FunctionCatalog.class), context.getBeanProvider(JsonMapper.class), context.getBean(StringConverter.class), context.getBeanProvider(ServerCodecConfigurer.class))); context.registerBean(FunctionEndpointFactory.class, diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionHandlerMapping.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionHandlerMapping.java index ceefc6b89..e7c44e797 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionHandlerMapping.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionHandlerMapping.java @@ -16,21 +16,19 @@ package org.springframework.cloud.function.web.mvc; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; +import java.util.HashMap; import javax.servlet.http.HttpServletRequest; -import org.reactivestreams.Publisher; - import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.web.constants.WebRequestConstants; +import org.springframework.cloud.function.web.util.FunctionWebUtils; import org.springframework.context.annotation.Configuration; +import org.springframework.http.HttpMethod; import org.springframework.util.StringUtils; import org.springframework.web.method.HandlerMethod; import org.springframework.web.servlet.HandlerMapping; @@ -38,7 +36,7 @@ import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandl /** * @author Dave Syer - * + * @author Oleg Zhurakousky */ @Configuration @ConditionalOnClass(RequestMappingHandlerMapping.class) @@ -92,7 +90,9 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping if (path.startsWith(this.prefix)) { path = path.substring(this.prefix.length()); } - Object function = findFunctionForGet(request, path); + + Object function = FunctionWebUtils.findFunction(HttpMethod.resolve(request.getMethod()), + this.functions, new HttpRequestAttributeDelegate(request), path); if (function != null) { if (this.logger.isDebugEnabled()) { this.logger.debug("Found function for GET: " + path); @@ -100,65 +100,20 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping request.setAttribute(WebRequestConstants.HANDLER, function); return handler; } - function = findFunctionForPost(request, path); - if (function != null) { - if (this.logger.isDebugEnabled()) { - this.logger.debug("Found function for POST: " + path); - } - request.setAttribute(WebRequestConstants.HANDLER, function); - return handler; - } return null; } - private Object findFunctionForPost(HttpServletRequest request, String path) { - if (!request.getMethod().equals("POST")) { - return null; + @SuppressWarnings("serial") + private static class HttpRequestAttributeDelegate extends HashMap { + private final HttpServletRequest request; + HttpRequestAttributeDelegate(HttpServletRequest request) { + this.request = request; } - path = path.startsWith("/") ? path.substring(1) : path; - Consumer> consumer = this.functions.lookup(Consumer.class, path); - if (consumer != null) { - request.setAttribute(WebRequestConstants.CONSUMER, consumer); - return consumer; - } - Function function = this.functions.lookup(Function.class, path); - if (function != null) { - request.setAttribute(WebRequestConstants.FUNCTION, function); - return function; - } - return null; - } - private Object findFunctionForGet(HttpServletRequest request, String path) { - if (!request.getMethod().equals("GET")) { - return null; + public Object put(String key, Object value) { + this.request.setAttribute(key, value); + return value; } - path = path.startsWith("/") ? path.substring(1) : path; - Supplier> supplier = this.functions.lookup(Supplier.class, path); - if (supplier != null) { - request.setAttribute(WebRequestConstants.SUPPLIER, supplier); - return supplier; - } - StringBuilder builder = new StringBuilder(); - String name = path; - String value = null; - for (String element : path.split("/")) { - if (builder.length() > 0) { - builder.append("/"); - } - builder.append(element); - name = builder.toString(); - value = path.length() > name.length() ? path.substring(name.length() + 1) - : null; - Function function = this.functions.lookup(Function.class, - name); - if (function != null) { - request.setAttribute(WebRequestConstants.FUNCTION, function); - request.setAttribute(WebRequestConstants.ARGUMENT, value); - return function; - } - } - return null; } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java index 504977e73..92401b306 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java @@ -17,7 +17,6 @@ package org.springframework.cloud.function.web.util; import java.util.Map; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -36,66 +35,45 @@ public final class FunctionWebUtils { public static Object findFunction(HttpMethod method, FunctionCatalog functionCatalog, Map attributes, String path) { - if (method.equals(HttpMethod.GET)) { - return findFunctionForGet(functionCatalog, attributes, path); - } - else if (method.equals(HttpMethod.POST)) { - return findFunctionForPost(functionCatalog, attributes, path); + if (method.equals(HttpMethod.GET) || method.equals(HttpMethod.POST)) { + return doFindFunction(method, functionCatalog, attributes, path); } else { throw new IllegalStateException("HTTP method '" + method + "' is not supported;"); } } - private static Object findFunctionForGet(FunctionCatalog functionCatalog, + private static Object doFindFunction(HttpMethod method, FunctionCatalog functionCatalog, Map attributes, String path) { path = path.startsWith("/") ? path.substring(1) : path; - - Object functionForGet = null; - Supplier> supplier = functionCatalog.lookup(Supplier.class, path); - if (supplier != null) { - attributes.put(WebRequestConstants.SUPPLIER, supplier); - functionForGet = supplier; - } - else { - StringBuilder builder = new StringBuilder(); - String name = path; - String[] splitPath = path.split("/"); - Function function = null; - for (int i = 0; i < splitPath.length && functionForGet == null; i++) { - String element = splitPath[i]; - if (builder.length() > 0) { - builder.append("/"); - } - builder.append(element); - name = builder.toString(); - - function = functionCatalog.lookup(Function.class, name); - if (function != null) { - attributes.put(WebRequestConstants.FUNCTION, function); - String value = path.length() > name.length() - ? path.substring(name.length() + 1) : null; - attributes.put(WebRequestConstants.ARGUMENT, value); - functionForGet = function; - } + if (method.equals(HttpMethod.GET)) { + Supplier> supplier = functionCatalog.lookup(Supplier.class, path); + if (supplier != null) { + attributes.put(WebRequestConstants.SUPPLIER, supplier); + return supplier; } } - return functionForGet; - } - - private static Object findFunctionForPost(FunctionCatalog functionCatalog, - Map attributes, String path) { - path = path.startsWith("/") ? path.substring(1) : path; - Consumer> consumer = functionCatalog.lookup(Consumer.class, path); - if (consumer != null) { - attributes.put(WebRequestConstants.CONSUMER, consumer); - return consumer; - } - Function function = functionCatalog.lookup(Function.class, path); - if (function != null) { - attributes.put(WebRequestConstants.FUNCTION, function); - return function; + StringBuilder builder = new StringBuilder(); + String name = path; + String value = null; + for (String element : path.split("/")) { + if (builder.length() > 0) { + builder.append("/"); + } + builder.append(element); + name = builder.toString(); + value = path.length() > name.length() ? path.substring(name.length() + 1) + : null; + Function function = functionCatalog.lookup(Function.class, + name); + if (function != null) { + attributes.put(WebRequestConstants.FUNCTION, function); + if (value != null) { + attributes.put(WebRequestConstants.ARGUMENT, value); + } + return function; + } } return null; } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/HeaderUtils.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/HeaderUtils.java index 6d8a720da..50e445dbe 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/HeaderUtils.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/HeaderUtils.java @@ -83,6 +83,9 @@ public final class HeaderUtils { name = name.toLowerCase(); Object value = values == null ? null : (values.size() == 1 ? values.iterator().next() : values); + if (name.toLowerCase().equals(HttpHeaders.CONTENT_TYPE.toLowerCase())) { + name = MessageHeaders.CONTENT_TYPE; + } map.put(name, value); } return new MessageHeaders(map); diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/RoutingFunctionTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/RoutingFunctionTests.java new file mode 100644 index 000000000..c91391667 --- /dev/null +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/RoutingFunctionTests.java @@ -0,0 +1,246 @@ +/* + * Copyright 2012-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.mvc; + +import java.net.URI; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.junit.Test; +import org.junit.runner.RunWith; +import reactor.core.publisher.Flux; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.cloud.function.context.config.RoutingFunction; +import org.springframework.cloud.function.web.RestApplication; +import org.springframework.cloud.function.web.mvc.RoutingFunctionTests.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.RequestEntity; +import org.springframework.http.ResponseEntity; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Oleg Zhurakousky + * + */ +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = { + "spring.main.web-application-type=servlet", + "spring.cloud.function.web.path=/functions", + "spring.cloud.function.routing.enabled=true"}) +@ContextConfiguration(classes = { RestApplication.class, TestConfiguration.class }) +public class RoutingFunctionTests { + + @Autowired + private TestRestTemplate rest; + + @Test + public void testFunctionMessage() throws Exception { + HttpEntity postForEntity = this.rest + .exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME)) + .contentType(MediaType.APPLICATION_JSON) + .header("function.name", "employee") + .body("{\"name\":\"Bob\",\"age\":25}"), String.class); + assertThat(postForEntity.getBody()).isEqualTo("{\"name\":\"Bob\",\"age\":25}"); + assertThat(postForEntity.getHeaders().containsKey("x-content-type")).isTrue(); + assertThat(postForEntity.getHeaders().get("x-content-type").get(0)) + .isEqualTo("application/xml"); + assertThat(postForEntity.getHeaders().get("foo").get(0)).isEqualTo("bar"); + } + + @Test + public void testFunctionPrimitive() throws Exception { + ResponseEntity postForEntity = this.rest + .exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME)) + .contentType(MediaType.TEXT_PLAIN) + .header("function.name", "echo") + .body("{\"name\":\"Bob\",\"age\":25}"), String.class); + assertThat(postForEntity.getBody()).isEqualTo("{\"name\":\"Bob\",\"age\":25}"); + assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK); + } + + @Test + public void testFluxFunctionPrimitive() throws Exception { + ResponseEntity postForEntity = this.rest + .exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME)) + .contentType(MediaType.TEXT_PLAIN) + .header("function.name", "fluxuppercase") + .body("hello"), String.class); + assertThat(postForEntity.getBody()).isEqualTo("[\"HELLO\"]"); + assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK); + + postForEntity = this.rest + .exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME)) + .contentType(MediaType.TEXT_PLAIN) + .header("function.name", "fluxuppercase") + .body("hello1"), String.class); + assertThat(postForEntity.getBody()).isEqualTo("[\"HELLO1\"]"); + assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK); + + postForEntity = this.rest + .exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME)) + .contentType(MediaType.TEXT_PLAIN) + .header("function.name", "fluxuppercase") + .body("hello2"), String.class); + assertThat(postForEntity.getBody()).isEqualTo("[\"HELLO2\"]"); + assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK); + } + + @Test + public void testFluxFunctionPrimitiveArray() throws Exception { + ResponseEntity postForEntity = this.rest + .exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME)) + .contentType(MediaType.APPLICATION_JSON) + .header("function.name", "fluxuppercase") + .body(new String[] {"a", "b", "c"}), String.class); + assertThat(postForEntity.getBody()).isEqualTo("[\"A\",\"B\",\"C\"]"); + assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK); + } + + @Test + public void testFluxConsumer() throws Exception { + ResponseEntity postForEntity = this.rest + .exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME)) + .contentType(MediaType.APPLICATION_JSON) + .header("function.name", "fluxconsumer") + .body(new String[] {"a", "b", "c"}), String.class); + //assertThat(postForEntity.getBody()).isEqualTo("[\"A\",\"B\",\"C\"]"); + assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK); + } + + + @Test + public void testFunctionPojo() throws Exception { + ResponseEntity postForEntity = this.rest + .exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME)) + .contentType(MediaType.APPLICATION_JSON) + .header("function.name", "echoPojo") + .body("{\"value\":\"foo\"}"), String.class); + assertThat(postForEntity.getBody()).isEqualTo("{\"foo\":{\"value\":\"foo\"},\"value\":\"bar\"}"); + assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK); + } + + @Test + public void testConsumerMessage() throws Exception { + ResponseEntity postForEntity = this.rest + .exchange(RequestEntity.post(new URI("/functions/" + RoutingFunction.FUNCTION_NAME)) + .contentType(MediaType.TEXT_PLAIN) + .header("function.name", "messageConsumer") + .body("{\"name\":\"Bob\",\"age\":25}"), String.class); + assertThat(postForEntity.getStatusCode()).isEqualTo(HttpStatus.OK); + } + + @EnableAutoConfiguration + @org.springframework.boot.test.context.TestConfiguration + protected static class TestConfiguration { + + @Bean({ "employee" }) + public Function>, Message>> function() { + return request -> { + Message> message = MessageBuilder + .withPayload(request.getPayload()) + .setHeader("X-Content-Type", "application/xml") + .setHeader("foo", "bar").build(); + return message; + }; + } + + @Bean + public Consumer> messageConsumer() { + return value -> System.out.println("Value: " + value); + } + + @Bean + public Function echo() { + return v -> v; + } + + @Bean + public Function, Flux> fluxuppercase() { + return v -> v.map(s -> { + System.out.println(s); + return s.toUpperCase(); + }); + } + + @Bean + public Consumer> fluxconsumer() { +// return v -> v.map(value -> { +// return value.toUpperCase(); +// }); + return flux -> flux.doOnNext(s -> { + System.out.println(s + " jkljlkjlkj l"); + }).subscribe(); + } + + @Bean + public Function echoPojo() { + return v -> { + Bar bar = new Bar(); + bar.setFoo(v); + bar.setValue("bar"); + return bar; + }; + } + + } + + public static class Foo { + private String value; + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + } + + public static class Bar { + private Foo foo; + private String value; + public Foo getFoo() { + return foo; + } + public void setFoo(Foo foo) { + this.foo = foo; + } + public String getValue() { + return value; + } + public void setValue(String value) { + this.value = value; + } + + + } + +}