DSL: Add support for IntegrationFlow Lambda

To simplify a bit the `direct` integration flow definition, the `IntegrationFlow` Lambda is introduced.

* Extract `IntegrationFlow` functional interface with `define(IntegrationFlowDefinition<?> flow)` method.
* Rename existing `IntegrationFlow` class to the `StandardIntegrationFlow` and move it to the inner class within `IntegrationFlowBuilder`
* Extract `IntegrationFlowDefinition` superclass for the `IntegrationFlowBuilder` to be used within `IntegrationFlow` Lambda
to hide `get()` method from end-users.
* Change `IntegrationFlowBeanPostProcessor` for a new logic around `IntegrationFlow` Lambda
* Rename `MessagingProducerSpec` to the `MessageProducerSpec`

Conflicts:
	spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java
This commit is contained in:
Artem Bilan
2014-08-26 11:05:31 +03:00
parent a5bb4e125f
commit f0bfe4656d
11 changed files with 791 additions and 746 deletions

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,26 +16,11 @@
package org.springframework.integration.dsl;
import java.util.LinkedHashSet;
import java.util.Set;
/**
* @author Artem Bilan
*/
public final class IntegrationFlow {
public interface IntegrationFlow {
private final Set<Object> integrationComponents = new LinkedHashSet<Object>();
IntegrationFlow() {
}
public Set<Object> getIntegrationComponents() {
return integrationComponents;
}
IntegrationFlow addComponent(Object component) {
this.integrationComponents.add(component);
return this;
}
void define(IntegrationFlowDefinition<?> flow);
}

View File

@@ -16,612 +16,17 @@
package org.springframework.integration.dsl;
import java.util.Collection;
import java.util.Set;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler;
import org.springframework.integration.aggregator.AggregatingMessageHandler;
import org.springframework.integration.aggregator.ResequencingMessageHandler;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.FixedSubscriberChannel;
import org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean;
import org.springframework.integration.core.GenericSelector;
import org.springframework.integration.core.MessageSelector;
import org.springframework.integration.dsl.channel.MessageChannelSpec;
import org.springframework.integration.dsl.core.ConsumerEndpointSpec;
import org.springframework.integration.dsl.core.MessageHandlerSpec;
import org.springframework.integration.dsl.support.BeanNameMessageProcessor;
import org.springframework.integration.dsl.support.ComponentConfigurer;
import org.springframework.integration.dsl.support.EndpointConfigurer;
import org.springframework.integration.dsl.support.FixedSubscriberChannelPrototype;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.dsl.support.GenericRouter;
import org.springframework.integration.dsl.support.GenericSplitter;
import org.springframework.integration.dsl.support.MessageChannelReference;
import org.springframework.integration.expression.ControlBusMethodFilter;
import org.springframework.integration.filter.ExpressionEvaluatingSelector;
import org.springframework.integration.filter.MessageFilter;
import org.springframework.integration.filter.MethodInvokingSelector;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.handler.BridgeHandler;
import org.springframework.integration.handler.DelayHandler;
import org.springframework.integration.handler.ExpressionCommandMessageProcessor;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.router.AbstractMappingMessageRouter;
import org.springframework.integration.router.AbstractMessageRouter;
import org.springframework.integration.router.ExpressionEvaluatingRouter;
import org.springframework.integration.router.MethodInvokingRouter;
import org.springframework.integration.router.RecipientListRouter;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.splitter.DefaultMessageSplitter;
import org.springframework.integration.splitter.ExpressionEvaluatingSplitter;
import org.springframework.integration.splitter.MethodInvokingSplitter;
import org.springframework.integration.store.MessageStore;
import org.springframework.integration.transformer.ClaimCheckInTransformer;
import org.springframework.integration.transformer.ClaimCheckOutTransformer;
import org.springframework.integration.transformer.ContentEnricher;
import org.springframework.integration.transformer.ExpressionEvaluatingTransformer;
import org.springframework.integration.transformer.GenericTransformer;
import org.springframework.integration.transformer.HeaderFilter;
import org.springframework.integration.transformer.MessageTransformingHandler;
import org.springframework.integration.transformer.MethodInvokingTransformer;
import org.springframework.integration.transformer.Transformer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* @author Artem Bilan
*/
public final class IntegrationFlowBuilder {
public final class IntegrationFlowBuilder extends IntegrationFlowDefinition<IntegrationFlowBuilder> {
private final static SpelExpressionParser PARSER = new SpelExpressionParser();
private final IntegrationFlow flow = new IntegrationFlow();
private MessageChannel currentMessageChannel;
private Object currentComponent;
IntegrationFlowBuilder() {
}
IntegrationFlowBuilder addComponent(Object component) {
this.flow.addComponent(component);
return this;
}
IntegrationFlowBuilder addComponents(Collection<Object> components) {
if (components != null) {
for (Object component : components) {
this.flow.addComponent(component);
}
}
return this;
}
IntegrationFlowBuilder currentComponent(Object component) {
this.currentComponent = component;
return this;
}
public IntegrationFlowBuilder fixedSubscriberChannel() {
return this.fixedSubscriberChannel(null);
}
public IntegrationFlowBuilder fixedSubscriberChannel(String messageChannelName) {
return this.channel(new FixedSubscriberChannelPrototype(messageChannelName));
}
public IntegrationFlowBuilder channel(String messageChannelName) {
return this.channel(new MessageChannelReference(messageChannelName));
}
public IntegrationFlowBuilder channel(MessageChannel messageChannel) {
Assert.notNull(messageChannel);
if (this.currentMessageChannel != null) {
this.register(new GenericEndpointSpec<BridgeHandler>(new BridgeHandler()), null);
}
this.currentMessageChannel = messageChannel;
return this.registerOutputChannelIfCan(this.currentMessageChannel);
}
public IntegrationFlowBuilder channel(MessageChannelSpec<?, ?> messageChannelSpec) {
Assert.notNull(messageChannelSpec);
return this.channel(messageChannelSpec.get());
}
public IntegrationFlowBuilder controlBus() {
return controlBus(null);
}
public IntegrationFlowBuilder controlBus(EndpointConfigurer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
return this.handle(new ServiceActivatingHandler(new ExpressionCommandMessageProcessor(new ControlBusMethodFilter())), endpointConfigurer);
}
public IntegrationFlowBuilder transform(String expression) {
Assert.hasText(expression);
return this.transform(new ExpressionEvaluatingTransformer(PARSER.parseExpression(expression)));
}
public <S, T> IntegrationFlowBuilder transform(GenericTransformer<S, T> genericTransformer) {
return this.transform(null, genericTransformer);
}
public <P, T> IntegrationFlowBuilder transform(Class<P> payloadType, GenericTransformer<P, T> genericTransformer) {
return this.transform(payloadType, genericTransformer, null);
}
public <S, T> IntegrationFlowBuilder transform(GenericTransformer<S, T> genericTransformer,
EndpointConfigurer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
return this.transform(null, genericTransformer, endpointConfigurer);
}
public <P, T> IntegrationFlowBuilder transform(Class<P> payloadType, GenericTransformer<P, T> genericTransformer,
EndpointConfigurer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
Assert.notNull(genericTransformer);
Transformer transformer = genericTransformer instanceof Transformer ? (Transformer) genericTransformer :
(isLambda(genericTransformer)
? new MethodInvokingTransformer(new LambdaMessageProcessor(genericTransformer, payloadType))
: new MethodInvokingTransformer(genericTransformer));
return addComponent(transformer)
.handle(new MessageTransformingHandler(transformer), endpointConfigurer);
}
public IntegrationFlowBuilder filter(String expression) {
Assert.hasText(expression);
return this.filter(new ExpressionEvaluatingSelector(PARSER.parseExpression(expression)));
}
public <S> IntegrationFlowBuilder filter(GenericSelector<S> genericSelector) {
return this.filter(null, genericSelector);
}
public <P> IntegrationFlowBuilder filter(Class<P> payloadType, GenericSelector<P> genericSelector) {
return this.filter(payloadType, genericSelector, null);
}
public <P> IntegrationFlowBuilder filter(GenericSelector<P> genericSelector,
EndpointConfigurer<FilterEndpointSpec> endpointConfigurer) {
return filter(null, genericSelector, endpointConfigurer);
}
public <P> IntegrationFlowBuilder filter(Class<P> payloadType, GenericSelector<P> genericSelector,
EndpointConfigurer<FilterEndpointSpec> endpointConfigurer) {
Assert.notNull(genericSelector);
MessageSelector selector = genericSelector instanceof MessageSelector ? (MessageSelector) genericSelector :
(isLambda(genericSelector)
? new MethodInvokingSelector(new LambdaMessageProcessor(genericSelector, payloadType))
: new MethodInvokingSelector(genericSelector));
return this.register(new FilterEndpointSpec(new MessageFilter(selector)), endpointConfigurer);
}
public <S extends MessageHandlerSpec<S, ? extends MessageHandler>> IntegrationFlowBuilder handle(S messageHandlerSpec) {
Assert.notNull(messageHandlerSpec);
return handle(messageHandlerSpec.get());
}
public IntegrationFlowBuilder handle(MessageHandler messageHandler) {
return this.handle(messageHandler, null);
}
public IntegrationFlowBuilder handle(String beanName, String methodName) {
return this.handle(beanName, methodName, null);
}
public IntegrationFlowBuilder handle(String beanName, String methodName,
EndpointConfigurer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
return this.handle(new ServiceActivatingHandler(new BeanNameMessageProcessor<Object>(beanName, methodName)),
endpointConfigurer);
}
public <P> IntegrationFlowBuilder handle(GenericHandler<P> handler) {
return this.handle(null, handler);
}
public <P> IntegrationFlowBuilder handle(GenericHandler<P> handler,
EndpointConfigurer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
return this.handle(null, handler, endpointConfigurer);
}
public <P> IntegrationFlowBuilder handle(Class<P> payloadType, GenericHandler<P> handler) {
return this.handle(payloadType, handler, null);
}
public <P> IntegrationFlowBuilder handle(Class<P> payloadType, GenericHandler<P> handler,
EndpointConfigurer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
ServiceActivatingHandler serviceActivatingHandler = null;
if (isLambda(handler)) {
serviceActivatingHandler = new ServiceActivatingHandler(new LambdaMessageProcessor(handler, payloadType));
}
else {
serviceActivatingHandler = new ServiceActivatingHandler(handler);
}
return this.handle(serviceActivatingHandler, endpointConfigurer);
}
public <H extends MessageHandler, S extends MessageHandlerSpec<S, H>>
IntegrationFlowBuilder handle(S messageHandlerSpec, EndpointConfigurer<GenericEndpointSpec<H>> endpointConfigurer) {
Assert.notNull(messageHandlerSpec);
return handle(messageHandlerSpec.get(), endpointConfigurer);
}
public <H extends MessageHandler> IntegrationFlowBuilder handle(H messageHandler,
EndpointConfigurer<GenericEndpointSpec<H>> endpointConfigurer) {
Assert.notNull(messageHandler);
return this.register(new GenericEndpointSpec<H>(messageHandler), endpointConfigurer);
}
public IntegrationFlowBuilder bridge(EndpointConfigurer<GenericEndpointSpec<BridgeHandler>> endpointConfigurer) {
return this.register(new GenericEndpointSpec<BridgeHandler>(new BridgeHandler()), endpointConfigurer);
}
public IntegrationFlowBuilder delay(String groupId, String expression) {
return this.delay(groupId, expression, null);
}
public IntegrationFlowBuilder delay(String groupId, String expression,
EndpointConfigurer<DelayerEndpointSpec> endpointConfigurer) {
DelayHandler delayHandler = new DelayHandler(groupId);
if (StringUtils.hasText(expression)) {
delayHandler.setDelayExpression(PARSER.parseExpression(expression));
}
return this.register(new DelayerEndpointSpec(delayHandler), endpointConfigurer);
}
public IntegrationFlowBuilder enrich(ComponentConfigurer<EnricherSpec> enricherConfigurer) {
return this.enrich(enricherConfigurer, null);
}
public IntegrationFlowBuilder enrich(ComponentConfigurer<EnricherSpec> enricherConfigurer,
EndpointConfigurer<GenericEndpointSpec<ContentEnricher>> endpointConfigurer) {
Assert.notNull(enricherConfigurer);
EnricherSpec enricherSpec = new EnricherSpec();
enricherConfigurer.configure(enricherSpec);
return this.handle(enricherSpec.get(), endpointConfigurer);
}
public IntegrationFlowBuilder enrichHeaders(ComponentConfigurer<HeaderEnricherSpec> headerEnricherConfigurer) {
return this.enrichHeaders(headerEnricherConfigurer, null);
}
public IntegrationFlowBuilder enrichHeaders(ComponentConfigurer<HeaderEnricherSpec> headerEnricherConfigurer,
EndpointConfigurer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
Assert.notNull(headerEnricherConfigurer);
HeaderEnricherSpec headerEnricherSpec = new HeaderEnricherSpec();
headerEnricherConfigurer.configure(headerEnricherSpec);
return transform(headerEnricherSpec.get(), endpointConfigurer);
}
public IntegrationFlowBuilder split(EndpointConfigurer<SplitterEndpointSpec<DefaultMessageSplitter>> endpointConfigurer) {
return this.split(new DefaultMessageSplitter(), endpointConfigurer);
}
public IntegrationFlowBuilder split(String expression,
EndpointConfigurer<SplitterEndpointSpec<ExpressionEvaluatingSplitter>> endpointConfigurer) {
return this.split(new ExpressionEvaluatingSplitter(PARSER.parseExpression(expression)), endpointConfigurer);
}
public IntegrationFlowBuilder split(String beanName, String methodName) {
return this.split(beanName, methodName, null);
}
public IntegrationFlowBuilder split(String beanName, String methodName,
EndpointConfigurer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {
return this.split(new MethodInvokingSplitter(new BeanNameMessageProcessor<Collection<?>>(beanName, methodName)),
endpointConfigurer);
}
public <P> IntegrationFlowBuilder split(Class<P> payloadType, GenericSplitter<P> splitter) {
return this.split(payloadType, splitter, null);
}
public <T> IntegrationFlowBuilder split(GenericSplitter<T> splitter,
EndpointConfigurer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {
return split(null, splitter, endpointConfigurer);
}
public <P> IntegrationFlowBuilder split(Class<P> payloadType, GenericSplitter<P> splitter,
EndpointConfigurer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {
MethodInvokingSplitter split = isLambda(splitter)
? new MethodInvokingSplitter(new LambdaMessageProcessor(splitter, payloadType))
: new MethodInvokingSplitter(splitter, "split");
return this.split(split, endpointConfigurer);
}
public <S extends AbstractMessageSplitter> IntegrationFlowBuilder split(S splitter,
EndpointConfigurer<SplitterEndpointSpec<S>> endpointConfigurer) {
Assert.notNull(splitter);
return this.register(new SplitterEndpointSpec<S>(splitter), endpointConfigurer);
}
/**
* Provides the {@link HeaderFilter} to the current {@link IntegrationFlow}.
* @param headersToRemove the array of headers (or patterns)
* to remove from {@link org.springframework.messaging.MessageHeaders}.
* @return the {@link IntegrationFlowBuilder}.
*/
public IntegrationFlowBuilder headerFilter(String... headersToRemove) {
return this.headerFilter(new HeaderFilter(headersToRemove), null);
}
/**
* Provides the {@link HeaderFilter} to the current {@link IntegrationFlow}.
* @param headersToRemove the comma separated headers (or patterns) to remove from
* {@link org.springframework.messaging.MessageHeaders}.
* @param patternMatch the {@code boolean} flag to indicate if {@code headersToRemove}
* should be interpreted as patterns or direct header names.
* @return the {@link IntegrationFlowBuilder}.
*/
public IntegrationFlowBuilder headerFilter(String headersToRemove, boolean patternMatch) {
HeaderFilter headerFilter = new HeaderFilter(StringUtils.delimitedListToStringArray(headersToRemove, ",", " "));
headerFilter.setPatternMatch(patternMatch);
return this.headerFilter(headerFilter, null);
}
public IntegrationFlowBuilder headerFilter(HeaderFilter headerFilter,
EndpointConfigurer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
return this.transform(headerFilter, endpointConfigurer);
}
public IntegrationFlowBuilder claimCheckIn(MessageStore messageStore) {
return this.claimCheckIn(messageStore, null);
}
public IntegrationFlowBuilder claimCheckIn(MessageStore messageStore,
EndpointConfigurer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
return this.transform(new ClaimCheckInTransformer(messageStore), endpointConfigurer);
}
public IntegrationFlowBuilder claimCheckOut(MessageStore messageStore) {
return this.claimCheckOut(messageStore, false);
}
public IntegrationFlowBuilder claimCheckOut(MessageStore messageStore, boolean removeMessage) {
return this.claimCheckOut(messageStore, removeMessage, null);
}
public IntegrationFlowBuilder claimCheckOut(MessageStore messageStore, boolean removeMessage,
EndpointConfigurer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
ClaimCheckOutTransformer claimCheckOutTransformer = new ClaimCheckOutTransformer(messageStore);
claimCheckOutTransformer.setRemoveMessage(removeMessage);
return this.transform(claimCheckOutTransformer, endpointConfigurer);
}
public IntegrationFlowBuilder resequence() {
return this.resequence((EndpointConfigurer<GenericEndpointSpec<ResequencingMessageHandler>>) null);
}
public IntegrationFlowBuilder resequence(EndpointConfigurer<GenericEndpointSpec<ResequencingMessageHandler>> endpointConfigurer) {
return this.handle(new ResequencerSpec().get(), endpointConfigurer);
}
public IntegrationFlowBuilder resequence(ComponentConfigurer<ResequencerSpec> resequencerConfigurer,
EndpointConfigurer<GenericEndpointSpec<ResequencingMessageHandler>> endpointConfigurer) {
Assert.notNull(resequencerConfigurer);
ResequencerSpec spec = new ResequencerSpec();
resequencerConfigurer.configure(spec);
return this.handle(spec.get(), endpointConfigurer);
}
public IntegrationFlowBuilder aggregate() {
return aggregate((EndpointConfigurer<GenericEndpointSpec<AggregatingMessageHandler>>) null);
}
public IntegrationFlowBuilder
aggregate(EndpointConfigurer<GenericEndpointSpec<AggregatingMessageHandler>> endpointConfigurer) {
return handle(new AggregatorSpec().get(), endpointConfigurer);
}
public IntegrationFlowBuilder aggregate(ComponentConfigurer<AggregatorSpec> aggregatorConfigurer,
EndpointConfigurer<GenericEndpointSpec<AggregatingMessageHandler>> endpointConfigurer) {
Assert.notNull(aggregatorConfigurer);
AggregatorSpec spec = new AggregatorSpec();
aggregatorConfigurer.configure(spec);
return this.handle(spec.get(), endpointConfigurer);
}
public IntegrationFlowBuilder route(String beanName, String method) {
return this.route(beanName, method, null);
}
public IntegrationFlowBuilder route(String beanName, String method,
ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer) {
return this.route(beanName, method, routerConfigurer, null);
}
public IntegrationFlowBuilder route(String beanName, String method,
ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer,
EndpointConfigurer<GenericEndpointSpec<MethodInvokingRouter>> endpointConfigurer) {
return this.route(new MethodInvokingRouter(new BeanNameMessageProcessor<Object>(beanName, method)),
routerConfigurer, endpointConfigurer);
}
public IntegrationFlowBuilder route(String expression) {
return this.route(expression, (ComponentConfigurer<RouterSpec<ExpressionEvaluatingRouter>>) null);
}
public IntegrationFlowBuilder route(String expression,
ComponentConfigurer<RouterSpec<ExpressionEvaluatingRouter>> routerConfigurer) {
return this.route(expression, routerConfigurer, null);
}
public IntegrationFlowBuilder route(String expression,
ComponentConfigurer<RouterSpec<ExpressionEvaluatingRouter>> routerConfigurer,
EndpointConfigurer<GenericEndpointSpec<ExpressionEvaluatingRouter>> endpointConfigurer) {
return this.route(new ExpressionEvaluatingRouter(PARSER.parseExpression(expression)), routerConfigurer,
endpointConfigurer);
}
public <S, T> IntegrationFlowBuilder route(GenericRouter<S, T> router) {
return this.route(null, router);
}
public <S, T> IntegrationFlowBuilder route(GenericRouter<S, T> router,
ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer) {
return this.route(null, router, routerConfigurer);
}
public <P, T> IntegrationFlowBuilder route(Class<P> payloadType, GenericRouter<P, T> router) {
return this.route(payloadType, router, null, null);
}
public <P, T> IntegrationFlowBuilder route(Class<P> payloadType, GenericRouter<P, T> router,
ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer) {
return this.route(payloadType, router, routerConfigurer, null);
}
public <S, T> IntegrationFlowBuilder route(GenericRouter<S, T> router,
ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer,
EndpointConfigurer<GenericEndpointSpec<MethodInvokingRouter>> endpointConfigurer) {
return route(null, router, routerConfigurer, endpointConfigurer);
}
public <P, T> IntegrationFlowBuilder route(Class<P> payloadType, GenericRouter<P, T> router,
ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer,
EndpointConfigurer<GenericEndpointSpec<MethodInvokingRouter>> endpointConfigurer) {
MethodInvokingRouter methodInvokingRouter = isLambda(router)
? new MethodInvokingRouter(new LambdaMessageProcessor(router, payloadType))
: new MethodInvokingRouter(router);
return this.route(methodInvokingRouter, routerConfigurer, endpointConfigurer);
}
public <R extends AbstractMappingMessageRouter> IntegrationFlowBuilder route(R router,
ComponentConfigurer<RouterSpec<R>> routerConfigurer,
EndpointConfigurer<GenericEndpointSpec<R>> endpointConfigurer) {
if (routerConfigurer != null) {
RouterSpec<R> routerSpec = new RouterSpec<R>(router);
routerConfigurer.configure(routerSpec);
}
return this.route(router, endpointConfigurer);
}
public IntegrationFlowBuilder recipientListRoute(ComponentConfigurer<RecipientListRouterSpec> routerConfigurer) {
return this.recipientListRoute(routerConfigurer, null);
}
public IntegrationFlowBuilder recipientListRoute(ComponentConfigurer<RecipientListRouterSpec> routerConfigurer,
EndpointConfigurer<GenericEndpointSpec<RecipientListRouter>> endpointConfigurer) {
Assert.notNull(routerConfigurer);
RecipientListRouterSpec spec = new RecipientListRouterSpec();
routerConfigurer.configure(spec);
DslRecipientListRouter recipientListRouter = (DslRecipientListRouter) spec.get();
Assert.notEmpty(recipientListRouter.getRecipients(), "recipient list must not be empty");
return this.route(recipientListRouter, endpointConfigurer);
}
public IntegrationFlowBuilder route(AbstractMessageRouter router) {
return this.route(router, null);
}
public <R extends AbstractMessageRouter> IntegrationFlowBuilder route(R router,
EndpointConfigurer<GenericEndpointSpec<R>> endpointConfigurer) {
return this.handle(router, endpointConfigurer);
}
public IntegrationFlowBuilder gateway(String requestChannel) {
return gateway(requestChannel, null);
}
public IntegrationFlowBuilder gateway(String requestChannel,
EndpointConfigurer<GatewayEndpointSpec> endpointConfigurer) {
return register(new GatewayEndpointSpec(requestChannel), endpointConfigurer);
}
public IntegrationFlowBuilder gateway(MessageChannel requestChannel) {
return gateway(requestChannel, null);
}
public IntegrationFlowBuilder gateway(MessageChannel requestChannel,
EndpointConfigurer<GatewayEndpointSpec> endpointConfigurer) {
return register(new GatewayEndpointSpec(requestChannel), endpointConfigurer);
}
private <S extends ConsumerEndpointSpec<S, ?>> IntegrationFlowBuilder register(S endpointSpec,
EndpointConfigurer<S> endpointConfigurer) {
if (endpointConfigurer != null) {
endpointConfigurer.configure(endpointSpec);
}
MessageChannel inputChannel = this.currentMessageChannel;
this.currentMessageChannel = null;
if (inputChannel == null) {
inputChannel = new DirectChannel();
this.registerOutputChannelIfCan(inputChannel);
}
if (inputChannel instanceof MessageChannelReference) {
endpointSpec.get().getT1().setInputChannelName(((MessageChannelReference) inputChannel).getName());
}
else {
if (inputChannel instanceof FixedSubscriberChannelPrototype) {
String beanName = ((FixedSubscriberChannelPrototype) inputChannel).getName();
inputChannel = new FixedSubscriberChannel(endpointSpec.get().getT2());
if (beanName != null) {
((FixedSubscriberChannel) inputChannel).setBeanName(beanName);
}
this.registerOutputChannelIfCan(inputChannel);
}
endpointSpec.get().getT1().setInputChannel(inputChannel);
}
return this.addComponent(endpointSpec).currentComponent(endpointSpec.get().getT2());
}
private IntegrationFlowBuilder registerOutputChannelIfCan(MessageChannel outputChannel) {
if (!(outputChannel instanceof FixedSubscriberChannelPrototype)) {
this.flow.addComponent(outputChannel);
if (this.currentComponent != null) {
String channelName = null;
if (outputChannel instanceof MessageChannelReference) {
channelName = ((MessageChannelReference) outputChannel).getName();
}
if (this.currentComponent instanceof AbstractReplyProducingMessageHandler) {
AbstractReplyProducingMessageHandler messageProducer =
(AbstractReplyProducingMessageHandler) this.currentComponent;
if (channelName != null) {
messageProducer.setOutputChannelName(channelName);
}
else {
messageProducer.setOutputChannel(outputChannel);
}
}
else if (this.currentComponent instanceof SourcePollingChannelAdapterSpec) {
SourcePollingChannelAdapterFactoryBean pollingChannelAdapterFactoryBean =
((SourcePollingChannelAdapterSpec) this.currentComponent).get().getT1();
if (channelName != null) {
pollingChannelAdapterFactoryBean.setOutputChannelName(channelName);
}
else {
pollingChannelAdapterFactoryBean.setOutputChannel(outputChannel);
}
}
else if (this.currentComponent instanceof AbstractCorrelatingMessageHandler) {
AbstractCorrelatingMessageHandler messageProducer =
(AbstractCorrelatingMessageHandler) this.currentComponent;
if (channelName != null) {
messageProducer.setOutputChannelName(channelName);
}
else {
messageProducer.setOutputChannel(outputChannel);
}
}
else {
throw new BeanCreationException("The 'currentComponent' (" + this.currentComponent +
") is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. " +
"This is the end of the integration flow.");
}
this.currentComponent = null;
}
}
return this;
}
public IntegrationFlow get() {
public StandardIntegrationFlow get() {
if (this.currentMessageChannel instanceof FixedSubscriberChannelPrototype) {
throw new BeanCreationException("The 'currentMessageChannel' (" + this.currentMessageChannel +
") is a prototype for FixedSubscriberChannel which can't be created without MessageHandler " +
@@ -629,7 +34,7 @@ public final class IntegrationFlowBuilder {
"in the IntegrationFlow definition.");
}
if (this.flow.getIntegrationComponents().size() == 1) {
if (this.integrationComponents.size() == 1) {
if (this.currentComponent != null) {
if (this.currentComponent instanceof SourcePollingChannelAdapterSpec) {
throw new BeanCreationException("The 'SourcePollingChannelAdapter' (" + this.currentComponent + ") " +
@@ -641,12 +46,26 @@ public final class IntegrationFlowBuilder {
"Add at lest '.bridge()' EIP-method before the end of flow.");
}
}
return this.flow;
return new StandardIntegrationFlow(this.integrationComponents);
}
private static boolean isLambda(Object o) {
Class<?> aClass = o.getClass();
return aClass.isSynthetic() && !aClass.isAnonymousClass() && !aClass.isLocalClass();
public static final class StandardIntegrationFlow implements IntegrationFlow {
private final Set<Object> integrationComponents;
StandardIntegrationFlow(Set<Object> integrationComponents) {
this.integrationComponents = integrationComponents;
}
public Set<Object> getIntegrationComponents() {
return integrationComponents;
}
@Override
public void define(IntegrationFlowDefinition<?> flow) {
throw new UnsupportedOperationException();
}
}
}

View File

@@ -0,0 +1,632 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.dsl;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler;
import org.springframework.integration.aggregator.AggregatingMessageHandler;
import org.springframework.integration.aggregator.ResequencingMessageHandler;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.FixedSubscriberChannel;
import org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean;
import org.springframework.integration.core.GenericSelector;
import org.springframework.integration.core.MessageSelector;
import org.springframework.integration.dsl.channel.MessageChannelSpec;
import org.springframework.integration.dsl.core.ConsumerEndpointSpec;
import org.springframework.integration.dsl.core.MessageHandlerSpec;
import org.springframework.integration.dsl.support.BeanNameMessageProcessor;
import org.springframework.integration.dsl.support.ComponentConfigurer;
import org.springframework.integration.dsl.support.EndpointConfigurer;
import org.springframework.integration.dsl.support.FixedSubscriberChannelPrototype;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.dsl.support.GenericRouter;
import org.springframework.integration.dsl.support.GenericSplitter;
import org.springframework.integration.dsl.support.MessageChannelReference;
import org.springframework.integration.expression.ControlBusMethodFilter;
import org.springframework.integration.filter.ExpressionEvaluatingSelector;
import org.springframework.integration.filter.MessageFilter;
import org.springframework.integration.filter.MethodInvokingSelector;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.handler.BridgeHandler;
import org.springframework.integration.handler.DelayHandler;
import org.springframework.integration.handler.ExpressionCommandMessageProcessor;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.router.AbstractMappingMessageRouter;
import org.springframework.integration.router.AbstractMessageRouter;
import org.springframework.integration.router.ExpressionEvaluatingRouter;
import org.springframework.integration.router.MethodInvokingRouter;
import org.springframework.integration.router.RecipientListRouter;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.splitter.DefaultMessageSplitter;
import org.springframework.integration.splitter.ExpressionEvaluatingSplitter;
import org.springframework.integration.splitter.MethodInvokingSplitter;
import org.springframework.integration.store.MessageStore;
import org.springframework.integration.transformer.ClaimCheckInTransformer;
import org.springframework.integration.transformer.ClaimCheckOutTransformer;
import org.springframework.integration.transformer.ContentEnricher;
import org.springframework.integration.transformer.ExpressionEvaluatingTransformer;
import org.springframework.integration.transformer.GenericTransformer;
import org.springframework.integration.transformer.HeaderFilter;
import org.springframework.integration.transformer.MessageTransformingHandler;
import org.springframework.integration.transformer.MethodInvokingTransformer;
import org.springframework.integration.transformer.Transformer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* @author Artem Bilan
*/
public abstract class IntegrationFlowDefinition<B extends IntegrationFlowDefinition<B>> {
private final static SpelExpressionParser PARSER = new SpelExpressionParser();
protected final Set<Object> integrationComponents = new LinkedHashSet<Object>();
protected MessageChannel currentMessageChannel;
protected Object currentComponent;
IntegrationFlowDefinition() {
}
B addComponent(Object component) {
this.integrationComponents.add(component);
return _this();
}
B addComponents(Collection<Object> components) {
for (Object component : components) {
this.integrationComponents.add(component);
}
return _this();
}
B currentComponent(Object component) {
this.currentComponent = component;
return _this();
}
public B fixedSubscriberChannel() {
return this.fixedSubscriberChannel(null);
}
public B fixedSubscriberChannel(String messageChannelName) {
return this.channel(new FixedSubscriberChannelPrototype(messageChannelName));
}
public B channel(String messageChannelName) {
return this.channel(new MessageChannelReference(messageChannelName));
}
public B channel(MessageChannel messageChannel) {
Assert.notNull(messageChannel);
if (this.currentMessageChannel != null) {
this.register(new GenericEndpointSpec<BridgeHandler>(new BridgeHandler()), null);
}
this.currentMessageChannel = messageChannel;
return this.registerOutputChannelIfCan(this.currentMessageChannel);
}
public B channel(MessageChannelSpec<?, ?> messageChannelSpec) {
Assert.notNull(messageChannelSpec);
return this.channel(messageChannelSpec.get());
}
public B controlBus() {
return controlBus(null);
}
public B controlBus(EndpointConfigurer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
return this.handle(new ServiceActivatingHandler(new ExpressionCommandMessageProcessor(new ControlBusMethodFilter())),
endpointConfigurer);
}
public B transform(String expression) {
Assert.hasText(expression);
return this.transform(new ExpressionEvaluatingTransformer(PARSER.parseExpression(expression)));
}
public <S, T> B transform(GenericTransformer<S, T> genericTransformer) {
return this.transform(null, genericTransformer);
}
public <P, T> B transform(Class<P> payloadType, GenericTransformer<P, T> genericTransformer) {
return this.transform(payloadType, genericTransformer, null);
}
public <S, T> B transform(GenericTransformer<S, T> genericTransformer,
EndpointConfigurer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
return this.transform(null, genericTransformer, endpointConfigurer);
}
public <P, T> B transform(Class<P> payloadType, GenericTransformer<P, T> genericTransformer,
EndpointConfigurer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
Assert.notNull(genericTransformer);
Transformer transformer = genericTransformer instanceof Transformer ? (Transformer) genericTransformer :
(isLambda(genericTransformer)
? new MethodInvokingTransformer(new LambdaMessageProcessor(genericTransformer, payloadType))
: new MethodInvokingTransformer(genericTransformer));
return addComponent(transformer)
.handle(new MessageTransformingHandler(transformer), endpointConfigurer);
}
public B filter(String expression) {
Assert.hasText(expression);
return this.filter(new ExpressionEvaluatingSelector(PARSER.parseExpression(expression)));
}
public <S> B filter(GenericSelector<S> genericSelector) {
return this.filter(null, genericSelector);
}
public <P> B filter(Class<P> payloadType, GenericSelector<P> genericSelector) {
return this.filter(payloadType, genericSelector, null);
}
public <P> B filter(GenericSelector<P> genericSelector,
EndpointConfigurer<FilterEndpointSpec> endpointConfigurer) {
return filter(null, genericSelector, endpointConfigurer);
}
public <P> B filter(Class<P> payloadType, GenericSelector<P> genericSelector,
EndpointConfigurer<FilterEndpointSpec> endpointConfigurer) {
Assert.notNull(genericSelector);
MessageSelector selector = genericSelector instanceof MessageSelector ? (MessageSelector) genericSelector :
(isLambda(genericSelector)
? new MethodInvokingSelector(new LambdaMessageProcessor(genericSelector, payloadType))
: new MethodInvokingSelector(genericSelector));
return this.register(new FilterEndpointSpec(new MessageFilter(selector)), endpointConfigurer);
}
public <S extends MessageHandlerSpec<S, ? extends MessageHandler>> B handle(S messageHandlerSpec) {
Assert.notNull(messageHandlerSpec);
return handle(messageHandlerSpec.get());
}
public B handle(MessageHandler messageHandler) {
return this.handle(messageHandler, null);
}
public B handle(String beanName, String methodName) {
return this.handle(beanName, methodName, null);
}
public B handle(String beanName, String methodName,
EndpointConfigurer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
return this.handle(new ServiceActivatingHandler(new BeanNameMessageProcessor<Object>(beanName, methodName)),
endpointConfigurer);
}
public <P> B handle(GenericHandler<P> handler) {
return this.handle(null, handler);
}
public <P> B handle(GenericHandler<P> handler,
EndpointConfigurer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
return this.handle(null, handler, endpointConfigurer);
}
public <P> B handle(Class<P> payloadType, GenericHandler<P> handler) {
return this.handle(payloadType, handler, null);
}
public <P> B handle(Class<P> payloadType, GenericHandler<P> handler,
EndpointConfigurer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
ServiceActivatingHandler serviceActivatingHandler = null;
if (isLambda(handler)) {
serviceActivatingHandler = new ServiceActivatingHandler(new LambdaMessageProcessor(handler, payloadType));
}
else {
serviceActivatingHandler = new ServiceActivatingHandler(handler);
}
return this.handle(serviceActivatingHandler, endpointConfigurer);
}
public <H extends MessageHandler, S extends MessageHandlerSpec<S, H>>
B handle(S messageHandlerSpec, EndpointConfigurer<GenericEndpointSpec<H>> endpointConfigurer) {
Assert.notNull(messageHandlerSpec);
return handle(messageHandlerSpec.get(), endpointConfigurer);
}
public <H extends MessageHandler> B handle(H messageHandler,
EndpointConfigurer<GenericEndpointSpec<H>> endpointConfigurer) {
Assert.notNull(messageHandler);
return this.register(new GenericEndpointSpec<H>(messageHandler), endpointConfigurer);
}
public B bridge(EndpointConfigurer<GenericEndpointSpec<BridgeHandler>> endpointConfigurer) {
return this.register(new GenericEndpointSpec<BridgeHandler>(new BridgeHandler()), endpointConfigurer);
}
public B delay(String groupId, String expression) {
return this.delay(groupId, expression, null);
}
public B delay(String groupId, String expression,
EndpointConfigurer<DelayerEndpointSpec> endpointConfigurer) {
DelayHandler delayHandler = new DelayHandler(groupId);
if (StringUtils.hasText(expression)) {
delayHandler.setDelayExpression(PARSER.parseExpression(expression));
}
return this.register(new DelayerEndpointSpec(delayHandler), endpointConfigurer);
}
public B enrich(ComponentConfigurer<EnricherSpec> enricherConfigurer) {
return this.enrich(enricherConfigurer, null);
}
public B enrich(ComponentConfigurer<EnricherSpec> enricherConfigurer,
EndpointConfigurer<GenericEndpointSpec<ContentEnricher>> endpointConfigurer) {
Assert.notNull(enricherConfigurer);
EnricherSpec enricherSpec = new EnricherSpec();
enricherConfigurer.configure(enricherSpec);
return this.handle(enricherSpec.get(), endpointConfigurer);
}
public B enrichHeaders(ComponentConfigurer<HeaderEnricherSpec> headerEnricherConfigurer) {
return this.enrichHeaders(headerEnricherConfigurer, null);
}
public B enrichHeaders(ComponentConfigurer<HeaderEnricherSpec> headerEnricherConfigurer,
EndpointConfigurer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
Assert.notNull(headerEnricherConfigurer);
HeaderEnricherSpec headerEnricherSpec = new HeaderEnricherSpec();
headerEnricherConfigurer.configure(headerEnricherSpec);
return transform(headerEnricherSpec.get(), endpointConfigurer);
}
public B split(EndpointConfigurer<SplitterEndpointSpec<DefaultMessageSplitter>> endpointConfigurer) {
return this.split(new DefaultMessageSplitter(), endpointConfigurer);
}
public B split(String expression,
EndpointConfigurer<SplitterEndpointSpec<ExpressionEvaluatingSplitter>> endpointConfigurer) {
return this.split(new ExpressionEvaluatingSplitter(PARSER.parseExpression(expression)), endpointConfigurer);
}
public B split(String beanName, String methodName) {
return this.split(beanName, methodName, null);
}
public B split(String beanName, String methodName,
EndpointConfigurer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {
return this.split(new MethodInvokingSplitter(new BeanNameMessageProcessor<Collection<?>>(beanName, methodName)),
endpointConfigurer);
}
public <P> B split(Class<P> payloadType, GenericSplitter<P> splitter) {
return this.split(payloadType, splitter, null);
}
public <T> B split(GenericSplitter<T> splitter,
EndpointConfigurer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {
return split(null, splitter, endpointConfigurer);
}
public <P> B split(Class<P> payloadType, GenericSplitter<P> splitter,
EndpointConfigurer<SplitterEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {
MethodInvokingSplitter split = isLambda(splitter)
? new MethodInvokingSplitter(new LambdaMessageProcessor(splitter, payloadType))
: new MethodInvokingSplitter(splitter, "split");
return this.split(split, endpointConfigurer);
}
public <S extends AbstractMessageSplitter> B split(S splitter,
EndpointConfigurer<SplitterEndpointSpec<S>> endpointConfigurer) {
Assert.notNull(splitter);
return this.register(new SplitterEndpointSpec<S>(splitter), endpointConfigurer);
}
/**
* Provides the {@link HeaderFilter} to the current {@link org.springframework.integration.dsl.IntegrationFlowBuilder.StandardIntegrationFlow}.
* @param headersToRemove the array of headers (or patterns)
* to remove from {@link org.springframework.messaging.MessageHeaders}.
* @return this {@link IntegrationFlowDefinition}.
*/
public B headerFilter(String... headersToRemove) {
return this.headerFilter(new HeaderFilter(headersToRemove), null);
}
/**
* Provides the {@link HeaderFilter} to the current {@link org.springframework.integration.dsl.IntegrationFlowBuilder.StandardIntegrationFlow}.
* @param headersToRemove the comma separated headers (or patterns) to remove from
* {@link org.springframework.messaging.MessageHeaders}.
* @param patternMatch the {@code boolean} flag to indicate if {@code headersToRemove}
* should be interpreted as patterns or direct header names.
* @return this {@link IntegrationFlowDefinition}.
*/
public B headerFilter(String headersToRemove, boolean patternMatch) {
HeaderFilter headerFilter = new HeaderFilter(StringUtils.delimitedListToStringArray(headersToRemove, ",", " "));
headerFilter.setPatternMatch(patternMatch);
return this.headerFilter(headerFilter, null);
}
public B headerFilter(HeaderFilter headerFilter,
EndpointConfigurer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
return this.transform(headerFilter, endpointConfigurer);
}
public B claimCheckIn(MessageStore messageStore) {
return this.claimCheckIn(messageStore, null);
}
public B claimCheckIn(MessageStore messageStore,
EndpointConfigurer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
return this.transform(new ClaimCheckInTransformer(messageStore), endpointConfigurer);
}
public B claimCheckOut(MessageStore messageStore) {
return this.claimCheckOut(messageStore, false);
}
public B claimCheckOut(MessageStore messageStore, boolean removeMessage) {
return this.claimCheckOut(messageStore, removeMessage, null);
}
public B claimCheckOut(MessageStore messageStore, boolean removeMessage,
EndpointConfigurer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
ClaimCheckOutTransformer claimCheckOutTransformer = new ClaimCheckOutTransformer(messageStore);
claimCheckOutTransformer.setRemoveMessage(removeMessage);
return this.transform(claimCheckOutTransformer, endpointConfigurer);
}
public B resequence() {
return this.resequence((EndpointConfigurer<GenericEndpointSpec<ResequencingMessageHandler>>) null);
}
public B resequence(EndpointConfigurer<GenericEndpointSpec<ResequencingMessageHandler>> endpointConfigurer) {
return this.handle(new ResequencerSpec().get(), endpointConfigurer);
}
public B resequence(ComponentConfigurer<ResequencerSpec> resequencerConfigurer,
EndpointConfigurer<GenericEndpointSpec<ResequencingMessageHandler>> endpointConfigurer) {
Assert.notNull(resequencerConfigurer);
ResequencerSpec spec = new ResequencerSpec();
resequencerConfigurer.configure(spec);
return this.handle(spec.get(), endpointConfigurer);
}
public B aggregate() {
return aggregate((EndpointConfigurer<GenericEndpointSpec<AggregatingMessageHandler>>) null);
}
public B
aggregate(EndpointConfigurer<GenericEndpointSpec<AggregatingMessageHandler>> endpointConfigurer) {
return handle(new AggregatorSpec().get(), endpointConfigurer);
}
public B aggregate(ComponentConfigurer<AggregatorSpec> aggregatorConfigurer,
EndpointConfigurer<GenericEndpointSpec<AggregatingMessageHandler>> endpointConfigurer) {
Assert.notNull(aggregatorConfigurer);
AggregatorSpec spec = new AggregatorSpec();
aggregatorConfigurer.configure(spec);
return this.handle(spec.get(), endpointConfigurer);
}
public B route(String beanName, String method) {
return this.route(beanName, method, null);
}
public B route(String beanName, String method,
ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer) {
return this.route(beanName, method, routerConfigurer, null);
}
public B route(String beanName, String method,
ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer,
EndpointConfigurer<GenericEndpointSpec<MethodInvokingRouter>> endpointConfigurer) {
return this.route(new MethodInvokingRouter(new BeanNameMessageProcessor<Object>(beanName, method)),
routerConfigurer, endpointConfigurer);
}
public B route(String expression) {
return this.route(expression, (ComponentConfigurer<RouterSpec<ExpressionEvaluatingRouter>>) null);
}
public B route(String expression,
ComponentConfigurer<RouterSpec<ExpressionEvaluatingRouter>> routerConfigurer) {
return this.route(expression, routerConfigurer, null);
}
public B route(String expression,
ComponentConfigurer<RouterSpec<ExpressionEvaluatingRouter>> routerConfigurer,
EndpointConfigurer<GenericEndpointSpec<ExpressionEvaluatingRouter>> endpointConfigurer) {
return this.route(new ExpressionEvaluatingRouter(PARSER.parseExpression(expression)), routerConfigurer,
endpointConfigurer);
}
public <S, T> B route(GenericRouter<S, T> router) {
return this.route(null, router);
}
public <S, T> B route(GenericRouter<S, T> router,
ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer) {
return this.route(null, router, routerConfigurer);
}
public <P, T> B route(Class<P> payloadType, GenericRouter<P, T> router) {
return this.route(payloadType, router, null, null);
}
public <P, T> B route(Class<P> payloadType, GenericRouter<P, T> router,
ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer) {
return this.route(payloadType, router, routerConfigurer, null);
}
public <S, T> B route(GenericRouter<S, T> router,
ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer,
EndpointConfigurer<GenericEndpointSpec<MethodInvokingRouter>> endpointConfigurer) {
return route(null, router, routerConfigurer, endpointConfigurer);
}
public <P, T> B route(Class<P> payloadType, GenericRouter<P, T> router,
ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer,
EndpointConfigurer<GenericEndpointSpec<MethodInvokingRouter>> endpointConfigurer) {
MethodInvokingRouter methodInvokingRouter = isLambda(router)
? new MethodInvokingRouter(new LambdaMessageProcessor(router, payloadType))
: new MethodInvokingRouter(router);
return this.route(methodInvokingRouter, routerConfigurer, endpointConfigurer);
}
public <R extends AbstractMappingMessageRouter> B route(R router,
ComponentConfigurer<RouterSpec<R>> routerConfigurer,
EndpointConfigurer<GenericEndpointSpec<R>> endpointConfigurer) {
if (routerConfigurer != null) {
RouterSpec<R> routerSpec = new RouterSpec<R>(router);
routerConfigurer.configure(routerSpec);
}
return this.route(router, endpointConfigurer);
}
public B recipientListRoute(ComponentConfigurer<RecipientListRouterSpec> routerConfigurer) {
return this.recipientListRoute(routerConfigurer, null);
}
public B recipientListRoute(ComponentConfigurer<RecipientListRouterSpec> routerConfigurer,
EndpointConfigurer<GenericEndpointSpec<RecipientListRouter>> endpointConfigurer) {
Assert.notNull(routerConfigurer);
RecipientListRouterSpec spec = new RecipientListRouterSpec();
routerConfigurer.configure(spec);
DslRecipientListRouter recipientListRouter = (DslRecipientListRouter) spec.get();
Assert.notEmpty(recipientListRouter.getRecipients(), "recipient list must not be empty");
return this.route(recipientListRouter, endpointConfigurer);
}
public B route(AbstractMessageRouter router) {
return this.route(router, null);
}
public <R extends AbstractMessageRouter> B route(R router,
EndpointConfigurer<GenericEndpointSpec<R>> endpointConfigurer) {
return this.handle(router, endpointConfigurer);
}
public B gateway(String requestChannel) {
return gateway(requestChannel, null);
}
public B gateway(String requestChannel,
EndpointConfigurer<GatewayEndpointSpec> endpointConfigurer) {
return register(new GatewayEndpointSpec(requestChannel), endpointConfigurer);
}
public B gateway(MessageChannel requestChannel) {
return gateway(requestChannel, null);
}
public B gateway(MessageChannel requestChannel,
EndpointConfigurer<GatewayEndpointSpec> endpointConfigurer) {
return register(new GatewayEndpointSpec(requestChannel), endpointConfigurer);
}
private <S extends ConsumerEndpointSpec<S, ?>> B register(S endpointSpec,
EndpointConfigurer<S> endpointConfigurer) {
if (endpointConfigurer != null) {
endpointConfigurer.configure(endpointSpec);
}
MessageChannel inputChannel = this.currentMessageChannel;
this.currentMessageChannel = null;
if (inputChannel == null) {
inputChannel = new DirectChannel();
this.registerOutputChannelIfCan(inputChannel);
}
if (inputChannel instanceof MessageChannelReference) {
endpointSpec.get().getT1().setInputChannelName(((MessageChannelReference) inputChannel).getName());
}
else {
if (inputChannel instanceof FixedSubscriberChannelPrototype) {
String beanName = ((FixedSubscriberChannelPrototype) inputChannel).getName();
inputChannel = new FixedSubscriberChannel(endpointSpec.get().getT2());
if (beanName != null) {
((FixedSubscriberChannel) inputChannel).setBeanName(beanName);
}
this.registerOutputChannelIfCan(inputChannel);
}
endpointSpec.get().getT1().setInputChannel(inputChannel);
}
return this.addComponent(endpointSpec).currentComponent(endpointSpec.get().getT2());
}
private B registerOutputChannelIfCan(MessageChannel outputChannel) {
if (!(outputChannel instanceof FixedSubscriberChannelPrototype)) {
this.integrationComponents.add(outputChannel);
if (this.currentComponent != null) {
String channelName = null;
if (outputChannel instanceof MessageChannelReference) {
channelName = ((MessageChannelReference) outputChannel).getName();
}
if (this.currentComponent instanceof AbstractReplyProducingMessageHandler) {
AbstractReplyProducingMessageHandler messageProducer =
(AbstractReplyProducingMessageHandler) this.currentComponent;
if (channelName != null) {
messageProducer.setOutputChannelName(channelName);
}
else {
messageProducer.setOutputChannel(outputChannel);
}
}
else if (this.currentComponent instanceof SourcePollingChannelAdapterSpec) {
SourcePollingChannelAdapterFactoryBean pollingChannelAdapterFactoryBean =
((SourcePollingChannelAdapterSpec) this.currentComponent).get().getT1();
if (channelName != null) {
pollingChannelAdapterFactoryBean.setOutputChannelName(channelName);
}
else {
pollingChannelAdapterFactoryBean.setOutputChannel(outputChannel);
}
}
else if (this.currentComponent instanceof AbstractCorrelatingMessageHandler) {
AbstractCorrelatingMessageHandler messageProducer =
(AbstractCorrelatingMessageHandler) this.currentComponent;
if (channelName != null) {
messageProducer.setOutputChannelName(channelName);
}
else {
messageProducer.setOutputChannel(outputChannel);
}
}
else {
throw new BeanCreationException("The 'currentComponent' (" + this.currentComponent +
") is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. " +
"This is the end of the integration flow.");
}
this.currentComponent = null;
}
}
return _this();
}
@SuppressWarnings("unchecked")
protected final B _this() {
return (B) this;
}
private static boolean isLambda(Object o) {
Class<?> aClass = o.getClass();
return aClass.isSynthetic() && !aClass.isAnonymousClass() && !aClass.isLocalClass();
}
}

View File

@@ -21,9 +21,9 @@ import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.channel.MessageChannelSpec;
import org.springframework.integration.dsl.core.ComponentsRegistration;
import org.springframework.integration.dsl.core.MessageProducerSpec;
import org.springframework.integration.dsl.core.MessageSourceSpec;
import org.springframework.integration.dsl.core.MessagingGatewaySpec;
import org.springframework.integration.dsl.core.MessagingProducerSpec;
import org.springframework.integration.dsl.support.EndpointConfigurer;
import org.springframework.integration.dsl.support.FixedSubscriberChannelPrototype;
import org.springframework.integration.dsl.support.MessageChannelReference;
@@ -98,8 +98,8 @@ public final class IntegrationFlows {
.currentComponent(spec);
}
public static IntegrationFlowBuilder from(MessagingProducerSpec<?, ?> messagingProducerSpec) {
return from(messagingProducerSpec.get(), registerComponents(messagingProducerSpec));
public static IntegrationFlowBuilder from(MessageProducerSpec<?, ?> messageProducerSpec) {
return from(messageProducerSpec.get(), registerComponents(messageProducerSpec));
}
public static IntegrationFlowBuilder from(MessageProducerSupport messageProducer) {

View File

@@ -22,8 +22,8 @@ import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.inbound.AmqpInboundGateway;
import org.springframework.integration.dsl.core.MessageProducerSpec;
import org.springframework.integration.dsl.core.MessagingGatewaySpec;
import org.springframework.integration.dsl.core.MessagingProducerSpec;
/**
* @author Artem Bilan
@@ -59,7 +59,7 @@ public abstract class Amqp {
return (AmqpInboundChannelAdapterSpec) inboundAdapter(listenerContainer);
}
public static MessagingProducerSpec<AmqpInboundChannelAdapterSpec, AmqpInboundChannelAdapter> inboundAdapter(
public static MessageProducerSpec<AmqpInboundChannelAdapterSpec, AmqpInboundChannelAdapter> inboundAdapter(
SimpleMessageListenerContainer listenerContainer) {
return new AmqpInboundChannelAdapterSpec(listenerContainer);
}

View File

@@ -27,14 +27,14 @@ import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.dsl.core.MessagingProducerSpec;
import org.springframework.integration.dsl.core.MessageProducerSpec;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.ErrorHandler;
/**
* @author Artem Bilan
*/
public class AmqpInboundChannelAdapterSpec extends MessagingProducerSpec<AmqpInboundChannelAdapterSpec, AmqpInboundChannelAdapter> {
public class AmqpInboundChannelAdapterSpec extends MessageProducerSpec<AmqpInboundChannelAdapterSpec, AmqpInboundChannelAdapter> {
private final SimpleMessageListenerContainer listenerContainer;

View File

@@ -33,6 +33,8 @@ import org.springframework.integration.config.IntegrationConfigUtils;
import org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec;
import org.springframework.integration.dsl.support.MessageChannelReference;
import org.springframework.integration.support.context.NamedComponent;
@@ -65,104 +67,118 @@ public class IntegrationFlowBeanPostProcessor implements BeanPostProcessor, Bean
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof IntegrationFlow) {
IntegrationFlow flow = (IntegrationFlow) bean;
String flowNamePrefix = beanName + ".";
int channelNameIndex = 0;
for (Object component : flow.getIntegrationComponents()) {
if (component instanceof ConsumerEndpointSpec) {
ConsumerEndpointSpec<?, ?> endpointSpec = (ConsumerEndpointSpec<?, ?>) component;
MessageHandler messageHandler = endpointSpec.get().getT2();
ConsumerEndpointFactoryBean endpoint = endpointSpec.get().getT1();
String id = endpointSpec.getId();
if (bean instanceof IntegrationFlowBuilder.StandardIntegrationFlow) {
return processStandardIntegrationFlow((IntegrationFlowBuilder.StandardIntegrationFlow) bean, beanName);
}
else if (bean instanceof IntegrationFlow) {
return processLambdaIntegrationFlow((IntegrationFlow) bean, beanName);
}
return bean;
}
Collection<?> messageHandlers = this.beanFactory.getBeansOfType(MessageHandler.class, false,
false).values();
private Object processStandardIntegrationFlow(IntegrationFlowBuilder.StandardIntegrationFlow flow,
String beanName) {
String flowNamePrefix = beanName + ".";
int channelNameIndex = 0;
for (Object component : flow.getIntegrationComponents()) {
if (component instanceof ConsumerEndpointSpec) {
ConsumerEndpointSpec<?, ?> endpointSpec = (ConsumerEndpointSpec<?, ?>) component;
MessageHandler messageHandler = endpointSpec.get().getT2();
ConsumerEndpointFactoryBean endpoint = endpointSpec.get().getT1();
String id = endpointSpec.getId();
if (!messageHandlers.contains(messageHandler)) {
String handlerBeanName = generateBeanName(messageHandler);
String[] handlerAlias = id != null
? new String[] {id + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX}
: null;
Collection<?> messageHandlers = this.beanFactory.getBeansOfType(MessageHandler.class, false,
false).values();
registerComponent(messageHandler, handlerBeanName);
if (handlerAlias != null) {
for (String alias : handlerAlias) {
this.beanFactory.registerAlias(handlerBeanName, alias);
}
if (!messageHandlers.contains(messageHandler)) {
String handlerBeanName = generateBeanName(messageHandler);
String[] handlerAlias = id != null
? new String[] {id + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX}
: null;
registerComponent(messageHandler, handlerBeanName);
if (handlerAlias != null) {
for (String alias : handlerAlias) {
this.beanFactory.registerAlias(handlerBeanName, alias);
}
}
String endpointBeanName = id;
if (endpointBeanName == null) {
endpointBeanName = generateBeanName(endpoint);
}
registerComponent(endpoint, endpointBeanName);
}
else {
//TODO workaround until SF will fix 'TypeDescriptor.forObject'
if (component instanceof MessageChannel) {
Collection<?> messageChannels =
this.beanFactory.getBeansOfType(MessageChannel.class, false, false).values();
if (!messageChannels.contains(component)) {
if (component instanceof AbstractMessageChannel) {
String channelBeanName = ((AbstractMessageChannel) component).getComponentName();
if (channelBeanName == null) {
channelBeanName = flowNamePrefix + "channel" +
BeanFactoryUtils.GENERATED_BEAN_NAME_SEPARATOR + channelNameIndex++;
}
registerComponent(component, channelBeanName);
}
else if (component instanceof MessageChannelReference) {
String channelBeanName = ((MessageChannelReference) component).getName();
if (!this.beanFactory.containsBean(channelBeanName)) {
DirectChannel directChannel = new DirectChannel();
registerComponent(directChannel, channelBeanName);
}
}
else if (component instanceof FixedSubscriberChannel) {
FixedSubscriberChannel fixedSubscriberChannel = (FixedSubscriberChannel) component;
String channelBeanName = fixedSubscriberChannel.getComponentName();
if ("Unnamed fixed subscriber channel".equals(channelBeanName)) {
channelBeanName = flowNamePrefix + "channel" +
BeanFactoryUtils.GENERATED_BEAN_NAME_SEPARATOR + channelNameIndex++;
}
registerComponent(component, channelBeanName);
}
}
}
else if (component instanceof SourcePollingChannelAdapterSpec) {
SourcePollingChannelAdapterSpec spec = (SourcePollingChannelAdapterSpec) component;
SourcePollingChannelAdapterFactoryBean pollingChannelAdapterFactoryBean = spec.get().getT1();
String id = ((IntegrationComponentSpec) spec).getId();
if (!StringUtils.hasText(id)) {
id = generateBeanName(pollingChannelAdapterFactoryBean);
}
registerComponent(pollingChannelAdapterFactoryBean, id);
MessageSource<?> messageSource = spec.get().getT2();
if (!this.beanFactory
.getBeansOfType(MessageSource.class, false, false)
.values()
.contains(messageSource)) {
String messageSourceId = id + ".source";
if (messageSource instanceof NamedComponent
&& ((NamedComponent) messageSource).getComponentName() != null) {
messageSourceId = ((NamedComponent) messageSource).getComponentName();
String endpointBeanName = id;
if (endpointBeanName == null) {
endpointBeanName = generateBeanName(endpoint);
}
registerComponent(endpoint, endpointBeanName);
}
else {
//TODO workaround until SF will fix 'TypeDescriptor.forObject'
if (component instanceof MessageChannel) {
Collection<?> messageChannels =
this.beanFactory.getBeansOfType(MessageChannel.class, false, false).values();
if (!messageChannels.contains(component)) {
if (component instanceof AbstractMessageChannel) {
String channelBeanName = ((AbstractMessageChannel) component).getComponentName();
if (channelBeanName == null) {
channelBeanName = flowNamePrefix + "channel" +
BeanFactoryUtils.GENERATED_BEAN_NAME_SEPARATOR + channelNameIndex++;
}
registerComponent(messageSource, messageSourceId);
registerComponent(component, channelBeanName);
}
else if (component instanceof MessageChannelReference) {
String channelBeanName = ((MessageChannelReference) component).getName();
if (!this.beanFactory.containsBean(channelBeanName)) {
DirectChannel directChannel = new DirectChannel();
registerComponent(directChannel, channelBeanName);
}
}
else if (component instanceof FixedSubscriberChannel) {
FixedSubscriberChannel fixedSubscriberChannel = (FixedSubscriberChannel) component;
String channelBeanName = fixedSubscriberChannel.getComponentName();
if ("Unnamed fixed subscriber channel".equals(channelBeanName)) {
channelBeanName = flowNamePrefix + "channel" +
BeanFactoryUtils.GENERATED_BEAN_NAME_SEPARATOR + channelNameIndex++;
}
registerComponent(component, channelBeanName);
}
}
else if (!this.beanFactory
.getBeansOfType(AopUtils.getTargetClass(component), false, false)
.values()
.contains(component)) {
registerComponent(component, generateBeanName(component));
}
else if (component instanceof SourcePollingChannelAdapterSpec) {
SourcePollingChannelAdapterSpec spec = (SourcePollingChannelAdapterSpec) component;
SourcePollingChannelAdapterFactoryBean pollingChannelAdapterFactoryBean = spec.get().getT1();
String id = ((IntegrationComponentSpec) spec).getId();
if (!StringUtils.hasText(id)) {
id = generateBeanName(pollingChannelAdapterFactoryBean);
}
registerComponent(pollingChannelAdapterFactoryBean, id);
MessageSource<?> messageSource = spec.get().getT2();
if (!this.beanFactory
.getBeansOfType(MessageSource.class, false, false)
.values()
.contains(messageSource)) {
String messageSourceId = id + ".source";
if (messageSource instanceof NamedComponent
&& ((NamedComponent) messageSource).getComponentName() != null) {
messageSourceId = ((NamedComponent) messageSource).getComponentName();
}
registerComponent(messageSource, messageSourceId);
}
}
else if (!this.beanFactory
.getBeansOfType(AopUtils.getTargetClass(component), false, false)
.values()
.contains(component)) {
registerComponent(component, generateBeanName(component));
}
}
}
return bean;
return flow;
}
private Object processLambdaIntegrationFlow(IntegrationFlow flow, String beanName) {
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(beanName + ".input");
flow.define(flowBuilder);
return processStandardIntegrationFlow(flowBuilder.get(), beanName);
}
private void registerComponent(Object component, String beanName) {

View File

@@ -22,10 +22,10 @@ import org.springframework.messaging.MessageChannel;
/**
* @author Artem Bilan
*/
public abstract class MessagingProducerSpec<S extends MessagingProducerSpec<S, P>, P extends MessageProducerSupport>
public abstract class MessageProducerSpec<S extends MessageProducerSpec<S, P>, P extends MessageProducerSupport>
extends IntegrationComponentSpec<S, P> {
public MessagingProducerSpec(P producer) {
public MessageProducerSpec(P producer) {
this.target = producer;
}

View File

@@ -19,7 +19,7 @@ package org.springframework.integration.dsl.file;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.dsl.core.MessagingProducerSpec;
import org.springframework.integration.dsl.core.MessageProducerSpec;
import org.springframework.integration.file.config.FileTailInboundChannelAdapterFactoryBean;
import org.springframework.integration.file.tail.FileTailingMessageProducerSupport;
import org.springframework.messaging.MessageChannel;
@@ -29,7 +29,7 @@ import org.springframework.util.Assert;
/**
* @author Artem Bilan
*/
public class TailAdapterSpec extends MessagingProducerSpec<TailAdapterSpec, FileTailingMessageProducerSupport> {
public class TailAdapterSpec extends MessageProducerSpec<TailAdapterSpec, FileTailingMessageProducerSupport> {
private final FileTailInboundChannelAdapterFactoryBean factoryBean = new FileTailInboundChannelAdapterFactoryBean();

View File

@@ -18,7 +18,7 @@ package org.springframework.integration.dsl.jms;
import javax.jms.Destination;
import org.springframework.integration.dsl.core.MessagingProducerSpec;
import org.springframework.integration.dsl.core.MessageProducerSpec;
import org.springframework.integration.dsl.support.ComponentConfigurer;
import org.springframework.integration.jms.ChannelPublishingJmsMessageListener;
import org.springframework.integration.jms.JmsHeaderMapper;
@@ -30,7 +30,7 @@ import org.springframework.util.Assert;
* @author Artem Bilan
*/
public class JmsMessageDrivenChannelAdapterSpec<S extends JmsMessageDrivenChannelAdapterSpec<S>>
extends MessagingProducerSpec<S, JmsMessageDrivenChannelAdapter> {
extends MessageProducerSpec<S, JmsMessageDrivenChannelAdapter> {
JmsMessageDrivenChannelAdapterSpec(AbstractMessageListenerContainer listenerContainer) {
super(new JmsMessageDrivenChannelAdapter(listenerContainer, new ChannelPublishingJmsMessageListener()));

View File

@@ -46,14 +46,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import com.jcraft.jsch.ChannelSftp;
import com.mongodb.MongoClient;
import de.flapdoodle.embed.mongo.MongodExecutable;
import de.flapdoodle.embed.mongo.MongodStarter;
import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
import de.flapdoodle.embed.mongo.config.Net;
import de.flapdoodle.embed.mongo.distribution.Version;
import de.flapdoodle.embed.process.runtime.Network;
import org.aopalliance.aop.Advice;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
@@ -66,6 +58,15 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import com.jcraft.jsch.ChannelSftp;
import com.mongodb.MongoClient;
import de.flapdoodle.embed.mongo.MongodExecutable;
import de.flapdoodle.embed.mongo.MongodStarter;
import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
import de.flapdoodle.embed.mongo.config.Net;
import de.flapdoodle.embed.mongo.distribution.Version;
import de.flapdoodle.embed.process.runtime.Network;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Queue;
@@ -255,7 +256,7 @@ public class IntegrationFlowTests {
private FixedSubscriberChannel enricherInput3;
@Autowired
@Qualifier("splitInput")
@Qualifier("splitResequenceFlow.input")
private MessageChannel splitInput;
@Autowired
@@ -295,7 +296,7 @@ public class IntegrationFlowTests {
private MessageChannel routerMethod2Input;
@Autowired
@Qualifier("routerMethod3Input")
@Qualifier("routeMethodInvocationFlow3.input")
private MessageChannel routerMethod3Input;
@Autowired
@@ -1017,7 +1018,7 @@ public class IntegrationFlowTests {
}
@Autowired
@Qualifier("jmsOutboundInboundChannel")
@Qualifier("jmsOutboundFlow.input")
private MessageChannel jmsOutboundInboundChannel;
@Autowired
@@ -1046,7 +1047,7 @@ public class IntegrationFlowTests {
}
@Autowired
@Qualifier("jmsOutboundGatewayChannel")
@Qualifier("jmsOutboundGatewayFlow.input")
private MessageChannel jmsOutboundGatewayChannel;
@Test
@@ -1337,10 +1338,8 @@ public class IntegrationFlowTests {
@Bean
public IntegrationFlow jmsOutboundFlow() {
return IntegrationFlows.from("jmsOutboundInboundChannel")
.handle(Jms.outboundAdapter(this.jmsConnectionFactory)
.destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER))
.get();
return f -> f.handle(Jms.outboundAdapter(this.jmsConnectionFactory)
.destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER));
}
@Bean
@@ -1370,11 +1369,9 @@ public class IntegrationFlowTests {
@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
return IntegrationFlows.from("jmsOutboundGatewayChannel")
.handle(Jms.outboundGateway(this.jmsConnectionFactory)
.replyContainer()
.requestDestination("jmsPipelineTest"))
.get();
return f -> f.handle(Jms.outboundGateway(this.jmsConnectionFactory)
.replyContainer()
.requestDestination("jmsPipelineTest"));
}
@Bean
@@ -1760,8 +1757,7 @@ public class IntegrationFlowTests {
@Bean
public IntegrationFlow splitResequenceFlow() {
return IntegrationFlows.from("splitInput")
.enrichHeaders(s -> s.header("FOO", "BAR"))
return f -> f.enrichHeaders(s -> s.header("FOO", "BAR"))
.split("testSplitterData", "buildList", c -> c.applySequence(false))
.channel(MessageChannels.executor(this.taskExecutor()))
.split(Message.class, target -> (List<?>) target.getPayload(), c -> c.applySequence(false))
@@ -1771,8 +1767,7 @@ public class IntegrationFlowTests {
.<String, Integer>transform(Integer::parseInt)
.enrichHeaders(s -> s.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, "payload"))
.resequence(r -> r.releasePartialSequences(true).correlationExpression("'foo'"), null)
.headerFilter("foo", false)
.get();
.headerFilter("foo", false);
}
@Bean
@@ -1838,9 +1833,7 @@ public class IntegrationFlowTests {
@Bean
public IntegrationFlow routeMethodInvocationFlow3() {
return IntegrationFlows.from("routerMethod3Input")
.route((String p) -> ContextConfiguration4.this.routingTestBean().routePayload(p))
.get();
return f -> f.route((String p) -> ContextConfiguration4.this.routingTestBean().routePayload(p));
}
@Bean