Add route(), recipientListRoute() EIP-methods
This commit is contained in:
@@ -0,0 +1,56 @@
|
||||
/*
|
||||
* Copyright 2014 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.integration.dsl;
|
||||
|
||||
import org.springframework.integration.dsl.core.IntegrationComponentSpec;
|
||||
import org.springframework.integration.router.AbstractMessageRouter;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
|
||||
/**
|
||||
* @author Artem Bilan
|
||||
*/
|
||||
public class AbstractRouterSpec<S extends AbstractRouterSpec<S, R>, R extends AbstractMessageRouter> extends IntegrationComponentSpec<S, R> {
|
||||
|
||||
AbstractRouterSpec(R router) {
|
||||
this.target = router;
|
||||
}
|
||||
|
||||
public S defaultOutputChannel(MessageChannel defaultOutputChannel) {
|
||||
this.target.setDefaultOutputChannel(defaultOutputChannel);
|
||||
return _this();
|
||||
}
|
||||
|
||||
public S defaultOutputChannel(String defaultOutputChannel) {
|
||||
this.target.setDefaultOutputChannelName(defaultOutputChannel);
|
||||
return _this();
|
||||
}
|
||||
|
||||
public S ignoreSendFailures(boolean ignoreSendFailures) {
|
||||
this.target.setIgnoreSendFailures(ignoreSendFailures);
|
||||
return _this();
|
||||
}
|
||||
|
||||
public S applySequence(boolean applySequence) {
|
||||
this.target.setApplySequence(applySequence);
|
||||
return _this();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected R doGet() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,88 @@
|
||||
/*
|
||||
* Copyright 2014 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.integration.dsl;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.integration.core.MessageSelector;
|
||||
import org.springframework.integration.filter.ExpressionEvaluatingSelector;
|
||||
import org.springframework.integration.router.RecipientListRouter;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.core.DestinationResolutionException;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* @author Artem Bilan
|
||||
*/
|
||||
class DslRecipientListRouter extends RecipientListRouter {
|
||||
|
||||
private final Map<String, String> expressionRecipientMap = new HashMap<String, String>();
|
||||
|
||||
private final Map<String, MessageSelector> selectorRecipientMap = new HashMap<String, MessageSelector>();
|
||||
|
||||
void addRecipient(String channelName, String expression) {
|
||||
this.expressionRecipientMap.put(channelName, expression);
|
||||
}
|
||||
|
||||
void addRecipient(String channelName, MessageSelector selector) {
|
||||
this.selectorRecipientMap.put(channelName, selector);
|
||||
}
|
||||
|
||||
Map<String, Object> getRecipients() {
|
||||
Map<String, Object> recipients = new HashMap<String, Object>(this.expressionRecipientMap.size() + this.selectorRecipientMap.size());
|
||||
recipients.putAll(this.expressionRecipientMap);
|
||||
recipients.putAll(this.selectorRecipientMap);
|
||||
return recipients;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInit() {
|
||||
for (Map.Entry<String, String> recipient : this.expressionRecipientMap.entrySet()) {
|
||||
ExpressionEvaluatingSelector selector = null;
|
||||
String expression = recipient.getValue();
|
||||
if (StringUtils.hasText(expression)) {
|
||||
selector = new ExpressionEvaluatingSelector(expression);
|
||||
selector.setBeanFactory(this.getBeanFactory());
|
||||
}
|
||||
this.selectorRecipientMap.put(recipient.getKey(), selector);
|
||||
}
|
||||
|
||||
List<Recipient> recipients = new ArrayList<Recipient>(this.selectorRecipientMap.size());
|
||||
|
||||
for (Map.Entry<String, MessageSelector> entry : selectorRecipientMap.entrySet()) {
|
||||
recipients.add(new Recipient(this.resolveChannelName(entry.getKey()), entry.getValue()));
|
||||
}
|
||||
|
||||
this.setRecipients(recipients);
|
||||
super.onInit();
|
||||
}
|
||||
|
||||
private MessageChannel resolveChannelName(String channelName) {
|
||||
try {
|
||||
return this.getBeanFactory().getBean(channelName, MessageChannel.class);
|
||||
}
|
||||
catch (BeansException e) {
|
||||
throw new DestinationResolutionException("Failed to look up MessageChannel with name '"
|
||||
+ channelName + "' in the BeanFactory.");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -16,16 +16,9 @@
|
||||
|
||||
package org.springframework.integration.dsl;
|
||||
|
||||
import java.lang.annotation.Annotation;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.beans.factory.BeanCreationException;
|
||||
import org.springframework.context.Lifecycle;
|
||||
import org.springframework.core.annotation.AnnotationUtils;
|
||||
import org.springframework.expression.MethodFilter;
|
||||
import org.springframework.expression.spel.standard.SpelExpressionParser;
|
||||
import org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler;
|
||||
import org.springframework.integration.aggregator.AggregatingMessageHandler;
|
||||
@@ -43,8 +36,10 @@ import org.springframework.integration.dsl.support.BeanNameMessageProcessor;
|
||||
import org.springframework.integration.dsl.support.ComponentConfigurer;
|
||||
import org.springframework.integration.dsl.support.EndpointConfigurer;
|
||||
import org.springframework.integration.dsl.support.FixedSubscriberChannelPrototype;
|
||||
import org.springframework.integration.dsl.support.GenericRouter;
|
||||
import org.springframework.integration.dsl.support.GenericSplitter;
|
||||
import org.springframework.integration.dsl.support.MessageChannelReference;
|
||||
import org.springframework.integration.expression.ControlBusMethodFilter;
|
||||
import org.springframework.integration.filter.ExpressionEvaluatingSelector;
|
||||
import org.springframework.integration.filter.MessageFilter;
|
||||
import org.springframework.integration.filter.MethodInvokingSelector;
|
||||
@@ -53,6 +48,11 @@ import org.springframework.integration.handler.BridgeHandler;
|
||||
import org.springframework.integration.handler.DelayHandler;
|
||||
import org.springframework.integration.handler.ExpressionCommandMessageProcessor;
|
||||
import org.springframework.integration.handler.ServiceActivatingHandler;
|
||||
import org.springframework.integration.router.AbstractMappingMessageRouter;
|
||||
import org.springframework.integration.router.AbstractMessageRouter;
|
||||
import org.springframework.integration.router.ExpressionEvaluatingRouter;
|
||||
import org.springframework.integration.router.MethodInvokingRouter;
|
||||
import org.springframework.integration.router.RecipientListRouter;
|
||||
import org.springframework.integration.splitter.AbstractMessageSplitter;
|
||||
import org.springframework.integration.splitter.DefaultMessageSplitter;
|
||||
import org.springframework.integration.splitter.ExpressionEvaluatingSplitter;
|
||||
@@ -65,13 +65,9 @@ import org.springframework.integration.transformer.HeaderFilter;
|
||||
import org.springframework.integration.transformer.MessageTransformingHandler;
|
||||
import org.springframework.integration.transformer.MethodInvokingTransformer;
|
||||
import org.springframework.integration.transformer.Transformer;
|
||||
import org.springframework.jmx.export.annotation.ManagedAttribute;
|
||||
import org.springframework.jmx.export.annotation.ManagedOperation;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.CustomizableThreadCreator;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
@@ -297,7 +293,7 @@ public final class IntegrationFlowBuilder {
|
||||
* Provides the {@link HeaderFilter} to the current {@link IntegrationFlow}.
|
||||
*
|
||||
* @param headersToRemove the comma separated headers (or patterns) to remove from {@link org.springframework.messaging.MessageHeaders}.
|
||||
* @param patternMatch the {@code boolean} flag to indicate if {@code headersToRemove} should be interpreted as patterns or direct header names.
|
||||
* @param patternMatch the {@code boolean} flag to indicate if {@code headersToRemove} should be interpreted as patterns or direct header names.
|
||||
* @return the {@link IntegrationFlowBuilder}.
|
||||
*/
|
||||
|
||||
@@ -354,7 +350,7 @@ public final class IntegrationFlowBuilder {
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder aggregate(ComponentConfigurer<AggregatorSpec> aggregatorConfigurer,
|
||||
EndpointConfigurer<GenericEndpointSpec<AggregatingMessageHandler>> endpointConfigurer) {
|
||||
EndpointConfigurer<GenericEndpointSpec<AggregatingMessageHandler>> endpointConfigurer) {
|
||||
Assert.notNull(aggregatorConfigurer);
|
||||
AggregatorSpec spec = new AggregatorSpec();
|
||||
aggregatorConfigurer.configure(spec);
|
||||
@@ -371,6 +367,88 @@ public final class IntegrationFlowBuilder {
|
||||
}
|
||||
|
||||
|
||||
public IntegrationFlowBuilder route(String beanName, String method) {
|
||||
return this.route(beanName, method, null);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder route(String beanName, String method,
|
||||
ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer) {
|
||||
return this.route(beanName, method, routerConfigurer, null);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder route(String beanName, String method,
|
||||
ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer,
|
||||
EndpointConfigurer<GenericEndpointSpec<MethodInvokingRouter>> endpointConfigurer) {
|
||||
return this.route(new MethodInvokingRouter(new BeanNameMessageProcessor<Object>(beanName, method)), routerConfigurer, endpointConfigurer);
|
||||
}
|
||||
|
||||
|
||||
public IntegrationFlowBuilder route(String expression) {
|
||||
return this.route(expression, (ComponentConfigurer<RouterSpec<ExpressionEvaluatingRouter>>) null);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder route(String expression, ComponentConfigurer<RouterSpec<ExpressionEvaluatingRouter>> routerConfigurer) {
|
||||
return this.route(expression, routerConfigurer, null);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder route(String expression,
|
||||
ComponentConfigurer<RouterSpec<ExpressionEvaluatingRouter>> routerConfigurer,
|
||||
EndpointConfigurer<GenericEndpointSpec<ExpressionEvaluatingRouter>> endpointConfigurer) {
|
||||
return this.route(new ExpressionEvaluatingRouter(PARSER.parseExpression(expression)), routerConfigurer, endpointConfigurer);
|
||||
}
|
||||
|
||||
public <S, T> IntegrationFlowBuilder route(GenericRouter<S, T> router) {
|
||||
return this.route(router, (ComponentConfigurer<RouterSpec<MethodInvokingRouter>>) null);
|
||||
}
|
||||
|
||||
public <S, T> IntegrationFlowBuilder route(GenericRouter<S, T> router, ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer) {
|
||||
return this.route(router, routerConfigurer, null);
|
||||
}
|
||||
|
||||
public <S, T> IntegrationFlowBuilder route(GenericRouter<S, T> router,
|
||||
ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer,
|
||||
EndpointConfigurer<GenericEndpointSpec<MethodInvokingRouter>> endpointConfigurer) {
|
||||
return this.route(new MethodInvokingRouter(router), routerConfigurer, endpointConfigurer);
|
||||
}
|
||||
|
||||
public <R extends AbstractMappingMessageRouter> IntegrationFlowBuilder route(R router, ComponentConfigurer<RouterSpec<R>> routerConfigurer) {
|
||||
return this.route(router, routerConfigurer, null);
|
||||
}
|
||||
|
||||
public <R extends AbstractMappingMessageRouter> IntegrationFlowBuilder route(R router,
|
||||
ComponentConfigurer<RouterSpec<R>> routerConfigurer,
|
||||
EndpointConfigurer<GenericEndpointSpec<R>> endpointConfigurer) {
|
||||
if (routerConfigurer != null) {
|
||||
RouterSpec<R> routerSpec = new RouterSpec<R>(router);
|
||||
routerConfigurer.configure(routerSpec);
|
||||
}
|
||||
return this.route(router, endpointConfigurer);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder recipientListRoute(ComponentConfigurer<RecipientListRouterSpec> routerConfigurer) {
|
||||
return this.recipientListRoute(routerConfigurer, null);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder recipientListRoute(ComponentConfigurer<RecipientListRouterSpec> routerConfigurer,
|
||||
EndpointConfigurer<GenericEndpointSpec<RecipientListRouter>> endpointConfigurer) {
|
||||
Assert.notNull(routerConfigurer);
|
||||
RecipientListRouterSpec spec = new RecipientListRouterSpec();
|
||||
routerConfigurer.configure(spec);
|
||||
DslRecipientListRouter recipientListRouter = (DslRecipientListRouter) spec.get();
|
||||
Assert.notEmpty(recipientListRouter.getRecipients(), "recipient list must not be empty");
|
||||
return this.route(recipientListRouter, endpointConfigurer);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder route(AbstractMessageRouter router) {
|
||||
return this.route(router, null);
|
||||
}
|
||||
|
||||
public <R extends AbstractMessageRouter> IntegrationFlowBuilder route(R router,
|
||||
EndpointConfigurer<GenericEndpointSpec<R>> endpointConfigurer) {
|
||||
return this.handle(router, endpointConfigurer);
|
||||
}
|
||||
|
||||
|
||||
private <S extends ConsumerEndpointSpec<S, ?>> IntegrationFlowBuilder register(S endpointSpec, EndpointConfigurer<S> endpointConfigurer) {
|
||||
if (endpointConfigurer != null) {
|
||||
endpointConfigurer.configure(endpointSpec);
|
||||
@@ -454,40 +532,4 @@ public final class IntegrationFlowBuilder {
|
||||
return this.flow;
|
||||
}
|
||||
|
||||
|
||||
private static class ControlBusMethodFilter implements MethodFilter {
|
||||
|
||||
public List<Method> filter(List<Method> methods) {
|
||||
List<Method> supportedMethods = new ArrayList<Method>();
|
||||
for (Method method : methods) {
|
||||
if (this.accept(method)) {
|
||||
supportedMethods.add(method);
|
||||
}
|
||||
}
|
||||
return supportedMethods;
|
||||
}
|
||||
|
||||
private boolean accept(Method method) {
|
||||
Class<?> declaringClass = method.getDeclaringClass();
|
||||
if (Lifecycle.class.isAssignableFrom(declaringClass)
|
||||
&& ReflectionUtils.findMethod(Lifecycle.class, method.getName(), method.getParameterTypes()) != null) {
|
||||
return true;
|
||||
}
|
||||
if (CustomizableThreadCreator.class.isAssignableFrom(declaringClass)
|
||||
&& (method.getName().startsWith("get")
|
||||
|| method.getName().startsWith("set")
|
||||
|| method.getName().startsWith("shutdown"))) {
|
||||
return true;
|
||||
}
|
||||
if (this.hasAnnotation(method, ManagedAttribute.class) || this.hasAnnotation(method, ManagedOperation.class)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean hasAnnotation(Method method, Class<? extends Annotation> annotationType) {
|
||||
return AnnotationUtils.findAnnotation(method, annotationType) != null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
/*
|
||||
* Copyright 2014 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.integration.dsl;
|
||||
|
||||
import org.springframework.integration.core.MessageSelector;
|
||||
import org.springframework.integration.router.RecipientListRouter;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* @author Artem Bilan
|
||||
*/
|
||||
public class RecipientListRouterSpec extends AbstractRouterSpec<RecipientListRouterSpec, RecipientListRouter> {
|
||||
|
||||
RecipientListRouterSpec() {
|
||||
super(new DslRecipientListRouter());
|
||||
}
|
||||
|
||||
public RecipientListRouterSpec recipient(String channelName, String expression) {
|
||||
Assert.hasText(channelName);
|
||||
((DslRecipientListRouter) this.target).addRecipient(channelName, expression);
|
||||
return _this();
|
||||
}
|
||||
|
||||
public RecipientListRouterSpec recipient(String channelName, MessageSelector selector) {
|
||||
Assert.hasText(channelName);
|
||||
((DslRecipientListRouter) this.target).addRecipient(channelName, selector);
|
||||
return _this();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
/*
|
||||
* Copyright 2014 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.integration.dsl;
|
||||
|
||||
import org.springframework.integration.router.AbstractMappingMessageRouter;
|
||||
|
||||
/**
|
||||
* @author Artem Bilan
|
||||
*/
|
||||
public final class RouterSpec<R extends AbstractMappingMessageRouter> extends AbstractRouterSpec<RouterSpec<R>, R> {
|
||||
|
||||
RouterSpec(R router) {
|
||||
super(router);
|
||||
}
|
||||
|
||||
public RouterSpec<R> resolutionRequired(boolean resolutionRequired) {
|
||||
this.target.setResolutionRequired(resolutionRequired);
|
||||
return _this();
|
||||
}
|
||||
|
||||
public RouterSpec<R> prefix(String prefix) {
|
||||
this.target.setPrefix(prefix);
|
||||
return _this();
|
||||
}
|
||||
|
||||
public RouterSpec<R> suffix(String suffix) {
|
||||
this.target.setSuffix(suffix);
|
||||
return _this();
|
||||
}
|
||||
|
||||
public RouterSpec<R> channelMapping(String key, String channelName) {
|
||||
this.target.setChannelMapping(key, channelName);
|
||||
return _this();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
/*
|
||||
* Copyright 2014 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.integration.dsl.support;
|
||||
|
||||
/**
|
||||
* @author Artem Bilan
|
||||
*/
|
||||
public interface GenericRouter<S, T> {
|
||||
|
||||
T route(S source);
|
||||
|
||||
}
|
||||
@@ -56,7 +56,10 @@ import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.integration.IntegrationMessageHeaderAccessor;
|
||||
import org.springframework.integration.MessageDispatchingException;
|
||||
import org.springframework.integration.annotation.Header;
|
||||
import org.springframework.integration.annotation.IntegrationComponentScan;
|
||||
import org.springframework.integration.annotation.MessageEndpoint;
|
||||
import org.springframework.integration.annotation.MessagingGateway;
|
||||
import org.springframework.integration.annotation.ServiceActivator;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.FixedSubscriberChannel;
|
||||
@@ -79,6 +82,7 @@ import org.springframework.integration.file.DefaultFileNameGenerator;
|
||||
import org.springframework.integration.file.FileHeaders;
|
||||
import org.springframework.integration.file.FileWritingMessageHandler;
|
||||
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
|
||||
import org.springframework.integration.router.MethodInvokingRouter;
|
||||
import org.springframework.integration.scheduling.PollerMetadata;
|
||||
import org.springframework.integration.splitter.DefaultMessageSplitter;
|
||||
import org.springframework.integration.store.SimpleMessageStore;
|
||||
@@ -91,7 +95,9 @@ import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageDeliveryException;
|
||||
import org.springframework.messaging.MessageHandlingException;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.messaging.PollableChannel;
|
||||
import org.springframework.messaging.core.DestinationResolutionException;
|
||||
import org.springframework.messaging.support.GenericMessage;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
@@ -110,8 +116,7 @@ public class IntegrationFlowTests {
|
||||
private ListableBeanFactory beanFactory;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("controlBus")
|
||||
private MessageChannel controlBus;
|
||||
private ControlBusGateway controlBus;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("flow1QueueChannel")
|
||||
@@ -184,6 +189,49 @@ public class IntegrationFlowTests {
|
||||
@Qualifier("splitAggregateInput")
|
||||
private MessageChannel splitAggregateInput;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("routerInput")
|
||||
private MessageChannel routerInput;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("oddChannel")
|
||||
private PollableChannel oddChannel;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("evenChannel")
|
||||
private PollableChannel evenChannel;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("routerMethodInput")
|
||||
private MessageChannel routerMethodInput;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("foo-channel")
|
||||
private PollableChannel fooChannel;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("bar-channel")
|
||||
private PollableChannel barChannel;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("routerMethod2Input")
|
||||
private MessageChannel routerMethod2Input;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("routerMethod3Input")
|
||||
private MessageChannel routerMethod3Input;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("routerMultiInput")
|
||||
private MessageChannel routerMultiInput;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("recipientListInput")
|
||||
private MessageChannel recipientListInput;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("defaultOutputChannel")
|
||||
private QueueChannel defaultOutputChannel;
|
||||
|
||||
@Test
|
||||
public void testPollingFlow() {
|
||||
@@ -210,7 +258,7 @@ public class IntegrationFlowTests {
|
||||
assertThat(e.getCause(), Matchers.instanceOf(MessageDispatchingException.class));
|
||||
assertThat(e.getMessage(), Matchers.containsString("Dispatcher has no subscribers"));
|
||||
}
|
||||
this.controlBus.send(new GenericMessage<Object>("@payloadSerializingTransformer.start()"));
|
||||
this.controlBus.send("@payloadSerializingTransformer.start()");
|
||||
|
||||
final AtomicBoolean used = new AtomicBoolean();
|
||||
|
||||
@@ -256,7 +304,7 @@ public class IntegrationFlowTests {
|
||||
assertThat(e.getCause(), Matchers.instanceOf(MessageDispatchingException.class));
|
||||
assertThat(e.getMessage(), Matchers.containsString("Dispatcher has no subscribers"));
|
||||
}
|
||||
this.controlBus.send(new GenericMessage<Object>("@bridge.start()"));
|
||||
this.controlBus.send("@bridge.start()");
|
||||
this.bridgeFlow2Input.send(message);
|
||||
reply = this.bridgeFlow2Output.receive(5000);
|
||||
assertNotNull(reply);
|
||||
@@ -399,7 +447,7 @@ public class IntegrationFlowTests {
|
||||
assertThat(e.getMessage(), Matchers.containsString("Dispatcher has no subscribers"));
|
||||
}
|
||||
|
||||
this.controlBus.send(new GenericMessage<Object>("@xpathHeaderEnricher.start()"));
|
||||
this.controlBus.send("@xpathHeaderEnricher.start()");
|
||||
this.xpathHeaderEnricherInput.send(message);
|
||||
|
||||
Message<?> result = replyChannel.receive(2000);
|
||||
@@ -409,8 +457,188 @@ public class IntegrationFlowTests {
|
||||
assertEquals("2", headers.get("two"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRouter() {
|
||||
|
||||
int[] payloads = new int[]{1, 2, 3, 4, 5, 6};
|
||||
|
||||
for (int payload : payloads) {
|
||||
this.routerInput.send(new GenericMessage<Integer>(payload));
|
||||
}
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
Message<?> receive = this.oddChannel.receive(2000);
|
||||
assertNotNull(receive);
|
||||
assertEquals(new Integer(i * 2 + 1), receive.getPayload());
|
||||
|
||||
receive = this.evenChannel.receive(2000);
|
||||
assertNotNull(receive);
|
||||
assertEquals(new Integer(i * 2 + 2), receive.getPayload());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMethodInvokingRouter() {
|
||||
Message<String> fooMessage = new GenericMessage<String>("foo");
|
||||
Message<String> barMessage = new GenericMessage<String>("bar");
|
||||
Message<String> badMessage = new GenericMessage<String>("bad");
|
||||
|
||||
this.routerMethodInput.send(fooMessage);
|
||||
|
||||
Message<?> result1a = this.fooChannel.receive(2000);
|
||||
assertNotNull(result1a);
|
||||
assertEquals("foo", result1a.getPayload());
|
||||
assertNull(this.barChannel.receive(0));
|
||||
|
||||
this.routerMethodInput.send(barMessage);
|
||||
assertNull(this.fooChannel.receive(0));
|
||||
Message<?> result2b = this.barChannel.receive(2000);
|
||||
assertNotNull(result2b);
|
||||
assertEquals("bar", result2b.getPayload());
|
||||
|
||||
try {
|
||||
this.routerMethodInput.send(badMessage);
|
||||
fail("MessageDeliveryException expected.");
|
||||
}
|
||||
catch (MessageDeliveryException e) {
|
||||
assertThat(e.getMessage(), Matchers.containsString("no channel resolved by router and no default output channel defined"));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMethodInvokingRouter2() {
|
||||
Message<String> fooMessage = MessageBuilder.withPayload("foo").setHeader("targetChannel", "foo").build();
|
||||
Message<String> barMessage = MessageBuilder.withPayload("bar").setHeader("targetChannel", "bar").build();
|
||||
Message<String> badMessage = MessageBuilder.withPayload("bad").setHeader("targetChannel", "bad").build();
|
||||
|
||||
this.routerMethod2Input.send(fooMessage);
|
||||
|
||||
Message<?> result1a = this.fooChannel.receive(2000);
|
||||
assertNotNull(result1a);
|
||||
assertEquals("foo", result1a.getPayload());
|
||||
assertNull(this.barChannel.receive(0));
|
||||
|
||||
this.routerMethod2Input.send(barMessage);
|
||||
assertNull(this.fooChannel.receive(0));
|
||||
Message<?> result2b = this.barChannel.receive(2000);
|
||||
assertNotNull(result2b);
|
||||
assertEquals("bar", result2b.getPayload());
|
||||
|
||||
try {
|
||||
this.routerMethod2Input.send(badMessage);
|
||||
fail("DestinationResolutionException expected.");
|
||||
}
|
||||
catch (MessagingException e) {
|
||||
assertThat(e.getCause(), Matchers.instanceOf(DestinationResolutionException.class));
|
||||
assertThat(e.getCause().getMessage(), Matchers.containsString("failed to look up MessageChannel with name 'bad-channel'"));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMethodInvokingRouter3() {
|
||||
Message<String> fooMessage = new GenericMessage<String>("foo");
|
||||
Message<String> barMessage = new GenericMessage<String>("bar");
|
||||
Message<String> badMessage = new GenericMessage<String>("bad");
|
||||
|
||||
this.routerMethod3Input.send(fooMessage);
|
||||
|
||||
Message<?> result1a = this.fooChannel.receive(2000);
|
||||
assertNotNull(result1a);
|
||||
assertEquals("foo", result1a.getPayload());
|
||||
assertNull(this.barChannel.receive(0));
|
||||
|
||||
this.routerMethod3Input.send(barMessage);
|
||||
assertNull(this.fooChannel.receive(0));
|
||||
Message<?> result2b = this.barChannel.receive(2000);
|
||||
assertNotNull(result2b);
|
||||
assertEquals("bar", result2b.getPayload());
|
||||
|
||||
try {
|
||||
this.routerMethod3Input.send(badMessage);
|
||||
fail("DestinationResolutionException expected.");
|
||||
}
|
||||
catch (MessagingException e) {
|
||||
assertThat(e.getCause(), Matchers.instanceOf(DestinationResolutionException.class));
|
||||
assertThat(e.getCause().getMessage(), Matchers.containsString("failed to look up MessageChannel with name 'bad-channel'"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiRouter() {
|
||||
|
||||
Message<String> fooMessage = new GenericMessage<String>("foo");
|
||||
Message<String> barMessage = new GenericMessage<String>("bar");
|
||||
Message<String> badMessage = new GenericMessage<String>("bad");
|
||||
|
||||
this.routerMultiInput.send(fooMessage);
|
||||
Message<?> result1a = this.fooChannel.receive(2000);
|
||||
assertNotNull(result1a);
|
||||
assertEquals("foo", result1a.getPayload());
|
||||
Message<?> result1b = this.barChannel.receive(2000);
|
||||
assertNotNull(result1b);
|
||||
assertEquals("foo", result1b.getPayload());
|
||||
|
||||
this.routerMultiInput.send(barMessage);
|
||||
Message<?> result2a = this.fooChannel.receive(2000);
|
||||
assertNotNull(result2a);
|
||||
assertEquals("bar", result2a.getPayload());
|
||||
Message<?> result2b = this.barChannel.receive(2000);
|
||||
assertNotNull(result2b);
|
||||
assertEquals("bar", result2b.getPayload());
|
||||
|
||||
try {
|
||||
this.routerMultiInput.send(badMessage);
|
||||
fail("MessageDeliveryException expected.");
|
||||
}
|
||||
catch (MessageDeliveryException e) {
|
||||
assertThat(e.getMessage(), Matchers.containsString("no channel resolved by router and no default output channel defined"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecipientListRouter() {
|
||||
|
||||
Message<String> fooMessage = MessageBuilder.withPayload("fooPayload").setHeader("recipient", true).build();
|
||||
Message<String> barMessage = MessageBuilder.withPayload("barPayload").setHeader("recipient", true).build();
|
||||
Message<String> badMessage = new GenericMessage<String>("badPayload");
|
||||
|
||||
this.recipientListInput.send(fooMessage);
|
||||
Message<?> result1a = this.fooChannel.receive(2000);
|
||||
assertNotNull(result1a);
|
||||
assertEquals("foo", result1a.getPayload());
|
||||
Message<?> result1b = this.barChannel.receive(2000);
|
||||
assertNotNull(result1b);
|
||||
assertEquals("foo", result1b.getPayload());
|
||||
|
||||
this.recipientListInput.send(barMessage);
|
||||
assertNull(this.fooChannel.receive(0));
|
||||
Message<?> result2b = this.barChannel.receive(2000);
|
||||
assertNotNull(result2b);
|
||||
assertEquals("bar", result2b.getPayload());
|
||||
|
||||
|
||||
this.recipientListInput.send(badMessage);
|
||||
assertNull(this.fooChannel.receive(0));
|
||||
assertNull(this.barChannel.receive(0));
|
||||
Message<?> result3c = this.defaultOutputChannel.receive(2000);
|
||||
assertNotNull(result3c);
|
||||
assertEquals("bad", result3c.getPayload());
|
||||
|
||||
}
|
||||
|
||||
|
||||
@MessagingGateway(defaultRequestChannel = "controlBus")
|
||||
private static interface ControlBusGateway {
|
||||
|
||||
void send(String command);
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@EnableIntegration
|
||||
@IntegrationComponentScan
|
||||
public static class ContextConfiguration {
|
||||
|
||||
@Bean
|
||||
@@ -552,6 +780,33 @@ public class IntegrationFlowTests {
|
||||
.get();
|
||||
}
|
||||
|
||||
@Bean(name = "foo-channel")
|
||||
public QueueChannel fooChannel() {
|
||||
return new QueueChannel();
|
||||
}
|
||||
|
||||
@Bean(name = "bar-channel")
|
||||
public QueueChannel barChannel() {
|
||||
return new QueueChannel();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public QueueChannel defaultOutputChannel() {
|
||||
return new QueueChannel();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IntegrationFlow recipientListFlow() {
|
||||
return IntegrationFlows.from("recipientListInput")
|
||||
.<String, String>transform(p -> p.replaceFirst("Payload", ""))
|
||||
.recipientListRoute(r ->
|
||||
r.defaultOutputChannel(defaultOutputChannel())
|
||||
.recipient("foo-channel", "'foo' == payload")
|
||||
.recipient("bar-channel", m ->
|
||||
m.getHeaders().containsKey("recipient") && (boolean) m.getHeaders().get("recipient"))
|
||||
)
|
||||
.get();
|
||||
}
|
||||
}
|
||||
|
||||
@Component("delayedAdvice")
|
||||
@@ -679,6 +934,83 @@ public class IntegrationFlowTests {
|
||||
.get();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public QueueChannel oddChannel() {
|
||||
return new QueueChannel();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public QueueChannel evenChannel() {
|
||||
return new QueueChannel();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IntegrationFlow routeFlow() {
|
||||
return IntegrationFlows.from("routerInput")
|
||||
.<Integer, Boolean>route(p -> p % 2 == 0,
|
||||
m -> m.suffix("Channel")
|
||||
.channelMapping("true", "even")
|
||||
.channelMapping("false", "odd")
|
||||
)
|
||||
.get();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public RoutingTestBean routingTestBean() {
|
||||
return new RoutingTestBean();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IntegrationFlow routeMethodInvocationFlow() {
|
||||
return IntegrationFlows.from("routerMethodInput")
|
||||
.route("routingTestBean", "routeMessage")
|
||||
.get();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IntegrationFlow routeMethodInvocationFlow2() {
|
||||
return IntegrationFlows.from("routerMethod2Input")
|
||||
.route(new MethodInvokingRouter(new RoutingTestBean(), "routeByHeader"))
|
||||
.get();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IntegrationFlow routeMethodInvocationFlow3() {
|
||||
return IntegrationFlows.from("routerMethod3Input")
|
||||
.route((String p) -> ContextConfiguration4.this.routingTestBean().routePayload(p))
|
||||
.get();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IntegrationFlow routeMultiMethodInvocationFlow() {
|
||||
return IntegrationFlows.from("routerMultiInput")
|
||||
.<String, String[]>route(p -> p.equals("foo") || p.equals("bar") ? new String[]{"foo", "bar"} : null,
|
||||
s -> s.suffix("-channel")
|
||||
)
|
||||
.get();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class RoutingTestBean {
|
||||
|
||||
public String routePayload(String name) {
|
||||
return name + "-channel";
|
||||
}
|
||||
|
||||
public String routeByHeader(@Header("targetChannel") String name) {
|
||||
return name + "-channel";
|
||||
}
|
||||
|
||||
public String routeMessage(Message<?> message) {
|
||||
if (message.getPayload().equals("foo")) {
|
||||
return "foo-channel";
|
||||
}
|
||||
else if (message.getPayload().equals("bar")) {
|
||||
return "bar-channel";
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Component("greetingService")
|
||||
@@ -695,7 +1027,7 @@ public class IntegrationFlowTests {
|
||||
@Bean
|
||||
public IntegrationFlow wrongLastComponent() {
|
||||
return IntegrationFlows.from(MessageChannels.direct())
|
||||
.handle(Object::toString)
|
||||
.route(Object::toString)
|
||||
.channel(MessageChannels.direct())
|
||||
.get();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user