diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlow.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlow.java index de75671..a0e0458 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlow.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlow.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,26 +16,11 @@ package org.springframework.integration.dsl; -import java.util.LinkedHashSet; -import java.util.Set; - /** * @author Artem Bilan */ -public final class IntegrationFlow { +public interface IntegrationFlow { - private final Set integrationComponents = new LinkedHashSet(); - - IntegrationFlow() { - } - - public Set getIntegrationComponents() { - return integrationComponents; - } - - IntegrationFlow addComponent(Object component) { - this.integrationComponents.add(component); - return this; - } + void define(IntegrationFlowDefinition flow); } diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java index f2f4c2d..710b9c0 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java @@ -16,612 +16,17 @@ package org.springframework.integration.dsl; -import java.util.Collection; +import java.util.Set; import org.springframework.beans.factory.BeanCreationException; -import org.springframework.expression.spel.standard.SpelExpressionParser; -import org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler; -import org.springframework.integration.aggregator.AggregatingMessageHandler; -import org.springframework.integration.aggregator.ResequencingMessageHandler; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.channel.FixedSubscriberChannel; -import org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean; -import org.springframework.integration.core.GenericSelector; -import org.springframework.integration.core.MessageSelector; -import org.springframework.integration.dsl.channel.MessageChannelSpec; -import org.springframework.integration.dsl.core.ConsumerEndpointSpec; -import org.springframework.integration.dsl.core.MessageHandlerSpec; -import org.springframework.integration.dsl.support.BeanNameMessageProcessor; -import org.springframework.integration.dsl.support.ComponentConfigurer; -import org.springframework.integration.dsl.support.EndpointConfigurer; import org.springframework.integration.dsl.support.FixedSubscriberChannelPrototype; -import org.springframework.integration.dsl.support.GenericHandler; -import org.springframework.integration.dsl.support.GenericRouter; -import org.springframework.integration.dsl.support.GenericSplitter; -import org.springframework.integration.dsl.support.MessageChannelReference; -import org.springframework.integration.expression.ControlBusMethodFilter; -import org.springframework.integration.filter.ExpressionEvaluatingSelector; -import org.springframework.integration.filter.MessageFilter; -import org.springframework.integration.filter.MethodInvokingSelector; -import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; -import org.springframework.integration.handler.BridgeHandler; -import org.springframework.integration.handler.DelayHandler; -import org.springframework.integration.handler.ExpressionCommandMessageProcessor; -import org.springframework.integration.handler.ServiceActivatingHandler; -import org.springframework.integration.router.AbstractMappingMessageRouter; -import org.springframework.integration.router.AbstractMessageRouter; -import org.springframework.integration.router.ExpressionEvaluatingRouter; -import org.springframework.integration.router.MethodInvokingRouter; -import org.springframework.integration.router.RecipientListRouter; -import org.springframework.integration.splitter.AbstractMessageSplitter; -import org.springframework.integration.splitter.DefaultMessageSplitter; -import org.springframework.integration.splitter.ExpressionEvaluatingSplitter; -import org.springframework.integration.splitter.MethodInvokingSplitter; -import org.springframework.integration.store.MessageStore; -import org.springframework.integration.transformer.ClaimCheckInTransformer; -import org.springframework.integration.transformer.ClaimCheckOutTransformer; -import org.springframework.integration.transformer.ContentEnricher; -import org.springframework.integration.transformer.ExpressionEvaluatingTransformer; -import org.springframework.integration.transformer.GenericTransformer; -import org.springframework.integration.transformer.HeaderFilter; -import org.springframework.integration.transformer.MessageTransformingHandler; -import org.springframework.integration.transformer.MethodInvokingTransformer; -import org.springframework.integration.transformer.Transformer; -import org.springframework.messaging.MessageChannel; -import org.springframework.messaging.MessageHandler; -import org.springframework.util.Assert; -import org.springframework.util.StringUtils; /** * @author Artem Bilan */ -public final class IntegrationFlowBuilder { +public final class IntegrationFlowBuilder extends IntegrationFlowDefinition { - private final static SpelExpressionParser PARSER = new SpelExpressionParser(); - - private final IntegrationFlow flow = new IntegrationFlow(); - - private MessageChannel currentMessageChannel; - - private Object currentComponent; - - IntegrationFlowBuilder() { - } - - IntegrationFlowBuilder addComponent(Object component) { - this.flow.addComponent(component); - return this; - } - - IntegrationFlowBuilder addComponents(Collection components) { - if (components != null) { - for (Object component : components) { - this.flow.addComponent(component); - } - } - return this; - } - - IntegrationFlowBuilder currentComponent(Object component) { - this.currentComponent = component; - return this; - } - - public IntegrationFlowBuilder fixedSubscriberChannel() { - return this.fixedSubscriberChannel(null); - } - - public IntegrationFlowBuilder fixedSubscriberChannel(String messageChannelName) { - return this.channel(new FixedSubscriberChannelPrototype(messageChannelName)); - } - - public IntegrationFlowBuilder channel(String messageChannelName) { - return this.channel(new MessageChannelReference(messageChannelName)); - } - - public IntegrationFlowBuilder channel(MessageChannel messageChannel) { - Assert.notNull(messageChannel); - if (this.currentMessageChannel != null) { - this.register(new GenericEndpointSpec(new BridgeHandler()), null); - } - this.currentMessageChannel = messageChannel; - return this.registerOutputChannelIfCan(this.currentMessageChannel); - } - - public IntegrationFlowBuilder channel(MessageChannelSpec messageChannelSpec) { - Assert.notNull(messageChannelSpec); - return this.channel(messageChannelSpec.get()); - } - - public IntegrationFlowBuilder controlBus() { - return controlBus(null); - } - - public IntegrationFlowBuilder controlBus(EndpointConfigurer> endpointConfigurer) { - return this.handle(new ServiceActivatingHandler(new ExpressionCommandMessageProcessor(new ControlBusMethodFilter())), endpointConfigurer); - } - - public IntegrationFlowBuilder transform(String expression) { - Assert.hasText(expression); - return this.transform(new ExpressionEvaluatingTransformer(PARSER.parseExpression(expression))); - } - - public IntegrationFlowBuilder transform(GenericTransformer genericTransformer) { - return this.transform(null, genericTransformer); - } - - public IntegrationFlowBuilder transform(Class

payloadType, GenericTransformer genericTransformer) { - return this.transform(payloadType, genericTransformer, null); - } - - public IntegrationFlowBuilder transform(GenericTransformer genericTransformer, - EndpointConfigurer> endpointConfigurer) { - return this.transform(null, genericTransformer, endpointConfigurer); - } - - public IntegrationFlowBuilder transform(Class

payloadType, GenericTransformer genericTransformer, - EndpointConfigurer> endpointConfigurer) { - Assert.notNull(genericTransformer); - Transformer transformer = genericTransformer instanceof Transformer ? (Transformer) genericTransformer : - (isLambda(genericTransformer) - ? new MethodInvokingTransformer(new LambdaMessageProcessor(genericTransformer, payloadType)) - : new MethodInvokingTransformer(genericTransformer)); - return addComponent(transformer) - .handle(new MessageTransformingHandler(transformer), endpointConfigurer); - } - - public IntegrationFlowBuilder filter(String expression) { - Assert.hasText(expression); - return this.filter(new ExpressionEvaluatingSelector(PARSER.parseExpression(expression))); - } - - public IntegrationFlowBuilder filter(GenericSelector genericSelector) { - return this.filter(null, genericSelector); - } - - public

IntegrationFlowBuilder filter(Class

payloadType, GenericSelector

genericSelector) { - return this.filter(payloadType, genericSelector, null); - } - - public

IntegrationFlowBuilder filter(GenericSelector

genericSelector, - EndpointConfigurer endpointConfigurer) { - return filter(null, genericSelector, endpointConfigurer); - } - - public

IntegrationFlowBuilder filter(Class

payloadType, GenericSelector

genericSelector, - EndpointConfigurer endpointConfigurer) { - Assert.notNull(genericSelector); - MessageSelector selector = genericSelector instanceof MessageSelector ? (MessageSelector) genericSelector : - (isLambda(genericSelector) - ? new MethodInvokingSelector(new LambdaMessageProcessor(genericSelector, payloadType)) - : new MethodInvokingSelector(genericSelector)); - return this.register(new FilterEndpointSpec(new MessageFilter(selector)), endpointConfigurer); - } - - public > IntegrationFlowBuilder handle(S messageHandlerSpec) { - Assert.notNull(messageHandlerSpec); - return handle(messageHandlerSpec.get()); - } - - public IntegrationFlowBuilder handle(MessageHandler messageHandler) { - return this.handle(messageHandler, null); - } - - public IntegrationFlowBuilder handle(String beanName, String methodName) { - return this.handle(beanName, methodName, null); - } - - public IntegrationFlowBuilder handle(String beanName, String methodName, - EndpointConfigurer> endpointConfigurer) { - return this.handle(new ServiceActivatingHandler(new BeanNameMessageProcessor(beanName, methodName)), - endpointConfigurer); - } - - public

IntegrationFlowBuilder handle(GenericHandler

handler) { - return this.handle(null, handler); - } - - public

IntegrationFlowBuilder handle(GenericHandler

handler, - EndpointConfigurer> endpointConfigurer) { - return this.handle(null, handler, endpointConfigurer); - } - - - public

IntegrationFlowBuilder handle(Class

payloadType, GenericHandler

handler) { - return this.handle(payloadType, handler, null); - } - - public

IntegrationFlowBuilder handle(Class

payloadType, GenericHandler

handler, - EndpointConfigurer> endpointConfigurer) { - ServiceActivatingHandler serviceActivatingHandler = null; - if (isLambda(handler)) { - serviceActivatingHandler = new ServiceActivatingHandler(new LambdaMessageProcessor(handler, payloadType)); - } - else { - serviceActivatingHandler = new ServiceActivatingHandler(handler); - } - return this.handle(serviceActivatingHandler, endpointConfigurer); - } - - public > - IntegrationFlowBuilder handle(S messageHandlerSpec, EndpointConfigurer> endpointConfigurer) { - Assert.notNull(messageHandlerSpec); - return handle(messageHandlerSpec.get(), endpointConfigurer); - } - - public IntegrationFlowBuilder handle(H messageHandler, - EndpointConfigurer> endpointConfigurer) { - Assert.notNull(messageHandler); - return this.register(new GenericEndpointSpec(messageHandler), endpointConfigurer); - } - - public IntegrationFlowBuilder bridge(EndpointConfigurer> endpointConfigurer) { - return this.register(new GenericEndpointSpec(new BridgeHandler()), endpointConfigurer); - } - - public IntegrationFlowBuilder delay(String groupId, String expression) { - return this.delay(groupId, expression, null); - } - - public IntegrationFlowBuilder delay(String groupId, String expression, - EndpointConfigurer endpointConfigurer) { - DelayHandler delayHandler = new DelayHandler(groupId); - if (StringUtils.hasText(expression)) { - delayHandler.setDelayExpression(PARSER.parseExpression(expression)); - } - return this.register(new DelayerEndpointSpec(delayHandler), endpointConfigurer); - } - - public IntegrationFlowBuilder enrich(ComponentConfigurer enricherConfigurer) { - return this.enrich(enricherConfigurer, null); - } - - public IntegrationFlowBuilder enrich(ComponentConfigurer enricherConfigurer, - EndpointConfigurer> endpointConfigurer) { - Assert.notNull(enricherConfigurer); - EnricherSpec enricherSpec = new EnricherSpec(); - enricherConfigurer.configure(enricherSpec); - return this.handle(enricherSpec.get(), endpointConfigurer); - } - - public IntegrationFlowBuilder enrichHeaders(ComponentConfigurer headerEnricherConfigurer) { - return this.enrichHeaders(headerEnricherConfigurer, null); - } - - public IntegrationFlowBuilder enrichHeaders(ComponentConfigurer headerEnricherConfigurer, - EndpointConfigurer> endpointConfigurer) { - Assert.notNull(headerEnricherConfigurer); - HeaderEnricherSpec headerEnricherSpec = new HeaderEnricherSpec(); - headerEnricherConfigurer.configure(headerEnricherSpec); - return transform(headerEnricherSpec.get(), endpointConfigurer); - } - - public IntegrationFlowBuilder split(EndpointConfigurer> endpointConfigurer) { - return this.split(new DefaultMessageSplitter(), endpointConfigurer); - } - - public IntegrationFlowBuilder split(String expression, - EndpointConfigurer> endpointConfigurer) { - return this.split(new ExpressionEvaluatingSplitter(PARSER.parseExpression(expression)), endpointConfigurer); - } - - public IntegrationFlowBuilder split(String beanName, String methodName) { - return this.split(beanName, methodName, null); - } - - public IntegrationFlowBuilder split(String beanName, String methodName, - EndpointConfigurer> endpointConfigurer) { - return this.split(new MethodInvokingSplitter(new BeanNameMessageProcessor>(beanName, methodName)), - endpointConfigurer); - } - - public

IntegrationFlowBuilder split(Class

payloadType, GenericSplitter

splitter) { - return this.split(payloadType, splitter, null); - } - - public IntegrationFlowBuilder split(GenericSplitter splitter, - EndpointConfigurer> endpointConfigurer) { - return split(null, splitter, endpointConfigurer); - } - - public

IntegrationFlowBuilder split(Class

payloadType, GenericSplitter

splitter, - EndpointConfigurer> endpointConfigurer) { - MethodInvokingSplitter split = isLambda(splitter) - ? new MethodInvokingSplitter(new LambdaMessageProcessor(splitter, payloadType)) - : new MethodInvokingSplitter(splitter, "split"); - return this.split(split, endpointConfigurer); - } - - public IntegrationFlowBuilder split(S splitter, - EndpointConfigurer> endpointConfigurer) { - Assert.notNull(splitter); - return this.register(new SplitterEndpointSpec(splitter), endpointConfigurer); - } - - /** - * Provides the {@link HeaderFilter} to the current {@link IntegrationFlow}. - * @param headersToRemove the array of headers (or patterns) - * to remove from {@link org.springframework.messaging.MessageHeaders}. - * @return the {@link IntegrationFlowBuilder}. - */ - public IntegrationFlowBuilder headerFilter(String... headersToRemove) { - return this.headerFilter(new HeaderFilter(headersToRemove), null); - } - - /** - * Provides the {@link HeaderFilter} to the current {@link IntegrationFlow}. - * @param headersToRemove the comma separated headers (or patterns) to remove from - * {@link org.springframework.messaging.MessageHeaders}. - * @param patternMatch the {@code boolean} flag to indicate if {@code headersToRemove} - * should be interpreted as patterns or direct header names. - * @return the {@link IntegrationFlowBuilder}. - */ - - public IntegrationFlowBuilder headerFilter(String headersToRemove, boolean patternMatch) { - HeaderFilter headerFilter = new HeaderFilter(StringUtils.delimitedListToStringArray(headersToRemove, ",", " ")); - headerFilter.setPatternMatch(patternMatch); - return this.headerFilter(headerFilter, null); - } - - public IntegrationFlowBuilder headerFilter(HeaderFilter headerFilter, - EndpointConfigurer> endpointConfigurer) { - return this.transform(headerFilter, endpointConfigurer); - } - - public IntegrationFlowBuilder claimCheckIn(MessageStore messageStore) { - return this.claimCheckIn(messageStore, null); - } - - public IntegrationFlowBuilder claimCheckIn(MessageStore messageStore, - EndpointConfigurer> endpointConfigurer) { - return this.transform(new ClaimCheckInTransformer(messageStore), endpointConfigurer); - } - - public IntegrationFlowBuilder claimCheckOut(MessageStore messageStore) { - return this.claimCheckOut(messageStore, false); - } - - public IntegrationFlowBuilder claimCheckOut(MessageStore messageStore, boolean removeMessage) { - return this.claimCheckOut(messageStore, removeMessage, null); - } - - public IntegrationFlowBuilder claimCheckOut(MessageStore messageStore, boolean removeMessage, - EndpointConfigurer> endpointConfigurer) { - ClaimCheckOutTransformer claimCheckOutTransformer = new ClaimCheckOutTransformer(messageStore); - claimCheckOutTransformer.setRemoveMessage(removeMessage); - return this.transform(claimCheckOutTransformer, endpointConfigurer); - } - - public IntegrationFlowBuilder resequence() { - return this.resequence((EndpointConfigurer>) null); - } - - public IntegrationFlowBuilder resequence(EndpointConfigurer> endpointConfigurer) { - return this.handle(new ResequencerSpec().get(), endpointConfigurer); - } - - public IntegrationFlowBuilder resequence(ComponentConfigurer resequencerConfigurer, - EndpointConfigurer> endpointConfigurer) { - Assert.notNull(resequencerConfigurer); - ResequencerSpec spec = new ResequencerSpec(); - resequencerConfigurer.configure(spec); - return this.handle(spec.get(), endpointConfigurer); - } - - public IntegrationFlowBuilder aggregate() { - return aggregate((EndpointConfigurer>) null); - } - - public IntegrationFlowBuilder - aggregate(EndpointConfigurer> endpointConfigurer) { - return handle(new AggregatorSpec().get(), endpointConfigurer); - } - - public IntegrationFlowBuilder aggregate(ComponentConfigurer aggregatorConfigurer, - EndpointConfigurer> endpointConfigurer) { - Assert.notNull(aggregatorConfigurer); - AggregatorSpec spec = new AggregatorSpec(); - aggregatorConfigurer.configure(spec); - return this.handle(spec.get(), endpointConfigurer); - } - - public IntegrationFlowBuilder route(String beanName, String method) { - return this.route(beanName, method, null); - } - - public IntegrationFlowBuilder route(String beanName, String method, - ComponentConfigurer> routerConfigurer) { - return this.route(beanName, method, routerConfigurer, null); - } - - public IntegrationFlowBuilder route(String beanName, String method, - ComponentConfigurer> routerConfigurer, - EndpointConfigurer> endpointConfigurer) { - return this.route(new MethodInvokingRouter(new BeanNameMessageProcessor(beanName, method)), - routerConfigurer, endpointConfigurer); - } - - - public IntegrationFlowBuilder route(String expression) { - return this.route(expression, (ComponentConfigurer>) null); - } - - public IntegrationFlowBuilder route(String expression, - ComponentConfigurer> routerConfigurer) { - return this.route(expression, routerConfigurer, null); - } - - public IntegrationFlowBuilder route(String expression, - ComponentConfigurer> routerConfigurer, - EndpointConfigurer> endpointConfigurer) { - return this.route(new ExpressionEvaluatingRouter(PARSER.parseExpression(expression)), routerConfigurer, - endpointConfigurer); - } - - public IntegrationFlowBuilder route(GenericRouter router) { - return this.route(null, router); - } - - public IntegrationFlowBuilder route(GenericRouter router, - ComponentConfigurer> routerConfigurer) { - return this.route(null, router, routerConfigurer); - } - - public IntegrationFlowBuilder route(Class

payloadType, GenericRouter router) { - return this.route(payloadType, router, null, null); - } - - public IntegrationFlowBuilder route(Class

payloadType, GenericRouter router, - ComponentConfigurer> routerConfigurer) { - return this.route(payloadType, router, routerConfigurer, null); - } - - public IntegrationFlowBuilder route(GenericRouter router, - ComponentConfigurer> routerConfigurer, - EndpointConfigurer> endpointConfigurer) { - return route(null, router, routerConfigurer, endpointConfigurer); - } - - public IntegrationFlowBuilder route(Class

payloadType, GenericRouter router, - ComponentConfigurer> routerConfigurer, - EndpointConfigurer> endpointConfigurer) { - MethodInvokingRouter methodInvokingRouter = isLambda(router) - ? new MethodInvokingRouter(new LambdaMessageProcessor(router, payloadType)) - : new MethodInvokingRouter(router); - return this.route(methodInvokingRouter, routerConfigurer, endpointConfigurer); - } - - public IntegrationFlowBuilder route(R router, - ComponentConfigurer> routerConfigurer, - EndpointConfigurer> endpointConfigurer) { - if (routerConfigurer != null) { - RouterSpec routerSpec = new RouterSpec(router); - routerConfigurer.configure(routerSpec); - } - return this.route(router, endpointConfigurer); - } - - public IntegrationFlowBuilder recipientListRoute(ComponentConfigurer routerConfigurer) { - return this.recipientListRoute(routerConfigurer, null); - } - - public IntegrationFlowBuilder recipientListRoute(ComponentConfigurer routerConfigurer, - EndpointConfigurer> endpointConfigurer) { - Assert.notNull(routerConfigurer); - RecipientListRouterSpec spec = new RecipientListRouterSpec(); - routerConfigurer.configure(spec); - DslRecipientListRouter recipientListRouter = (DslRecipientListRouter) spec.get(); - Assert.notEmpty(recipientListRouter.getRecipients(), "recipient list must not be empty"); - return this.route(recipientListRouter, endpointConfigurer); - } - - public IntegrationFlowBuilder route(AbstractMessageRouter router) { - return this.route(router, null); - } - - public IntegrationFlowBuilder route(R router, - EndpointConfigurer> endpointConfigurer) { - return this.handle(router, endpointConfigurer); - } - - public IntegrationFlowBuilder gateway(String requestChannel) { - return gateway(requestChannel, null); - } - - public IntegrationFlowBuilder gateway(String requestChannel, - EndpointConfigurer endpointConfigurer) { - return register(new GatewayEndpointSpec(requestChannel), endpointConfigurer); - } - - public IntegrationFlowBuilder gateway(MessageChannel requestChannel) { - return gateway(requestChannel, null); - } - - public IntegrationFlowBuilder gateway(MessageChannel requestChannel, - EndpointConfigurer endpointConfigurer) { - return register(new GatewayEndpointSpec(requestChannel), endpointConfigurer); - } - - private > IntegrationFlowBuilder register(S endpointSpec, - EndpointConfigurer endpointConfigurer) { - if (endpointConfigurer != null) { - endpointConfigurer.configure(endpointSpec); - } - MessageChannel inputChannel = this.currentMessageChannel; - this.currentMessageChannel = null; - if (inputChannel == null) { - inputChannel = new DirectChannel(); - this.registerOutputChannelIfCan(inputChannel); - } - - if (inputChannel instanceof MessageChannelReference) { - endpointSpec.get().getT1().setInputChannelName(((MessageChannelReference) inputChannel).getName()); - } - else { - if (inputChannel instanceof FixedSubscriberChannelPrototype) { - String beanName = ((FixedSubscriberChannelPrototype) inputChannel).getName(); - inputChannel = new FixedSubscriberChannel(endpointSpec.get().getT2()); - if (beanName != null) { - ((FixedSubscriberChannel) inputChannel).setBeanName(beanName); - } - this.registerOutputChannelIfCan(inputChannel); - } - endpointSpec.get().getT1().setInputChannel(inputChannel); - } - - return this.addComponent(endpointSpec).currentComponent(endpointSpec.get().getT2()); - } - - private IntegrationFlowBuilder registerOutputChannelIfCan(MessageChannel outputChannel) { - if (!(outputChannel instanceof FixedSubscriberChannelPrototype)) { - this.flow.addComponent(outputChannel); - if (this.currentComponent != null) { - String channelName = null; - if (outputChannel instanceof MessageChannelReference) { - channelName = ((MessageChannelReference) outputChannel).getName(); - } - if (this.currentComponent instanceof AbstractReplyProducingMessageHandler) { - AbstractReplyProducingMessageHandler messageProducer = - (AbstractReplyProducingMessageHandler) this.currentComponent; - if (channelName != null) { - messageProducer.setOutputChannelName(channelName); - } - else { - messageProducer.setOutputChannel(outputChannel); - } - } - else if (this.currentComponent instanceof SourcePollingChannelAdapterSpec) { - SourcePollingChannelAdapterFactoryBean pollingChannelAdapterFactoryBean = - ((SourcePollingChannelAdapterSpec) this.currentComponent).get().getT1(); - if (channelName != null) { - pollingChannelAdapterFactoryBean.setOutputChannelName(channelName); - } - else { - pollingChannelAdapterFactoryBean.setOutputChannel(outputChannel); - } - } - else if (this.currentComponent instanceof AbstractCorrelatingMessageHandler) { - AbstractCorrelatingMessageHandler messageProducer = - (AbstractCorrelatingMessageHandler) this.currentComponent; - if (channelName != null) { - messageProducer.setOutputChannelName(channelName); - } - else { - messageProducer.setOutputChannel(outputChannel); - } - } - else { - throw new BeanCreationException("The 'currentComponent' (" + this.currentComponent + - ") is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. " + - "This is the end of the integration flow."); - } - this.currentComponent = null; - } - } - return this; - } - - public IntegrationFlow get() { + public StandardIntegrationFlow get() { if (this.currentMessageChannel instanceof FixedSubscriberChannelPrototype) { throw new BeanCreationException("The 'currentMessageChannel' (" + this.currentMessageChannel + ") is a prototype for FixedSubscriberChannel which can't be created without MessageHandler " + @@ -629,7 +34,7 @@ public final class IntegrationFlowBuilder { "in the IntegrationFlow definition."); } - if (this.flow.getIntegrationComponents().size() == 1) { + if (this.integrationComponents.size() == 1) { if (this.currentComponent != null) { if (this.currentComponent instanceof SourcePollingChannelAdapterSpec) { throw new BeanCreationException("The 'SourcePollingChannelAdapter' (" + this.currentComponent + ") " + @@ -641,12 +46,26 @@ public final class IntegrationFlowBuilder { "Add at lest '.bridge()' EIP-method before the end of flow."); } } - return this.flow; + return new StandardIntegrationFlow(this.integrationComponents); } - private static boolean isLambda(Object o) { - Class aClass = o.getClass(); - return aClass.isSynthetic() && !aClass.isAnonymousClass() && !aClass.isLocalClass(); + public static final class StandardIntegrationFlow implements IntegrationFlow { + + private final Set integrationComponents; + + StandardIntegrationFlow(Set integrationComponents) { + this.integrationComponents = integrationComponents; + } + + public Set getIntegrationComponents() { + return integrationComponents; + } + + @Override + public void define(IntegrationFlowDefinition flow) { + throw new UnsupportedOperationException(); + } + } } diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java new file mode 100644 index 0000000..fa3d2b2 --- /dev/null +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java @@ -0,0 +1,632 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dsl; + +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.Set; + +import org.springframework.beans.factory.BeanCreationException; +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler; +import org.springframework.integration.aggregator.AggregatingMessageHandler; +import org.springframework.integration.aggregator.ResequencingMessageHandler; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.channel.FixedSubscriberChannel; +import org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean; +import org.springframework.integration.core.GenericSelector; +import org.springframework.integration.core.MessageSelector; +import org.springframework.integration.dsl.channel.MessageChannelSpec; +import org.springframework.integration.dsl.core.ConsumerEndpointSpec; +import org.springframework.integration.dsl.core.MessageHandlerSpec; +import org.springframework.integration.dsl.support.BeanNameMessageProcessor; +import org.springframework.integration.dsl.support.ComponentConfigurer; +import org.springframework.integration.dsl.support.EndpointConfigurer; +import org.springframework.integration.dsl.support.FixedSubscriberChannelPrototype; +import org.springframework.integration.dsl.support.GenericHandler; +import org.springframework.integration.dsl.support.GenericRouter; +import org.springframework.integration.dsl.support.GenericSplitter; +import org.springframework.integration.dsl.support.MessageChannelReference; +import org.springframework.integration.expression.ControlBusMethodFilter; +import org.springframework.integration.filter.ExpressionEvaluatingSelector; +import org.springframework.integration.filter.MessageFilter; +import org.springframework.integration.filter.MethodInvokingSelector; +import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; +import org.springframework.integration.handler.BridgeHandler; +import org.springframework.integration.handler.DelayHandler; +import org.springframework.integration.handler.ExpressionCommandMessageProcessor; +import org.springframework.integration.handler.ServiceActivatingHandler; +import org.springframework.integration.router.AbstractMappingMessageRouter; +import org.springframework.integration.router.AbstractMessageRouter; +import org.springframework.integration.router.ExpressionEvaluatingRouter; +import org.springframework.integration.router.MethodInvokingRouter; +import org.springframework.integration.router.RecipientListRouter; +import org.springframework.integration.splitter.AbstractMessageSplitter; +import org.springframework.integration.splitter.DefaultMessageSplitter; +import org.springframework.integration.splitter.ExpressionEvaluatingSplitter; +import org.springframework.integration.splitter.MethodInvokingSplitter; +import org.springframework.integration.store.MessageStore; +import org.springframework.integration.transformer.ClaimCheckInTransformer; +import org.springframework.integration.transformer.ClaimCheckOutTransformer; +import org.springframework.integration.transformer.ContentEnricher; +import org.springframework.integration.transformer.ExpressionEvaluatingTransformer; +import org.springframework.integration.transformer.GenericTransformer; +import org.springframework.integration.transformer.HeaderFilter; +import org.springframework.integration.transformer.MessageTransformingHandler; +import org.springframework.integration.transformer.MethodInvokingTransformer; +import org.springframework.integration.transformer.Transformer; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * @author Artem Bilan + */ +public abstract class IntegrationFlowDefinition> { + + private final static SpelExpressionParser PARSER = new SpelExpressionParser(); + + protected final Set integrationComponents = new LinkedHashSet(); + + protected MessageChannel currentMessageChannel; + + protected Object currentComponent; + + IntegrationFlowDefinition() { + } + + B addComponent(Object component) { + this.integrationComponents.add(component); + return _this(); + } + + B addComponents(Collection components) { + for (Object component : components) { + this.integrationComponents.add(component); + } + return _this(); + } + + B currentComponent(Object component) { + this.currentComponent = component; + return _this(); + } + + public B fixedSubscriberChannel() { + return this.fixedSubscriberChannel(null); + } + + public B fixedSubscriberChannel(String messageChannelName) { + return this.channel(new FixedSubscriberChannelPrototype(messageChannelName)); + } + + public B channel(String messageChannelName) { + return this.channel(new MessageChannelReference(messageChannelName)); + } + + public B channel(MessageChannel messageChannel) { + Assert.notNull(messageChannel); + if (this.currentMessageChannel != null) { + this.register(new GenericEndpointSpec(new BridgeHandler()), null); + } + this.currentMessageChannel = messageChannel; + return this.registerOutputChannelIfCan(this.currentMessageChannel); + } + + public B channel(MessageChannelSpec messageChannelSpec) { + Assert.notNull(messageChannelSpec); + return this.channel(messageChannelSpec.get()); + } + + public B controlBus() { + return controlBus(null); + } + + public B controlBus(EndpointConfigurer> endpointConfigurer) { + return this.handle(new ServiceActivatingHandler(new ExpressionCommandMessageProcessor(new ControlBusMethodFilter())), + endpointConfigurer); + } + + public B transform(String expression) { + Assert.hasText(expression); + return this.transform(new ExpressionEvaluatingTransformer(PARSER.parseExpression(expression))); + } + + public B transform(GenericTransformer genericTransformer) { + return this.transform(null, genericTransformer); + } + + public B transform(Class

payloadType, GenericTransformer genericTransformer) { + return this.transform(payloadType, genericTransformer, null); + } + + public B transform(GenericTransformer genericTransformer, + EndpointConfigurer> endpointConfigurer) { + return this.transform(null, genericTransformer, endpointConfigurer); + } + + public B transform(Class

payloadType, GenericTransformer genericTransformer, + EndpointConfigurer> endpointConfigurer) { + Assert.notNull(genericTransformer); + Transformer transformer = genericTransformer instanceof Transformer ? (Transformer) genericTransformer : + (isLambda(genericTransformer) + ? new MethodInvokingTransformer(new LambdaMessageProcessor(genericTransformer, payloadType)) + : new MethodInvokingTransformer(genericTransformer)); + return addComponent(transformer) + .handle(new MessageTransformingHandler(transformer), endpointConfigurer); + } + + public B filter(String expression) { + Assert.hasText(expression); + return this.filter(new ExpressionEvaluatingSelector(PARSER.parseExpression(expression))); + } + + public B filter(GenericSelector genericSelector) { + return this.filter(null, genericSelector); + } + + public

B filter(Class

payloadType, GenericSelector

genericSelector) { + return this.filter(payloadType, genericSelector, null); + } + + public

B filter(GenericSelector

genericSelector, + EndpointConfigurer endpointConfigurer) { + return filter(null, genericSelector, endpointConfigurer); + } + + public

B filter(Class

payloadType, GenericSelector

genericSelector, + EndpointConfigurer endpointConfigurer) { + Assert.notNull(genericSelector); + MessageSelector selector = genericSelector instanceof MessageSelector ? (MessageSelector) genericSelector : + (isLambda(genericSelector) + ? new MethodInvokingSelector(new LambdaMessageProcessor(genericSelector, payloadType)) + : new MethodInvokingSelector(genericSelector)); + return this.register(new FilterEndpointSpec(new MessageFilter(selector)), endpointConfigurer); + } + + public > B handle(S messageHandlerSpec) { + Assert.notNull(messageHandlerSpec); + return handle(messageHandlerSpec.get()); + } + + public B handle(MessageHandler messageHandler) { + return this.handle(messageHandler, null); + } + + public B handle(String beanName, String methodName) { + return this.handle(beanName, methodName, null); + } + + public B handle(String beanName, String methodName, + EndpointConfigurer> endpointConfigurer) { + return this.handle(new ServiceActivatingHandler(new BeanNameMessageProcessor(beanName, methodName)), + endpointConfigurer); + } + + public

B handle(GenericHandler

handler) { + return this.handle(null, handler); + } + + public

B handle(GenericHandler

handler, + EndpointConfigurer> endpointConfigurer) { + return this.handle(null, handler, endpointConfigurer); + } + + public

B handle(Class

payloadType, GenericHandler

handler) { + return this.handle(payloadType, handler, null); + } + + public

B handle(Class

payloadType, GenericHandler

handler, + EndpointConfigurer> endpointConfigurer) { + ServiceActivatingHandler serviceActivatingHandler = null; + if (isLambda(handler)) { + serviceActivatingHandler = new ServiceActivatingHandler(new LambdaMessageProcessor(handler, payloadType)); + } + else { + serviceActivatingHandler = new ServiceActivatingHandler(handler); + } + return this.handle(serviceActivatingHandler, endpointConfigurer); + } + + public > + B handle(S messageHandlerSpec, EndpointConfigurer> endpointConfigurer) { + Assert.notNull(messageHandlerSpec); + return handle(messageHandlerSpec.get(), endpointConfigurer); + } + + public B handle(H messageHandler, + EndpointConfigurer> endpointConfigurer) { + Assert.notNull(messageHandler); + return this.register(new GenericEndpointSpec(messageHandler), endpointConfigurer); + } + + public B bridge(EndpointConfigurer> endpointConfigurer) { + return this.register(new GenericEndpointSpec(new BridgeHandler()), endpointConfigurer); + } + + public B delay(String groupId, String expression) { + return this.delay(groupId, expression, null); + } + + public B delay(String groupId, String expression, + EndpointConfigurer endpointConfigurer) { + DelayHandler delayHandler = new DelayHandler(groupId); + if (StringUtils.hasText(expression)) { + delayHandler.setDelayExpression(PARSER.parseExpression(expression)); + } + return this.register(new DelayerEndpointSpec(delayHandler), endpointConfigurer); + } + + public B enrich(ComponentConfigurer enricherConfigurer) { + return this.enrich(enricherConfigurer, null); + } + + public B enrich(ComponentConfigurer enricherConfigurer, + EndpointConfigurer> endpointConfigurer) { + Assert.notNull(enricherConfigurer); + EnricherSpec enricherSpec = new EnricherSpec(); + enricherConfigurer.configure(enricherSpec); + return this.handle(enricherSpec.get(), endpointConfigurer); + } + + public B enrichHeaders(ComponentConfigurer headerEnricherConfigurer) { + return this.enrichHeaders(headerEnricherConfigurer, null); + } + + public B enrichHeaders(ComponentConfigurer headerEnricherConfigurer, + EndpointConfigurer> endpointConfigurer) { + Assert.notNull(headerEnricherConfigurer); + HeaderEnricherSpec headerEnricherSpec = new HeaderEnricherSpec(); + headerEnricherConfigurer.configure(headerEnricherSpec); + return transform(headerEnricherSpec.get(), endpointConfigurer); + } + + public B split(EndpointConfigurer> endpointConfigurer) { + return this.split(new DefaultMessageSplitter(), endpointConfigurer); + } + + public B split(String expression, + EndpointConfigurer> endpointConfigurer) { + return this.split(new ExpressionEvaluatingSplitter(PARSER.parseExpression(expression)), endpointConfigurer); + } + + public B split(String beanName, String methodName) { + return this.split(beanName, methodName, null); + } + + public B split(String beanName, String methodName, + EndpointConfigurer> endpointConfigurer) { + return this.split(new MethodInvokingSplitter(new BeanNameMessageProcessor>(beanName, methodName)), + endpointConfigurer); + } + + public

B split(Class

payloadType, GenericSplitter

splitter) { + return this.split(payloadType, splitter, null); + } + + public B split(GenericSplitter splitter, + EndpointConfigurer> endpointConfigurer) { + return split(null, splitter, endpointConfigurer); + } + + public

B split(Class

payloadType, GenericSplitter

splitter, + EndpointConfigurer> endpointConfigurer) { + MethodInvokingSplitter split = isLambda(splitter) + ? new MethodInvokingSplitter(new LambdaMessageProcessor(splitter, payloadType)) + : new MethodInvokingSplitter(splitter, "split"); + return this.split(split, endpointConfigurer); + } + + public B split(S splitter, + EndpointConfigurer> endpointConfigurer) { + Assert.notNull(splitter); + return this.register(new SplitterEndpointSpec(splitter), endpointConfigurer); + } + + /** + * Provides the {@link HeaderFilter} to the current {@link org.springframework.integration.dsl.IntegrationFlowBuilder.StandardIntegrationFlow}. + * @param headersToRemove the array of headers (or patterns) + * to remove from {@link org.springframework.messaging.MessageHeaders}. + * @return this {@link IntegrationFlowDefinition}. + */ + public B headerFilter(String... headersToRemove) { + return this.headerFilter(new HeaderFilter(headersToRemove), null); + } + + /** + * Provides the {@link HeaderFilter} to the current {@link org.springframework.integration.dsl.IntegrationFlowBuilder.StandardIntegrationFlow}. + * @param headersToRemove the comma separated headers (or patterns) to remove from + * {@link org.springframework.messaging.MessageHeaders}. + * @param patternMatch the {@code boolean} flag to indicate if {@code headersToRemove} + * should be interpreted as patterns or direct header names. + * @return this {@link IntegrationFlowDefinition}. + */ + public B headerFilter(String headersToRemove, boolean patternMatch) { + HeaderFilter headerFilter = new HeaderFilter(StringUtils.delimitedListToStringArray(headersToRemove, ",", " ")); + headerFilter.setPatternMatch(patternMatch); + return this.headerFilter(headerFilter, null); + } + + public B headerFilter(HeaderFilter headerFilter, + EndpointConfigurer> endpointConfigurer) { + return this.transform(headerFilter, endpointConfigurer); + } + + public B claimCheckIn(MessageStore messageStore) { + return this.claimCheckIn(messageStore, null); + } + + public B claimCheckIn(MessageStore messageStore, + EndpointConfigurer> endpointConfigurer) { + return this.transform(new ClaimCheckInTransformer(messageStore), endpointConfigurer); + } + + public B claimCheckOut(MessageStore messageStore) { + return this.claimCheckOut(messageStore, false); + } + + public B claimCheckOut(MessageStore messageStore, boolean removeMessage) { + return this.claimCheckOut(messageStore, removeMessage, null); + } + + public B claimCheckOut(MessageStore messageStore, boolean removeMessage, + EndpointConfigurer> endpointConfigurer) { + ClaimCheckOutTransformer claimCheckOutTransformer = new ClaimCheckOutTransformer(messageStore); + claimCheckOutTransformer.setRemoveMessage(removeMessage); + return this.transform(claimCheckOutTransformer, endpointConfigurer); + } + + public B resequence() { + return this.resequence((EndpointConfigurer>) null); + } + + public B resequence(EndpointConfigurer> endpointConfigurer) { + return this.handle(new ResequencerSpec().get(), endpointConfigurer); + } + + public B resequence(ComponentConfigurer resequencerConfigurer, + EndpointConfigurer> endpointConfigurer) { + Assert.notNull(resequencerConfigurer); + ResequencerSpec spec = new ResequencerSpec(); + resequencerConfigurer.configure(spec); + return this.handle(spec.get(), endpointConfigurer); + } + + public B aggregate() { + return aggregate((EndpointConfigurer>) null); + } + + public B + aggregate(EndpointConfigurer> endpointConfigurer) { + return handle(new AggregatorSpec().get(), endpointConfigurer); + } + + public B aggregate(ComponentConfigurer aggregatorConfigurer, + EndpointConfigurer> endpointConfigurer) { + Assert.notNull(aggregatorConfigurer); + AggregatorSpec spec = new AggregatorSpec(); + aggregatorConfigurer.configure(spec); + return this.handle(spec.get(), endpointConfigurer); + } + + public B route(String beanName, String method) { + return this.route(beanName, method, null); + } + + public B route(String beanName, String method, + ComponentConfigurer> routerConfigurer) { + return this.route(beanName, method, routerConfigurer, null); + } + + public B route(String beanName, String method, + ComponentConfigurer> routerConfigurer, + EndpointConfigurer> endpointConfigurer) { + return this.route(new MethodInvokingRouter(new BeanNameMessageProcessor(beanName, method)), + routerConfigurer, endpointConfigurer); + } + + public B route(String expression) { + return this.route(expression, (ComponentConfigurer>) null); + } + + public B route(String expression, + ComponentConfigurer> routerConfigurer) { + return this.route(expression, routerConfigurer, null); + } + + public B route(String expression, + ComponentConfigurer> routerConfigurer, + EndpointConfigurer> endpointConfigurer) { + return this.route(new ExpressionEvaluatingRouter(PARSER.parseExpression(expression)), routerConfigurer, + endpointConfigurer); + } + + public B route(GenericRouter router) { + return this.route(null, router); + } + + public B route(GenericRouter router, + ComponentConfigurer> routerConfigurer) { + return this.route(null, router, routerConfigurer); + } + + public B route(Class

payloadType, GenericRouter router) { + return this.route(payloadType, router, null, null); + } + + public B route(Class

payloadType, GenericRouter router, + ComponentConfigurer> routerConfigurer) { + return this.route(payloadType, router, routerConfigurer, null); + } + + public B route(GenericRouter router, + ComponentConfigurer> routerConfigurer, + EndpointConfigurer> endpointConfigurer) { + return route(null, router, routerConfigurer, endpointConfigurer); + } + + public B route(Class

payloadType, GenericRouter router, + ComponentConfigurer> routerConfigurer, + EndpointConfigurer> endpointConfigurer) { + MethodInvokingRouter methodInvokingRouter = isLambda(router) + ? new MethodInvokingRouter(new LambdaMessageProcessor(router, payloadType)) + : new MethodInvokingRouter(router); + return this.route(methodInvokingRouter, routerConfigurer, endpointConfigurer); + } + + public B route(R router, + ComponentConfigurer> routerConfigurer, + EndpointConfigurer> endpointConfigurer) { + if (routerConfigurer != null) { + RouterSpec routerSpec = new RouterSpec(router); + routerConfigurer.configure(routerSpec); + } + return this.route(router, endpointConfigurer); + } + + public B recipientListRoute(ComponentConfigurer routerConfigurer) { + return this.recipientListRoute(routerConfigurer, null); + } + + public B recipientListRoute(ComponentConfigurer routerConfigurer, + EndpointConfigurer> endpointConfigurer) { + Assert.notNull(routerConfigurer); + RecipientListRouterSpec spec = new RecipientListRouterSpec(); + routerConfigurer.configure(spec); + DslRecipientListRouter recipientListRouter = (DslRecipientListRouter) spec.get(); + Assert.notEmpty(recipientListRouter.getRecipients(), "recipient list must not be empty"); + return this.route(recipientListRouter, endpointConfigurer); + } + + public B route(AbstractMessageRouter router) { + return this.route(router, null); + } + + public B route(R router, + EndpointConfigurer> endpointConfigurer) { + return this.handle(router, endpointConfigurer); + } + + public B gateway(String requestChannel) { + return gateway(requestChannel, null); + } + + public B gateway(String requestChannel, + EndpointConfigurer endpointConfigurer) { + return register(new GatewayEndpointSpec(requestChannel), endpointConfigurer); + } + + public B gateway(MessageChannel requestChannel) { + return gateway(requestChannel, null); + } + + public B gateway(MessageChannel requestChannel, + EndpointConfigurer endpointConfigurer) { + return register(new GatewayEndpointSpec(requestChannel), endpointConfigurer); + } + + private > B register(S endpointSpec, + EndpointConfigurer endpointConfigurer) { + if (endpointConfigurer != null) { + endpointConfigurer.configure(endpointSpec); + } + MessageChannel inputChannel = this.currentMessageChannel; + this.currentMessageChannel = null; + if (inputChannel == null) { + inputChannel = new DirectChannel(); + this.registerOutputChannelIfCan(inputChannel); + } + + if (inputChannel instanceof MessageChannelReference) { + endpointSpec.get().getT1().setInputChannelName(((MessageChannelReference) inputChannel).getName()); + } + else { + if (inputChannel instanceof FixedSubscriberChannelPrototype) { + String beanName = ((FixedSubscriberChannelPrototype) inputChannel).getName(); + inputChannel = new FixedSubscriberChannel(endpointSpec.get().getT2()); + if (beanName != null) { + ((FixedSubscriberChannel) inputChannel).setBeanName(beanName); + } + this.registerOutputChannelIfCan(inputChannel); + } + endpointSpec.get().getT1().setInputChannel(inputChannel); + } + + return this.addComponent(endpointSpec).currentComponent(endpointSpec.get().getT2()); + } + + private B registerOutputChannelIfCan(MessageChannel outputChannel) { + if (!(outputChannel instanceof FixedSubscriberChannelPrototype)) { + this.integrationComponents.add(outputChannel); + if (this.currentComponent != null) { + String channelName = null; + if (outputChannel instanceof MessageChannelReference) { + channelName = ((MessageChannelReference) outputChannel).getName(); + } + if (this.currentComponent instanceof AbstractReplyProducingMessageHandler) { + AbstractReplyProducingMessageHandler messageProducer = + (AbstractReplyProducingMessageHandler) this.currentComponent; + if (channelName != null) { + messageProducer.setOutputChannelName(channelName); + } + else { + messageProducer.setOutputChannel(outputChannel); + } + } + else if (this.currentComponent instanceof SourcePollingChannelAdapterSpec) { + SourcePollingChannelAdapterFactoryBean pollingChannelAdapterFactoryBean = + ((SourcePollingChannelAdapterSpec) this.currentComponent).get().getT1(); + if (channelName != null) { + pollingChannelAdapterFactoryBean.setOutputChannelName(channelName); + } + else { + pollingChannelAdapterFactoryBean.setOutputChannel(outputChannel); + } + } + else if (this.currentComponent instanceof AbstractCorrelatingMessageHandler) { + AbstractCorrelatingMessageHandler messageProducer = + (AbstractCorrelatingMessageHandler) this.currentComponent; + if (channelName != null) { + messageProducer.setOutputChannelName(channelName); + } + else { + messageProducer.setOutputChannel(outputChannel); + } + } + else { + throw new BeanCreationException("The 'currentComponent' (" + this.currentComponent + + ") is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. " + + "This is the end of the integration flow."); + } + this.currentComponent = null; + } + } + return _this(); + } + + @SuppressWarnings("unchecked") + protected final B _this() { + return (B) this; + } + + private static boolean isLambda(Object o) { + Class aClass = o.getClass(); + return aClass.isSynthetic() && !aClass.isAnonymousClass() && !aClass.isLocalClass(); + } + +} diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlows.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlows.java index e93a455..8a888ec 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlows.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlows.java @@ -21,9 +21,9 @@ import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageSource; import org.springframework.integration.dsl.channel.MessageChannelSpec; import org.springframework.integration.dsl.core.ComponentsRegistration; +import org.springframework.integration.dsl.core.MessageProducerSpec; import org.springframework.integration.dsl.core.MessageSourceSpec; import org.springframework.integration.dsl.core.MessagingGatewaySpec; -import org.springframework.integration.dsl.core.MessagingProducerSpec; import org.springframework.integration.dsl.support.EndpointConfigurer; import org.springframework.integration.dsl.support.FixedSubscriberChannelPrototype; import org.springframework.integration.dsl.support.MessageChannelReference; @@ -98,8 +98,8 @@ public final class IntegrationFlows { .currentComponent(spec); } - public static IntegrationFlowBuilder from(MessagingProducerSpec messagingProducerSpec) { - return from(messagingProducerSpec.get(), registerComponents(messagingProducerSpec)); + public static IntegrationFlowBuilder from(MessageProducerSpec messageProducerSpec) { + return from(messageProducerSpec.get(), registerComponents(messageProducerSpec)); } public static IntegrationFlowBuilder from(MessageProducerSupport messageProducer) { diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/amqp/Amqp.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/amqp/Amqp.java index c2a1c3b..d0e51f8 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/amqp/Amqp.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/amqp/Amqp.java @@ -22,8 +22,8 @@ import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter; import org.springframework.integration.amqp.inbound.AmqpInboundGateway; +import org.springframework.integration.dsl.core.MessageProducerSpec; import org.springframework.integration.dsl.core.MessagingGatewaySpec; -import org.springframework.integration.dsl.core.MessagingProducerSpec; /** * @author Artem Bilan @@ -59,7 +59,7 @@ public abstract class Amqp { return (AmqpInboundChannelAdapterSpec) inboundAdapter(listenerContainer); } - public static MessagingProducerSpec inboundAdapter( + public static MessageProducerSpec inboundAdapter( SimpleMessageListenerContainer listenerContainer) { return new AmqpInboundChannelAdapterSpec(listenerContainer); } diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/amqp/AmqpInboundChannelAdapterSpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/amqp/AmqpInboundChannelAdapterSpec.java index f0cb84f..23ff88e 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/amqp/AmqpInboundChannelAdapterSpec.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/amqp/AmqpInboundChannelAdapterSpec.java @@ -27,14 +27,14 @@ import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter; import org.springframework.integration.amqp.support.AmqpHeaderMapper; import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; -import org.springframework.integration.dsl.core.MessagingProducerSpec; +import org.springframework.integration.dsl.core.MessageProducerSpec; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.util.ErrorHandler; /** * @author Artem Bilan */ -public class AmqpInboundChannelAdapterSpec extends MessagingProducerSpec { +public class AmqpInboundChannelAdapterSpec extends MessageProducerSpec { private final SimpleMessageListenerContainer listenerContainer; diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/IntegrationFlowBeanPostProcessor.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/IntegrationFlowBeanPostProcessor.java index f447036..46d30fb 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/IntegrationFlowBeanPostProcessor.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/IntegrationFlowBeanPostProcessor.java @@ -33,6 +33,8 @@ import org.springframework.integration.config.IntegrationConfigUtils; import org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean; import org.springframework.integration.core.MessageSource; import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.IntegrationFlowBuilder; +import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec; import org.springframework.integration.dsl.support.MessageChannelReference; import org.springframework.integration.support.context.NamedComponent; @@ -65,104 +67,118 @@ public class IntegrationFlowBeanPostProcessor implements BeanPostProcessor, Bean @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { - if (bean instanceof IntegrationFlow) { - IntegrationFlow flow = (IntegrationFlow) bean; - String flowNamePrefix = beanName + "."; - int channelNameIndex = 0; - for (Object component : flow.getIntegrationComponents()) { - if (component instanceof ConsumerEndpointSpec) { - ConsumerEndpointSpec endpointSpec = (ConsumerEndpointSpec) component; - MessageHandler messageHandler = endpointSpec.get().getT2(); - ConsumerEndpointFactoryBean endpoint = endpointSpec.get().getT1(); - String id = endpointSpec.getId(); + if (bean instanceof IntegrationFlowBuilder.StandardIntegrationFlow) { + return processStandardIntegrationFlow((IntegrationFlowBuilder.StandardIntegrationFlow) bean, beanName); + } + else if (bean instanceof IntegrationFlow) { + return processLambdaIntegrationFlow((IntegrationFlow) bean, beanName); + } + return bean; + } - Collection messageHandlers = this.beanFactory.getBeansOfType(MessageHandler.class, false, - false).values(); + private Object processStandardIntegrationFlow(IntegrationFlowBuilder.StandardIntegrationFlow flow, + String beanName) { + String flowNamePrefix = beanName + "."; + int channelNameIndex = 0; + for (Object component : flow.getIntegrationComponents()) { + if (component instanceof ConsumerEndpointSpec) { + ConsumerEndpointSpec endpointSpec = (ConsumerEndpointSpec) component; + MessageHandler messageHandler = endpointSpec.get().getT2(); + ConsumerEndpointFactoryBean endpoint = endpointSpec.get().getT1(); + String id = endpointSpec.getId(); - if (!messageHandlers.contains(messageHandler)) { - String handlerBeanName = generateBeanName(messageHandler); - String[] handlerAlias = id != null - ? new String[] {id + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX} - : null; + Collection messageHandlers = this.beanFactory.getBeansOfType(MessageHandler.class, false, + false).values(); - registerComponent(messageHandler, handlerBeanName); - if (handlerAlias != null) { - for (String alias : handlerAlias) { - this.beanFactory.registerAlias(handlerBeanName, alias); - } + if (!messageHandlers.contains(messageHandler)) { + String handlerBeanName = generateBeanName(messageHandler); + String[] handlerAlias = id != null + ? new String[] {id + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX} + : null; + + registerComponent(messageHandler, handlerBeanName); + if (handlerAlias != null) { + for (String alias : handlerAlias) { + this.beanFactory.registerAlias(handlerBeanName, alias); } } - - String endpointBeanName = id; - if (endpointBeanName == null) { - endpointBeanName = generateBeanName(endpoint); - } - registerComponent(endpoint, endpointBeanName); } - else { - //TODO workaround until SF will fix 'TypeDescriptor.forObject' - if (component instanceof MessageChannel) { - Collection messageChannels = - this.beanFactory.getBeansOfType(MessageChannel.class, false, false).values(); - if (!messageChannels.contains(component)) { - if (component instanceof AbstractMessageChannel) { - String channelBeanName = ((AbstractMessageChannel) component).getComponentName(); - if (channelBeanName == null) { - channelBeanName = flowNamePrefix + "channel" + - BeanFactoryUtils.GENERATED_BEAN_NAME_SEPARATOR + channelNameIndex++; - } - registerComponent(component, channelBeanName); - } - else if (component instanceof MessageChannelReference) { - String channelBeanName = ((MessageChannelReference) component).getName(); - if (!this.beanFactory.containsBean(channelBeanName)) { - DirectChannel directChannel = new DirectChannel(); - registerComponent(directChannel, channelBeanName); - } - } - else if (component instanceof FixedSubscriberChannel) { - FixedSubscriberChannel fixedSubscriberChannel = (FixedSubscriberChannel) component; - String channelBeanName = fixedSubscriberChannel.getComponentName(); - if ("Unnamed fixed subscriber channel".equals(channelBeanName)) { - channelBeanName = flowNamePrefix + "channel" + - BeanFactoryUtils.GENERATED_BEAN_NAME_SEPARATOR + channelNameIndex++; - } - registerComponent(component, channelBeanName); - } - } - } - else if (component instanceof SourcePollingChannelAdapterSpec) { - SourcePollingChannelAdapterSpec spec = (SourcePollingChannelAdapterSpec) component; - SourcePollingChannelAdapterFactoryBean pollingChannelAdapterFactoryBean = spec.get().getT1(); - String id = ((IntegrationComponentSpec) spec).getId(); - if (!StringUtils.hasText(id)) { - id = generateBeanName(pollingChannelAdapterFactoryBean); - } - registerComponent(pollingChannelAdapterFactoryBean, id); - MessageSource messageSource = spec.get().getT2(); - if (!this.beanFactory - .getBeansOfType(MessageSource.class, false, false) - .values() - .contains(messageSource)) { - String messageSourceId = id + ".source"; - if (messageSource instanceof NamedComponent - && ((NamedComponent) messageSource).getComponentName() != null) { - messageSourceId = ((NamedComponent) messageSource).getComponentName(); + String endpointBeanName = id; + if (endpointBeanName == null) { + endpointBeanName = generateBeanName(endpoint); + } + registerComponent(endpoint, endpointBeanName); + } + else { + //TODO workaround until SF will fix 'TypeDescriptor.forObject' + if (component instanceof MessageChannel) { + Collection messageChannels = + this.beanFactory.getBeansOfType(MessageChannel.class, false, false).values(); + if (!messageChannels.contains(component)) { + if (component instanceof AbstractMessageChannel) { + String channelBeanName = ((AbstractMessageChannel) component).getComponentName(); + if (channelBeanName == null) { + channelBeanName = flowNamePrefix + "channel" + + BeanFactoryUtils.GENERATED_BEAN_NAME_SEPARATOR + channelNameIndex++; } - registerComponent(messageSource, messageSourceId); + registerComponent(component, channelBeanName); + } + else if (component instanceof MessageChannelReference) { + String channelBeanName = ((MessageChannelReference) component).getName(); + if (!this.beanFactory.containsBean(channelBeanName)) { + DirectChannel directChannel = new DirectChannel(); + registerComponent(directChannel, channelBeanName); + } + } + else if (component instanceof FixedSubscriberChannel) { + FixedSubscriberChannel fixedSubscriberChannel = (FixedSubscriberChannel) component; + String channelBeanName = fixedSubscriberChannel.getComponentName(); + if ("Unnamed fixed subscriber channel".equals(channelBeanName)) { + channelBeanName = flowNamePrefix + "channel" + + BeanFactoryUtils.GENERATED_BEAN_NAME_SEPARATOR + channelNameIndex++; + } + registerComponent(component, channelBeanName); } } - else if (!this.beanFactory - .getBeansOfType(AopUtils.getTargetClass(component), false, false) - .values() - .contains(component)) { - registerComponent(component, generateBeanName(component)); + } + else if (component instanceof SourcePollingChannelAdapterSpec) { + SourcePollingChannelAdapterSpec spec = (SourcePollingChannelAdapterSpec) component; + SourcePollingChannelAdapterFactoryBean pollingChannelAdapterFactoryBean = spec.get().getT1(); + String id = ((IntegrationComponentSpec) spec).getId(); + if (!StringUtils.hasText(id)) { + id = generateBeanName(pollingChannelAdapterFactoryBean); } + registerComponent(pollingChannelAdapterFactoryBean, id); + + MessageSource messageSource = spec.get().getT2(); + if (!this.beanFactory + .getBeansOfType(MessageSource.class, false, false) + .values() + .contains(messageSource)) { + String messageSourceId = id + ".source"; + if (messageSource instanceof NamedComponent + && ((NamedComponent) messageSource).getComponentName() != null) { + messageSourceId = ((NamedComponent) messageSource).getComponentName(); + } + registerComponent(messageSource, messageSourceId); + } + } + else if (!this.beanFactory + .getBeansOfType(AopUtils.getTargetClass(component), false, false) + .values() + .contains(component)) { + registerComponent(component, generateBeanName(component)); } } } - return bean; + return flow; + } + + private Object processLambdaIntegrationFlow(IntegrationFlow flow, String beanName) { + IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(beanName + ".input"); + flow.define(flowBuilder); + return processStandardIntegrationFlow(flowBuilder.get(), beanName); } private void registerComponent(Object component, String beanName) { diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/MessagingProducerSpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/MessageProducerSpec.java similarity index 90% rename from spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/MessagingProducerSpec.java rename to spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/MessageProducerSpec.java index 5747cca..f1db9cb 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/MessagingProducerSpec.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/core/MessageProducerSpec.java @@ -22,10 +22,10 @@ import org.springframework.messaging.MessageChannel; /** * @author Artem Bilan */ -public abstract class MessagingProducerSpec, P extends MessageProducerSupport> +public abstract class MessageProducerSpec, P extends MessageProducerSupport> extends IntegrationComponentSpec { - public MessagingProducerSpec(P producer) { + public MessageProducerSpec(P producer) { this.target = producer; } diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/file/TailAdapterSpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/file/TailAdapterSpec.java index 27c0fbc..4cd816c 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/file/TailAdapterSpec.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/file/TailAdapterSpec.java @@ -19,7 +19,7 @@ package org.springframework.integration.dsl.file; import org.springframework.beans.factory.support.DefaultListableBeanFactory; import org.springframework.core.task.TaskExecutor; import org.springframework.integration.channel.NullChannel; -import org.springframework.integration.dsl.core.MessagingProducerSpec; +import org.springframework.integration.dsl.core.MessageProducerSpec; import org.springframework.integration.file.config.FileTailInboundChannelAdapterFactoryBean; import org.springframework.integration.file.tail.FileTailingMessageProducerSupport; import org.springframework.messaging.MessageChannel; @@ -29,7 +29,7 @@ import org.springframework.util.Assert; /** * @author Artem Bilan */ -public class TailAdapterSpec extends MessagingProducerSpec { +public class TailAdapterSpec extends MessageProducerSpec { private final FileTailInboundChannelAdapterFactoryBean factoryBean = new FileTailInboundChannelAdapterFactoryBean(); diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/JmsMessageDrivenChannelAdapterSpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/JmsMessageDrivenChannelAdapterSpec.java index 97b83a1..6216bbe 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/JmsMessageDrivenChannelAdapterSpec.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/jms/JmsMessageDrivenChannelAdapterSpec.java @@ -18,7 +18,7 @@ package org.springframework.integration.dsl.jms; import javax.jms.Destination; -import org.springframework.integration.dsl.core.MessagingProducerSpec; +import org.springframework.integration.dsl.core.MessageProducerSpec; import org.springframework.integration.dsl.support.ComponentConfigurer; import org.springframework.integration.jms.ChannelPublishingJmsMessageListener; import org.springframework.integration.jms.JmsHeaderMapper; @@ -30,7 +30,7 @@ import org.springframework.util.Assert; * @author Artem Bilan */ public class JmsMessageDrivenChannelAdapterSpec> - extends MessagingProducerSpec { + extends MessageProducerSpec { JmsMessageDrivenChannelAdapterSpec(AbstractMessageListenerContainer listenerContainer) { super(new JmsMessageDrivenChannelAdapter(listenerContainer, new ChannelPublishingJmsMessageListener())); diff --git a/spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/flows/IntegrationFlowTests.java b/spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/flows/IntegrationFlowTests.java index 51c8348..a52fcba 100644 --- a/spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/flows/IntegrationFlowTests.java +++ b/spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/flows/IntegrationFlowTests.java @@ -46,14 +46,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; -import com.jcraft.jsch.ChannelSftp; -import com.mongodb.MongoClient; -import de.flapdoodle.embed.mongo.MongodExecutable; -import de.flapdoodle.embed.mongo.MongodStarter; -import de.flapdoodle.embed.mongo.config.MongodConfigBuilder; -import de.flapdoodle.embed.mongo.config.Net; -import de.flapdoodle.embed.mongo.distribution.Version; -import de.flapdoodle.embed.process.runtime.Network; import org.aopalliance.aop.Advice; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; @@ -66,6 +58,15 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; +import com.jcraft.jsch.ChannelSftp; +import com.mongodb.MongoClient; +import de.flapdoodle.embed.mongo.MongodExecutable; +import de.flapdoodle.embed.mongo.MongodStarter; +import de.flapdoodle.embed.mongo.config.MongodConfigBuilder; +import de.flapdoodle.embed.mongo.config.Net; +import de.flapdoodle.embed.mongo.distribution.Version; +import de.flapdoodle.embed.process.runtime.Network; + import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.AnonymousQueue; import org.springframework.amqp.core.Queue; @@ -255,7 +256,7 @@ public class IntegrationFlowTests { private FixedSubscriberChannel enricherInput3; @Autowired - @Qualifier("splitInput") + @Qualifier("splitResequenceFlow.input") private MessageChannel splitInput; @Autowired @@ -295,7 +296,7 @@ public class IntegrationFlowTests { private MessageChannel routerMethod2Input; @Autowired - @Qualifier("routerMethod3Input") + @Qualifier("routeMethodInvocationFlow3.input") private MessageChannel routerMethod3Input; @Autowired @@ -1017,7 +1018,7 @@ public class IntegrationFlowTests { } @Autowired - @Qualifier("jmsOutboundInboundChannel") + @Qualifier("jmsOutboundFlow.input") private MessageChannel jmsOutboundInboundChannel; @Autowired @@ -1046,7 +1047,7 @@ public class IntegrationFlowTests { } @Autowired - @Qualifier("jmsOutboundGatewayChannel") + @Qualifier("jmsOutboundGatewayFlow.input") private MessageChannel jmsOutboundGatewayChannel; @Test @@ -1337,10 +1338,8 @@ public class IntegrationFlowTests { @Bean public IntegrationFlow jmsOutboundFlow() { - return IntegrationFlows.from("jmsOutboundInboundChannel") - .handle(Jms.outboundAdapter(this.jmsConnectionFactory) - .destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)) - .get(); + return f -> f.handle(Jms.outboundAdapter(this.jmsConnectionFactory) + .destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)); } @Bean @@ -1370,11 +1369,9 @@ public class IntegrationFlowTests { @Bean public IntegrationFlow jmsOutboundGatewayFlow() { - return IntegrationFlows.from("jmsOutboundGatewayChannel") - .handle(Jms.outboundGateway(this.jmsConnectionFactory) - .replyContainer() - .requestDestination("jmsPipelineTest")) - .get(); + return f -> f.handle(Jms.outboundGateway(this.jmsConnectionFactory) + .replyContainer() + .requestDestination("jmsPipelineTest")); } @Bean @@ -1760,8 +1757,7 @@ public class IntegrationFlowTests { @Bean public IntegrationFlow splitResequenceFlow() { - return IntegrationFlows.from("splitInput") - .enrichHeaders(s -> s.header("FOO", "BAR")) + return f -> f.enrichHeaders(s -> s.header("FOO", "BAR")) .split("testSplitterData", "buildList", c -> c.applySequence(false)) .channel(MessageChannels.executor(this.taskExecutor())) .split(Message.class, target -> (List) target.getPayload(), c -> c.applySequence(false)) @@ -1771,8 +1767,7 @@ public class IntegrationFlowTests { .transform(Integer::parseInt) .enrichHeaders(s -> s.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, "payload")) .resequence(r -> r.releasePartialSequences(true).correlationExpression("'foo'"), null) - .headerFilter("foo", false) - .get(); + .headerFilter("foo", false); } @Bean @@ -1838,9 +1833,7 @@ public class IntegrationFlowTests { @Bean public IntegrationFlow routeMethodInvocationFlow3() { - return IntegrationFlows.from("routerMethod3Input") - .route((String p) -> ContextConfiguration4.this.routingTestBean().routePayload(p)) - .get(); + return f -> f.route((String p) -> ContextConfiguration4.this.routingTestBean().routePayload(p)); } @Bean