Refactored MessageConsumer with onMessage to MessageHandler with handleMessage.

This commit is contained in:
Mark Fisher
2008-11-03 14:17:53 +00:00
parent 90dcb7e88b
commit 39a8486ac8
101 changed files with 745 additions and 741 deletions

View File

@@ -153,9 +153,9 @@ public class AggregatorEndpointTests {
QueueChannel replyChannel = new QueueChannel();
QueueChannel discardChannel = new QueueChannel();
this.aggregator.setDiscardChannel(discardChannel);
this.aggregator.onMessage(createMessage("test-1a", 1, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-1a", 1, 1, 1, replyChannel));
assertEquals("test-1a", replyChannel.receive(100).getPayload());
this.aggregator.onMessage(createMessage("test-1b", 1, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-1b", 1, 1, 1, replyChannel));
assertEquals("test-1b", discardChannel.receive(100).getPayload());
}
@@ -165,13 +165,13 @@ public class AggregatorEndpointTests {
QueueChannel discardChannel = new QueueChannel();
this.aggregator.setTrackedCorrelationIdCapacity(3);
this.aggregator.setDiscardChannel(discardChannel);
this.aggregator.onMessage(createMessage("test-1a", 1, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-1a", 1, 1, 1, replyChannel));
assertEquals("test-1a", replyChannel.receive(100).getPayload());
this.aggregator.onMessage(createMessage("test-2", 2, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-2", 2, 1, 1, replyChannel));
assertEquals("test-2", replyChannel.receive(100).getPayload());
this.aggregator.onMessage(createMessage("test-3", 3, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-3", 3, 1, 1, replyChannel));
assertEquals("test-3", replyChannel.receive(100).getPayload());
this.aggregator.onMessage(createMessage("test-1b", 1, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-1b", 1, 1, 1, replyChannel));
assertEquals("test-1b", discardChannel.receive(100).getPayload());
}
@@ -181,15 +181,15 @@ public class AggregatorEndpointTests {
QueueChannel discardChannel = new QueueChannel();
this.aggregator.setTrackedCorrelationIdCapacity(3);
this.aggregator.setDiscardChannel(discardChannel);
this.aggregator.onMessage(createMessage("test-1a", 1, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-1a", 1, 1, 1, replyChannel));
assertEquals("test-1a", replyChannel.receive(100).getPayload());
this.aggregator.onMessage(createMessage("test-2", 2, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-2", 2, 1, 1, replyChannel));
assertEquals("test-2", replyChannel.receive(100).getPayload());
this.aggregator.onMessage(createMessage("test-3", 3, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-3", 3, 1, 1, replyChannel));
assertEquals("test-3", replyChannel.receive(100).getPayload());
this.aggregator.onMessage(createMessage("test-4", 4, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-4", 4, 1, 1, replyChannel));
assertEquals("test-4", replyChannel.receive(100).getPayload());
this.aggregator.onMessage(createMessage("test-1b", 1, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-1b", 1, 1, 1, replyChannel));
assertEquals("test-1b", replyChannel.receive(100).getPayload());
assertNull(discardChannel.receive(0));
}
@@ -197,7 +197,7 @@ public class AggregatorEndpointTests {
@Test(expected = MessageHandlingException.class)
public void testExceptionThrownIfNoCorrelationId() throws InterruptedException {
Message<?> message = createMessage("123", null, 2, 1, new QueueChannel());
this.aggregator.onMessage(message);
this.aggregator.handleMessage(message);
}
@Test
@@ -311,7 +311,7 @@ public class AggregatorEndpointTests {
public void run() {
try {
this.aggregator.onMessage(message);
this.aggregator.handleMessage(message);
}
catch (Exception e) {
this.exception = e;

View File

@@ -57,9 +57,9 @@ public class ResequencerTests {
Message<?> message1 = createMessage("123", "ABC", 3, 3, replyChannel);
Message<?> message2 = createMessage("456", "ABC", 3, 1, replyChannel);
Message<?> message3 = createMessage("789", "ABC", 3, 2, replyChannel);
this.resequencer.onMessage(message1);
this.resequencer.onMessage(message3);
this.resequencer.onMessage(message2);
this.resequencer.handleMessage(message1);
this.resequencer.handleMessage(message3);
this.resequencer.handleMessage(message2);
Message<?> reply1 = replyChannel.receive(0);
Message<?> reply2 = replyChannel.receive(0);
Message<?> reply3 = replyChannel.receive(0);
@@ -79,9 +79,9 @@ public class ResequencerTests {
Message<?> message2 = createMessage("456", "ABC", 4, 1, replyChannel);
Message<?> message3 = createMessage("789", "ABC", 4, 4, replyChannel);
Message<?> message4 = createMessage("XYZ", "ABC", 4, 3, replyChannel);
this.resequencer.onMessage(message1);
this.resequencer.onMessage(message2);
this.resequencer.onMessage(message3);
this.resequencer.handleMessage(message1);
this.resequencer.handleMessage(message2);
this.resequencer.handleMessage(message3);
Message<?> reply1 = replyChannel.receive(0);
Message<?> reply2 = replyChannel.receive(0);
Message<?> reply3 = replyChannel.receive(0);
@@ -92,7 +92,7 @@ public class ResequencerTests {
assertEquals(new Integer(2), reply2.getHeaders().getSequenceNumber());
assertNull(reply3);
// when sending the last message, the whole sequence must have been sent
this.resequencer.onMessage(message4);
this.resequencer.handleMessage(message4);
reply3 = replyChannel.receive(0);
Message<?> reply4 = replyChannel.receive(0);
assertNotNull(reply3);
@@ -110,9 +110,9 @@ public class ResequencerTests {
Message<?> message2 = createMessage("456", "ABC", 4, 1, replyChannel);
Message<?> message3 = createMessage("789", "ABC", 4, 4, replyChannel);
Message<?> message4 = createMessage("XYZ", "ABC", 4, 3, replyChannel);
this.resequencer.onMessage(message1);
this.resequencer.onMessage(message2);
this.resequencer.onMessage(message3);
this.resequencer.handleMessage(message1);
this.resequencer.handleMessage(message2);
this.resequencer.handleMessage(message3);
Message<?> reply1 = replyChannel.receive(0);
Message<?> reply2 = replyChannel.receive(0);
Message<?> reply3 = replyChannel.receive(0);
@@ -121,7 +121,7 @@ public class ResequencerTests {
assertNull(reply2);
assertNull(reply3);
// after sending the last message, the whole sequence should have been sent
this.resequencer.onMessage(message4);
this.resequencer.handleMessage(message4);
reply1 = replyChannel.receive(0);
reply2 = replyChannel.receive(0);
reply3 = replyChannel.receive(0);

View File

@@ -37,7 +37,7 @@ import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.consumer.AbstractReplyProducingMessageConsumer;
import org.springframework.integration.consumer.AbstractReplyProducingMessageHandler;
import org.springframework.integration.consumer.ReplyMessageHolder;
import org.springframework.integration.core.Message;
import org.springframework.integration.endpoint.PollingConsumerEndpoint;
@@ -69,13 +69,13 @@ public class ApplicationContextMessageBusTests {
Message<String> message = MessageBuilder.withPayload("test")
.setReplyChannelName("targetChannel").build();
sourceChannel.send(message);
AbstractReplyProducingMessageConsumer consumer = new AbstractReplyProducingMessageConsumer() {
public void onMessage(Message<?> message, ReplyMessageHolder replyHolder) {
AbstractReplyProducingMessageHandler handler = new AbstractReplyProducingMessageHandler() {
public void handleRequestMessage(Message<?> message, ReplyMessageHolder replyHolder) {
replyHolder.set(message);
}
};
consumer.setBeanFactory(context);
PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(consumer, sourceChannel);
handler.setBeanFactory(context);
PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(sourceChannel, handler);
endpoint.afterPropertiesSet();
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
context.refresh();
@@ -128,13 +128,15 @@ public class ApplicationContextMessageBusTests {
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel1 = new QueueChannel();
QueueChannel outputChannel2 = new QueueChannel();
AbstractReplyProducingMessageConsumer consumer1 = new AbstractReplyProducingMessageConsumer() {
public void onMessage(Message<?> message, ReplyMessageHolder replyHolder) {
AbstractReplyProducingMessageHandler handler1 = new AbstractReplyProducingMessageHandler() {
@Override
public void handleRequestMessage(Message<?> message, ReplyMessageHolder replyHolder) {
replyHolder.set(message);
}
};
AbstractReplyProducingMessageConsumer consumer2 = new AbstractReplyProducingMessageConsumer() {
public void onMessage(Message<?> message, ReplyMessageHolder replyHolder) {
AbstractReplyProducingMessageHandler handler2 = new AbstractReplyProducingMessageHandler() {
@Override
public void handleRequestMessage(Message<?> message, ReplyMessageHolder replyHolder) {
replyHolder.set(message);
}
};
@@ -144,11 +146,11 @@ public class ApplicationContextMessageBusTests {
context.getBeanFactory().registerSingleton("input", inputChannel);
context.getBeanFactory().registerSingleton("output1", outputChannel1);
context.getBeanFactory().registerSingleton("output2", outputChannel2);
consumer1.setOutputChannel(outputChannel1);
consumer2.setOutputChannel(outputChannel2);
PollingConsumerEndpoint endpoint1 = new PollingConsumerEndpoint(consumer1, inputChannel);
handler1.setOutputChannel(outputChannel1);
handler2.setOutputChannel(outputChannel2);
PollingConsumerEndpoint endpoint1 = new PollingConsumerEndpoint(inputChannel, handler1);
endpoint1.afterPropertiesSet();
PollingConsumerEndpoint endpoint2 = new PollingConsumerEndpoint(consumer2, inputChannel);
PollingConsumerEndpoint endpoint2 = new PollingConsumerEndpoint(inputChannel, handler2);
endpoint2.afterPropertiesSet();
context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1);
context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2);
@@ -171,14 +173,16 @@ public class ApplicationContextMessageBusTests {
QueueChannel outputChannel1 = new QueueChannel();
QueueChannel outputChannel2 = new QueueChannel();
final CountDownLatch latch = new CountDownLatch(2);
AbstractReplyProducingMessageConsumer consumer1 = new AbstractReplyProducingMessageConsumer() {
public void onMessage(Message<?> message, ReplyMessageHolder replyHolder) {
AbstractReplyProducingMessageHandler handler1 = new AbstractReplyProducingMessageHandler() {
@Override
public void handleRequestMessage(Message<?> message, ReplyMessageHolder replyHolder) {
replyHolder.set(message);
latch.countDown();
}
};
AbstractReplyProducingMessageConsumer consumer2 = new AbstractReplyProducingMessageConsumer() {
public void onMessage(Message<?> message, ReplyMessageHolder replyHolder) {
AbstractReplyProducingMessageHandler handler2 = new AbstractReplyProducingMessageHandler() {
@Override
public void handleRequestMessage(Message<?> message, ReplyMessageHolder replyHolder) {
replyHolder.set(message);
latch.countDown();
}
@@ -189,10 +193,10 @@ public class ApplicationContextMessageBusTests {
context.getBeanFactory().registerSingleton("input", inputChannel);
context.getBeanFactory().registerSingleton("output1", outputChannel1);
context.getBeanFactory().registerSingleton("output2", outputChannel2);
consumer1.setOutputChannel(outputChannel1);
consumer2.setOutputChannel(outputChannel2);
SubscribingConsumerEndpoint endpoint1 = new SubscribingConsumerEndpoint(consumer1, inputChannel);
SubscribingConsumerEndpoint endpoint2 = new SubscribingConsumerEndpoint(consumer2, inputChannel);
handler1.setOutputChannel(outputChannel1);
handler2.setOutputChannel(outputChannel2);
SubscribingConsumerEndpoint endpoint1 = new SubscribingConsumerEndpoint(inputChannel, handler1);
SubscribingConsumerEndpoint endpoint2 = new SubscribingConsumerEndpoint(inputChannel, handler2);
context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1);
context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2);
ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
@@ -256,12 +260,13 @@ public class ApplicationContextMessageBusTests {
errorChannel.setBeanName(ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME);
context.getBeanFactory().registerSingleton(ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME, errorChannel);
final CountDownLatch latch = new CountDownLatch(1);
AbstractReplyProducingMessageConsumer consumer = new AbstractReplyProducingMessageConsumer() {
public void onMessage(Message<?> message, ReplyMessageHolder replyHolder) {
AbstractReplyProducingMessageHandler handler = new AbstractReplyProducingMessageHandler() {
@Override
public void handleRequestMessage(Message<?> message, ReplyMessageHolder replyHolder) {
latch.countDown();
}
};
PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(consumer, errorChannel);
PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(errorChannel, handler);
endpoint.afterPropertiesSet();
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
ApplicationContextMessageBus bus = new ApplicationContextMessageBus();

View File

@@ -29,9 +29,9 @@ import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.channel.ThreadLocalChannel;
import org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor;
import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.consumer.AbstractReplyProducingMessageConsumer;
import org.springframework.integration.consumer.AbstractReplyProducingMessageHandler;
import org.springframework.integration.consumer.ReplyMessageHolder;
import org.springframework.integration.consumer.ServiceActivatingConsumer;
import org.springframework.integration.consumer.ServiceActivatingHandler;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessagingException;
import org.springframework.integration.endpoint.SubscribingConsumerEndpoint;
@@ -67,9 +67,9 @@ public class DirectChannelSubscriptionTests {
@Test
public void sendAndReceiveForRegisteredEndpoint() {
GenericApplicationContext context = new GenericApplicationContext();
ServiceActivatingConsumer serviceActivator = new ServiceActivatingConsumer(new TestBean(), "handle");
ServiceActivatingHandler serviceActivator = new ServiceActivatingHandler(new TestBean(), "handle");
serviceActivator.setOutputChannel(targetChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, sourceChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(sourceChannel, serviceActivator);
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
bus.setApplicationContext(context);
context.refresh();
@@ -97,13 +97,14 @@ public class DirectChannelSubscriptionTests {
@Test(expected = MessagingException.class)
public void exceptionThrownFromRegisteredEndpoint() {
AbstractReplyProducingMessageConsumer consumer = new AbstractReplyProducingMessageConsumer() {
public void onMessage(Message<?> message, ReplyMessageHolder replyHolder) {
AbstractReplyProducingMessageHandler handler = new AbstractReplyProducingMessageHandler() {
@Override
public void handleRequestMessage(Message<?> message, ReplyMessageHolder replyHolder) {
throw new RuntimeException("intentional test failure");
}
};
consumer.setOutputChannel(targetChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(consumer, sourceChannel);
handler.setOutputChannel(targetChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(sourceChannel, handler);
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
bus.setApplicationContext(context);
context.refresh();

View File

@@ -22,7 +22,7 @@
<constructor-arg ref="sourceChannel"/>
</bean>
<bean id="serviceActivator" class="org.springframework.integration.consumer.ServiceActivatingConsumer">
<bean id="serviceActivator" class="org.springframework.integration.consumer.ServiceActivatingHandler">
<constructor-arg ref="handler"/>
<property name="outputChannel" ref="targetChannel"/>
</bean>

View File

@@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.integration.core.Message;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.MessageHandler;
import org.springframework.integration.message.StringMessage;
/**
@@ -60,7 +60,7 @@ public class DirectChannelTests {
}
private static class ThreadNameExtractingTestTarget implements MessageConsumer {
private static class ThreadNameExtractingTestTarget implements MessageHandler {
private String threadName;
@@ -75,7 +75,7 @@ public class DirectChannelTests {
this.latch = latch;
}
public void onMessage(Message<?> message) {
public void handleMessage(Message<?> message) {
this.threadName = Thread.currentThread().getName();
if (this.latch != null) {
this.latch.countDown();

View File

@@ -31,7 +31,7 @@ import org.junit.Test;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.bus.ApplicationContextMessageBus;
import org.springframework.integration.consumer.AbstractReplyProducingMessageConsumer;
import org.springframework.integration.consumer.AbstractReplyProducingMessageHandler;
import org.springframework.integration.consumer.ReplyMessageHolder;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
@@ -52,12 +52,13 @@ public class MessageChannelTemplateTests {
public void setUp() {
this.requestChannel = new QueueChannel();
this.requestChannel.setBeanName("requestChannel");
AbstractReplyProducingMessageConsumer consumer = new AbstractReplyProducingMessageConsumer() {
public void onMessage(Message<?> message, ReplyMessageHolder replyHolder) {
AbstractReplyProducingMessageHandler handler = new AbstractReplyProducingMessageHandler() {
@Override
public void handleRequestMessage(Message<?> message, ReplyMessageHolder replyHolder) {
replyHolder.set(message.getPayload().toString().toUpperCase());
}
};
PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(consumer, requestChannel);
PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(requestChannel, handler);
endpoint.afterPropertiesSet();
GenericApplicationContext context = new GenericApplicationContext();
context.getBeanFactory().registerSingleton("requestChannel", requestChannel);

View File

@@ -78,7 +78,7 @@ public class AggregatorParserTests {
CompletionStrategy completionStrategy = (CompletionStrategy) context.getBean("completionStrategy");
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel");
Object consumer = new DirectFieldAccessor(endpoint).getPropertyValue("consumer");
Object consumer = new DirectFieldAccessor(endpoint).getPropertyValue("handler");
Assert.assertEquals(MethodInvokingAggregator.class, consumer.getClass());
DirectFieldAccessor accessor = new DirectFieldAccessor(consumer);
Method expectedMethod = TestAggregatorBean.class.getMethod("createSingleMessageFromGroup", List.class);
@@ -138,7 +138,7 @@ public class AggregatorParserTests {
SubscribingConsumerEndpoint endpoint =
(SubscribingConsumerEndpoint) context.getBean("aggregatorWithPojoCompletionStrategy");
CompletionStrategy completionStrategy = (CompletionStrategy) new DirectFieldAccessor(
new DirectFieldAccessor(endpoint).getPropertyValue("consumer")).getPropertyValue("completionStrategy");
new DirectFieldAccessor(endpoint).getPropertyValue("handler")).getPropertyValue("completionStrategy");
Assert.assertTrue(completionStrategy instanceof CompletionStrategyAdapter);
DirectFieldAccessor completionStrategyAccessor = new DirectFieldAccessor(completionStrategy);
MethodInvoker invoker = (MethodInvoker) completionStrategyAccessor.getPropertyValue("invoker");

View File

@@ -76,7 +76,7 @@ public class ResequencerParserTests {
@Test
public void testDefaultResequencerProperties() {
SubscribingConsumerEndpoint endpoint = (SubscribingConsumerEndpoint) context.getBean("defaultResequencer");
Resequencer resequencer = (Resequencer) new DirectFieldAccessor(endpoint).getPropertyValue("consumer");
Resequencer resequencer = (Resequencer) new DirectFieldAccessor(endpoint).getPropertyValue("handler");
assertNull(getPropertyValue(resequencer, "outputChannel"));
assertNull(getPropertyValue(resequencer, "discardChannel"));
assertEquals("The ResequencerEndpoint is not set with the appropriate timeout value",
@@ -98,7 +98,7 @@ public class ResequencerParserTests {
SubscribingConsumerEndpoint endpoint = (SubscribingConsumerEndpoint) context.getBean("completelyDefinedResequencer");
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel");
Resequencer resequencer = (Resequencer) new DirectFieldAccessor(endpoint).getPropertyValue("consumer");
Resequencer resequencer = (Resequencer) new DirectFieldAccessor(endpoint).getPropertyValue("handler");
assertEquals("The ResequencerEndpoint is not injected with the appropriate output channel",
outputChannel, getPropertyValue(resequencer, "outputChannel"));
assertEquals("The ResequencerEndpoint is not injected with the appropriate discard channel",

View File

@@ -17,12 +17,12 @@
package org.springframework.integration.config;
import org.springframework.integration.core.Message;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.MessageHandler;
/**
* @author Mark Fisher
*/
public class TestConsumer implements MessageConsumer {
public class TestConsumer implements MessageHandler {
private volatile Message<?> lastMessage;
@@ -31,7 +31,7 @@ public class TestConsumer implements MessageConsumer {
return this.lastMessage;
}
public void onMessage(Message<?> message) {
public void handleMessage(Message<?> message) {
this.lastMessage = message;
}

View File

@@ -101,11 +101,10 @@ public class AggregatorAnnotationTests {
}
@SuppressWarnings("unchecked")
private AbstractMessageAggregator getAggregator(ApplicationContext context, final String endpointName) {
SubscribingConsumerEndpoint endpoint = (SubscribingConsumerEndpoint) context.getBean(
endpointName + ".aggregatingMethod.aggregator");
return (AbstractMessageAggregator) new DirectFieldAccessor(endpoint).getPropertyValue("consumer");
return (AbstractMessageAggregator) new DirectFieldAccessor(endpoint).getPropertyValue("handler");
}
}

View File

@@ -48,7 +48,7 @@ import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.endpoint.PollingConsumerEndpoint;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.MessageHandler;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.scheduling.IntervalTrigger;
import org.springframework.integration.scheduling.Trigger;
@@ -403,8 +403,8 @@ public class MessagingAnnotationPostProcessorTests {
DirectChannel testChannel = (DirectChannel) channelResolver.resolveChannelName("testChannel");
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Message<?>> receivedMessage = new AtomicReference<Message<?>>();
testChannel.subscribe(new MessageConsumer() {
public void onMessage(Message<?> message) {
testChannel.subscribe(new MessageHandler() {
public void handleMessage(Message<?> message) {
receivedMessage.set(message);
latch.countDown();
}

View File

@@ -35,7 +35,7 @@ import org.junit.Test;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.core.Message;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.MessageHandler;
import org.springframework.integration.message.StringMessage;
/**
@@ -50,11 +50,11 @@ public class BroadcastingDispatcherTests {
private Message<?> messageMock = createMock(Message.class);
private MessageConsumer targetMock1 = createMock(MessageConsumer.class);
private MessageHandler targetMock1 = createMock(MessageHandler.class);
private MessageConsumer targetMock2 = createMock(MessageConsumer.class);
private MessageHandler targetMock2 = createMock(MessageHandler.class);
private MessageConsumer targetMock3 = createMock(MessageConsumer.class);
private MessageHandler targetMock3 = createMock(MessageHandler.class);
private Object[] globalMocks = new Object[] {
messageMock, taskExecutorMock, targetMock1, targetMock2, targetMock3 };
@@ -72,8 +72,8 @@ public class BroadcastingDispatcherTests {
@Test
public void singleTargetWithoutTaskExecutor() throws Exception {
dispatcher.setTaskExecutor(null);
dispatcher.addConsumer(targetMock1);
targetMock1.onMessage(messageMock);
dispatcher.addHandler(targetMock1);
targetMock1.handleMessage(messageMock);
expectLastCall();
replay(globalMocks);
dispatcher.dispatch(messageMock);
@@ -82,8 +82,8 @@ public class BroadcastingDispatcherTests {
@Test
public void singleTargetWithTaskExecutor() throws Exception {
dispatcher.addConsumer(targetMock1);
targetMock1.onMessage(messageMock);
dispatcher.addHandler(targetMock1);
targetMock1.handleMessage(messageMock);
expectLastCall();
replay(globalMocks);
dispatcher.dispatch(messageMock);
@@ -93,14 +93,14 @@ public class BroadcastingDispatcherTests {
@Test
public void multipleTargetsWithoutTaskExecutor() {
dispatcher.setTaskExecutor(null);
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock2);
dispatcher.addConsumer(targetMock3);
targetMock1.onMessage(messageMock);
dispatcher.addHandler(targetMock1);
dispatcher.addHandler(targetMock2);
dispatcher.addHandler(targetMock3);
targetMock1.handleMessage(messageMock);
expectLastCall();
targetMock2.onMessage(messageMock);
targetMock2.handleMessage(messageMock);
expectLastCall();
targetMock3.onMessage(messageMock);
targetMock3.handleMessage(messageMock);
expectLastCall();
replay(globalMocks);
dispatcher.dispatch(messageMock);
@@ -109,14 +109,14 @@ public class BroadcastingDispatcherTests {
@Test
public void multipleTargetsWithTaskExecutor() {
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock2);
dispatcher.addConsumer(targetMock3);
targetMock1.onMessage(messageMock);
dispatcher.addHandler(targetMock1);
dispatcher.addHandler(targetMock2);
dispatcher.addHandler(targetMock3);
targetMock1.handleMessage(messageMock);
expectLastCall();
targetMock2.onMessage(messageMock);
targetMock2.handleMessage(messageMock);
expectLastCall();
targetMock3.onMessage(messageMock);
targetMock3.handleMessage(messageMock);
expectLastCall();
replay(globalMocks);
dispatcher.dispatch(messageMock);
@@ -126,13 +126,13 @@ public class BroadcastingDispatcherTests {
@Test
public void multipleTargetsPartialFailureFirst() {
reset(taskExecutorMock);
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock2);
dispatcher.addConsumer(targetMock3);
dispatcher.addHandler(targetMock1);
dispatcher.addHandler(targetMock2);
dispatcher.addHandler(targetMock3);
partialFailingExecutorMock(false, true, true);
targetMock2.onMessage(messageMock);
targetMock2.handleMessage(messageMock);
expectLastCall();
targetMock3.onMessage(messageMock);
targetMock3.handleMessage(messageMock);
expectLastCall();
replay(globalMocks);
dispatcher.dispatch(messageMock);
@@ -142,13 +142,13 @@ public class BroadcastingDispatcherTests {
@Test
public void multipleTargetsPartialFailureMiddle() {
reset(taskExecutorMock);
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock2);
dispatcher.addConsumer(targetMock3);
dispatcher.addHandler(targetMock1);
dispatcher.addHandler(targetMock2);
dispatcher.addHandler(targetMock3);
partialFailingExecutorMock(true, false, true);
targetMock1.onMessage(messageMock);
targetMock1.handleMessage(messageMock);
expectLastCall();
targetMock3.onMessage(messageMock);
targetMock3.handleMessage(messageMock);
expectLastCall();
replay(globalMocks);
dispatcher.dispatch(messageMock);
@@ -158,13 +158,13 @@ public class BroadcastingDispatcherTests {
@Test
public void multipleTargetsPartialFailureLast() {
reset(taskExecutorMock);
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock2);
dispatcher.addConsumer(targetMock3);
dispatcher.addHandler(targetMock1);
dispatcher.addHandler(targetMock2);
dispatcher.addHandler(targetMock3);
partialFailingExecutorMock(true, true, false);
targetMock1.onMessage(messageMock);
targetMock1.handleMessage(messageMock);
expectLastCall();
targetMock2.onMessage(messageMock);
targetMock2.handleMessage(messageMock);
expectLastCall();
replay(globalMocks);
dispatcher.dispatch(messageMock);
@@ -174,9 +174,9 @@ public class BroadcastingDispatcherTests {
@Test
public void multipleTargetsAllFail() {
reset(taskExecutorMock);
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock2);
dispatcher.addConsumer(targetMock3);
dispatcher.addHandler(targetMock1);
dispatcher.addHandler(targetMock2);
dispatcher.addHandler(targetMock3);
partialFailingExecutorMock(false, false, false);
replay(globalMocks);
dispatcher.dispatch(messageMock);
@@ -185,10 +185,10 @@ public class BroadcastingDispatcherTests {
@Test
public void noDuplicateSubscription() {
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock1);
targetMock1.onMessage(messageMock);
dispatcher.addHandler(targetMock1);
dispatcher.addHandler(targetMock1);
dispatcher.addHandler(targetMock1);
targetMock1.handleMessage(messageMock);
expectLastCall();
replay(globalMocks);
dispatcher.dispatch(messageMock);
@@ -197,13 +197,13 @@ public class BroadcastingDispatcherTests {
@Test
public void removeConsumerBeforeSend() {
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock2);
dispatcher.addConsumer(targetMock3);
dispatcher.removeConsumer(targetMock2);
targetMock1.onMessage(messageMock);
dispatcher.addHandler(targetMock1);
dispatcher.addHandler(targetMock2);
dispatcher.addHandler(targetMock3);
dispatcher.removeHandler(targetMock2);
targetMock1.handleMessage(messageMock);
expectLastCall();
targetMock3.onMessage(messageMock);
targetMock3.handleMessage(messageMock);
expectLastCall();
replay(globalMocks);
dispatcher.dispatch(messageMock);
@@ -212,18 +212,18 @@ public class BroadcastingDispatcherTests {
@Test
public void removeConsumerBetweenSends() {
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock2);
dispatcher.addConsumer(targetMock3);
targetMock1.onMessage(messageMock);
dispatcher.addHandler(targetMock1);
dispatcher.addHandler(targetMock2);
dispatcher.addHandler(targetMock3);
targetMock1.handleMessage(messageMock);
expectLastCall().times(2);
targetMock2.onMessage(messageMock);
targetMock2.handleMessage(messageMock);
expectLastCall();
targetMock3.onMessage(messageMock);
targetMock3.handleMessage(messageMock);
expectLastCall().times(2);
replay(globalMocks);
dispatcher.dispatch(messageMock);
dispatcher.removeConsumer(targetMock2);
dispatcher.removeHandler(targetMock2);
dispatcher.dispatch(messageMock);
verify(globalMocks);
}
@@ -232,10 +232,10 @@ public class BroadcastingDispatcherTests {
public void applySequenceDisabledByDefault() {
BroadcastingDispatcher dispatcher = new BroadcastingDispatcher();
final List<Message<?>> messages = Collections.synchronizedList(new ArrayList<Message<?>>());
MessageConsumer target1 = new MessageStoringTestEndpoint(messages);
MessageConsumer target2 = new MessageStoringTestEndpoint(messages);
dispatcher.addConsumer(target1);
dispatcher.addConsumer(target2);
MessageHandler target1 = new MessageStoringTestEndpoint(messages);
MessageHandler target2 = new MessageStoringTestEndpoint(messages);
dispatcher.addHandler(target1);
dispatcher.addHandler(target2);
dispatcher.dispatch(new StringMessage("test"));
assertEquals(2, messages.size());
assertEquals(0, (int) messages.get(0).getHeaders().getSequenceNumber());
@@ -249,12 +249,12 @@ public class BroadcastingDispatcherTests {
BroadcastingDispatcher dispatcher = new BroadcastingDispatcher();
dispatcher.setApplySequence(true);
final List<Message<?>> messages = Collections.synchronizedList(new ArrayList<Message<?>>());
MessageConsumer target1 = new MessageStoringTestEndpoint(messages);
MessageConsumer target2 = new MessageStoringTestEndpoint(messages);
MessageConsumer target3 = new MessageStoringTestEndpoint(messages);
dispatcher.addConsumer(target1);
dispatcher.addConsumer(target2);
dispatcher.addConsumer(target3);
MessageHandler target1 = new MessageStoringTestEndpoint(messages);
MessageHandler target2 = new MessageStoringTestEndpoint(messages);
MessageHandler target3 = new MessageStoringTestEndpoint(messages);
dispatcher.addHandler(target1);
dispatcher.addHandler(target2);
dispatcher.addHandler(target3);
dispatcher.dispatch(new StringMessage("test"));
assertEquals(3, messages.size());
assertEquals(1, (int) messages.get(0).getHeaders().getSequenceNumber());
@@ -294,7 +294,7 @@ public class BroadcastingDispatcherTests {
}
private static class MessageStoringTestEndpoint implements MessageConsumer {
private static class MessageStoringTestEndpoint implements MessageHandler {
private final List<Message<?>> messageList;
@@ -302,7 +302,7 @@ public class BroadcastingDispatcherTests {
this.messageList = messageList;
}
public void onMessage(Message<?> message) {
public void handleMessage(Message<?> message) {
this.messageList.add(message);
}
};

View File

@@ -26,10 +26,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.springframework.integration.consumer.AbstractReplyProducingMessageConsumer;
import org.springframework.integration.consumer.ServiceActivatingConsumer;
import org.springframework.integration.consumer.AbstractReplyProducingMessageHandler;
import org.springframework.integration.consumer.ServiceActivatingHandler;
import org.springframework.integration.core.Message;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.MessageHandler;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.MessageRejectedException;
import org.springframework.integration.message.StringMessage;
@@ -45,7 +45,7 @@ public class SimpleDispatcherTests {
public void singleMessage() throws InterruptedException {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final CountDownLatch latch = new CountDownLatch(1);
dispatcher.addConsumer(createConsumer(TestHandlers.countDownHandler(latch)));
dispatcher.addHandler(createConsumer(TestHandlers.countDownHandler(latch)));
dispatcher.dispatch(new StringMessage("test"));
latch.await(500, TimeUnit.MILLISECONDS);
assertEquals(0, latch.getCount());
@@ -57,8 +57,8 @@ public class SimpleDispatcherTests {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger counter1 = new AtomicInteger();
final AtomicInteger counter2 = new AtomicInteger();
dispatcher.addConsumer(createConsumer(TestHandlers.countingCountDownHandler(counter1, latch)));
dispatcher.addConsumer(createConsumer(TestHandlers.countingCountDownHandler(counter2, latch)));
dispatcher.addHandler(createConsumer(TestHandlers.countingCountDownHandler(counter1, latch)));
dispatcher.addHandler(createConsumer(TestHandlers.countingCountDownHandler(counter2, latch)));
dispatcher.dispatch(new StringMessage("test"));
latch.await(500, TimeUnit.MILLISECONDS);
assertEquals(0, latch.getCount());
@@ -69,9 +69,9 @@ public class SimpleDispatcherTests {
public void noDuplicateSubscriptions() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageConsumer target = new CountingTestEndpoint(counter, false);
dispatcher.addConsumer(target);
dispatcher.addConsumer(target);
MessageHandler target = new CountingTestEndpoint(counter, false);
dispatcher.addHandler(target);
dispatcher.addHandler(target);
try {
dispatcher.dispatch(new StringMessage("test"));
}
@@ -85,13 +85,13 @@ public class SimpleDispatcherTests {
public void removeConsumerBeforeSend() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageConsumer target1 = new CountingTestEndpoint(counter, false);
MessageConsumer target2 = new CountingTestEndpoint(counter, false);
MessageConsumer target3 = new CountingTestEndpoint(counter, false);
dispatcher.addConsumer(target1);
dispatcher.addConsumer(target2);
dispatcher.addConsumer(target3);
dispatcher.removeConsumer(target2);
MessageHandler target1 = new CountingTestEndpoint(counter, false);
MessageHandler target2 = new CountingTestEndpoint(counter, false);
MessageHandler target3 = new CountingTestEndpoint(counter, false);
dispatcher.addHandler(target1);
dispatcher.addHandler(target2);
dispatcher.addHandler(target3);
dispatcher.removeHandler(target2);
try {
dispatcher.dispatch(new StringMessage("test"));
}
@@ -105,12 +105,12 @@ public class SimpleDispatcherTests {
public void removeConsumerBetweenSends() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageConsumer target1 = new CountingTestEndpoint(counter, false);
MessageConsumer target2 = new CountingTestEndpoint(counter, false);
MessageConsumer target3 = new CountingTestEndpoint(counter, false);
dispatcher.addConsumer(target1);
dispatcher.addConsumer(target2);
dispatcher.addConsumer(target3);
MessageHandler target1 = new CountingTestEndpoint(counter, false);
MessageHandler target2 = new CountingTestEndpoint(counter, false);
MessageHandler target3 = new CountingTestEndpoint(counter, false);
dispatcher.addHandler(target1);
dispatcher.addHandler(target2);
dispatcher.addHandler(target3);
try {
dispatcher.dispatch(new StringMessage("test1"));
}
@@ -118,7 +118,7 @@ public class SimpleDispatcherTests {
// ignore
}
assertEquals(3, counter.get());
dispatcher.removeConsumer(target2);
dispatcher.removeHandler(target2);
try {
dispatcher.dispatch(new StringMessage("test2"));
}
@@ -126,7 +126,7 @@ public class SimpleDispatcherTests {
// ignore
}
assertEquals(5, counter.get());
dispatcher.removeConsumer(target1);
dispatcher.removeHandler(target1);
try {
dispatcher.dispatch(new StringMessage("test3"));
}
@@ -140,8 +140,8 @@ public class SimpleDispatcherTests {
public void removeConsumerLastTargetCausesDeliveryException() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageConsumer target = new CountingTestEndpoint(counter, false);
dispatcher.addConsumer(target);
MessageHandler target = new CountingTestEndpoint(counter, false);
dispatcher.addHandler(target);
try {
dispatcher.dispatch(new StringMessage("test1"));
}
@@ -149,7 +149,7 @@ public class SimpleDispatcherTests {
// ignore
}
assertEquals(1, counter.get());
dispatcher.removeConsumer(target);
dispatcher.removeHandler(target);
dispatcher.dispatch(new StringMessage("test2"));
}
@@ -161,15 +161,15 @@ public class SimpleDispatcherTests {
final AtomicInteger counter2 = new AtomicInteger();
final AtomicInteger counter3 = new AtomicInteger();
final AtomicInteger selectorCounter = new AtomicInteger();
AbstractReplyProducingMessageConsumer consumer1 = createConsumer(TestHandlers.countingCountDownHandler(counter1, latch));
AbstractReplyProducingMessageConsumer consumer2 = createConsumer(TestHandlers.countingCountDownHandler(counter2, latch));
AbstractReplyProducingMessageConsumer consumer3 = createConsumer(TestHandlers.countingCountDownHandler(counter3, latch));
AbstractReplyProducingMessageHandler consumer1 = createConsumer(TestHandlers.countingCountDownHandler(counter1, latch));
AbstractReplyProducingMessageHandler consumer2 = createConsumer(TestHandlers.countingCountDownHandler(counter2, latch));
AbstractReplyProducingMessageHandler consumer3 = createConsumer(TestHandlers.countingCountDownHandler(counter3, latch));
consumer1.setSelector(new TestMessageSelector(selectorCounter, false));
consumer2.setSelector(new TestMessageSelector(selectorCounter, false));
consumer3.setSelector(new TestMessageSelector(selectorCounter, true));
dispatcher.addConsumer(consumer1);
dispatcher.addConsumer(consumer2);
dispatcher.addConsumer(consumer3);
dispatcher.addHandler(consumer1);
dispatcher.addHandler(consumer2);
dispatcher.addHandler(consumer3);
dispatcher.dispatch(new StringMessage("test"));
assertEquals(0, latch.getCount());
assertEquals("selectors should have been invoked one time each", 3, selectorCounter.get());
@@ -186,15 +186,15 @@ public class SimpleDispatcherTests {
final AtomicInteger counter2 = new AtomicInteger();
final AtomicInteger counter3 = new AtomicInteger();
final AtomicInteger selectorCounter = new AtomicInteger();
AbstractReplyProducingMessageConsumer consumer1 = createConsumer(TestHandlers.countingCountDownHandler(counter1, latch));
AbstractReplyProducingMessageConsumer consumer2 = createConsumer(TestHandlers.countingCountDownHandler(counter2, latch));
AbstractReplyProducingMessageConsumer consumer3 = createConsumer(TestHandlers.countingCountDownHandler(counter3, latch));
AbstractReplyProducingMessageHandler consumer1 = createConsumer(TestHandlers.countingCountDownHandler(counter1, latch));
AbstractReplyProducingMessageHandler consumer2 = createConsumer(TestHandlers.countingCountDownHandler(counter2, latch));
AbstractReplyProducingMessageHandler consumer3 = createConsumer(TestHandlers.countingCountDownHandler(counter3, latch));
consumer1.setSelector(new TestMessageSelector(selectorCounter, false));
consumer2.setSelector(new TestMessageSelector(selectorCounter, false));
consumer3.setSelector(new TestMessageSelector(selectorCounter, false));
dispatcher.addConsumer(consumer1);
dispatcher.addConsumer(consumer2);
dispatcher.addConsumer(consumer3);
dispatcher.addHandler(consumer1);
dispatcher.addHandler(consumer2);
dispatcher.addHandler(consumer3);
boolean exceptionThrown = false;
try {
dispatcher.dispatch(new StringMessage("test"));
@@ -213,12 +213,12 @@ public class SimpleDispatcherTests {
public void firstHandlerReturnsTrue() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageConsumer target1 = new CountingTestEndpoint(counter, true);
MessageConsumer target2 = new CountingTestEndpoint(counter, false);
MessageConsumer target3 = new CountingTestEndpoint(counter, false);
dispatcher.addConsumer(target1);
dispatcher.addConsumer(target2);
dispatcher.addConsumer(target3);
MessageHandler target1 = new CountingTestEndpoint(counter, true);
MessageHandler target2 = new CountingTestEndpoint(counter, false);
MessageHandler target3 = new CountingTestEndpoint(counter, false);
dispatcher.addHandler(target1);
dispatcher.addHandler(target2);
dispatcher.addHandler(target3);
assertTrue(dispatcher.dispatch(new StringMessage("test")));
assertEquals("only the first target should have been invoked", 1, counter.get());
}
@@ -227,12 +227,12 @@ public class SimpleDispatcherTests {
public void middleHandlerReturnsTrue() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageConsumer target1 = new CountingTestEndpoint(counter, false);
MessageConsumer target2 = new CountingTestEndpoint(counter, true);
MessageConsumer target3 = new CountingTestEndpoint(counter, false);
dispatcher.addConsumer(target1);
dispatcher.addConsumer(target2);
dispatcher.addConsumer(target3);
MessageHandler target1 = new CountingTestEndpoint(counter, false);
MessageHandler target2 = new CountingTestEndpoint(counter, true);
MessageHandler target3 = new CountingTestEndpoint(counter, false);
dispatcher.addHandler(target1);
dispatcher.addHandler(target2);
dispatcher.addHandler(target3);
assertTrue(dispatcher.dispatch(new StringMessage("test")));
assertEquals("first two targets should have been invoked", 2, counter.get());
}
@@ -241,12 +241,12 @@ public class SimpleDispatcherTests {
public void allHandlersReturnFalse() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageConsumer target1 = new CountingTestEndpoint(counter, false);
MessageConsumer target2 = new CountingTestEndpoint(counter, false);
MessageConsumer target3 = new CountingTestEndpoint(counter, false);
dispatcher.addConsumer(target1);
dispatcher.addConsumer(target2);
dispatcher.addConsumer(target3);
MessageHandler target1 = new CountingTestEndpoint(counter, false);
MessageHandler target2 = new CountingTestEndpoint(counter, false);
MessageHandler target3 = new CountingTestEndpoint(counter, false);
dispatcher.addHandler(target1);
dispatcher.addHandler(target2);
dispatcher.addHandler(target3);
try {
assertFalse(dispatcher.dispatch(new StringMessage("test")));
}
@@ -257,8 +257,8 @@ public class SimpleDispatcherTests {
}
private static ServiceActivatingConsumer createConsumer(Object object) {
return new ServiceActivatingConsumer(object);
private static ServiceActivatingHandler createConsumer(Object object) {
return new ServiceActivatingHandler(object);
}
@@ -280,7 +280,7 @@ public class SimpleDispatcherTests {
}
private static class CountingTestEndpoint implements MessageConsumer {
private static class CountingTestEndpoint implements MessageHandler {
private final AtomicInteger counter;
@@ -291,7 +291,7 @@ public class SimpleDispatcherTests {
this.shouldAccept = shouldAccept;
}
public void onMessage(Message<?> message) {
public void handleMessage(Message<?> message) {
this.counter.incrementAndGet();
if (!this.shouldAccept) {
throw new MessageRejectedException(message, "intentional test failure");

View File

@@ -23,7 +23,7 @@ import org.junit.Test;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.consumer.ServiceActivatingConsumer;
import org.springframework.integration.consumer.ServiceActivatingHandler;
import org.springframework.integration.core.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.StringMessage;
@@ -42,9 +42,9 @@ public class CorrelationIdTests {
.setCorrelationId(correlationId).build();
DirectChannel inputChannel = new DirectChannel();
QueueChannel outputChannel = new QueueChannel(1);
ServiceActivatingConsumer serviceActivator = new ServiceActivatingConsumer(new TestBean(), "upperCase");
ServiceActivatingHandler serviceActivator = new ServiceActivatingHandler(new TestBean(), "upperCase");
serviceActivator.setOutputChannel(outputChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(inputChannel, serviceActivator);
endpoint.start();
assertTrue(inputChannel.send(message));
Message<?> reply = outputChannel.receive(0);
@@ -57,9 +57,9 @@ public class CorrelationIdTests {
.setCorrelationId("correlationId").build();
DirectChannel inputChannel = new DirectChannel();
QueueChannel outputChannel = new QueueChannel(1);
ServiceActivatingConsumer serviceActivator = new ServiceActivatingConsumer(new TestBean(), "upperCase");
ServiceActivatingHandler serviceActivator = new ServiceActivatingHandler(new TestBean(), "upperCase");
serviceActivator.setOutputChannel(outputChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(inputChannel, serviceActivator);
endpoint.start();
assertTrue(inputChannel.send(message));
Message<?> reply = outputChannel.receive(0);
@@ -74,9 +74,9 @@ public class CorrelationIdTests {
.setCorrelationId(correlationId).build();
DirectChannel inputChannel = new DirectChannel();
QueueChannel outputChannel = new QueueChannel(1);
ServiceActivatingConsumer serviceActivator = new ServiceActivatingConsumer(new TestBean(), "createMessage");
ServiceActivatingHandler serviceActivator = new ServiceActivatingHandler(new TestBean(), "createMessage");
serviceActivator.setOutputChannel(outputChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(inputChannel, serviceActivator);
endpoint.start();
assertTrue(inputChannel.send(message));
Message<?> reply = outputChannel.receive(0);
@@ -88,9 +88,9 @@ public class CorrelationIdTests {
Message<?> message = new StringMessage("test");
DirectChannel inputChannel = new DirectChannel();
QueueChannel outputChannel = new QueueChannel(1);
ServiceActivatingConsumer serviceActivator = new ServiceActivatingConsumer(new TestBean(), "createMessage");
ServiceActivatingHandler serviceActivator = new ServiceActivatingHandler(new TestBean(), "createMessage");
serviceActivator.setOutputChannel(outputChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(inputChannel, serviceActivator);
endpoint.start();
assertTrue(inputChannel.send(message));
Message<?> reply = outputChannel.receive(0);
@@ -105,7 +105,7 @@ public class CorrelationIdTests {
new TestBean(), TestBean.class.getMethod("split", String.class));
splitter.setOutputChannel(testChannel);
splitter.afterPropertiesSet();
splitter.onMessage(message);
splitter.handleMessage(message);
Message<?> reply1 = testChannel.receive(100);
Message<?> reply2 = testChannel.receive(100);
assertEquals(message.getHeaders().getId(), reply1.getHeaders().getCorrelationId());

View File

@@ -37,7 +37,7 @@ import org.junit.Test;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.core.Message;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.MessageHandler;
import org.springframework.integration.message.MessageRejectedException;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.scheduling.SimpleTaskScheduler;
@@ -72,7 +72,7 @@ public class PollingConsumerEndpointTests {
public void init() throws InterruptedException {
consumer.counter.set(0);
trigger.reset();
endpoint = new PollingConsumerEndpoint(consumer, channelMock);
endpoint = new PollingConsumerEndpoint(channelMock, consumer);
endpoint.setTaskScheduler(taskScheduler);
taskScheduler.setErrorHandler(errorHandler);
taskScheduler.start();
@@ -178,11 +178,11 @@ public class PollingConsumerEndpointTests {
}
private static class TestConsumer implements MessageConsumer {
private static class TestConsumer implements MessageHandler {
private volatile AtomicInteger counter = new AtomicInteger();
public void onMessage(Message<?> message) {
public void handleMessage(Message<?> message) {
this.counter.incrementAndGet();
if ("bad".equals(message.getPayload().toString())) {
throw new MessageRejectedException(message, "intentional test failure");

View File

@@ -30,7 +30,7 @@ import org.junit.Test;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.channel.TestChannelResolver;
import org.springframework.integration.consumer.ServiceActivatingConsumer;
import org.springframework.integration.consumer.ServiceActivatingHandler;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessagingException;
import org.springframework.integration.message.MessageBuilder;
@@ -50,10 +50,10 @@ public class ServiceActivatorEndpointTests {
@Test
public void outputChannel() {
QueueChannel channel = new QueueChannel(1);
ServiceActivatingConsumer endpoint = this.createEndpoint();
ServiceActivatingHandler endpoint = this.createEndpoint();
endpoint.setOutputChannel(channel);
Message<?> message = MessageBuilder.withPayload("foo").build();
endpoint.onMessage(message);
endpoint.handleMessage(message);
Message<?> reply = channel.receive(0);
assertNotNull(reply);
assertEquals("FOO", reply.getPayload());
@@ -63,10 +63,10 @@ public class ServiceActivatorEndpointTests {
public void outputChannelTakesPrecedence() {
QueueChannel channel1 = new QueueChannel(1);
QueueChannel channel2 = new QueueChannel(1);
ServiceActivatingConsumer endpoint = this.createEndpoint();
ServiceActivatingHandler endpoint = this.createEndpoint();
endpoint.setOutputChannel(channel1);
Message<?> message = MessageBuilder.withPayload("foo").setReplyChannel(channel2).build();
endpoint.onMessage(message);
endpoint.handleMessage(message);
Message<?> reply1 = channel1.receive(0);
assertNotNull(reply1);
assertEquals("FOO", reply1.getPayload());
@@ -77,9 +77,9 @@ public class ServiceActivatorEndpointTests {
@Test
public void returnAddressHeader() {
QueueChannel channel = new QueueChannel(1);
ServiceActivatingConsumer endpoint = this.createEndpoint();
ServiceActivatingHandler endpoint = this.createEndpoint();
Message<?> message = MessageBuilder.withPayload("foo").setReplyChannel(channel).build();
endpoint.onMessage(message);
endpoint.handleMessage(message);
Message<?> reply = channel.receive(0);
assertNotNull(reply);
assertEquals("FOO", reply.getPayload());
@@ -91,11 +91,11 @@ public class ServiceActivatorEndpointTests {
channel.setBeanName("testChannel");
TestChannelResolver channelResolver = new TestChannelResolver();
channelResolver.addChannel(channel);
ServiceActivatingConsumer endpoint = this.createEndpoint();
ServiceActivatingHandler endpoint = this.createEndpoint();
endpoint.setChannelResolver(channelResolver);
Message<?> message = MessageBuilder.withPayload("foo")
.setReplyChannelName("testChannel").build();
endpoint.onMessage(message);
endpoint.handleMessage(message);
Message<?> reply = channel.receive(0);
assertNotNull(reply);
assertEquals("FOO", reply.getPayload());
@@ -112,13 +112,13 @@ public class ServiceActivatorEndpointTests {
return new StringMessage("foo" + message.getPayload());
}
};
ServiceActivatingConsumer endpoint = new ServiceActivatingConsumer(handler, "handle");
ServiceActivatingHandler endpoint = new ServiceActivatingHandler(handler, "handle");
TestChannelResolver channelResolver = new TestChannelResolver();
channelResolver.addChannel(replyChannel2);
endpoint.setChannelResolver(channelResolver);
Message<String> testMessage1 = MessageBuilder.withPayload("bar")
.setReplyChannel(replyChannel1).build();
endpoint.onMessage(testMessage1);
endpoint.handleMessage(testMessage1);
Message<?> reply1 = replyChannel1.receive(50);
assertNotNull(reply1);
assertEquals("foobar", reply1.getPayload());
@@ -126,7 +126,7 @@ public class ServiceActivatorEndpointTests {
assertNull(reply2);
Message<String> testMessage2 = MessageBuilder.fromMessage(testMessage1)
.setReplyChannelName("replyChannel2").build();
endpoint.onMessage(testMessage2);
endpoint.handleMessage(testMessage2);
reply1 = replyChannel1.receive(0);
assertNull(reply1);
reply2 = replyChannel2.receive(0);
@@ -137,9 +137,9 @@ public class ServiceActivatorEndpointTests {
@Test
public void noOutputChannelFallsBackToReturnAddress() {
QueueChannel channel = new QueueChannel(1);
ServiceActivatingConsumer endpoint = this.createEndpoint();
ServiceActivatingHandler endpoint = this.createEndpoint();
Message<?> message = MessageBuilder.withPayload("foo").setReplyChannel(channel).build();
endpoint.onMessage(message);
endpoint.handleMessage(message);
Message<?> reply = channel.receive(0);
assertNotNull(reply);
assertEquals("FOO", reply.getPayload());
@@ -147,56 +147,56 @@ public class ServiceActivatorEndpointTests {
@Test(expected = MessagingException.class)
public void noReplyTarget() {
ServiceActivatingConsumer endpoint = this.createEndpoint();
ServiceActivatingHandler endpoint = this.createEndpoint();
Message<?> message = MessageBuilder.withPayload("foo").build();
endpoint.onMessage(message);
endpoint.handleMessage(message);
}
@Test
public void noReplyMessage() {
QueueChannel channel = new QueueChannel(1);
ServiceActivatingConsumer endpoint = new ServiceActivatingConsumer(
ServiceActivatingHandler endpoint = new ServiceActivatingHandler(
new TestNullReplyBean(), "handle");
endpoint.setOutputChannel(channel);
Message<?> message = MessageBuilder.withPayload("foo").build();
endpoint.onMessage(message);
endpoint.handleMessage(message);
assertNull(channel.receive(0));
}
@Test(expected = MessageHandlingException.class)
public void noReplyMessageWithRequiresReply() {
QueueChannel channel = new QueueChannel(1);
ServiceActivatingConsumer endpoint = new ServiceActivatingConsumer(
ServiceActivatingHandler endpoint = new ServiceActivatingHandler(
new TestNullReplyBean(), "handle");
endpoint.setRequiresReply(true);
endpoint.setOutputChannel(channel);
Message<?> message = MessageBuilder.withPayload("foo").build();
endpoint.onMessage(message);
endpoint.handleMessage(message);
}
@Test(expected=MessageRejectedException.class)
public void endpointWithSelectorRejecting() {
ServiceActivatingConsumer endpoint = new ServiceActivatingConsumer(
ServiceActivatingHandler endpoint = new ServiceActivatingHandler(
TestHandlers.nullHandler(), "handle");
endpoint.setSelector(new MessageSelector() {
public boolean accept(Message<?> message) {
return false;
}
});
endpoint.onMessage(new StringMessage("test"));
endpoint.handleMessage(new StringMessage("test"));
}
@Test
public void endpointWithSelectorAccepting() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
ServiceActivatingConsumer endpoint = new ServiceActivatingConsumer(
ServiceActivatingHandler endpoint = new ServiceActivatingHandler(
TestHandlers.countDownHandler(latch), "handle");
endpoint.setSelector(new MessageSelector() {
public boolean accept(Message<?> message) {
return true;
}
});
endpoint.onMessage(new StringMessage("test"));
endpoint.handleMessage(new StringMessage("test"));
latch.await(100, TimeUnit.MILLISECONDS);
assertEquals("handler should have been invoked", 0, latch.getCount());
}
@@ -204,7 +204,7 @@ public class ServiceActivatorEndpointTests {
@Test
public void endpointWithMultipleSelectorsAndFirstRejects() {
final AtomicInteger counter = new AtomicInteger();
ServiceActivatingConsumer endpoint = new ServiceActivatingConsumer(
ServiceActivatingHandler endpoint = new ServiceActivatingHandler(
TestHandlers.countingHandler(counter), "handle");
MessageSelectorChain selectorChain = new MessageSelectorChain();
selectorChain.add(new MessageSelector() {
@@ -222,7 +222,7 @@ public class ServiceActivatorEndpointTests {
endpoint.setSelector(selectorChain);
boolean exceptionWasThrown = false;
try {
endpoint.onMessage(new StringMessage("test"));
endpoint.handleMessage(new StringMessage("test"));
}
catch (MessageRejectedException e) {
exceptionWasThrown = true;
@@ -235,7 +235,7 @@ public class ServiceActivatorEndpointTests {
public void endpointWithMultipleSelectorsAndFirstAccepts() {
final AtomicInteger selectorCounter = new AtomicInteger();
AtomicInteger handlerCounter = new AtomicInteger();
ServiceActivatingConsumer endpoint = new ServiceActivatingConsumer(
ServiceActivatingHandler endpoint = new ServiceActivatingHandler(
TestHandlers.countingHandler(handlerCounter), "handle");
MessageSelectorChain selectorChain = new MessageSelectorChain();
selectorChain.add(new MessageSelector() {
@@ -253,7 +253,7 @@ public class ServiceActivatorEndpointTests {
endpoint.setSelector(selectorChain);
boolean exceptionWasThrown = false;
try {
endpoint.onMessage(new StringMessage("test"));
endpoint.handleMessage(new StringMessage("test"));
}
catch (MessageRejectedException e) {
exceptionWasThrown = true;
@@ -266,7 +266,7 @@ public class ServiceActivatorEndpointTests {
@Test
public void endpointWithMultipleSelectorsAndBothAccept() {
final AtomicInteger counter = new AtomicInteger();
ServiceActivatingConsumer endpoint = new ServiceActivatingConsumer(
ServiceActivatingHandler endpoint = new ServiceActivatingHandler(
TestHandlers.countingHandler(counter), "handle");
MessageSelectorChain selectorChain = new MessageSelectorChain();
selectorChain.add(new MessageSelector() {
@@ -282,14 +282,14 @@ public class ServiceActivatorEndpointTests {
}
});
endpoint.setSelector(selectorChain);
endpoint.onMessage(new StringMessage("test"));
endpoint.handleMessage(new StringMessage("test"));
assertEquals("both selectors and handler should have been invoked", 3, counter.get());
}
@Test
public void correlationIdNotSetIfMessageIsReturnedUnaltered() {
QueueChannel replyChannel = new QueueChannel(1);
ServiceActivatingConsumer endpoint = new ServiceActivatingConsumer(new Object() {
ServiceActivatingHandler endpoint = new ServiceActivatingHandler(new Object() {
@SuppressWarnings("unused")
public Message<?> handle(Message<?> message) {
return message;
@@ -297,7 +297,7 @@ public class ServiceActivatorEndpointTests {
}, "handle");
Message<String> message = MessageBuilder.withPayload("test")
.setReplyChannel(replyChannel).build();
endpoint.onMessage(message);
endpoint.handleMessage(message);
Message<?> reply = replyChannel.receive(500);
assertNull(reply.getHeaders().getCorrelationId());
}
@@ -305,7 +305,7 @@ public class ServiceActivatorEndpointTests {
@Test
public void correlationIdSetByHandlerTakesPrecedence() {
QueueChannel replyChannel = new QueueChannel(1);
ServiceActivatingConsumer endpoint = new ServiceActivatingConsumer(new Object() {
ServiceActivatingHandler endpoint = new ServiceActivatingHandler(new Object() {
@SuppressWarnings("unused")
public Message<?> handle(Message<?> message) {
return MessageBuilder.fromMessage(message)
@@ -314,7 +314,7 @@ public class ServiceActivatorEndpointTests {
}, "handle");
Message<String> message = MessageBuilder.withPayload("test")
.setReplyChannel(replyChannel).build();
endpoint.onMessage(message);
endpoint.handleMessage(message);
Message<?> reply = replyChannel.receive(500);
Object correlationId = reply.getHeaders().getCorrelationId();
assertFalse(message.getHeaders().getId().equals(correlationId));
@@ -322,8 +322,8 @@ public class ServiceActivatorEndpointTests {
}
private ServiceActivatingConsumer createEndpoint() {
return new ServiceActivatingConsumer(new TestBean(), "handle");
private ServiceActivatingHandler createEndpoint() {
return new ServiceActivatingHandler(new TestBean(), "handle");
}

View File

@@ -22,7 +22,7 @@ import org.junit.Test;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.consumer.ServiceActivatingConsumer;
import org.springframework.integration.consumer.ServiceActivatingHandler;
import org.springframework.integration.core.Message;
import org.springframework.integration.message.StringMessage;
@@ -34,10 +34,10 @@ public class ServiceActivatorMethodResolutionTests {
@Test
public void singleAnnotationMatches() {
SingleAnnotationTestBean testBean = new SingleAnnotationTestBean();
ServiceActivatingConsumer serviceActivator = new ServiceActivatingConsumer(testBean);
ServiceActivatingHandler serviceActivator = new ServiceActivatingHandler(testBean);
QueueChannel outputChannel = new QueueChannel();
serviceActivator.setOutputChannel(outputChannel);
serviceActivator.onMessage(new StringMessage("foo"));
serviceActivator.handleMessage(new StringMessage("foo"));
Message<?> result = outputChannel.receive(0);
assertEquals("FOO", result.getPayload());
}
@@ -45,16 +45,16 @@ public class ServiceActivatorMethodResolutionTests {
@Test(expected = IllegalArgumentException.class)
public void multipleAnnotationFails() {
MultipleAnnotationTestBean testBean = new MultipleAnnotationTestBean();
new ServiceActivatingConsumer(testBean);
new ServiceActivatingHandler(testBean);
}
@Test
public void singlePublicMethodMatches() {
SinglePublicMethodTestBean testBean = new SinglePublicMethodTestBean();
ServiceActivatingConsumer serviceActivator = new ServiceActivatingConsumer(testBean);
ServiceActivatingHandler serviceActivator = new ServiceActivatingHandler(testBean);
QueueChannel outputChannel = new QueueChannel();
serviceActivator.setOutputChannel(outputChannel);
serviceActivator.onMessage(new StringMessage("foo"));
serviceActivator.handleMessage(new StringMessage("foo"));
Message<?> result = outputChannel.receive(0);
assertEquals("FOO", result.getPayload());
}
@@ -62,7 +62,7 @@ public class ServiceActivatorMethodResolutionTests {
@Test(expected = IllegalArgumentException.class)
public void multiplePublicMethodFails() {
MultiplePublicMethodTestBean testBean = new MultiplePublicMethodTestBean();
new ServiceActivatingConsumer(testBean);
new ServiceActivatingHandler(testBean);
}

View File

@@ -46,7 +46,7 @@ public class MessageFilterTests {
Message<?> message = new StringMessage("test");
QueueChannel output = new QueueChannel();
filter.setOutputChannel(output);
filter.onMessage(message);
filter.handleMessage(message);
assertEquals(message, output.receive(0));
}
@@ -59,7 +59,7 @@ public class MessageFilterTests {
});
QueueChannel output = new QueueChannel();
filter.setOutputChannel(output);
filter.onMessage(new StringMessage("test"));
filter.handleMessage(new StringMessage("test"));
assertNull(output.receive(0));
}
@@ -73,7 +73,7 @@ public class MessageFilterTests {
}
});
filter.setOutputChannel(outputChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(filter, inputChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(inputChannel, filter);
endpoint.start();
Message<?> message = new StringMessage("test");
assertTrue(inputChannel.send(message));
@@ -92,7 +92,7 @@ public class MessageFilterTests {
}
});
filter.setOutputChannel(outputChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(filter, inputChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(inputChannel, filter);
endpoint.start();
Message<?> message = new StringMessage("test");
assertTrue(inputChannel.send(message));

View File

@@ -29,7 +29,7 @@ import org.junit.Test;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.bus.ApplicationContextMessageBus;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.consumer.MethodInvokingConsumer;
import org.springframework.integration.consumer.MethodInvokingMessageHandler;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessagingException;
import org.springframework.integration.endpoint.PollingConsumerEndpoint;
@@ -38,28 +38,28 @@ import org.springframework.integration.util.TestUtils;
/**
* @author Mark Fisher
*/
public class MethodInvokingConsumerTests {
public class MethodInvokingMessageHandlerTests {
@Test
public void validMethod() {
MethodInvokingConsumer consumer = new MethodInvokingConsumer(new TestSink(), "validMethod");
consumer.afterPropertiesSet();
consumer.onMessage(new GenericMessage<String>("test"));
MethodInvokingMessageHandler handler = new MethodInvokingMessageHandler(new TestSink(), "validMethod");
handler.afterPropertiesSet();
handler.handleMessage(new GenericMessage<String>("test"));
}
@Test(expected = IllegalArgumentException.class)
public void invalidMethodWithNoArgs() {
MethodInvokingConsumer consumer = new MethodInvokingConsumer(new TestSink(), "invalidMethodWithNoArgs");
consumer.afterPropertiesSet();
MethodInvokingMessageHandler handler = new MethodInvokingMessageHandler(new TestSink(), "invalidMethodWithNoArgs");
handler.afterPropertiesSet();
}
@Test(expected = MessagingException.class)
public void methodWithReturnValue() {
Message<?> message = new StringMessage("test");
try {
MethodInvokingConsumer consumer = new MethodInvokingConsumer(new TestSink(), "methodWithReturnValue");
consumer.afterPropertiesSet();
consumer.onMessage(message);
MethodInvokingMessageHandler handler = new MethodInvokingMessageHandler(new TestSink(), "methodWithReturnValue");
handler.afterPropertiesSet();
handler.handleMessage(message);
}
catch (MessagingException e) {
assertEquals(e.getFailedMessage(), message);
@@ -69,8 +69,8 @@ public class MethodInvokingConsumerTests {
@Test(expected = IllegalArgumentException.class)
public void noMatchingMethodName() {
MethodInvokingConsumer consumer = new MethodInvokingConsumer(new TestSink(), "noSuchMethod");
consumer.afterPropertiesSet();
MethodInvokingMessageHandler handler = new MethodInvokingMessageHandler(new TestSink(), "noSuchMethod");
handler.afterPropertiesSet();
}
@Test
@@ -84,8 +84,8 @@ public class MethodInvokingConsumerTests {
Message<String> message = new GenericMessage<String>("testing");
channel.send(message);
assertNull(queue.poll());
MethodInvokingConsumer consumer = new MethodInvokingConsumer(testBean, "foo");
PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(consumer, channel);
MethodInvokingMessageHandler handler = new MethodInvokingMessageHandler(testBean, "foo");
PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(channel, handler);
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));

View File

@@ -64,7 +64,7 @@ public class ErrorMessageExceptionTypeRouterTests {
exceptionTypeChannelMap.put(MessageHandlingException.class, messageHandlingExceptionChannel);
router.setExceptionTypeChannelMap(exceptionTypeChannelMap);
router.setDefaultOutputChannel(defaultChannel);
router.onMessage(message);
router.handleMessage(message);
assertNotNull(illegalArgumentChannel.receive(1000));
assertNull(defaultChannel.receive(0));
assertNull(runtimeExceptionChannel.receive(0));
@@ -85,7 +85,7 @@ public class ErrorMessageExceptionTypeRouterTests {
exceptionTypeChannelMap.put(MessageHandlingException.class, messageHandlingExceptionChannel);
router.setExceptionTypeChannelMap(exceptionTypeChannelMap);
router.setDefaultOutputChannel(defaultChannel);
router.onMessage(message);
router.handleMessage(message);
assertNotNull(runtimeExceptionChannel.receive(1000));
assertNull(illegalArgumentChannel.receive(0));
assertNull(defaultChannel.receive(0));
@@ -105,7 +105,7 @@ public class ErrorMessageExceptionTypeRouterTests {
exceptionTypeChannelMap.put(MessageHandlingException.class, messageHandlingExceptionChannel);
router.setExceptionTypeChannelMap(exceptionTypeChannelMap);
router.setDefaultOutputChannel(defaultChannel);
router.onMessage(message);
router.handleMessage(message);
assertNotNull(messageHandlingExceptionChannel.receive(1000));
assertNull(runtimeExceptionChannel.receive(0));
assertNull(illegalArgumentChannel.receive(0));
@@ -121,7 +121,7 @@ public class ErrorMessageExceptionTypeRouterTests {
ErrorMessage message = new ErrorMessage(error);
ErrorMessageExceptionTypeRouter router = new ErrorMessageExceptionTypeRouter();
router.setDefaultOutputChannel(defaultChannel);
router.onMessage(message);
router.handleMessage(message);
assertNotNull(defaultChannel.receive(1000));
assertNull(runtimeExceptionChannel.receive(0));
assertNull(illegalArgumentChannel.receive(0));
@@ -141,7 +141,7 @@ public class ErrorMessageExceptionTypeRouterTests {
exceptionTypeChannelMap.put(MessageDeliveryException.class, messageDeliveryExceptionChannel);
router.setExceptionTypeChannelMap(exceptionTypeChannelMap);
router.setResolutionRequired(true);
router.onMessage(message);
router.handleMessage(message);
}
@Test
@@ -159,7 +159,7 @@ public class ErrorMessageExceptionTypeRouterTests {
exceptionTypeChannelMap.put(MessageHandlingException.class, messageHandlingExceptionChannel);
router.setExceptionTypeChannelMap(exceptionTypeChannelMap);
router.setDefaultOutputChannel(defaultChannel);
router.onMessage(message);
router.handleMessage(message);
assertNotNull(illegalArgumentChannel.receive(1000));
assertNull(defaultChannel.receive(0));
assertNull(runtimeExceptionChannel.receive(0));
@@ -180,7 +180,7 @@ public class ErrorMessageExceptionTypeRouterTests {
exceptionTypeChannelMap.put(MessageHandlingException.class, messageHandlingExceptionChannel);
router.setExceptionTypeChannelMap(exceptionTypeChannelMap);
router.setDefaultOutputChannel(defaultChannel);
router.onMessage(message);
router.handleMessage(message);
assertNotNull(illegalArgumentChannel.receive(1000));
assertNull(defaultChannel.receive(0));
assertNull(runtimeExceptionChannel.receive(0));

View File

@@ -53,7 +53,7 @@ public class MethodInvokingRouterTests {
MethodInvokingRouter router = new MethodInvokingRouter(testBean, routingMethod);
router.setChannelResolver(channelResolver);
Message<String> message = new GenericMessage<String>("bar");
router.onMessage(message);
router.handleMessage(message);
Message<?> replyMessage = barChannel.receive();
assertNotNull(replyMessage);
assertEquals(message, replyMessage);
@@ -69,7 +69,7 @@ public class MethodInvokingRouterTests {
MethodInvokingRouter router = new MethodInvokingRouter(testBean, "routePayload");
router.setChannelResolver(channelResolver);
Message<String> message = new GenericMessage<String>("bar");
router.onMessage(message);
router.handleMessage(message);
Message<?> replyMessage = barChannel.receive();
assertNotNull(replyMessage);
assertEquals(message, replyMessage);
@@ -90,7 +90,7 @@ public class MethodInvokingRouterTests {
router.setChannelResolver(channelResolver);
Message<String> message = MessageBuilder.withPayload("bar")
.setHeader("targetChannel", "foo").build();
router.onMessage(message);
router.handleMessage(message);
Message<?> fooReply = fooChannel.receive(0);
Message<?> barReply = barChannel.receive(0);
assertNotNull(fooReply);
@@ -103,7 +103,7 @@ public class MethodInvokingRouterTests {
SingleChannelNameRoutingTestBean testBean = new SingleChannelNameRoutingTestBean();
Method routingMethod = testBean.getClass().getMethod("routeByHeader", String.class);
MethodInvokingRouter router = new MethodInvokingRouter(testBean, routingMethod);
router.onMessage(new GenericMessage<String>("testing"));
router.handleMessage(new GenericMessage<String>("testing"));
}
@Test
@@ -133,15 +133,15 @@ public class MethodInvokingRouterTests {
Message<String> fooMessage = new StringMessage("foo");
Message<String> barMessage = new StringMessage("bar");
Message<String> badMessage = new StringMessage("bad");
router.onMessage(fooMessage);
router.handleMessage(fooMessage);
Message<?> result1 = fooChannel.receive(0);
assertNotNull(result1);
assertEquals("foo", result1.getPayload());
router.onMessage(barMessage);
router.handleMessage(barMessage);
Message<?> result2 = barChannel.receive(0);
assertNotNull(result2);
assertEquals("bar", result2.getPayload());
router.onMessage(badMessage);
router.handleMessage(badMessage);
}
@Test
@@ -172,15 +172,15 @@ public class MethodInvokingRouterTests {
channelResolver.addChannel(fooChannel);
channelResolver.addChannel(barChannel);
router.setChannelResolver(channelResolver);
router.onMessage(fooMessage);
router.handleMessage(fooMessage);
Message<?> result1 = fooChannel.receive(0);
assertNotNull(result1);
assertEquals("foo", result1.getPayload());
router.onMessage(barMessage);
router.handleMessage(barMessage);
Message<?> result2 = barChannel.receive(0);
assertNotNull(result2);
assertEquals("bar", result2.getPayload());
router.onMessage(badMessage);
router.handleMessage(badMessage);
}
@Test
@@ -211,15 +211,15 @@ public class MethodInvokingRouterTests {
Message<String> fooMessage = new StringMessage("foo");
Message<String> barMessage = new StringMessage("bar");
Message<String> badMessage = new StringMessage("bad");
router.onMessage(fooMessage);
router.handleMessage(fooMessage);
Message<?> result1 = fooChannel.receive(0);
assertNotNull(result1);
assertEquals("foo", result1.getPayload());
router.onMessage(barMessage);
router.handleMessage(barMessage);
Message<?> result2 = barChannel.receive(0);
assertNotNull(result2);
assertEquals("bar", result2.getPayload());
router.onMessage(badMessage);
router.handleMessage(badMessage);
}
@Test
@@ -250,21 +250,21 @@ public class MethodInvokingRouterTests {
Message<String> fooMessage = new StringMessage("foo");
Message<String> barMessage = new StringMessage("bar");
Message<String> badMessage = new StringMessage("bad");
router.onMessage(fooMessage);
router.handleMessage(fooMessage);
Message<?> result1a = fooChannel.receive(0);
Message<?> result1b = barChannel.receive(0);
assertNotNull(result1a);
assertEquals("foo", result1a.getPayload());
assertNotNull(result1b);
assertEquals("foo", result1b.getPayload());
router.onMessage(barMessage);
router.handleMessage(barMessage);
Message<?> result2a = fooChannel.receive(0);
Message<?> result2b = barChannel.receive(0);
assertNotNull(result2a);
assertEquals("bar", result2a.getPayload());
assertNotNull(result2b);
assertEquals("bar", result2b.getPayload());
router.onMessage(badMessage);
router.handleMessage(badMessage);
}
@Test
@@ -295,21 +295,21 @@ public class MethodInvokingRouterTests {
Message<String> fooMessage = new StringMessage("foo");
Message<String> barMessage = new StringMessage("bar");
Message<String> badMessage = new StringMessage("bad");
router.onMessage(fooMessage);
router.handleMessage(fooMessage);
Message<?> result1a = fooChannel.receive(0);
assertNotNull(result1a);
assertEquals("foo", result1a.getPayload());
Message<?> result1b = barChannel.receive(0);
assertNotNull(result1b);
assertEquals("foo", result1b.getPayload());
router.onMessage(barMessage);
router.handleMessage(barMessage);
Message<?> result2a = fooChannel.receive(0);
assertNotNull(result2a);
assertEquals("bar", result2a.getPayload());
Message<?> result2b = barChannel.receive(0);
assertNotNull(result2b);
assertEquals("bar", result2b.getPayload());
router.onMessage(badMessage);
router.handleMessage(badMessage);
}
@Test
@@ -340,21 +340,21 @@ public class MethodInvokingRouterTests {
Message<String> fooMessage = new StringMessage("foo");
Message<String> barMessage = new StringMessage("bar");
Message<String> badMessage = new StringMessage("bad");
router.onMessage(fooMessage);
router.handleMessage(fooMessage);
Message<?> result1a = fooChannel.receive(0);
assertNotNull(result1a);
assertEquals("foo", result1a.getPayload());
Message<?> result1b = barChannel.receive(0);
assertNotNull(result1b);
assertEquals("foo", result1b.getPayload());
router.onMessage(barMessage);
router.handleMessage(barMessage);
Message<?> result2a = fooChannel.receive(0);
assertNotNull(result2a);
assertEquals("bar", result2a.getPayload());
Message<?> result2b = barChannel.receive(0);
assertNotNull(result2b);
assertEquals("bar", result2b.getPayload());
router.onMessage(badMessage);
router.handleMessage(badMessage);
}
@Test
@@ -385,21 +385,21 @@ public class MethodInvokingRouterTests {
Message<String> fooMessage = new StringMessage("foo");
Message<String> barMessage = new StringMessage("bar");
Message<String> badMessage = new StringMessage("bad");
router.onMessage(fooMessage);
router.handleMessage(fooMessage);
Message<?> result1a = fooChannel.receive(0);
Message<?> result1b = barChannel.receive(0);
assertNotNull(result1a);
assertEquals("foo", result1a.getPayload());
assertNotNull(result1b);
assertEquals("foo", result1b.getPayload());
router.onMessage(barMessage);
router.handleMessage(barMessage);
Message<?> result2a = fooChannel.receive(0);
Message<?> result2b = barChannel.receive(0);
assertNotNull(result2a);
assertEquals("bar", result2a.getPayload());
assertNotNull(result2b);
assertEquals("bar", result2b.getPayload());
router.onMessage(badMessage);
router.handleMessage(badMessage);
}
@Test
@@ -430,21 +430,21 @@ public class MethodInvokingRouterTests {
Message<String> fooMessage = new StringMessage("foo");
Message<String> barMessage = new StringMessage("bar");
Message<String> badMessage = new StringMessage("bad");
router.onMessage(fooMessage);
router.handleMessage(fooMessage);
Message<?> result1a = fooChannel.receive(0);
Message<?> result1b = barChannel.receive(0);
assertNotNull(result1a);
assertEquals("foo", result1a.getPayload());
assertNotNull(result1b);
assertEquals("foo", result1b.getPayload());
router.onMessage(barMessage);
router.handleMessage(barMessage);
Message<?> result2a = fooChannel.receive(0);
Message<?> result2b = barChannel.receive(0);
assertNotNull(result2a);
assertEquals("bar", result2a.getPayload());
assertNotNull(result2b);
assertEquals("bar", result2b.getPayload());
router.onMessage(badMessage);
router.handleMessage(badMessage);
}
@Test
@@ -475,21 +475,21 @@ public class MethodInvokingRouterTests {
Message<String> fooMessage = new StringMessage("foo");
Message<String> barMessage = new StringMessage("bar");
Message<String> badMessage = new StringMessage("bad");
router.onMessage(fooMessage);
router.handleMessage(fooMessage);
Message<?> result1a = fooChannel.receive(0);
Message<?> result1b = barChannel.receive(0);
assertNotNull(result1a);
assertEquals("foo", result1a.getPayload());
assertNotNull(result1b);
assertEquals("foo", result1b.getPayload());
router.onMessage(barMessage);
router.handleMessage(barMessage);
Message<?> result2a = fooChannel.receive(0);
Message<?> result2b = barChannel.receive(0);
assertNotNull(result2a);
assertEquals("bar", result2a.getPayload());
assertNotNull(result2b);
assertEquals("bar", result2b.getPayload());
router.onMessage(badMessage);
router.handleMessage(badMessage);
}

View File

@@ -48,7 +48,7 @@ public class MultiChannelRouterTests {
channelResolver.addChannel(channel2);
router.setChannelResolver(channelResolver);
Message<String> message = new StringMessage("test");
router.onMessage(message);
router.handleMessage(message);
Message<?> result1 = channel1.receive(25);
assertNotNull(result1);
assertEquals("test", result1.getPayload());
@@ -67,7 +67,7 @@ public class MultiChannelRouterTests {
TestChannelResolver channelResolver = new TestChannelResolver();
router.setChannelResolver(channelResolver);
Message<String> message = new StringMessage("test");
router.onMessage(message);
router.handleMessage(message);
}
@Test(expected = MessagingException.class)
@@ -78,7 +78,7 @@ public class MultiChannelRouterTests {
}
};
Message<String> message = new StringMessage("test");
router.onMessage(message);
router.handleMessage(message);
}
}

View File

@@ -65,8 +65,8 @@ public class PayloadTypeRouterTests {
router.setPayloadTypeChannelMap(payloadTypeChannelMap);
Message<String> message1 = new StringMessage("test");
Message<Integer> message2 = new GenericMessage<Integer>(123);
router.onMessage(message1);
router.onMessage(message2);
router.handleMessage(message1);
router.handleMessage(message2);
Message<?> reply1 = stringChannel.receive(0);
Message<?> reply2 = integerChannel.receive(0);
assertEquals("test", reply1.getPayload());
@@ -86,8 +86,8 @@ public class PayloadTypeRouterTests {
router.setDefaultOutputChannel(defaultChannel);
Message<String> message1 = new StringMessage("test");
Message<Integer> message2 = new GenericMessage<Integer>(123);
router.onMessage(message1);
router.onMessage(message2);
router.handleMessage(message1);
router.handleMessage(message2);
Message<?> result1 = stringChannel.receive(25);
assertNotNull(result1);
assertEquals("test", result1.getPayload());

View File

@@ -66,7 +66,7 @@ public class RecipientListRouterTests {
router.setChannels(channels);
router.afterPropertiesSet();
Message<String> message = new StringMessage("test");
router.onMessage(message);
router.handleMessage(message);
Message<?> result1 = channel1.receive(25);
assertNotNull(result1);
assertEquals("test", result1.getPayload());
@@ -82,7 +82,7 @@ public class RecipientListRouterTests {
RecipientListRouter router = new RecipientListRouter();
router.setChannels(Collections.singletonList((MessageChannel) channel));
Message<String> message = new StringMessage("test");
router.onMessage(message);
router.handleMessage(message);
Message<?> result1 = channel.receive(25);
assertNotNull(result1);
assertEquals("test", result1.getPayload());

View File

@@ -45,7 +45,7 @@ public class RouterTests {
}
};
Message<String> message = new StringMessage("test");
router.onMessage(message);
router.handleMessage(message);
}
@Test(expected = MessageDeliveryException.class)
@@ -57,7 +57,7 @@ public class RouterTests {
};
router.setResolutionRequired(true);
Message<String> message = new StringMessage("test");
router.onMessage(message);
router.handleMessage(message);
}
@Test
@@ -68,7 +68,7 @@ public class RouterTests {
}
};
Message<String> message = new StringMessage("test");
router.onMessage(message);
router.handleMessage(message);
}
@Test(expected = MessageDeliveryException.class)
@@ -80,7 +80,7 @@ public class RouterTests {
};
router.setResolutionRequired(true);
Message<String> message = new StringMessage("test");
router.onMessage(message);
router.handleMessage(message);
}
@Test
@@ -93,7 +93,7 @@ public class RouterTests {
TestChannelResolver channelResolver = new TestChannelResolver();
router.setChannelResolver(channelResolver);
Message<String> message = new StringMessage("test");
router.onMessage(message);
router.handleMessage(message);
}
@Test(expected = MessageDeliveryException.class)
@@ -107,7 +107,7 @@ public class RouterTests {
router.setChannelResolver(channelResolver);
router.setResolutionRequired(true);
Message<String> message = new StringMessage("test");
router.onMessage(message);
router.handleMessage(message);
}
@@ -121,7 +121,7 @@ public class RouterTests {
TestChannelResolver channelResolver = new TestChannelResolver();
router.setChannelResolver(channelResolver);
Message<String> message = new StringMessage("test");
router.onMessage(message);
router.handleMessage(message);
}
@Test(expected = MessageDeliveryException.class)
@@ -135,7 +135,7 @@ public class RouterTests {
router.setChannelResolver(channelResolver);
router.setResolutionRequired(true);
Message<String> message = new StringMessage("test");
router.onMessage(message);
router.handleMessage(message);
}
@Test(expected = MessagingException.class)
@@ -145,7 +145,7 @@ public class RouterTests {
return "notImportant";
}
};
router.onMessage(new StringMessage("this should fail"));
router.handleMessage(new StringMessage("this should fail"));
}
@Test(expected = MessagingException.class)
@@ -155,7 +155,7 @@ public class RouterTests {
return new String[] { "notImportant" };
}
};
router.onMessage(new StringMessage("this should fail"));
router.handleMessage(new StringMessage("this should fail"));
}
@Test
@@ -169,7 +169,7 @@ public class RouterTests {
GenericApplicationContext context = new GenericApplicationContext();
context.getBeanFactory().registerSingleton("testChannel", testChannel);
router.setBeanFactory(context);
router.onMessage(new StringMessage("test"));
router.handleMessage(new StringMessage("test"));
Message<?> reply = testChannel.receive(0);
assertEquals("test", reply.getPayload());
}
@@ -185,7 +185,7 @@ public class RouterTests {
GenericApplicationContext context = new GenericApplicationContext();
context.getBeanFactory().registerSingleton("testChannel", testChannel);
router.setBeanFactory(context);
router.onMessage(new StringMessage("test"));
router.handleMessage(new StringMessage("test"));
Message<?> reply = testChannel.receive(0);
assertEquals("test", reply.getPayload());
}

View File

@@ -42,7 +42,7 @@ public class SingleChannelRouterTests {
}
};
Message<String> message = new StringMessage("test");
router.onMessage(message);
router.handleMessage(message);
Message<?> result = channel.receive(25);
assertNotNull(result);
assertEquals("test", result.getPayload());
@@ -61,7 +61,7 @@ public class SingleChannelRouterTests {
channelResolver.addChannel(channel);
router.setChannelResolver(channelResolver);
Message<String> message = new StringMessage("test");
router.onMessage(message);
router.handleMessage(message);
Message<?> result = channel.receive(25);
assertNotNull(result);
assertEquals("test", result.getPayload());
@@ -75,7 +75,7 @@ public class SingleChannelRouterTests {
}
};
Message<String> message = new StringMessage("test");
router.onMessage(message);
router.handleMessage(message);
}
@Test(expected = MessagingException.class)
@@ -88,7 +88,7 @@ public class SingleChannelRouterTests {
TestChannelResolver channelResolver = new TestChannelResolver();
router.setChannelResolver(channelResolver);
Message<String> message = new StringMessage("test");
router.onMessage(message);
router.handleMessage(message);
}
}

View File

@@ -43,7 +43,7 @@ public class DefaultSplitterTests {
QueueChannel replyChannel = new QueueChannel();
DefaultMessageSplitter splitter = new DefaultMessageSplitter();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
assertEquals(3, replies.size());
Message<?> reply1 = replies.get(0);
@@ -64,7 +64,7 @@ public class DefaultSplitterTests {
QueueChannel replyChannel = new QueueChannel();
DefaultMessageSplitter splitter = new DefaultMessageSplitter();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
assertEquals(3, replies.size());
Message<?> reply1 = replies.get(0);
@@ -85,7 +85,7 @@ public class DefaultSplitterTests {
QueueChannel outputChannel = new QueueChannel(1);
DefaultMessageSplitter splitter = new DefaultMessageSplitter();
splitter.setOutputChannel(outputChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(splitter, inputChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(inputChannel, splitter);
endpoint.start();
assertTrue(inputChannel.send(message));
Message<?> reply = outputChannel.receive(0);

View File

@@ -47,7 +47,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = this.getSplitter("stringToStringArray");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -63,7 +63,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = this.getSplitter("stringToStringList");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -79,7 +79,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = this.getSplitter("messageToStringArray");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -95,7 +95,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = this.getSplitter("messageToStringList");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -111,7 +111,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = this.getSplitter("messageToMessageArray");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -127,7 +127,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = this.getSplitter("messageToMessageList");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -143,7 +143,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = this.getSplitter("stringToMessageArray");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -159,7 +159,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = this.getSplitter("stringToMessageList");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -175,7 +175,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean, "stringToStringArray");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -191,7 +191,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean, "stringToStringList");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -207,7 +207,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean, "messageToStringArray");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -223,7 +223,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean, "messageToStringList");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -239,7 +239,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean, "messageToMessageArray");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -255,7 +255,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean, "messageToMessageList");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -271,7 +271,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean, "stringToMessageArray");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -287,7 +287,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean, "stringToMessageList");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -303,7 +303,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = this.getSplitter("stringToStringArray");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -323,7 +323,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = this.getSplitter("messageToMessageList");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -344,7 +344,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = this.getSplitter("splitHeader");
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -362,7 +362,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean, splittingMethod);
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -385,7 +385,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = new MethodInvokingSplitter(annotatedBean);
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);
@@ -407,7 +407,7 @@ public class MethodInvokingSplitterTests {
MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean);
QueueChannel replyChannel = new QueueChannel();
splitter.setOutputChannel(replyChannel);
splitter.onMessage(message);
splitter.handleMessage(message);
List<Message<?>> replies = replyChannel.clear();
Message<?> reply1 = replies.get(0);
assertNotNull(reply1);

View File

@@ -32,7 +32,7 @@ import org.springframework.aop.framework.ProxyFactory;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.consumer.ServiceActivatingConsumer;
import org.springframework.integration.consumer.ServiceActivatingHandler;
import org.springframework.integration.endpoint.SubscribingConsumerEndpoint;
import org.springframework.integration.message.StringMessage;
@@ -82,9 +82,9 @@ public class DefaultMethodResolverTests {
ProxyFactory proxyFactory = new ProxyFactory(testBean);
proxyFactory.setProxyTargetClass(false);
testBean = (GreetingService) proxyFactory.getProxy();
ServiceActivatingConsumer consumer = new ServiceActivatingConsumer(testBean);
consumer.setOutputChannel(output);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(consumer, input);
ServiceActivatingHandler handler = new ServiceActivatingHandler(testBean);
handler.setOutputChannel(output);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(input, handler);
endpoint.start();
input.send(new StringMessage("proxy"));
assertEquals("hello proxy", output.receive(0).getPayload());;
@@ -98,9 +98,9 @@ public class DefaultMethodResolverTests {
ProxyFactory proxyFactory = new ProxyFactory(testBean);
proxyFactory.setProxyTargetClass(true);
testBean = (GreetingService) proxyFactory.getProxy();
ServiceActivatingConsumer consumer = new ServiceActivatingConsumer(testBean);
consumer.setOutputChannel(output);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(consumer, input);
ServiceActivatingHandler handler = new ServiceActivatingHandler(testBean);
handler.setOutputChannel(output);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(input, handler);
endpoint.start();
input.send(new StringMessage("proxy"));
assertEquals("hello proxy", output.receive(0).getPayload());;