diff --git a/org.springframework.integration.security/src/test/java/org/springframework/integration/security/ChannelInterceptorRegisteringBeanPostProcessorTests.java b/org.springframework.integration.security/src/test/java/org/springframework/integration/security/ChannelInterceptorRegisteringBeanPostProcessorTests.java index 82e02de827..c8da634dea 100644 --- a/org.springframework.integration.security/src/test/java/org/springframework/integration/security/ChannelInterceptorRegisteringBeanPostProcessorTests.java +++ b/org.springframework.integration.security/src/test/java/org/springframework/integration/security/ChannelInterceptorRegisteringBeanPostProcessorTests.java @@ -24,6 +24,7 @@ import java.util.List; import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; + import org.springframework.integration.channel.AbstractMessageChannel; import org.springframework.integration.channel.ChannelInterceptor; import org.springframework.integration.channel.MessageChannel; @@ -31,9 +32,7 @@ import org.springframework.integration.message.Message; import org.springframework.integration.message.selector.MessageSelector; /** - * * @author Jonas Partner - * */ public class ChannelInterceptorRegisteringBeanPostProcessorTests { @@ -74,36 +73,29 @@ public class ChannelInterceptorRegisteringBeanPostProcessorTests { EasyMock.verify(channel); } + static class TestInterceptor implements ChannelInterceptor { public void postReceive(Message message, MessageChannel channel) { - // TODO Auto-generated method stub - } public void postSend(Message message, MessageChannel channel, boolean sent) { - // TODO Auto-generated method stub - } public boolean preReceive(MessageChannel channel) { - // TODO Auto-generated method stub return false; } public boolean preSend(Message message, MessageChannel channel) { - // TODO Auto-generated method stub return false; } } + static class TestChannel extends AbstractMessageChannel { ChannelInterceptor channelInterceptor; - public TestChannel() { - super(null); - } @Override public void addInterceptor(ChannelInterceptor interceptor) { @@ -113,23 +105,19 @@ public class ChannelInterceptorRegisteringBeanPostProcessorTests { @Override protected Message doReceive(long timeout) { - // TODO Auto-generated method stub return null; } @Override protected boolean doSend(Message message, long timeout) { - // TODO Auto-generated method stub return false; } public List> clear() { - // TODO Auto-generated method stub return null; } public List> purge(MessageSelector selector) { - // TODO Auto-generated method stub return null; } diff --git a/org.springframework.integration.security/src/test/java/org/springframework/integration/security/config/SecuredChannelsParserTests.java b/org.springframework.integration.security/src/test/java/org/springframework/integration/security/config/SecuredChannelsParserTests.java index 80d193e43f..e87d62b9a8 100644 --- a/org.springframework.integration.security/src/test/java/org/springframework/integration/security/config/SecuredChannelsParserTests.java +++ b/org.springframework.integration.security/src/test/java/org/springframework/integration/security/config/SecuredChannelsParserTests.java @@ -110,33 +110,27 @@ public class SecuredChannelsParserTests extends AbstractJUnit4SpringContextTests new SecurityConfig("ROLE_ADMIN"))); } + static class TestMessageChannel extends AbstractMessageChannel { List interceptors = new ArrayList(); - public TestMessageChannel() { - super(null); - } @Override protected Message doReceive(long timeout) { - // TODO Auto-generated method stub return null; } @Override protected boolean doSend(Message message, long timeout) { - // TODO Auto-generated method stub return false; } public List> clear() { - // TODO Auto-generated method stub return null; } public List> purge(MessageSelector selector) { - // TODO Auto-generated method stub return null; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultChannelFactoryBean.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultChannelFactoryBean.java index 07cdd667f8..4bc2ff7349 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultChannelFactoryBean.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultChannelFactoryBean.java @@ -24,7 +24,6 @@ import org.springframework.beans.factory.FactoryBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.integration.channel.ChannelInterceptor; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.factory.ChannelFactory; import org.springframework.integration.channel.factory.QueueChannelFactory; @@ -48,13 +47,6 @@ public class DefaultChannelFactoryBean implements ApplicationContextAware, Facto private volatile List interceptors; - private volatile DispatcherPolicy dispatcherPolicy; - - - public DefaultChannelFactoryBean(DispatcherPolicy dispatcherPolicy) { - this.dispatcherPolicy = dispatcherPolicy; - } - public void setBeanName(String beanName) { this.beanName = beanName; @@ -78,7 +70,7 @@ public class DefaultChannelFactoryBean implements ApplicationContextAware, Facto public Object getObject() throws Exception { Assert.notNull(channelFactory, "ChannelFactory not set on this instance. Is this used within an ApplicationContext?"); - return channelFactory.getChannel(this.beanName, dispatcherPolicy, interceptors); + return channelFactory.getChannel(this.beanName, interceptors); } public Class getObjectType() { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java index 0d6faad35c..32499987b1 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java @@ -361,7 +361,7 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A if (this.logger.isInfoEnabled()) { logger.info("auto-creating channel '" + channelName + "'"); } - channel = channelFactory.getChannel(channelName, null, null); + channel = channelFactory.getChannel(channelName, null); this.registerChannel(channelName, channel); } return channel; diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java index ed395d0afe..fbfb0f9cc0 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java @@ -42,16 +42,6 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName private final ChannelInterceptorList interceptors = new ChannelInterceptorList(); - private final DispatcherPolicy dispatcherPolicy; - - - /** - * Create a channel with the given dispatcher policy. - */ - public AbstractMessageChannel(DispatcherPolicy dispatcherPolicy) { - this.dispatcherPolicy = dispatcherPolicy; - } - /** * Set the name of this channel. @@ -91,13 +81,6 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName this.interceptors.add(interceptor); } - /** - * Return the dispatcher policy for this channel. - */ - public DispatcherPolicy getDispatcherPolicy() { - return this.dispatcherPolicy; - } - /** * Send a message on this channel. If the channel is at capacity, this * method will block until either space becomes available or the sending diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/DispatcherPolicy.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/DispatcherPolicy.java deleted file mode 100644 index 5b0c9fe816..0000000000 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/DispatcherPolicy.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright 2002-2007 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.channel; - -import org.springframework.integration.dispatcher.MessageDispatcher; -import org.springframework.util.Assert; - -/** - * Metadata for a {@link MessageDispatcher}. - * - * @author Mark Fisher - */ -public class DispatcherPolicy { - - public final static int DEFAULT_REJECTION_LIMIT = 5; - - public final static long DEFAULT_RETRY_INTERVAL = 1000; - - - private final boolean publishSubscribe; - - private volatile int rejectionLimit = DEFAULT_REJECTION_LIMIT; - - private volatile long retryInterval = DEFAULT_RETRY_INTERVAL; - - private volatile boolean shouldFailOnRejectionLimit = true; - - - public DispatcherPolicy() { - this.publishSubscribe = false; - } - - /** - * Create a DispatcherPolicy. - * - * @param publishSubscribe whether the dispatcher should attempt to publish - * to all of its subscribed handlers. If 'false' it will attempt - * to send to a single handler (point-to-point). - */ - public DispatcherPolicy(boolean publishSubscribe) { - this.publishSubscribe = publishSubscribe; - } - - - /** - * Return whether the dispatcher should attempt to publish to all of its handlers. - * This property is immutable. - */ - public boolean isPublishSubscribe() { - return this.publishSubscribe; - } - - /** - * Return the maximum number of retries upon rejection. - */ - public int getRejectionLimit() { - return this.rejectionLimit; - } - - /** - * Set the maximum number of retries upon rejection. - */ - public void setRejectionLimit(int rejectionLimit) { - Assert.isTrue(rejectionLimit > 0, "'rejectionLimit' must be at least 1"); - this.rejectionLimit = rejectionLimit; - } - - /** - * Return the amount of time in milliseconds to wait between rejections. - */ - public long getRetryInterval() { - return this.retryInterval; - } - - /** - * Set the amount of time in milliseconds to wait between rejections. - */ - public void setRetryInterval(long retryInterval) { - Assert.isTrue(retryInterval >= 0, "'retryInterval' must not be negative"); - this.retryInterval = retryInterval; - } - - /** - * Return whether an exception should be thrown when this dispatcher's - * {@link #rejectionLimit} is reached. - */ - public boolean getShouldFailOnRejectionLimit() { - return this.shouldFailOnRejectionLimit; - } - - /** - * Specify whether an exception should be thrown when this dispatcher's - * {@link #rejectionLimit} is reached. The default value is 'true'. - */ - public void setShouldFailOnRejectionLimit(boolean shouldFailOnRejectionLimit) { - this.shouldFailOnRejectionLimit = shouldFailOnRejectionLimit; - } - -} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/MessageChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/MessageChannel.java index 5b5d8fd442..767324a172 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/MessageChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/MessageChannel.java @@ -40,11 +40,6 @@ public interface MessageChannel extends BlockingSource, BlockingTarget { */ void setName(String name); - /** - * Return this channel's dispatcher policy - */ - DispatcherPolicy getDispatcherPolicy(); - /** * Remove all {@link Message Messages} from this channel. */ diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/PriorityChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/PriorityChannel.java index bf9091a42b..f008d68527 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/PriorityChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/PriorityChannel.java @@ -37,28 +37,20 @@ public class PriorityChannel extends QueueChannel { /** - * Create a channel with the specified queue capacity and dispatcher policy. + * Create a channel with the specified queue capacity. * Priority will be based upon the provided {@link Comparator}. */ - public PriorityChannel(int capacity, DispatcherPolicy dispatcherPolicy, Comparator> comparator) { - super(new PriorityBlockingQueue>(capacity, comparator), dispatcherPolicy); + public PriorityChannel(int capacity, Comparator> comparator) { + super(new PriorityBlockingQueue>(capacity, comparator)); this.semaphore = new Semaphore(capacity, true); } /** - * Create a channel with the specified queue capacity and dispatcher policy. + * Create a channel with the specified queue capacity. * Priority will be based upon the value of {@link MessageHeader#getPriority()}. */ - public PriorityChannel(int capacity, DispatcherPolicy dispatcherPolicy) { - this(capacity, dispatcherPolicy, new MessagePriorityComparator()); - } - - /** - * Create a channel with the specified queue capacity and default dispatcher - * policy. Priority will be based on the value of {@link MessageHeader#getPriority()}. - */ public PriorityChannel(int capacity) { - this(capacity, null, new MessagePriorityComparator()); + this(capacity, new MessagePriorityComparator()); } /** @@ -66,7 +58,7 @@ public class PriorityChannel extends QueueChannel { * Priority will be based on the value of {@link MessageHeader#getPriority()}. */ public PriorityChannel() { - this(DEFAULT_CAPACITY, null, new MessagePriorityComparator()); + this(DEFAULT_CAPACITY, new MessagePriorityComparator()); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/QueueChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/QueueChannel.java index b8a9540a2d..7545365e79 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/QueueChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/QueueChannel.java @@ -44,29 +44,20 @@ public class QueueChannel extends AbstractMessageChannel { /** - * Create a channel with the specified queue and dispatcher policy. + * Create a channel with the specified queue. */ - public QueueChannel(BlockingQueue> queue, DispatcherPolicy dispatcherPolicy) { - super((dispatcherPolicy != null) ? dispatcherPolicy : new DispatcherPolicy()); + public QueueChannel(BlockingQueue> queue) { Assert.notNull(queue, "'queue' must not be null"); this.queue = queue; } - /** - * Create a channel with the specified queue capacity and dispatcher policy. - */ - public QueueChannel(int capacity, DispatcherPolicy dispatcherPolicy) { - super((dispatcherPolicy != null) ? dispatcherPolicy : new DispatcherPolicy()); - Assert.isTrue(capacity > 0, "The capacity must be a positive integer. " + - "For a zero-capacity alternative, consider using a 'RendezvousChannel'."); - this.queue = new LinkedBlockingQueue>(capacity); - } - /** * Create a channel with the specified queue capacity. */ public QueueChannel(int capacity) { - this(capacity, null); + Assert.isTrue(capacity > 0, "The capacity must be a positive integer. " + + "For a zero-capacity alternative, consider using a 'RendezvousChannel'."); + this.queue = new LinkedBlockingQueue>(capacity); } /** @@ -74,7 +65,7 @@ public class QueueChannel extends AbstractMessageChannel { * @see #DEFAULT_CAPACITY */ public QueueChannel() { - this(DEFAULT_CAPACITY, null); + this(DEFAULT_CAPACITY); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/RendezvousChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/RendezvousChannel.java index 4831cd3512..b28fc67482 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/RendezvousChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/RendezvousChannel.java @@ -29,12 +29,8 @@ import org.springframework.integration.message.Message; */ public class RendezvousChannel extends QueueChannel { - public RendezvousChannel(DispatcherPolicy dispatcherPolicy) { - super(new SynchronousQueue>(), dispatcherPolicy); - } - public RendezvousChannel() { - this(null); + super(new SynchronousQueue>()); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/ThreadLocalChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/ThreadLocalChannel.java index 5696741cda..e252d59d14 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/ThreadLocalChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/ThreadLocalChannel.java @@ -38,11 +38,6 @@ public class ThreadLocalChannel extends AbstractMessageChannel { private static final ThreadLocalMessageHolder messageHolder = new ThreadLocalMessageHolder(); - public ThreadLocalChannel() { - super(defaultDispatcherPolicy()); - } - - @Override protected Message doReceive(long timeout) { return messageHolder.get().poll(); @@ -86,15 +81,6 @@ public class ThreadLocalChannel extends AbstractMessageChannel { } - private static DispatcherPolicy defaultDispatcherPolicy() { - DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(false); - dispatcherPolicy.setRejectionLimit(1); - dispatcherPolicy.setRetryInterval(0); - dispatcherPolicy.setShouldFailOnRejectionLimit(false); - return dispatcherPolicy; - } - - private static class ThreadLocalMessageHolder extends ThreadLocal>> { @Override diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/AbstractChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/AbstractChannelParser.java index aec8dd275a..819a88fac3 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/AbstractChannelParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/AbstractChannelParser.java @@ -27,7 +27,6 @@ import org.springframework.beans.factory.support.ManagedList; import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser; import org.springframework.beans.factory.xml.ParserContext; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.interceptor.MessageSelectingInterceptor; import org.springframework.integration.message.selector.PayloadTypeSelector; import org.springframework.util.StringUtils; @@ -39,10 +38,6 @@ import org.springframework.util.StringUtils; */ public abstract class AbstractChannelParser extends AbstractSingleBeanDefinitionParser { - private static final String PUBLISH_SUBSCRIBE_ATTRIBUTE = "publish-subscribe"; - - private static final String DISPATCHER_POLICY_ELEMENT = "dispatcher-policy"; - private static final String DATATYPE_ATTRIBUTE = "datatype"; private static final String INTERCEPTOR_ELEMENT = "interceptor"; @@ -63,26 +58,19 @@ public abstract class AbstractChannelParser extends AbstractSingleBeanDefinition @Override protected abstract Class getBeanClass(Element element); - protected abstract void configureConstructorArgs( - BeanDefinitionBuilder builder, Element element, DispatcherPolicy dispatcherPolicy); + protected void configureConstructorArgs(BeanDefinitionBuilder builder, Element element) { + } @Override + @SuppressWarnings("unchecked") protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { - boolean isPublishSubscribe = "true".equals(element.getAttribute(PUBLISH_SUBSCRIBE_ATTRIBUTE)); - DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(isPublishSubscribe); ManagedList interceptors = new ManagedList(); NodeList childNodes = element.getChildNodes(); for (int i = 0; i < childNodes.getLength(); i++) { Node child = childNodes.item(i); - if (child.getNodeType() == Node.ELEMENT_NODE) { - String localName = child.getLocalName(); - if (DISPATCHER_POLICY_ELEMENT.equals(localName)) { - configureDispatcherPolicy((Element) child, dispatcherPolicy); - } - else if (INTERCEPTOR_ELEMENT.equals(localName)) { - String ref = ((Element) child).getAttribute("ref"); - interceptors.add(new RuntimeBeanReference(ref)); - } + if (child.getNodeType() == Node.ELEMENT_NODE && child.getLocalName().equals(INTERCEPTOR_ELEMENT)) { + String ref = ((Element) child).getAttribute("ref"); + interceptors.add(new RuntimeBeanReference(ref)); } } String datatypeAttr = element.getAttribute(DATATYPE_ATTRIBUTE); @@ -101,22 +89,7 @@ public abstract class AbstractChannelParser extends AbstractSingleBeanDefinition interceptors.add(new RuntimeBeanReference(interceptorBeanName)); } builder.addPropertyValue(INTERCEPTORS_PROPERTY, interceptors); - this.configureConstructorArgs(builder, element, dispatcherPolicy); - } - - private void configureDispatcherPolicy(Element element, DispatcherPolicy dispatcherPolicy) { - String rejectionLimit = element.getAttribute("rejection-limit"); - if (StringUtils.hasText(rejectionLimit)) { - dispatcherPolicy.setRejectionLimit(Integer.parseInt(rejectionLimit)); - } - String retryInterval = element.getAttribute("retry-interval"); - if (StringUtils.hasText(retryInterval)) { - dispatcherPolicy.setRetryInterval(Long.parseLong(retryInterval)); - } - String shouldFailOnRejectionLimit = element.getAttribute("should-fail-on-rejection-limit"); - if (StringUtils.hasText(shouldFailOnRejectionLimit)) { - dispatcherPolicy.setShouldFailOnRejectionLimit("true".equals(shouldFailOnRejectionLimit)); - } + this.configureConstructorArgs(builder, element); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/PriorityChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/PriorityChannelParser.java index 49b11b1395..539cb01329 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/PriorityChannelParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/PriorityChannelParser.java @@ -19,7 +19,6 @@ package org.springframework.integration.channel.config; import org.w3c.dom.Element; import org.springframework.beans.factory.support.BeanDefinitionBuilder; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.PriorityChannel; import org.springframework.util.StringUtils; @@ -36,8 +35,8 @@ public class PriorityChannelParser extends QueueChannelParser { } @Override - protected void configureConstructorArgs(BeanDefinitionBuilder builder, Element element, DispatcherPolicy dispatcherPolicy) { - super.configureConstructorArgs(builder, element, dispatcherPolicy); + protected void configureConstructorArgs(BeanDefinitionBuilder builder, Element element) { + super.configureConstructorArgs(builder, element); String comparator = element.getAttribute("comparator"); if (StringUtils.hasText(comparator)) { builder.addConstructorArgReference(comparator); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/QueueChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/QueueChannelParser.java index 32d0fc6a51..3be4fc14b7 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/QueueChannelParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/QueueChannelParser.java @@ -19,7 +19,6 @@ package org.springframework.integration.channel.config; import org.w3c.dom.Element; import org.springframework.beans.factory.support.BeanDefinitionBuilder; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.QueueChannel; import org.springframework.util.StringUtils; @@ -36,12 +35,11 @@ public class QueueChannelParser extends AbstractChannelParser { } @Override - protected void configureConstructorArgs(BeanDefinitionBuilder builder, Element element, DispatcherPolicy dispatcherPolicy) { + protected void configureConstructorArgs(BeanDefinitionBuilder builder, Element element) { String capacityAttribute = element.getAttribute("capacity"); int capacity = (StringUtils.hasText(capacityAttribute)) ? Integer.parseInt(capacityAttribute) : QueueChannel.DEFAULT_CAPACITY; builder.addConstructorArgValue(capacity); - builder.addConstructorArgValue(dispatcherPolicy); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/RendezvousChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/RendezvousChannelParser.java index 6860563d80..6e40f4ecdb 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/RendezvousChannelParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/RendezvousChannelParser.java @@ -18,8 +18,6 @@ package org.springframework.integration.channel.config; import org.w3c.dom.Element; -import org.springframework.beans.factory.support.BeanDefinitionBuilder; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.RendezvousChannel; /** @@ -34,9 +32,4 @@ public class RendezvousChannelParser extends AbstractChannelParser { return RendezvousChannel.class; } - @Override - protected void configureConstructorArgs(BeanDefinitionBuilder builder, Element element, DispatcherPolicy dispatcherPolicy) { - builder.addConstructorArgValue(dispatcherPolicy); - } - } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/ThreadLocalChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/ThreadLocalChannelParser.java index 38fe4d5821..1b7bc39106 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/ThreadLocalChannelParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/ThreadLocalChannelParser.java @@ -18,8 +18,6 @@ package org.springframework.integration.channel.config; import org.w3c.dom.Element; -import org.springframework.beans.factory.support.BeanDefinitionBuilder; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.ThreadLocalChannel; /** @@ -34,8 +32,4 @@ public class ThreadLocalChannelParser extends AbstractChannelParser { return ThreadLocalChannel.class; } - @Override - protected void configureConstructorArgs(BeanDefinitionBuilder builder, Element element, DispatcherPolicy dispatcherPolicy) { - } - } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/AbstractChannelFactory.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/AbstractChannelFactory.java index bd0e6241ad..306c7c3283 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/AbstractChannelFactory.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/AbstractChannelFactory.java @@ -20,7 +20,6 @@ import java.util.List; import org.springframework.integration.channel.AbstractMessageChannel; import org.springframework.integration.channel.ChannelInterceptor; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.MessageChannel; /** @@ -35,8 +34,8 @@ public abstract class AbstractChannelFactory implements ChannelFactory { super(); } - public final MessageChannel getChannel(String name, DispatcherPolicy dispatcherPolicy, List interceptors) { - AbstractMessageChannel channel = createChannelInternal(dispatcherPolicy); + public final MessageChannel getChannel(String name, List interceptors) { + AbstractMessageChannel channel = createChannelInternal(); if (null != interceptors) { channel.setInterceptors(interceptors); } @@ -50,6 +49,6 @@ public abstract class AbstractChannelFactory implements ChannelFactory { * Factory method to be overridden by subclasses. It assumes that subclasses will return * subclasses of AbstractMessageChannel. */ - protected abstract AbstractMessageChannel createChannelInternal(DispatcherPolicy dispatcherPolicy); + protected abstract AbstractMessageChannel createChannelInternal(); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/ChannelFactory.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/ChannelFactory.java index ba230e5a97..88a916b917 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/ChannelFactory.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/ChannelFactory.java @@ -19,7 +19,6 @@ package org.springframework.integration.channel.factory; import java.util.List; import org.springframework.integration.channel.ChannelInterceptor; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.MessageChannel; /** @@ -30,8 +29,8 @@ import org.springframework.integration.channel.MessageChannel; public interface ChannelFactory { /** - * Creates a channel based on the provided name, dispatcher policy, and interceptors. + * Creates a channel based on the provided name and interceptors. */ - MessageChannel getChannel(String name, DispatcherPolicy dispatcherPolicy, List interceptors); + MessageChannel getChannel(String name, List interceptors); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/PriorityChannelFactory.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/PriorityChannelFactory.java index 85a6a93924..834e4219bd 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/PriorityChannelFactory.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/PriorityChannelFactory.java @@ -19,7 +19,6 @@ package org.springframework.integration.channel.factory; import java.util.Comparator; import org.springframework.integration.channel.AbstractMessageChannel; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.PriorityChannel; import org.springframework.integration.message.Message; import org.springframework.util.Assert; @@ -46,8 +45,8 @@ public class PriorityChannelFactory extends AbstractChannelFactory { } @Override - protected AbstractMessageChannel createChannelInternal(DispatcherPolicy dispatcherPolicy) { - return new PriorityChannel(this.capacity, dispatcherPolicy, this.comparator); + protected AbstractMessageChannel createChannelInternal() { + return new PriorityChannel(this.capacity, this.comparator); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/QueueChannelFactory.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/QueueChannelFactory.java index 87228abff8..be68f95e32 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/QueueChannelFactory.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/QueueChannelFactory.java @@ -16,9 +16,7 @@ package org.springframework.integration.channel.factory; - import org.springframework.integration.channel.AbstractMessageChannel; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.QueueChannel; import org.springframework.util.Assert; @@ -46,8 +44,8 @@ public class QueueChannelFactory extends AbstractChannelFactory{ this.queueCapacity = queueCapacity; } - protected AbstractMessageChannel createChannelInternal(DispatcherPolicy dispatcherPolicy) { - return new QueueChannel(queueCapacity, dispatcherPolicy); + protected AbstractMessageChannel createChannelInternal() { + return new QueueChannel(queueCapacity); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/RendezvousChannelFactory.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/RendezvousChannelFactory.java index 704656effa..08f541b415 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/RendezvousChannelFactory.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/RendezvousChannelFactory.java @@ -17,7 +17,6 @@ package org.springframework.integration.channel.factory; import org.springframework.integration.channel.AbstractMessageChannel; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.RendezvousChannel; /** @@ -28,8 +27,8 @@ import org.springframework.integration.channel.RendezvousChannel; public class RendezvousChannelFactory extends AbstractChannelFactory { @Override - protected AbstractMessageChannel createChannelInternal(DispatcherPolicy dispatcherPolicy) { - return new RendezvousChannel(dispatcherPolicy); + protected AbstractMessageChannel createChannelInternal() { + return new RendezvousChannel(); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/ThreadLocalChannelFactory.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/ThreadLocalChannelFactory.java index ce55bfed24..3f18c2c428 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/ThreadLocalChannelFactory.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/factory/ThreadLocalChannelFactory.java @@ -17,7 +17,6 @@ package org.springframework.integration.channel.factory; import org.springframework.integration.channel.AbstractMessageChannel; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.ThreadLocalChannel; /** @@ -28,7 +27,7 @@ import org.springframework.integration.channel.ThreadLocalChannel; public class ThreadLocalChannelFactory extends AbstractChannelFactory { @Override - protected AbstractMessageChannel createChannelInternal(DispatcherPolicy dispatcherPolicy) { + protected AbstractMessageChannel createChannelInternal() { return new ThreadLocalChannel(); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/DefaultChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/DefaultChannelParser.java index b2b332cd1c..bbe90fbf56 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/DefaultChannelParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/DefaultChannelParser.java @@ -16,9 +16,7 @@ package org.springframework.integration.config; -import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.integration.bus.DefaultChannelFactoryBean; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.config.AbstractChannelParser; import org.w3c.dom.Element; @@ -35,9 +33,4 @@ public class DefaultChannelParser extends AbstractChannelParser { return DefaultChannelFactoryBean.class; } - @Override - protected void configureConstructorArgs(BeanDefinitionBuilder builder, Element element, DispatcherPolicy dispatcherPolicy) { - builder.addConstructorArgValue(dispatcherPolicy); - } - } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/DirectChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/DirectChannelParser.java index d994876d02..27504bcf75 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/DirectChannelParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/DirectChannelParser.java @@ -19,7 +19,6 @@ package org.springframework.integration.config; import org.w3c.dom.Element; import org.springframework.beans.factory.support.BeanDefinitionBuilder; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.config.AbstractChannelParser; import org.springframework.integration.dispatcher.DirectChannel; import org.springframework.util.StringUtils; @@ -37,7 +36,7 @@ public class DirectChannelParser extends AbstractChannelParser { } @Override - protected void configureConstructorArgs(BeanDefinitionBuilder builder, Element element, DispatcherPolicy dispatcherPolicy) { + protected void configureConstructorArgs(BeanDefinitionBuilder builder, Element element) { String source = element.getAttribute("source"); if (StringUtils.hasText(source)) { builder.addConstructorArgReference(source); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceHandler.java index 5daaf7237d..554dd9be92 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceHandler.java @@ -62,6 +62,7 @@ public class IntegrationNamespaceHandler extends NamespaceHandlerSupport { registerBeanDefinitionParser("rendezvous-channel", new RendezvousChannelParser()); registerBeanDefinitionParser("thread-local-channel", new ThreadLocalChannelParser()); registerBeanDefinitionParser("handler-endpoint", new DefaultHandlerEndpointParser()); + registerBeanDefinitionParser("service-activator", new ServiceActivatorParser()); registerBeanDefinitionParser("channel-adapter", new ChannelAdapterParser()); registerBeanDefinitionParser("gateway", new GatewayParser()); registerBeanDefinitionParser("handler", new HandlerParser()); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/PublishSubscribeChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/PublishSubscribeChannelParser.java index ea8841cd92..bfe851a761 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/PublishSubscribeChannelParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/PublishSubscribeChannelParser.java @@ -19,7 +19,6 @@ package org.springframework.integration.config; import org.w3c.dom.Element; import org.springframework.beans.factory.support.BeanDefinitionBuilder; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.config.AbstractChannelParser; import org.springframework.integration.dispatcher.PublishSubscribeChannel; import org.springframework.util.StringUtils; @@ -37,7 +36,7 @@ public class PublishSubscribeChannelParser extends AbstractChannelParser { } @Override - protected void configureConstructorArgs(BeanDefinitionBuilder builder, Element element, DispatcherPolicy dispatcherPolicy) { + protected void configureConstructorArgs(BeanDefinitionBuilder builder, Element element) { String taskExecutorRef = element.getAttribute("task-executor"); if (StringUtils.hasText(taskExecutorRef)) { builder.addConstructorArgReference(taskExecutorRef); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/ServiceActivatorParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/ServiceActivatorParser.java new file mode 100644 index 0000000000..c9e089b90c --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/ServiceActivatorParser.java @@ -0,0 +1,34 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.config; + +import org.springframework.integration.handler.DefaultMessageHandlerAdapter; +import org.springframework.integration.handler.MessageHandler; + +/** + * Parser for the <service-activator> element. + * + * @author Mark Fisher + */ +public class ServiceActivatorParser extends AbstractHandlerEndpointParser { + + @Override + protected Class getHandlerAdapterClass() { + return DefaultMessageHandlerAdapter.class; + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd index f5a8f7a1ef..be08382c36 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd @@ -132,7 +132,6 @@ - @@ -192,6 +191,17 @@ + + + + Defines an endpoint for exposing any bean reference as a service that receives + request Messages from an 'input-channel' and may send reply Messages to an + 'output-channel'. The 'method' attribute is also required unless the 'ref' + points to a bean that is itself an implementation of MessageHandler. + + + + @@ -305,19 +315,6 @@ - - - - - Defines a dispatcher policy. - - - - - - - - diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DirectChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DirectChannel.java index 1211a4fd1a..b170558c83 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DirectChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DirectChannel.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.springframework.integration.channel.AbstractMessageChannel; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageSource; @@ -52,9 +51,8 @@ public class DirectChannel extends AbstractMessageChannel implements Subscribabl } public DirectChannel(MessageSource source) { - super(defaultDispatcherPolicy()); this.source = source; - this.dispatcher = new SimpleDispatcher(this.getDispatcherPolicy()); + this.dispatcher = new SimpleDispatcher(); } @@ -99,13 +97,4 @@ public class DirectChannel extends AbstractMessageChannel implements Subscribabl return new ArrayList>(); } - - private static DispatcherPolicy defaultDispatcherPolicy() { - DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(false); - dispatcherPolicy.setRejectionLimit(1); - dispatcherPolicy.setRetryInterval(0); - dispatcherPolicy.setShouldFailOnRejectionLimit(false); - return dispatcherPolicy; - } - } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DirectChannelFactory.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DirectChannelFactory.java index b05dda2c83..707e136d88 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DirectChannelFactory.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DirectChannelFactory.java @@ -17,7 +17,6 @@ package org.springframework.integration.dispatcher; import org.springframework.integration.channel.AbstractMessageChannel; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.factory.AbstractChannelFactory; import org.springframework.integration.channel.factory.ChannelFactory; @@ -29,8 +28,8 @@ import org.springframework.integration.channel.factory.ChannelFactory; public class DirectChannelFactory extends AbstractChannelFactory { @Override - protected AbstractMessageChannel createChannelInternal(DispatcherPolicy dispatcherPolicy) { - return new DirectChannel(null); + protected AbstractMessageChannel createChannelInternal() { + return new DirectChannel(); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PublishSubscribeChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PublishSubscribeChannel.java index c21f951fc9..1fb7e5b32a 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PublishSubscribeChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PublishSubscribeChannel.java @@ -20,7 +20,6 @@ import java.util.List; import org.springframework.core.task.TaskExecutor; import org.springframework.integration.channel.AbstractMessageChannel; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageTarget; import org.springframework.integration.message.Subscribable; @@ -35,7 +34,6 @@ public class PublishSubscribeChannel extends AbstractMessageChannel implements S public PublishSubscribeChannel() { - super(new DispatcherPolicy(true)); } /** @@ -43,7 +41,6 @@ public class PublishSubscribeChannel extends AbstractMessageChannel implements S * to publish its Messages. */ public PublishSubscribeChannel(TaskExecutor taskExecutor) { - this(); if (taskExecutor != null) { this.dispatcher.setTaskExecutor(taskExecutor); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java index 927426d290..f603105df0 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java @@ -20,12 +20,12 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.handler.MessageHandlerNotRunningException; import org.springframework.integration.handler.MessageHandlerRejectedExecutionException; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.MessageTarget; +import org.springframework.util.Assert; /** * Basic implementation of {@link MessageDispatcher}. @@ -34,25 +34,54 @@ import org.springframework.integration.message.MessageTarget; */ public class SimpleDispatcher extends AbstractDispatcher { - protected final DispatcherPolicy dispatcherPolicy; + public final static int DEFAULT_REJECTION_LIMIT = 1; + + public final static long DEFAULT_RETRY_INTERVAL = 1000; - public SimpleDispatcher(DispatcherPolicy dispatcherPolicy) { - this.dispatcherPolicy = (dispatcherPolicy != null) ? dispatcherPolicy : new DispatcherPolicy(); + private volatile int rejectionLimit = DEFAULT_REJECTION_LIMIT; + + private volatile long retryInterval = DEFAULT_RETRY_INTERVAL; + + private volatile boolean shouldFailOnRejectionLimit = true; + + + /** + * Set the maximum number of retries upon rejection. + */ + public void setRejectionLimit(int rejectionLimit) { + Assert.isTrue(rejectionLimit > 0, "'rejectionLimit' must be at least 1"); + this.rejectionLimit = rejectionLimit; + } + + /** + * Set the amount of time in milliseconds to wait between rejections. + */ + public void setRetryInterval(long retryInterval) { + Assert.isTrue(retryInterval >= 0, "'retryInterval' must not be negative"); + this.retryInterval = retryInterval; + } + + /** + * Specify whether an exception should be thrown when this dispatcher's + * {@link #rejectionLimit} is reached. The default value is 'true'. + */ + public void setShouldFailOnRejectionLimit(boolean shouldFailOnRejectionLimit) { + this.shouldFailOnRejectionLimit = shouldFailOnRejectionLimit; } public boolean send(Message message) { int attempts = 0; List targetList = new ArrayList(this.targets); - while (attempts < this.dispatcherPolicy.getRejectionLimit()) { + while (attempts < this.rejectionLimit) { if (attempts > 0) { if (logger.isDebugEnabled()) { logger.debug("target(s) rejected message after " + attempts + " attempt(s), will try again after 'retryInterval' of " + - this.dispatcherPolicy.getRetryInterval() + " milliseconds"); + this.retryInterval + " milliseconds"); } try { - Thread.sleep(this.dispatcherPolicy.getRetryInterval()); + Thread.sleep(this.retryInterval); } catch (InterruptedException iex) { Thread.currentThread().interrupt(); @@ -70,11 +99,10 @@ public class SimpleDispatcher extends AbstractDispatcher { while (iter.hasNext()) { MessageTarget target = iter.next(); try { - boolean sent = this.sendMessageToTarget(message, target); - if (!this.dispatcherPolicy.isPublishSubscribe() && sent) { + if (this.sendMessageToTarget(message, target)) { return true; } - if (!sent && logger.isDebugEnabled()) { + if (logger.isDebugEnabled()) { logger.debug("target rejected message, continuing with other targets if available"); } iter.remove(); @@ -96,9 +124,9 @@ public class SimpleDispatcher extends AbstractDispatcher { } attempts++; } - if (this.dispatcherPolicy.getShouldFailOnRejectionLimit()) { + if (this.shouldFailOnRejectionLimit) { throw new MessageDeliveryException(message, "Dispatcher reached rejection limit of " - + this.dispatcherPolicy.getRejectionLimit() + + this.rejectionLimit + ". Consider increasing the target's concurrency and/or " + "the dispatcherPolicy's 'rejectionLimit'."); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/RequestReplyTemplate.java b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/RequestReplyTemplate.java index fbf37d19f3..fc1907e20c 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/RequestReplyTemplate.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/RequestReplyTemplate.java @@ -21,7 +21,6 @@ import java.util.List; import org.springframework.integration.ConfigurationException; import org.springframework.integration.bus.MessageBus; import org.springframework.integration.bus.MessageBusAware; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.RendezvousChannel; import org.springframework.integration.endpoint.EndpointRegistry; @@ -239,10 +238,6 @@ public class RequestReplyTemplate implements MessageBusAware { return null; } - public DispatcherPolicy getDispatcherPolicy() { - return null; - } - public String getName() { return null; } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/PriorityChannelTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/PriorityChannelTests.java index 8db3d64c91..7a33fe3862 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/PriorityChannelTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/PriorityChannelTests.java @@ -66,7 +66,7 @@ public class PriorityChannelTests { @Test public void testCustomComparator() { - PriorityChannel channel = new PriorityChannel(5, null, new StringPayloadComparator()); + PriorityChannel channel = new PriorityChannel(5, new StringPayloadComparator()); Message messageA = new StringMessage("A"); Message messageB = new StringMessage("B"); Message messageC = new StringMessage("C"); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java index 659f6462ea..01135fc56b 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java @@ -26,7 +26,6 @@ import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.FatalBeanException; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.TestChannelInterceptor; @@ -88,29 +87,6 @@ public class ChannelParserTests { assertEquals(taskExecutorBean, taskExecutorProperty); } - @Test - public void testDefaultDispatcherPolicy() throws InterruptedException { - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( - "channelParserTests.xml", this.getClass()); - MessageChannel channel = (MessageChannel) context.getBean("queueChannelByDefault"); - DispatcherPolicy dispatcherPolicy = channel.getDispatcherPolicy(); - assertFalse(dispatcherPolicy.isPublishSubscribe()); - assertEquals(DispatcherPolicy.DEFAULT_REJECTION_LIMIT, dispatcherPolicy.getRejectionLimit()); - assertEquals(DispatcherPolicy.DEFAULT_RETRY_INTERVAL, dispatcherPolicy.getRetryInterval()); - assertTrue(dispatcherPolicy.getShouldFailOnRejectionLimit()); - } - - @Test - public void testDispatcherPolicyConfiguration() throws InterruptedException { - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( - "channelParserTests.xml", this.getClass()); - MessageChannel channel = (MessageChannel) context.getBean("channelWithDispatcherPolicy"); - DispatcherPolicy dispatcherPolicy = channel.getDispatcherPolicy(); - assertEquals(7, dispatcherPolicy.getRejectionLimit()); - assertEquals(77, dispatcherPolicy.getRetryInterval()); - assertFalse(dispatcherPolicy.getShouldFailOnRejectionLimit()); - } - @Test public void testDatatypeChannelWithCorrectType() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelParserTests.xml index 311e5ab148..0ea9adaf92 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelParserTests.xml @@ -16,12 +16,6 @@ - - - - diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/ChannelFactoryTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/ChannelFactoryTests.java index 4a532f63d6..dfcd59b09c 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/ChannelFactoryTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/ChannelFactoryTests.java @@ -28,14 +28,11 @@ import org.junit.Test; import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.support.BeanDefinitionBuilder; -import org.springframework.context.ApplicationContext; -import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.context.support.StaticApplicationContext; import org.springframework.integration.bus.DefaultChannelFactoryBean; import org.springframework.integration.bus.DefaultMessageBus; import org.springframework.integration.channel.AbstractMessageChannel; import org.springframework.integration.channel.ChannelInterceptor; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PriorityChannel; import org.springframework.integration.channel.QueueChannel; @@ -53,8 +50,6 @@ public class ChannelFactoryTests { private final ArrayList interceptors = new ArrayList(); - private final DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); - @Before public void initInterceptorsList() { @@ -76,7 +71,7 @@ public class ChannelFactoryTests { DirectChannelFactory channelFactory = new DirectChannelFactory(); assertNotNull(interceptors); AbstractMessageChannel channel = (AbstractMessageChannel) - channelFactory.getChannel("testChannel", dispatcherPolicy, interceptors); + channelFactory.getChannel("testChannel", interceptors); assertEquals(DirectChannel.class, channel.getClass()); assertEquals("testChannel", channel.getName()); assertInterceptors(channel); @@ -99,7 +94,7 @@ public class ChannelFactoryTests { ThreadLocalChannelFactory channelFactory = new ThreadLocalChannelFactory(); assertNotNull(interceptors); AbstractMessageChannel channel = (AbstractMessageChannel) - channelFactory.getChannel("testChannel", dispatcherPolicy, interceptors); + channelFactory.getChannel("testChannel", interceptors); assertEquals(ThreadLocalChannel.class, channel.getClass()); assertEquals("testChannel", channel.getName()); assertInterceptors(channel); @@ -114,36 +109,21 @@ public class ChannelFactoryTests { BeanDefinitionBuilder messageBusDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(DefaultMessageBus.class); messageBusDefinitionBuilder.getBeanDefinition().getPropertyValues().addPropertyValue("channelFactory", channelFactory); applicationContext.registerBeanDefinition("messageBus", messageBusDefinitionBuilder.getBeanDefinition()); - DefaultChannelFactoryBean channelFactoryBean = new DefaultChannelFactoryBean(dispatcherPolicy); + DefaultChannelFactoryBean channelFactoryBean = new DefaultChannelFactoryBean(); channelFactoryBean.setBeanName("testChannel"); channelFactoryBean.setApplicationContext(applicationContext); channelFactoryBean.setInterceptors(interceptors); StubChannel channel = (StubChannel) channelFactoryBean.getObject(); assertEquals("testChannel", channel.getName()); - assertTrue(dispatcherPolicy == channel.getDispatcherPolicy()); assertInterceptors(channel); } - @Test - public void testDefaultChannelFactoryBeanInApplicationContext() throws Exception{ - ApplicationContext context = new ClassPathXmlApplicationContext( - "defaultChannelFactoryBeanTests.xml", this.getClass()); - MessageChannel channel = (MessageChannel) context.getBean("testChannel"); - assertEquals(StubChannel.class, channel.getClass()); - assertEquals("testChannel", channel.getName()); - DispatcherPolicy dispatcherPolicy = (DispatcherPolicy) context.getBean("dispatcherPolicy"); - assertTrue(dispatcherPolicy == channel.getDispatcherPolicy()); - } - private void genericChannelFactoryTests(ChannelFactory channelFactory, Class expectedChannelClass) { - assertNotNull(dispatcherPolicy); assertNotNull(interceptors); - AbstractMessageChannel channel = (AbstractMessageChannel) - channelFactory.getChannel("testChannel", dispatcherPolicy, interceptors); + AbstractMessageChannel channel = (AbstractMessageChannel) channelFactory.getChannel("testChannel", interceptors); assertEquals(expectedChannelClass, channel.getClass()); assertEquals("testChannel", channel.getName()); - assertTrue(channel.getDispatcherPolicy() == dispatcherPolicy); assertInterceptors(channel); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/StubChannel.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/StubChannel.java index 3a555761e6..0ec914431f 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/StubChannel.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/StubChannel.java @@ -19,7 +19,6 @@ package org.springframework.integration.channel.factory; import java.util.List; import org.springframework.integration.channel.AbstractMessageChannel; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.message.Message; import org.springframework.integration.message.selector.MessageSelector; @@ -28,10 +27,6 @@ import org.springframework.integration.message.selector.MessageSelector; */ public class StubChannel extends AbstractMessageChannel { - public StubChannel(DispatcherPolicy dispatcherPolicy) { - super(dispatcherPolicy); - } - @Override protected Message doReceive(long timeout) { return null; diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/StubChannelFactory.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/StubChannelFactory.java index 0bd92d9f74..976a795f54 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/StubChannelFactory.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/StubChannelFactory.java @@ -17,7 +17,6 @@ package org.springframework.integration.channel.factory; import org.springframework.integration.channel.AbstractMessageChannel; -import org.springframework.integration.channel.DispatcherPolicy; /** * @author Marius Bogoevici @@ -25,8 +24,8 @@ import org.springframework.integration.channel.DispatcherPolicy; public class StubChannelFactory extends AbstractChannelFactory { @Override - protected AbstractMessageChannel createChannelInternal(DispatcherPolicy dispatcherPolicy) { - return new StubChannel(dispatcherPolicy); + protected AbstractMessageChannel createChannelInternal() { + return new StubChannel(); } - + } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/defaultChannelFactoryBeanTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/defaultChannelFactoryBeanTests.xml index 52f013bb82..46ab83003e 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/defaultChannelFactoryBeanTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/defaultChannelFactoryBeanTests.xml @@ -11,10 +11,6 @@ - - - - - + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java new file mode 100644 index 0000000000..aa13cac0b3 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java @@ -0,0 +1,60 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.dispatcher; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import org.springframework.integration.endpoint.HandlerEndpoint; +import org.springframework.integration.handler.MessageHandler; +import org.springframework.integration.handler.TestHandlers; +import org.springframework.integration.message.MessageTarget; +import org.springframework.integration.message.StringMessage; + +/** + * @author Mark Fisher + */ +public class BroadcastingDispatcherTests { + + @Test + public void testPublishSubscribe() throws InterruptedException { + BroadcastingDispatcher dispatcher = new BroadcastingDispatcher(); + final CountDownLatch latch = new CountDownLatch(2); + final AtomicInteger counter1 = new AtomicInteger(); + final AtomicInteger counter2 = new AtomicInteger(); + dispatcher.addTarget(createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch))); + dispatcher.addTarget(createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch))); + dispatcher.send(new StringMessage("test")); + latch.await(500, TimeUnit.MILLISECONDS); + assertEquals(0, latch.getCount()); + assertEquals(1, counter1.get()); + assertEquals(1, counter2.get()); + } + + + private static MessageTarget createEndpoint(MessageHandler handler) { + HandlerEndpoint endpoint = new HandlerEndpoint(handler); + endpoint.start(); + return endpoint; + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java index db877afdfd..4207be209f 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; -import org.springframework.integration.channel.DispatcherPolicy; import org.springframework.integration.endpoint.HandlerEndpoint; import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.handler.TestHandlers; @@ -38,7 +37,7 @@ public class SimpleDispatcherTests { @Test public void testSingleMessage() throws InterruptedException { - SimpleDispatcher dispatcher = new SimpleDispatcher(new DispatcherPolicy()); + SimpleDispatcher dispatcher = new SimpleDispatcher(); final CountDownLatch latch = new CountDownLatch(1); dispatcher.addTarget(createEndpoint(TestHandlers.countDownHandler(latch))); dispatcher.send(new StringMessage("test")); @@ -48,7 +47,7 @@ public class SimpleDispatcherTests { @Test public void testPointToPoint() throws InterruptedException { - SimpleDispatcher dispatcher = new SimpleDispatcher(new DispatcherPolicy(false)); + SimpleDispatcher dispatcher = new SimpleDispatcher(); final CountDownLatch latch = new CountDownLatch(1); final AtomicInteger counter1 = new AtomicInteger(); final AtomicInteger counter2 = new AtomicInteger(); @@ -60,21 +59,6 @@ public class SimpleDispatcherTests { assertEquals("only 1 handler should have received the message", 1, counter1.get() + counter2.get()); } - @Test - public void testPublishSubscribe() throws InterruptedException { - SimpleDispatcher dispatcher = new SimpleDispatcher(new DispatcherPolicy(true)); - final CountDownLatch latch = new CountDownLatch(2); - final AtomicInteger counter1 = new AtomicInteger(); - final AtomicInteger counter2 = new AtomicInteger(); - dispatcher.addTarget(createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch))); - dispatcher.addTarget(createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch))); - dispatcher.send(new StringMessage("test")); - latch.await(500, TimeUnit.MILLISECONDS); - assertEquals(0, latch.getCount()); - assertEquals(1, counter1.get()); - assertEquals(1, counter2.get()); - } - private static MessageTarget createEndpoint(MessageHandler handler) { HandlerEndpoint endpoint = new HandlerEndpoint(handler); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/returnAddressTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/returnAddressTests.xml index c2388cc476..6f68d5db6d 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/returnAddressTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/returnAddressTests.xml @@ -18,16 +18,16 @@ - - - - - - + + + +