Channel Adapters are now endpoints, but if no "channel" attribute if provided for a <channel-adapter/> element, a DirectChannel will be created automatically. The <poller/> sub-element now belongs within the <channel-adapter/> (not the consumer endpoint downstream). This enables support for multiple Channel Adapters to share a MessageChannel. Also, the @Poller annotation belongs at class-level along with @ChannelAdapter if a @Pollable method is being adapted via MethodInvokingSource.

This commit is contained in:
Mark Fisher
2008-08-17 22:37:35 +00:00
parent c48fa5a361
commit 9b85225675
36 changed files with 701 additions and 802 deletions

View File

@@ -30,10 +30,10 @@ import org.springframework.beans.factory.BeanCreationException;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.ChannelRegistry;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.PollableChannelAdapter;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.endpoint.DefaultEndpoint;
import org.springframework.integration.endpoint.InboundChannelAdapter;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.message.ErrorMessage;
import org.springframework.integration.message.GenericMessage;
@@ -223,17 +223,10 @@ public class DefaultMessageBusTests {
public void testErrorChannelWithFailedDispatch() throws InterruptedException {
MessageBus bus = new DefaultMessageBus();
CountDownLatch latch = new CountDownLatch(1);
PollableChannelAdapter channelAdapter = new PollableChannelAdapter(
"testChannel", new FailingSource(latch), null);
MessageHandler handler = new MessageHandler() {
public Message<?> handle(Message<?> message) {
return message;
}
};
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(handler);
endpoint.setBeanName("testEndpoint");
endpoint.setSource(channelAdapter);
bus.registerEndpoint(endpoint);
InboundChannelAdapter channelAdapter = new InboundChannelAdapter();
channelAdapter.setSource(new FailingSource(latch));
channelAdapter.setBeanName("testChannel");
bus.registerEndpoint(channelAdapter);
bus.start();
latch.await(2000, TimeUnit.MILLISECONDS);
Message<?> message = ((PollableChannel) bus.getErrorChannel()).receive(5000);

View File

@@ -1,51 +0,0 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.channel.config;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
/**
* @author Mark Fisher
*/
public class ChannelAdapterFactoryBeanContextTests {
@Test
public void testPollableSourceWithoutTarget() throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"channelAdapterFactoryBeanTests.xml", this.getClass());
MessageChannel channel = (MessageChannel)
context.getBean("adapterWithPollableSourceAndNoTarget");
assertTrue(channel instanceof PollableChannel);
assertEquals("adapterWithPollableSourceAndNoTarget", channel.getName());
Message<?> reply = ((PollableChannel) channel).receive();
assertNotNull(reply);
assertEquals("test", reply.getPayload());
assertFalse(channel.send(new StringMessage("no target")));
}
}

View File

@@ -1,99 +0,0 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.channel.config;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.message.SubscribableSource;
/**
* @author Mark Fisher
*/
public class ChannelAdapterFactoryBeanTests {
@Test
public void testPollableSourceWithoutTarget() throws Exception {
ChannelAdapterFactoryBean factoryBean = new ChannelAdapterFactoryBean();
factoryBean.setBeanName("testChannel");
factoryBean.setSource(new TestPollableSource());
Object bean = factoryBean.getObject();
assertTrue(bean instanceof PollableChannel);
PollableChannel channel = (PollableChannel) bean;
Message<?> reply = channel.receive();
assertNotNull(reply);
assertEquals("test", reply.getPayload());
assertFalse(channel.send(new StringMessage("no target")));
}
@Test
public void testSubscribableSourceWithoutTarget() throws Exception {
ChannelAdapterFactoryBean factoryBean = new ChannelAdapterFactoryBean();
factoryBean.setBeanName("testChannel");
TestSubscribableSource source = new TestSubscribableSource();
factoryBean.setSource(source);
Object bean = factoryBean.getObject();
assertTrue(bean instanceof MessageChannel);
assertTrue(bean instanceof SubscribableSource);
MessageChannel channel = (MessageChannel) bean;
final AtomicReference<Message<?>> messageReference = new AtomicReference<Message<?>>();
((SubscribableSource) bean).subscribe(new MessageTarget() {
public boolean send(Message<?> message) {
messageReference.set(message);
return true;
}
});
source.publishMessage(new StringMessage("foo"));
assertNotNull(messageReference.get());
assertEquals("foo", (messageReference.get()).getPayload());
assertFalse(channel.send(new StringMessage("no target")));
}
@Test
public void testTargetOnly() throws Exception {
ChannelAdapterFactoryBean factoryBean = new ChannelAdapterFactoryBean();
factoryBean.setBeanName("testChannel");
final AtomicReference<Message<?>> messageRef = new AtomicReference<Message<?>>();
factoryBean.setTarget(new MessageTarget() {
public boolean send(Message<?> message) {
messageRef.set(message);
return true;
}
});
Object bean = factoryBean.getObject();
assertTrue(bean instanceof PollableChannel);
PollableChannel channel = (PollableChannel) bean;
Message<?> reply = channel.receive(0);
assertNull(reply);
assertTrue(channel.send(new StringMessage("hello")));
assertNotNull(messageRef.get());
assertEquals("hello", messageRef.get().getPayload());
}
}

View File

@@ -1,23 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:si="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<bean id="adapterWithPollableSourceAndNoTarget" class="org.springframework.integration.channel.config.ChannelAdapterFactoryBean">
<property name="source" ref="pollableSource"/>
</bean>
<bean id="adapterWithPollableSourceAndTarget" class="org.springframework.integration.channel.config.ChannelAdapterFactoryBean">
<property name="source" ref="pollableSource"/>
<property name="target" ref="testTarget"/>
</bean>
<bean id="pollableSource" class="org.springframework.integration.channel.config.TestPollableSource"/>
<bean id="testTarget" class="org.springframework.integration.channel.config.TestTarget"/>
</beans>

View File

@@ -9,9 +9,7 @@
<message-bus/>
<channel-adapter id="targetOnly" target="target"/>
<channel-adapter id="targetWithDatatype" target="target" datatype="java.lang.String"/>
<channel-adapter id="outboundWithImplicitChannel" target="target"/>
<beans:bean id="target" class="org.springframework.integration.config.TestTarget"/>

View File

@@ -16,15 +16,18 @@
package org.springframework.integration.config;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.PollableChannelAdapter;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.endpoint.OutboundChannelAdapter;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
@@ -37,24 +40,20 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests
@Test
public void targetOnly() {
Object bean = this.applicationContext.getBean("targetOnly");
assertTrue(bean instanceof MessageChannel);
assertTrue(bean instanceof PollableChannel);
assertTrue(bean instanceof PollableChannelAdapter);
MessageChannel channel = (MessageChannel) bean;
assertTrue(channel.send(new StringMessage("test")));
}
@Test
public void targetWithDatatypeAccepts() {
MessageChannel channel = (MessageChannel) this.applicationContext.getBean("targetWithDatatype");
assertTrue(channel.send(new StringMessage("test")));
}
@Test(expected = MessageDeliveryException.class)
public void targetWithDatatypeRejects() {
MessageChannel channel = (MessageChannel) this.applicationContext.getBean("targetWithDatatype");
channel.send(new GenericMessage<Integer>(123));
String beanName = "outboundWithImplicitChannel";
Object channel = this.applicationContext.getBean(beanName);
assertTrue(channel instanceof DirectChannel);
MessageBus bus = (MessageBus) this.applicationContext.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
assertNotNull(bus.lookupChannel(beanName));
Object adapter = bus.lookupEndpoint(beanName + ".adapter");
assertNotNull(adapter);
assertTrue(adapter instanceof OutboundChannelAdapter);
TestTarget target = (TestTarget) this.applicationContext.getBean("target");
assertNull(target.getLastMessage());
Message<?> message = new StringMessage("test");
assertTrue(((MessageChannel) channel).send(message));
assertNotNull(target.getLastMessage());
assertEquals(message, target.getLastMessage());
}
}

View File

@@ -24,7 +24,15 @@ import org.springframework.integration.message.MessageTarget;
*/
public class TestTarget implements MessageTarget {
private volatile Message<?> lastMessage;
public Message<?> getLastMessage() {
return this.lastMessage;
}
public boolean send(Message<?> message) {
this.lastMessage = message;
return true;
}

View File

@@ -29,12 +29,12 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.annotation.Order;
@@ -52,6 +52,7 @@ import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.channel.ChannelRegistry;
import org.springframework.integration.channel.ChannelRegistryAware;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.QueueChannel;
@@ -91,22 +92,24 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testSimpleHandlerWithContext() {
ApplicationContext context = new ClassPathXmlApplicationContext(
AbstractApplicationContext context = new ClassPathXmlApplicationContext(
"handlerAnnotationPostProcessorTests.xml", this.getClass());
MessageHandler handler = (MessageHandler) context.getBean("simpleHandler");
Message<?> reply = handler.handle(new StringMessage("world"));
assertEquals("hello world", reply.getPayload());
context.stop();
}
@Test
public void testSimpleHandlerEndpointWithContext() {
ApplicationContext context = new ClassPathXmlApplicationContext(
AbstractApplicationContext context = new ClassPathXmlApplicationContext(
"handlerAnnotationPostProcessorTests.xml", this.getClass());
MessageChannel inputChannel = (MessageChannel) context.getBean("inputChannel");
PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel");
inputChannel.send(new StringMessage("foo"));
Message<?> reply = outputChannel.receive(1000);
assertEquals("hello foo", reply.getPayload());
context.stop();
}
@Test
@@ -381,16 +384,27 @@ public class MessagingAnnotationPostProcessorTests {
}
@Test
public void testChannelAdapterAnnotation() {
public void testChannelAdapterAnnotation() throws InterruptedException {
MessageBus messageBus = new DefaultMessageBus();
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
postProcessor.afterPropertiesSet();
ChannelAdapterAnnotationTestBean testBean = new ChannelAdapterAnnotationTestBean();
postProcessor.postProcessAfterInitialization(testBean, "testBean");
messageBus.start();
PollableChannel testChannel = (PollableChannel) messageBus.lookupChannel("testChannel");
Message<?> message = testChannel.receive(1000);
assertEquals("test", message.getPayload());
DirectChannel testChannel = (DirectChannel) messageBus.lookupChannel("testChannel");
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Message<?>> receivedMessage = new AtomicReference<Message<?>>();
testChannel.subscribe(new org.springframework.integration.message.MessageTarget() {
public boolean send(Message<?> message) {
receivedMessage.set(message);
latch.countDown();
return false;
}
});
latch.await(3, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
assertNotNull(receivedMessage.get());
assertEquals("test", receivedMessage.get().getPayload());
messageBus.stop();
}
@@ -534,6 +548,7 @@ public class MessagingAnnotationPostProcessorTests {
@ChannelAdapter("testChannel")
@Poller(period = 1000, initialDelay = 0, maxMessagesPerPoll = 1)
private static class ChannelAdapterAnnotationTestBean {
@Pollable

View File

@@ -1,60 +0,0 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.handler;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.IOException;
import org.junit.Test;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
/**
* @author Mark Fisher
*/
public class MethodInvokingAdapterTests {
@Test
public void testMethodInvokingSourceChannelAdapter() throws IOException, InterruptedException {
AbstractApplicationContext context = new ClassPathXmlApplicationContext("adapterTests.xml", this.getClass());
PollableChannel channel = (PollableChannel) context.getBean("sourceChannelAdapter");
Message<?> message = channel.receive(1000);
assertNotNull(message);
assertEquals("foo", message.getPayload());
}
@Test
public void testMethodInvokingTargetChannelAdapter() throws IOException, InterruptedException {
AbstractApplicationContext context = new ClassPathXmlApplicationContext("adapterTests.xml", this.getClass());
MessageChannel channel = (MessageChannel) context.getBean("targetChannelAdapter");
TestSink sink = (TestSink) context.getBean("sink");
assertNull(sink.get());
channel.send(new StringMessage("foo"));
String result = sink.get();
assertNotNull(result);
assertEquals("foo", result);
}
}

View File

@@ -1,29 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<bean id="sourceChannelAdapter" class="org.springframework.integration.channel.config.ChannelAdapterFactoryBean">
<property name="source" ref="source"/>
</bean>
<bean id="source" class="org.springframework.integration.message.MethodInvokingSource">
<property name="object">
<bean class="org.springframework.integration.handler.TestSource"/>
</property>
<property name="methodName" value="foo"/>
</bean>
<bean id="targetChannelAdapter" class="org.springframework.integration.channel.config.ChannelAdapterFactoryBean">
<property name="target" ref="target"/>
</bean>
<bean id="target" class="org.springframework.integration.handler.MethodInvokingTarget">
<property name="object" ref="sink"/>
<property name="methodName" value="store"/>
</bean>
<bean id="sink" class="org.springframework.integration.handler.TestSink"/>
</beans>

View File

@@ -1,24 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:si="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<si:message-bus auto-startup="false"/>
<si:channel id="channel"/>
<si:channel-adapter source="source" method="foo" channel="channel">
<si:schedule period="100"/>
</si:channel-adapter>
<si:channel-adapter target="sink" method="store" channel="channel"/>
<bean id="source" class="org.springframework.integration.handler.TestSource"/>
<bean id="sink" class="org.springframework.integration.handler.TestSink"/>
</beans>