MessageChannels no longer implement MessageTarget, and MessageEndpoints that send a reply have a setOutputChannel() method instead of setTarget().

This commit is contained in:
Mark Fisher
2008-09-05 16:36:13 +00:00
parent 63720ded3f
commit b47d81ff16
42 changed files with 304 additions and 354 deletions

View File

@@ -126,10 +126,10 @@ public class DefaultMessageBusTests {
bus.registerChannel(outputChannel2);
endpoint1.setBeanName("testEndpoint1");
endpoint1.setSource(inputChannel);
endpoint1.setTarget(outputChannel1);
endpoint1.setOutputChannel(outputChannel1);
endpoint2.setBeanName("testEndpoint2");
endpoint2.setSource(inputChannel);
endpoint2.setTarget(outputChannel2);
endpoint2.setOutputChannel(outputChannel2);
bus.registerEndpoint(endpoint1);
bus.registerEndpoint(endpoint2);
bus.start();
@@ -169,10 +169,10 @@ public class DefaultMessageBusTests {
bus.registerChannel(outputChannel2);
endpoint1.setBeanName("testEndpoint1");
endpoint1.setSource(inputChannel);
endpoint1.setTarget(outputChannel1);
endpoint1.setOutputChannel(outputChannel1);
endpoint2.setBeanName("testEndpoint2");
endpoint2.setSource(inputChannel);
endpoint2.setTarget(outputChannel2);
endpoint2.setOutputChannel(outputChannel2);
bus.registerEndpoint(endpoint1);
bus.registerEndpoint(endpoint2);
bus.start();

View File

@@ -62,7 +62,7 @@ public class DirectChannelSubscriptionTests {
MethodInvoker invoker = new MessageMappingMethodInvoker(new TestBean(), "handle");
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(invoker);
endpoint.setSource(sourceChannel);
endpoint.setTarget(targetChannel);
endpoint.setOutputChannel(targetChannel);
endpoint.setBeanName("testEndpoint");
bus.registerEndpoint(endpoint);
bus.start();
@@ -96,7 +96,7 @@ public class DirectChannelSubscriptionTests {
}
};
endpoint.setSource(sourceChannel);
endpoint.setTarget(targetChannel);
endpoint.setOutputChannel(targetChannel);
endpoint.setBeanName("testEndpoint");
bus.registerEndpoint(endpoint);
bus.start();

View File

@@ -13,7 +13,7 @@
<bean id="endpoint" class="org.springframework.integration.endpoint.ServiceActivatorEndpoint">
<constructor-arg ref="handler"/>
<property name="source" ref="sourceChannel"/>
<property name="target" ref="targetChannel"/>
<property name="outputChannel" ref="targetChannel"/>
</bean>
<bean id="handler" class="org.springframework.integration.handler.TestHandlers" factory-method="echoHandler"/>

View File

@@ -26,6 +26,7 @@ import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.FatalBeanException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.BlockingChannel;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
@@ -52,7 +53,7 @@ public class ChannelParserTests {
public void testChannelWithCapacity() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"channelParserTests.xml", this.getClass());
MessageChannel channel = (MessageChannel) context.getBean("capacityChannel");
BlockingChannel channel = (BlockingChannel) context.getBean("capacityChannel");
for (int i = 0; i < 10; i++) {
boolean result = channel.send(new GenericMessage<String>("test"), 10);
assertTrue(result);

View File

@@ -22,10 +22,10 @@ import static org.junit.Assert.assertNull;
import org.junit.Test;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.message.selector.MessageSelector;
@@ -80,13 +80,13 @@ public class WireTapTests {
@Test
public void simpleTargetWireTap() {
QueueChannel mainChannel = new QueueChannel();
TestTarget secondaryTarget = new TestTarget();
mainChannel.addInterceptor(new WireTap(secondaryTarget));
assertNull(secondaryTarget.getLastMessage());
TestChannel secondaryChannel = new TestChannel();
mainChannel.addInterceptor(new WireTap(secondaryChannel));
assertNull(secondaryChannel.getLastMessage());
Message<?> message = new StringMessage("testing");
mainChannel.send(message);
Message<?> original = mainChannel.receive(0);
Message<?> intercepted = secondaryTarget.getLastMessage();
Message<?> intercepted = secondaryChannel.getLastMessage();
assertNotNull(original);
assertNotNull(intercepted);
assertEquals(original, intercepted);
@@ -124,10 +124,14 @@ public class WireTapTests {
}
}
private static class TestTarget implements MessageTarget {
private static class TestChannel implements MessageChannel {
private volatile Message<?> lastMessage;
public String getName() {
return "testChannel";
}
public boolean send(Message<?> message) {
this.lastMessage = message;
return true;

View File

@@ -85,7 +85,7 @@ public class AggregatorParserTests {
"The AggregatorEndpoint is not injected with the appropriate CompletionStrategy instance",
completionStrategy, accessor.getPropertyValue("completionStrategy"));
Assert.assertEquals("The AggregatorEndpoint is not injected with the appropriate output channel",
outputChannel, accessor.getPropertyValue("target"));
outputChannel, accessor.getPropertyValue("outputChannel"));
Assert.assertEquals("The AggregatorEndpoint is not injected with the appropriate discard channel",
discardChannel, accessor.getPropertyValue("discardChannel"));
Assert.assertEquals("The AggregatorEndpoint is not set with the appropriate timeout value",

View File

@@ -72,7 +72,7 @@ public class ResequencerParserTests {
public void testDefaultResequencerProperties() {
ResequencerEndpoint endpoint = (ResequencerEndpoint) context.getBean("defaultResequencer");
DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint);
Assert.assertNull(accessor.getPropertyValue("target"));
Assert.assertNull(accessor.getPropertyValue("outputChannel"));
Assert.assertNull(accessor.getPropertyValue("discardChannel"));
Assert.assertEquals("The ResequencerEndpoint is not set with the appropriate timeout value",
1000l, accessor.getPropertyValue("sendTimeout"));
@@ -97,7 +97,7 @@ public class ResequencerParserTests {
MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel");
DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint);
Assert.assertEquals("The ResequencerEndpoint is not injected with the appropriate output channel",
outputChannel, accessor.getPropertyValue("target"));
outputChannel, accessor.getPropertyValue("outputChannel"));
Assert.assertEquals("The ResequencerEndpoint is not injected with the appropriate discard channel",
discardChannel, accessor.getPropertyValue("discardChannel"));
Assert.assertEquals("The ResequencerEndpoint is not set with the appropriate timeout value",

View File

@@ -27,6 +27,7 @@ import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.interceptor.WireTap;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
@@ -40,12 +41,12 @@ public class WireTapParserTests {
public void simpleWireTap() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"wireTapParserTests.xml", this.getClass());
MessageChannel channel = (MessageChannel) context.getBean("simple");
WireTapTestTarget target = (WireTapTestTarget) context.getBean("testTarget");
assertNull(target.getLastMessage());
MessageChannel mainChannel = (MessageChannel) context.getBean("noSelectors");
PollableChannel wireTapChannel = (PollableChannel) context.getBean("wireTapChannel");
assertNull(wireTapChannel.receive(0));
Message<?> original = new StringMessage("test");
channel.send(original);
Message<?> intercepted = target.getLastMessage();
mainChannel.send(original);
Message<?> intercepted = wireTapChannel.receive(0);
assertNotNull(intercepted);
assertEquals(original, intercepted);
}
@@ -54,12 +55,12 @@ public class WireTapParserTests {
public void wireTapWithAcceptingSelector() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"wireTapParserTests.xml", this.getClass());
MessageChannel channel = (MessageChannel) context.getBean("accepting");
WireTapTestTarget target = (WireTapTestTarget) context.getBean("testTarget");
assertNull(target.getLastMessage());
MessageChannel mainChannel = (MessageChannel) context.getBean("accepting");
PollableChannel wireTapChannel = (PollableChannel) context.getBean("wireTapChannel");
assertNull(wireTapChannel.receive(0));
Message<?> original = new StringMessage("test");
channel.send(original);
Message<?> intercepted = target.getLastMessage();
mainChannel.send(original);
Message<?> intercepted = wireTapChannel.receive(0);
assertNotNull(intercepted);
assertEquals(original, intercepted);
}
@@ -68,12 +69,12 @@ public class WireTapParserTests {
public void wireTapWithRejectingSelector() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"wireTapParserTests.xml", this.getClass());
MessageChannel channel = (MessageChannel) context.getBean("rejecting");
WireTapTestTarget target = (WireTapTestTarget) context.getBean("testTarget");
assertNull(target.getLastMessage());
MessageChannel mainChannel = (MessageChannel) context.getBean("rejecting");
PollableChannel wireTapChannel = (PollableChannel) context.getBean("wireTapChannel");
assertNull(wireTapChannel.receive(0));
Message<?> original = new StringMessage("test");
channel.send(original);
Message<?> intercepted = target.getLastMessage();
mainChannel.send(original);
Message<?> intercepted = wireTapChannel.receive(0);
assertNull(intercepted);
}

View File

@@ -1,39 +0,0 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.config;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageTarget;
/**
* @author Mark Fisher
*/
public class WireTapTestTarget implements MessageTarget {
private volatile Message<?> lastMessage;
public Message<?> getLastMessage() {
return this.lastMessage;
}
public boolean send(Message<?> message) {
this.lastMessage= message;
return true;
}
}

View File

@@ -41,20 +41,17 @@ public class AggregatorAnnotationTests {
ApplicationContext context = new ClassPathXmlApplicationContext(
new String[] { "classpath:/org/springframework/integration/config/annotation/testAnnotatedAggregator.xml" });
final String endpointName = "endpointWithDefaultAnnotation";
DirectFieldAccessor aggregatingMessageHandlerAccessor = getDirectFieldAccessorForAggregatingHandler(context,
DirectFieldAccessor accessor = getDirectFieldAccessorForAggregatingHandler(context,
endpointName);
Assert.assertTrue(aggregatingMessageHandlerAccessor.getPropertyValue("completionStrategy") instanceof SequenceSizeCompletionStrategy);
Assert.assertNull(aggregatingMessageHandlerAccessor.getPropertyValue("target"));
Assert.assertNull(aggregatingMessageHandlerAccessor.getPropertyValue("discardChannel"));
Assert.assertEquals(AggregatorEndpoint.DEFAULT_SEND_TIMEOUT, aggregatingMessageHandlerAccessor
.getPropertyValue("sendTimeout"));
Assert.assertEquals(AggregatorEndpoint.DEFAULT_TIMEOUT, aggregatingMessageHandlerAccessor
.getPropertyValue("timeout"));
Assert.assertEquals(false, aggregatingMessageHandlerAccessor.getPropertyValue("sendPartialResultOnTimeout"));
Assert.assertEquals(AggregatorEndpoint.DEFAULT_REAPER_INTERVAL, aggregatingMessageHandlerAccessor
.getPropertyValue("reaperInterval"));
Assert.assertTrue(accessor.getPropertyValue("completionStrategy") instanceof SequenceSizeCompletionStrategy);
Assert.assertNull(accessor.getPropertyValue("outputChannel"));
Assert.assertNull(accessor.getPropertyValue("discardChannel"));
Assert.assertEquals(AggregatorEndpoint.DEFAULT_SEND_TIMEOUT, accessor.getPropertyValue("sendTimeout"));
Assert.assertEquals(AggregatorEndpoint.DEFAULT_TIMEOUT, accessor.getPropertyValue("timeout"));
Assert.assertEquals(false, accessor.getPropertyValue("sendPartialResultOnTimeout"));
Assert.assertEquals(AggregatorEndpoint.DEFAULT_REAPER_INTERVAL, accessor.getPropertyValue("reaperInterval"));
Assert.assertEquals(AggregatorEndpoint.DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY,
aggregatingMessageHandlerAccessor.getPropertyValue("trackedCorrelationIdCapacity"));
accessor.getPropertyValue("trackedCorrelationIdCapacity"));
}
@Test
@@ -62,22 +59,15 @@ public class AggregatorAnnotationTests {
ApplicationContext context = new ClassPathXmlApplicationContext(
new String[] { "classpath:/org/springframework/integration/config/annotation/testAnnotatedAggregator.xml" });
final String endpointName = "endpointWithCustomizedAnnotation";
DirectFieldAccessor aggregatingMessageHandlerAccessor = getDirectFieldAccessorForAggregatingHandler(context,
endpointName);
Assert.assertTrue(aggregatingMessageHandlerAccessor.getPropertyValue("completionStrategy") instanceof SequenceSizeCompletionStrategy);
Assert.assertEquals(getMessageBus(context).lookupChannel("outputChannel"), aggregatingMessageHandlerAccessor
.getPropertyValue("target"));
Assert.assertEquals(getMessageBus(context).lookupChannel("discardChannel"), aggregatingMessageHandlerAccessor
.getPropertyValue("discardChannel"));
Assert.assertEquals(98765432l, aggregatingMessageHandlerAccessor
.getPropertyValue("sendTimeout"));
Assert.assertEquals(4567890l, aggregatingMessageHandlerAccessor
.getPropertyValue("timeout"));
Assert.assertEquals(true, aggregatingMessageHandlerAccessor.getPropertyValue("sendPartialResultOnTimeout"));
Assert.assertEquals(1234l, aggregatingMessageHandlerAccessor
.getPropertyValue("reaperInterval"));
Assert.assertEquals(42,
aggregatingMessageHandlerAccessor.getPropertyValue("trackedCorrelationIdCapacity"));
DirectFieldAccessor accessor = getDirectFieldAccessorForAggregatingHandler(context, endpointName);
Assert.assertTrue(accessor.getPropertyValue("completionStrategy") instanceof SequenceSizeCompletionStrategy);
Assert.assertEquals(getMessageBus(context).lookupChannel("outputChannel"), accessor.getPropertyValue("outputChannel"));
Assert.assertEquals(getMessageBus(context).lookupChannel("discardChannel"), accessor.getPropertyValue("discardChannel"));
Assert.assertEquals(98765432l, accessor.getPropertyValue("sendTimeout"));
Assert.assertEquals(4567890l, accessor.getPropertyValue("timeout"));
Assert.assertEquals(true, accessor.getPropertyValue("sendPartialResultOnTimeout"));
Assert.assertEquals(1234l, accessor.getPropertyValue("reaperInterval"));
Assert.assertEquals(42, accessor.getPropertyValue("trackedCorrelationIdCapacity"));
}
@Test

View File

@@ -7,36 +7,38 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<channel id="simple">
<channel id="wireTapChannel">
<queue capacity="1"/>
</channel>
<channel id="noSelectors">
<queue capacity="10"/>
<interceptors>
<wire-tap target="testTarget"/>
<wire-tap target="wireTapChannel"/>
</interceptors>
</channel>
<channel id="accepting">
<queue capacity="10"/>
<interceptors>
<wire-tap target="testTarget" selector="acceptingSelector"/>
<wire-tap target="wireTapChannel" selector="acceptingSelector"/>
</interceptors>
</channel>
<channel id="rejecting">
<queue capacity="10"/>
<interceptors>
<wire-tap target="testTarget" selector="rejectingSelector"/>
<wire-tap target="wireTapChannel" selector="rejectingSelector"/>
</interceptors>
</channel>
<channel id="timeout">
<queue capacity="10"/>
<interceptors>
<wire-tap target="testTarget" timeout="1234"/>
<wire-tap target="wireTapChannel" timeout="1234"/>
</interceptors>
</channel>
<beans:bean id="testTarget" class="org.springframework.integration.config.WireTapTestTarget"/>
<beans:bean id="acceptingSelector" class="org.springframework.integration.config.TestSelector">
<beans:constructor-arg value="true"/>
</beans:bean>

View File

@@ -25,14 +25,10 @@ import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.easymock.IAnswer;
import org.junit.Before;
@@ -213,57 +209,6 @@ public class BroadcastingDispatcherTests {
verify(globalMocks);
}
@Test(timeout = 500)
public void multipleTargetsPartialTimeout() throws Exception {
reset(taskExecutorMock);
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
dispatcher.setSendTimeout(50);
// three threads invoking targets
final CountDownLatch latch = new CountDownLatch(3);
threadedExecutorMock(3);
final AtomicBoolean timingOutStarted = new AtomicBoolean(false);
final AtomicBoolean testNotTimedOut = new AtomicBoolean(false);
expect(targetMock1.send(messageMock)).andAnswer(new IAnswer<Boolean>() {
public Boolean answer() throws Throwable {
latch.countDown();
return true;
}
});
expect(targetMock2.send(messageMock)).andAnswer(new IAnswer<Boolean>() {
public Boolean answer() throws Throwable {
latch.countDown();
return true;
}
});
/*
* Watch out, this is tricky. The send() method will be invoked but due
* to the faked time out it will never return. Therefore the expectation
* needs to be there, but during the verify it will be called 0 times.
* This is something that EasyMock doesn't support so I've worked around
* it with an AtomicBoolean and a latch. It isn't pretty, but it sort of works
*/
expect(targetMock3.send(messageMock)).andAnswer(new IAnswer<Boolean>() {
public Boolean answer() throws Throwable {
// this should happen
timingOutStarted.compareAndSet(false, true);
latch.countDown();
// cause timeout here
Thread.sleep(1000);
testNotTimedOut.compareAndSet(false, true);
//in a long running suite this will run until the end, but the test will already be over
return true;
}
}).anyTimes();
replay(globalMocks);
dispatcher.send(messageMock);
latch.await();
verify(globalMocks);
assertFalse("Test not timed out properly", testNotTimedOut.get());
assertTrue("Timing out Runnable not executed", timingOutStarted.get());
}
@Test
public void applySequenceDisabledByDefault() {
BroadcastingDispatcher dispatcher = new BroadcastingDispatcher();
@@ -329,21 +274,6 @@ public class BroadcastingDispatcherTests {
}
}
/*
* expect count calls to the taskExecutorMock.execute and have them run the runnable
* in a new Thread.
*/
private void threadedExecutorMock(int count) {
taskExecutorMock.execute(isA(Runnable.class));
expectLastCall().andAnswer(new IAnswer<Object>() {
public Object answer() throws Throwable {
final Runnable runnable = (Runnable) getCurrentArguments()[0];
new Thread(runnable).start();
return null;
}
}).times(count);
}
private static class MessageStoringTestTarget implements MessageTarget {

View File

@@ -40,7 +40,12 @@ public class MessagingBridgeTests {
public void simplePassThrough() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
DefaultMessageBus bus = new DefaultMessageBus();
MessagingBridge bridge = new MessagingBridge();
MessagingBridge bridge = new MessagingBridge(new MessageTarget() {
public boolean send(Message<?> message) {
latch.countDown();
return true;
}
});
bridge.setBeanName("bridge");
PollableSource<String> source = new PollableSource<String>() {
public Message<String> receive() {
@@ -50,12 +55,6 @@ public class MessagingBridgeTests {
PollingDispatcher poller = new PollingDispatcher(source, new PollingSchedule(1000));
poller.setMaxMessagesPerPoll(1);
bridge.setSource(poller);
bridge.setTarget(new MessageTarget() {
public boolean send(Message<?> message) {
latch.countDown();
return true;
}
});
bus.registerEndpoint(bridge);
bus.start();
latch.await(1, TimeUnit.SECONDS);

View File

@@ -52,7 +52,7 @@ public class ServiceActivatorEndpointTests {
public void outputChannel() {
QueueChannel channel = new QueueChannel(1);
ServiceActivatorEndpoint endpoint = this.createEndpoint();
endpoint.setTarget(channel);
endpoint.setOutputChannel(channel);
Message<?> message = MessageBuilder.fromPayload("foo").build();
endpoint.send(message);
Message<?> reply = channel.receive(0);
@@ -65,7 +65,7 @@ public class ServiceActivatorEndpointTests {
QueueChannel channel1 = new QueueChannel(1);
QueueChannel channel2 = new QueueChannel(1);
ServiceActivatorEndpoint endpoint = this.createEndpoint();
endpoint.setTarget(channel1);
endpoint.setOutputChannel(channel1);
Message<?> message = MessageBuilder.fromPayload("foo").setReturnAddress(channel2).build();
endpoint.send(message);
Message<?> reply1 = channel1.receive(0);
@@ -157,7 +157,7 @@ public class ServiceActivatorEndpointTests {
QueueChannel channel = new QueueChannel(1);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(
new TestNullReplyBean(), "handle");
endpoint.setTarget(channel);
endpoint.setOutputChannel(channel);
Message<?> message = MessageBuilder.fromPayload("foo").build();
endpoint.send(message);
assertNull(channel.receive(0));
@@ -169,7 +169,7 @@ public class ServiceActivatorEndpointTests {
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(
new TestNullReplyBean(), "handle");
endpoint.setRequiresReply(true);
endpoint.setTarget(channel);
endpoint.setOutputChannel(channel);
Message<?> message = MessageBuilder.fromPayload("foo").build();
endpoint.send(message);
}

View File

@@ -28,10 +28,10 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.StringMessage;
/**
@@ -87,7 +87,7 @@ public class GatewayProxyFactoryBeanTests {
public void run() {
Message<?> input = requestChannel.receive();
StringMessage response = new StringMessage(input.getPayload() + "456");
((MessageTarget) input.getHeaders().getReturnAddress()).send(response);
((MessageChannel) input.getHeaders().getReturnAddress()).send(response);
}
}).start();
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean();
@@ -174,7 +174,7 @@ public class GatewayProxyFactoryBeanTests {
public void run() {
Message<?> input = requestChannel.receive();
StringMessage response = new StringMessage(input.getPayload() + "bar");
((MessageTarget) input.getHeaders().getReturnAddress()).send(response);
((MessageChannel) input.getHeaders().getReturnAddress()).send(response);
}
}).start();
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean();
@@ -218,7 +218,7 @@ public class GatewayProxyFactoryBeanTests {
public void run() {
Message<?> input = requestChannel.receive();
StringMessage response = new StringMessage(input.getPayload() + "bar");
((MessageTarget) input.getHeaders().getReturnAddress()).send(response);
((MessageChannel) input.getHeaders().getReturnAddress()).send(response);
}
}).start();
}

View File

@@ -102,7 +102,7 @@ public class CorrelationIdTests {
MethodInvokingSplitter splitter = new MethodInvokingSplitter(
new TestBean(), TestBean.class.getMethod("split", String.class));
SplitterEndpoint endpoint = new SplitterEndpoint(splitter);
endpoint.setTarget(testChannel);
endpoint.setOutputChannel(testChannel);
splitter.afterPropertiesSet();
endpoint.send(message);
Message<?> reply1 = testChannel.receive(100);

View File

@@ -29,6 +29,7 @@ import org.junit.Test;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.endpoint.AbstractInOutEndpoint;
@@ -69,7 +70,10 @@ public class MessageExchangeTemplateTests {
public void testSendWithReturnAddress() throws InterruptedException {
final List<String> replies = new ArrayList<String>(3);
final CountDownLatch latch = new CountDownLatch(3);
MessageTarget replyTarget = new MessageTarget() {
MessageChannel replyChannel = new MessageChannel() {
public String getName() {
return "testReplyChannel";
}
public boolean send(Message<?> replyMessage) {
replies.add((String) replyMessage.getPayload());
latch.countDown();
@@ -77,9 +81,9 @@ public class MessageExchangeTemplateTests {
}
};
MessageExchangeTemplate template = new MessageExchangeTemplate();
Message<String> message1 = MessageBuilder.fromPayload("test1").setReturnAddress(replyTarget).build();
Message<String> message2 = MessageBuilder.fromPayload("test2").setReturnAddress(replyTarget).build();
Message<String> message3 = MessageBuilder.fromPayload("test3").setReturnAddress(replyTarget).build();
Message<String> message1 = MessageBuilder.fromPayload("test1").setReturnAddress(replyChannel).build();
Message<String> message2 = MessageBuilder.fromPayload("test2").setReturnAddress(replyChannel).build();
Message<String> message3 = MessageBuilder.fromPayload("test3").setReturnAddress(replyChannel).build();
template.send(message1, this.requestChannel);
template.send(message2, this.requestChannel);
template.send(message3, this.requestChannel);