Add split() and enrichHeaders() EIP-methods
This commit is contained in:
@@ -25,6 +25,7 @@ compileTestJava {
|
||||
|
||||
ext {
|
||||
springIntegrationVersion = '4.0.0.BUILD-SNAPSHOT'
|
||||
log4jVersion = '1.2.17'
|
||||
|
||||
linkHomepage = 'https://github.com/spring-projects/spring-integration-extensions'
|
||||
linkCi = 'https://build.springsource.org/browse/INTEXT'
|
||||
@@ -36,8 +37,6 @@ ext {
|
||||
|
||||
eclipse.project.natures += 'org.springframework.ide.eclipse.core.springnature'
|
||||
|
||||
sourceSets.test.resources.srcDirs = ['src/test/java']
|
||||
|
||||
// See http://www.gradle.org/docs/current/userguide/dependency_management.html#sub:configurations
|
||||
// and http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ConfigurationContainer.html
|
||||
configurations {
|
||||
@@ -47,9 +46,11 @@ configurations {
|
||||
dependencies {
|
||||
compile "org.springframework.integration:spring-integration-core:$springIntegrationVersion"
|
||||
|
||||
testCompile "log4j:log4j:$log4jVersion"
|
||||
testCompile "org.springframework.integration:spring-integration-test:$springIntegrationVersion"
|
||||
testCompile "org.springframework.integration:spring-integration-event:$springIntegrationVersion"
|
||||
testCompile "org.springframework.integration:spring-integration-file:$springIntegrationVersion"
|
||||
testCompile "org.springframework.integration:spring-integration-xml:$springIntegrationVersion"
|
||||
|
||||
jacoco group: "org.jacoco", name: "org.jacoco.agent", version: "0.5.6.201201232323", classifier: "runtime"
|
||||
}
|
||||
|
||||
@@ -0,0 +1,97 @@
|
||||
package org.springframework.integration.dsl;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.expression.spel.standard.SpelExpressionParser;
|
||||
import org.springframework.integration.dsl.core.IntegrationComponentSpec;
|
||||
import org.springframework.integration.dsl.support.BeanNameMessageProcessor;
|
||||
import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor;
|
||||
import org.springframework.integration.handler.MessageProcessor;
|
||||
import org.springframework.integration.transformer.HeaderEnricher;
|
||||
import org.springframework.integration.transformer.support.AbstractHeaderValueMessageProcessor;
|
||||
import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
|
||||
import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
|
||||
import org.springframework.integration.transformer.support.StaticHeaderValueMessageProcessor;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* @author Artem Bilan
|
||||
*/
|
||||
public class HeaderEnricherSpec extends IntegrationComponentSpec<HeaderEnricherSpec, HeaderEnricher> {
|
||||
|
||||
private final static SpelExpressionParser PARSER = new SpelExpressionParser();
|
||||
|
||||
private final Map<String, HeaderValueMessageProcessor<?>> headerToAdd = new HashMap<String, HeaderValueMessageProcessor<?>>();
|
||||
|
||||
private final HeaderEnricher headerEnricher = new HeaderEnricher(headerToAdd);
|
||||
|
||||
HeaderEnricherSpec() {
|
||||
}
|
||||
|
||||
public HeaderEnricherSpec defaultOverwrite(boolean defaultOverwrite) {
|
||||
this.headerEnricher.setDefaultOverwrite(defaultOverwrite);
|
||||
return _this();
|
||||
}
|
||||
|
||||
public HeaderEnricherSpec shouldSkipNulls(boolean shouldSkipNulls) {
|
||||
this.headerEnricher.setShouldSkipNulls(shouldSkipNulls);
|
||||
return _this();
|
||||
}
|
||||
|
||||
|
||||
public HeaderEnricherSpec messageProcessor(MessageProcessor<?> messageProcessor) {
|
||||
this.headerEnricher.setMessageProcessor(messageProcessor);
|
||||
return _this();
|
||||
}
|
||||
|
||||
public HeaderEnricherSpec messageProcessor(String expression) {
|
||||
return this.messageProcessor(new ExpressionEvaluatingMessageProcessor<Object>(PARSER.parseExpression(expression)));
|
||||
}
|
||||
|
||||
public HeaderEnricherSpec messageProcessor(String beanName, String methodName) {
|
||||
return this.messageProcessor(new BeanNameMessageProcessor<Object>(beanName, methodName));
|
||||
}
|
||||
|
||||
public HeaderEnricherSpec header(String name, Object value) {
|
||||
return this.header(name, value, null);
|
||||
}
|
||||
|
||||
public HeaderEnricherSpec header(String name, Object value, Boolean overwrite) {
|
||||
AbstractHeaderValueMessageProcessor<?> headerValueMessageProcessor = new StaticHeaderValueMessageProcessor<Object>(value);
|
||||
headerValueMessageProcessor.setOverwrite(overwrite);
|
||||
return this.header(name, headerValueMessageProcessor);
|
||||
}
|
||||
|
||||
public HeaderEnricherSpec headerExpression(String name, String expression) {
|
||||
return this.headerExpression(name, expression, null, null);
|
||||
}
|
||||
|
||||
public HeaderEnricherSpec headerExpression(String name, String expression, Boolean overwrite) {
|
||||
return this.headerExpression(name, expression, overwrite, null);
|
||||
}
|
||||
|
||||
public HeaderEnricherSpec headerExpression(String name, String expression, Class<?> type) {
|
||||
return this.headerExpression(name, expression, null, type);
|
||||
}
|
||||
|
||||
public <T> HeaderEnricherSpec headerExpression(String name, String expression, Boolean overwrite, Class<T> type) {
|
||||
AbstractHeaderValueMessageProcessor<T> headerValueMessageProcessor =
|
||||
new ExpressionEvaluatingHeaderValueMessageProcessor<T>(expression, type);
|
||||
headerValueMessageProcessor.setOverwrite(overwrite);
|
||||
return this.header(name, headerValueMessageProcessor);
|
||||
}
|
||||
|
||||
public HeaderEnricherSpec header(String name, HeaderValueMessageProcessor<?> headerValueMessageProcessor) {
|
||||
Assert.notNull(name);
|
||||
this.headerToAdd.put(name, headerValueMessageProcessor);
|
||||
return _this();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected HeaderEnricher doGet() {
|
||||
return this.headerEnricher;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -16,6 +16,8 @@
|
||||
|
||||
package org.springframework.integration.dsl;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import org.springframework.beans.factory.BeanCreationException;
|
||||
import org.springframework.expression.spel.standard.SpelExpressionParser;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
@@ -25,20 +27,28 @@ import org.springframework.integration.core.GenericSelector;
|
||||
import org.springframework.integration.core.MessageSelector;
|
||||
import org.springframework.integration.dsl.channel.MessageChannelSpec;
|
||||
import org.springframework.integration.dsl.core.ConsumerEndpointSpec;
|
||||
import org.springframework.integration.dsl.support.FixedSubscriberChannelPrototype;
|
||||
import org.springframework.integration.dsl.support.MessageChannelReference;
|
||||
import org.springframework.integration.dsl.support.BeanNameMethodInvokingMessageHandler;
|
||||
import org.springframework.integration.dsl.support.BeanNameMessageProcessor;
|
||||
import org.springframework.integration.dsl.support.ComponentConfigurer;
|
||||
import org.springframework.integration.dsl.support.EndpointConfigurer;
|
||||
import org.springframework.integration.dsl.support.EnricherConfigurer;
|
||||
import org.springframework.integration.dsl.support.FixedSubscriberChannelPrototype;
|
||||
import org.springframework.integration.dsl.support.GenericSplitter;
|
||||
import org.springframework.integration.dsl.support.MessageChannelReference;
|
||||
import org.springframework.integration.filter.ExpressionEvaluatingSelector;
|
||||
import org.springframework.integration.filter.MessageFilter;
|
||||
import org.springframework.integration.filter.MethodInvokingSelector;
|
||||
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
|
||||
import org.springframework.integration.handler.BridgeHandler;
|
||||
import org.springframework.integration.handler.DelayHandler;
|
||||
import org.springframework.integration.handler.ServiceActivatingHandler;
|
||||
import org.springframework.integration.splitter.AbstractMessageSplitter;
|
||||
import org.springframework.integration.splitter.DefaultMessageSplitter;
|
||||
import org.springframework.integration.splitter.ExpressionEvaluatingSplitter;
|
||||
import org.springframework.integration.splitter.MethodInvokingSplitter;
|
||||
import org.springframework.integration.transformer.ContentEnricher;
|
||||
import org.springframework.integration.transformer.ExpressionEvaluatingTransformer;
|
||||
import org.springframework.integration.transformer.GenericTransformer;
|
||||
import org.springframework.integration.transformer.HeaderEnricher;
|
||||
import org.springframework.integration.transformer.HeaderFilter;
|
||||
import org.springframework.integration.transformer.MessageTransformingHandler;
|
||||
import org.springframework.integration.transformer.MethodInvokingTransformer;
|
||||
import org.springframework.integration.transformer.Transformer;
|
||||
@@ -136,12 +146,12 @@ public final class IntegrationFlowBuilder {
|
||||
return this.handle(messageHandler, null);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder handle(String target, String methodName) {
|
||||
return this.handle(target, methodName, null);
|
||||
public IntegrationFlowBuilder handle(String beanName, String methodName) {
|
||||
return this.handle(beanName, methodName, null);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder handle(String beanName, String methodName, EndpointConfigurer<GenericEndpointSpec<BeanNameMethodInvokingMessageHandler>> endpointConfigurer) {
|
||||
return this.handle(new BeanNameMethodInvokingMessageHandler(beanName, methodName), endpointConfigurer);
|
||||
public IntegrationFlowBuilder handle(String beanName, String methodName, EndpointConfigurer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
|
||||
return this.handle(new ServiceActivatingHandler(new BeanNameMessageProcessor<Object>(beanName, methodName)), endpointConfigurer);
|
||||
}
|
||||
|
||||
public <H extends MessageHandler> IntegrationFlowBuilder handle(H messageHandler, EndpointConfigurer<GenericEndpointSpec<H>> endpointConfigurer) {
|
||||
@@ -165,15 +175,114 @@ public final class IntegrationFlowBuilder {
|
||||
return this.register(new GenericEndpointSpec<DelayHandler>(delayHandler), endpointConfigurer);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder enrich(EnricherConfigurer enricherConfigurer) {
|
||||
public IntegrationFlowBuilder enrich(ComponentConfigurer<EnricherSpec> enricherConfigurer) {
|
||||
return this.enrich(enricherConfigurer, null);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder enrich(EnricherConfigurer enricherConfigurer, EndpointConfigurer<GenericEndpointSpec<ContentEnricher>> endpointConfigurer) {
|
||||
public IntegrationFlowBuilder enrich(ComponentConfigurer<EnricherSpec> enricherConfigurer, EndpointConfigurer<GenericEndpointSpec<ContentEnricher>> endpointConfigurer) {
|
||||
Assert.notNull(enricherConfigurer);
|
||||
EnricherSpec enricherSpec = new EnricherSpec();
|
||||
enricherConfigurer.configure(enricherSpec);
|
||||
return this.register(new GenericEndpointSpec<ContentEnricher>(enricherSpec.get()), endpointConfigurer);
|
||||
return this.enrich(enricherSpec.get(), endpointConfigurer);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder enrich(ContentEnricher contentEnricher) {
|
||||
return this.enrich(contentEnricher, null);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder enrich(ContentEnricher contentEnricher, EndpointConfigurer<GenericEndpointSpec<ContentEnricher>> endpointConfigurer) {
|
||||
return this.handle(contentEnricher, endpointConfigurer);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder enrichHeaders(ComponentConfigurer<HeaderEnricherSpec> headerEnricherConfigurer) {
|
||||
return this.enrichHeaders(headerEnricherConfigurer, null);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder enrichHeaders(ComponentConfigurer<HeaderEnricherSpec> headerEnricherConfigurer,
|
||||
EndpointConfigurer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
|
||||
Assert.notNull(headerEnricherConfigurer);
|
||||
HeaderEnricherSpec headerEnricherSpec = new HeaderEnricherSpec();
|
||||
headerEnricherConfigurer.configure(headerEnricherSpec);
|
||||
return this.enrichHeaders(headerEnricherSpec.get(), endpointConfigurer);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder enrichHeaders(HeaderEnricher headerEnricher) {
|
||||
return this.enrichHeaders(headerEnricher, null);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder enrichHeaders(HeaderEnricher headerEnricher, EndpointConfigurer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
|
||||
return this.transform(headerEnricher, endpointConfigurer);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder split() {
|
||||
return this.split((EndpointConfigurer<GenericEndpointSpec<DefaultMessageSplitter>>) null);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder split(EndpointConfigurer<GenericEndpointSpec<DefaultMessageSplitter>> endpointConfigurer) {
|
||||
return this.split(new DefaultMessageSplitter(), endpointConfigurer);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder split(String expression) {
|
||||
return this.split(expression, (EndpointConfigurer<GenericEndpointSpec<ExpressionEvaluatingSplitter>>) null);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder split(String expression, EndpointConfigurer<GenericEndpointSpec<ExpressionEvaluatingSplitter>> endpointConfigurer) {
|
||||
return this.split(new ExpressionEvaluatingSplitter(PARSER.parseExpression(expression)), endpointConfigurer);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder split(String beanName, String methodName) {
|
||||
return this.split(beanName, methodName, null);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder split(String beanName, String methodName,
|
||||
EndpointConfigurer<GenericEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {
|
||||
return this.split(new MethodInvokingSplitter(new BeanNameMessageProcessor<Collection<?>>(beanName, methodName)),
|
||||
endpointConfigurer);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder split(AbstractMessageSplitter splitter) {
|
||||
return this.split(splitter, null);
|
||||
}
|
||||
|
||||
public <T> IntegrationFlowBuilder split(GenericSplitter<T> splitter) {
|
||||
return this.split(splitter, null);
|
||||
}
|
||||
|
||||
public <T> IntegrationFlowBuilder split(GenericSplitter<T> splitter,
|
||||
EndpointConfigurer<GenericEndpointSpec<MethodInvokingSplitter>> endpointConfigurer) {
|
||||
return this.split(new MethodInvokingSplitter(splitter, "split"), endpointConfigurer);
|
||||
}
|
||||
|
||||
public <T extends AbstractMessageSplitter> IntegrationFlowBuilder split(T splitter, EndpointConfigurer<GenericEndpointSpec<T>> endpointConfigurer) {
|
||||
return this.handle(splitter, endpointConfigurer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides the {@link HeaderFilter} to the current {@link IntegrationFlow}.
|
||||
*
|
||||
* @param headersToRemove the array of headers (or patterns) to remove from {@link org.springframework.messaging.MessageHeaders}.
|
||||
* @return the {@link IntegrationFlowBuilder}.
|
||||
*/
|
||||
public IntegrationFlowBuilder headerFilter(String... headersToRemove) {
|
||||
return this.headerFilter(new HeaderFilter(headersToRemove), null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Provides the {@link HeaderFilter} to the current {@link IntegrationFlow}.
|
||||
*
|
||||
* @param headersToRemove the comma separated headers (or patterns) to remove from {@link org.springframework.messaging.MessageHeaders}.
|
||||
* @param patternMatch the {@code boolean} flag to indicate if {@code headersToRemove} should be interpreted as patterns or direct header names.
|
||||
* @return the {@link IntegrationFlowBuilder}.
|
||||
*/
|
||||
|
||||
public IntegrationFlowBuilder headerFilter(String headersToRemove, boolean patternMatch) {
|
||||
HeaderFilter headerFilter = new HeaderFilter(StringUtils.delimitedListToStringArray(headersToRemove, ",", " "));
|
||||
headerFilter.setPatternMatch(patternMatch);
|
||||
return this.headerFilter(headerFilter, null);
|
||||
}
|
||||
|
||||
public IntegrationFlowBuilder headerFilter(HeaderFilter headerFilter, EndpointConfigurer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
|
||||
return this.transform(headerFilter, endpointConfigurer);
|
||||
}
|
||||
|
||||
private <S extends ConsumerEndpointSpec<?, ?>> IntegrationFlowBuilder register(S endpointSpec, EndpointConfigurer<S> endpointConfigurer) {
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
package org.springframework.integration.dsl.support;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.BeanFactory;
|
||||
import org.springframework.beans.factory.BeanFactoryAware;
|
||||
import org.springframework.integration.handler.MessageProcessor;
|
||||
import org.springframework.integration.handler.MethodInvokingMessageProcessor;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* @author Artem Bilan
|
||||
*/
|
||||
public class BeanNameMessageProcessor<T> implements MessageProcessor<T>, BeanFactoryAware {
|
||||
|
||||
private final String object;
|
||||
|
||||
private final String methodName;
|
||||
|
||||
private MessageProcessor<T> delegate;
|
||||
|
||||
public BeanNameMessageProcessor(String object, String methodName) {
|
||||
this.object = object;
|
||||
this.methodName = methodName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
|
||||
Assert.notNull(beanFactory);
|
||||
Object target = beanFactory.getBean(object);
|
||||
this.delegate = new MethodInvokingMessageProcessor<T>(target, this.methodName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public T processMessage(Message<?> message) {
|
||||
return this.delegate.processMessage(message);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,35 +0,0 @@
|
||||
package org.springframework.integration.dsl.support;
|
||||
|
||||
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
|
||||
import org.springframework.integration.handler.MessageProcessor;
|
||||
import org.springframework.integration.handler.MethodInvokingMessageProcessor;
|
||||
import org.springframework.messaging.Message;
|
||||
|
||||
/**
|
||||
* @author Artem Bilan
|
||||
*/
|
||||
public final class BeanNameMethodInvokingMessageHandler extends AbstractReplyProducingMessageHandler {
|
||||
|
||||
private final String object;
|
||||
|
||||
private final String methodName;
|
||||
|
||||
private MessageProcessor<Object> processor;
|
||||
|
||||
public BeanNameMethodInvokingMessageHandler(String object, String methodName) {
|
||||
this.object = object;
|
||||
this.methodName = methodName;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doInit() {
|
||||
Object target = this.getBeanFactory().getBean(object);
|
||||
this.processor = new MethodInvokingMessageProcessor<Object>(target, this.methodName);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object handleRequestMessage(Message<?> requestMessage) {
|
||||
return this.processor.processMessage(requestMessage);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -16,14 +16,14 @@
|
||||
|
||||
package org.springframework.integration.dsl.support;
|
||||
|
||||
import org.springframework.integration.dsl.EnricherSpec;
|
||||
import org.springframework.integration.dsl.core.IntegrationComponentSpec;
|
||||
|
||||
/**
|
||||
* @author Artem Bilan
|
||||
|
||||
*/
|
||||
public interface EnricherConfigurer {
|
||||
public interface ComponentConfigurer<S extends IntegrationComponentSpec<?, ?>> {
|
||||
|
||||
void configure(EnricherSpec spec);
|
||||
void configure(S spec);
|
||||
|
||||
}
|
||||
@@ -5,7 +5,6 @@ import org.springframework.messaging.MessageChannel;
|
||||
|
||||
/**
|
||||
* @author Artem Bilan
|
||||
* @since 4.0
|
||||
*/
|
||||
public class FixedSubscriberChannelPrototype implements MessageChannel {
|
||||
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
package org.springframework.integration.dsl.support;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* @author Artem Bilan
|
||||
*/
|
||||
public interface GenericSplitter<T> {
|
||||
|
||||
Collection<?> split(T target);
|
||||
|
||||
}
|
||||
@@ -6,7 +6,6 @@ import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
* @author Artem Bilan
|
||||
* @since 4.0
|
||||
*/
|
||||
public class MessageChannelReference implements MessageChannel {
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
package org.springframework.integration.dsl.test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertThat;
|
||||
@@ -24,9 +25,15 @@ import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
@@ -57,10 +64,12 @@ import org.springframework.integration.channel.PublishSubscribeChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.config.EnableIntegration;
|
||||
import org.springframework.integration.core.MessageSource;
|
||||
import org.springframework.integration.dsl.GenericEndpointSpec;
|
||||
import org.springframework.integration.dsl.IntegrationFlow;
|
||||
import org.springframework.integration.dsl.IntegrationFlows;
|
||||
import org.springframework.integration.dsl.channel.DirectChannelSpec;
|
||||
import org.springframework.integration.dsl.channel.MessageChannels;
|
||||
import org.springframework.integration.dsl.support.GenericSplitter;
|
||||
import org.springframework.integration.dsl.support.Pollers;
|
||||
import org.springframework.integration.endpoint.MethodInvokingMessageSource;
|
||||
import org.springframework.integration.event.core.MessagingEvent;
|
||||
@@ -70,10 +79,12 @@ import org.springframework.integration.file.FileHeaders;
|
||||
import org.springframework.integration.file.FileWritingMessageHandler;
|
||||
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
|
||||
import org.springframework.integration.scheduling.PollerMetadata;
|
||||
import org.springframework.integration.splitter.DefaultMessageSplitter;
|
||||
import org.springframework.integration.store.SimpleMessageStore;
|
||||
import org.springframework.integration.support.MessageBuilder;
|
||||
import org.springframework.integration.transformer.PayloadDeserializingTransformer;
|
||||
import org.springframework.integration.transformer.PayloadSerializingTransformer;
|
||||
import org.springframework.integration.xml.transformer.XPathHeaderEnricher;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageDeliveryException;
|
||||
@@ -84,12 +95,11 @@ import org.springframework.messaging.support.GenericMessage;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
import org.springframework.test.context.support.AnnotationConfigContextLoader;
|
||||
|
||||
/**
|
||||
* @author Artem Bilan
|
||||
*/
|
||||
@ContextConfiguration(loader = AnnotationConfigContextLoader.class)
|
||||
@ContextConfiguration
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
public class IntegrationFlowTests {
|
||||
|
||||
@@ -157,6 +167,14 @@ public class IntegrationFlowTests {
|
||||
@Qualifier("enricherInput")
|
||||
private FixedSubscriberChannel enricherInput;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("splitInput")
|
||||
private DirectChannel splitInput;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("xpathHeaderEnricherInput")
|
||||
private DirectChannel xpathHeaderEnricherInput;
|
||||
|
||||
|
||||
@Test
|
||||
public void testPollingFlow() {
|
||||
@@ -274,7 +292,9 @@ public class IntegrationFlowTests {
|
||||
assertThat(e, Matchers.instanceOf(MessageHandlingException.class));
|
||||
assertThat(e.getCause(), Matchers.instanceOf(NullPointerException.class));
|
||||
}
|
||||
this.fileWritingMessageHandler.setFileNameGenerator(new DefaultFileNameGenerator());
|
||||
DefaultFileNameGenerator fileNameGenerator = new DefaultFileNameGenerator();
|
||||
fileNameGenerator.setBeanFactory(this.beanFactory);
|
||||
this.fileWritingMessageHandler.setFileNameGenerator(fileNameGenerator);
|
||||
this.fileFlow1Input.send(message);
|
||||
|
||||
assertTrue(new File(tmpDir, "foo").exists());
|
||||
@@ -319,6 +339,53 @@ public class IntegrationFlowTests {
|
||||
assertThat(new Date(), Matchers.greaterThan(result.getDate()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSplitter() {
|
||||
QueueChannel replyChannel = new QueueChannel();
|
||||
|
||||
this.splitInput.send(MessageBuilder.withPayload("").setReplyChannel(replyChannel).setHeader("foo", "bar").build());
|
||||
|
||||
List<Object> results = new ArrayList<Object>();
|
||||
|
||||
for (int i = 0; i < 12; i++) {
|
||||
Message<?> receive = replyChannel.receive(2000);
|
||||
assertNotNull(receive);
|
||||
assertFalse(receive.getHeaders().containsKey("foo"));
|
||||
assertTrue(receive.getHeaders().containsKey("FOO"));
|
||||
assertEquals("BAR", receive.getHeaders().get("FOO"));
|
||||
results.add(receive.getPayload());
|
||||
}
|
||||
|
||||
assertTrue(results.containsAll(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHeaderEnricher() {
|
||||
QueueChannel replyChannel = new QueueChannel();
|
||||
|
||||
Message<String> message = MessageBuilder.withPayload("<root><elementOne>1</elementOne><elementTwo>2</elementTwo></root>")
|
||||
.setReplyChannel(replyChannel)
|
||||
.build();
|
||||
|
||||
try {
|
||||
this.xpathHeaderEnricherInput.send(message);
|
||||
fail("Expected MessageDispatchingException");
|
||||
}
|
||||
catch (Exception e) {
|
||||
assertThat(e, Matchers.instanceOf(MessageDeliveryException.class));
|
||||
assertThat(e.getCause(), Matchers.instanceOf(MessageDispatchingException.class));
|
||||
assertThat(e.getMessage(), Matchers.containsString("Dispatcher has no subscribers"));
|
||||
}
|
||||
|
||||
this.beanFactory.getBean("xpathHeaderEnricher", Lifecycle.class).start();
|
||||
this.xpathHeaderEnricherInput.send(message);
|
||||
|
||||
Message<?> result = replyChannel.receive(2000);
|
||||
assertNotNull(result);
|
||||
MessageHeaders headers = result.getHeaders();
|
||||
assertEquals("1", headers.get("one"));
|
||||
assertEquals("2", headers.get("two"));
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@EnableIntegration
|
||||
@@ -523,6 +590,56 @@ public class IntegrationFlowTests {
|
||||
.get();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Executor taskExecutor() {
|
||||
return Executors.newCachedThreadPool();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public TestSplitterPojo testSplitterData() {
|
||||
List<String> first = new ArrayList<>();
|
||||
first.add("1,2,3");
|
||||
first.add("4,5,6");
|
||||
|
||||
List<String> second = new ArrayList<>();
|
||||
second.add("7,8,9");
|
||||
second.add("10,11,12");
|
||||
|
||||
return new TestSplitterPojo(first, second);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IntegrationFlow splitFlow() {
|
||||
return IntegrationFlows.from("splitInput")
|
||||
.enrichHeaders(s -> s.header("FOO", "BAR"))
|
||||
.split("testSplitterData", "buildList", c -> c.get().getT2().setApplySequence(true))
|
||||
.channel(MessageChannels.executor(this.taskExecutor()))
|
||||
.split(new GenericSplitter<Message<List<?>>>() {
|
||||
|
||||
@Override
|
||||
public Collection<?> split(Message<List<?>> target) {
|
||||
return target.getPayload();
|
||||
}
|
||||
})
|
||||
.channel(MessageChannels.executor(this.taskExecutor()))
|
||||
.split((GenericEndpointSpec<DefaultMessageSplitter> s) -> s.get().getT2().setDelimiters(","))
|
||||
.channel(MessageChannels.executor(this.taskExecutor()))
|
||||
.<String, Integer>transform(Integer::parseInt)
|
||||
.headerFilter("foo", false)
|
||||
.get();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public IntegrationFlow xpathHeaderEnricherFlow() {
|
||||
return IntegrationFlows.from("xpathHeaderEnricherInput")
|
||||
.enrichHeaders(
|
||||
s -> s.header("one", new XPathHeaderEnricher.XPathExpressionEvaluatingHeaderValueMessageProcessor("/root/elementOne"))
|
||||
.header("two", new XPathHeaderEnricher.XPathExpressionEvaluatingHeaderValueMessageProcessor("/root/elementTwo")),
|
||||
c -> c.autoStartup(false).id("xpathHeaderEnricher")
|
||||
)
|
||||
.get();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Component("greetingService")
|
||||
@@ -595,4 +712,29 @@ public class IntegrationFlowTests {
|
||||
|
||||
}
|
||||
|
||||
private static class TestSplitterPojo {
|
||||
|
||||
final List<String> first;
|
||||
|
||||
final List<String> second;
|
||||
|
||||
private TestSplitterPojo(List<String> first, List<String> second) {
|
||||
this.first = first;
|
||||
this.second = second;
|
||||
}
|
||||
|
||||
public List<String> getFirst() {
|
||||
return first;
|
||||
}
|
||||
|
||||
public List<String> getSecond() {
|
||||
return second;
|
||||
}
|
||||
|
||||
public List<List<String>> buildList() {
|
||||
return Arrays.asList(this.first, this.second);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ log4j.rootCategory=WARN, stdout
|
||||
|
||||
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss.SSS} %-5p [%t][%c] %m%n
|
||||
log4j.appender.stdout.layout.ConversionPattern=%d %c{1} [%t] : %m%n
|
||||
|
||||
log4j.category.org.springframework.integration=WARN
|
||||
log4j.category.org.springframework.integration.dsl=INFO
|
||||
|
||||
Reference in New Issue
Block a user