diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java index 15f309454..311c3e842 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java @@ -165,8 +165,8 @@ public class BeanFactoryAwareFunctionRegistry @Override public FunctionRegistration getRegistration(Object function) { FunctionRegistration registration = this.registrationsByFunction.get(function); -// // need to do this due to the deployer not wrapping the actual target into FunctionInvocationWrapper -// // hence the lookup would need to be made by the actual target + // need to do this due to the deployer not wrapping the actual target into FunctionInvocationWrapper + // hence the lookup would need to be made by the actual target if (registration == null && function instanceof FunctionInvocationWrapper) { function = ((FunctionInvocationWrapper) function).target; } @@ -463,21 +463,42 @@ public class BeanFactoryAwareFunctionRegistry @Override public void accept(Object input) { - this.doApply(input, true); + this.doApply(input, true, null); } @Override public Object apply(Object input) { - return this.doApply(input, false); + return this.apply(input, null); + } + + /** + * !! Experimental, may change. Is not yet intended as public API !! + * @param input input value + * @param enricher enricher function instance + * @return the result + */ + @SuppressWarnings("rawtypes") + public Object apply(Object input, Function enricher) { + return this.doApply(input, false, enricher); } @Override public Object get() { + return this.get(null); + } + + /** + * !! Experimental, may change. Is not yet intended as public API !! + * @param enricher enricher function instance + * @return the result + */ + @SuppressWarnings("rawtypes") + protected Object get(Function enricher) { Object input = FunctionTypeUtils.isMono(this.functionType) ? Mono.empty() : (FunctionTypeUtils.isMono(this.functionType) ? Flux.empty() : null); - return this.doApply(input, false); + return this.doApply(input, false, enricher); } public Type getFunctionType() { @@ -516,7 +537,7 @@ public class BeanFactoryAwareFunctionRegistry } @SuppressWarnings({ "unchecked", "rawtypes" }) - private Object doApply(Object input, boolean consumer) { + private Object doApply(Object input, boolean consumer, Function enricher) { if (logger.isDebugEnabled()) { logger.debug("Applying function: " + this.functionDefinition); } @@ -571,15 +592,15 @@ public class BeanFactoryAwareFunctionRegistry // Outputs will be converted only if we're told how (via acceptedOutputMimeTypes), otherwise output returned as is. if (!ObjectUtils.isEmpty(this.acceptedOutputMimeTypes)) { result = result instanceof Publisher - ? this.convertOutputPublisherIfNecessary((Publisher) result, this.acceptedOutputMimeTypes) - : this.convertOutputValueIfNecessary(result, this.acceptedOutputMimeTypes); + ? this.convertOutputPublisherIfNecessary((Publisher) result, enricher, this.acceptedOutputMimeTypes) + : this.convertOutputValueIfNecessary(result, enricher, this.acceptedOutputMimeTypes); } return result; } @SuppressWarnings({ "rawtypes", "unchecked" }) - private Object convertOutputValueIfNecessary(Object value, String... acceptedOutputMimeTypes) { + private Object convertOutputValueIfNecessary(Object value, Function enricher, String... acceptedOutputMimeTypes) { logger.debug("Applying type conversion on output value"); Object convertedValue = null; if (FunctionTypeUtils.isMultipleArgumentsHolder(value)) { @@ -590,8 +611,8 @@ public class BeanFactoryAwareFunctionRegistry Object outputArgument = parsed.getValue(value); try { convertedInputArray[i] = outputArgument instanceof Publisher - ? this.convertOutputPublisherIfNecessary((Publisher) outputArgument, acceptedOutputMimeTypes[i]) - : this.convertOutputValueIfNecessary(outputArgument, acceptedOutputMimeTypes[i]); + ? this.convertOutputPublisherIfNecessary((Publisher) outputArgument, enricher, acceptedOutputMimeTypes[i]) + : this.convertOutputValueIfNecessary(outputArgument, enricher, acceptedOutputMimeTypes[i]); } catch (ArrayIndexOutOfBoundsException e) { throw new IllegalStateException("The number of 'acceptedOutputMimeTypes' for function '" + this.functionDefinition @@ -613,7 +634,7 @@ public class BeanFactoryAwareFunctionRegistry convertedValue = message; } else { - convertedValue = messageConverter.toMessage(message.getPayload(), message.getHeaders()); + convertedValue = this.convertValueToMessage(message, enricher, acceptedContentType); } } else if (value instanceof byte[]) { @@ -626,12 +647,11 @@ public class BeanFactoryAwareFunctionRegistry } AtomicReference> messages = new AtomicReference>(new ArrayList<>()); ((Iterable) value).forEach(element -> - messages.get().add((Message) convertOutputValueIfNecessary(element, acceptedContentType.toString()))); + messages.get().add((Message) convertOutputValueIfNecessary(element, enricher, acceptedContentType.toString()))); convertedValue = messages.get(); } else { - convertedValue = messageConverter - .toMessage(value, new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, acceptedContentType))); + convertedValue = this.convertValueToMessage(value, enricher, acceptedContentType); } } } @@ -639,14 +659,37 @@ public class BeanFactoryAwareFunctionRegistry } - private Publisher convertOutputPublisherIfNecessary(Publisher publisher, String... acceptedOutputMimeTypes) { + @SuppressWarnings("rawtypes") + private Message convertValueToMessage(Object value, Function enricher, MimeType acceptedContentType) { + Message outputMessage = null; + if (enricher != null) { + if (!(value instanceof Message)) { + value = MessageBuilder.withPayload(value).setHeader(MessageHeaders.CONTENT_TYPE, acceptedContentType).build(); + } + value = enricher.apply((Message) value); + outputMessage = messageConverter.toMessage(((Message) value).getPayload(), ((Message) value).getHeaders()); + } + else { + if (value instanceof Message) { + outputMessage = messageConverter.toMessage(((Message) value).getPayload(), ((Message) value).getHeaders()); + } + else { + outputMessage = messageConverter.toMessage(value, + new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, acceptedContentType))); + } + } + return outputMessage; + } + + @SuppressWarnings("rawtypes") + private Publisher convertOutputPublisherIfNecessary(Publisher publisher, Function enricher, String... acceptedOutputMimeTypes) { if (logger.isDebugEnabled()) { logger.debug("Applying type conversion on output Publisher " + publisher); } Publisher result = publisher instanceof Mono - ? Mono.from(publisher) .map(value -> this.convertOutputValueIfNecessary(value, acceptedOutputMimeTypes)) - : Flux.from(publisher).map(value -> this.convertOutputValueIfNecessary(value, acceptedOutputMimeTypes)); + ? Mono.from(publisher) .map(value -> this.convertOutputValueIfNecessary(value, enricher, acceptedOutputMimeTypes)) + : Flux.from(publisher).map(value -> this.convertOutputValueIfNecessary(value, enricher, acceptedOutputMimeTypes)); return result; }