diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java
index d4bb3fe36..4adfb20fb 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java
@@ -347,6 +347,21 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
return this.inputType;
}
+ /**
+ * Return the actual {@link Type} of the item of the provided type.
+ * This method is context specific and is not a general purpose utility method. The context is that the provided
+ * {@link Type} may represent the input/output of a function where such type could be wrapped in
+ * {@link Message}, {@link Flux} or {@link Mono}, so this method returns generic value of such type or itself if not wrapped.
+ * @param type typically input or output Type of the function (see {@link #getInputType()} or {@link #getOutputType()}.
+ * @return the type of the item if wrapped otherwise the provided type.
+ */
+ public Type getItemType(Type type) {
+ if (FunctionTypeUtils.isPublisher(type) || FunctionTypeUtils.isMessage(type)) {
+ type = FunctionTypeUtils.getGenericType(type);
+ }
+ return type;
+ }
+
/**
* Use individual {@link #getInputType()}, {@link #getOutputType()} and their variants as well as
* other supporting operations instead.
diff --git a/spring-cloud-function-web/pom.xml b/spring-cloud-function-web/pom.xml
index 9224859dd..c4a58454e 100644
--- a/spring-cloud-function-web/pom.xml
+++ b/spring-cloud-function-web/pom.xml
@@ -61,11 +61,6 @@
spring-boot-configuration-processor
true
-
-
-
-
-
diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java
index a6c24aa71..74dcd5145 100644
--- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java
+++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java
@@ -16,7 +16,6 @@
package org.springframework.cloud.function.web;
-import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.Collections;
@@ -26,7 +25,6 @@ import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Stream;
-import net.jodah.typetools.TypeResolver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
@@ -37,7 +35,6 @@ import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
-import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.cloud.function.web.util.FunctionWebUtils;
@@ -79,7 +76,7 @@ public class RequestProcessor {
@SuppressWarnings("rawtypes")
public Mono> get(FunctionWrapper wrapper) {
if (wrapper.function().isFunction()) {
- return response(wrapper, wrapper.function(), value(wrapper), true, true);
+ return response(wrapper, wrapper.function(), invokeFunction(wrapper), true, true);
}
else {
FunctionInvocationWrapper function = (wrapper.function);
@@ -93,7 +90,7 @@ public class RequestProcessor {
public Mono> post(FunctionWrapper wrapper, String body,
boolean stream) {
FunctionInvocationWrapper function = (FunctionInvocationWrapper) wrapper.handler();
- Type itemType = getItemType(function);
+ Type itemType = function != null ? function.getItemType(function.getInputType()) : Object.class;
Object input = body == null ? "" : body;
@@ -113,7 +110,7 @@ public class RequestProcessor {
public Mono> stream(FunctionWrapper functionWrapper) {
Publisher> result = functionWrapper.function.isFunction()
- ? value(functionWrapper)
+ ? invokeFunction(functionWrapper)
: (Publisher>) functionWrapper.function.get();
return stream(functionWrapper, result);
}
@@ -176,7 +173,7 @@ public class RequestProcessor {
responseEntityMono = stream(wrapper, result);
}
else {
- responseEntityMono = response(wrapper, getTargetIfRouting(wrapper, function), result,
+ responseEntityMono = response(wrapper, function, result,
body == null ? null : !(body instanceof Collection), false);
}
}
@@ -190,9 +187,7 @@ public class RequestProcessor {
if (((FunctionInvocationWrapper) handler).isInputTypeMessage()) {
result = Flux.from(result)
.map(message -> MessageUtils.unpack(handler, message))
- .doOnNext(value -> {
- addHeaders(builder, value);
- })
+ .doOnNext(value -> addHeaders(builder, value))
.map(message -> message.getPayload());
}
else {
@@ -213,20 +208,6 @@ public class RequestProcessor {
return Mono.from(result).flatMap(body -> Mono.just(builder.body(body)));
}
- /*
- * Called when building response and returns the actual
- * target function in case the current function is RoutingFunction.
- * This is necessary to determine the type of the output (e.g., Flux =
- * multiple or Mono = single etc). See isOutputSingle(..).
- */
- private Object getTargetIfRouting(FunctionWrapper wrapper, Object function) {
- if (function instanceof RoutingFunction) {
- String name = wrapper.headers.get("function.name").iterator().next();
- function = this.functionCatalog.lookup(name);
- }
- return function;
- }
-
// this seem to be very relevant to AWS container tests
private Flux> messages(FunctionWrapper request, Object function, Flux> flux) {
Map headers = new HashMap<>(HeaderUtils.fromHttp(request.headers()));
@@ -280,59 +261,12 @@ public class RequestProcessor {
}
}
- private Publisher> value(FunctionWrapper wrapper) {
+ private Publisher> invokeFunction(FunctionWrapper wrapper) {
Flux> input = Flux.from(wrapper.argument);
- FunctionInvocationWrapper function = (wrapper.function);
- Object result = FunctionWebUtils.invokeFunction(function, input, function.isInputTypeMessage());
+ Object result = FunctionWebUtils.invokeFunction(wrapper.function, input, wrapper.function.isInputTypeMessage());
return Mono.from((Publisher>) result);
}
- private Type getItemType(Object function) {
- if (function == null || ((FunctionInvocationWrapper) function).getInputType() == Object.class) {
- return Object.class;
- }
-
- Type itemType;
- if (((FunctionInvocationWrapper) function).isInputTypePublisher() && ((FunctionInvocationWrapper) function).isInputTypeMessage()) {
- itemType = FunctionTypeUtils.getImmediateGenericType(((FunctionInvocationWrapper) function).getInputType(), 0);
- itemType = FunctionTypeUtils.getImmediateGenericType(itemType, 0);
- }
- else {
- itemType = FunctionTypeUtils.getImmediateGenericType(((FunctionInvocationWrapper) function).getInputType(), 0);
- }
-
- if (itemType != null) {
- return itemType;
- }
-
- Class> inputType = ((FunctionInvocationWrapper) function).isInputTypeMessage() || ((FunctionInvocationWrapper) function).isInputTypePublisher()
- ? TypeResolver.resolveRawClass(itemType, null)
- : ((FunctionInvocationWrapper) function).getRawInputType();
- if (!Collection.class.isAssignableFrom(inputType)) {
- return inputType;
- }
-
- Type type = ((FunctionInvocationWrapper) function).getInputType();
- if (type instanceof ParameterizedType) {
- type = ((ParameterizedType) type).getActualTypeArguments()[0];
- }
- else {
- for (Type iface : ((Class>) type).getGenericInterfaces()) {
- if (iface.getTypeName().startsWith("java.util.function")) {
- type = ((ParameterizedType) iface).getActualTypeArguments()[0];
- break;
- }
- }
- }
- if (type instanceof ParameterizedType) {
- type = ((ParameterizedType) type).getActualTypeArguments()[0];
- }
- else {
- type = inputType;
- }
- return type;
- }
-
/**
* Wrapper for functions.
*/