From fc39f09f1aa3aa1e499b5c2f50e027a3f1885d85 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Thu, 11 Nov 2021 17:16:19 +0100 Subject: [PATCH] GH-726 Enhance MessageRoutingCallback to optionally return enriched Message Resolves #726 --- .../main/asciidoc/spring-cloud-function.adoc | 31 ++-- .../context/MessageRoutingCallback.java | 61 ++++++- .../catalog/SimpleFunctionRegistry.java | 3 + .../context/config/RoutingFunction.java | 33 ++-- .../context/MessageRoutingCallbackTests.java | 149 ++++++++++++++++++ 5 files changed, 246 insertions(+), 31 deletions(-) create mode 100644 spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/MessageRoutingCallbackTests.java diff --git a/docs/src/main/asciidoc/spring-cloud-function.adoc b/docs/src/main/asciidoc/spring-cloud-function.adoc index 36bd0c7e4..12d7784f5 100644 --- a/docs/src/main/asciidoc/spring-cloud-function.adoc +++ b/docs/src/main/asciidoc/spring-cloud-function.adoc @@ -152,20 +152,13 @@ The `MessageRoutingCallback` is a strategy to assist with determining the name o [source, java] ---- public interface MessageRoutingCallback { - - /** - * Determines the name of the function definition to route incoming {@link Message}. - * - * @param message instance of incoming {@link Message} - * @return the name of the route-to function definition - */ - String functionDefinition(Message message); + FunctionRoutingResult routingResult(Message message); + . . . } ---- -All you need to do is implement it and and register it as a bean. The framework will automatically -pick it up and use it for routing decisions. -For example +All you need to do is implement and register it as a bean to be picked up by the `RoutingFunction`. +For example: [source, java] ---- @@ -173,15 +166,25 @@ For example public MessageRoutingCallback customRouter() { return new MessageRoutingCallback() { @Override - public String functionDefinition(Message message) { - return (String) message.getHeaders().get("func_name"); + FunctionRoutingResult routingResult(Message message) { + return new FunctionRoutingResult((String) message.getHeaders().get("func_name")); } }; } ---- In the preceding example you can see a very simple implementation of `MessageRoutingCallback` which determines the function definition from -`func_name` header of the incoming Message. +`func_name` Message header of the incoming Message and returns the instance of `FunctionRoutingResult` containing the definition of function to invoke. + +Additionally, the `FunctionRoutingResult` provides another constructor allowing you to provide an instance of `Message` as second argument to be used down stream. +This is primarily for runtime optimizations. To better understand this case let's look at the following scenario. +You need to route based on the payoload type. However, an input Message typically comes in as let's say JSON payload (as `byte[]`) . In order +to determine the route-to function definition you need to first process such JSON and potentially create an instance of the target type. +Once that determination is done you can pass it to `RoutingFunction` which still has a reference to the original Message with un-processed payload +This means that somewhere downstream, type conversion/transformation would need to be repeated. + +Allowing you to create a new `Message` with converted payload as part of the `FunctionRoutingResult` will instruct `RoutingFunction` to use such `Message` +downstream. So effectively you letting the framework to benefit from the work you already did. *Message Headers* diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/MessageRoutingCallback.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/MessageRoutingCallback.java index 90eda9b8d..1d7b0ada1 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/MessageRoutingCallback.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/MessageRoutingCallback.java @@ -22,8 +22,7 @@ import org.springframework.messaging.Message; /** * Java-based strategy to assist with determining the name of the route-to function definition. * Once implementation is registered as a bean in application context - * it will be picked up by a {@link RoutingFunction} and used to determine the name of the - * route-to function definition. + * it will be picked up by the {@link RoutingFunction}. * * While {@link RoutingFunction} provides several mechanisms to determine the route-to function definition * this callback takes precedence over all of them. @@ -34,10 +33,58 @@ import org.springframework.messaging.Message; public interface MessageRoutingCallback { /** - * Determines the name of the function definition to route incoming {@link Message}. - * - * @param message instance of incoming {@link Message} - * @return the name of the route-to function definition + * @deprecated in 3.1 in favor of {@link #routingResult(Message)} */ - String functionDefinition(Message message); + @Deprecated + default String functionDefinition(Message message) { + return null; + } + + /** + * Computes and returns the instance of {@link FunctionRoutingResult} which encapsulates, + * at the very minimum, function definition and optionally Message to be used downstream. + *

+ * Providing such message is primarily an optimization feature. It could be useful for cases + * where routing procedure is complex and results in, let's say, conversion of the payload to + * the target type, which would effectively be thrown away if the ability to modify the target + * message for downstream use didn't exist, resulting in repeated transformation, type conversion etc. + * + * @param message input message + * @return instance of {@link FunctionRoutingResult} containing the result of the routing computation + */ + default FunctionRoutingResult routingResult(Message message) { + return new FunctionRoutingResult(functionDefinition(message)); + } + + /** + * Domain object that represents the result of the {@link MessageRoutingCallback#routingResult(Message)} + * computation. It consists of function definition and optional Message to be used downstream + * (see {@link MessageRoutingCallback#routingResult(Message)} for more details. + * + * @author Oleg Zhurakousky + * + */ + final class FunctionRoutingResult { + + private final String functionDefinition; + + private final Message message; + + FunctionRoutingResult(String functionDefinition, Message message) { + this.functionDefinition = functionDefinition; + this.message = message; + } + + public FunctionRoutingResult(String functionDefinition) { + this(functionDefinition, null); + } + + public String getFunctionDefinition() { + return functionDefinition; + } + + public Message getMessage() { + return message; + } + } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index 65d8050db..3df3a1b6f 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -1091,6 +1091,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect } private boolean isExtractPayload(Message message, Type type) { + if (this.isRoutingFunction()) { + return false; + } if (FunctionTypeUtils.isCollectionOfMessage(type)) { return true; } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java index e223a7f2d..a77c7f3ee 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java @@ -27,6 +27,7 @@ import reactor.core.publisher.Mono; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.MessageRoutingCallback; +import org.springframework.cloud.function.context.MessageRoutingCallback.FunctionRoutingResult; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.context.expression.MapAccessor; import org.springframework.expression.BeanResolver; @@ -104,7 +105,15 @@ public class RoutingFunction implements Function { if (input instanceof Message) { Message message = (Message) input; if (this.routingCallback != null) { - function = this.functionFromCallback(message); + FunctionRoutingResult routingResult = this.routingCallback.routingResult(message); + if (routingResult != null) { + if (StringUtils.hasText(routingResult.getFunctionDefinition())) { + function = this.functionFromDefinition(routingResult.getFunctionDefinition()); + } + if (routingResult.getMessage() != null) { + message = routingResult.getMessage(); + } + } } if (function == null) { if (StringUtils.hasText((String) message.getHeaders().get("spring.cloud.function.definition"))) { @@ -172,15 +181,19 @@ public class RoutingFunction implements Function { + "spring.cloud.function.routing-expression' as application properties."); } - private FunctionInvocationWrapper functionFromCallback(Object input) { - if (input instanceof Message) { - String functionDefinition = this.routingCallback.functionDefinition((Message) input); - if (StringUtils.hasText(functionDefinition)) { - return this.functionFromDefinition(functionDefinition); - } - } - return null; - } +// private FunctionInvocationWrapper functionFromCallback(Object input) { +// if (input instanceof Message) { +// Object routingResult = this.routingCallback.functionDefinition((Message) input); +// if (routingResult != null && routingResult instanceof String) { +// +// } +// if (StringUtils.hasText(functionDefinition)) { +// return this.functionFromDefinition(functionDefinition); +// } +// } +// logger.info("Unable to determine route-to function from the provided MessageRoutingCallback"); +// return null; +// } private FunctionInvocationWrapper functionFromDefinition(String definition) { FunctionInvocationWrapper function = functionCatalog.lookup(definition); diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/MessageRoutingCallbackTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/MessageRoutingCallbackTests.java new file mode 100644 index 000000000..42fc259f3 --- /dev/null +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/MessageRoutingCallbackTests.java @@ -0,0 +1,149 @@ +/* + * Copyright 2021-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.function.Function; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; +import org.springframework.cloud.function.context.config.RoutingFunction; +import org.springframework.cloud.function.json.JsonMapper; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import static org.assertj.core.api.Assertions.assertThat; + +public class MessageRoutingCallbackTests { + + private ApplicationContext context; + + @BeforeEach + public void before() { + System.clearProperty("spring.cloud.function.definition"); + } + + @SuppressWarnings("unchecked") + @Test + public void testRoutingCallbackWithMessageModification() { + FunctionCatalog catalog = this.configureCatalog(SamppleConfiguration.class); + SamppleConfiguration conf = context.getBean(SamppleConfiguration.class); + FunctionInvocationWrapper function = (FunctionInvocationWrapper) catalog.lookup(RoutingFunction.FUNCTION_NAME, "application/json"); + String foo = "{\"foo\":\"blah\"}"; + Message fooResult = (Message) function.apply(MessageBuilder.withPayload(foo.getBytes()).build()); + String bar = "{\"bar\":\"blah\"}"; + Message barResult = (Message) function.apply(MessageBuilder.withPayload(bar.getBytes()).build()); + assertThat(fooResult.getPayload()).isEqualTo("\"foo\"".getBytes()); + assertThat(barResult.getPayload()).isEqualTo("\"bar\"".getBytes()); + + assertThat(fooResult.getHeaders().get("originalId")).isEqualTo(conf.createdMessageIds.get("foo")); + assertThat(barResult.getHeaders().get("originalId")).isEqualTo(conf.createdMessageIds.get("bar")); + } + + private FunctionCatalog configureCatalog(Class... configClass) { + this.context = new SpringApplicationBuilder(configClass) + .run("--logging.level.org.springframework.cloud.function=DEBUG", + "--spring.main.lazy-initialization=true"); + FunctionCatalog catalog = context.getBean(FunctionCatalog.class); + return catalog; + } + + @EnableAutoConfiguration + private static class SamppleConfiguration { + + Map createdMessageIds = new HashMap<>(); + + @Bean + public MessageRoutingCallback messageRoutingCallback(JsonMapper jsonMapper) { + return new MessageRoutingCallback() { + + @Override + public FunctionRoutingResult routingResult(Message message) { + String payload = new String((byte[]) message.getPayload()); + + MessageBuilder builder; + String functionDefinition; + if (payload.contains("foo")) { + builder = MessageBuilder.withPayload(jsonMapper.fromJson(payload, Foo.class)); + functionDefinition = "foo"; + } + else { + builder = MessageBuilder.withPayload(jsonMapper.fromJson(payload, Bar.class)); + functionDefinition = "bar"; + } + Message m = builder.copyHeaders(message.getHeaders()).build(); + createdMessageIds.put(functionDefinition, m.getHeaders().getId()); + FunctionRoutingResult functionRoutingResult = new FunctionRoutingResult(functionDefinition, m); + return functionRoutingResult; + } + }; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Bean + public Function, Message> foo() { + return foo -> { + Message m = MessageBuilder.withPayload("foo").setHeader("originalId", foo.getHeaders().getId()).build(); + createdMessageIds.put("foo", foo.getHeaders().getId()); + return m; + }; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Bean + public Function, Message> bar() { + return bar -> { + Message m = MessageBuilder.withPayload("bar").setHeader("originalId", bar.getHeaders().getId()).build(); + createdMessageIds.put("bar", bar.getHeaders().getId()); + return m; + }; + } + } + + + public static class Foo { + private String foo; + + public String getFoo() { + return foo; + } + + public void setFoo(String foo) { + this.foo = foo; + } + } + + public static class Bar { + private String bar; + + public String getBar() { + return bar; + } + + public void setBar(String bar) { + this.bar = bar; + } + } +}