Simplify logic around getting item type of a function type
This commit is contained in:
@@ -16,7 +16,6 @@
|
||||
|
||||
package org.springframework.cloud.function.web;
|
||||
|
||||
import java.lang.reflect.ParameterizedType;
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
@@ -26,7 +25,6 @@ import java.util.Optional;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import net.jodah.typetools.TypeResolver;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.reactivestreams.Publisher;
|
||||
@@ -37,7 +35,6 @@ import org.springframework.beans.factory.ObjectProvider;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
|
||||
import org.springframework.cloud.function.context.config.RoutingFunction;
|
||||
import org.springframework.cloud.function.context.message.MessageUtils;
|
||||
import org.springframework.cloud.function.json.JsonMapper;
|
||||
import org.springframework.cloud.function.web.util.FunctionWebUtils;
|
||||
@@ -79,7 +76,7 @@ public class RequestProcessor {
|
||||
@SuppressWarnings("rawtypes")
|
||||
public Mono<ResponseEntity<?>> get(FunctionWrapper wrapper) {
|
||||
if (wrapper.function().isFunction()) {
|
||||
return response(wrapper, wrapper.function(), value(wrapper), true, true);
|
||||
return response(wrapper, wrapper.function(), invokeFunction(wrapper), true, true);
|
||||
}
|
||||
else {
|
||||
FunctionInvocationWrapper function = (wrapper.function);
|
||||
@@ -93,7 +90,7 @@ public class RequestProcessor {
|
||||
public Mono<ResponseEntity<?>> post(FunctionWrapper wrapper, String body,
|
||||
boolean stream) {
|
||||
FunctionInvocationWrapper function = (FunctionInvocationWrapper) wrapper.handler();
|
||||
Type itemType = getItemType(function);
|
||||
Type itemType = function != null ? function.getItemType(function.getInputType()) : Object.class;
|
||||
|
||||
Object input = body == null ? "" : body;
|
||||
|
||||
@@ -113,7 +110,7 @@ public class RequestProcessor {
|
||||
|
||||
public Mono<ResponseEntity<?>> stream(FunctionWrapper functionWrapper) {
|
||||
Publisher<?> result = functionWrapper.function.isFunction()
|
||||
? value(functionWrapper)
|
||||
? invokeFunction(functionWrapper)
|
||||
: (Publisher<?>) functionWrapper.function.get();
|
||||
return stream(functionWrapper, result);
|
||||
}
|
||||
@@ -176,7 +173,7 @@ public class RequestProcessor {
|
||||
responseEntityMono = stream(wrapper, result);
|
||||
}
|
||||
else {
|
||||
responseEntityMono = response(wrapper, getTargetIfRouting(wrapper, function), result,
|
||||
responseEntityMono = response(wrapper, function, result,
|
||||
body == null ? null : !(body instanceof Collection), false);
|
||||
}
|
||||
}
|
||||
@@ -190,9 +187,7 @@ public class RequestProcessor {
|
||||
if (((FunctionInvocationWrapper) handler).isInputTypeMessage()) {
|
||||
result = Flux.from(result)
|
||||
.map(message -> MessageUtils.unpack(handler, message))
|
||||
.doOnNext(value -> {
|
||||
addHeaders(builder, value);
|
||||
})
|
||||
.doOnNext(value -> addHeaders(builder, value))
|
||||
.map(message -> message.getPayload());
|
||||
}
|
||||
else {
|
||||
@@ -213,20 +208,6 @@ public class RequestProcessor {
|
||||
return Mono.from(result).flatMap(body -> Mono.just(builder.body(body)));
|
||||
}
|
||||
|
||||
/*
|
||||
* Called when building response and returns the actual
|
||||
* target function in case the current function is RoutingFunction.
|
||||
* This is necessary to determine the type of the output (e.g., Flux =
|
||||
* multiple or Mono = single etc). See isOutputSingle(..).
|
||||
*/
|
||||
private Object getTargetIfRouting(FunctionWrapper wrapper, Object function) {
|
||||
if (function instanceof RoutingFunction) {
|
||||
String name = wrapper.headers.get("function.name").iterator().next();
|
||||
function = this.functionCatalog.lookup(name);
|
||||
}
|
||||
return function;
|
||||
}
|
||||
|
||||
// this seem to be very relevant to AWS container tests
|
||||
private Flux<?> messages(FunctionWrapper request, Object function, Flux<?> flux) {
|
||||
Map<String, Object> headers = new HashMap<>(HeaderUtils.fromHttp(request.headers()));
|
||||
@@ -280,59 +261,12 @@ public class RequestProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
private Publisher<?> value(FunctionWrapper wrapper) {
|
||||
private Publisher<?> invokeFunction(FunctionWrapper wrapper) {
|
||||
Flux<?> input = Flux.from(wrapper.argument);
|
||||
FunctionInvocationWrapper function = (wrapper.function);
|
||||
Object result = FunctionWebUtils.invokeFunction(function, input, function.isInputTypeMessage());
|
||||
Object result = FunctionWebUtils.invokeFunction(wrapper.function, input, wrapper.function.isInputTypeMessage());
|
||||
return Mono.from((Publisher<?>) result);
|
||||
}
|
||||
|
||||
private Type getItemType(Object function) {
|
||||
if (function == null || ((FunctionInvocationWrapper) function).getInputType() == Object.class) {
|
||||
return Object.class;
|
||||
}
|
||||
|
||||
Type itemType;
|
||||
if (((FunctionInvocationWrapper) function).isInputTypePublisher() && ((FunctionInvocationWrapper) function).isInputTypeMessage()) {
|
||||
itemType = FunctionTypeUtils.getImmediateGenericType(((FunctionInvocationWrapper) function).getInputType(), 0);
|
||||
itemType = FunctionTypeUtils.getImmediateGenericType(itemType, 0);
|
||||
}
|
||||
else {
|
||||
itemType = FunctionTypeUtils.getImmediateGenericType(((FunctionInvocationWrapper) function).getInputType(), 0);
|
||||
}
|
||||
|
||||
if (itemType != null) {
|
||||
return itemType;
|
||||
}
|
||||
|
||||
Class<?> inputType = ((FunctionInvocationWrapper) function).isInputTypeMessage() || ((FunctionInvocationWrapper) function).isInputTypePublisher()
|
||||
? TypeResolver.resolveRawClass(itemType, null)
|
||||
: ((FunctionInvocationWrapper) function).getRawInputType();
|
||||
if (!Collection.class.isAssignableFrom(inputType)) {
|
||||
return inputType;
|
||||
}
|
||||
|
||||
Type type = ((FunctionInvocationWrapper) function).getInputType();
|
||||
if (type instanceof ParameterizedType) {
|
||||
type = ((ParameterizedType) type).getActualTypeArguments()[0];
|
||||
}
|
||||
else {
|
||||
for (Type iface : ((Class<?>) type).getGenericInterfaces()) {
|
||||
if (iface.getTypeName().startsWith("java.util.function")) {
|
||||
type = ((ParameterizedType) iface).getActualTypeArguments()[0];
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (type instanceof ParameterizedType) {
|
||||
type = ((ParameterizedType) type).getActualTypeArguments()[0];
|
||||
}
|
||||
else {
|
||||
type = inputType;
|
||||
}
|
||||
return type;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wrapper for functions.
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user