diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java index 642a769ceb..67c0c62539 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java @@ -16,10 +16,10 @@ package org.springframework.integration.bus; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.generic.GenericBeanFactoryAccessor; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationEvent; @@ -69,8 +70,6 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A private final ChannelRegistry channelRegistry = new DefaultChannelRegistry(); - private final Map endpoints = new ConcurrentHashMap(); - private final MessageBusInterceptorsList interceptors = new MessageBusInterceptorsList(); private final Set lifecycleGateways = new CopyOnWriteArraySet(); @@ -171,6 +170,7 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A return; } this.initializing = true; + Assert.notNull(this.applicationContext, "ApplicationContext must not be null"); if (this.taskScheduler == null) { ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(DEFAULT_DISPATCHER_POOL_SIZE); executor.setThreadFactory(new CustomizableThreadFactory("message-bus-")); @@ -214,8 +214,6 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A public void registerEndpoint(MessageEndpoint endpoint) { Assert.notNull(endpoint, "'endpoint' must not be null"); - Assert.notNull(endpoint.getName(), "endpoint name must not be null"); - this.endpoints.put(endpoint.getName(), endpoint); if (this.isRunning()) { this.activateEndpoint(endpoint); } @@ -224,26 +222,27 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A } } - public MessageEndpoint unregisterEndpoint(String name) { - Assert.notNull(name, "endpoint name must not be null"); - MessageEndpoint endpoint = this.endpoints.remove(name); - if (endpoint == null) { - return null; - } - this.deactivateEndpoint(endpoint); - return endpoint; - } - public MessageEndpoint lookupEndpoint(String endpointName) { - return this.endpoints.get(endpointName); + if (this.applicationContext.containsBean(endpointName)) { + Object bean = this.applicationContext.getBean(endpointName); + if (bean instanceof MessageEndpoint) { + return (MessageEndpoint) bean; + } + } + return null; } - public Set getEndpointNames() { - return this.endpoints.keySet(); + public String[] getEndpointNames() { + return this.applicationContext.getBeanNamesForType(MessageEndpoint.class); + } + + private Collection getEndpoints() { + GenericBeanFactoryAccessor accessor = new GenericBeanFactoryAccessor(this.applicationContext); + return accessor.getBeansOfType(MessageEndpoint.class).values(); } private void activateEndpoints() { - for (MessageEndpoint endpoint : this.endpoints.values()) { + for (MessageEndpoint endpoint : this.getEndpoints()) { if (endpoint != null) { this.activateEndpoint(endpoint); } @@ -251,7 +250,7 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A } private void deactivateEndpoints() { - for (MessageEndpoint endpoint : this.endpoints.values()) { + for (MessageEndpoint endpoint : this.getEndpoints()) { if (endpoint != null) { this.deactivateEndpoint(endpoint); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBus.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBus.java index c1c219b4db..156f16044a 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBus.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBus.java @@ -19,15 +19,17 @@ package org.springframework.integration.bus; import org.springframework.context.Lifecycle; import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.channel.MessageChannel; -import org.springframework.integration.endpoint.EndpointRegistry; +import org.springframework.integration.endpoint.MessageEndpoint; /** * The message bus interface. * * @author Mark Fisher */ -public interface MessageBus extends ChannelRegistry, EndpointRegistry, Lifecycle { +public interface MessageBus extends ChannelRegistry, Lifecycle { MessageChannel getErrorChannel(); + void registerEndpoint(MessageEndpoint endpoint); + } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java index 9c501960a3..e87357be1f 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java @@ -31,7 +31,6 @@ import org.springframework.integration.endpoint.AbstractInOutEndpoint; import org.springframework.integration.endpoint.AbstractMessageConsumingEndpoint; import org.springframework.integration.scheduling.PollingSchedule; import org.springframework.util.Assert; -import org.springframework.util.ClassUtils; import org.springframework.util.StringUtils; /** @@ -66,9 +65,6 @@ public abstract class AbstractMethodAnnotationPostProcessor, MethodAnnotationPostProcessor> postProcessors = @@ -71,13 +77,20 @@ public class MessagingAnnotationPostProcessor implements BeanPostProcessor, Init } + public void setBeanFactory(BeanFactory beanFactory) { + Assert.isAssignable(ConfigurableBeanFactory.class, beanFactory.getClass(), + "a ConfigurableBeanFactory is required"); + this.beanFactory = (ConfigurableBeanFactory) beanFactory; + } + public void setBeanClassLoader(ClassLoader beanClassLoader) { this.beanClassLoader = beanClassLoader; } public void afterPropertiesSet() { + Assert.notNull(this.beanFactory, "BeanFactory must not be null"); postProcessors.put(Aggregator.class, new AggregatorAnnotationPostProcessor(this.messageBus)); - postProcessors.put(ChannelAdapter.class, new ChannelAdapterAnnotationPostProcessor(this.messageBus)); + postProcessors.put(ChannelAdapter.class, new ChannelAdapterAnnotationPostProcessor(this.messageBus, this.beanFactory)); postProcessors.put(Router.class, new RouterAnnotationPostProcessor(this.messageBus)); postProcessors.put(ServiceActivator.class, new ServiceActivatorAnnotationPostProcessor(this.messageBus)); postProcessors.put(Splitter.class, new SplitterAnnotationPostProcessor(this.messageBus)); @@ -89,6 +102,7 @@ public class MessagingAnnotationPostProcessor implements BeanPostProcessor, Init } public Object postProcessAfterInitialization(Object bean, final String beanName) throws BeansException { + Assert.notNull(this.beanFactory, "BeanFactory must not be null"); final Object originalBean = bean; final Class beanClass = this.getBeanClass(bean); if (!this.isStereotype(beanClass)) { @@ -107,7 +121,11 @@ public class MessagingAnnotationPostProcessor implements BeanPostProcessor, Init Object result = postProcessor.postProcess(originalBean, beanName, method, annotation); if (result != null) { if (result instanceof MessageEndpoint) { - messageBus.registerEndpoint((MessageEndpoint) result); + String endpointBeanName = generateBeanName(beanName, method, annotation.annotationType()); + if (result instanceof BeanNameAware) { + ((BeanNameAware) result).setBeanName(endpointBeanName); + } + beanFactory.registerSingleton(endpointBeanName, result); } else { boolean shouldProxy = false; @@ -163,4 +181,14 @@ public class MessagingAnnotationPostProcessor implements BeanPostProcessor, Init return false; } + private String generateBeanName(String originalBeanName, Method method, Class annotationType) { + String baseName = originalBeanName + "." + method.getName() + "." + ClassUtils.getShortNameAsProperty(annotationType); + String name = baseName; + int count = 1; + while (this.beanFactory.containsBean(name)) { + name = baseName + "#" + (++count); + } + return name; + } + } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointRegistry.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointRegistry.java deleted file mode 100644 index 1b65c0dcd4..0000000000 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointRegistry.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2002-2007 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.endpoint; - -import java.util.Set; - -/** - * A strategy interface for registration and lookup of message endpoints by name. - * - * @author Mark Fisher - */ -public interface EndpointRegistry { - - void registerEndpoint(MessageEndpoint endpoint); - - MessageEndpoint unregisterEndpoint(String name); - - MessageEndpoint lookupEndpoint(String endpointName); - - Set getEndpointNames(); - -} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java index 930b69da21..a20050d448 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java @@ -24,6 +24,4 @@ package org.springframework.integration.endpoint; */ public interface MessageEndpoint { - String getName(); - } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/SimpleMessagingGateway.java b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/SimpleMessagingGateway.java index be6b7c5cd5..a712435879 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/SimpleMessagingGateway.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/SimpleMessagingGateway.java @@ -21,7 +21,6 @@ import org.springframework.integration.bus.MessageBus; import org.springframework.integration.bus.MessageBusAware; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; -import org.springframework.integration.endpoint.EndpointRegistry; import org.springframework.integration.endpoint.MessagingGateway; import org.springframework.integration.handler.ReplyMessageCorrelator; import org.springframework.integration.message.DefaultMessageCreator; @@ -53,7 +52,7 @@ public class SimpleMessagingGateway extends MessagingGatewaySupport implements M private volatile ReplyMessageCorrelator replyMessageCorrelator; - private volatile EndpointRegistry endpointRegistry; + private volatile MessageBus messageBus; private final Object replyMessageCorrelatorMonitor = new Object(); @@ -97,7 +96,7 @@ public class SimpleMessagingGateway extends MessagingGatewaySupport implements M } public void setMessageBus(MessageBus messageBus) { - this.endpointRegistry = messageBus; + this.messageBus = messageBus; } public void send(Object object) { @@ -160,14 +159,14 @@ public class SimpleMessagingGateway extends MessagingGatewaySupport implements M if (this.replyMessageCorrelator != null) { return; } - if (this.endpointRegistry == null) { - throw new ConfigurationException("No EndpointRegistry available. Cannot register ReplyMessageCorrelator."); + if (this.messageBus == null) { + throw new ConfigurationException("No MessageBus available. Cannot register ReplyMessageCorrelator."); } ReplyMessageCorrelator correlator = new ReplyMessageCorrelator(); correlator.setBeanName("internal.correlator." + this); correlator.setInputChannel(this.replyChannel); correlator.afterPropertiesSet(); - this.endpointRegistry.registerEndpoint(correlator); + this.messageBus.registerEndpoint(correlator); this.replyMessageCorrelator = correlator; } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java index c6dff9db94..453bd35234 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java @@ -28,6 +28,7 @@ import org.junit.Test; import org.springframework.beans.factory.BeanCreationException; import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.PublishSubscribeChannel; @@ -49,16 +50,16 @@ public class DefaultMessageBusTests { @Test public void testRegistrationWithInputChannelReference() { - DefaultMessageBus bus = new DefaultMessageBus(); + GenericApplicationContext context = new GenericApplicationContext(); QueueChannel sourceChannel = new QueueChannel(); QueueChannel targetChannel = new QueueChannel(); sourceChannel.setBeanName("sourceChannel"); targetChannel.setBeanName("targetChannel"); - bus.registerChannel(sourceChannel); + context.getBeanFactory().registerSingleton("sourceChannel", sourceChannel); + context.getBeanFactory().registerSingleton("targetChannel", targetChannel); Message message = MessageBuilder.withPayload("test") .setReturnAddress("targetChannel").build(); sourceChannel.send(message); - bus.registerChannel(targetChannel); AbstractInOutEndpoint endpoint = new AbstractInOutEndpoint() { public Message handle(Message message) { return message; @@ -66,7 +67,10 @@ public class DefaultMessageBusTests { }; endpoint.setBeanName("testEndpoint"); endpoint.setInputChannel(sourceChannel); - bus.registerEndpoint(endpoint); + context.getBeanFactory().registerSingleton("testEndpoint", endpoint); + context.refresh(); + DefaultMessageBus bus = new DefaultMessageBus(); + bus.setApplicationContext(context); bus.start(); Message result = targetChannel.receive(3000); assertEquals("test", result.getPayload()); @@ -75,14 +79,16 @@ public class DefaultMessageBusTests { @Test public void testChannelsWithoutHandlers() { - MessageBus bus = new DefaultMessageBus(); + GenericApplicationContext context = new GenericApplicationContext(); + DefaultMessageBus bus = new DefaultMessageBus(); + bus.setApplicationContext(context); QueueChannel sourceChannel = new QueueChannel(); sourceChannel.setBeanName("sourceChannel"); + context.getBeanFactory().registerSingleton("sourceChannel", sourceChannel); sourceChannel.send(new StringMessage("test")); QueueChannel targetChannel = new QueueChannel(); targetChannel.setBeanName("targetChannel"); - bus.registerChannel(sourceChannel); - bus.registerChannel(targetChannel); + context.getBeanFactory().registerSingleton("targetChannel", targetChannel); bus.start(); Message result = targetChannel.receive(100); assertNull(result); @@ -104,6 +110,7 @@ public class DefaultMessageBusTests { @Test public void testExactlyOneHandlerReceivesPointToPointMessage() { + GenericApplicationContext context = new GenericApplicationContext(); QueueChannel inputChannel = new QueueChannel(); QueueChannel outputChannel1 = new QueueChannel(); QueueChannel outputChannel2 = new QueueChannel(); @@ -117,21 +124,22 @@ public class DefaultMessageBusTests { return MessageBuilder.fromMessage(message).build(); } }; - MessageBus bus = new DefaultMessageBus(); inputChannel.setBeanName("input"); outputChannel1.setBeanName("output1"); outputChannel2.setBeanName("output2"); - bus.registerChannel(inputChannel); - bus.registerChannel(outputChannel1); - bus.registerChannel(outputChannel2); + context.getBeanFactory().registerSingleton("input", inputChannel); + context.getBeanFactory().registerSingleton("output1", outputChannel1); + context.getBeanFactory().registerSingleton("output2", outputChannel2); endpoint1.setBeanName("testEndpoint1"); endpoint1.setInputChannel(inputChannel); endpoint1.setOutputChannel(outputChannel1); endpoint2.setBeanName("testEndpoint2"); endpoint2.setInputChannel(inputChannel); endpoint2.setOutputChannel(outputChannel2); - bus.registerEndpoint(endpoint1); - bus.registerEndpoint(endpoint2); + context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1); + context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2); + DefaultMessageBus bus = new DefaultMessageBus(); + bus.setApplicationContext(context); bus.start(); inputChannel.send(new StringMessage("testing")); Message message1 = outputChannel1.receive(500); @@ -142,6 +150,7 @@ public class DefaultMessageBusTests { @Test public void testBothHandlersReceivePublishSubscribeMessage() throws InterruptedException { + GenericApplicationContext context = new GenericApplicationContext(); PublishSubscribeChannel inputChannel = new PublishSubscribeChannel(); QueueChannel outputChannel1 = new QueueChannel(); QueueChannel outputChannel2 = new QueueChannel(); @@ -160,21 +169,22 @@ public class DefaultMessageBusTests { return reply; } }; - MessageBus bus = new DefaultMessageBus(); inputChannel.setBeanName("input"); outputChannel1.setBeanName("output1"); outputChannel2.setBeanName("output2"); - bus.registerChannel(inputChannel); - bus.registerChannel(outputChannel1); - bus.registerChannel(outputChannel2); + context.getBeanFactory().registerSingleton("input", inputChannel); + context.getBeanFactory().registerSingleton("output1", outputChannel1); + context.getBeanFactory().registerSingleton("output2", outputChannel2); endpoint1.setBeanName("testEndpoint1"); endpoint1.setInputChannel(inputChannel); endpoint1.setOutputChannel(outputChannel1); endpoint2.setBeanName("testEndpoint2"); endpoint2.setInputChannel(inputChannel); endpoint2.setOutputChannel(outputChannel2); - bus.registerEndpoint(endpoint1); - bus.registerEndpoint(endpoint2); + context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1); + context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2); + DefaultMessageBus bus = new DefaultMessageBus(); + bus.setApplicationContext(context); bus.start(); inputChannel.send(new StringMessage("testing")); latch.await(500, TimeUnit.MILLISECONDS); @@ -188,18 +198,20 @@ public class DefaultMessageBusTests { @Test public void testErrorChannelWithFailedDispatch() throws InterruptedException { - MessageBus bus = new DefaultMessageBus(); + GenericApplicationContext context = new GenericApplicationContext(); QueueChannel errorChannel = new QueueChannel(); QueueChannel outputChannel = new QueueChannel(); errorChannel.setBeanName("errorChannel"); - bus.registerChannel(errorChannel); + context.getBeanFactory().registerSingleton("errorChannel", errorChannel); CountDownLatch latch = new CountDownLatch(1); SourcePollingChannelAdapter channelAdapter = new SourcePollingChannelAdapter(); channelAdapter.setSource(new FailingSource(latch)); channelAdapter.setSchedule(new PollingSchedule(1000)); channelAdapter.setOutputChannel(outputChannel); channelAdapter.setBeanName("testChannel"); - bus.registerEndpoint(channelAdapter); + context.getBeanFactory().registerSingleton("testChannel", channelAdapter); + DefaultMessageBus bus = new DefaultMessageBus(); + bus.setApplicationContext(context); bus.start(); latch.await(2000, TimeUnit.MILLISECONDS); Message message = errorChannel.receive(5000); @@ -227,10 +239,10 @@ public class DefaultMessageBusTests { @Test public void testHandlerSubscribedToErrorChannel() throws InterruptedException { - DefaultMessageBus bus = new DefaultMessageBus(); + GenericApplicationContext context = new GenericApplicationContext(); QueueChannel errorChannel = new QueueChannel(); errorChannel.setBeanName(ChannelRegistry.ERROR_CHANNEL_NAME); - bus.registerChannel(errorChannel); + context.getBeanFactory().registerSingleton(ChannelRegistry.ERROR_CHANNEL_NAME, errorChannel); final CountDownLatch latch = new CountDownLatch(1); AbstractInOutEndpoint endpoint = new AbstractInOutEndpoint() { public Message handle(Message message) { @@ -240,7 +252,9 @@ public class DefaultMessageBusTests { }; endpoint.setBeanName("testEndpoint"); endpoint.setInputChannel(errorChannel); - bus.registerEndpoint(endpoint); + context.getBeanFactory().registerSingleton("testEndpoint", endpoint); + DefaultMessageBus bus = new DefaultMessageBus(); + bus.setApplicationContext(context); bus.start(); errorChannel.send(new ErrorMessage(new RuntimeException("test-exception"))); latch.await(1000, TimeUnit.MILLISECONDS); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java index 8a716c2e3b..8019868163 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import org.junit.Before; import org.junit.Test; +import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.ChannelRegistry; @@ -59,12 +60,14 @@ public class DirectChannelSubscriptionTests { @Test public void testSendAndReceiveForRegisteredEndpoint() { + GenericApplicationContext context = new GenericApplicationContext(); MethodInvoker invoker = new MessageMappingMethodInvoker(new TestBean(), "handle"); ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(invoker); endpoint.setInputChannel(sourceChannel); endpoint.setOutputChannel(targetChannel); endpoint.setBeanName("testEndpoint"); - bus.registerEndpoint(endpoint); + context.getBeanFactory().registerSingleton("testEndpoint", endpoint); + bus.setApplicationContext(context); bus.start(); this.sourceChannel.send(new StringMessage("foo")); Message response = this.targetChannel.receive(); @@ -74,7 +77,10 @@ public class DirectChannelSubscriptionTests { @Test public void testSendAndReceiveForAnnotatedEndpoint() { + GenericApplicationContext context = new GenericApplicationContext(); + bus.setApplicationContext(context); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(bus); + postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); TestEndpoint endpoint = new TestEndpoint(); postProcessor.postProcessAfterInitialization(endpoint, "testEndpoint"); @@ -105,10 +111,13 @@ public class DirectChannelSubscriptionTests { @Test(expected=MessagingException.class) public void testExceptionThrownFromAnnotatedEndpoint() { + GenericApplicationContext context = new GenericApplicationContext(); + bus.setApplicationContext(context); QueueChannel errorChannel = new QueueChannel(); errorChannel.setBeanName(ChannelRegistry.ERROR_CHANNEL_NAME); - bus.registerChannel(errorChannel); + context.getBeanFactory().registerSingleton(ChannelRegistry.ERROR_CHANNEL_NAME, errorChannel); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(bus); + postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); FailingTestEndpoint endpoint = new FailingTestEndpoint(); postProcessor.postProcessAfterInitialization(endpoint, "testEndpoint"); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusInterceptorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusInterceptorTests.java index 01e02962fe..d75a26c4d0 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusInterceptorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusInterceptorTests.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue; import org.junit.Test; +import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.bus.DefaultMessageBus; import org.springframework.integration.bus.MessageBus; @@ -32,6 +33,7 @@ public class MessageBusInterceptorTests { @Test public void testStart() { DefaultMessageBus messageBus = new DefaultMessageBus(); + messageBus.setApplicationContext(new GenericApplicationContext()); TestMessageBusStartInterceptor startInterceptor = new TestMessageBusStartInterceptor(); TestMessageBusStopInterceptor stopInterceptor = new TestMessageBusStopInterceptor(); // add all interceptors diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java index f9c9715427..12176278e9 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java @@ -48,7 +48,7 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests MessageBus bus = (MessageBus) this.applicationContext.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME); bus.start(); assertNotNull(bus.lookupChannel(beanName)); - Object adapter = bus.lookupEndpoint(beanName + ".adapter"); + Object adapter = this.applicationContext.getBean(beanName + ".adapter"); assertNotNull(adapter); assertTrue(adapter instanceof OutboundChannelAdapter); TestTarget target = (TestTarget) this.applicationContext.getBean("target"); @@ -68,7 +68,7 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests MessageBus bus = (MessageBus) this.applicationContext.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME); bus.start(); assertNotNull(bus.lookupChannel(beanName)); - Object adapter = bus.lookupEndpoint(beanName + ".adapter"); + Object adapter = this.applicationContext.getBean(beanName + ".adapter"); assertNotNull(adapter); assertTrue(adapter instanceof OutboundChannelAdapter); TestBean testBean = (TestBean) this.applicationContext.getBean("testBean"); @@ -86,7 +86,7 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests PollableChannel channel = (PollableChannel) this.applicationContext.getBean("queueChannel"); MessageBus bus = (MessageBus) this.applicationContext.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME); assertNull(bus.lookupChannel(beanName)); - Object adapter = bus.lookupEndpoint(beanName); + Object adapter = this.applicationContext.getBean(beanName); assertNotNull(adapter); assertTrue(adapter instanceof SourcePollingChannelAdapter); TestBean testBean = (TestBean) this.applicationContext.getBean("testBean"); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java index 8c5715b999..d44f40a452 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java @@ -90,8 +90,7 @@ public class AggregatorAnnotationTests { @SuppressWarnings("unchecked") private DirectFieldAccessor getDirectFieldAccessorForAggregatingHandler(ApplicationContext context, final String endpointName) { - MessageBus messageBus = this.getMessageBus(context); - AggregatorEndpoint endpoint = (AggregatorEndpoint) messageBus.lookupEndpoint(endpointName + ".aggregator"); + AggregatorEndpoint endpoint = (AggregatorEndpoint) context.getBean(endpointName + ".aggregatingMethod.aggregator"); return new DirectFieldAccessor(endpoint); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java index a8f29171b5..abdf6ab05e 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java @@ -31,13 +31,13 @@ import org.springframework.aop.framework.ProxyFactory; import org.springframework.beans.DirectFieldAccessor; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.annotation.ChannelAdapter; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.Poller; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.annotation.Transformer; import org.springframework.integration.bus.DefaultMessageBus; -import org.springframework.integration.bus.MessageBus; import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.DirectChannel; @@ -60,8 +60,11 @@ public class MessagingAnnotationPostProcessorTests { @Test public void testHandlerAnnotation() { - MessageBus messageBus = new DefaultMessageBus(); + GenericApplicationContext context = new GenericApplicationContext(); + DefaultMessageBus messageBus = new DefaultMessageBus(); + messageBus.setApplicationContext(context); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus); + postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); HandlerAnnotatedBean bean = new HandlerAnnotatedBean(); Object result = postProcessor.postProcessAfterInitialization(bean, "testBean"); @@ -128,8 +131,11 @@ public class MessagingAnnotationPostProcessorTests { @Test public void testTargetAnnotation() throws InterruptedException { - MessageBus messageBus = new DefaultMessageBus(); + GenericApplicationContext context = new GenericApplicationContext(); + DefaultMessageBus messageBus = new DefaultMessageBus(); + messageBus.setApplicationContext(context); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus); + postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); CountDownLatch latch = new CountDownLatch(1); TargetAnnotationTestBean testBean = new TargetAnnotationTestBean(latch); @@ -150,11 +156,14 @@ public class MessagingAnnotationPostProcessorTests { @Test public void testChannelRegistryAwareBean() { - MessageBus messageBus = new DefaultMessageBus(); + GenericApplicationContext context = new GenericApplicationContext(); + DefaultMessageBus messageBus = new DefaultMessageBus(); + messageBus.setApplicationContext(context); QueueChannel inputChannel = new QueueChannel(); inputChannel.setBeanName("inputChannel"); messageBus.registerChannel(inputChannel); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus); + postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); ChannelRegistryAwareTestBean testBean = new ChannelRegistryAwareTestBean(); assertNull(testBean.getChannelRegistry()); @@ -166,14 +175,17 @@ public class MessagingAnnotationPostProcessorTests { @Test public void testProxiedMessageEndpointAnnotation() { - DefaultMessageBus messageBus = new DefaultMessageBus(); + GenericApplicationContext context = new GenericApplicationContext(); QueueChannel inputChannel = new QueueChannel(); QueueChannel outputChannel = new QueueChannel(); inputChannel.setBeanName("inputChannel"); outputChannel.setBeanName("outputChannel"); - messageBus.registerChannel(inputChannel); - messageBus.registerChannel(outputChannel); + context.getBeanFactory().registerSingleton("inputChannel", inputChannel); + context.getBeanFactory().registerSingleton("outputChannel", outputChannel); + DefaultMessageBus messageBus = new DefaultMessageBus(); + messageBus.setApplicationContext(context); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus); + postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); ProxyFactory proxyFactory = new ProxyFactory(new SimpleAnnotatedEndpoint()); Object proxy = proxyFactory.getProxy(); @@ -187,14 +199,17 @@ public class MessagingAnnotationPostProcessorTests { @Test public void testMessageEndpointAnnotationInherited() { - DefaultMessageBus messageBus = new DefaultMessageBus(); + GenericApplicationContext context = new GenericApplicationContext(); QueueChannel inputChannel = new QueueChannel(); QueueChannel outputChannel = new QueueChannel(); inputChannel.setBeanName("inputChannel"); outputChannel.setBeanName("outputChannel"); - messageBus.registerChannel(inputChannel); - messageBus.registerChannel(outputChannel); + context.getBeanFactory().registerSingleton("inputChannel", inputChannel); + context.getBeanFactory().registerSingleton("outputChannel", outputChannel); + DefaultMessageBus messageBus = new DefaultMessageBus(); + messageBus.setApplicationContext(context); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus); + postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointSubclass(), "subclass"); messageBus.start(); @@ -206,14 +221,17 @@ public class MessagingAnnotationPostProcessorTests { @Test public void testMessageEndpointAnnotationInheritedWithProxy() { - DefaultMessageBus messageBus = new DefaultMessageBus(); + GenericApplicationContext context = new GenericApplicationContext(); QueueChannel inputChannel = new QueueChannel(); QueueChannel outputChannel = new QueueChannel(); inputChannel.setBeanName("inputChannel"); outputChannel.setBeanName("outputChannel"); - messageBus.registerChannel(inputChannel); - messageBus.registerChannel(outputChannel); + context.getBeanFactory().registerSingleton("inputChannel", inputChannel); + context.getBeanFactory().registerSingleton("outputChannel", outputChannel); + DefaultMessageBus messageBus = new DefaultMessageBus(); + messageBus.setApplicationContext(context); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus); + postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); ProxyFactory proxyFactory = new ProxyFactory(new SimpleAnnotatedEndpointSubclass()); Object proxy = proxyFactory.getProxy(); @@ -227,14 +245,17 @@ public class MessagingAnnotationPostProcessorTests { @Test public void testMessageEndpointAnnotationInheritedFromInterface() { - MessageBus messageBus = new DefaultMessageBus(); + GenericApplicationContext context = new GenericApplicationContext(); QueueChannel inputChannel = new QueueChannel(); QueueChannel outputChannel = new QueueChannel(); inputChannel.setBeanName("inputChannel"); outputChannel.setBeanName("outputChannel"); - messageBus.registerChannel(inputChannel); - messageBus.registerChannel(outputChannel); + context.getBeanFactory().registerSingleton("inputChannel", inputChannel); + context.getBeanFactory().registerSingleton("outputChannel", outputChannel); + DefaultMessageBus messageBus = new DefaultMessageBus(); + messageBus.setApplicationContext(context); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus); + postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl"); messageBus.start(); @@ -246,14 +267,17 @@ public class MessagingAnnotationPostProcessorTests { @Test public void testMessageEndpointAnnotationInheritedFromInterfaceWithAutoCreatedChannels() { - DefaultMessageBus messageBus = new DefaultMessageBus(); + GenericApplicationContext context = new GenericApplicationContext(); QueueChannel inputChannel = new QueueChannel(); QueueChannel outputChannel = new QueueChannel(); inputChannel.setBeanName("inputChannel"); outputChannel.setBeanName("outputChannel"); - messageBus.registerChannel(inputChannel); - messageBus.registerChannel(outputChannel); + context.getBeanFactory().registerSingleton("inputChannel", inputChannel); + context.getBeanFactory().registerSingleton("outputChannel", outputChannel); + DefaultMessageBus messageBus = new DefaultMessageBus(); + messageBus.setApplicationContext(context); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus); + postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl"); messageBus.start(); @@ -265,14 +289,17 @@ public class MessagingAnnotationPostProcessorTests { @Test public void testMessageEndpointAnnotationInheritedFromInterfaceWithProxy() { - MessageBus messageBus = new DefaultMessageBus(); + GenericApplicationContext context = new GenericApplicationContext(); QueueChannel inputChannel = new QueueChannel(); QueueChannel outputChannel = new QueueChannel(); inputChannel.setBeanName("inputChannel"); outputChannel.setBeanName("outputChannel"); - messageBus.registerChannel(inputChannel); - messageBus.registerChannel(outputChannel); + context.getBeanFactory().registerSingleton("inputChannel", inputChannel); + context.getBeanFactory().registerSingleton("outputChannel", outputChannel); + DefaultMessageBus messageBus = new DefaultMessageBus(); + messageBus.setApplicationContext(context); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus); + postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); ProxyFactory proxyFactory = new ProxyFactory(new SimpleAnnotatedEndpointImplementation()); Object proxy = proxyFactory.getProxy(); @@ -286,15 +313,19 @@ public class MessagingAnnotationPostProcessorTests { @Test public void testEndpointWithPollerAnnotation() { - MessageBus messageBus = new DefaultMessageBus(); + GenericApplicationContext context = new GenericApplicationContext(); QueueChannel testChannel = new QueueChannel(); testChannel.setBeanName("testChannel"); - messageBus.registerChannel(testChannel); + context.getBeanFactory().registerSingleton("testChannel", testChannel); + DefaultMessageBus messageBus = new DefaultMessageBus(); + messageBus.setApplicationContext(context); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus); + postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); AnnotatedEndpointWithPolledAnnotation endpoint = new AnnotatedEndpointWithPolledAnnotation(); postProcessor.postProcessAfterInitialization(endpoint, "testBean"); - ServiceActivatorEndpoint processedEndpoint = (ServiceActivatorEndpoint) messageBus.lookupEndpoint("testBean.serviceActivator"); + ServiceActivatorEndpoint processedEndpoint = + (ServiceActivatorEndpoint) context.getBean("testBean.prependFoo.serviceActivator"); processedEndpoint.afterPropertiesSet(); DirectFieldAccessor accessor = new DirectFieldAccessor(processedEndpoint); ChannelPoller poller = (ChannelPoller) accessor.getPropertyValue("poller"); @@ -309,8 +340,11 @@ public class MessagingAnnotationPostProcessorTests { @Test public void testChannelAdapterAnnotation() throws InterruptedException { - MessageBus messageBus = new DefaultMessageBus(); + GenericApplicationContext context = new GenericApplicationContext(); + DefaultMessageBus messageBus = new DefaultMessageBus(); + messageBus.setApplicationContext(context); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus); + postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); ChannelAdapterAnnotationTestBean testBean = new ChannelAdapterAnnotationTestBean(); postProcessor.postProcessAfterInitialization(testBean, "testBean"); @@ -333,8 +367,11 @@ public class MessagingAnnotationPostProcessorTests { @Test public void testTransformer() { - MessageBus messageBus = new DefaultMessageBus(); + GenericApplicationContext context = new GenericApplicationContext(); + DefaultMessageBus messageBus = new DefaultMessageBus(); + messageBus.setApplicationContext(context); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus); + postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); TransformerAnnotationTestBean testBean = new TransformerAnnotationTestBean(); org.springframework.integration.transformer.Transformer transformer = diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java index 71c137fa2f..af1d544932 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java @@ -21,10 +21,10 @@ import static org.junit.Assert.assertEquals; import org.junit.Before; import org.junit.Test; +import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.Router; import org.springframework.integration.bus.DefaultMessageBus; -import org.springframework.integration.bus.MessageBus; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.Message; @@ -35,32 +35,33 @@ import org.springframework.integration.message.StringMessage; */ public class RouterAnnotationPostProcessorTests { - private MessageBus messageBus; + private GenericApplicationContext context = new GenericApplicationContext(); - private DirectChannel inputChannel; + private DefaultMessageBus messageBus = new DefaultMessageBus(); - private QueueChannel outputChannel; + private DirectChannel inputChannel = new DirectChannel(); + + private QueueChannel outputChannel = new QueueChannel(); @Before public void init() { - inputChannel = new DirectChannel(); - outputChannel = new QueueChannel(); + messageBus.setApplicationContext(context); inputChannel.setBeanName("input"); outputChannel.setBeanName("output"); - messageBus = new DefaultMessageBus(); - messageBus.registerChannel(inputChannel); - messageBus.registerChannel(outputChannel); + context.getBeanFactory().registerSingleton("input", inputChannel); + context.getBeanFactory().registerSingleton("output", outputChannel); } @Test public void testRouter() { MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus); + postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); - messageBus.start(); TestRouter testRouter = new TestRouter(); postProcessor.postProcessAfterInitialization(testRouter, "test"); + messageBus.start(); inputChannel.send(new StringMessage("foo")); Message replyMessage = outputChannel.receive(0); assertEquals("foo", replyMessage.getPayload()); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java index 71fb21c248..3f2d3d210c 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java @@ -23,10 +23,10 @@ import static org.junit.Assert.assertNull; import org.junit.Before; import org.junit.Test; +import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.Splitter; import org.springframework.integration.bus.DefaultMessageBus; -import org.springframework.integration.bus.MessageBus; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.Message; @@ -37,28 +37,29 @@ import org.springframework.integration.message.StringMessage; */ public class SplitterAnnotationPostProcessorTests { - private MessageBus messageBus; + private GenericApplicationContext context = new GenericApplicationContext(); - private DirectChannel inputChannel; + private DefaultMessageBus messageBus = new DefaultMessageBus(); - private QueueChannel outputChannel; + private DirectChannel inputChannel = new DirectChannel(); + + private QueueChannel outputChannel = new QueueChannel(); @Before public void init() { - inputChannel = new DirectChannel(); - outputChannel = new QueueChannel(); inputChannel.setBeanName("input"); outputChannel.setBeanName("output"); - messageBus = new DefaultMessageBus(); - messageBus.registerChannel(inputChannel); - messageBus.registerChannel(outputChannel); + context.getBeanFactory().registerSingleton("input", inputChannel); + context.getBeanFactory().registerSingleton("output", outputChannel); + messageBus.setApplicationContext(context); } @Test public void testSplitterAnnotation() throws InterruptedException { MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus); + postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); TestSplitter splitter = new TestSplitter(); postProcessor.postProcessAfterInitialization(splitter, "testSplitter"); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/handler/MethodInvokingTargetTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/handler/MethodInvokingTargetTests.java index 9c3d2d1c33..8206ca0efd 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/handler/MethodInvokingTargetTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/handler/MethodInvokingTargetTests.java @@ -27,9 +27,9 @@ import java.util.concurrent.TimeUnit; import org.junit.Test; +import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.ConfigurationException; import org.springframework.integration.bus.DefaultMessageBus; -import org.springframework.integration.bus.MessageBus; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.endpoint.ServiceActivatorEndpoint; import org.springframework.integration.message.GenericMessage; @@ -78,21 +78,23 @@ public class MethodInvokingTargetTests { @Test public void testSubscription() throws Exception { + GenericApplicationContext context = new GenericApplicationContext(); SynchronousQueue queue = new SynchronousQueue(); TestBean testBean = new TestBean(queue); MethodInvokingTarget target = new MethodInvokingTarget(testBean, "foo"); target.afterPropertiesSet(); QueueChannel channel = new QueueChannel(); channel.setBeanName("channel"); + context.getBeanFactory().registerSingleton("channel", channel); Message message = new GenericMessage("testing"); channel.send(message); assertNull(queue.poll()); - MessageBus bus = new DefaultMessageBus(); - bus.registerChannel(channel); ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(target); endpoint.setBeanName("testEndpoint"); endpoint.setInputChannel(channel); - bus.registerEndpoint(endpoint); + context.getBeanFactory().registerSingleton("testEndpoint", endpoint); + DefaultMessageBus bus = new DefaultMessageBus(); + bus.setApplicationContext(context); bus.start(); String result = queue.poll(1000, TimeUnit.MILLISECONDS); assertNotNull(result); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/message/MessageChannelTemplateTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/message/MessageChannelTemplateTests.java index 3a40686912..19ff465c6f 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/message/MessageChannelTemplateTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/message/MessageChannelTemplateTests.java @@ -27,8 +27,8 @@ import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; +import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.bus.DefaultMessageBus; -import org.springframework.integration.bus.MessageBus; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.endpoint.AbstractInOutEndpoint; @@ -50,11 +50,13 @@ public class MessageChannelTemplateTests { return new StringMessage(message.getPayload().toString().toUpperCase()); } }; - MessageBus bus = new DefaultMessageBus(); - bus.registerChannel(requestChannel); endpoint.setBeanName("testEndpoint"); - endpoint.setInputChannel(requestChannel); - bus.registerEndpoint(endpoint); + endpoint.setInputChannel(requestChannel); + GenericApplicationContext context = new GenericApplicationContext(); + context.getBeanFactory().registerSingleton("requestChannel", requestChannel); + context.getBeanFactory().registerSingleton("testEndpoint", endpoint); + DefaultMessageBus bus = new DefaultMessageBus(); + bus.setApplicationContext(context); bus.start(); }