The <channel-adapter/> now actually creates a channel instance rather than requiring another distinct channel object. Instead of configuring the poller on the channel-adapter, it is currently to be configured on the consuming endpoint just as if the <channel-adapter/> were any other pollable channel (e.g. <queue-channel/>).

This commit is contained in:
Mark Fisher
2008-08-01 23:11:56 +00:00
parent 48826ec26e
commit 951226346a
41 changed files with 1193 additions and 351 deletions

View File

@@ -0,0 +1,51 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.channel.config;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
/**
* @author Mark Fisher
*/
public class ChannelAdapterFactoryBeanContextTests {
@Test
public void testPollableSourceWithoutTarget() throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"channelAdapterFactoryBeanTests.xml", this.getClass());
MessageChannel channel = (MessageChannel)
context.getBean("adapterWithPollableSourceAndNoTarget");
assertTrue(channel instanceof PollableChannel);
assertEquals("adapterWithPollableSourceAndNoTarget", channel.getName());
Message<?> reply = ((PollableChannel) channel).receive();
assertNotNull(reply);
assertEquals("test", reply.getPayload());
assertFalse(channel.send(new StringMessage("no target")));
}
}

View File

@@ -0,0 +1,99 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.channel.config;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.message.SubscribableSource;
/**
* @author Mark Fisher
*/
public class ChannelAdapterFactoryBeanTests {
@Test
public void testPollableSourceWithoutTarget() throws Exception {
ChannelAdapterFactoryBean factoryBean = new ChannelAdapterFactoryBean();
factoryBean.setBeanName("testChannel");
factoryBean.setSource(new TestPollableSource());
Object bean = factoryBean.getObject();
assertTrue(bean instanceof PollableChannel);
PollableChannel channel = (PollableChannel) bean;
Message<?> reply = channel.receive();
assertNotNull(reply);
assertEquals("test", reply.getPayload());
assertFalse(channel.send(new StringMessage("no target")));
}
@Test
public void testSubscribableSourceWithoutTarget() throws Exception {
ChannelAdapterFactoryBean factoryBean = new ChannelAdapterFactoryBean();
factoryBean.setBeanName("testChannel");
TestSubscribableSource source = new TestSubscribableSource();
factoryBean.setSource(source);
Object bean = factoryBean.getObject();
assertTrue(bean instanceof MessageChannel);
assertTrue(bean instanceof SubscribableSource);
MessageChannel channel = (MessageChannel) bean;
final AtomicReference<Message<?>> messageReference = new AtomicReference<Message<?>>();
((SubscribableSource) bean).subscribe(new MessageTarget() {
public boolean send(Message<?> message) {
messageReference.set(message);
return true;
}
});
source.publishMessage(new StringMessage("foo"));
assertNotNull(messageReference.get());
assertEquals("foo", (messageReference.get()).getPayload());
assertFalse(channel.send(new StringMessage("no target")));
}
@Test
public void testTargetOnly() throws Exception {
ChannelAdapterFactoryBean factoryBean = new ChannelAdapterFactoryBean();
factoryBean.setBeanName("testChannel");
final AtomicReference<Message<?>> messageRef = new AtomicReference<Message<?>>();
factoryBean.setTarget(new MessageTarget() {
public boolean send(Message<?> message) {
messageRef.set(message);
return true;
}
});
Object bean = factoryBean.getObject();
assertTrue(bean instanceof PollableChannel);
PollableChannel channel = (PollableChannel) bean;
Message<?> reply = channel.receive(0);
assertNull(reply);
assertTrue(channel.send(new StringMessage("hello")));
assertNotNull(messageRef.get());
assertEquals("hello", messageRef.get().getPayload());
}
}

View File

@@ -138,10 +138,10 @@ public class ChannelParserTests {
}
@Test
public void testChannelInteceptors() {
public void testChannelInteceptorRef() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"channelInterceptorParserTests.xml", this.getClass());
PollableChannel channel = (PollableChannel) context.getBean("channel");
PollableChannel channel = (PollableChannel) context.getBean("channelWithInterceptorRef");
TestChannelInterceptor interceptor = (TestChannelInterceptor) context.getBean("interceptor");
assertEquals(0, interceptor.getSendCount());
channel.send(new StringMessage("test"));
@@ -151,6 +151,16 @@ public class ChannelParserTests {
assertEquals(1, interceptor.getReceiveCount());
}
@Test
public void testChannelInteceptorInnerBean() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"channelInterceptorParserTests.xml", this.getClass());
PollableChannel channel = (PollableChannel) context.getBean("channelWithInterceptorInnerBean");
channel.send(new StringMessage("test"));
Message<?> transformed = channel.receive(1000);
assertEquals("TEST", transformed.getPayload());
}
@Test
public void testPriorityChannelWithDefaultComparator() {
ApplicationContext context = new ClassPathXmlApplicationContext(
@@ -215,6 +225,7 @@ public class ChannelParserTests {
}
@SuppressWarnings("unchecked")
public static MessageChannel extractProxifiedChannel (Object channelProxy) {
InvocationHandler handler = Proxy.getInvocationHandler(channelProxy);
DirectFieldAccessor handlerAccessor = new DirectFieldAccessor(handler);

View File

@@ -0,0 +1,32 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.channel.config;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.PollableSource;
import org.springframework.integration.message.StringMessage;
/**
* @author Mark Fisher
*/
public class TestPollableSource implements PollableSource<String> {
public Message<String> receive() {
return new StringMessage("test");
}
}

View File

@@ -0,0 +1,48 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.channel.config;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.SubscribableSource;
/**
* @author Mark Fisher
*/
public class TestSubscribableSource implements SubscribableSource {
private final List<MessageTarget> targets = new CopyOnWriteArrayList<MessageTarget>();
public boolean subscribe(MessageTarget target) {
return this.targets.add(target);
}
public boolean unsubscribe(MessageTarget target) {
return this.targets.remove(target);
}
public void publishMessage(Message<?> message) {
for (MessageTarget target : this.targets) {
target.send(message);
}
}
}

View File

@@ -0,0 +1,42 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.channel.config;
import java.util.ArrayList;
import java.util.List;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageTarget;
/**
* @author Mark Fisher
*/
public class TestTarget implements MessageTarget {
private final List<Message<?>> receivedMessages = new ArrayList<Message<?>>();
public List<Message<?>> getReceivedMessages() {
return this.receivedMessages;
}
public boolean send(Message<?> message) {
this.receivedMessages.add(message);
return true;
}
}

View File

@@ -0,0 +1,32 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.channel.config;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
/**
* @author Mark Fisher
*/
public class TestTransformer implements MessageHandler {
public Message<?> handle(Message<?> message) {
return MessageBuilder.fromPayload(message.getPayload().toString().toUpperCase()).build();
}
}

View File

@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:si="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<bean id="adapterWithPollableSourceAndNoTarget" class="org.springframework.integration.channel.config.ChannelAdapterFactoryBean">
<property name="source" ref="pollableSource"/>
</bean>
<bean id="adapterWithPollableSourceAndTarget" class="org.springframework.integration.channel.config.ChannelAdapterFactoryBean">
<property name="source" ref="pollableSource"/>
<property name="target" ref="testTarget"/>
</bean>
<bean id="pollableSource" class="org.springframework.integration.channel.config.TestPollableSource"/>
<bean id="testTarget" class="org.springframework.integration.channel.config.TestTarget"/>
</beans>

View File

@@ -7,8 +7,20 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<channel id="channel">
<interceptor ref="interceptor"/>
<channel id="channelWithInterceptorRef">
<interceptors>
<ref bean="interceptor"/>
</interceptors>
</channel>
<channel id="channelWithInterceptorInnerBean">
<interceptors>
<beans:bean class="org.springframework.integration.transformer.MessageTransformingChannelInterceptor">
<beans:constructor-arg>
<beans:bean class="org.springframework.integration.channel.config.TestTransformer"/>
</beans:constructor-arg>
</beans:bean>
</interceptors>
</channel>
<beans:bean id="interceptor" class="org.springframework.integration.config.TestChannelInterceptor"/>

View File

@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<message-bus/>
<channel-adapter id="targetOnly" target="target"/>
<channel-adapter id="targetWithDatatype" target="target" datatype="java.lang.String"/>
<beans:bean id="target" class="org.springframework.integration.config.TestTarget"/>
</beans:beans>

View File

@@ -0,0 +1,60 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.config;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.PollableChannelAdapter;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.StringMessage;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
/**
* @author Mark Fisher
*/
@ContextConfiguration
public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests {
@Test
public void targetOnly() {
Object bean = this.applicationContext.getBean("targetOnly");
assertTrue(bean instanceof MessageChannel);
assertTrue(bean instanceof PollableChannel);
assertTrue(bean instanceof PollableChannelAdapter);
MessageChannel channel = (MessageChannel) bean;
assertTrue(channel.send(new StringMessage("test")));
}
@Test
public void targetWithDatatypeAccepts() {
MessageChannel channel = (MessageChannel) this.applicationContext.getBean("targetWithDatatype");
assertTrue(channel.send(new StringMessage("test")));
}
@Test(expected = MessageDeliveryException.class)
public void targetWithDatatypeRejects() {
MessageChannel channel = (MessageChannel) this.applicationContext.getBean("targetWithDatatype");
channel.send(new GenericMessage<Integer>(123));
}
}

View File

@@ -21,13 +21,11 @@ import static org.junit.Assert.assertEquals;
import java.util.List;
import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.endpoint.EndpointInterceptor;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.endpoint.SourceEndpoint;
import org.springframework.integration.endpoint.TriggerMessage;
import org.springframework.integration.message.StringMessage;
/**
@@ -39,7 +37,7 @@ public class EndpointInterceptorTests {
public void testHandlerEndpointWithBeanInterceptors() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"endpointInterceptorTests.xml", this.getClass());
MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerEndpointWithBeanInterceptors");
MessageEndpoint endpoint = (MessageEndpoint) context.getBean("endpointWithBeanInterceptors");
testInterceptors(endpoint, context, true);
}
@@ -47,39 +45,7 @@ public class EndpointInterceptorTests {
public void testHandlerEndpointWithRefInterceptors() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"endpointInterceptorTests.xml", this.getClass());
MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerEndpointWithRefInterceptors");
testInterceptors(endpoint, context, false);
}
@Test
public void testTargetEndpointWithBeanInterceptors() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"endpointInterceptorTests.xml", this.getClass());
MessageEndpoint endpoint = (MessageEndpoint) context.getBean("targetEndpointWithBeanInterceptors");
testInterceptors(endpoint, context, true);
}
@Test
public void testTargetEndpointWithRefInterceptors() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"endpointInterceptorTests.xml", this.getClass());
MessageEndpoint endpoint = (MessageEndpoint) context.getBean("targetEndpointWithRefInterceptors");
testInterceptors(endpoint, context, false);
}
@Test
public void testSourceEndpointWithBeanInterceptors() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"endpointInterceptorTests.xml", this.getClass());
MessageEndpoint endpoint = (MessageEndpoint) context.getBean("sourceEndpointWithBeanInterceptors");
testInterceptors(endpoint, context, true);
}
@Test
public void testSourceEndpointWithRefInterceptors() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"endpointInterceptorTests.xml", this.getClass());
MessageEndpoint endpoint = (MessageEndpoint) context.getBean("sourceEndpointWithRefInterceptors");
MessageEndpoint endpoint = (MessageEndpoint) context.getBean("endpointWithRefInterceptors");
testInterceptors(endpoint, context, false);
}
@@ -100,14 +66,7 @@ public class EndpointInterceptorTests {
}
assertEquals(0, preInterceptor.getCount());
assertEquals(0, aroundInterceptor.getCount());
if (endpoint instanceof SourceEndpoint) {
MessageChannel channel = (MessageChannel) context.getBean("testChannel");
channel.send(new StringMessage("foo"));
endpoint.send(new TriggerMessage());
}
else {
endpoint.send(new StringMessage("test"));
}
endpoint.send(new StringMessage("test"));
assertEquals(1, preInterceptor.getCount());
assertEquals(2, aroundInterceptor.getCount());
context.stop();

View File

@@ -13,7 +13,7 @@
<channel id="replyChannel"/>
<service-activator id="handlerEndpointWithBeanInterceptors"
<service-activator id="endpointWithBeanInterceptors"
input-channel="testChannel"
ref="testHandler"
output-channel="replyChannel">
@@ -24,7 +24,7 @@
</interceptors>
</service-activator>
<service-activator id="handlerEndpointWithRefInterceptors"
<service-activator id="endpointWithRefInterceptors"
input-channel="testChannel"
ref="testHandler"
output-channel="replyChannel">
@@ -35,52 +35,9 @@
</interceptors>
</service-activator>
<channel-adapter id="targetEndpointWithBeanInterceptors"
channel="testChannel"
target="testTarget">
<schedule period="100"/>
<interceptors>
<beans:bean class="org.springframework.integration.config.TestPreSendInterceptor"/>
<beans:bean class="org.springframework.integration.config.TestAroundSendEndpointInterceptor"/>
</interceptors>
</channel-adapter>
<channel-adapter id="targetEndpointWithRefInterceptors"
channel="testChannel"
target="testTarget">
<schedule period="100"/>
<interceptors>
<ref bean="preInterceptor"/>
<ref bean="aroundInterceptor"/>
</interceptors>
</channel-adapter>
<channel-adapter id="sourceEndpointWithBeanInterceptors"
source="testSource"
channel="testChannel">
<schedule period="100"/>
<interceptors>
<beans:bean class="org.springframework.integration.config.TestPreSendInterceptor"/>
<beans:bean class="org.springframework.integration.config.TestAroundSendEndpointInterceptor"/>
</interceptors>
</channel-adapter>
<channel-adapter id="sourceEndpointWithRefInterceptors"
source="testSource"
channel="testChannel">
<schedule period="100"/>
<interceptors>
<ref bean="preInterceptor"/>
<ref bean="aroundInterceptor"/>
</interceptors>
</channel-adapter>
<beans:bean id="testHandler" class="org.springframework.integration.config.TestHandler"/>
<beans:bean id="testSource" class="org.springframework.integration.config.TestSource"/>
<beans:bean id="testTarget" class="org.springframework.integration.config.TestTarget"/>
<beans:bean id="preInterceptor" class="org.springframework.integration.config.TestPreSendInterceptor"/>
<beans:bean id="aroundInterceptor" class="org.springframework.integration.config.TestAroundSendEndpointInterceptor"/>

View File

@@ -12,7 +12,9 @@
<channel id="requestChannel"/>
<channel id="replyChannel">
<interceptor ref="interceptor"/>
<interceptors>
<ref bean="interceptor"/>
</interceptors>
</channel>
<service-activator ref="handler" input-channel="requestChannel"/>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2007 the original author or authors.
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,13 +21,15 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
/**
* @author Mark Fisher
@@ -35,31 +37,24 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
public class MethodInvokingAdapterTests {
@Test
public void testAdaptersWithBeanDefinitions() throws IOException, InterruptedException {
public void testMethodInvokingSourceChannelAdapter() throws IOException, InterruptedException {
AbstractApplicationContext context = new ClassPathXmlApplicationContext("adapterTests.xml", this.getClass());
TestSink sink = (TestSink) context.getBean("sink");
CountDownLatch latch = new CountDownLatch(1);
sink.setLatch(latch);
assertNull(sink.get());
context.start();
latch.await(3000, TimeUnit.MILLISECONDS);
assertEquals("latch should have counted down within allotted time", 0, latch.getCount());
assertNotNull(sink.get());
context.close();
PollableChannel channel = (PollableChannel) context.getBean("sourceChannelAdapter");
Message<?> message = channel.receive(1000);
assertNotNull(message);
assertEquals("foo", message.getPayload());
}
@Test
public void testAdaptersWithNamespace() throws IOException, InterruptedException {
AbstractApplicationContext context = new ClassPathXmlApplicationContext("adapterTestsWithNamespace.xml", this.getClass());
public void testMethodInvokingTargetChannelAdapter() throws IOException, InterruptedException {
AbstractApplicationContext context = new ClassPathXmlApplicationContext("adapterTests.xml", this.getClass());
MessageChannel channel = (MessageChannel) context.getBean("targetChannelAdapter");
TestSink sink = (TestSink) context.getBean("sink");
CountDownLatch latch = new CountDownLatch(1);
sink.setLatch(latch);
assertNull(sink.get());
context.start();
latch.await(3000, TimeUnit.MILLISECONDS);
assertEquals("latch should have counted down within allotted time", 0, latch.getCount());
assertNotNull(sink.get());
context.close();
channel.send(new StringMessage("foo"));
String result = sink.get();
assertNotNull(result);
assertEquals("foo", result);
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2007 the original author or authors.
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,8 +16,6 @@
package org.springframework.integration.handler;
import java.util.concurrent.CountDownLatch;
/**
* @author Mark Fisher
*/
@@ -25,12 +23,6 @@ public class TestSink {
private String result;
private CountDownLatch latch;
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}
public void validMethod(String s) {
}
@@ -44,9 +36,6 @@ public class TestSink {
public void store(String s) {
this.result = s;
if (this.latch != null) {
this.latch.countDown();
}
}
public String get() {

View File

@@ -21,8 +21,6 @@ package org.springframework.integration.handler;
*/
public class TestSource {
private boolean fooCalled;
public String validMethod() {
return "valid";
}
@@ -35,16 +33,7 @@ public class TestSource {
}
public String foo() {
if (this.fooCalled) {
try {
Thread.sleep(5000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
this.fooCalled = true;
return "bar";
return "foo";
}
}

View File

@@ -4,37 +4,24 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<bean id="bus" class="org.springframework.integration.bus.DefaultMessageBus">
<property name="autoStartup" value="false"/>
<bean id="sourceChannelAdapter" class="org.springframework.integration.channel.config.ChannelAdapterFactoryBean">
<property name="source" ref="source"/>
</bean>
<bean id="channel" class="org.springframework.integration.channel.QueueChannel"/>
<bean id="sourceEndpoint" class="org.springframework.integration.endpoint.SourceEndpoint">
<constructor-arg>
<bean class="org.springframework.integration.message.MethodInvokingSource">
<property name="object">
<bean class="org.springframework.integration.handler.TestSource"/>
</property>
<property name="methodName" value="foo"/>
</bean>
</constructor-arg>
<property name="target" ref="channel"/>
<property name="schedule">
<bean class="org.springframework.integration.scheduling.PollingSchedule">
<constructor-arg value="1000"/>
</bean>
<bean id="source" class="org.springframework.integration.message.MethodInvokingSource">
<property name="object">
<bean class="org.springframework.integration.handler.TestSource"/>
</property>
<property name="methodName" value="foo"/>
</bean>
<bean id="targetChannelAdapter" class="org.springframework.integration.channel.config.ChannelAdapterFactoryBean">
<property name="target" ref="target"/>
</bean>
<bean id="target" class="org.springframework.integration.handler.MethodInvokingTarget">
<property name="object" ref="sink"/>
<property name="methodName" value="store"/>
</bean>
<bean id="targetEndpoint" class="org.springframework.integration.endpoint.TargetEndpoint">
<constructor-arg ref="target"/>
<property name="source" ref="channel"/>
</bean>
<bean id="sink" class="org.springframework.integration.handler.TestSink"/>