diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java index 6651142f7b..2c174f5907 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java @@ -44,8 +44,6 @@ public abstract class AbstractConsumerEndpointParser extends AbstractBeanDefinit private static final String POLLER_ELEMENT = "poller"; - private static final String SELECTOR_ATTRIBUTE = "selector"; - @Override protected boolean shouldGenerateId() { @@ -77,8 +75,6 @@ public abstract class AbstractConsumerEndpointParser extends AbstractBeanDefinit BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ConsumerEndpointFactoryBean.class); String handlerBeanName = BeanDefinitionReaderUtils.registerWithGeneratedName(handlerBeanDefinition, parserContext.getRegistry()); builder.addConstructorArgReference(handlerBeanName); - // TODO: remove the 'selector' - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(handlerBuilder, element, SELECTOR_ATTRIBUTE); String inputChannelAttributeName = this.getInputChannelAttributeName(); String inputChannelName = element.getAttribute(inputChannelAttributeName); Assert.hasText(inputChannelName, "the '" + inputChannelAttributeName + "' attribute is required"); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd index e969776b41..a03f9e09eb 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd @@ -227,7 +227,6 @@ - diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/filter/MessageFilter.java b/org.springframework.integration/src/main/java/org/springframework/integration/filter/MessageFilter.java index f27c52638e..cf00c94823 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/filter/MessageFilter.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/filter/MessageFilter.java @@ -19,32 +19,55 @@ package org.springframework.integration.filter; import org.springframework.integration.core.Message; import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; import org.springframework.integration.handler.ReplyMessageHolder; +import org.springframework.integration.message.MessageRejectedException; import org.springframework.integration.selector.MessageSelector; import org.springframework.util.Assert; /** * Message Handler that delegates to a {@link MessageSelector}. If and only if * the selector {@link MessageSelector#accept(Message) accepts} the Message, it - * will be passed to this filter's output channel. + * will be passed to this filter's output channel. Otherwise the message will + * either be silently dropped (the default) or will trigger the throwing of a + * {@link MessageRejectedException} depending on the value of its + * {@link #throwExceptionOnRejection} property. * * @author Mark Fisher */ public class MessageFilter extends AbstractReplyProducingMessageHandler { - private MessageSelector selector; + private final MessageSelector selector; + + private volatile boolean throwExceptionOnRejection; + /** + * Create a MessageFilter that will delegate to the given + * {@link MessageSelector}. + */ public MessageFilter(MessageSelector selector) { Assert.notNull(selector, "selector must not be null"); this.selector = selector; } + /** + * Specify whether this filter should throw a + * {@link MessageRejectedException} when its selector does not accept a + * Message. The default value is false meaning that rejected + * Messages will be quietly dropped. + */ + public void setThrowExceptionOnRejection(boolean throwExceptionOnRejection) { + this.throwExceptionOnRejection = throwExceptionOnRejection; + } + @Override protected void handleRequestMessage(Message message, ReplyMessageHolder replyHolder) { if (this.selector.accept(message)) { replyHolder.set(message); } + else if (this.throwExceptionOnRejection) { + throw new MessageRejectedException(message); + } } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandler.java index 150956c5c0..a9e4d0d78e 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandler.java @@ -27,8 +27,6 @@ import org.springframework.integration.core.MessageChannel; import org.springframework.integration.core.MessageHeaders; import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.MessageHandlingException; -import org.springframework.integration.message.MessageRejectedException; -import org.springframework.integration.selector.MessageSelector; import org.springframework.util.Assert; /** @@ -45,8 +43,6 @@ public abstract class AbstractReplyProducingMessageHandler extends AbstractMessa private volatile ChannelResolver channelResolver; - private volatile MessageSelector selector; - private volatile boolean requiresReply = false; private final MessageChannelTemplate channelTemplate; @@ -78,10 +74,6 @@ public abstract class AbstractReplyProducingMessageHandler extends AbstractMessa this.channelResolver = channelResolver; } - public void setSelector(MessageSelector selector) { - this.selector = selector; - } - public void setRequiresReply(boolean requiresReply) { this.requiresReply = requiresReply; } @@ -94,9 +86,6 @@ public abstract class AbstractReplyProducingMessageHandler extends AbstractMessa @Override protected final void handleMessageInternal(Message message) { - if (!this.supports(message)) { - throw new MessageRejectedException(message, "unsupported message"); - } ReplyMessageHolder replyMessageHolder = new ReplyMessageHolder(); this.handleRequestMessage(message, replyMessageHolder); if (replyMessageHolder.isEmpty()) { @@ -123,16 +112,6 @@ public abstract class AbstractReplyProducingMessageHandler extends AbstractMessa protected abstract void handleRequestMessage(Message requestMessage, ReplyMessageHolder replyMessageHolder); - protected boolean supports(Message message) { - if (this.selector != null && !this.selector.accept(message)) { - if (logger.isDebugEnabled()) { - logger.debug("selector for handler '" + this + "' rejected message: " + message); - } - return false; - } - return true; - } - protected boolean sendReplyMessage(Message replyMessage, MessageChannel replyChannel) { return this.channelTemplate.send(replyMessage, replyChannel); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithSelector.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChainParserTests-context.xml similarity index 80% rename from org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithSelector.xml rename to org.springframework.integration/src/test/java/org/springframework/integration/config/ChainParserTests-context.xml index 63d9fa18e6..a3a45e9579 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithSelector.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChainParserTests-context.xml @@ -7,11 +7,16 @@ http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd"> - + - - + + + + + + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/ChainParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChainParserTests.java new file mode 100644 index 0000000000..633dde9bd1 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChainParserTests.java @@ -0,0 +1,66 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.config; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import org.junit.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.integration.channel.PollableChannel; +import org.springframework.integration.core.Message; +import org.springframework.integration.core.MessageChannel; +import org.springframework.integration.message.MessageBuilder; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests; + +/** + * @author Mark Fisher + */ +@ContextConfiguration +public class ChainParserTests extends AbstractJUnit4SpringContextTests { + + @Autowired + @Qualifier("input") + private MessageChannel inputChannel; + + @Autowired + @Qualifier("output") + private PollableChannel outputChannel; + + + @Test + public void testChainWithAcceptingFilter() { + Message message = MessageBuilder.withPayload("test").build(); + this.inputChannel.send(message); + Message reply = this.outputChannel.receive(0); + assertNotNull(reply); + assertEquals("foo", reply.getPayload()); + } + + @Test + public void chainWithRejectingFilter() { + Message message = MessageBuilder.withPayload(123).build(); + this.inputChannel.send(message); + Message reply = this.outputChannel.receive(0); + assertNull(reply); + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java index 4d8a12e28d..27ff6e9e8a 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java @@ -17,7 +17,6 @@ package org.springframework.integration.config; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import java.util.concurrent.TimeUnit; @@ -25,12 +24,8 @@ import java.util.concurrent.TimeUnit; import org.junit.Test; import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.integration.channel.QueueChannel; -import org.springframework.integration.core.Message; import org.springframework.integration.core.MessageChannel; import org.springframework.integration.message.GenericMessage; -import org.springframework.integration.message.MessageBuilder; -import org.springframework.integration.message.MessageRejectedException; /** * @author Mark Fisher @@ -50,29 +45,4 @@ public class EndpointParserTests { assertEquals("test", handler.getMessageString()); } - @Test - public void testEndpointWithSelectorAccepts() { - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( - "endpointWithSelector.xml", this.getClass()); - MessageChannel inputChannel = (MessageChannel) context.getBean("testChannel"); - QueueChannel replyChannel = new QueueChannel(); - Message message = MessageBuilder.withPayload("test") - .setReplyChannel(replyChannel).build(); - inputChannel.send(message); - Message reply = replyChannel.receive(500); - assertNotNull(reply); - assertEquals("foo", reply.getPayload()); - } - - @Test(expected=MessageRejectedException.class) - public void testEndpointWithSelectorRejects() { - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( - "endpointWithSelector.xml", this.getClass()); - MessageChannel inputChannel = (MessageChannel) context.getBean("testChannel"); - MessageChannel replyChannel = new QueueChannel(); - Message message = MessageBuilder.withPayload(123) - .setReplyChannel(replyChannel).build(); - inputChannel.send(message); - } - } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java index 3aa8bdfa4c..6f3567143d 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java @@ -27,14 +27,12 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.springframework.integration.core.Message; -import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; import org.springframework.integration.handler.ServiceActivatingHandler; import org.springframework.integration.message.MessageHandler; import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.MessageRejectedException; import org.springframework.integration.message.StringMessage; import org.springframework.integration.message.TestHandlers; -import org.springframework.integration.selector.MessageSelector; /** * @author Mark Fisher @@ -153,62 +151,6 @@ public class SimpleDispatcherTests { dispatcher.dispatch(new StringMessage("test2")); } - @Test - public void handlersWithSelectorsAndOneAccepts() throws InterruptedException { - SimpleDispatcher dispatcher = new SimpleDispatcher(); - final CountDownLatch latch = new CountDownLatch(1); - final AtomicInteger counter1 = new AtomicInteger(); - final AtomicInteger counter2 = new AtomicInteger(); - final AtomicInteger counter3 = new AtomicInteger(); - final AtomicInteger selectorCounter = new AtomicInteger(); - AbstractReplyProducingMessageHandler consumer1 = createConsumer(TestHandlers.countingCountDownHandler(counter1, latch)); - AbstractReplyProducingMessageHandler consumer2 = createConsumer(TestHandlers.countingCountDownHandler(counter2, latch)); - AbstractReplyProducingMessageHandler consumer3 = createConsumer(TestHandlers.countingCountDownHandler(counter3, latch)); - consumer1.setSelector(new TestMessageSelector(selectorCounter, false)); - consumer2.setSelector(new TestMessageSelector(selectorCounter, false)); - consumer3.setSelector(new TestMessageSelector(selectorCounter, true)); - dispatcher.addHandler(consumer1); - dispatcher.addHandler(consumer2); - dispatcher.addHandler(consumer3); - dispatcher.dispatch(new StringMessage("test")); - assertEquals(0, latch.getCount()); - assertEquals("selectors should have been invoked one time each", 3, selectorCounter.get()); - assertEquals("consumer with rejecting selector should not have received the message", 0, counter1.get()); - assertEquals("consumer with rejecting selector should not have received the message", 0, counter2.get()); - assertEquals("consumer with accepting selector should have received the message", 1, counter3.get()); - } - - @Test - public void handlersWithSelectorsAndNoneAccept() throws InterruptedException { - SimpleDispatcher dispatcher = new SimpleDispatcher(); - final CountDownLatch latch = new CountDownLatch(2); - final AtomicInteger counter1 = new AtomicInteger(); - final AtomicInteger counter2 = new AtomicInteger(); - final AtomicInteger counter3 = new AtomicInteger(); - final AtomicInteger selectorCounter = new AtomicInteger(); - AbstractReplyProducingMessageHandler consumer1 = createConsumer(TestHandlers.countingCountDownHandler(counter1, latch)); - AbstractReplyProducingMessageHandler consumer2 = createConsumer(TestHandlers.countingCountDownHandler(counter2, latch)); - AbstractReplyProducingMessageHandler consumer3 = createConsumer(TestHandlers.countingCountDownHandler(counter3, latch)); - consumer1.setSelector(new TestMessageSelector(selectorCounter, false)); - consumer2.setSelector(new TestMessageSelector(selectorCounter, false)); - consumer3.setSelector(new TestMessageSelector(selectorCounter, false)); - dispatcher.addHandler(consumer1); - dispatcher.addHandler(consumer2); - dispatcher.addHandler(consumer3); - boolean exceptionThrown = false; - try { - dispatcher.dispatch(new StringMessage("test")); - } - catch (MessageRejectedException e) { - exceptionThrown = true; - } - assertTrue(exceptionThrown); - assertEquals("selectors should have been invoked one time each", 3, selectorCounter.get()); - assertEquals("consumer with rejecting selector should not have received the message", 0, counter1.get()); - assertEquals("consumer with rejecting selector should not have received the message", 0, counter2.get()); - assertEquals("consumer with rejecting selector should not have received the message", 0, counter3.get()); - } - @Test public void firstHandlerReturnsTrue() { SimpleDispatcher dispatcher = new SimpleDispatcher(); @@ -262,24 +204,6 @@ public class SimpleDispatcherTests { } - private static class TestMessageSelector implements MessageSelector { - - private final AtomicInteger counter; - - private final boolean accept; - - TestMessageSelector(AtomicInteger counter, boolean accept) { - this.counter = counter; - this.accept = accept; - } - - public boolean accept(Message message) { - this.counter.incrementAndGet(); - return this.accept; - } - } - - private static class CountingTestEndpoint implements MessageHandler { private final AtomicInteger counter; diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorEndpointTests.java index 5966e9dddd..e2e49bbe28 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorEndpointTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorEndpointTests.java @@ -20,11 +20,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; @@ -35,11 +30,7 @@ import org.springframework.integration.core.MessagingException; import org.springframework.integration.handler.ServiceActivatingHandler; import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.MessageHandlingException; -import org.springframework.integration.message.MessageRejectedException; import org.springframework.integration.message.StringMessage; -import org.springframework.integration.message.TestHandlers; -import org.springframework.integration.selector.MessageSelector; -import org.springframework.integration.selector.MessageSelectorChain; /** * @author Mark Fisher @@ -174,118 +165,6 @@ public class ServiceActivatorEndpointTests { endpoint.handleMessage(message); } - @Test(expected=MessageRejectedException.class) - public void endpointWithSelectorRejecting() { - ServiceActivatingHandler endpoint = new ServiceActivatingHandler( - TestHandlers.nullHandler(), "handle"); - endpoint.setSelector(new MessageSelector() { - public boolean accept(Message message) { - return false; - } - }); - endpoint.handleMessage(new StringMessage("test")); - } - - @Test - public void endpointWithSelectorAccepting() throws InterruptedException { - CountDownLatch latch = new CountDownLatch(1); - ServiceActivatingHandler endpoint = new ServiceActivatingHandler( - TestHandlers.countDownHandler(latch), "handle"); - endpoint.setSelector(new MessageSelector() { - public boolean accept(Message message) { - return true; - } - }); - endpoint.handleMessage(new StringMessage("test")); - latch.await(100, TimeUnit.MILLISECONDS); - assertEquals("handler should have been invoked", 0, latch.getCount()); - } - - @Test - public void endpointWithMultipleSelectorsAndFirstRejects() { - final AtomicInteger counter = new AtomicInteger(); - ServiceActivatingHandler endpoint = new ServiceActivatingHandler( - TestHandlers.countingHandler(counter), "handle"); - MessageSelectorChain selectorChain = new MessageSelectorChain(); - selectorChain.add(new MessageSelector() { - public boolean accept(Message message) { - counter.incrementAndGet(); - return false; - } - }); - selectorChain.add(new MessageSelector() { - public boolean accept(Message message) { - counter.incrementAndGet(); - return true; - } - }); - endpoint.setSelector(selectorChain); - boolean exceptionWasThrown = false; - try { - endpoint.handleMessage(new StringMessage("test")); - } - catch (MessageRejectedException e) { - exceptionWasThrown = true; - } - assertTrue(exceptionWasThrown); - assertEquals("only the first selector should have been invoked", 1, counter.get()); - } - - @Test - public void endpointWithMultipleSelectorsAndFirstAccepts() { - final AtomicInteger selectorCounter = new AtomicInteger(); - AtomicInteger handlerCounter = new AtomicInteger(); - ServiceActivatingHandler endpoint = new ServiceActivatingHandler( - TestHandlers.countingHandler(handlerCounter), "handle"); - MessageSelectorChain selectorChain = new MessageSelectorChain(); - selectorChain.add(new MessageSelector() { - public boolean accept(Message message) { - selectorCounter.incrementAndGet(); - return true; - } - }); - selectorChain.add(new MessageSelector() { - public boolean accept(Message message) { - selectorCounter.incrementAndGet(); - return false; - } - }); - endpoint.setSelector(selectorChain); - boolean exceptionWasThrown = false; - try { - endpoint.handleMessage(new StringMessage("test")); - } - catch (MessageRejectedException e) { - exceptionWasThrown = true; - } - assertTrue(exceptionWasThrown); - assertEquals("both selectors should have been invoked", 2, selectorCounter.get()); - assertEquals("the handler should not have been invoked", 0, handlerCounter.get()); - } - - @Test - public void endpointWithMultipleSelectorsAndBothAccept() { - final AtomicInteger counter = new AtomicInteger(); - ServiceActivatingHandler endpoint = new ServiceActivatingHandler( - TestHandlers.countingHandler(counter), "handle"); - MessageSelectorChain selectorChain = new MessageSelectorChain(); - selectorChain.add(new MessageSelector() { - public boolean accept(Message message) { - counter.incrementAndGet(); - return true; - } - }); - selectorChain.add(new MessageSelector() { - public boolean accept(Message message) { - counter.incrementAndGet(); - return true; - } - }); - endpoint.setSelector(selectorChain); - endpoint.handleMessage(new StringMessage("test")); - assertEquals("both selectors and handler should have been invoked", 3, counter.get()); - } - @Test public void correlationIdNotSetIfMessageIsReturnedUnaltered() { QueueChannel replyChannel = new QueueChannel(1); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/filter/MessageFilterTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/filter/MessageFilterTests.java index 4c6363da4f..a0a886fd8d 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/filter/MessageFilterTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/filter/MessageFilterTests.java @@ -28,6 +28,7 @@ import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.core.Message; import org.springframework.integration.endpoint.EventDrivenConsumer; import org.springframework.integration.filter.MessageFilter; +import org.springframework.integration.message.MessageRejectedException; import org.springframework.integration.message.StringMessage; import org.springframework.integration.selector.MessageSelector; @@ -51,7 +52,7 @@ public class MessageFilterTests { } @Test - public void filterRejectsMessage() { + public void filterRejectsMessageSilently() { MessageFilter filter = new MessageFilter(new MessageSelector() { public boolean accept(Message message) { return false; @@ -63,6 +64,19 @@ public class MessageFilterTests { assertNull(output.receive(0)); } + @Test(expected = MessageRejectedException.class) + public void filterThrowsException() { + MessageFilter filter = new MessageFilter(new MessageSelector() { + public boolean accept(Message message) { + return false; + } + }); + filter.setThrowExceptionOnRejection(true); + QueueChannel output = new QueueChannel(); + filter.setOutputChannel(output); + filter.handleMessage(new StringMessage("test")); + } + @Test public void filterAcceptsWithChannels() { DirectChannel inputChannel = new DirectChannel(); @@ -83,7 +97,7 @@ public class MessageFilterTests { } @Test - public void filterRejectsWithChannels() { + public void filterRejectsSilentlyWithChannels() { DirectChannel inputChannel = new DirectChannel(); QueueChannel outputChannel = new QueueChannel(); MessageFilter filter = new MessageFilter(new MessageSelector() { @@ -99,4 +113,21 @@ public class MessageFilterTests { assertNull(outputChannel.receive(0)); } + @Test(expected = MessageRejectedException.class) + public void filterThrowsExceptionWithChannels() { + DirectChannel inputChannel = new DirectChannel(); + QueueChannel outputChannel = new QueueChannel(); + MessageFilter filter = new MessageFilter(new MessageSelector() { + public boolean accept(Message message) { + return false; + } + }); + filter.setOutputChannel(outputChannel); + filter.setThrowExceptionOnRejection(true); + EventDrivenConsumer endpoint = new EventDrivenConsumer(inputChannel, filter); + endpoint.start(); + Message message = new StringMessage("test"); + assertTrue(inputChannel.send(message)); + } + }