GH-449 Added initial support for providing a message enriching callback
For now an undocumented feature as it may need some more work Resolves #449
This commit is contained in:
@@ -165,8 +165,8 @@ public class BeanFactoryAwareFunctionRegistry
|
||||
@Override
|
||||
public FunctionRegistration<?> getRegistration(Object function) {
|
||||
FunctionRegistration<?> registration = this.registrationsByFunction.get(function);
|
||||
// // need to do this due to the deployer not wrapping the actual target into FunctionInvocationWrapper
|
||||
// // hence the lookup would need to be made by the actual target
|
||||
// need to do this due to the deployer not wrapping the actual target into FunctionInvocationWrapper
|
||||
// hence the lookup would need to be made by the actual target
|
||||
if (registration == null && function instanceof FunctionInvocationWrapper) {
|
||||
function = ((FunctionInvocationWrapper) function).target;
|
||||
}
|
||||
@@ -463,21 +463,42 @@ public class BeanFactoryAwareFunctionRegistry
|
||||
|
||||
@Override
|
||||
public void accept(Object input) {
|
||||
this.doApply(input, true);
|
||||
this.doApply(input, true, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object apply(Object input) {
|
||||
return this.doApply(input, false);
|
||||
return this.apply(input, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* !! Experimental, may change. Is not yet intended as public API !!
|
||||
* @param input input value
|
||||
* @param enricher enricher function instance
|
||||
* @return the result
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
public Object apply(Object input, Function<Message, Message> enricher) {
|
||||
return this.doApply(input, false, enricher);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get() {
|
||||
return this.get(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* !! Experimental, may change. Is not yet intended as public API !!
|
||||
* @param enricher enricher function instance
|
||||
* @return the result
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
protected Object get(Function<Message, Message> enricher) {
|
||||
Object input = FunctionTypeUtils.isMono(this.functionType)
|
||||
? Mono.empty()
|
||||
: (FunctionTypeUtils.isMono(this.functionType) ? Flux.empty() : null);
|
||||
|
||||
return this.doApply(input, false);
|
||||
return this.doApply(input, false, enricher);
|
||||
}
|
||||
|
||||
public Type getFunctionType() {
|
||||
@@ -516,7 +537,7 @@ public class BeanFactoryAwareFunctionRegistry
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
private Object doApply(Object input, boolean consumer) {
|
||||
private Object doApply(Object input, boolean consumer, Function<Message, Message> enricher) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Applying function: " + this.functionDefinition);
|
||||
}
|
||||
@@ -571,15 +592,15 @@ public class BeanFactoryAwareFunctionRegistry
|
||||
// Outputs will be converted only if we're told how (via acceptedOutputMimeTypes), otherwise output returned as is.
|
||||
if (!ObjectUtils.isEmpty(this.acceptedOutputMimeTypes)) {
|
||||
result = result instanceof Publisher
|
||||
? this.convertOutputPublisherIfNecessary((Publisher<?>) result, this.acceptedOutputMimeTypes)
|
||||
: this.convertOutputValueIfNecessary(result, this.acceptedOutputMimeTypes);
|
||||
? this.convertOutputPublisherIfNecessary((Publisher<?>) result, enricher, this.acceptedOutputMimeTypes)
|
||||
: this.convertOutputValueIfNecessary(result, enricher, this.acceptedOutputMimeTypes);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
private Object convertOutputValueIfNecessary(Object value, String... acceptedOutputMimeTypes) {
|
||||
private Object convertOutputValueIfNecessary(Object value, Function<Message, Message> enricher, String... acceptedOutputMimeTypes) {
|
||||
logger.debug("Applying type conversion on output value");
|
||||
Object convertedValue = null;
|
||||
if (FunctionTypeUtils.isMultipleArgumentsHolder(value)) {
|
||||
@@ -590,8 +611,8 @@ public class BeanFactoryAwareFunctionRegistry
|
||||
Object outputArgument = parsed.getValue(value);
|
||||
try {
|
||||
convertedInputArray[i] = outputArgument instanceof Publisher
|
||||
? this.convertOutputPublisherIfNecessary((Publisher<?>) outputArgument, acceptedOutputMimeTypes[i])
|
||||
: this.convertOutputValueIfNecessary(outputArgument, acceptedOutputMimeTypes[i]);
|
||||
? this.convertOutputPublisherIfNecessary((Publisher<?>) outputArgument, enricher, acceptedOutputMimeTypes[i])
|
||||
: this.convertOutputValueIfNecessary(outputArgument, enricher, acceptedOutputMimeTypes[i]);
|
||||
}
|
||||
catch (ArrayIndexOutOfBoundsException e) {
|
||||
throw new IllegalStateException("The number of 'acceptedOutputMimeTypes' for function '" + this.functionDefinition
|
||||
@@ -613,7 +634,7 @@ public class BeanFactoryAwareFunctionRegistry
|
||||
convertedValue = message;
|
||||
}
|
||||
else {
|
||||
convertedValue = messageConverter.toMessage(message.getPayload(), message.getHeaders());
|
||||
convertedValue = this.convertValueToMessage(message, enricher, acceptedContentType);
|
||||
}
|
||||
}
|
||||
else if (value instanceof byte[]) {
|
||||
@@ -626,12 +647,11 @@ public class BeanFactoryAwareFunctionRegistry
|
||||
}
|
||||
AtomicReference<List<Message>> messages = new AtomicReference<List<Message>>(new ArrayList<>());
|
||||
((Iterable) value).forEach(element ->
|
||||
messages.get().add((Message) convertOutputValueIfNecessary(element, acceptedContentType.toString())));
|
||||
messages.get().add((Message) convertOutputValueIfNecessary(element, enricher, acceptedContentType.toString())));
|
||||
convertedValue = messages.get();
|
||||
}
|
||||
else {
|
||||
convertedValue = messageConverter
|
||||
.toMessage(value, new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, acceptedContentType)));
|
||||
convertedValue = this.convertValueToMessage(value, enricher, acceptedContentType);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -639,14 +659,37 @@ public class BeanFactoryAwareFunctionRegistry
|
||||
|
||||
}
|
||||
|
||||
private Publisher<?> convertOutputPublisherIfNecessary(Publisher<?> publisher, String... acceptedOutputMimeTypes) {
|
||||
@SuppressWarnings("rawtypes")
|
||||
private Message convertValueToMessage(Object value, Function<Message, Message> enricher, MimeType acceptedContentType) {
|
||||
Message outputMessage = null;
|
||||
if (enricher != null) {
|
||||
if (!(value instanceof Message)) {
|
||||
value = MessageBuilder.withPayload(value).setHeader(MessageHeaders.CONTENT_TYPE, acceptedContentType).build();
|
||||
}
|
||||
value = enricher.apply((Message) value);
|
||||
outputMessage = messageConverter.toMessage(((Message) value).getPayload(), ((Message) value).getHeaders());
|
||||
}
|
||||
else {
|
||||
if (value instanceof Message) {
|
||||
outputMessage = messageConverter.toMessage(((Message) value).getPayload(), ((Message) value).getHeaders());
|
||||
}
|
||||
else {
|
||||
outputMessage = messageConverter.toMessage(value,
|
||||
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, acceptedContentType)));
|
||||
}
|
||||
}
|
||||
return outputMessage;
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
private Publisher<?> convertOutputPublisherIfNecessary(Publisher<?> publisher, Function<Message, Message> enricher, String... acceptedOutputMimeTypes) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Applying type conversion on output Publisher " + publisher);
|
||||
}
|
||||
|
||||
Publisher<?> result = publisher instanceof Mono
|
||||
? Mono.from(publisher) .map(value -> this.convertOutputValueIfNecessary(value, acceptedOutputMimeTypes))
|
||||
: Flux.from(publisher).map(value -> this.convertOutputValueIfNecessary(value, acceptedOutputMimeTypes));
|
||||
? Mono.from(publisher) .map(value -> this.convertOutputValueIfNecessary(value, enricher, acceptedOutputMimeTypes))
|
||||
: Flux.from(publisher).map(value -> this.convertOutputValueIfNecessary(value, enricher, acceptedOutputMimeTypes));
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user