From 4cf18e71c89543edfe74f42fe7dfce31c3b238e2 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 14 Mar 2014 13:37:31 +0200 Subject: [PATCH] Add `route()`, `recipientListRoute()` EIP-methods --- .../integration/dsl/AbstractRouterSpec.java | 56 +++ .../dsl/DslRecipientListRouter.java | 88 +++++ .../dsl/IntegrationFlowBuilder.java | 140 ++++--- .../dsl/RecipientListRouterSpec.java | 44 +++ .../integration/dsl/RouterSpec.java | 50 +++ .../dsl/support/GenericRouter.java | 26 ++ .../dsl/test/IntegrationFlowTests.java | 344 +++++++++++++++++- 7 files changed, 693 insertions(+), 55 deletions(-) create mode 100644 spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/AbstractRouterSpec.java create mode 100644 spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/DslRecipientListRouter.java create mode 100644 spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/RecipientListRouterSpec.java create mode 100644 spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/RouterSpec.java create mode 100644 spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/GenericRouter.java diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/AbstractRouterSpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/AbstractRouterSpec.java new file mode 100644 index 0000000..aade96c --- /dev/null +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/AbstractRouterSpec.java @@ -0,0 +1,56 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl; + +import org.springframework.integration.dsl.core.IntegrationComponentSpec; +import org.springframework.integration.router.AbstractMessageRouter; +import org.springframework.messaging.MessageChannel; + +/** + * @author Artem Bilan + */ +public class AbstractRouterSpec, R extends AbstractMessageRouter> extends IntegrationComponentSpec { + + AbstractRouterSpec(R router) { + this.target = router; + } + + public S defaultOutputChannel(MessageChannel defaultOutputChannel) { + this.target.setDefaultOutputChannel(defaultOutputChannel); + return _this(); + } + + public S defaultOutputChannel(String defaultOutputChannel) { + this.target.setDefaultOutputChannelName(defaultOutputChannel); + return _this(); + } + + public S ignoreSendFailures(boolean ignoreSendFailures) { + this.target.setIgnoreSendFailures(ignoreSendFailures); + return _this(); + } + + public S applySequence(boolean applySequence) { + this.target.setApplySequence(applySequence); + return _this(); + } + + @Override + protected R doGet() { + throw new UnsupportedOperationException(); + } +} diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/DslRecipientListRouter.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/DslRecipientListRouter.java new file mode 100644 index 0000000..aabe5c2 --- /dev/null +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/DslRecipientListRouter.java @@ -0,0 +1,88 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.springframework.beans.BeansException; +import org.springframework.integration.core.MessageSelector; +import org.springframework.integration.filter.ExpressionEvaluatingSelector; +import org.springframework.integration.router.RecipientListRouter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.core.DestinationResolutionException; +import org.springframework.util.StringUtils; + +/** + * @author Artem Bilan + */ +class DslRecipientListRouter extends RecipientListRouter { + + private final Map expressionRecipientMap = new HashMap(); + + private final Map selectorRecipientMap = new HashMap(); + + void addRecipient(String channelName, String expression) { + this.expressionRecipientMap.put(channelName, expression); + } + + void addRecipient(String channelName, MessageSelector selector) { + this.selectorRecipientMap.put(channelName, selector); + } + + Map getRecipients() { + Map recipients = new HashMap(this.expressionRecipientMap.size() + this.selectorRecipientMap.size()); + recipients.putAll(this.expressionRecipientMap); + recipients.putAll(this.selectorRecipientMap); + return recipients; + } + + @Override + public void onInit() { + for (Map.Entry recipient : this.expressionRecipientMap.entrySet()) { + ExpressionEvaluatingSelector selector = null; + String expression = recipient.getValue(); + if (StringUtils.hasText(expression)) { + selector = new ExpressionEvaluatingSelector(expression); + selector.setBeanFactory(this.getBeanFactory()); + } + this.selectorRecipientMap.put(recipient.getKey(), selector); + } + + List recipients = new ArrayList(this.selectorRecipientMap.size()); + + for (Map.Entry entry : selectorRecipientMap.entrySet()) { + recipients.add(new Recipient(this.resolveChannelName(entry.getKey()), entry.getValue())); + } + + this.setRecipients(recipients); + super.onInit(); + } + + private MessageChannel resolveChannelName(String channelName) { + try { + return this.getBeanFactory().getBean(channelName, MessageChannel.class); + } + catch (BeansException e) { + throw new DestinationResolutionException("Failed to look up MessageChannel with name '" + + channelName + "' in the BeanFactory."); + } + } + +} diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java index d50515b..05aab83 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java @@ -16,16 +16,9 @@ package org.springframework.integration.dsl; -import java.lang.annotation.Annotation; -import java.lang.reflect.Method; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; import org.springframework.beans.factory.BeanCreationException; -import org.springframework.context.Lifecycle; -import org.springframework.core.annotation.AnnotationUtils; -import org.springframework.expression.MethodFilter; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler; import org.springframework.integration.aggregator.AggregatingMessageHandler; @@ -43,8 +36,10 @@ import org.springframework.integration.dsl.support.BeanNameMessageProcessor; import org.springframework.integration.dsl.support.ComponentConfigurer; import org.springframework.integration.dsl.support.EndpointConfigurer; import org.springframework.integration.dsl.support.FixedSubscriberChannelPrototype; +import org.springframework.integration.dsl.support.GenericRouter; import org.springframework.integration.dsl.support.GenericSplitter; import org.springframework.integration.dsl.support.MessageChannelReference; +import org.springframework.integration.expression.ControlBusMethodFilter; import org.springframework.integration.filter.ExpressionEvaluatingSelector; import org.springframework.integration.filter.MessageFilter; import org.springframework.integration.filter.MethodInvokingSelector; @@ -53,6 +48,11 @@ import org.springframework.integration.handler.BridgeHandler; import org.springframework.integration.handler.DelayHandler; import org.springframework.integration.handler.ExpressionCommandMessageProcessor; import org.springframework.integration.handler.ServiceActivatingHandler; +import org.springframework.integration.router.AbstractMappingMessageRouter; +import org.springframework.integration.router.AbstractMessageRouter; +import org.springframework.integration.router.ExpressionEvaluatingRouter; +import org.springframework.integration.router.MethodInvokingRouter; +import org.springframework.integration.router.RecipientListRouter; import org.springframework.integration.splitter.AbstractMessageSplitter; import org.springframework.integration.splitter.DefaultMessageSplitter; import org.springframework.integration.splitter.ExpressionEvaluatingSplitter; @@ -65,13 +65,9 @@ import org.springframework.integration.transformer.HeaderFilter; import org.springframework.integration.transformer.MessageTransformingHandler; import org.springframework.integration.transformer.MethodInvokingTransformer; import org.springframework.integration.transformer.Transformer; -import org.springframework.jmx.export.annotation.ManagedAttribute; -import org.springframework.jmx.export.annotation.ManagedOperation; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.util.Assert; -import org.springframework.util.CustomizableThreadCreator; -import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; /** @@ -297,7 +293,7 @@ public final class IntegrationFlowBuilder { * Provides the {@link HeaderFilter} to the current {@link IntegrationFlow}. * * @param headersToRemove the comma separated headers (or patterns) to remove from {@link org.springframework.messaging.MessageHeaders}. - * @param patternMatch the {@code boolean} flag to indicate if {@code headersToRemove} should be interpreted as patterns or direct header names. + * @param patternMatch the {@code boolean} flag to indicate if {@code headersToRemove} should be interpreted as patterns or direct header names. * @return the {@link IntegrationFlowBuilder}. */ @@ -354,7 +350,7 @@ public final class IntegrationFlowBuilder { } public IntegrationFlowBuilder aggregate(ComponentConfigurer aggregatorConfigurer, - EndpointConfigurer> endpointConfigurer) { + EndpointConfigurer> endpointConfigurer) { Assert.notNull(aggregatorConfigurer); AggregatorSpec spec = new AggregatorSpec(); aggregatorConfigurer.configure(spec); @@ -371,6 +367,88 @@ public final class IntegrationFlowBuilder { } + public IntegrationFlowBuilder route(String beanName, String method) { + return this.route(beanName, method, null); + } + + public IntegrationFlowBuilder route(String beanName, String method, + ComponentConfigurer> routerConfigurer) { + return this.route(beanName, method, routerConfigurer, null); + } + + public IntegrationFlowBuilder route(String beanName, String method, + ComponentConfigurer> routerConfigurer, + EndpointConfigurer> endpointConfigurer) { + return this.route(new MethodInvokingRouter(new BeanNameMessageProcessor(beanName, method)), routerConfigurer, endpointConfigurer); + } + + + public IntegrationFlowBuilder route(String expression) { + return this.route(expression, (ComponentConfigurer>) null); + } + + public IntegrationFlowBuilder route(String expression, ComponentConfigurer> routerConfigurer) { + return this.route(expression, routerConfigurer, null); + } + + public IntegrationFlowBuilder route(String expression, + ComponentConfigurer> routerConfigurer, + EndpointConfigurer> endpointConfigurer) { + return this.route(new ExpressionEvaluatingRouter(PARSER.parseExpression(expression)), routerConfigurer, endpointConfigurer); + } + + public IntegrationFlowBuilder route(GenericRouter router) { + return this.route(router, (ComponentConfigurer>) null); + } + + public IntegrationFlowBuilder route(GenericRouter router, ComponentConfigurer> routerConfigurer) { + return this.route(router, routerConfigurer, null); + } + + public IntegrationFlowBuilder route(GenericRouter router, + ComponentConfigurer> routerConfigurer, + EndpointConfigurer> endpointConfigurer) { + return this.route(new MethodInvokingRouter(router), routerConfigurer, endpointConfigurer); + } + + public IntegrationFlowBuilder route(R router, ComponentConfigurer> routerConfigurer) { + return this.route(router, routerConfigurer, null); + } + + public IntegrationFlowBuilder route(R router, + ComponentConfigurer> routerConfigurer, + EndpointConfigurer> endpointConfigurer) { + if (routerConfigurer != null) { + RouterSpec routerSpec = new RouterSpec(router); + routerConfigurer.configure(routerSpec); + } + return this.route(router, endpointConfigurer); + } + + public IntegrationFlowBuilder recipientListRoute(ComponentConfigurer routerConfigurer) { + return this.recipientListRoute(routerConfigurer, null); + } + + public IntegrationFlowBuilder recipientListRoute(ComponentConfigurer routerConfigurer, + EndpointConfigurer> endpointConfigurer) { + Assert.notNull(routerConfigurer); + RecipientListRouterSpec spec = new RecipientListRouterSpec(); + routerConfigurer.configure(spec); + DslRecipientListRouter recipientListRouter = (DslRecipientListRouter) spec.get(); + Assert.notEmpty(recipientListRouter.getRecipients(), "recipient list must not be empty"); + return this.route(recipientListRouter, endpointConfigurer); + } + + public IntegrationFlowBuilder route(AbstractMessageRouter router) { + return this.route(router, null); + } + + public IntegrationFlowBuilder route(R router, + EndpointConfigurer> endpointConfigurer) { + return this.handle(router, endpointConfigurer); + } + + private > IntegrationFlowBuilder register(S endpointSpec, EndpointConfigurer endpointConfigurer) { if (endpointConfigurer != null) { endpointConfigurer.configure(endpointSpec); @@ -454,40 +532,4 @@ public final class IntegrationFlowBuilder { return this.flow; } - - private static class ControlBusMethodFilter implements MethodFilter { - - public List filter(List methods) { - List supportedMethods = new ArrayList(); - for (Method method : methods) { - if (this.accept(method)) { - supportedMethods.add(method); - } - } - return supportedMethods; - } - - private boolean accept(Method method) { - Class declaringClass = method.getDeclaringClass(); - if (Lifecycle.class.isAssignableFrom(declaringClass) - && ReflectionUtils.findMethod(Lifecycle.class, method.getName(), method.getParameterTypes()) != null) { - return true; - } - if (CustomizableThreadCreator.class.isAssignableFrom(declaringClass) - && (method.getName().startsWith("get") - || method.getName().startsWith("set") - || method.getName().startsWith("shutdown"))) { - return true; - } - if (this.hasAnnotation(method, ManagedAttribute.class) || this.hasAnnotation(method, ManagedOperation.class)) { - return true; - } - return false; - } - - private boolean hasAnnotation(Method method, Class annotationType) { - return AnnotationUtils.findAnnotation(method, annotationType) != null; - } - } - } diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/RecipientListRouterSpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/RecipientListRouterSpec.java new file mode 100644 index 0000000..ce0f512 --- /dev/null +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/RecipientListRouterSpec.java @@ -0,0 +1,44 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl; + +import org.springframework.integration.core.MessageSelector; +import org.springframework.integration.router.RecipientListRouter; +import org.springframework.util.Assert; + +/** + * @author Artem Bilan + */ +public class RecipientListRouterSpec extends AbstractRouterSpec { + + RecipientListRouterSpec() { + super(new DslRecipientListRouter()); + } + + public RecipientListRouterSpec recipient(String channelName, String expression) { + Assert.hasText(channelName); + ((DslRecipientListRouter) this.target).addRecipient(channelName, expression); + return _this(); + } + + public RecipientListRouterSpec recipient(String channelName, MessageSelector selector) { + Assert.hasText(channelName); + ((DslRecipientListRouter) this.target).addRecipient(channelName, selector); + return _this(); + } + +} diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/RouterSpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/RouterSpec.java new file mode 100644 index 0000000..bdd38e9 --- /dev/null +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/RouterSpec.java @@ -0,0 +1,50 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl; + +import org.springframework.integration.router.AbstractMappingMessageRouter; + +/** + * @author Artem Bilan + */ +public final class RouterSpec extends AbstractRouterSpec, R> { + + RouterSpec(R router) { + super(router); + } + + public RouterSpec resolutionRequired(boolean resolutionRequired) { + this.target.setResolutionRequired(resolutionRequired); + return _this(); + } + + public RouterSpec prefix(String prefix) { + this.target.setPrefix(prefix); + return _this(); + } + + public RouterSpec suffix(String suffix) { + this.target.setSuffix(suffix); + return _this(); + } + + public RouterSpec channelMapping(String key, String channelName) { + this.target.setChannelMapping(key, channelName); + return _this(); + } + +} diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/GenericRouter.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/GenericRouter.java new file mode 100644 index 0000000..a9be8ee --- /dev/null +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/GenericRouter.java @@ -0,0 +1,26 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl.support; + +/** + * @author Artem Bilan + */ +public interface GenericRouter { + + T route(S source); + +} diff --git a/spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/IntegrationFlowTests.java b/spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/IntegrationFlowTests.java index 1cce053..5480f9d 100644 --- a/spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/IntegrationFlowTests.java +++ b/spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/IntegrationFlowTests.java @@ -56,7 +56,10 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.MessageDispatchingException; +import org.springframework.integration.annotation.Header; +import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.MessageEndpoint; +import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.FixedSubscriberChannel; @@ -79,6 +82,7 @@ import org.springframework.integration.file.DefaultFileNameGenerator; import org.springframework.integration.file.FileHeaders; import org.springframework.integration.file.FileWritingMessageHandler; import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice; +import org.springframework.integration.router.MethodInvokingRouter; import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.integration.splitter.DefaultMessageSplitter; import org.springframework.integration.store.SimpleMessageStore; @@ -91,7 +95,9 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageDeliveryException; import org.springframework.messaging.MessageHandlingException; import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.core.DestinationResolutionException; import org.springframework.messaging.support.GenericMessage; import org.springframework.stereotype.Component; import org.springframework.test.context.ContextConfiguration; @@ -110,8 +116,7 @@ public class IntegrationFlowTests { private ListableBeanFactory beanFactory; @Autowired - @Qualifier("controlBus") - private MessageChannel controlBus; + private ControlBusGateway controlBus; @Autowired @Qualifier("flow1QueueChannel") @@ -184,6 +189,49 @@ public class IntegrationFlowTests { @Qualifier("splitAggregateInput") private MessageChannel splitAggregateInput; + @Autowired + @Qualifier("routerInput") + private MessageChannel routerInput; + + @Autowired + @Qualifier("oddChannel") + private PollableChannel oddChannel; + + @Autowired + @Qualifier("evenChannel") + private PollableChannel evenChannel; + + @Autowired + @Qualifier("routerMethodInput") + private MessageChannel routerMethodInput; + + @Autowired + @Qualifier("foo-channel") + private PollableChannel fooChannel; + + @Autowired + @Qualifier("bar-channel") + private PollableChannel barChannel; + + @Autowired + @Qualifier("routerMethod2Input") + private MessageChannel routerMethod2Input; + + @Autowired + @Qualifier("routerMethod3Input") + private MessageChannel routerMethod3Input; + + @Autowired + @Qualifier("routerMultiInput") + private MessageChannel routerMultiInput; + + @Autowired + @Qualifier("recipientListInput") + private MessageChannel recipientListInput; + + @Autowired + @Qualifier("defaultOutputChannel") + private QueueChannel defaultOutputChannel; @Test public void testPollingFlow() { @@ -210,7 +258,7 @@ public class IntegrationFlowTests { assertThat(e.getCause(), Matchers.instanceOf(MessageDispatchingException.class)); assertThat(e.getMessage(), Matchers.containsString("Dispatcher has no subscribers")); } - this.controlBus.send(new GenericMessage("@payloadSerializingTransformer.start()")); + this.controlBus.send("@payloadSerializingTransformer.start()"); final AtomicBoolean used = new AtomicBoolean(); @@ -256,7 +304,7 @@ public class IntegrationFlowTests { assertThat(e.getCause(), Matchers.instanceOf(MessageDispatchingException.class)); assertThat(e.getMessage(), Matchers.containsString("Dispatcher has no subscribers")); } - this.controlBus.send(new GenericMessage("@bridge.start()")); + this.controlBus.send("@bridge.start()"); this.bridgeFlow2Input.send(message); reply = this.bridgeFlow2Output.receive(5000); assertNotNull(reply); @@ -399,7 +447,7 @@ public class IntegrationFlowTests { assertThat(e.getMessage(), Matchers.containsString("Dispatcher has no subscribers")); } - this.controlBus.send(new GenericMessage("@xpathHeaderEnricher.start()")); + this.controlBus.send("@xpathHeaderEnricher.start()"); this.xpathHeaderEnricherInput.send(message); Message result = replyChannel.receive(2000); @@ -409,8 +457,188 @@ public class IntegrationFlowTests { assertEquals("2", headers.get("two")); } + @Test + public void testRouter() { + + int[] payloads = new int[]{1, 2, 3, 4, 5, 6}; + + for (int payload : payloads) { + this.routerInput.send(new GenericMessage(payload)); + } + + for (int i = 0; i < 3; i++) { + Message receive = this.oddChannel.receive(2000); + assertNotNull(receive); + assertEquals(new Integer(i * 2 + 1), receive.getPayload()); + + receive = this.evenChannel.receive(2000); + assertNotNull(receive); + assertEquals(new Integer(i * 2 + 2), receive.getPayload()); + } + + } + + @Test + public void testMethodInvokingRouter() { + Message fooMessage = new GenericMessage("foo"); + Message barMessage = new GenericMessage("bar"); + Message badMessage = new GenericMessage("bad"); + + this.routerMethodInput.send(fooMessage); + + Message result1a = this.fooChannel.receive(2000); + assertNotNull(result1a); + assertEquals("foo", result1a.getPayload()); + assertNull(this.barChannel.receive(0)); + + this.routerMethodInput.send(barMessage); + assertNull(this.fooChannel.receive(0)); + Message result2b = this.barChannel.receive(2000); + assertNotNull(result2b); + assertEquals("bar", result2b.getPayload()); + + try { + this.routerMethodInput.send(badMessage); + fail("MessageDeliveryException expected."); + } + catch (MessageDeliveryException e) { + assertThat(e.getMessage(), Matchers.containsString("no channel resolved by router and no default output channel defined")); + } + + } + + @Test + public void testMethodInvokingRouter2() { + Message fooMessage = MessageBuilder.withPayload("foo").setHeader("targetChannel", "foo").build(); + Message barMessage = MessageBuilder.withPayload("bar").setHeader("targetChannel", "bar").build(); + Message badMessage = MessageBuilder.withPayload("bad").setHeader("targetChannel", "bad").build(); + + this.routerMethod2Input.send(fooMessage); + + Message result1a = this.fooChannel.receive(2000); + assertNotNull(result1a); + assertEquals("foo", result1a.getPayload()); + assertNull(this.barChannel.receive(0)); + + this.routerMethod2Input.send(barMessage); + assertNull(this.fooChannel.receive(0)); + Message result2b = this.barChannel.receive(2000); + assertNotNull(result2b); + assertEquals("bar", result2b.getPayload()); + + try { + this.routerMethod2Input.send(badMessage); + fail("DestinationResolutionException expected."); + } + catch (MessagingException e) { + assertThat(e.getCause(), Matchers.instanceOf(DestinationResolutionException.class)); + assertThat(e.getCause().getMessage(), Matchers.containsString("failed to look up MessageChannel with name 'bad-channel'")); + } + + } + + @Test + public void testMethodInvokingRouter3() { + Message fooMessage = new GenericMessage("foo"); + Message barMessage = new GenericMessage("bar"); + Message badMessage = new GenericMessage("bad"); + + this.routerMethod3Input.send(fooMessage); + + Message result1a = this.fooChannel.receive(2000); + assertNotNull(result1a); + assertEquals("foo", result1a.getPayload()); + assertNull(this.barChannel.receive(0)); + + this.routerMethod3Input.send(barMessage); + assertNull(this.fooChannel.receive(0)); + Message result2b = this.barChannel.receive(2000); + assertNotNull(result2b); + assertEquals("bar", result2b.getPayload()); + + try { + this.routerMethod3Input.send(badMessage); + fail("DestinationResolutionException expected."); + } + catch (MessagingException e) { + assertThat(e.getCause(), Matchers.instanceOf(DestinationResolutionException.class)); + assertThat(e.getCause().getMessage(), Matchers.containsString("failed to look up MessageChannel with name 'bad-channel'")); + } + } + + @Test + public void testMultiRouter() { + + Message fooMessage = new GenericMessage("foo"); + Message barMessage = new GenericMessage("bar"); + Message badMessage = new GenericMessage("bad"); + + this.routerMultiInput.send(fooMessage); + Message result1a = this.fooChannel.receive(2000); + assertNotNull(result1a); + assertEquals("foo", result1a.getPayload()); + Message result1b = this.barChannel.receive(2000); + assertNotNull(result1b); + assertEquals("foo", result1b.getPayload()); + + this.routerMultiInput.send(barMessage); + Message result2a = this.fooChannel.receive(2000); + assertNotNull(result2a); + assertEquals("bar", result2a.getPayload()); + Message result2b = this.barChannel.receive(2000); + assertNotNull(result2b); + assertEquals("bar", result2b.getPayload()); + + try { + this.routerMultiInput.send(badMessage); + fail("MessageDeliveryException expected."); + } + catch (MessageDeliveryException e) { + assertThat(e.getMessage(), Matchers.containsString("no channel resolved by router and no default output channel defined")); + } + } + + @Test + public void testRecipientListRouter() { + + Message fooMessage = MessageBuilder.withPayload("fooPayload").setHeader("recipient", true).build(); + Message barMessage = MessageBuilder.withPayload("barPayload").setHeader("recipient", true).build(); + Message badMessage = new GenericMessage("badPayload"); + + this.recipientListInput.send(fooMessage); + Message result1a = this.fooChannel.receive(2000); + assertNotNull(result1a); + assertEquals("foo", result1a.getPayload()); + Message result1b = this.barChannel.receive(2000); + assertNotNull(result1b); + assertEquals("foo", result1b.getPayload()); + + this.recipientListInput.send(barMessage); + assertNull(this.fooChannel.receive(0)); + Message result2b = this.barChannel.receive(2000); + assertNotNull(result2b); + assertEquals("bar", result2b.getPayload()); + + + this.recipientListInput.send(badMessage); + assertNull(this.fooChannel.receive(0)); + assertNull(this.barChannel.receive(0)); + Message result3c = this.defaultOutputChannel.receive(2000); + assertNotNull(result3c); + assertEquals("bad", result3c.getPayload()); + + } + + + @MessagingGateway(defaultRequestChannel = "controlBus") + private static interface ControlBusGateway { + + void send(String command); + } + @Configuration @EnableIntegration + @IntegrationComponentScan public static class ContextConfiguration { @Bean @@ -552,6 +780,33 @@ public class IntegrationFlowTests { .get(); } + @Bean(name = "foo-channel") + public QueueChannel fooChannel() { + return new QueueChannel(); + } + + @Bean(name = "bar-channel") + public QueueChannel barChannel() { + return new QueueChannel(); + } + + @Bean + public QueueChannel defaultOutputChannel() { + return new QueueChannel(); + } + + @Bean + public IntegrationFlow recipientListFlow() { + return IntegrationFlows.from("recipientListInput") + .transform(p -> p.replaceFirst("Payload", "")) + .recipientListRoute(r -> + r.defaultOutputChannel(defaultOutputChannel()) + .recipient("foo-channel", "'foo' == payload") + .recipient("bar-channel", m -> + m.getHeaders().containsKey("recipient") && (boolean) m.getHeaders().get("recipient")) + ) + .get(); + } } @Component("delayedAdvice") @@ -679,6 +934,83 @@ public class IntegrationFlowTests { .get(); } + @Bean + public QueueChannel oddChannel() { + return new QueueChannel(); + } + + @Bean + public QueueChannel evenChannel() { + return new QueueChannel(); + } + + @Bean + public IntegrationFlow routeFlow() { + return IntegrationFlows.from("routerInput") + .route(p -> p % 2 == 0, + m -> m.suffix("Channel") + .channelMapping("true", "even") + .channelMapping("false", "odd") + ) + .get(); + } + + @Bean + public RoutingTestBean routingTestBean() { + return new RoutingTestBean(); + } + + @Bean + public IntegrationFlow routeMethodInvocationFlow() { + return IntegrationFlows.from("routerMethodInput") + .route("routingTestBean", "routeMessage") + .get(); + } + + @Bean + public IntegrationFlow routeMethodInvocationFlow2() { + return IntegrationFlows.from("routerMethod2Input") + .route(new MethodInvokingRouter(new RoutingTestBean(), "routeByHeader")) + .get(); + } + + @Bean + public IntegrationFlow routeMethodInvocationFlow3() { + return IntegrationFlows.from("routerMethod3Input") + .route((String p) -> ContextConfiguration4.this.routingTestBean().routePayload(p)) + .get(); + } + + @Bean + public IntegrationFlow routeMultiMethodInvocationFlow() { + return IntegrationFlows.from("routerMultiInput") + .route(p -> p.equals("foo") || p.equals("bar") ? new String[]{"foo", "bar"} : null, + s -> s.suffix("-channel") + ) + .get(); + } + + } + + private static class RoutingTestBean { + + public String routePayload(String name) { + return name + "-channel"; + } + + public String routeByHeader(@Header("targetChannel") String name) { + return name + "-channel"; + } + + public String routeMessage(Message message) { + if (message.getPayload().equals("foo")) { + return "foo-channel"; + } + else if (message.getPayload().equals("bar")) { + return "bar-channel"; + } + return null; + } } @Component("greetingService") @@ -695,7 +1027,7 @@ public class IntegrationFlowTests { @Bean public IntegrationFlow wrongLastComponent() { return IntegrationFlows.from(MessageChannels.direct()) - .handle(Object::toString) + .route(Object::toString) .channel(MessageChannels.direct()) .get(); }