Removed ChannelRegistryAware. The AbstractReplyProducingMessageConsumer now uses a BeanFactoryChannelResolver and is therefore now BeanFactoryAware. The annotation post-processors set the BeanFactoryChannelResolver when creating such a consumer (since it is not created within the ApplicationContext in that case).
This commit is contained in:
@@ -71,6 +71,7 @@ public class DefaultMessageBusTests {
|
||||
return message;
|
||||
}
|
||||
};
|
||||
consumer.setBeanFactory(context);
|
||||
PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(consumer, sourceChannel);
|
||||
endpoint.afterPropertiesSet();
|
||||
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
|
||||
@@ -79,7 +80,6 @@ public class DefaultMessageBusTests {
|
||||
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
|
||||
context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, bus);
|
||||
bus.setApplicationContext(context);
|
||||
consumer.setChannelRegistry(bus);
|
||||
bus.start();
|
||||
Message<?> result = targetChannel.receive(3000);
|
||||
assertEquals("test", result.getPayload());
|
||||
|
||||
@@ -18,7 +18,6 @@ package org.springframework.integration.config.annotation;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
@@ -39,8 +38,6 @@ import org.springframework.integration.annotation.Poller;
|
||||
import org.springframework.integration.annotation.ServiceActivator;
|
||||
import org.springframework.integration.annotation.Transformer;
|
||||
import org.springframework.integration.bus.DefaultMessageBus;
|
||||
import org.springframework.integration.channel.ChannelRegistry;
|
||||
import org.springframework.integration.channel.ChannelRegistryAware;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.MessageChannel;
|
||||
import org.springframework.integration.channel.PollableChannel;
|
||||
@@ -48,6 +45,7 @@ import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.config.xml.MessageBusParser;
|
||||
import org.springframework.integration.endpoint.PollingConsumerEndpoint;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.MessageBuilder;
|
||||
import org.springframework.integration.message.MessageConsumer;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
import org.springframework.integration.scheduling.IntervalTrigger;
|
||||
@@ -179,24 +177,29 @@ public class MessagingAnnotationPostProcessorTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChannelRegistryAwareBean() {
|
||||
public void testChannelResolution() {
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
QueueChannel inputChannel = new QueueChannel();
|
||||
DirectChannel inputChannel = new DirectChannel();
|
||||
QueueChannel outputChannel = new QueueChannel();
|
||||
inputChannel.setBeanName("inputChannel");
|
||||
outputChannel.setBeanName("outputChannel");
|
||||
context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
|
||||
context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
|
||||
DefaultMessageBus messageBus = new DefaultMessageBus();
|
||||
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
|
||||
context.getBeanFactory().registerSingleton(
|
||||
MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
|
||||
messageBus.setApplicationContext(context);
|
||||
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
|
||||
postProcessor.setBeanFactory(context.getBeanFactory());
|
||||
postProcessor.afterPropertiesSet();
|
||||
ChannelRegistryAwareTestBean testBean = new ChannelRegistryAwareTestBean();
|
||||
assertNull(testBean.getChannelRegistry());
|
||||
postProcessor.postProcessAfterInitialization(testBean, "testBean");
|
||||
ChannelRegistry channelRegistry = testBean.getChannelRegistry();
|
||||
assertNotNull(channelRegistry);
|
||||
assertEquals(messageBus, channelRegistry);
|
||||
messageBus.start();
|
||||
ServiceActivatorAnnotatedBean bean = new ServiceActivatorAnnotatedBean();
|
||||
postProcessor.postProcessAfterInitialization(bean, "testBean");
|
||||
Message<?> message = MessageBuilder.withPayload("test").setReturnAddress("outputChannel").build();
|
||||
inputChannel.send(message);
|
||||
Message<?> reply = outputChannel.receive(0);
|
||||
assertNotNull(reply);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -459,26 +462,6 @@ public class MessagingAnnotationPostProcessorTests {
|
||||
}
|
||||
|
||||
|
||||
@MessageEndpoint
|
||||
private static class ChannelRegistryAwareTestBean implements ChannelRegistryAware {
|
||||
|
||||
private ChannelRegistry channelRegistry;
|
||||
|
||||
public void setChannelRegistry(ChannelRegistry channelRegistry) {
|
||||
this.channelRegistry = channelRegistry;
|
||||
}
|
||||
|
||||
public ChannelRegistry getChannelRegistry() {
|
||||
return this.channelRegistry;
|
||||
}
|
||||
|
||||
@ServiceActivator(inputChannel="inputChannel")
|
||||
public Message<?> handle(Message<?> message) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class SimpleAnnotatedEndpointSubclass extends SimpleAnnotatedEndpoint {
|
||||
}
|
||||
|
||||
|
||||
@@ -22,17 +22,14 @@ import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.integration.channel.ChannelRegistry;
|
||||
import org.springframework.integration.channel.MessageChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.channel.TestChannelResolver;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.MessageBuilder;
|
||||
import org.springframework.integration.message.MessageHandlingException;
|
||||
@@ -91,10 +88,10 @@ public class ServiceActivatorEndpointTests {
|
||||
public void returnAddressHeaderWithChannelName() {
|
||||
QueueChannel channel = new QueueChannel(1);
|
||||
channel.setBeanName("testChannel");
|
||||
TestChannelRegistry channelRegistry = new TestChannelRegistry();
|
||||
channelRegistry.registerChannel(channel);
|
||||
TestChannelResolver channelResolver = new TestChannelResolver();
|
||||
channelResolver.addChannel(channel);
|
||||
ServiceActivatorEndpoint endpoint = this.createEndpoint();
|
||||
endpoint.setChannelRegistry(channelRegistry);
|
||||
endpoint.setChannelResolver(channelResolver);
|
||||
Message<?> message = MessageBuilder.withPayload("foo").setReturnAddress("testChannel").build();
|
||||
endpoint.onMessage(message);
|
||||
Message<?> reply = channel.receive(0);
|
||||
@@ -114,9 +111,9 @@ public class ServiceActivatorEndpointTests {
|
||||
}
|
||||
};
|
||||
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(handler, "handle");
|
||||
TestChannelRegistry channelRegistry = new TestChannelRegistry();
|
||||
channelRegistry.registerChannel(replyChannel2);
|
||||
endpoint.setChannelRegistry(channelRegistry);
|
||||
TestChannelResolver channelResolver = new TestChannelResolver();
|
||||
channelResolver.addChannel(replyChannel2);
|
||||
endpoint.setChannelResolver(channelResolver);
|
||||
Message<String> testMessage1 = MessageBuilder.withPayload("bar")
|
||||
.setReturnAddress(replyChannel1).build();
|
||||
endpoint.onMessage(testMessage1);
|
||||
@@ -359,18 +356,4 @@ public class ServiceActivatorEndpointTests {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class TestChannelRegistry implements ChannelRegistry {
|
||||
|
||||
private final Map<String, MessageChannel> channels = new HashMap<String, MessageChannel>();
|
||||
|
||||
public MessageChannel lookupChannel(String channelName) {
|
||||
return this.channels.get(channelName);
|
||||
}
|
||||
|
||||
public void registerChannel(MessageChannel channel) {
|
||||
this.channels.put(channel.getName(), channel);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -27,8 +27,6 @@ import java.util.List;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.integration.annotation.Header;
|
||||
import org.springframework.integration.channel.ChannelRegistry;
|
||||
import org.springframework.integration.channel.ChannelRegistryAware;
|
||||
import org.springframework.integration.channel.ChannelResolver;
|
||||
import org.springframework.integration.channel.TestChannelResolver;
|
||||
import org.springframework.integration.channel.MessageChannel;
|
||||
@@ -610,22 +608,4 @@ public class MethodInvokingRouterTests {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static class ChannelRegistryAwareTestBean implements ChannelRegistryAware {
|
||||
|
||||
private ChannelRegistry channelRegistry;
|
||||
|
||||
public void setChannelRegistry(ChannelRegistry channelRegistry) {
|
||||
this.channelRegistry = channelRegistry;
|
||||
}
|
||||
|
||||
public ChannelRegistry getChannelRegistry() {
|
||||
return this.channelRegistry;
|
||||
}
|
||||
|
||||
public MessageChannel route(String channelName) {
|
||||
return this.channelRegistry.lookupChannel(channelName);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user