GH-726 Enhance MessageRoutingCallback to optionally return enriched Message

Resolves #726
This commit is contained in:
Oleg Zhurakousky
2021-11-11 17:16:19 +01:00
parent d69b2d2076
commit ad901f23eb
5 changed files with 246 additions and 32 deletions

View File

@@ -152,20 +152,13 @@ The `MessageRoutingCallback` is a strategy to assist with determining the name o
[source, java]
----
public interface MessageRoutingCallback {
/**
* Determines the name of the function definition to route incoming {@link Message}.
*
* @param message instance of incoming {@link Message}
* @return the name of the route-to function definition
*/
String functionDefinition(Message<?> message);
FunctionRoutingResult routingResult(Message<?> message);
. . .
}
----
All you need to do is implement it and and register it as a bean. The framework will automatically
pick it up and use it for routing decisions.
For example
All you need to do is implement and register it as a bean to be picked up by the `RoutingFunction`.
For example:
[source, java]
----
@@ -173,15 +166,25 @@ For example
public MessageRoutingCallback customRouter() {
return new MessageRoutingCallback() {
@Override
public String functionDefinition(Message<?> message) {
return (String) message.getHeaders().get("func_name");
FunctionRoutingResult routingResult(Message<?> message) {
return new FunctionRoutingResult((String) message.getHeaders().get("func_name"));
}
};
}
----
In the preceding example you can see a very simple implementation of `MessageRoutingCallback` which determines the function definition from
`func_name` header of the incoming Message.
`func_name` Message header of the incoming Message and returns the instance of `FunctionRoutingResult` containing the definition of function to invoke.
Additionally, the `FunctionRoutingResult` provides another constructor allowing you to provide an instance of `Message` as second argument to be used down stream.
This is primarily for runtime optimizations. To better understand this case let's look at the following scenario.
You need to route based on the payoload type. However, an input Message typically comes in as let's say JSON payload (as `byte[]`) . In order
to determine the route-to function definition you need to first process such JSON and potentially create an instance of the target type.
Once that determination is done you can pass it to `RoutingFunction` which still has a reference to the original Message with un-processed payload
This means that somewhere downstream, type conversion/transformation would need to be repeated.
Allowing you to create a new `Message` with converted payload as part of the `FunctionRoutingResult` will instruct `RoutingFunction` to use such `Message`
downstream. So effectively you letting the framework to benefit from the work you already did.
*Message Headers*

View File

@@ -22,8 +22,7 @@ import org.springframework.messaging.Message;
/**
* Java-based strategy to assist with determining the name of the route-to function definition.
* Once implementation is registered as a bean in application context
* it will be picked up by a {@link RoutingFunction} and used to determine the name of the
* route-to function definition.
* it will be picked up by the {@link RoutingFunction}.
*
* While {@link RoutingFunction} provides several mechanisms to determine the route-to function definition
* this callback takes precedence over all of them.
@@ -34,10 +33,58 @@ import org.springframework.messaging.Message;
public interface MessageRoutingCallback {
/**
* Determines the name of the function definition to route incoming {@link Message}.
*
* @param message instance of incoming {@link Message}
* @return the name of the route-to function definition
* @deprecated in 3.1 in favor of {@link #routingResult(Message)}
*/
String functionDefinition(Message<?> message);
@Deprecated
default String functionDefinition(Message<?> message) {
return null;
}
/**
* Computes and returns the instance of {@link FunctionRoutingResult} which encapsulates,
* at the very minimum, function definition and optionally Message to be used downstream.
* <br><br>
* Providing such message is primarily an optimization feature. It could be useful for cases
* where routing procedure is complex and results in, let's say, conversion of the payload to
* the target type, which would effectively be thrown away if the ability to modify the target
* message for downstream use didn't exist, resulting in repeated transformation, type conversion etc.
*
* @param message input message
* @return instance of {@link FunctionRoutingResult} containing the result of the routing computation
*/
default FunctionRoutingResult routingResult(Message<?> message) {
return new FunctionRoutingResult(functionDefinition(message));
}
/**
* Domain object that represents the result of the {@link MessageRoutingCallback#routingResult(Message)}
* computation. It consists of function definition and optional Message to be used downstream
* (see {@link MessageRoutingCallback#routingResult(Message)} for more details.
*
* @author Oleg Zhurakousky
*
*/
final class FunctionRoutingResult {
private final String functionDefinition;
private final Message<?> message;
FunctionRoutingResult(String functionDefinition, Message<?> message) {
this.functionDefinition = functionDefinition;
this.message = message;
}
public FunctionRoutingResult(String functionDefinition) {
this(functionDefinition, null);
}
public String getFunctionDefinition() {
return functionDefinition;
}
public Message<?> getMessage() {
return message;
}
}
}

View File

@@ -1090,7 +1090,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
}
private boolean isExtractPayload(Message<?> message, Type type) {
if (this.isRoutingFunction()) {
return false;
}
if (FunctionTypeUtils.isCollectionOfMessage(type)) {
return true;
}

View File

@@ -27,6 +27,7 @@ import reactor.core.publisher.Mono;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.MessageRoutingCallback;
import org.springframework.cloud.function.context.MessageRoutingCallback.FunctionRoutingResult;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.context.expression.MapAccessor;
import org.springframework.expression.BeanResolver;
@@ -104,7 +105,15 @@ public class RoutingFunction implements Function<Object, Object> {
if (input instanceof Message) {
Message<?> message = (Message<?>) input;
if (this.routingCallback != null) {
function = this.functionFromCallback(message);
FunctionRoutingResult routingResult = this.routingCallback.routingResult(message);
if (routingResult != null) {
if (StringUtils.hasText(routingResult.getFunctionDefinition())) {
function = this.functionFromDefinition(routingResult.getFunctionDefinition());
}
if (routingResult.getMessage() != null) {
message = routingResult.getMessage();
}
}
}
if (function == null) {
if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.definition"))) {
@@ -172,15 +181,19 @@ public class RoutingFunction implements Function<Object, Object> {
+ "spring.cloud.function.routing-expression' as application properties.");
}
private FunctionInvocationWrapper functionFromCallback(Object input) {
if (input instanceof Message) {
String functionDefinition = this.routingCallback.functionDefinition((Message<?>) input);
if (StringUtils.hasText(functionDefinition)) {
return this.functionFromDefinition(functionDefinition);
}
}
return null;
}
// private FunctionInvocationWrapper functionFromCallback(Object input) {
// if (input instanceof Message) {
// Object routingResult = this.routingCallback.functionDefinition((Message<?>) input);
// if (routingResult != null && routingResult instanceof String) {
//
// }
// if (StringUtils.hasText(functionDefinition)) {
// return this.functionFromDefinition(functionDefinition);
// }
// }
// logger.info("Unable to determine route-to function from the provided MessageRoutingCallback");
// return null;
// }
private FunctionInvocationWrapper functionFromDefinition(String definition) {
FunctionInvocationWrapper function = functionCatalog.lookup(definition);

View File

@@ -0,0 +1,149 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
public class MessageRoutingCallbackTests {
private ApplicationContext context;
@BeforeEach
public void before() {
System.clearProperty("spring.cloud.function.definition");
}
@SuppressWarnings("unchecked")
@Test
public void testRoutingCallbackWithMessageModification() {
FunctionCatalog catalog = this.configureCatalog(SamppleConfiguration.class);
SamppleConfiguration conf = context.getBean(SamppleConfiguration.class);
FunctionInvocationWrapper function = (FunctionInvocationWrapper) catalog.lookup(RoutingFunction.FUNCTION_NAME, "application/json");
String foo = "{\"foo\":\"blah\"}";
Message<byte[]> fooResult = (Message<byte[]>) function.apply(MessageBuilder.withPayload(foo.getBytes()).build());
String bar = "{\"bar\":\"blah\"}";
Message<byte[]> barResult = (Message<byte[]>) function.apply(MessageBuilder.withPayload(bar.getBytes()).build());
assertThat(fooResult.getPayload()).isEqualTo("\"foo\"".getBytes());
assertThat(barResult.getPayload()).isEqualTo("\"bar\"".getBytes());
assertThat(fooResult.getHeaders().get("originalId")).isEqualTo(conf.createdMessageIds.get("foo"));
assertThat(barResult.getHeaders().get("originalId")).isEqualTo(conf.createdMessageIds.get("bar"));
}
private FunctionCatalog configureCatalog(Class<?>... configClass) {
this.context = new SpringApplicationBuilder(configClass)
.run("--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.main.lazy-initialization=true");
FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
return catalog;
}
@EnableAutoConfiguration
private static class SamppleConfiguration {
Map<String, UUID> createdMessageIds = new HashMap<>();
@Bean
public MessageRoutingCallback messageRoutingCallback(JsonMapper jsonMapper) {
return new MessageRoutingCallback() {
@Override
public FunctionRoutingResult routingResult(Message<?> message) {
String payload = new String((byte[]) message.getPayload());
MessageBuilder<?> builder;
String functionDefinition;
if (payload.contains("foo")) {
builder = MessageBuilder.withPayload(jsonMapper.fromJson(payload, Foo.class));
functionDefinition = "foo";
}
else {
builder = MessageBuilder.withPayload(jsonMapper.fromJson(payload, Bar.class));
functionDefinition = "bar";
}
Message<?> m = builder.copyHeaders(message.getHeaders()).build();
createdMessageIds.put(functionDefinition, m.getHeaders().getId());
FunctionRoutingResult functionRoutingResult = new FunctionRoutingResult(functionDefinition, m);
return functionRoutingResult;
}
};
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public Function<Message<Foo>, Message<String>> foo() {
return foo -> {
Message m = MessageBuilder.withPayload("foo").setHeader("originalId", foo.getHeaders().getId()).build();
createdMessageIds.put("foo", foo.getHeaders().getId());
return m;
};
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public Function<Message<Bar>, Message<String>> bar() {
return bar -> {
Message m = MessageBuilder.withPayload("bar").setHeader("originalId", bar.getHeaders().getId()).build();
createdMessageIds.put("bar", bar.getHeaders().getId());
return m;
};
}
}
public static class Foo {
private String foo;
public String getFoo() {
return foo;
}
public void setFoo(String foo) {
this.foo = foo;
}
}
public static class Bar {
private String bar;
public String getBar() {
return bar;
}
public void setBar(String bar) {
this.bar = bar;
}
}
}