INT-487
This commit is contained in:
@@ -7,11 +7,16 @@
|
||||
http://www.springframework.org/schema/integration
|
||||
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
|
||||
|
||||
<channel id="testChannel"/>
|
||||
<channel id="input"/>
|
||||
|
||||
<service-activator id="endpoint" input-channel="testChannel"
|
||||
ref="testHandler" selector="typeSelector">
|
||||
</service-activator>
|
||||
<channel id="output">
|
||||
<queue/>
|
||||
</channel>
|
||||
|
||||
<chain input-channel="input" output-channel="output">
|
||||
<filter ref="typeSelector"/>
|
||||
<service-activator ref="testHandler"/>
|
||||
</chain>
|
||||
|
||||
<beans:bean id="typeSelector" class="org.springframework.integration.selector.PayloadTypeSelector">
|
||||
<beans:constructor-arg value="java.lang.String"/>
|
||||
@@ -0,0 +1,66 @@
|
||||
/*
|
||||
* Copyright 2002-2008 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.integration.config;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.integration.channel.PollableChannel;
|
||||
import org.springframework.integration.core.Message;
|
||||
import org.springframework.integration.core.MessageChannel;
|
||||
import org.springframework.integration.message.MessageBuilder;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
|
||||
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
*/
|
||||
@ContextConfiguration
|
||||
public class ChainParserTests extends AbstractJUnit4SpringContextTests {
|
||||
|
||||
@Autowired
|
||||
@Qualifier("input")
|
||||
private MessageChannel inputChannel;
|
||||
|
||||
@Autowired
|
||||
@Qualifier("output")
|
||||
private PollableChannel outputChannel;
|
||||
|
||||
|
||||
@Test
|
||||
public void testChainWithAcceptingFilter() {
|
||||
Message<?> message = MessageBuilder.withPayload("test").build();
|
||||
this.inputChannel.send(message);
|
||||
Message<?> reply = this.outputChannel.receive(0);
|
||||
assertNotNull(reply);
|
||||
assertEquals("foo", reply.getPayload());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void chainWithRejectingFilter() {
|
||||
Message<?> message = MessageBuilder.withPayload(123).build();
|
||||
this.inputChannel.send(message);
|
||||
Message<?> reply = this.outputChannel.receive(0);
|
||||
assertNull(reply);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -17,7 +17,6 @@
|
||||
package org.springframework.integration.config;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@@ -25,12 +24,8 @@ import java.util.concurrent.TimeUnit;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.core.Message;
|
||||
import org.springframework.integration.core.MessageChannel;
|
||||
import org.springframework.integration.message.GenericMessage;
|
||||
import org.springframework.integration.message.MessageBuilder;
|
||||
import org.springframework.integration.message.MessageRejectedException;
|
||||
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
@@ -50,29 +45,4 @@ public class EndpointParserTests {
|
||||
assertEquals("test", handler.getMessageString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEndpointWithSelectorAccepts() {
|
||||
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
|
||||
"endpointWithSelector.xml", this.getClass());
|
||||
MessageChannel inputChannel = (MessageChannel) context.getBean("testChannel");
|
||||
QueueChannel replyChannel = new QueueChannel();
|
||||
Message<?> message = MessageBuilder.withPayload("test")
|
||||
.setReplyChannel(replyChannel).build();
|
||||
inputChannel.send(message);
|
||||
Message<?> reply = replyChannel.receive(500);
|
||||
assertNotNull(reply);
|
||||
assertEquals("foo", reply.getPayload());
|
||||
}
|
||||
|
||||
@Test(expected=MessageRejectedException.class)
|
||||
public void testEndpointWithSelectorRejects() {
|
||||
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
|
||||
"endpointWithSelector.xml", this.getClass());
|
||||
MessageChannel inputChannel = (MessageChannel) context.getBean("testChannel");
|
||||
MessageChannel replyChannel = new QueueChannel();
|
||||
Message<?> message = MessageBuilder.withPayload(123)
|
||||
.setReplyChannel(replyChannel).build();
|
||||
inputChannel.send(message);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -27,14 +27,12 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.integration.core.Message;
|
||||
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
|
||||
import org.springframework.integration.handler.ServiceActivatingHandler;
|
||||
import org.springframework.integration.message.MessageHandler;
|
||||
import org.springframework.integration.message.MessageDeliveryException;
|
||||
import org.springframework.integration.message.MessageRejectedException;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
import org.springframework.integration.message.TestHandlers;
|
||||
import org.springframework.integration.selector.MessageSelector;
|
||||
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
@@ -153,62 +151,6 @@ public class SimpleDispatcherTests {
|
||||
dispatcher.dispatch(new StringMessage("test2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void handlersWithSelectorsAndOneAccepts() throws InterruptedException {
|
||||
SimpleDispatcher dispatcher = new SimpleDispatcher();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicInteger counter1 = new AtomicInteger();
|
||||
final AtomicInteger counter2 = new AtomicInteger();
|
||||
final AtomicInteger counter3 = new AtomicInteger();
|
||||
final AtomicInteger selectorCounter = new AtomicInteger();
|
||||
AbstractReplyProducingMessageHandler consumer1 = createConsumer(TestHandlers.countingCountDownHandler(counter1, latch));
|
||||
AbstractReplyProducingMessageHandler consumer2 = createConsumer(TestHandlers.countingCountDownHandler(counter2, latch));
|
||||
AbstractReplyProducingMessageHandler consumer3 = createConsumer(TestHandlers.countingCountDownHandler(counter3, latch));
|
||||
consumer1.setSelector(new TestMessageSelector(selectorCounter, false));
|
||||
consumer2.setSelector(new TestMessageSelector(selectorCounter, false));
|
||||
consumer3.setSelector(new TestMessageSelector(selectorCounter, true));
|
||||
dispatcher.addHandler(consumer1);
|
||||
dispatcher.addHandler(consumer2);
|
||||
dispatcher.addHandler(consumer3);
|
||||
dispatcher.dispatch(new StringMessage("test"));
|
||||
assertEquals(0, latch.getCount());
|
||||
assertEquals("selectors should have been invoked one time each", 3, selectorCounter.get());
|
||||
assertEquals("consumer with rejecting selector should not have received the message", 0, counter1.get());
|
||||
assertEquals("consumer with rejecting selector should not have received the message", 0, counter2.get());
|
||||
assertEquals("consumer with accepting selector should have received the message", 1, counter3.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void handlersWithSelectorsAndNoneAccept() throws InterruptedException {
|
||||
SimpleDispatcher dispatcher = new SimpleDispatcher();
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
final AtomicInteger counter1 = new AtomicInteger();
|
||||
final AtomicInteger counter2 = new AtomicInteger();
|
||||
final AtomicInteger counter3 = new AtomicInteger();
|
||||
final AtomicInteger selectorCounter = new AtomicInteger();
|
||||
AbstractReplyProducingMessageHandler consumer1 = createConsumer(TestHandlers.countingCountDownHandler(counter1, latch));
|
||||
AbstractReplyProducingMessageHandler consumer2 = createConsumer(TestHandlers.countingCountDownHandler(counter2, latch));
|
||||
AbstractReplyProducingMessageHandler consumer3 = createConsumer(TestHandlers.countingCountDownHandler(counter3, latch));
|
||||
consumer1.setSelector(new TestMessageSelector(selectorCounter, false));
|
||||
consumer2.setSelector(new TestMessageSelector(selectorCounter, false));
|
||||
consumer3.setSelector(new TestMessageSelector(selectorCounter, false));
|
||||
dispatcher.addHandler(consumer1);
|
||||
dispatcher.addHandler(consumer2);
|
||||
dispatcher.addHandler(consumer3);
|
||||
boolean exceptionThrown = false;
|
||||
try {
|
||||
dispatcher.dispatch(new StringMessage("test"));
|
||||
}
|
||||
catch (MessageRejectedException e) {
|
||||
exceptionThrown = true;
|
||||
}
|
||||
assertTrue(exceptionThrown);
|
||||
assertEquals("selectors should have been invoked one time each", 3, selectorCounter.get());
|
||||
assertEquals("consumer with rejecting selector should not have received the message", 0, counter1.get());
|
||||
assertEquals("consumer with rejecting selector should not have received the message", 0, counter2.get());
|
||||
assertEquals("consumer with rejecting selector should not have received the message", 0, counter3.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void firstHandlerReturnsTrue() {
|
||||
SimpleDispatcher dispatcher = new SimpleDispatcher();
|
||||
@@ -262,24 +204,6 @@ public class SimpleDispatcherTests {
|
||||
}
|
||||
|
||||
|
||||
private static class TestMessageSelector implements MessageSelector {
|
||||
|
||||
private final AtomicInteger counter;
|
||||
|
||||
private final boolean accept;
|
||||
|
||||
TestMessageSelector(AtomicInteger counter, boolean accept) {
|
||||
this.counter = counter;
|
||||
this.accept = accept;
|
||||
}
|
||||
|
||||
public boolean accept(Message<?> message) {
|
||||
this.counter.incrementAndGet();
|
||||
return this.accept;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class CountingTestEndpoint implements MessageHandler {
|
||||
|
||||
private final AtomicInteger counter;
|
||||
|
||||
@@ -20,11 +20,6 @@ import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
@@ -35,11 +30,7 @@ import org.springframework.integration.core.MessagingException;
|
||||
import org.springframework.integration.handler.ServiceActivatingHandler;
|
||||
import org.springframework.integration.message.MessageBuilder;
|
||||
import org.springframework.integration.message.MessageHandlingException;
|
||||
import org.springframework.integration.message.MessageRejectedException;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
import org.springframework.integration.message.TestHandlers;
|
||||
import org.springframework.integration.selector.MessageSelector;
|
||||
import org.springframework.integration.selector.MessageSelectorChain;
|
||||
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
@@ -174,118 +165,6 @@ public class ServiceActivatorEndpointTests {
|
||||
endpoint.handleMessage(message);
|
||||
}
|
||||
|
||||
@Test(expected=MessageRejectedException.class)
|
||||
public void endpointWithSelectorRejecting() {
|
||||
ServiceActivatingHandler endpoint = new ServiceActivatingHandler(
|
||||
TestHandlers.nullHandler(), "handle");
|
||||
endpoint.setSelector(new MessageSelector() {
|
||||
public boolean accept(Message<?> message) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
endpoint.handleMessage(new StringMessage("test"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void endpointWithSelectorAccepting() throws InterruptedException {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
ServiceActivatingHandler endpoint = new ServiceActivatingHandler(
|
||||
TestHandlers.countDownHandler(latch), "handle");
|
||||
endpoint.setSelector(new MessageSelector() {
|
||||
public boolean accept(Message<?> message) {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
endpoint.handleMessage(new StringMessage("test"));
|
||||
latch.await(100, TimeUnit.MILLISECONDS);
|
||||
assertEquals("handler should have been invoked", 0, latch.getCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void endpointWithMultipleSelectorsAndFirstRejects() {
|
||||
final AtomicInteger counter = new AtomicInteger();
|
||||
ServiceActivatingHandler endpoint = new ServiceActivatingHandler(
|
||||
TestHandlers.countingHandler(counter), "handle");
|
||||
MessageSelectorChain selectorChain = new MessageSelectorChain();
|
||||
selectorChain.add(new MessageSelector() {
|
||||
public boolean accept(Message<?> message) {
|
||||
counter.incrementAndGet();
|
||||
return false;
|
||||
}
|
||||
});
|
||||
selectorChain.add(new MessageSelector() {
|
||||
public boolean accept(Message<?> message) {
|
||||
counter.incrementAndGet();
|
||||
return true;
|
||||
}
|
||||
});
|
||||
endpoint.setSelector(selectorChain);
|
||||
boolean exceptionWasThrown = false;
|
||||
try {
|
||||
endpoint.handleMessage(new StringMessage("test"));
|
||||
}
|
||||
catch (MessageRejectedException e) {
|
||||
exceptionWasThrown = true;
|
||||
}
|
||||
assertTrue(exceptionWasThrown);
|
||||
assertEquals("only the first selector should have been invoked", 1, counter.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void endpointWithMultipleSelectorsAndFirstAccepts() {
|
||||
final AtomicInteger selectorCounter = new AtomicInteger();
|
||||
AtomicInteger handlerCounter = new AtomicInteger();
|
||||
ServiceActivatingHandler endpoint = new ServiceActivatingHandler(
|
||||
TestHandlers.countingHandler(handlerCounter), "handle");
|
||||
MessageSelectorChain selectorChain = new MessageSelectorChain();
|
||||
selectorChain.add(new MessageSelector() {
|
||||
public boolean accept(Message<?> message) {
|
||||
selectorCounter.incrementAndGet();
|
||||
return true;
|
||||
}
|
||||
});
|
||||
selectorChain.add(new MessageSelector() {
|
||||
public boolean accept(Message<?> message) {
|
||||
selectorCounter.incrementAndGet();
|
||||
return false;
|
||||
}
|
||||
});
|
||||
endpoint.setSelector(selectorChain);
|
||||
boolean exceptionWasThrown = false;
|
||||
try {
|
||||
endpoint.handleMessage(new StringMessage("test"));
|
||||
}
|
||||
catch (MessageRejectedException e) {
|
||||
exceptionWasThrown = true;
|
||||
}
|
||||
assertTrue(exceptionWasThrown);
|
||||
assertEquals("both selectors should have been invoked", 2, selectorCounter.get());
|
||||
assertEquals("the handler should not have been invoked", 0, handlerCounter.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void endpointWithMultipleSelectorsAndBothAccept() {
|
||||
final AtomicInteger counter = new AtomicInteger();
|
||||
ServiceActivatingHandler endpoint = new ServiceActivatingHandler(
|
||||
TestHandlers.countingHandler(counter), "handle");
|
||||
MessageSelectorChain selectorChain = new MessageSelectorChain();
|
||||
selectorChain.add(new MessageSelector() {
|
||||
public boolean accept(Message<?> message) {
|
||||
counter.incrementAndGet();
|
||||
return true;
|
||||
}
|
||||
});
|
||||
selectorChain.add(new MessageSelector() {
|
||||
public boolean accept(Message<?> message) {
|
||||
counter.incrementAndGet();
|
||||
return true;
|
||||
}
|
||||
});
|
||||
endpoint.setSelector(selectorChain);
|
||||
endpoint.handleMessage(new StringMessage("test"));
|
||||
assertEquals("both selectors and handler should have been invoked", 3, counter.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void correlationIdNotSetIfMessageIsReturnedUnaltered() {
|
||||
QueueChannel replyChannel = new QueueChannel(1);
|
||||
|
||||
@@ -28,6 +28,7 @@ import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.core.Message;
|
||||
import org.springframework.integration.endpoint.EventDrivenConsumer;
|
||||
import org.springframework.integration.filter.MessageFilter;
|
||||
import org.springframework.integration.message.MessageRejectedException;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
import org.springframework.integration.selector.MessageSelector;
|
||||
|
||||
@@ -51,7 +52,7 @@ public class MessageFilterTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void filterRejectsMessage() {
|
||||
public void filterRejectsMessageSilently() {
|
||||
MessageFilter filter = new MessageFilter(new MessageSelector() {
|
||||
public boolean accept(Message<?> message) {
|
||||
return false;
|
||||
@@ -63,6 +64,19 @@ public class MessageFilterTests {
|
||||
assertNull(output.receive(0));
|
||||
}
|
||||
|
||||
@Test(expected = MessageRejectedException.class)
|
||||
public void filterThrowsException() {
|
||||
MessageFilter filter = new MessageFilter(new MessageSelector() {
|
||||
public boolean accept(Message<?> message) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
filter.setThrowExceptionOnRejection(true);
|
||||
QueueChannel output = new QueueChannel();
|
||||
filter.setOutputChannel(output);
|
||||
filter.handleMessage(new StringMessage("test"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void filterAcceptsWithChannels() {
|
||||
DirectChannel inputChannel = new DirectChannel();
|
||||
@@ -83,7 +97,7 @@ public class MessageFilterTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void filterRejectsWithChannels() {
|
||||
public void filterRejectsSilentlyWithChannels() {
|
||||
DirectChannel inputChannel = new DirectChannel();
|
||||
QueueChannel outputChannel = new QueueChannel();
|
||||
MessageFilter filter = new MessageFilter(new MessageSelector() {
|
||||
@@ -99,4 +113,21 @@ public class MessageFilterTests {
|
||||
assertNull(outputChannel.receive(0));
|
||||
}
|
||||
|
||||
@Test(expected = MessageRejectedException.class)
|
||||
public void filterThrowsExceptionWithChannels() {
|
||||
DirectChannel inputChannel = new DirectChannel();
|
||||
QueueChannel outputChannel = new QueueChannel();
|
||||
MessageFilter filter = new MessageFilter(new MessageSelector() {
|
||||
public boolean accept(Message<?> message) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
filter.setOutputChannel(outputChannel);
|
||||
filter.setThrowExceptionOnRejection(true);
|
||||
EventDrivenConsumer endpoint = new EventDrivenConsumer(inputChannel, filter);
|
||||
endpoint.start();
|
||||
Message<?> message = new StringMessage("test");
|
||||
assertTrue(inputChannel.send(message));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user