MessageEndpoint no longer extends the MessageTarget interface.

This commit is contained in:
Mark Fisher
2008-09-07 01:48:34 +00:00
parent 4eaec4ce68
commit 673d7d250b
24 changed files with 244 additions and 192 deletions

View File

@@ -25,8 +25,9 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.StringMessage;
/**
@@ -61,7 +62,7 @@ public class DirectChannelTests {
}
private static class ThreadNameExtractingTestTarget implements MessageTarget {
private static class ThreadNameExtractingTestTarget implements MessageEndpoint {
private String threadName;
@@ -83,6 +84,15 @@ public class DirectChannelTests {
}
return true;
}
// TODO: remove once this is a consumer instead of endpoint
public String getName() {
return null;
}
public MessageSource<?> getSource() {
return null;
}
}
}

View File

@@ -19,8 +19,8 @@ package org.springframework.integration.channel.config;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.SubscribableSource;
/**
@@ -28,20 +28,20 @@ import org.springframework.integration.message.SubscribableSource;
*/
public class TestSubscribableSource implements SubscribableSource {
private final List<MessageTarget> targets = new CopyOnWriteArrayList<MessageTarget>();
private final List<MessageEndpoint> endpoints = new CopyOnWriteArrayList<MessageEndpoint>();
public boolean subscribe(MessageTarget target) {
return this.targets.add(target);
public boolean subscribe(MessageEndpoint endpoint) {
return this.endpoints.add(endpoint);
}
public boolean unsubscribe(MessageTarget target) {
return this.targets.remove(target);
public boolean unsubscribe(MessageEndpoint endpoint) {
return this.endpoints.remove(endpoint);
}
public void publishMessage(Message<?> message) {
for (MessageTarget target : this.targets) {
target.send(message);
for (MessageEndpoint endpoint : this.endpoints) {
endpoint.send(message);
}
}

View File

@@ -28,11 +28,11 @@ import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageRejectedException;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.StringMessage;
/**
@@ -57,7 +57,7 @@ public class EndpointParserTests {
public void testEndpointWithSelectorAccepts() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"endpointWithSelector.xml", this.getClass());
MessageTarget endpoint = (MessageTarget) context.getBean("endpoint");
MessageEndpoint endpoint = (MessageEndpoint) context.getBean("endpoint");
QueueChannel replyChannel = new QueueChannel();
Message<?> message = MessageBuilder.fromPayload("test")
.setReturnAddress(replyChannel).build();
@@ -71,7 +71,7 @@ public class EndpointParserTests {
public void testEndpointWithSelectorRejects() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"endpointWithSelector.xml", this.getClass());
MessageTarget endpoint = (MessageTarget) context.getBean("endpoint");
MessageEndpoint endpoint = (MessageEndpoint) context.getBean("endpoint");
MessageChannel replyChannel = new QueueChannel();
Message<?> message = MessageBuilder.fromPayload(123)
.setReturnAddress(replyChannel).build();

View File

@@ -318,12 +318,18 @@ public class MessagingAnnotationPostProcessorTests {
DirectChannel testChannel = (DirectChannel) messageBus.lookupChannel("testChannel");
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Message<?>> receivedMessage = new AtomicReference<Message<?>>();
testChannel.subscribe(new org.springframework.integration.message.MessageTarget() {
testChannel.subscribe(new org.springframework.integration.endpoint.MessageEndpoint() {
public boolean send(Message<?> message) {
receivedMessage.set(message);
latch.countDown();
return false;
}
public String getName() {
return null;
}
public MessageSource<?> getSource() {
return null;
}
});
latch.await(3, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());

View File

@@ -35,8 +35,9 @@ import org.junit.Before;
import org.junit.Test;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.StringMessage;
/**
@@ -51,11 +52,11 @@ public class BroadcastingDispatcherTests {
private Message<?> messageMock = createMock(Message.class);
private MessageTarget targetMock1 = createMock(MessageTarget.class);
private MessageEndpoint targetMock1 = createMock(MessageEndpoint.class);
private MessageTarget targetMock2 = createMock(MessageTarget.class);
private MessageEndpoint targetMock2 = createMock(MessageEndpoint.class);
private MessageTarget targetMock3 = createMock(MessageTarget.class);
private MessageEndpoint targetMock3 = createMock(MessageEndpoint.class);
private Object[] globalMocks = new Object[] {
messageMock, taskExecutorMock, targetMock1, targetMock2, targetMock3 };
@@ -213,8 +214,8 @@ public class BroadcastingDispatcherTests {
public void applySequenceDisabledByDefault() {
BroadcastingDispatcher dispatcher = new BroadcastingDispatcher();
final List<Message<?>> messages = Collections.synchronizedList(new ArrayList<Message<?>>());
MessageTarget target1 = new MessageStoringTestTarget(messages);
MessageTarget target2 = new MessageStoringTestTarget(messages);
MessageEndpoint target1 = new MessageStoringTestEndpoint(messages);
MessageEndpoint target2 = new MessageStoringTestEndpoint(messages);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.send(new StringMessage("test"));
@@ -230,9 +231,9 @@ public class BroadcastingDispatcherTests {
BroadcastingDispatcher dispatcher = new BroadcastingDispatcher();
dispatcher.setApplySequence(true);
final List<Message<?>> messages = Collections.synchronizedList(new ArrayList<Message<?>>());
MessageTarget target1 = new MessageStoringTestTarget(messages);
MessageTarget target2 = new MessageStoringTestTarget(messages);
MessageTarget target3 = new MessageStoringTestTarget(messages);
MessageEndpoint target1 = new MessageStoringTestEndpoint(messages);
MessageEndpoint target2 = new MessageStoringTestEndpoint(messages);
MessageEndpoint target3 = new MessageStoringTestEndpoint(messages);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
@@ -275,11 +276,11 @@ public class BroadcastingDispatcherTests {
}
private static class MessageStoringTestTarget implements MessageTarget {
private static class MessageStoringTestEndpoint implements MessageEndpoint {
private final List<Message<?>> messageList;
MessageStoringTestTarget(List<Message<?>> messageList) {
MessageStoringTestEndpoint(List<Message<?>> messageList) {
this.messageList = messageList;
}
@@ -287,6 +288,14 @@ public class BroadcastingDispatcherTests {
this.messageList.add(message);
return true;
}
public String getName() {
return null;
}
public MessageSource<?> getSource() {
return null;
}
};
}

View File

@@ -27,12 +27,13 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.springframework.integration.endpoint.AbstractInOutEndpoint;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.endpoint.ServiceActivatorEndpoint;
import org.springframework.integration.handler.TestHandlers;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.MessageRejectedException;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.message.selector.MessageSelector;
@@ -69,7 +70,7 @@ public class SimpleDispatcherTests {
public void noDuplicateSubscriptions() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageTarget target = new CountingTestTarget(counter, false);
MessageEndpoint target = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target);
dispatcher.subscribe(target);
dispatcher.send(new StringMessage("test"));
@@ -80,9 +81,9 @@ public class SimpleDispatcherTests {
public void unsubscribeBeforeSend() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageTarget target1 = new CountingTestTarget(counter, false);
MessageTarget target2 = new CountingTestTarget(counter, false);
MessageTarget target3 = new CountingTestTarget(counter, false);
MessageEndpoint target1 = new CountingTestEndpoint(counter, false);
MessageEndpoint target2 = new CountingTestEndpoint(counter, false);
MessageEndpoint target3 = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
@@ -95,9 +96,9 @@ public class SimpleDispatcherTests {
public void unsubscribeBetweenSends() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageTarget target1 = new CountingTestTarget(counter, false);
MessageTarget target2 = new CountingTestTarget(counter, false);
MessageTarget target3 = new CountingTestTarget(counter, false);
MessageEndpoint target1 = new CountingTestEndpoint(counter, false);
MessageEndpoint target2 = new CountingTestEndpoint(counter, false);
MessageEndpoint target3 = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
@@ -115,7 +116,7 @@ public class SimpleDispatcherTests {
public void unsubscribeLastTargetCausesDeliveryException() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageTarget target = new CountingTestTarget(counter, false);
MessageEndpoint target = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target);
dispatcher.send(new StringMessage("test1"));
assertEquals(1, counter.get());
@@ -183,9 +184,9 @@ public class SimpleDispatcherTests {
public void firstHandlerReturnsTrue() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageTarget target1 = new CountingTestTarget(counter, true);
MessageTarget target2 = new CountingTestTarget(counter, false);
MessageTarget target3 = new CountingTestTarget(counter, false);
MessageEndpoint target1 = new CountingTestEndpoint(counter, true);
MessageEndpoint target2 = new CountingTestEndpoint(counter, false);
MessageEndpoint target3 = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
@@ -197,9 +198,9 @@ public class SimpleDispatcherTests {
public void middleHandlerReturnsTrue() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageTarget target1 = new CountingTestTarget(counter, false);
MessageTarget target2 = new CountingTestTarget(counter, true);
MessageTarget target3 = new CountingTestTarget(counter, false);
MessageEndpoint target1 = new CountingTestEndpoint(counter, false);
MessageEndpoint target2 = new CountingTestEndpoint(counter, true);
MessageEndpoint target3 = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
@@ -211,9 +212,9 @@ public class SimpleDispatcherTests {
public void allHandlersReturnFalse() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageTarget target1 = new CountingTestTarget(counter, false);
MessageTarget target2 = new CountingTestTarget(counter, false);
MessageTarget target3 = new CountingTestTarget(counter, false);
MessageEndpoint target1 = new CountingTestEndpoint(counter, false);
MessageEndpoint target2 = new CountingTestEndpoint(counter, false);
MessageEndpoint target3 = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
@@ -245,13 +246,13 @@ public class SimpleDispatcherTests {
}
private static class CountingTestTarget implements MessageTarget {
private static class CountingTestEndpoint implements MessageEndpoint {
private final AtomicInteger counter;
private final boolean returnValue;
CountingTestTarget(AtomicInteger counter, boolean returnValue) {
CountingTestEndpoint(AtomicInteger counter, boolean returnValue) {
this.counter = counter;
this.returnValue = returnValue;
}
@@ -260,6 +261,14 @@ public class SimpleDispatcherTests {
this.counter.incrementAndGet();
return this.returnValue;
}
public String getName() {
return null;
}
public MessageSource<?> getSource() {
return null;
}
}
}