DSL: Upgrade to SI 4.0. RELEASE

This commit is contained in:
Artem Bilan
2014-04-30 17:32:21 +03:00
parent 34efe4b262
commit 6664d413d4
18 changed files with 160 additions and 81 deletions

View File

@@ -24,12 +24,14 @@ compileTestJava {
}
ext {
springIntegrationVersion = '4.0.0.RC1'
embedMongoVersion = '1.43'
jacocoVersion = '0.7.0.201403182114'
log4jVersion = '1.2.17'
springIntegrationVersion = '4.0.0.RELEASE'
linkHomepage = 'https://github.com/spring-projects/spring-integration-extensions'
linkCi = 'https://build.springsource.org/browse/INTEXT'
linkIssue = 'https://jira.springsource.org/browse/INTEXT'
linkCi = 'https://build.spring.io/browse/INTEXT'
linkIssue = 'https://jira.spring.io/browse/INTEXT'
linkScmUrl = 'https://github.com/spring-projects/spring-integration-extensions'
linkScmConnection = 'https://github.com/spring-projects/spring-integration-extensions.git'
linkScmDevConnection = 'git@github.com:spring-projects/spring-integration-extensions.git'
@@ -52,9 +54,9 @@ dependencies {
testCompile "org.springframework.integration:spring-integration-file:$springIntegrationVersion"
testCompile "org.springframework.integration:spring-integration-xml:$springIntegrationVersion"
testCompile "org.springframework.integration:spring-integration-mongodb:$springIntegrationVersion"
testCompile "de.flapdoodle.embed:de.flapdoodle.embed.mongo:1.43"
testCompile "de.flapdoodle.embed:de.flapdoodle.embed.mongo:$embedMongoVersion"
jacoco group: "org.jacoco", name: "org.jacoco.agent", version: "0.5.6.201201232323", classifier: "runtime"
jacoco "org.jacoco:org.jacoco.agent:$jacocoVersion:runtime"
}
// enable all compiler warnings; individual projects may customize further
@@ -186,6 +188,6 @@ task dist(dependsOn: assemble) {
task wrapper(type: Wrapper) {
description = 'Generates gradlew[.bat] scripts'
gradleVersion = '1.11'
gradleVersion = '1.12'
distributionUrl = "http://services.gradle.org/distributions/gradle-${gradleVersion}-all.zip"
}

View File

@@ -23,7 +23,8 @@ import org.springframework.messaging.MessageChannel;
/**
* @author Artem Bilan
*/
public class AbstractRouterSpec<S extends AbstractRouterSpec<S, R>, R extends AbstractMessageRouter> extends IntegrationComponentSpec<S, R> {
public class AbstractRouterSpec<S extends AbstractRouterSpec<S, R>, R extends AbstractMessageRouter>
extends IntegrationComponentSpec<S, R> {
AbstractRouterSpec(R router) {
this.target = router;

View File

@@ -47,7 +47,8 @@ class DslRecipientListRouter extends RecipientListRouter {
}
Map<String, Object> getRecipients() {
Map<String, Object> recipients = new HashMap<String, Object>(this.expressionRecipientMap.size() + this.selectorRecipientMap.size());
Map<String, Object> recipients =
new HashMap<String, Object>(this.expressionRecipientMap.size() + this.selectorRecipientMap.size());
recipients.putAll(this.expressionRecipientMap);
recipients.putAll(this.selectorRecipientMap);
return recipients;

View File

@@ -103,7 +103,8 @@ public class EnricherSpec extends IntegrationComponentSpec<EnricherSpec, Content
}
public <V> EnricherSpec header(String name, V value, Boolean overwrite) {
AbstractHeaderValueMessageProcessor<V> headerValueMessageProcessor = new StaticHeaderValueMessageProcessor<V>(value);
AbstractHeaderValueMessageProcessor<V> headerValueMessageProcessor =
new StaticHeaderValueMessageProcessor<V>(value);
headerValueMessageProcessor.setOverwrite(overwrite);
return this.header(name, headerValueMessageProcessor);
}

View File

@@ -22,7 +22,8 @@ import org.springframework.messaging.MessageHandler;
/**
* @author Artem Bilan
*/
public final class GenericEndpointSpec<H extends MessageHandler> extends ConsumerEndpointSpec<GenericEndpointSpec<H>, H> {
public final class GenericEndpointSpec<H extends MessageHandler>
extends ConsumerEndpointSpec<GenericEndpointSpec<H>, H> {
GenericEndpointSpec(H messageHandler) {
super(messageHandler);

View File

@@ -74,7 +74,8 @@ public class HeaderEnricherSpec extends IntegrationComponentSpec<HeaderEnricherS
}
public <V> HeaderEnricherSpec header(String name, V value, Boolean overwrite) {
AbstractHeaderValueMessageProcessor<V> headerValueMessageProcessor = new StaticHeaderValueMessageProcessor<V>(value);
AbstractHeaderValueMessageProcessor<V> headerValueMessageProcessor =
new StaticHeaderValueMessageProcessor<V>(value);
headerValueMessageProcessor.setOverwrite(overwrite);
return this.header(name, headerValueMessageProcessor);
}

View File

@@ -126,7 +126,8 @@ public final class IntegrationFlowBuilder {
}
public IntegrationFlowBuilder controlBus() {
return this.handle(new ServiceActivatingHandler(new ExpressionCommandMessageProcessor(new ControlBusMethodFilter())), null);
return this.handle(new ServiceActivatingHandler(new ExpressionCommandMessageProcessor(
new ControlBusMethodFilter())), null);
}
public IntegrationFlowBuilder transform(String expression) {
@@ -155,7 +156,8 @@ public final class IntegrationFlowBuilder {
return this.filter(genericSelector, null);
}
public <S> IntegrationFlowBuilder filter(GenericSelector<S> genericSelector, EndpointConfigurer<FilterEndpointSpec> endpointConfigurer) {
public <S> IntegrationFlowBuilder filter(GenericSelector<S> genericSelector,
EndpointConfigurer<FilterEndpointSpec> endpointConfigurer) {
Assert.notNull(genericSelector);
MessageSelector selector = genericSelector instanceof MessageSelector
? (MessageSelector) genericSelector : new MethodInvokingSelector(genericSelector);
@@ -172,7 +174,8 @@ public final class IntegrationFlowBuilder {
public IntegrationFlowBuilder handle(String beanName, String methodName,
EndpointConfigurer<GenericEndpointSpec<ServiceActivatingHandler>> endpointConfigurer) {
return this.handle(new ServiceActivatingHandler(new BeanNameMessageProcessor<Object>(beanName, methodName)), endpointConfigurer);
return this.handle(new ServiceActivatingHandler(new BeanNameMessageProcessor<Object>(beanName, methodName)),
endpointConfigurer);
}
public <H extends MessageHandler> IntegrationFlowBuilder handle(H messageHandler,
@@ -189,7 +192,8 @@ public final class IntegrationFlowBuilder {
return this.delay(groupId, expression, null);
}
public IntegrationFlowBuilder delay(String groupId, String expression, EndpointConfigurer<GenericEndpointSpec<DelayHandler>> endpointConfigurer) {
public IntegrationFlowBuilder delay(String groupId, String expression,
EndpointConfigurer<GenericEndpointSpec<DelayHandler>> endpointConfigurer) {
DelayHandler delayHandler = new DelayHandler(groupId);
if (StringUtils.hasText(expression)) {
delayHandler.setDelayExpression(PARSER.parseExpression(expression));
@@ -213,7 +217,8 @@ public final class IntegrationFlowBuilder {
return this.enrich(contentEnricher, null);
}
public IntegrationFlowBuilder enrich(ContentEnricher contentEnricher, EndpointConfigurer<GenericEndpointSpec<ContentEnricher>> endpointConfigurer) {
public IntegrationFlowBuilder enrich(ContentEnricher contentEnricher,
EndpointConfigurer<GenericEndpointSpec<ContentEnricher>> endpointConfigurer) {
return this.handle(contentEnricher, endpointConfigurer);
}
@@ -250,7 +255,8 @@ public final class IntegrationFlowBuilder {
return this.split(expression, (EndpointConfigurer<SplitterEndpointSpec<ExpressionEvaluatingSplitter>>) null);
}
public IntegrationFlowBuilder split(String expression, EndpointConfigurer<SplitterEndpointSpec<ExpressionEvaluatingSplitter>> endpointConfigurer) {
public IntegrationFlowBuilder split(String expression,
EndpointConfigurer<SplitterEndpointSpec<ExpressionEvaluatingSplitter>> endpointConfigurer) {
return this.split(new ExpressionEvaluatingSplitter(PARSER.parseExpression(expression)), endpointConfigurer);
}
@@ -277,14 +283,14 @@ public final class IntegrationFlowBuilder {
return this.split(new MethodInvokingSplitter(splitter, "split"), endpointConfigurer);
}
public <S extends AbstractMessageSplitter> IntegrationFlowBuilder split(S splitter, EndpointConfigurer<SplitterEndpointSpec<S>> endpointConfigurer) {
public <S extends AbstractMessageSplitter> IntegrationFlowBuilder split(S splitter,
EndpointConfigurer<SplitterEndpointSpec<S>> endpointConfigurer) {
Assert.notNull(splitter);
return this.register(new SplitterEndpointSpec<S>(splitter), endpointConfigurer);
}
/**
* Provides the {@link HeaderFilter} to the current {@link IntegrationFlow}.
*
* @param headersToRemove the array of headers (or patterns) to remove from {@link org.springframework.messaging.MessageHeaders}.
* @return the {@link IntegrationFlowBuilder}.
*/
@@ -294,9 +300,10 @@ public final class IntegrationFlowBuilder {
/**
* Provides the {@link HeaderFilter} to the current {@link IntegrationFlow}.
*
* @param headersToRemove the comma separated headers (or patterns) to remove from {@link org.springframework.messaging.MessageHeaders}.
* @param patternMatch the {@code boolean} flag to indicate if {@code headersToRemove} should be interpreted as patterns or direct header names.
* @param headersToRemove the comma separated headers (or patterns) to remove from
* {@link org.springframework.messaging.MessageHeaders}.
* @param patternMatch the {@code boolean} flag to indicate if {@code headersToRemove}
* should be interpreted as patterns or direct header names.
* @return the {@link IntegrationFlowBuilder}.
*/
@@ -315,7 +322,8 @@ public final class IntegrationFlowBuilder {
return this.claimCheckIn(messageStore, null);
}
public IntegrationFlowBuilder claimCheckIn(MessageStore messageStore, EndpointConfigurer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
public IntegrationFlowBuilder claimCheckIn(MessageStore messageStore,
EndpointConfigurer<GenericEndpointSpec<MessageTransformingHandler>> endpointConfigurer) {
return this.transform(new ClaimCheckInTransformer(messageStore), endpointConfigurer);
}
@@ -368,7 +376,8 @@ public final class IntegrationFlowBuilder {
}
public IntegrationFlowBuilder aggregate(EndpointConfigurer<GenericEndpointSpec<AggregatingMessageHandler>> endpointConfigurer) {
return this.aggregate(new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor()), endpointConfigurer);
return this.aggregate(new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor()),
endpointConfigurer);
}
public IntegrationFlowBuilder aggregate(ComponentConfigurer<AggregatorSpec> aggregatorConfigurer) {
@@ -405,7 +414,8 @@ public final class IntegrationFlowBuilder {
public IntegrationFlowBuilder route(String beanName, String method,
ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer,
EndpointConfigurer<GenericEndpointSpec<MethodInvokingRouter>> endpointConfigurer) {
return this.route(new MethodInvokingRouter(new BeanNameMessageProcessor<Object>(beanName, method)), routerConfigurer, endpointConfigurer);
return this.route(new MethodInvokingRouter(new BeanNameMessageProcessor<Object>(beanName, method)),
routerConfigurer, endpointConfigurer);
}
@@ -413,21 +423,24 @@ public final class IntegrationFlowBuilder {
return this.route(expression, (ComponentConfigurer<RouterSpec<ExpressionEvaluatingRouter>>) null);
}
public IntegrationFlowBuilder route(String expression, ComponentConfigurer<RouterSpec<ExpressionEvaluatingRouter>> routerConfigurer) {
public IntegrationFlowBuilder route(String expression,
ComponentConfigurer<RouterSpec<ExpressionEvaluatingRouter>> routerConfigurer) {
return this.route(expression, routerConfigurer, null);
}
public IntegrationFlowBuilder route(String expression,
ComponentConfigurer<RouterSpec<ExpressionEvaluatingRouter>> routerConfigurer,
EndpointConfigurer<GenericEndpointSpec<ExpressionEvaluatingRouter>> endpointConfigurer) {
return this.route(new ExpressionEvaluatingRouter(PARSER.parseExpression(expression)), routerConfigurer, endpointConfigurer);
return this.route(new ExpressionEvaluatingRouter(PARSER.parseExpression(expression)), routerConfigurer,
endpointConfigurer);
}
public <S, T> IntegrationFlowBuilder route(GenericRouter<S, T> router) {
return this.route(router, (ComponentConfigurer<RouterSpec<MethodInvokingRouter>>) null);
}
public <S, T> IntegrationFlowBuilder route(GenericRouter<S, T> router, ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer) {
public <S, T> IntegrationFlowBuilder route(GenericRouter<S, T> router,
ComponentConfigurer<RouterSpec<MethodInvokingRouter>> routerConfigurer) {
return this.route(router, routerConfigurer, null);
}
@@ -437,7 +450,8 @@ public final class IntegrationFlowBuilder {
return this.route(new MethodInvokingRouter(router), routerConfigurer, endpointConfigurer);
}
public <R extends AbstractMappingMessageRouter> IntegrationFlowBuilder route(R router, ComponentConfigurer<RouterSpec<R>> routerConfigurer) {
public <R extends AbstractMappingMessageRouter> IntegrationFlowBuilder route(R router,
ComponentConfigurer<RouterSpec<R>> routerConfigurer) {
return this.route(router, routerConfigurer, null);
}
@@ -475,7 +489,8 @@ public final class IntegrationFlowBuilder {
}
private <S extends ConsumerEndpointSpec<S, ?>> IntegrationFlowBuilder register(S endpointSpec, EndpointConfigurer<S> endpointConfigurer) {
private <S extends ConsumerEndpointSpec<S, ?>> IntegrationFlowBuilder register(S endpointSpec,
EndpointConfigurer<S> endpointConfigurer) {
if (endpointConfigurer != null) {
endpointConfigurer.configure(endpointSpec);
}
@@ -513,7 +528,8 @@ public final class IntegrationFlowBuilder {
channelName = ((MessageChannelReference) outputChannel).getName();
}
if (this.currentComponent instanceof AbstractReplyProducingMessageHandler) {
AbstractReplyProducingMessageHandler messageProducer = (AbstractReplyProducingMessageHandler) this.currentComponent;
AbstractReplyProducingMessageHandler messageProducer =
(AbstractReplyProducingMessageHandler) this.currentComponent;
if (channelName != null) {
messageProducer.setOutputChannelName(channelName);
}
@@ -522,7 +538,8 @@ public final class IntegrationFlowBuilder {
}
}
else if (this.currentComponent instanceof SourcePollingChannelAdapterFactoryBean) {
SourcePollingChannelAdapterFactoryBean pollingChannelAdapterFactoryBean = (SourcePollingChannelAdapterFactoryBean) this.currentComponent;
SourcePollingChannelAdapterFactoryBean pollingChannelAdapterFactoryBean =
(SourcePollingChannelAdapterFactoryBean) this.currentComponent;
if (channelName != null) {
pollingChannelAdapterFactoryBean.setOutputChannelName(channelName);
}
@@ -531,7 +548,8 @@ public final class IntegrationFlowBuilder {
}
}
else if (this.currentComponent instanceof AbstractCorrelatingMessageHandler) {
AbstractCorrelatingMessageHandler messageProducer = (AbstractCorrelatingMessageHandler) this.currentComponent;
AbstractCorrelatingMessageHandler messageProducer =
(AbstractCorrelatingMessageHandler) this.currentComponent;
if (channelName != null) {
messageProducer.setOutputChannelName(channelName);
}
@@ -540,8 +558,9 @@ public final class IntegrationFlowBuilder {
}
}
else {
throw new BeanCreationException("The 'currentComponent' (" + this.currentComponent + ") is a one-way 'MessageHandler'" +
" and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.");
throw new BeanCreationException("The 'currentComponent' (" + this.currentComponent +
") is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. " +
"This is the end of the integration flow.");
}
this.currentComponent = null;
}
@@ -551,9 +570,10 @@ public final class IntegrationFlowBuilder {
public IntegrationFlow get() {
if (this.currentMessageChannel instanceof FixedSubscriberChannelPrototype) {
throw new BeanCreationException("The 'currentMessageChannel' (" + this.currentMessageChannel + ") is a prototype" +
" for FixedSubscriberChannel which can't be created without MessageHandler constructor argument. " +
"That means that '.fixedSubscriberChannel()' can't be the last EIP-method in the IntegrationFlow definition.");
throw new BeanCreationException("The 'currentMessageChannel' (" + this.currentMessageChannel +
") is a prototype for FixedSubscriberChannel which can't be created without MessageHandler " +
"constructor argument. That means that '.fixedSubscriberChannel()' can't be the last EIP-method " +
"in the IntegrationFlow definition.");
}
if (this.flow.getIntegrationComponents().size() == 1) {

View File

@@ -24,7 +24,8 @@ import org.springframework.integration.aggregator.ResequencingMessageHandler;
*/
public class ResequencerSpec extends CorrelationHandlerSpec<ResequencerSpec, ResequencingMessageHandler> {
private final ResequencingMessageHandler resequencingMessageHandler = new ResequencingMessageHandler(new ResequencingMessageGroupProcessor());
private final ResequencingMessageHandler resequencingMessageHandler =
new ResequencingMessageHandler(new ResequencingMessageGroupProcessor());
ResequencerSpec() {
}

View File

@@ -25,8 +25,8 @@ import org.springframework.integration.scheduling.PollerMetadata;
* @author Artem Bilan
*/
public final class SourcePollingChannelAdapterSpec
extends EndpointSpec<SourcePollingChannelAdapterSpec, SourcePollingChannelAdapterFactoryBean, MessageSource<?>> {
public final class SourcePollingChannelAdapterSpec extends EndpointSpec<SourcePollingChannelAdapterSpec,
SourcePollingChannelAdapterFactoryBean, MessageSource<?>> {
SourcePollingChannelAdapterSpec(MessageSource<?> messageSource) {
super(messageSource);

View File

@@ -22,7 +22,8 @@ import org.springframework.integration.splitter.AbstractMessageSplitter;
/**
* @author Artem Bilan
*/
public final class SplitterEndpointSpec<S extends AbstractMessageSplitter> extends ConsumerEndpointSpec<SplitterEndpointSpec<S>, S> {
public final class SplitterEndpointSpec<S extends AbstractMessageSplitter>
extends ConsumerEndpointSpec<SplitterEndpointSpec<S>, S> {
SplitterEndpointSpec(S splitter) {
super(splitter);

View File

@@ -23,7 +23,8 @@ import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrateg
/**
* @author Artem Bilan
*/
public abstract class LoadBalancingChannelSpec<S extends MessageChannelSpec<S, C>, C extends AbstractMessageChannel> extends MessageChannelSpec<S, C> {
public abstract class LoadBalancingChannelSpec<S extends MessageChannelSpec<S, C>, C extends AbstractMessageChannel>
extends MessageChannelSpec<S, C> {
protected LoadBalancingStrategy loadBalancingStrategy = new RoundRobinLoadBalancingStrategy();

View File

@@ -30,7 +30,8 @@ import org.springframework.util.Assert;
/**
* @author Artem Bilan
*/
public abstract class MessageChannelSpec<S extends MessageChannelSpec<S, C>, C extends AbstractMessageChannel> extends IntegrationComponentSpec<S, C> {
public abstract class MessageChannelSpec<S extends MessageChannelSpec<S, C>, C extends AbstractMessageChannel>
extends IntegrationComponentSpec<S, C> {
protected C channel;

View File

@@ -64,7 +64,8 @@ public final class MessageChannels {
return new QueueChannelSpec.MessageStoreSpec(messageGroupStore, groupId);
}
public static QueueChannelSpec.MessageStoreSpec queue(String id, ChannelMessageStore messageGroupStore, Object groupId) {
public static QueueChannelSpec.MessageStoreSpec queue(String id, ChannelMessageStore messageGroupStore,
Object groupId) {
return queue(messageGroupStore, groupId).id(id);
}
@@ -97,8 +98,8 @@ public final class MessageChannels {
return new QueueChannelSpec.MessageStoreSpec(messageGroupStore, groupId);
}
public static QueueChannelSpec.MessageStoreSpec priority(String id, PriorityCapableChannelMessageStore messageGroupStore,
Object groupId) {
public static QueueChannelSpec.MessageStoreSpec priority(String id,
PriorityCapableChannelMessageStore messageGroupStore, Object groupId) {
return queue(messageGroupStore, groupId).id(id);
}

View File

@@ -24,7 +24,8 @@ import org.springframework.util.ErrorHandler;
/**
* @author Artem Bilan
*/
public class PublishSubscribeChannelSpec extends MessageChannelSpec<PublishSubscribeChannelSpec, PublishSubscribeChannel> {
public class PublishSubscribeChannelSpec
extends MessageChannelSpec<PublishSubscribeChannelSpec, PublishSubscribeChannel> {
PublishSubscribeChannelSpec() {
this.channel = new PublishSubscribeChannel();

View File

@@ -93,7 +93,8 @@ public class QueueChannelSpec extends MessageChannelSpec<QueueChannelSpec, Queue
protected QueueChannel doGet() {
if (this.capacity != null) {
if (this.storeLock != null) {
this.queue = new MessageGroupQueue(this.messageGroupStore, this.groupId, this.capacity, this.storeLock);
this.queue = new MessageGroupQueue(this.messageGroupStore, this.groupId, this.capacity,
this.storeLock);
}
else {
this.queue = new MessageGroupQueue(this.messageGroupStore, this.groupId, this.capacity);

View File

@@ -54,23 +54,26 @@ public class DslIntegrationConfigurationInitializer implements IntegrationConfig
@Override
public void initialize(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
Assert.isInstanceOf(BeanDefinitionRegistry.class, configurableListableBeanFactory,
"To use Spring Integration Java DSL the 'beanFactory' has to be an instance of 'BeanDefinitionRegistry'." +
"Consider using 'GenericApplicationContext' implementation."
"To use Spring Integration Java DSL the 'beanFactory' has to be an instance of " +
"'BeanDefinitionRegistry'. Consider using 'GenericApplicationContext' implementation."
);
this.checkSpecBeans(configurableListableBeanFactory);
this.initializeIntegrationFlows(configurableListableBeanFactory);
}
private void checkSpecBeans(ConfigurableListableBeanFactory beanFactory) {
List<String> specBeanNames = Arrays.asList(beanFactory.getBeanNamesForType(IntegrationComponentSpec.class, true, false));
List<String> specBeanNames = Arrays.asList(beanFactory.getBeanNamesForType(IntegrationComponentSpec.class,
true, false));
if (!specBeanNames.isEmpty()) {
throw new BeanCreationException("'IntegrationComponentSpec' beans: '" + specBeanNames + "' must be populated " +
"to target objects via 'get()' method call. It is important for @Autowired injections.");
throw new BeanCreationException("'IntegrationComponentSpec' beans: '" + specBeanNames +
"' must be populated to target objects via 'get()' method call. It is important for " +
"@Autowired injections.");
}
}
private void initializeIntegrationFlows(ConfigurableListableBeanFactory beanFactory) {
AutowiredAnnotationBeanPostProcessor autowiredAnnotationBeanPostProcessor = beanFactory.getBean(AutowiredAnnotationBeanPostProcessor.class);
AutowiredAnnotationBeanPostProcessor autowiredAnnotationBeanPostProcessor =
beanFactory.getBean(AutowiredAnnotationBeanPostProcessor.class);
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) beanFactory;
String[] integrationFlowBeanNames = beanFactory.getBeanNamesForType(IntegrationFlow.class, false, false);
Set<String> processedConfigurations = new HashSet<String>();
@@ -91,7 +94,8 @@ public class DslIntegrationConfigurationInitializer implements IntegrationConfig
if (instance instanceof AbstractMessageChannel) {
String channelBeanName = ((AbstractMessageChannel) instance).getComponentName();
if (channelBeanName == null) {
channelBeanName = flowNamePrefix + "channel" + BeanFactoryUtils.GENERATED_BEAN_NAME_SEPARATOR + channelNameIndex++;
channelBeanName = flowNamePrefix + "channel" +
BeanFactoryUtils.GENERATED_BEAN_NAME_SEPARATOR + channelNameIndex++;
}
registry.registerBeanDefinition(channelBeanName, component);
}
@@ -101,12 +105,16 @@ public class DslIntegrationConfigurationInitializer implements IntegrationConfig
ConsumerEndpointFactoryBean endpoint = endpointSpec.get().getT1();
String id = endpointSpec.getId();
Collection<?> messageHandlers = beanFactory.getBeansOfType(messageHandler.getClass(), false, false).values();
Collection<?> messageHandlers =
beanFactory.getBeansOfType(messageHandler.getClass(), false, false).values();
if (!messageHandlers.contains(messageHandler)) {
String handlerBeanName = generateInstanceBeanDefinitionName(registry, messageHandler);
String[] handlerAlias = id != null ? new String[]{id + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX} : null;
BeanComponentDefinition definitionHolder = new BeanComponentDefinition(new InstanceBeanDefinition(messageHandler),
String[] handlerAlias = id != null
? new String[]{id + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX}
: null;
BeanComponentDefinition definitionHolder =
new BeanComponentDefinition(new InstanceBeanDefinition(messageHandler),
handlerBeanName, handlerAlias);
BeanDefinitionReaderUtils.registerBeanDefinition(definitionHolder, registry);
}
@@ -127,7 +135,8 @@ public class DslIntegrationConfigurationInitializer implements IntegrationConfig
FixedSubscriberChannel fixedSubscriberChannel = (FixedSubscriberChannel) instance;
String channelBeanName = fixedSubscriberChannel.getComponentName();
if ("Unnamed fixed subscriber channel".equals(channelBeanName)) {
channelBeanName = flowNamePrefix + "channel" + BeanFactoryUtils.GENERATED_BEAN_NAME_SEPARATOR + channelNameIndex++;
channelBeanName = flowNamePrefix + "channel" +
BeanFactoryUtils.GENERATED_BEAN_NAME_SEPARATOR + channelNameIndex++;
}
registry.registerBeanDefinition(channelBeanName, component);
}

View File

@@ -27,7 +27,8 @@ import org.springframework.integration.scheduling.PollerMetadata;
* @author Artem Bilan
*/
public abstract class EndpointSpec<S extends EndpointSpec<S, F, H>, F extends BeanNameAware, H> extends IntegrationComponentSpec<S, Tuple2<F, H>> {
public abstract class EndpointSpec<S extends EndpointSpec<S, F, H>, F extends BeanNameAware, H>
extends IntegrationComponentSpec<S, Tuple2<F, H>> {
@SuppressWarnings("unchecked")
protected EndpointSpec(H handler) {

View File

@@ -33,8 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import com.mongodb.Mongo;
import com.mongodb.MongoURI;
import com.mongodb.MongoClient;
import de.flapdoodle.embed.mongo.MongodExecutable;
import de.flapdoodle.embed.mongo.MongodProcess;
import de.flapdoodle.embed.mongo.MongodStarter;
@@ -56,6 +55,7 @@ import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
@@ -372,25 +372,38 @@ public class IntegrationFlowTests {
@Test
public void testWrongLastComponent() {
ConfigurableApplicationContext context = null;
try {
new AnnotationConfigApplicationContext(InvalidLastComponentFlowContext.class);
context = new AnnotationConfigApplicationContext(InvalidLastComponentFlowContext.class);
fail("BeanCreationException expected");
}
catch (Exception e) {
assertThat(e, Matchers.instanceOf(BeanCreationException.class));
assertThat(e.getMessage(), Matchers.containsString("is a one-way 'MessageHandler'"));
}
finally {
if (context != null) {
context.close();
}
}
}
@Test
public void testWrongLastMessageChannel() {
ConfigurableApplicationContext context = null;
try {
new AnnotationConfigApplicationContext(InvalidLastMessageChannelFlowContext.class);
context = new AnnotationConfigApplicationContext(InvalidLastMessageChannelFlowContext.class);
fail("BeanCreationException expected");
}
catch (Exception e) {
assertThat(e, Matchers.instanceOf(BeanCreationException.class));
assertThat(e.getMessage(), Matchers.containsString("'.fixedSubscriberChannel()' can't be the last EIP-method in the IntegrationFlow definition"));
assertThat(e.getMessage(), Matchers.containsString("'.fixedSubscriberChannel()' " +
"can't be the last EIP-method in the IntegrationFlow definition"));
}
finally {
if (context != null) {
context.close();
}
}
}
@@ -418,7 +431,9 @@ public class IntegrationFlowTests {
@Test
public void testMethodInvokingMessageHandler() {
QueueChannel replyChannel = new QueueChannel();
Message<?> message = MessageBuilder.withPayload("world").setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel).build();
Message<?> message = MessageBuilder.withPayload("world")
.setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel)
.build();
this.methodInvokingInput.send(message);
Message<?> receive = replyChannel.receive(5000);
assertNotNull(receive);
@@ -427,21 +442,30 @@ public class IntegrationFlowTests {
@Test
public void testWrongConfigurationWithSpecBean() {
ConfigurableApplicationContext context = null;
try {
new AnnotationConfigApplicationContext(InvalidConfigurationWithSpec.class);
context = new AnnotationConfigApplicationContext(InvalidConfigurationWithSpec.class);
fail("BeanCreationException expected");
}
catch (Exception e) {
assertThat(e, Matchers.instanceOf(IllegalArgumentException.class));
assertThat(e.getCause(), Matchers.instanceOf(BeanCreationException.class));
assertThat(e.getCause().getMessage(), Matchers.containsString("must be populated to target objects via 'get()' method call"));
assertThat(e.getCause().getMessage(),
Matchers.containsString("must be populated to target objects via 'get()' method call"));
}
finally {
if (context != null) {
context.close();
}
}
}
@Test
public void testContentEnricher() {
QueueChannel replyChannel = new QueueChannel();
Message<?> message = MessageBuilder.withPayload(new TestPojo("Bar")).setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel).build();
Message<?> message = MessageBuilder.withPayload(new TestPojo("Bar"))
.setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel)
.build();
this.enricherInput.send(message);
Message<?> receive = replyChannel.receive(5000);
assertNotNull(receive);
@@ -458,7 +482,10 @@ public class IntegrationFlowTests {
public void testSplitterResequencer() {
QueueChannel replyChannel = new QueueChannel();
this.splitInput.send(MessageBuilder.withPayload("").setReplyChannel(replyChannel).setHeader("foo", "bar").build());
this.splitInput.send(MessageBuilder.withPayload("")
.setReplyChannel(replyChannel)
.setHeader("foo", "bar")
.build());
for (int i = 0; i < 12; i++) {
Message<?> receive = replyChannel.receive(2000);
@@ -475,7 +502,9 @@ public class IntegrationFlowTests {
List<Character> payload = Arrays.asList('a', 'b', 'c', 'd', 'e');
QueueChannel replyChannel = new QueueChannel();
this.splitAggregateInput.send(MessageBuilder.withPayload(payload).setReplyChannel(replyChannel).build());
this.splitAggregateInput.send(MessageBuilder.withPayload(payload)
.setReplyChannel(replyChannel)
.build());
Message<?> receive = replyChannel.receive(2000);
assertNotNull(receive);
@@ -491,9 +520,10 @@ public class IntegrationFlowTests {
public void testHeaderEnricher() {
QueueChannel replyChannel = new QueueChannel();
Message<String> message = MessageBuilder.withPayload("<root><elementOne>1</elementOne><elementTwo>2</elementTwo></root>")
.setReplyChannel(replyChannel)
.build();
Message<String> message =
MessageBuilder.withPayload("<root><elementOne>1</elementOne><elementTwo>2</elementTwo></root>")
.setReplyChannel(replyChannel)
.build();
try {
this.xpathHeaderEnricherInput.send(message);
@@ -560,7 +590,8 @@ public class IntegrationFlowTests {
fail("MessageDeliveryException expected.");
}
catch (MessageDeliveryException e) {
assertThat(e.getMessage(), Matchers.containsString("no channel resolved by router and no default output channel defined"));
assertThat(e.getMessage(),
Matchers.containsString("no channel resolved by router and no default output channel defined"));
}
}
@@ -590,7 +621,8 @@ public class IntegrationFlowTests {
}
catch (MessagingException e) {
assertThat(e.getCause(), Matchers.instanceOf(DestinationResolutionException.class));
assertThat(e.getCause().getMessage(), Matchers.containsString("failed to look up MessageChannel with name 'bad-channel'"));
assertThat(e.getCause().getMessage(),
Matchers.containsString("failed to look up MessageChannel with name 'bad-channel'"));
}
}
@@ -620,7 +652,8 @@ public class IntegrationFlowTests {
}
catch (MessagingException e) {
assertThat(e.getCause(), Matchers.instanceOf(DestinationResolutionException.class));
assertThat(e.getCause().getMessage(), Matchers.containsString("failed to look up MessageChannel with name 'bad-channel'"));
assertThat(e.getCause().getMessage(),
Matchers.containsString("failed to look up MessageChannel with name 'bad-channel'"));
}
}
@@ -652,7 +685,8 @@ public class IntegrationFlowTests {
fail("MessageDeliveryException expected.");
}
catch (MessageDeliveryException e) {
assertThat(e.getMessage(), Matchers.containsString("no channel resolved by router and no default output channel defined"));
assertThat(e.getMessage(),
Matchers.containsString("no channel resolved by router and no default output channel defined"));
}
}
@@ -848,7 +882,7 @@ public class IntegrationFlowTests {
@Bean
public MongoDbFactory mongoDbFactory() throws Exception {
return new SimpleMongoDbFactory(new MongoURI("mongodb://localhost:12345/local"));
return new SimpleMongoDbFactory(new MongoClient("localhost",12345), "local");
}
@Bean
@@ -1068,7 +1102,8 @@ public class IntegrationFlowTests {
}
}, c -> c.applySequence(false))
.channel(MessageChannels.executor(this.taskExecutor()))
.split((SplitterEndpointSpec<DefaultMessageSplitter> s) -> s.applySequence(false).get().getT2().setDelimiters(","))
.split((SplitterEndpointSpec<DefaultMessageSplitter> s) ->
s.applySequence(false).get().getT2().setDelimiters(","))
.channel(MessageChannels.executor(this.taskExecutor()))
.<String, Integer>transform(Integer::parseInt)
.enrichHeaders(s -> s.headerExpression(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, "payload"))