diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java index 2c9558fa0d..32526674e5 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java @@ -36,7 +36,6 @@ import org.springframework.context.ApplicationListener; import org.springframework.context.Lifecycle; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.integration.channel.ChannelRegistry; -import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.MessagePublishingErrorHandler; import org.springframework.integration.endpoint.MessageEndpoint; @@ -177,9 +176,6 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A private void activateEndpoint(MessageEndpoint endpoint) { Assert.notNull(endpoint, "'endpoint' must not be null"); - if (endpoint instanceof ChannelRegistryAware) { - ((ChannelRegistryAware) endpoint).setChannelRegistry(this); - } if (endpoint instanceof TaskSchedulerAware) { ((TaskSchedulerAware) endpoint).setTaskScheduler(this.taskScheduler); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusAwareBeanPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusAwareBeanPostProcessor.java index 6ab86f925d..1e5ba5ab8d 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusAwareBeanPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusAwareBeanPostProcessor.java @@ -18,7 +18,6 @@ package org.springframework.integration.bus; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; -import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.util.Assert; /** @@ -51,9 +50,6 @@ public class MessageBusAwareBeanPostProcessor implements BeanPostProcessor { if (bean instanceof MessageBusAware) { ((MessageBusAware) bean).setMessageBus(this.messageBus); } - if (bean instanceof ChannelRegistryAware) { - ((ChannelRegistryAware) bean).setChannelRegistry(this.messageBus); - } return bean; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/ChannelRegistryAware.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/ChannelRegistryAware.java deleted file mode 100644 index 1f5f2773d4..0000000000 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/ChannelRegistryAware.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.channel; - -/** - * Interface to be implemented by components that need access to the - * {@link ChannelRegistry}. - * - * @author Mark Fisher - */ -public interface ChannelRegistryAware { - - void setChannelRegistry(ChannelRegistry channelRegistry); - -} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java index 8f46acefdb..04469385f1 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java @@ -18,12 +18,11 @@ package org.springframework.integration.config; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.core.task.TaskExecutor; -import org.springframework.integration.channel.ChannelRegistry; -import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.SubscribableChannel; @@ -40,10 +39,12 @@ import org.springframework.util.Assert; /** * @author Mark Fisher */ -public class ConsumerEndpointFactoryBean implements FactoryBean, ChannelRegistryAware, BeanFactoryAware, InitializingBean { +public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAware, BeanNameAware, InitializingBean { private final MessageConsumer consumer; + private volatile String beanName; + private volatile String inputChannelName; private volatile Trigger trigger; @@ -73,14 +74,12 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, ChannelRegistry } - public void setInputChannelName(String inputChannelName) { - this.inputChannelName = inputChannelName; + public void setBeanName(String beanName) { + this.beanName = beanName; } - public void setChannelRegistry(ChannelRegistry channelRegistry) { - if (this.consumer instanceof ChannelRegistryAware) { - ((ChannelRegistryAware) this.consumer).setChannelRegistry(channelRegistry); - } + public void setInputChannelName(String inputChannelName) { + this.inputChannelName = inputChannelName; } public void setTrigger(Trigger trigger) { @@ -141,11 +140,12 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, ChannelRegistry return; } Assert.isTrue(this.beanFactory.containsBean(this.inputChannelName), - "no such input channel '" + this.inputChannelName + "'"); + "no such input channel '" + this.inputChannelName + "' for endpoint '" + this.beanName + "'"); MessageChannel channel = (MessageChannel) this.beanFactory.getBean(this.inputChannelName, MessageChannel.class); if (channel instanceof SubscribableChannel) { - Assert.isNull(trigger, "A trigger should not be specified when using a SubscribableChannel"); + Assert.isNull(trigger, "A trigger should not be specified for endpoint '" + this.beanName + + "', since '" + this.inputChannelName + "' is a SubscribableChannel (not pollable)."); this.endpoint = new SubscribingConsumerEndpoint( this.consumer, (SubscribableChannel) channel); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java index 8907771185..2341be9640 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java @@ -24,12 +24,11 @@ import org.springframework.beans.factory.ListableBeanFactory; import org.springframework.beans.factory.generic.GenericBeanFactoryAccessor; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.integration.annotation.Poller; -import org.springframework.integration.channel.ChannelRegistry; -import org.springframework.integration.channel.ChannelRegistryAware; +import org.springframework.integration.channel.BeanFactoryChannelResolver; +import org.springframework.integration.channel.ChannelResolver; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.SubscribableChannel; -import org.springframework.integration.config.xml.MessageBusParser; import org.springframework.integration.endpoint.AbstractMessageConsumer; import org.springframework.integration.endpoint.AbstractReplyProducingMessageConsumer; import org.springframework.integration.endpoint.MessageEndpoint; @@ -53,22 +52,18 @@ public abstract class AbstractMethodAnnotationPostProcessor message) { @@ -175,9 +183,9 @@ public abstract class AbstractReplyProducingMessageConsumer extends AbstractMess replyChannel = (MessageChannel) returnAddress; } else if (returnAddress instanceof String) { - Assert.state(this.channelRegistry != null, - "ChannelRegistry is required for resolving a reply channel by name"); - replyChannel = this.channelRegistry.lookupChannel((String) returnAddress); + Assert.state(this.channelResolver != null, + "ChannelResolver is required for resolving a reply channel by name"); + replyChannel = this.channelResolver.resolveChannelName((String) returnAddress); } else { throw new MessagingException("expected a MessageChannel or String for 'returnAddress', but type is [" diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java index 8ae8e0eb96..028a2f32ba 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java @@ -71,6 +71,7 @@ public class DefaultMessageBusTests { return message; } }; + consumer.setBeanFactory(context); PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(consumer, sourceChannel); endpoint.afterPropertiesSet(); context.getBeanFactory().registerSingleton("testEndpoint", endpoint); @@ -79,7 +80,6 @@ public class DefaultMessageBusTests { bus.setTaskScheduler(TestUtils.createTaskScheduler(10)); context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, bus); bus.setApplicationContext(context); - consumer.setChannelRegistry(bus); bus.start(); Message result = targetChannel.receive(3000); assertEquals("test", result.getPayload()); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java index 1ed91d7b28..a0971d67dc 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java @@ -18,7 +18,6 @@ package org.springframework.integration.config.annotation; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.util.concurrent.CountDownLatch; @@ -39,8 +38,6 @@ import org.springframework.integration.annotation.Poller; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.annotation.Transformer; import org.springframework.integration.bus.DefaultMessageBus; -import org.springframework.integration.channel.ChannelRegistry; -import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; @@ -48,6 +45,7 @@ import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.xml.MessageBusParser; import org.springframework.integration.endpoint.PollingConsumerEndpoint; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.StringMessage; import org.springframework.integration.scheduling.IntervalTrigger; @@ -179,24 +177,29 @@ public class MessagingAnnotationPostProcessorTests { } @Test - public void testChannelRegistryAwareBean() { + public void testChannelResolution() { GenericApplicationContext context = new GenericApplicationContext(); - QueueChannel inputChannel = new QueueChannel(); + DirectChannel inputChannel = new DirectChannel(); + QueueChannel outputChannel = new QueueChannel(); inputChannel.setBeanName("inputChannel"); + outputChannel.setBeanName("outputChannel"); context.getBeanFactory().registerSingleton("inputChannel", inputChannel); + context.getBeanFactory().registerSingleton("outputChannel", outputChannel); DefaultMessageBus messageBus = new DefaultMessageBus(); + messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10)); context.getBeanFactory().registerSingleton( MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); messageBus.setApplicationContext(context); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); - ChannelRegistryAwareTestBean testBean = new ChannelRegistryAwareTestBean(); - assertNull(testBean.getChannelRegistry()); - postProcessor.postProcessAfterInitialization(testBean, "testBean"); - ChannelRegistry channelRegistry = testBean.getChannelRegistry(); - assertNotNull(channelRegistry); - assertEquals(messageBus, channelRegistry); + messageBus.start(); + ServiceActivatorAnnotatedBean bean = new ServiceActivatorAnnotatedBean(); + postProcessor.postProcessAfterInitialization(bean, "testBean"); + Message message = MessageBuilder.withPayload("test").setReturnAddress("outputChannel").build(); + inputChannel.send(message); + Message reply = outputChannel.receive(0); + assertNotNull(reply); } @Test @@ -459,26 +462,6 @@ public class MessagingAnnotationPostProcessorTests { } - @MessageEndpoint - private static class ChannelRegistryAwareTestBean implements ChannelRegistryAware { - - private ChannelRegistry channelRegistry; - - public void setChannelRegistry(ChannelRegistry channelRegistry) { - this.channelRegistry = channelRegistry; - } - - public ChannelRegistry getChannelRegistry() { - return this.channelRegistry; - } - - @ServiceActivator(inputChannel="inputChannel") - public Message handle(Message message) { - return null; - } - } - - private static class SimpleAnnotatedEndpointSubclass extends SimpleAnnotatedEndpoint { } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorEndpointTests.java index 40287f6197..709693935b 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorEndpointTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorEndpointTests.java @@ -22,17 +22,14 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; -import org.springframework.integration.channel.ChannelRegistry; -import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.channel.TestChannelResolver; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.MessageHandlingException; @@ -91,10 +88,10 @@ public class ServiceActivatorEndpointTests { public void returnAddressHeaderWithChannelName() { QueueChannel channel = new QueueChannel(1); channel.setBeanName("testChannel"); - TestChannelRegistry channelRegistry = new TestChannelRegistry(); - channelRegistry.registerChannel(channel); + TestChannelResolver channelResolver = new TestChannelResolver(); + channelResolver.addChannel(channel); ServiceActivatorEndpoint endpoint = this.createEndpoint(); - endpoint.setChannelRegistry(channelRegistry); + endpoint.setChannelResolver(channelResolver); Message message = MessageBuilder.withPayload("foo").setReturnAddress("testChannel").build(); endpoint.onMessage(message); Message reply = channel.receive(0); @@ -114,9 +111,9 @@ public class ServiceActivatorEndpointTests { } }; ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(handler, "handle"); - TestChannelRegistry channelRegistry = new TestChannelRegistry(); - channelRegistry.registerChannel(replyChannel2); - endpoint.setChannelRegistry(channelRegistry); + TestChannelResolver channelResolver = new TestChannelResolver(); + channelResolver.addChannel(replyChannel2); + endpoint.setChannelResolver(channelResolver); Message testMessage1 = MessageBuilder.withPayload("bar") .setReturnAddress(replyChannel1).build(); endpoint.onMessage(testMessage1); @@ -359,18 +356,4 @@ public class ServiceActivatorEndpointTests { } } - - private static class TestChannelRegistry implements ChannelRegistry { - - private final Map channels = new HashMap(); - - public MessageChannel lookupChannel(String channelName) { - return this.channels.get(channelName); - } - - public void registerChannel(MessageChannel channel) { - this.channels.put(channel.getName(), channel); - } - } - } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/MethodInvokingRouterTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/MethodInvokingRouterTests.java index 7f3282b60e..f1f6de3ff5 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/MethodInvokingRouterTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/MethodInvokingRouterTests.java @@ -27,8 +27,6 @@ import java.util.List; import org.junit.Test; import org.springframework.integration.annotation.Header; -import org.springframework.integration.channel.ChannelRegistry; -import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.ChannelResolver; import org.springframework.integration.channel.TestChannelResolver; import org.springframework.integration.channel.MessageChannel; @@ -610,22 +608,4 @@ public class MethodInvokingRouterTests { } } - - public static class ChannelRegistryAwareTestBean implements ChannelRegistryAware { - - private ChannelRegistry channelRegistry; - - public void setChannelRegistry(ChannelRegistry channelRegistry) { - this.channelRegistry = channelRegistry; - } - - public ChannelRegistry getChannelRegistry() { - return this.channelRegistry; - } - - public MessageChannel route(String channelName) { - return this.channelRegistry.lookupChannel(channelName); - } - } - }