Refactored existing Message-consuming endpoints to only implement MessageConsumer (not MessageEndpoint). Now, either a PollingConsumerEndpoint or SubscribingConsumerEndpoint delegates to the MessageConsumer thereby separating the Lifecycle responsibilities and configuration settings (trigger, transactions, etc) since they are different for polling vs. subscribing and not relevant for simply consuming Messages. Essentially all MessageConsumers are now "event-driven" since a "polling consumer" is actually handled by the PollingConsumerEndpoint class. The next refactoring step involves renaming several components to clarify this endpoint vs. consumer distinction.
This commit is contained in:
@@ -52,7 +52,7 @@ public class AggregatorEndpointTests {
|
||||
this.aggregator = new AggregatorEndpoint(new TestAggregator());
|
||||
this.aggregator.setTaskScheduler(this.taskScheduler);
|
||||
this.taskScheduler.start();
|
||||
this.aggregator.onStart();
|
||||
this.aggregator.start();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -325,7 +325,7 @@ public class AggregatorEndpointTests {
|
||||
@After
|
||||
public void stopTaskScheduler() {
|
||||
this.taskScheduler.stop();
|
||||
this.aggregator.onStop();
|
||||
this.aggregator.stop();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -46,7 +46,7 @@ public class ResequencerEndpointTests {
|
||||
this.taskScheduler = Schedulers.createDefaultTaskScheduler(10);
|
||||
this.resequencer.setTaskScheduler(taskScheduler);
|
||||
taskScheduler.start();
|
||||
this.resequencer.onStart();
|
||||
this.resequencer.start();
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -149,7 +149,7 @@ public class ResequencerEndpointTests {
|
||||
|
||||
@After
|
||||
public void stopTaskScheduler() {
|
||||
this.resequencer.onStop();
|
||||
this.resequencer.stop();
|
||||
this.taskScheduler.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,7 +36,9 @@ import org.springframework.integration.channel.PollableChannel;
|
||||
import org.springframework.integration.channel.PublishSubscribeChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.endpoint.AbstractMessageHandlingEndpoint;
|
||||
import org.springframework.integration.endpoint.PollingConsumerEndpoint;
|
||||
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
|
||||
import org.springframework.integration.endpoint.SubscribingConsumerEndpoint;
|
||||
import org.springframework.integration.message.ErrorMessage;
|
||||
import org.springframework.integration.message.GenericMessage;
|
||||
import org.springframework.integration.message.Message;
|
||||
@@ -62,17 +64,18 @@ public class DefaultMessageBusTests {
|
||||
Message<String> message = MessageBuilder.withPayload("test")
|
||||
.setReturnAddress("targetChannel").build();
|
||||
sourceChannel.send(message);
|
||||
AbstractMessageHandlingEndpoint endpoint = new AbstractMessageHandlingEndpoint() {
|
||||
AbstractMessageHandlingEndpoint consumer = new AbstractMessageHandlingEndpoint() {
|
||||
public Message<?> handle(Message<?> message) {
|
||||
return message;
|
||||
}
|
||||
};
|
||||
endpoint.setBeanName("testEndpoint");
|
||||
endpoint.setInputChannel(sourceChannel);
|
||||
PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(consumer, sourceChannel);
|
||||
endpoint.afterPropertiesSet();
|
||||
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
|
||||
context.refresh();
|
||||
DefaultMessageBus bus = new DefaultMessageBus();
|
||||
bus.setApplicationContext(context);
|
||||
consumer.setChannelRegistry(bus);
|
||||
bus.start();
|
||||
Message<?> result = targetChannel.receive(3000);
|
||||
assertEquals("test", result.getPayload());
|
||||
@@ -116,12 +119,12 @@ public class DefaultMessageBusTests {
|
||||
QueueChannel inputChannel = new QueueChannel();
|
||||
QueueChannel outputChannel1 = new QueueChannel();
|
||||
QueueChannel outputChannel2 = new QueueChannel();
|
||||
AbstractMessageHandlingEndpoint endpoint1 = new AbstractMessageHandlingEndpoint() {
|
||||
AbstractMessageHandlingEndpoint consumer1 = new AbstractMessageHandlingEndpoint() {
|
||||
public Message<?> handle(Message<?> message) {
|
||||
return MessageBuilder.fromMessage(message).build();
|
||||
}
|
||||
};
|
||||
AbstractMessageHandlingEndpoint endpoint2 = new AbstractMessageHandlingEndpoint() {
|
||||
AbstractMessageHandlingEndpoint consumer2 = new AbstractMessageHandlingEndpoint() {
|
||||
public Message<?> handle(Message<?> message) {
|
||||
return MessageBuilder.fromMessage(message).build();
|
||||
}
|
||||
@@ -132,12 +135,12 @@ public class DefaultMessageBusTests {
|
||||
context.getBeanFactory().registerSingleton("input", inputChannel);
|
||||
context.getBeanFactory().registerSingleton("output1", outputChannel1);
|
||||
context.getBeanFactory().registerSingleton("output2", outputChannel2);
|
||||
endpoint1.setBeanName("testEndpoint1");
|
||||
endpoint1.setInputChannel(inputChannel);
|
||||
endpoint1.setOutputChannel(outputChannel1);
|
||||
endpoint2.setBeanName("testEndpoint2");
|
||||
endpoint2.setInputChannel(inputChannel);
|
||||
endpoint2.setOutputChannel(outputChannel2);
|
||||
consumer1.setOutputChannel(outputChannel1);
|
||||
consumer2.setOutputChannel(outputChannel2);
|
||||
PollingConsumerEndpoint endpoint1 = new PollingConsumerEndpoint(consumer1, inputChannel);
|
||||
endpoint1.afterPropertiesSet();
|
||||
PollingConsumerEndpoint endpoint2 = new PollingConsumerEndpoint(consumer2, inputChannel);
|
||||
endpoint2.afterPropertiesSet();
|
||||
context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1);
|
||||
context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2);
|
||||
DefaultMessageBus bus = new DefaultMessageBus();
|
||||
@@ -157,14 +160,14 @@ public class DefaultMessageBusTests {
|
||||
QueueChannel outputChannel1 = new QueueChannel();
|
||||
QueueChannel outputChannel2 = new QueueChannel();
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
AbstractMessageHandlingEndpoint endpoint1 = new AbstractMessageHandlingEndpoint() {
|
||||
AbstractMessageHandlingEndpoint consumer1 = new AbstractMessageHandlingEndpoint() {
|
||||
public Message<?> handle(Message<?> message) {
|
||||
Message<?> reply = MessageBuilder.fromMessage(message).build();
|
||||
latch.countDown();
|
||||
return reply;
|
||||
}
|
||||
};
|
||||
AbstractMessageHandlingEndpoint endpoint2 = new AbstractMessageHandlingEndpoint() {
|
||||
AbstractMessageHandlingEndpoint consumer2 = new AbstractMessageHandlingEndpoint() {
|
||||
public Message<?> handle(Message<?> message) {
|
||||
Message<?> reply = MessageBuilder.fromMessage(message).build();
|
||||
latch.countDown();
|
||||
@@ -177,12 +180,10 @@ public class DefaultMessageBusTests {
|
||||
context.getBeanFactory().registerSingleton("input", inputChannel);
|
||||
context.getBeanFactory().registerSingleton("output1", outputChannel1);
|
||||
context.getBeanFactory().registerSingleton("output2", outputChannel2);
|
||||
endpoint1.setBeanName("testEndpoint1");
|
||||
endpoint1.setInputChannel(inputChannel);
|
||||
endpoint1.setOutputChannel(outputChannel1);
|
||||
endpoint2.setBeanName("testEndpoint2");
|
||||
endpoint2.setInputChannel(inputChannel);
|
||||
endpoint2.setOutputChannel(outputChannel2);
|
||||
consumer1.setOutputChannel(outputChannel1);
|
||||
consumer2.setOutputChannel(outputChannel2);
|
||||
SubscribingConsumerEndpoint endpoint1 = new SubscribingConsumerEndpoint(consumer1, inputChannel);
|
||||
SubscribingConsumerEndpoint endpoint2 = new SubscribingConsumerEndpoint(consumer2, inputChannel);
|
||||
context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1);
|
||||
context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2);
|
||||
DefaultMessageBus bus = new DefaultMessageBus();
|
||||
@@ -246,14 +247,14 @@ public class DefaultMessageBusTests {
|
||||
errorChannel.setBeanName(ChannelRegistry.ERROR_CHANNEL_NAME);
|
||||
context.getBeanFactory().registerSingleton(ChannelRegistry.ERROR_CHANNEL_NAME, errorChannel);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
AbstractMessageHandlingEndpoint endpoint = new AbstractMessageHandlingEndpoint() {
|
||||
AbstractMessageHandlingEndpoint consumer = new AbstractMessageHandlingEndpoint() {
|
||||
public Message<?> handle(Message<?> message) {
|
||||
latch.countDown();
|
||||
return null;
|
||||
}
|
||||
};
|
||||
endpoint.setBeanName("testEndpoint");
|
||||
endpoint.setInputChannel(errorChannel);
|
||||
PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(consumer, errorChannel);
|
||||
endpoint.afterPropertiesSet();
|
||||
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
|
||||
DefaultMessageBus bus = new DefaultMessageBus();
|
||||
bus.setApplicationContext(context);
|
||||
|
||||
@@ -31,11 +31,10 @@ import org.springframework.integration.channel.ThreadLocalChannel;
|
||||
import org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor;
|
||||
import org.springframework.integration.endpoint.AbstractMessageHandlingEndpoint;
|
||||
import org.springframework.integration.endpoint.ServiceActivatorEndpoint;
|
||||
import org.springframework.integration.endpoint.SubscribingConsumerEndpoint;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.MessageMappingMethodInvoker;
|
||||
import org.springframework.integration.message.MessagingException;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
import org.springframework.integration.util.MethodInvoker;
|
||||
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
@@ -61,11 +60,9 @@ public class DirectChannelSubscriptionTests {
|
||||
@Test
|
||||
public void testSendAndReceiveForRegisteredEndpoint() {
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
MethodInvoker invoker = new MessageMappingMethodInvoker(new TestBean(), "handle");
|
||||
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(invoker);
|
||||
endpoint.setInputChannel(sourceChannel);
|
||||
endpoint.setOutputChannel(targetChannel);
|
||||
endpoint.setBeanName("testEndpoint");
|
||||
ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(new TestBean(), "handle");
|
||||
serviceActivator.setOutputChannel(targetChannel);
|
||||
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, sourceChannel);
|
||||
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
|
||||
bus.setApplicationContext(context);
|
||||
bus.start();
|
||||
@@ -96,14 +93,13 @@ public class DirectChannelSubscriptionTests {
|
||||
QueueChannel errorChannel = new QueueChannel();
|
||||
errorChannel.setBeanName(ChannelRegistry.ERROR_CHANNEL_NAME);
|
||||
bus.registerChannel(errorChannel);
|
||||
AbstractMessageHandlingEndpoint endpoint = new AbstractMessageHandlingEndpoint() {
|
||||
AbstractMessageHandlingEndpoint consumer = new AbstractMessageHandlingEndpoint() {
|
||||
public Message<?> handle(Message<?> message) {
|
||||
throw new RuntimeException("intentional test failure");
|
||||
}
|
||||
};
|
||||
endpoint.setInputChannel(sourceChannel);
|
||||
endpoint.setOutputChannel(targetChannel);
|
||||
endpoint.setBeanName("testEndpoint");
|
||||
consumer.setOutputChannel(targetChannel);
|
||||
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(consumer, sourceChannel);
|
||||
bus.registerEndpoint(endpoint);
|
||||
bus.start();
|
||||
this.sourceChannel.send(new StringMessage("foo"));
|
||||
|
||||
@@ -10,9 +10,13 @@
|
||||
|
||||
<bean id="targetChannel" class="org.springframework.integration.channel.QueueChannel"/>
|
||||
|
||||
<bean id="endpoint" class="org.springframework.integration.endpoint.ServiceActivatorEndpoint">
|
||||
<bean id="endpoint" class="org.springframework.integration.endpoint.PollingConsumerEndpoint">
|
||||
<constructor-arg ref="serviceActivator"/>
|
||||
<constructor-arg ref="sourceChannel"/>
|
||||
</bean>
|
||||
|
||||
<bean id="serviceActivator" class="org.springframework.integration.endpoint.ServiceActivatorEndpoint">
|
||||
<constructor-arg ref="handler"/>
|
||||
<property name="inputChannel" ref="sourceChannel"/>
|
||||
<property name="outputChannel" ref="targetChannel"/>
|
||||
</bean>
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ import org.junit.Test;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.integration.bus.DefaultMessageBus;
|
||||
import org.springframework.integration.endpoint.AbstractMessageHandlingEndpoint;
|
||||
import org.springframework.integration.endpoint.PollingConsumerEndpoint;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.MessageBuilder;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
@@ -46,13 +47,13 @@ public class MessageChannelTemplateTests {
|
||||
public void setUp() {
|
||||
this.requestChannel = new QueueChannel();
|
||||
this.requestChannel.setBeanName("requestChannel");
|
||||
AbstractMessageHandlingEndpoint endpoint = new AbstractMessageHandlingEndpoint() {
|
||||
AbstractMessageHandlingEndpoint consumer = new AbstractMessageHandlingEndpoint() {
|
||||
public Message<?> handle(Message<?> message) {
|
||||
return new StringMessage(message.getPayload().toString().toUpperCase());
|
||||
}
|
||||
};
|
||||
endpoint.setBeanName("testEndpoint");
|
||||
endpoint.setInputChannel(requestChannel);
|
||||
PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(consumer, requestChannel);
|
||||
endpoint.afterPropertiesSet();
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.getBeanFactory().registerSingleton("requestChannel", requestChannel);
|
||||
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
|
||||
|
||||
@@ -28,11 +28,11 @@ import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.beans.factory.BeanCreationException;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext;
|
||||
import org.springframework.integration.aggregator.AggregatorEndpoint;
|
||||
import org.springframework.integration.aggregator.CompletionStrategy;
|
||||
import org.springframework.integration.aggregator.CompletionStrategyAdapter;
|
||||
import org.springframework.integration.channel.MessageChannel;
|
||||
import org.springframework.integration.channel.PollableChannel;
|
||||
import org.springframework.integration.endpoint.SubscribingConsumerEndpoint;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.MessageBuilder;
|
||||
import org.springframework.integration.util.MethodInvoker;
|
||||
@@ -53,15 +53,14 @@ public class AggregatorParserTests {
|
||||
|
||||
@Test
|
||||
public void testAggregation() {
|
||||
AggregatorEndpoint endpoint =
|
||||
(AggregatorEndpoint) context.getBean("aggregatorWithReference");
|
||||
MessageChannel input = (MessageChannel) context.getBean("aggregatorWithReferenceInput");
|
||||
TestAggregator aggregatorBean = (TestAggregator) context.getBean("aggregatorBean");
|
||||
List<Message<?>> outboundMessages = new ArrayList<Message<?>>();
|
||||
outboundMessages.add(createMessage("123", "id1", 3, 1, null));
|
||||
outboundMessages.add(createMessage("789", "id1", 3, 3, null));
|
||||
outboundMessages.add(createMessage("456", "id1", 3, 2, null));
|
||||
for (Message<?> message : outboundMessages) {
|
||||
endpoint.onMessage(message);
|
||||
input.send(message);
|
||||
}
|
||||
Assert.assertEquals("One and only one message must have been aggregated", 1, aggregatorBean
|
||||
.getAggregatedMessages().size());
|
||||
@@ -72,13 +71,14 @@ public class AggregatorParserTests {
|
||||
|
||||
@Test
|
||||
public void testPropertyAssignment() throws Exception {
|
||||
AggregatorEndpoint endpoint =
|
||||
(AggregatorEndpoint) context.getBean("completelyDefinedAggregator");
|
||||
SubscribingConsumerEndpoint endpoint =
|
||||
(SubscribingConsumerEndpoint) context.getBean("completelyDefinedAggregator");
|
||||
TestAggregator testAggregator = (TestAggregator) context.getBean("aggregatorBean");
|
||||
CompletionStrategy completionStrategy = (CompletionStrategy) context.getBean("completionStrategy");
|
||||
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
|
||||
MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel");
|
||||
DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint);
|
||||
DirectFieldAccessor accessor = new DirectFieldAccessor(
|
||||
new DirectFieldAccessor(endpoint).getPropertyValue("consumer"));
|
||||
Assert.assertEquals("The AggregatorEndpoint is not injected with the appropriate Aggregator instance",
|
||||
testAggregator, accessor.getPropertyValue("aggregator"));
|
||||
Assert.assertEquals(
|
||||
@@ -105,13 +105,13 @@ public class AggregatorParserTests {
|
||||
@Test
|
||||
public void testSimpleJavaBeanAggregator() {
|
||||
List<Message<?>> outboundMessages = new ArrayList<Message<?>>();
|
||||
AggregatorEndpoint addingAggregator =
|
||||
(AggregatorEndpoint) context.getBean("aggregatorWithReferenceAndMethod");
|
||||
MessageChannel input =
|
||||
(MessageChannel) context.getBean("aggregatorWithReferenceAndMethodInput");
|
||||
outboundMessages.add(createMessage(1l, "id1", 3, 1, null));
|
||||
outboundMessages.add(createMessage(2l, "id1", 3, 3, null));
|
||||
outboundMessages.add(createMessage(3l, "id1", 3, 2, null));
|
||||
for (Message<?> message : outboundMessages) {
|
||||
addingAggregator.onMessage(message);
|
||||
input.send(message);
|
||||
}
|
||||
PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel");
|
||||
Message<?> response = outputChannel.receive();
|
||||
@@ -130,23 +130,24 @@ public class AggregatorParserTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAggregatorWithPojoCompletionStrategy(){
|
||||
AggregatorEndpoint aggregatorWithPojoCompletionStrategy =
|
||||
(AggregatorEndpoint) context.getBean("aggregatorWithPojoCompletionStrategy");
|
||||
CompletionStrategy completionStrategy = (CompletionStrategy)
|
||||
new DirectFieldAccessor(aggregatorWithPojoCompletionStrategy).getPropertyValue("completionStrategy");
|
||||
public void testAggregatorWithPojoCompletionStrategy() {
|
||||
MessageChannel input = (MessageChannel) context.getBean("aggregatorWithPojoCompletionStrategyInput");
|
||||
SubscribingConsumerEndpoint endpoint =
|
||||
(SubscribingConsumerEndpoint) context.getBean("aggregatorWithPojoCompletionStrategy");
|
||||
CompletionStrategy completionStrategy = (CompletionStrategy) new DirectFieldAccessor(
|
||||
new DirectFieldAccessor(endpoint).getPropertyValue("consumer")).getPropertyValue("completionStrategy");
|
||||
Assert.assertTrue(completionStrategy instanceof CompletionStrategyAdapter);
|
||||
DirectFieldAccessor completionStrategyAccessor = new DirectFieldAccessor(completionStrategy);
|
||||
MethodInvoker invoker = (MethodInvoker) completionStrategyAccessor.getPropertyValue("invoker");
|
||||
Assert.assertTrue(new DirectFieldAccessor(invoker).getPropertyValue("object") instanceof MaxValueCompletionStrategy);
|
||||
Assert.assertTrue(((Method)completionStrategyAccessor.getPropertyValue("method")).getName().equals("checkCompleteness"));
|
||||
aggregatorWithPojoCompletionStrategy.onMessage(createMessage(1l, "id1", 0 , 0, null));
|
||||
aggregatorWithPojoCompletionStrategy.onMessage(createMessage(2l, "id1", 0 , 0, null));
|
||||
aggregatorWithPojoCompletionStrategy.onMessage(createMessage(3l, "id1", 0 , 0, null));
|
||||
input.send(createMessage(1l, "id1", 0 , 0, null));
|
||||
input.send(createMessage(2l, "id1", 0 , 0, null));
|
||||
input.send(createMessage(3l, "id1", 0 , 0, null));
|
||||
PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel");
|
||||
Message<?> reply = outputChannel.receive(0);
|
||||
Assert.assertNull(reply);
|
||||
aggregatorWithPojoCompletionStrategy.onMessage(createMessage(5l, "id1", 0 , 0, null));
|
||||
input.send(createMessage(5l, "id1", 0 , 0, null));
|
||||
reply = outputChannel.receive(0);
|
||||
Assert.assertNotNull(reply);
|
||||
Assert.assertEquals(11l, reply.getPayload());
|
||||
|
||||
@@ -28,7 +28,7 @@ import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.MessageChannel;
|
||||
import org.springframework.integration.channel.PollableChannel;
|
||||
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
|
||||
import org.springframework.integration.endpoint.OutboundChannelAdapter;
|
||||
import org.springframework.integration.endpoint.SubscribingConsumerEndpoint;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
@@ -50,7 +50,7 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests
|
||||
assertNotNull(bus.lookupChannel(beanName));
|
||||
Object adapter = this.applicationContext.getBean(beanName + ".adapter");
|
||||
assertNotNull(adapter);
|
||||
assertTrue(adapter instanceof OutboundChannelAdapter);
|
||||
assertTrue(adapter instanceof SubscribingConsumerEndpoint);
|
||||
TestConsumer consumer = (TestConsumer) this.applicationContext.getBean("consumer");
|
||||
assertNull(consumer.getLastMessage());
|
||||
Message<?> message = new StringMessage("test");
|
||||
@@ -70,7 +70,7 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests
|
||||
assertNotNull(bus.lookupChannel(beanName));
|
||||
Object adapter = this.applicationContext.getBean(beanName + ".adapter");
|
||||
assertNotNull(adapter);
|
||||
assertTrue(adapter instanceof OutboundChannelAdapter);
|
||||
assertTrue(adapter instanceof SubscribingConsumerEndpoint);
|
||||
TestBean testBean = (TestBean) this.applicationContext.getBean("testBean");
|
||||
assertNull(testBean.getMessage());
|
||||
Message<?> message = new StringMessage("consumer test");
|
||||
|
||||
@@ -30,7 +30,6 @@ import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.message.GenericMessage;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.MessageBuilder;
|
||||
import org.springframework.integration.message.MessageConsumer;
|
||||
import org.springframework.integration.message.MessageRejectedException;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
|
||||
@@ -56,11 +55,11 @@ public class EndpointParserTests {
|
||||
public void testEndpointWithSelectorAccepts() {
|
||||
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
|
||||
"endpointWithSelector.xml", this.getClass());
|
||||
MessageConsumer endpoint = (MessageConsumer) context.getBean("endpoint");
|
||||
MessageChannel inputChannel = (MessageChannel) context.getBean("testChannel");
|
||||
QueueChannel replyChannel = new QueueChannel();
|
||||
Message<?> message = MessageBuilder.withPayload("test")
|
||||
.setReturnAddress(replyChannel).build();
|
||||
endpoint.onMessage(message);
|
||||
inputChannel.send(message);
|
||||
Message<?> reply = replyChannel.receive(500);
|
||||
assertNotNull(reply);
|
||||
assertEquals("foo", reply.getPayload());
|
||||
@@ -70,11 +69,11 @@ public class EndpointParserTests {
|
||||
public void testEndpointWithSelectorRejects() {
|
||||
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
|
||||
"endpointWithSelector.xml", this.getClass());
|
||||
MessageConsumer endpoint = (MessageConsumer) context.getBean("endpoint");
|
||||
MessageChannel inputChannel = (MessageChannel) context.getBean("testChannel");
|
||||
MessageChannel replyChannel = new QueueChannel();
|
||||
Message<?> message = MessageBuilder.withPayload(123)
|
||||
.setReturnAddress(replyChannel).build();
|
||||
endpoint.onMessage(message);
|
||||
inputChannel.send(message);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -26,9 +26,9 @@ import org.junit.Test;
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext;
|
||||
import org.springframework.integration.aggregator.ResequencerEndpoint;
|
||||
import org.springframework.integration.channel.MessageChannel;
|
||||
import org.springframework.integration.channel.PollableChannel;
|
||||
import org.springframework.integration.endpoint.SubscribingConsumerEndpoint;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.MessageBuilder;
|
||||
|
||||
@@ -70,8 +70,8 @@ public class ResequencerParserTests {
|
||||
|
||||
@Test
|
||||
public void testDefaultResequencerProperties() {
|
||||
ResequencerEndpoint endpoint = (ResequencerEndpoint) context.getBean("defaultResequencer");
|
||||
DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint);
|
||||
SubscribingConsumerEndpoint endpoint = (SubscribingConsumerEndpoint) context.getBean("defaultResequencer");
|
||||
DirectFieldAccessor accessor = new DirectFieldAccessor(new DirectFieldAccessor(endpoint).getPropertyValue("consumer"));
|
||||
Assert.assertNull(accessor.getPropertyValue("outputChannel"));
|
||||
Assert.assertNull(accessor.getPropertyValue("discardChannel"));
|
||||
Assert.assertEquals("The ResequencerEndpoint is not set with the appropriate timeout value",
|
||||
@@ -92,10 +92,10 @@ public class ResequencerParserTests {
|
||||
|
||||
@Test
|
||||
public void testPropertyAssignment() throws Exception {
|
||||
ResequencerEndpoint endpoint = (ResequencerEndpoint) context.getBean("completelyDefinedResequencer");
|
||||
SubscribingConsumerEndpoint endpoint = (SubscribingConsumerEndpoint) context.getBean("completelyDefinedResequencer");
|
||||
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
|
||||
MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel");
|
||||
DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint);
|
||||
DirectFieldAccessor accessor = new DirectFieldAccessor(new DirectFieldAccessor(endpoint).getPropertyValue("consumer"));
|
||||
Assert.assertEquals("The ResequencerEndpoint is not injected with the appropriate output channel",
|
||||
outputChannel, accessor.getPropertyValue("outputChannel"));
|
||||
Assert.assertEquals("The ResequencerEndpoint is not injected with the appropriate discard channel",
|
||||
|
||||
@@ -9,7 +9,6 @@
|
||||
|
||||
<message-bus/>
|
||||
|
||||
<channel id="inputChannel"/>
|
||||
<channel id="outputChannel">
|
||||
<queue capacity="5"/>
|
||||
</channel>
|
||||
@@ -17,10 +16,12 @@
|
||||
<queue capacity="5"/>
|
||||
</channel>
|
||||
|
||||
<aggregator id="aggregatorWithReference" ref="aggregatorBean" input-channel="inputChannel"/>
|
||||
<channel id="aggregatorWithReferenceInput"/>
|
||||
<aggregator id="aggregatorWithReference" ref="aggregatorBean" input-channel="aggregatorWithReferenceInput"/>
|
||||
|
||||
<channel id="completelyDefinedAggregatorInput"/>
|
||||
<aggregator id="completelyDefinedAggregator"
|
||||
input-channel="inputChannel"
|
||||
input-channel="completelyDefinedAggregatorInput"
|
||||
output-channel="outputChannel"
|
||||
discard-channel="discardChannel"
|
||||
ref="aggregatorBean"
|
||||
@@ -31,14 +32,16 @@
|
||||
tracked-correlation-id-capacity="99"
|
||||
timeout="42"/>
|
||||
|
||||
<channel id="aggregatorWithReferenceAndMethodInput"/>
|
||||
<aggregator id="aggregatorWithReferenceAndMethod"
|
||||
ref="adderBean"
|
||||
method="add"
|
||||
input-channel="inputChannel"
|
||||
input-channel="aggregatorWithReferenceAndMethodInput"
|
||||
output-channel="outputChannel"/>
|
||||
|
||||
<channel id="aggregatorWithPojoCompletionStrategyInput"/>
|
||||
<aggregator id="aggregatorWithPojoCompletionStrategy"
|
||||
input-channel="inputChannel"
|
||||
input-channel="aggregatorWithPojoCompletionStrategyInput"
|
||||
output-channel="outputChannel"
|
||||
ref="adderBean"
|
||||
method="add"
|
||||
|
||||
@@ -29,6 +29,7 @@ import org.springframework.integration.aggregator.CompletionStrategyAdapter;
|
||||
import org.springframework.integration.aggregator.SequenceSizeCompletionStrategy;
|
||||
import org.springframework.integration.bus.MessageBus;
|
||||
import org.springframework.integration.config.MessageBusParser;
|
||||
import org.springframework.integration.endpoint.SubscribingConsumerEndpoint;
|
||||
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
@@ -90,8 +91,9 @@ public class AggregatorAnnotationTests {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private DirectFieldAccessor getDirectFieldAccessorForAggregatingHandler(ApplicationContext context, final String endpointName) {
|
||||
AggregatorEndpoint endpoint = (AggregatorEndpoint) context.getBean(endpointName + ".aggregatingMethod.aggregator");
|
||||
return new DirectFieldAccessor(endpoint);
|
||||
SubscribingConsumerEndpoint endpoint = (SubscribingConsumerEndpoint) context.getBean(
|
||||
endpointName + ".aggregatingMethod.aggregator");
|
||||
return new DirectFieldAccessor(new DirectFieldAccessor(endpoint).getPropertyValue("consumer"));
|
||||
}
|
||||
|
||||
private MessageBus getMessageBus(ApplicationContext context) {
|
||||
|
||||
@@ -45,8 +45,7 @@ import org.springframework.integration.channel.MessageChannel;
|
||||
import org.springframework.integration.channel.PollableChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.config.MessageBusParser;
|
||||
import org.springframework.integration.endpoint.ChannelPoller;
|
||||
import org.springframework.integration.endpoint.ServiceActivatorEndpoint;
|
||||
import org.springframework.integration.endpoint.PollingConsumerEndpoint;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.MessageConsumer;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
@@ -329,14 +328,11 @@ public class MessagingAnnotationPostProcessorTests {
|
||||
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
|
||||
postProcessor.setBeanFactory(context.getBeanFactory());
|
||||
postProcessor.afterPropertiesSet();
|
||||
AnnotatedEndpointWithPolledAnnotation endpoint = new AnnotatedEndpointWithPolledAnnotation();
|
||||
postProcessor.postProcessAfterInitialization(endpoint, "testBean");
|
||||
ServiceActivatorEndpoint processedEndpoint =
|
||||
(ServiceActivatorEndpoint) context.getBean("testBean.prependFoo.serviceActivator");
|
||||
processedEndpoint.afterPropertiesSet();
|
||||
DirectFieldAccessor accessor = new DirectFieldAccessor(processedEndpoint);
|
||||
ChannelPoller poller = (ChannelPoller) accessor.getPropertyValue("poller");
|
||||
Trigger trigger = (Trigger) new DirectFieldAccessor(poller).getPropertyValue("trigger");
|
||||
AnnotatedEndpointWithPolledAnnotation bean = new AnnotatedEndpointWithPolledAnnotation();
|
||||
postProcessor.postProcessAfterInitialization(bean, "testBean");
|
||||
PollingConsumerEndpoint endpoint =
|
||||
(PollingConsumerEndpoint) context.getBean("testBean.prependFoo.serviceActivator");
|
||||
Trigger trigger = (Trigger) new DirectFieldAccessor(endpoint).getPropertyValue("trigger");
|
||||
assertEquals(IntervalTrigger.class, trigger.getClass());
|
||||
DirectFieldAccessor triggerAccessor = new DirectFieldAccessor(trigger);
|
||||
assertEquals(new Long(123000), triggerAccessor.getPropertyValue("interval"));
|
||||
|
||||
@@ -9,9 +9,7 @@
|
||||
|
||||
<message-bus/>
|
||||
|
||||
<channel id="testChannel">
|
||||
<queue capacity="50"/>
|
||||
</channel>
|
||||
<channel id="testChannel"/>
|
||||
|
||||
<service-activator id="endpoint" input-channel="testChannel"
|
||||
ref="testHandler" selector="typeSelector">
|
||||
|
||||
@@ -43,9 +43,9 @@ public class CorrelationIdTests {
|
||||
.setCorrelationId(correlationId).build();
|
||||
DirectChannel inputChannel = new DirectChannel();
|
||||
QueueChannel outputChannel = new QueueChannel(1);
|
||||
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "upperCase");
|
||||
endpoint.setInputChannel(inputChannel);
|
||||
endpoint.setOutputChannel(outputChannel);
|
||||
ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(new TestBean(), "upperCase");
|
||||
serviceActivator.setOutputChannel(outputChannel);
|
||||
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel);
|
||||
endpoint.start();
|
||||
assertTrue(inputChannel.send(message));
|
||||
Message<?> reply = outputChannel.receive(0);
|
||||
@@ -57,9 +57,9 @@ public class CorrelationIdTests {
|
||||
Message<String> message = MessageBuilder.withPayload("test").build();
|
||||
DirectChannel inputChannel = new DirectChannel();
|
||||
QueueChannel outputChannel = new QueueChannel(1);
|
||||
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "upperCase");
|
||||
endpoint.setInputChannel(inputChannel);
|
||||
endpoint.setOutputChannel(outputChannel);
|
||||
ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(new TestBean(), "upperCase");
|
||||
serviceActivator.setOutputChannel(outputChannel);
|
||||
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel);
|
||||
endpoint.start();
|
||||
assertTrue(inputChannel.send(message));
|
||||
Message<?> reply = outputChannel.receive(0);
|
||||
@@ -72,9 +72,9 @@ public class CorrelationIdTests {
|
||||
.setCorrelationId("correlationId").build();
|
||||
DirectChannel inputChannel = new DirectChannel();
|
||||
QueueChannel outputChannel = new QueueChannel(1);
|
||||
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "upperCase");
|
||||
endpoint.setInputChannel(inputChannel);
|
||||
endpoint.setOutputChannel(outputChannel);
|
||||
ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(new TestBean(), "upperCase");
|
||||
serviceActivator.setOutputChannel(outputChannel);
|
||||
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel);
|
||||
endpoint.start();
|
||||
assertTrue(inputChannel.send(message));
|
||||
Message<?> reply = outputChannel.receive(0);
|
||||
@@ -89,9 +89,9 @@ public class CorrelationIdTests {
|
||||
.setCorrelationId(correlationId).build();
|
||||
DirectChannel inputChannel = new DirectChannel();
|
||||
QueueChannel outputChannel = new QueueChannel(1);
|
||||
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "createMessage");
|
||||
endpoint.setInputChannel(inputChannel);
|
||||
endpoint.setOutputChannel(outputChannel);
|
||||
ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(new TestBean(), "createMessage");
|
||||
serviceActivator.setOutputChannel(outputChannel);
|
||||
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel);
|
||||
endpoint.start();
|
||||
assertTrue(inputChannel.send(message));
|
||||
Message<?> reply = outputChannel.receive(0);
|
||||
@@ -103,9 +103,9 @@ public class CorrelationIdTests {
|
||||
Message<?> message = new StringMessage("test");
|
||||
DirectChannel inputChannel = new DirectChannel();
|
||||
QueueChannel outputChannel = new QueueChannel(1);
|
||||
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "createMessage");
|
||||
endpoint.setInputChannel(inputChannel);
|
||||
endpoint.setOutputChannel(outputChannel);
|
||||
ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(new TestBean(), "createMessage");
|
||||
serviceActivator.setOutputChannel(outputChannel);
|
||||
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel);
|
||||
endpoint.start();
|
||||
assertTrue(inputChannel.send(message));
|
||||
Message<?> reply = outputChannel.receive(0);
|
||||
|
||||
@@ -33,11 +33,10 @@ public class ServiceActivatorMethodResolutionTests {
|
||||
@Test
|
||||
public void singleAnnotationMatches() {
|
||||
SingleAnnotationTestBean testBean = new SingleAnnotationTestBean();
|
||||
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(testBean);
|
||||
ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(testBean);
|
||||
QueueChannel outputChannel = new QueueChannel();
|
||||
endpoint.setOutputChannel(outputChannel);
|
||||
endpoint.afterPropertiesSet();
|
||||
endpoint.onMessage(new StringMessage("foo"));
|
||||
serviceActivator.setOutputChannel(outputChannel);
|
||||
serviceActivator.onMessage(new StringMessage("foo"));
|
||||
Message<?> result = outputChannel.receive(0);
|
||||
assertEquals("FOO", result.getPayload());
|
||||
}
|
||||
@@ -51,11 +50,10 @@ public class ServiceActivatorMethodResolutionTests {
|
||||
@Test
|
||||
public void singlePublicMethodMatches() {
|
||||
SinglePublicMethodTestBean testBean = new SinglePublicMethodTestBean();
|
||||
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(testBean);
|
||||
ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(testBean);
|
||||
QueueChannel outputChannel = new QueueChannel();
|
||||
endpoint.setOutputChannel(outputChannel);
|
||||
endpoint.afterPropertiesSet();
|
||||
endpoint.onMessage(new StringMessage("foo"));
|
||||
serviceActivator.setOutputChannel(outputChannel);
|
||||
serviceActivator.onMessage(new StringMessage("foo"));
|
||||
Message<?> result = outputChannel.receive(0);
|
||||
assertEquals("FOO", result.getPayload());
|
||||
}
|
||||
|
||||
@@ -25,6 +25,7 @@ import org.junit.Test;
|
||||
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.endpoint.SubscribingConsumerEndpoint;
|
||||
import org.springframework.integration.filter.FilterEndpoint;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
@@ -65,9 +66,9 @@ public class FilterEndpointTests {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
filter.setInputChannel(inputChannel);
|
||||
filter.setOutputChannel(outputChannel);
|
||||
filter.start();
|
||||
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(filter, inputChannel);
|
||||
endpoint.start();
|
||||
Message<?> message = new StringMessage("test");
|
||||
assertTrue(inputChannel.send(message));
|
||||
Message<?> reply = outputChannel.receive(0);
|
||||
@@ -84,9 +85,9 @@ public class FilterEndpointTests {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
filter.setInputChannel(inputChannel);
|
||||
filter.setOutputChannel(outputChannel);
|
||||
filter.start();
|
||||
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(filter, inputChannel);
|
||||
endpoint.start();
|
||||
Message<?> message = new StringMessage("test");
|
||||
assertTrue(inputChannel.send(message));
|
||||
assertNull(outputChannel.receive(0));
|
||||
|
||||
@@ -29,6 +29,7 @@ import org.junit.Test;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.integration.bus.DefaultMessageBus;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.endpoint.PollingConsumerEndpoint;
|
||||
import org.springframework.integration.endpoint.ServiceActivatorEndpoint;
|
||||
|
||||
/**
|
||||
@@ -82,9 +83,8 @@ public class MethodInvokingConsumerTests {
|
||||
Message<String> message = new GenericMessage<String>("testing");
|
||||
channel.send(message);
|
||||
assertNull(queue.poll());
|
||||
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(consumer);
|
||||
endpoint.setBeanName("testEndpoint");
|
||||
endpoint.setInputChannel(channel);
|
||||
ServiceActivatorEndpoint serivceActivator = new ServiceActivatorEndpoint(consumer);
|
||||
PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(serivceActivator, channel);
|
||||
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
|
||||
DefaultMessageBus bus = new DefaultMessageBus();
|
||||
bus.setApplicationContext(context);
|
||||
|
||||
@@ -14,12 +14,15 @@
|
||||
<channel id="inMethodInvoking" />
|
||||
<channel id="out" />
|
||||
|
||||
<splitter ref="splitterBean" input-channel="inMethodInvoking"
|
||||
<splitter ref="splitterBeanXmlConfig" input-channel="inMethodInvoking"
|
||||
method="split" output-channel="out" />
|
||||
|
||||
<splitter input-channel="inDefault" output-channel="out" />
|
||||
|
||||
<beans:bean id="splitterBean"
|
||||
<beans:bean id="splitterBeanXmlConfig"
|
||||
class="org.springframework.integration.splitter.SplitterIntegrationTests$TestSplitter" />
|
||||
|
||||
<beans:bean id="splitterBeanAnnotationConfig"
|
||||
class="org.springframework.integration.splitter.SplitterIntegrationTests$TestSplitter" />
|
||||
|
||||
<beans:bean
|
||||
|
||||
Reference in New Issue
Block a user