Fixed output conversion for Message<byte[]> functions

- cleaned up BeanFactoryAwareFunctionRegistry
- Added javadoc to FunctionCatalog.lookup(String functionDefinition, String... acceptedOutputMimeTypes)
This commit is contained in:
Oleg Zhurakousky
2019-07-27 12:41:07 +02:00
parent bfcea95268
commit f5cf937985
3 changed files with 60 additions and 67 deletions

View File

@@ -17,6 +17,11 @@
package org.springframework.cloud.function.context;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.activation.MimeType;
/**
@@ -27,15 +32,31 @@ public interface FunctionCatalog {
/**
* Will look up the instance of the functional interface by name only and
* acceptedOutputTypes.
* Will look up the instance of the functional interface by name only.
* This lookup method assumes a very specific semantics which are: <i>function sub-type(s)
* expected to be {@code Message<byte[]>}</i>. <br>
* For example,
* <br><br>
* {@code Function<Message<byte[]>, Message<byte[]>>} or
* <br>
* {@code Function<Flux<Message<byte[]>>, Flux<Message<byte[]>>>} or
* <br>
* {@code Consumer<Flux<Message<Flux<Message<byte[]>>>} etc. . .
* <br><br>
* The {@code acceptedOutputMimeTypes} are the string representation of {@link MimeType} where each
* mime-type in the provided array would correspond to the output with the same index
* (for cases of functions with multiple outputs) and is used to convert such output back
* to {@code Message<byte[]>}.
* If you need to provide several accepted types per specific output you can simply delimit
* them with comma (e.g., {@code application/json,text/plain...}).
*
* @param <T> instance type
* @param functionDefinition functionDefinition
* @param acceptedOutputTypes acceptedOutputTypes
* @param <T> instance type which should be one of {@link Supplier}, {@link Function} or {@link Consumer}.
* @param functionDefinition the definition of a function (e.g., 'foo' or 'foo|bar')
* @param acceptedOutputMimeTypes acceptedOutputMimeTypes array of string representation of {@link MimeType}s
* used to convert function output back to {@code Message<byte[]>}.
* @return instance of the functional interface registered with this catalog
*/
default <T> T lookup(String functionDefinition, String... acceptedOutputTypes) {
default <T> T lookup(String functionDefinition, String... acceptedOutputMimeTypes) {
throw new UnsupportedOperationException("This instance of FunctionCatalog does not support this operation");
}

View File

@@ -42,7 +42,6 @@ import reactor.util.function.Tuples;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
@@ -82,7 +81,7 @@ import org.springframework.util.StringUtils;
* @since 3.0
*/
public class BeanFactoryAwareFunctionRegistry
implements FunctionRegistry, FunctionInspector, ApplicationContextAware, SmartInitializingSingleton {
implements FunctionRegistry, FunctionInspector, ApplicationContextAware {
private static Log logger = LogFactory.getLog(AbstractSpringFunctionAdapterInitializer.class);
@@ -121,20 +120,6 @@ public class BeanFactoryAwareFunctionRegistry
return (T) this.compose(null, definition, acceptedOutputTypes);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
//TODO do we really need to do that given we no longer do the same for other gunctions?
public void afterSingletonsInstantiated() {
Map<String, FunctionRegistration> beansOfType = this.applicationContext
.getBeansOfType(FunctionRegistration.class);
for (FunctionRegistration fr : beansOfType.values()) {
this.registrationsByFunction.putIfAbsent(fr.getTarget(), fr);
for (Object name : fr.getNames()) {
this.registrationsByName.putIfAbsent((String) name, fr);
}
}
}
@SuppressWarnings("unchecked")
@Override
public Set<String> getNames(Class<?> type) {
@@ -315,8 +300,6 @@ public class BeanFactoryAwareFunctionRegistry
this.functionDefinition = functionDefinition;
}
@Override
public void accept(Object input) {
this.doApply(input, true);
@@ -372,68 +355,51 @@ public class BeanFactoryAwareFunctionRegistry
logger.debug("Applying function: " + this.functionDefinition);
}
Object result = null;
Object result;
if (input instanceof Publisher) {
input = this.composed ? input :
this.convertInputPublisherIfNecessary((Publisher<?>) input, FunctionTypeUtils.getInputType(functionType, 0));
if (FunctionTypeUtils.isReactive(FunctionTypeUtils.getInputType(functionType, 0))) {
this.convertInputPublisherIfNecessary((Publisher<?>) input, FunctionTypeUtils.getInputType(this.functionType, 0));
if (FunctionTypeUtils.isReactive(FunctionTypeUtils.getInputType(this.functionType, 0))) {
result = this.invokeFunction(input);
if (result == null) {
result = Mono.empty();
}
result = result == null ? Mono.empty() : result;
}
else {
if (this.composed) {
return input instanceof Mono
? Mono.from((Publisher<?>) input).transform((Function) target)
: Flux.from((Publisher<?>) input).transform((Function) target);
? Mono.from((Publisher<?>) input).transform((Function) this.target)
: Flux.from((Publisher<?>) input).transform((Function) this.target);
}
else {
boolean isConsumer = FunctionTypeUtils.isConsumer(functionType);
Publisher res;
if (isConsumer) {
res = input instanceof Mono ? Mono.from((Publisher) input).doOnNext((Consumer) this.target).then()
if (FunctionTypeUtils.isConsumer(functionType)) {
result = input instanceof Mono ? Mono.from((Publisher) input).doOnNext((Consumer) this.target).then()
: Flux.from((Publisher) input).doOnNext((Consumer) this.target).then();
}
else {
res = input instanceof Mono
result = input instanceof Mono
? Mono.from((Publisher) input).map(value -> this.invokeFunction(value))
: Flux.from((Publisher) input).map(value -> this.invokeFunction(value));
}
return res;
}
}
}
else {
Type type = FunctionTypeUtils.getInputType(functionType, 0);
if (!composed && !FunctionTypeUtils.isMultipleInputArguments(functionType) && FunctionTypeUtils.isReactive(type)) {
Type type = FunctionTypeUtils.getInputType(this.functionType, 0);
if (!this.composed && !FunctionTypeUtils.isMultipleInputArguments(this.functionType) && FunctionTypeUtils.isReactive(type)) {
Publisher<?> publisher = FunctionTypeUtils.isFlux(type)
? input == null ? Flux.empty() : Flux.just(input)
: input == null ? Mono.empty() : Mono.just(input);
publisher = this.convertInputPublisherIfNecessary(publisher, FunctionTypeUtils.getInputType(functionType, 0));
result = this.invokeFunction(publisher);
result = this.invokeFunction(this.convertInputPublisherIfNecessary(publisher, FunctionTypeUtils.getInputType(this.functionType, 0)));
}
else {
result = this.invokeFunction(this.composed ? input
: this.convertInputValueIfNecessary(input, FunctionTypeUtils.getInputType(functionType, 0)));
: this.convertInputValueIfNecessary(input, FunctionTypeUtils.getInputType(this.functionType, 0)));
}
}
if (!ObjectUtils.isEmpty(acceptedOutputMimeTypes)) {
if (result instanceof Publisher) {
result = this.convertOutputPublisherIfNecessary((Publisher<?>) result, this.acceptedOutputMimeTypes);
}
else {
result = this.convertOutputValueIfNecessary(result, this.acceptedOutputMimeTypes);
}
}
if (!(result instanceof Publisher) && (!(target instanceof FunctionInvocationWrapper) && target instanceof Supplier)) {
/*
* This is ONLY relevant for web, so consider exposing some property or may be
* the fact that this is a rare case (Supplier) leave it temporarily as is.
*/
// return Flux.just(result);
if (!ObjectUtils.isEmpty(this.acceptedOutputMimeTypes)) {
result = result instanceof Publisher
? this.convertOutputPublisherIfNecessary((Publisher<?>) result, this.acceptedOutputMimeTypes)
: this.convertOutputValueIfNecessary(result, this.acceptedOutputMimeTypes);
}
return result;
@@ -443,18 +409,14 @@ public class BeanFactoryAwareFunctionRegistry
logger.info("Converting output value ");
Object convertedValue = null;
if (FunctionTypeUtils.isMultipleArgumentsHolder(value)) {
int outputCount = FunctionTypeUtils.getOutputCount(functionType);
int outputCount = FunctionTypeUtils.getOutputCount(this.functionType);
Object[] convertedInputArray = new Object[outputCount];
for (int i = 0; i < outputCount; i++) {
Expression parsed = new SpelExpressionParser().parseExpression("getT" + (i + 1) + "()");
Object outputArgument = parsed.getValue(value);
if (outputArgument instanceof Publisher) {
outputArgument = this.convertOutputPublisherIfNecessary((Publisher<?>) outputArgument, acceptedOutputMimeTypes[i]);
}
else {
outputArgument = this.convertOutputValueIfNecessary(outputArgument, acceptedOutputMimeTypes);
}
convertedInputArray[i] = outputArgument;
convertedInputArray[i] = outputArgument instanceof Publisher
? this.convertOutputPublisherIfNecessary((Publisher<?>) outputArgument, acceptedOutputMimeTypes[i])
: this.convertOutputValueIfNecessary(outputArgument, acceptedOutputMimeTypes);
}
convertedValue = Tuples.fromArray(convertedInputArray);
}
@@ -472,7 +434,6 @@ public class BeanFactoryAwareFunctionRegistry
// ignore
}
}
}
return convertedValue;

View File

@@ -17,6 +17,7 @@
package org.springframework.cloud.function.context.catalog;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
@@ -37,6 +38,9 @@ import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
@@ -65,6 +69,13 @@ public class BeanFactoryAwareFunctionRegistryTests {
List<String> result = asFlux.apply(Flux.just("uppercaseFlux", "uppercaseFlux2")).collectList().block();
assertThat(result.get(0)).isEqualTo("UPPERCASEFLUX");
assertThat(result.get(1)).isEqualTo("UPPERCASEFLUX2");
Function<Flux<Message<byte[]>>, Flux<Message<byte[]>>> messageFlux = catalog.lookup("uppercase", "application/json");
Message<byte[]> message1 = MessageBuilder.withPayload("\"uppercaseFlux\"".getBytes()).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build();
Message<byte[]> message2 = MessageBuilder.withPayload("\"uppercaseFlux2\"".getBytes()).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build();
List<Message<byte[]>> messageResult = messageFlux.apply(Flux.just(message1, message2)).collectList().block();
assertThat(messageResult.get(0).getPayload()).isEqualTo("\"UPPERCASEFLUX\"".getBytes(StandardCharsets.UTF_8));
assertThat(messageResult.get(1).getPayload()).isEqualTo("\"UPPERCASEFLUX2\"".getBytes(StandardCharsets.UTF_8));
}
/*