diff --git a/spring-integration-java-dsl/build.gradle b/spring-integration-java-dsl/build.gradle index 3e921f1..396b071 100644 --- a/spring-integration-java-dsl/build.gradle +++ b/spring-integration-java-dsl/build.gradle @@ -25,6 +25,7 @@ compileTestJava { ext { springIntegrationVersion = '4.0.0.BUILD-SNAPSHOT' + log4jVersion = '1.2.17' linkHomepage = 'https://github.com/spring-projects/spring-integration-extensions' linkCi = 'https://build.springsource.org/browse/INTEXT' @@ -36,8 +37,6 @@ ext { eclipse.project.natures += 'org.springframework.ide.eclipse.core.springnature' -sourceSets.test.resources.srcDirs = ['src/test/java'] - // See http://www.gradle.org/docs/current/userguide/dependency_management.html#sub:configurations // and http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ConfigurationContainer.html configurations { @@ -47,9 +46,11 @@ configurations { dependencies { compile "org.springframework.integration:spring-integration-core:$springIntegrationVersion" + testCompile "log4j:log4j:$log4jVersion" testCompile "org.springframework.integration:spring-integration-test:$springIntegrationVersion" testCompile "org.springframework.integration:spring-integration-event:$springIntegrationVersion" testCompile "org.springframework.integration:spring-integration-file:$springIntegrationVersion" + testCompile "org.springframework.integration:spring-integration-xml:$springIntegrationVersion" jacoco group: "org.jacoco", name: "org.jacoco.agent", version: "0.5.6.201201232323", classifier: "runtime" } diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/HeaderEnricherSpec.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/HeaderEnricherSpec.java new file mode 100644 index 0000000..353f5f3 --- /dev/null +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/HeaderEnricherSpec.java @@ -0,0 +1,97 @@ +package org.springframework.integration.dsl; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.expression.spel.standard.SpelExpressionParser; +import org.springframework.integration.dsl.core.IntegrationComponentSpec; +import org.springframework.integration.dsl.support.BeanNameMessageProcessor; +import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor; +import org.springframework.integration.handler.MessageProcessor; +import org.springframework.integration.transformer.HeaderEnricher; +import org.springframework.integration.transformer.support.AbstractHeaderValueMessageProcessor; +import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor; +import org.springframework.integration.transformer.support.HeaderValueMessageProcessor; +import org.springframework.integration.transformer.support.StaticHeaderValueMessageProcessor; +import org.springframework.util.Assert; + +/** + * @author Artem Bilan + */ +public class HeaderEnricherSpec extends IntegrationComponentSpec { + + private final static SpelExpressionParser PARSER = new SpelExpressionParser(); + + private final Map> headerToAdd = new HashMap>(); + + private final HeaderEnricher headerEnricher = new HeaderEnricher(headerToAdd); + + HeaderEnricherSpec() { + } + + public HeaderEnricherSpec defaultOverwrite(boolean defaultOverwrite) { + this.headerEnricher.setDefaultOverwrite(defaultOverwrite); + return _this(); + } + + public HeaderEnricherSpec shouldSkipNulls(boolean shouldSkipNulls) { + this.headerEnricher.setShouldSkipNulls(shouldSkipNulls); + return _this(); + } + + + public HeaderEnricherSpec messageProcessor(MessageProcessor messageProcessor) { + this.headerEnricher.setMessageProcessor(messageProcessor); + return _this(); + } + + public HeaderEnricherSpec messageProcessor(String expression) { + return this.messageProcessor(new ExpressionEvaluatingMessageProcessor(PARSER.parseExpression(expression))); + } + + public HeaderEnricherSpec messageProcessor(String beanName, String methodName) { + return this.messageProcessor(new BeanNameMessageProcessor(beanName, methodName)); + } + + public HeaderEnricherSpec header(String name, Object value) { + return this.header(name, value, null); + } + + public HeaderEnricherSpec header(String name, Object value, Boolean overwrite) { + AbstractHeaderValueMessageProcessor headerValueMessageProcessor = new StaticHeaderValueMessageProcessor(value); + headerValueMessageProcessor.setOverwrite(overwrite); + return this.header(name, headerValueMessageProcessor); + } + + public HeaderEnricherSpec headerExpression(String name, String expression) { + return this.headerExpression(name, expression, null, null); + } + + public HeaderEnricherSpec headerExpression(String name, String expression, Boolean overwrite) { + return this.headerExpression(name, expression, overwrite, null); + } + + public HeaderEnricherSpec headerExpression(String name, String expression, Class type) { + return this.headerExpression(name, expression, null, type); + } + + public HeaderEnricherSpec headerExpression(String name, String expression, Boolean overwrite, Class type) { + AbstractHeaderValueMessageProcessor headerValueMessageProcessor = + new ExpressionEvaluatingHeaderValueMessageProcessor(expression, type); + headerValueMessageProcessor.setOverwrite(overwrite); + return this.header(name, headerValueMessageProcessor); + } + + public HeaderEnricherSpec header(String name, HeaderValueMessageProcessor headerValueMessageProcessor) { + Assert.notNull(name); + this.headerToAdd.put(name, headerValueMessageProcessor); + return _this(); + } + + + @Override + protected HeaderEnricher doGet() { + return this.headerEnricher; + } + +} diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java index 3cf7178..17da56f 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/IntegrationFlowBuilder.java @@ -16,6 +16,8 @@ package org.springframework.integration.dsl; +import java.util.Collection; + import org.springframework.beans.factory.BeanCreationException; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.channel.DirectChannel; @@ -25,20 +27,28 @@ import org.springframework.integration.core.GenericSelector; import org.springframework.integration.core.MessageSelector; import org.springframework.integration.dsl.channel.MessageChannelSpec; import org.springframework.integration.dsl.core.ConsumerEndpointSpec; -import org.springframework.integration.dsl.support.FixedSubscriberChannelPrototype; -import org.springframework.integration.dsl.support.MessageChannelReference; -import org.springframework.integration.dsl.support.BeanNameMethodInvokingMessageHandler; +import org.springframework.integration.dsl.support.BeanNameMessageProcessor; +import org.springframework.integration.dsl.support.ComponentConfigurer; import org.springframework.integration.dsl.support.EndpointConfigurer; -import org.springframework.integration.dsl.support.EnricherConfigurer; +import org.springframework.integration.dsl.support.FixedSubscriberChannelPrototype; +import org.springframework.integration.dsl.support.GenericSplitter; +import org.springframework.integration.dsl.support.MessageChannelReference; import org.springframework.integration.filter.ExpressionEvaluatingSelector; import org.springframework.integration.filter.MessageFilter; import org.springframework.integration.filter.MethodInvokingSelector; import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; import org.springframework.integration.handler.BridgeHandler; import org.springframework.integration.handler.DelayHandler; +import org.springframework.integration.handler.ServiceActivatingHandler; +import org.springframework.integration.splitter.AbstractMessageSplitter; +import org.springframework.integration.splitter.DefaultMessageSplitter; +import org.springframework.integration.splitter.ExpressionEvaluatingSplitter; +import org.springframework.integration.splitter.MethodInvokingSplitter; import org.springframework.integration.transformer.ContentEnricher; import org.springframework.integration.transformer.ExpressionEvaluatingTransformer; import org.springframework.integration.transformer.GenericTransformer; +import org.springframework.integration.transformer.HeaderEnricher; +import org.springframework.integration.transformer.HeaderFilter; import org.springframework.integration.transformer.MessageTransformingHandler; import org.springframework.integration.transformer.MethodInvokingTransformer; import org.springframework.integration.transformer.Transformer; @@ -136,12 +146,12 @@ public final class IntegrationFlowBuilder { return this.handle(messageHandler, null); } - public IntegrationFlowBuilder handle(String target, String methodName) { - return this.handle(target, methodName, null); + public IntegrationFlowBuilder handle(String beanName, String methodName) { + return this.handle(beanName, methodName, null); } - public IntegrationFlowBuilder handle(String beanName, String methodName, EndpointConfigurer> endpointConfigurer) { - return this.handle(new BeanNameMethodInvokingMessageHandler(beanName, methodName), endpointConfigurer); + public IntegrationFlowBuilder handle(String beanName, String methodName, EndpointConfigurer> endpointConfigurer) { + return this.handle(new ServiceActivatingHandler(new BeanNameMessageProcessor(beanName, methodName)), endpointConfigurer); } public IntegrationFlowBuilder handle(H messageHandler, EndpointConfigurer> endpointConfigurer) { @@ -165,15 +175,114 @@ public final class IntegrationFlowBuilder { return this.register(new GenericEndpointSpec(delayHandler), endpointConfigurer); } - public IntegrationFlowBuilder enrich(EnricherConfigurer enricherConfigurer) { + public IntegrationFlowBuilder enrich(ComponentConfigurer enricherConfigurer) { return this.enrich(enricherConfigurer, null); } - public IntegrationFlowBuilder enrich(EnricherConfigurer enricherConfigurer, EndpointConfigurer> endpointConfigurer) { + public IntegrationFlowBuilder enrich(ComponentConfigurer enricherConfigurer, EndpointConfigurer> endpointConfigurer) { Assert.notNull(enricherConfigurer); EnricherSpec enricherSpec = new EnricherSpec(); enricherConfigurer.configure(enricherSpec); - return this.register(new GenericEndpointSpec(enricherSpec.get()), endpointConfigurer); + return this.enrich(enricherSpec.get(), endpointConfigurer); + } + + public IntegrationFlowBuilder enrich(ContentEnricher contentEnricher) { + return this.enrich(contentEnricher, null); + } + + public IntegrationFlowBuilder enrich(ContentEnricher contentEnricher, EndpointConfigurer> endpointConfigurer) { + return this.handle(contentEnricher, endpointConfigurer); + } + + public IntegrationFlowBuilder enrichHeaders(ComponentConfigurer headerEnricherConfigurer) { + return this.enrichHeaders(headerEnricherConfigurer, null); + } + + public IntegrationFlowBuilder enrichHeaders(ComponentConfigurer headerEnricherConfigurer, + EndpointConfigurer> endpointConfigurer) { + Assert.notNull(headerEnricherConfigurer); + HeaderEnricherSpec headerEnricherSpec = new HeaderEnricherSpec(); + headerEnricherConfigurer.configure(headerEnricherSpec); + return this.enrichHeaders(headerEnricherSpec.get(), endpointConfigurer); + } + + public IntegrationFlowBuilder enrichHeaders(HeaderEnricher headerEnricher) { + return this.enrichHeaders(headerEnricher, null); + } + + public IntegrationFlowBuilder enrichHeaders(HeaderEnricher headerEnricher, EndpointConfigurer> endpointConfigurer) { + return this.transform(headerEnricher, endpointConfigurer); + } + + public IntegrationFlowBuilder split() { + return this.split((EndpointConfigurer>) null); + } + + public IntegrationFlowBuilder split(EndpointConfigurer> endpointConfigurer) { + return this.split(new DefaultMessageSplitter(), endpointConfigurer); + } + + public IntegrationFlowBuilder split(String expression) { + return this.split(expression, (EndpointConfigurer>) null); + } + + public IntegrationFlowBuilder split(String expression, EndpointConfigurer> endpointConfigurer) { + return this.split(new ExpressionEvaluatingSplitter(PARSER.parseExpression(expression)), endpointConfigurer); + } + + public IntegrationFlowBuilder split(String beanName, String methodName) { + return this.split(beanName, methodName, null); + } + + public IntegrationFlowBuilder split(String beanName, String methodName, + EndpointConfigurer> endpointConfigurer) { + return this.split(new MethodInvokingSplitter(new BeanNameMessageProcessor>(beanName, methodName)), + endpointConfigurer); + } + + public IntegrationFlowBuilder split(AbstractMessageSplitter splitter) { + return this.split(splitter, null); + } + + public IntegrationFlowBuilder split(GenericSplitter splitter) { + return this.split(splitter, null); + } + + public IntegrationFlowBuilder split(GenericSplitter splitter, + EndpointConfigurer> endpointConfigurer) { + return this.split(new MethodInvokingSplitter(splitter, "split"), endpointConfigurer); + } + + public IntegrationFlowBuilder split(T splitter, EndpointConfigurer> endpointConfigurer) { + return this.handle(splitter, endpointConfigurer); + } + + /** + * Provides the {@link HeaderFilter} to the current {@link IntegrationFlow}. + * + * @param headersToRemove the array of headers (or patterns) to remove from {@link org.springframework.messaging.MessageHeaders}. + * @return the {@link IntegrationFlowBuilder}. + */ + public IntegrationFlowBuilder headerFilter(String... headersToRemove) { + return this.headerFilter(new HeaderFilter(headersToRemove), null); + } + + /** + * Provides the {@link HeaderFilter} to the current {@link IntegrationFlow}. + * + * @param headersToRemove the comma separated headers (or patterns) to remove from {@link org.springframework.messaging.MessageHeaders}. + * @param patternMatch the {@code boolean} flag to indicate if {@code headersToRemove} should be interpreted as patterns or direct header names. + * @return the {@link IntegrationFlowBuilder}. + */ + + public IntegrationFlowBuilder headerFilter(String headersToRemove, boolean patternMatch) { + HeaderFilter headerFilter = new HeaderFilter(StringUtils.delimitedListToStringArray(headersToRemove, ",", " ")); + headerFilter.setPatternMatch(patternMatch); + return this.headerFilter(headerFilter, null); + } + + public IntegrationFlowBuilder headerFilter(HeaderFilter headerFilter, EndpointConfigurer> endpointConfigurer) { + return this.transform(headerFilter, endpointConfigurer); } private > IntegrationFlowBuilder register(S endpointSpec, EndpointConfigurer endpointConfigurer) { diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/BeanNameMessageProcessor.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/BeanNameMessageProcessor.java new file mode 100644 index 0000000..75a1113 --- /dev/null +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/BeanNameMessageProcessor.java @@ -0,0 +1,39 @@ +package org.springframework.integration.dsl.support; + +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.integration.handler.MessageProcessor; +import org.springframework.integration.handler.MethodInvokingMessageProcessor; +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + +/** + * @author Artem Bilan + */ +public class BeanNameMessageProcessor implements MessageProcessor, BeanFactoryAware { + + private final String object; + + private final String methodName; + + private MessageProcessor delegate; + + public BeanNameMessageProcessor(String object, String methodName) { + this.object = object; + this.methodName = methodName; + } + + @Override + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + Assert.notNull(beanFactory); + Object target = beanFactory.getBean(object); + this.delegate = new MethodInvokingMessageProcessor(target, this.methodName); + } + + @Override + public T processMessage(Message message) { + return this.delegate.processMessage(message); + } + +} diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/BeanNameMethodInvokingMessageHandler.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/BeanNameMethodInvokingMessageHandler.java deleted file mode 100644 index b841e40..0000000 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/BeanNameMethodInvokingMessageHandler.java +++ /dev/null @@ -1,35 +0,0 @@ -package org.springframework.integration.dsl.support; - -import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; -import org.springframework.integration.handler.MessageProcessor; -import org.springframework.integration.handler.MethodInvokingMessageProcessor; -import org.springframework.messaging.Message; - -/** - * @author Artem Bilan - */ -public final class BeanNameMethodInvokingMessageHandler extends AbstractReplyProducingMessageHandler { - - private final String object; - - private final String methodName; - - private MessageProcessor processor; - - public BeanNameMethodInvokingMessageHandler(String object, String methodName) { - this.object = object; - this.methodName = methodName; - } - - @Override - protected void doInit() { - Object target = this.getBeanFactory().getBean(object); - this.processor = new MethodInvokingMessageProcessor(target, this.methodName); - } - - @Override - protected Object handleRequestMessage(Message requestMessage) { - return this.processor.processMessage(requestMessage); - } - -} diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/EnricherConfigurer.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/ComponentConfigurer.java similarity index 79% rename from spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/EnricherConfigurer.java rename to spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/ComponentConfigurer.java index 8f524ee..9f65d1a 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/EnricherConfigurer.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/ComponentConfigurer.java @@ -16,14 +16,14 @@ package org.springframework.integration.dsl.support; -import org.springframework.integration.dsl.EnricherSpec; +import org.springframework.integration.dsl.core.IntegrationComponentSpec; /** * @author Artem Bilan */ -public interface EnricherConfigurer { +public interface ComponentConfigurer> { - void configure(EnricherSpec spec); + void configure(S spec); } diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/FixedSubscriberChannelPrototype.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/FixedSubscriberChannelPrototype.java index c468bfb..dd7b64a 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/FixedSubscriberChannelPrototype.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/FixedSubscriberChannelPrototype.java @@ -5,7 +5,6 @@ import org.springframework.messaging.MessageChannel; /** * @author Artem Bilan - * @since 4.0 */ public class FixedSubscriberChannelPrototype implements MessageChannel { diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/GenericSplitter.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/GenericSplitter.java new file mode 100644 index 0000000..76b6406 --- /dev/null +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/GenericSplitter.java @@ -0,0 +1,12 @@ +package org.springframework.integration.dsl.support; + +import java.util.Collection; + +/** + * @author Artem Bilan + */ +public interface GenericSplitter { + + Collection split(T target); + +} diff --git a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/MessageChannelReference.java b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/MessageChannelReference.java index 4483cd9..fac20b0 100644 --- a/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/MessageChannelReference.java +++ b/spring-integration-java-dsl/src/main/java/org/springframework/integration/dsl/support/MessageChannelReference.java @@ -6,7 +6,6 @@ import org.springframework.util.Assert; /** * @author Artem Bilan - * @since 4.0 */ public class MessageChannelReference implements MessageChannel { diff --git a/spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/IntegrationFlowTests.java b/spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/IntegrationFlowTests.java index 755b1cd..e2f2011 100644 --- a/spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/IntegrationFlowTests.java +++ b/spring-integration-java-dsl/src/test/java/org/springframework/integration/dsl/test/IntegrationFlowTests.java @@ -17,6 +17,7 @@ package org.springframework.integration.dsl.test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -24,9 +25,15 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -57,10 +64,12 @@ import org.springframework.integration.channel.PublishSubscribeChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.core.MessageSource; +import org.springframework.integration.dsl.GenericEndpointSpec; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.channel.DirectChannelSpec; import org.springframework.integration.dsl.channel.MessageChannels; +import org.springframework.integration.dsl.support.GenericSplitter; import org.springframework.integration.dsl.support.Pollers; import org.springframework.integration.endpoint.MethodInvokingMessageSource; import org.springframework.integration.event.core.MessagingEvent; @@ -70,10 +79,12 @@ import org.springframework.integration.file.FileHeaders; import org.springframework.integration.file.FileWritingMessageHandler; import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice; import org.springframework.integration.scheduling.PollerMetadata; +import org.springframework.integration.splitter.DefaultMessageSplitter; import org.springframework.integration.store.SimpleMessageStore; import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.transformer.PayloadDeserializingTransformer; import org.springframework.integration.transformer.PayloadSerializingTransformer; +import org.springframework.integration.xml.transformer.XPathHeaderEnricher; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageDeliveryException; @@ -84,12 +95,11 @@ import org.springframework.messaging.support.GenericMessage; import org.springframework.stereotype.Component; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import org.springframework.test.context.support.AnnotationConfigContextLoader; /** * @author Artem Bilan */ -@ContextConfiguration(loader = AnnotationConfigContextLoader.class) +@ContextConfiguration @RunWith(SpringJUnit4ClassRunner.class) public class IntegrationFlowTests { @@ -157,6 +167,14 @@ public class IntegrationFlowTests { @Qualifier("enricherInput") private FixedSubscriberChannel enricherInput; + @Autowired + @Qualifier("splitInput") + private DirectChannel splitInput; + + @Autowired + @Qualifier("xpathHeaderEnricherInput") + private DirectChannel xpathHeaderEnricherInput; + @Test public void testPollingFlow() { @@ -274,7 +292,9 @@ public class IntegrationFlowTests { assertThat(e, Matchers.instanceOf(MessageHandlingException.class)); assertThat(e.getCause(), Matchers.instanceOf(NullPointerException.class)); } - this.fileWritingMessageHandler.setFileNameGenerator(new DefaultFileNameGenerator()); + DefaultFileNameGenerator fileNameGenerator = new DefaultFileNameGenerator(); + fileNameGenerator.setBeanFactory(this.beanFactory); + this.fileWritingMessageHandler.setFileNameGenerator(fileNameGenerator); this.fileFlow1Input.send(message); assertTrue(new File(tmpDir, "foo").exists()); @@ -319,6 +339,53 @@ public class IntegrationFlowTests { assertThat(new Date(), Matchers.greaterThan(result.getDate())); } + @Test + public void testSplitter() { + QueueChannel replyChannel = new QueueChannel(); + + this.splitInput.send(MessageBuilder.withPayload("").setReplyChannel(replyChannel).setHeader("foo", "bar").build()); + + List results = new ArrayList(); + + for (int i = 0; i < 12; i++) { + Message receive = replyChannel.receive(2000); + assertNotNull(receive); + assertFalse(receive.getHeaders().containsKey("foo")); + assertTrue(receive.getHeaders().containsKey("FOO")); + assertEquals("BAR", receive.getHeaders().get("FOO")); + results.add(receive.getPayload()); + } + + assertTrue(results.containsAll(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12))); + } + + @Test + public void testHeaderEnricher() { + QueueChannel replyChannel = new QueueChannel(); + + Message message = MessageBuilder.withPayload("12") + .setReplyChannel(replyChannel) + .build(); + + try { + this.xpathHeaderEnricherInput.send(message); + fail("Expected MessageDispatchingException"); + } + catch (Exception e) { + assertThat(e, Matchers.instanceOf(MessageDeliveryException.class)); + assertThat(e.getCause(), Matchers.instanceOf(MessageDispatchingException.class)); + assertThat(e.getMessage(), Matchers.containsString("Dispatcher has no subscribers")); + } + + this.beanFactory.getBean("xpathHeaderEnricher", Lifecycle.class).start(); + this.xpathHeaderEnricherInput.send(message); + + Message result = replyChannel.receive(2000); + assertNotNull(result); + MessageHeaders headers = result.getHeaders(); + assertEquals("1", headers.get("one")); + assertEquals("2", headers.get("two")); + } @Configuration @EnableIntegration @@ -523,6 +590,56 @@ public class IntegrationFlowTests { .get(); } + @Bean + public Executor taskExecutor() { + return Executors.newCachedThreadPool(); + } + + @Bean + public TestSplitterPojo testSplitterData() { + List first = new ArrayList<>(); + first.add("1,2,3"); + first.add("4,5,6"); + + List second = new ArrayList<>(); + second.add("7,8,9"); + second.add("10,11,12"); + + return new TestSplitterPojo(first, second); + } + + @Bean + public IntegrationFlow splitFlow() { + return IntegrationFlows.from("splitInput") + .enrichHeaders(s -> s.header("FOO", "BAR")) + .split("testSplitterData", "buildList", c -> c.get().getT2().setApplySequence(true)) + .channel(MessageChannels.executor(this.taskExecutor())) + .split(new GenericSplitter>>() { + + @Override + public Collection split(Message> target) { + return target.getPayload(); + } + }) + .channel(MessageChannels.executor(this.taskExecutor())) + .split((GenericEndpointSpec s) -> s.get().getT2().setDelimiters(",")) + .channel(MessageChannels.executor(this.taskExecutor())) + .transform(Integer::parseInt) + .headerFilter("foo", false) + .get(); + } + + @Bean + public IntegrationFlow xpathHeaderEnricherFlow() { + return IntegrationFlows.from("xpathHeaderEnricherInput") + .enrichHeaders( + s -> s.header("one", new XPathHeaderEnricher.XPathExpressionEvaluatingHeaderValueMessageProcessor("/root/elementOne")) + .header("two", new XPathHeaderEnricher.XPathExpressionEvaluatingHeaderValueMessageProcessor("/root/elementTwo")), + c -> c.autoStartup(false).id("xpathHeaderEnricher") + ) + .get(); + } + } @Component("greetingService") @@ -595,4 +712,29 @@ public class IntegrationFlowTests { } + private static class TestSplitterPojo { + + final List first; + + final List second; + + private TestSplitterPojo(List first, List second) { + this.first = first; + this.second = second; + } + + public List getFirst() { + return first; + } + + public List getSecond() { + return second; + } + + public List> buildList() { + return Arrays.asList(this.first, this.second); + } + + } + } diff --git a/spring-integration-java-dsl/src/test/resources/log4j.properties b/spring-integration-java-dsl/src/test/resources/log4j.properties index 5aa14e4..8b5a4f8 100644 --- a/spring-integration-java-dsl/src/test/resources/log4j.properties +++ b/spring-integration-java-dsl/src/test/resources/log4j.properties @@ -2,6 +2,7 @@ log4j.rootCategory=WARN, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss.SSS} %-5p [%t][%c] %m%n +log4j.appender.stdout.layout.ConversionPattern=%d %c{1} [%t] : %m%n +log4j.category.org.springframework.integration=WARN log4j.category.org.springframework.integration.dsl=INFO