Removed DefaultEndpoint.

This commit is contained in:
Mark Fisher
2008-09-04 22:44:45 +00:00
parent 313fa67b0b
commit 17164dc37e
8 changed files with 65 additions and 263 deletions

View File

@@ -32,9 +32,8 @@ import org.springframework.integration.channel.ChannelRegistry;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.endpoint.DefaultEndpoint;
import org.springframework.integration.endpoint.AbstractInOutEndpoint;
import org.springframework.integration.endpoint.InboundChannelAdapter;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.message.ErrorMessage;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
@@ -60,12 +59,11 @@ public class DefaultMessageBusTests {
.setReturnAddress("targetChannel").build();
sourceChannel.send(message);
bus.registerChannel(targetChannel);
MessageHandler handler = new MessageHandler() {
AbstractInOutEndpoint endpoint = new AbstractInOutEndpoint() {
public Message<?> handle(Message<?> message) {
return message;
}
};
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(handler);
endpoint.setBeanName("testEndpoint");
endpoint.setSource(sourceChannel);
bus.registerEndpoint(endpoint);
@@ -109,12 +107,12 @@ public class DefaultMessageBusTests {
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel1 = new QueueChannel();
QueueChannel outputChannel2 = new QueueChannel();
MessageHandler handler1 = new MessageHandler() {
AbstractInOutEndpoint endpoint1 = new AbstractInOutEndpoint() {
public Message<?> handle(Message<?> message) {
return MessageBuilder.fromMessage(message).build();
}
};
MessageHandler handler2 = new MessageHandler() {
AbstractInOutEndpoint endpoint2 = new AbstractInOutEndpoint() {
public Message<?> handle(Message<?> message) {
return MessageBuilder.fromMessage(message).build();
}
@@ -126,11 +124,9 @@ public class DefaultMessageBusTests {
bus.registerChannel(inputChannel);
bus.registerChannel(outputChannel1);
bus.registerChannel(outputChannel2);
DefaultEndpoint<MessageHandler> endpoint1 = new DefaultEndpoint<MessageHandler>(handler1);
endpoint1.setBeanName("testEndpoint1");
endpoint1.setSource(inputChannel);
endpoint1.setTarget(outputChannel1);
DefaultEndpoint<MessageHandler> endpoint2 = new DefaultEndpoint<MessageHandler>(handler2);
endpoint2.setBeanName("testEndpoint2");
endpoint2.setSource(inputChannel);
endpoint2.setTarget(outputChannel2);
@@ -150,14 +146,14 @@ public class DefaultMessageBusTests {
QueueChannel outputChannel1 = new QueueChannel();
QueueChannel outputChannel2 = new QueueChannel();
final CountDownLatch latch = new CountDownLatch(2);
MessageHandler handler1 = new MessageHandler() {
AbstractInOutEndpoint endpoint1 = new AbstractInOutEndpoint() {
public Message<?> handle(Message<?> message) {
Message<?> reply = MessageBuilder.fromMessage(message).build();
latch.countDown();
return reply;
}
};
MessageHandler handler2 = new MessageHandler() {
AbstractInOutEndpoint endpoint2 = new AbstractInOutEndpoint() {
public Message<?> handle(Message<?> message) {
Message<?> reply = MessageBuilder.fromMessage(message).build();
latch.countDown();
@@ -171,11 +167,9 @@ public class DefaultMessageBusTests {
bus.registerChannel(inputChannel);
bus.registerChannel(outputChannel1);
bus.registerChannel(outputChannel2);
DefaultEndpoint<MessageHandler> endpoint1 = new DefaultEndpoint<MessageHandler>(handler1);
endpoint1.setBeanName("testEndpoint1");
endpoint1.setSource(inputChannel);
endpoint1.setTarget(outputChannel1);
DefaultEndpoint<MessageHandler> endpoint2 = new DefaultEndpoint<MessageHandler>(handler2);
endpoint2.setBeanName("testEndpoint2");
endpoint2.setSource(inputChannel);
endpoint2.setTarget(outputChannel2);
@@ -232,13 +226,12 @@ public class DefaultMessageBusTests {
errorChannel.setBeanName(ChannelRegistry.ERROR_CHANNEL_NAME);
bus.registerChannel(errorChannel);
final CountDownLatch latch = new CountDownLatch(1);
MessageHandler handler = new MessageHandler() {
AbstractInOutEndpoint endpoint = new AbstractInOutEndpoint() {
public Message<?> handle(Message<?> message) {
latch.countDown();
return null;
}
};
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(handler);
endpoint.setBeanName("testEndpoint");
endpoint.setSource(errorChannel);
bus.registerEndpoint(endpoint);

View File

@@ -28,7 +28,8 @@ import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.channel.ThreadLocalChannel;
import org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor;
import org.springframework.integration.endpoint.DefaultEndpoint;
import org.springframework.integration.endpoint.AbstractInOutEndpoint;
import org.springframework.integration.endpoint.ServiceActivatorEndpoint;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessagingException;
@@ -57,7 +58,7 @@ public class DirectChannelSubscriptionTests {
@Test
public void testSendAndReceiveForRegisteredEndpoint() {
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(new TestHandler());
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestHandler());
endpoint.setSource(sourceChannel);
endpoint.setTarget(targetChannel);
endpoint.setBeanName("testEndpoint");
@@ -87,11 +88,11 @@ public class DirectChannelSubscriptionTests {
QueueChannel errorChannel = new QueueChannel();
errorChannel.setBeanName(ChannelRegistry.ERROR_CHANNEL_NAME);
bus.registerChannel(errorChannel);
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(new MessageHandler() {
AbstractInOutEndpoint endpoint = new AbstractInOutEndpoint() {
public Message<?> handle(Message<?> message) {
throw new RuntimeException("intentional test failure");
}
});
};
endpoint.setSource(sourceChannel);
endpoint.setTarget(targetChannel);
endpoint.setBeanName("testEndpoint");

View File

@@ -10,7 +10,7 @@
<bean id="targetChannel" class="org.springframework.integration.channel.QueueChannel"/>
<bean id="endpoint" class="org.springframework.integration.endpoint.DefaultEndpoint">
<bean id="endpoint" class="org.springframework.integration.endpoint.ServiceActivatorEndpoint">
<constructor-arg ref="handler"/>
<property name="source" ref="sourceChannel"/>
<property name="target" ref="targetChannel"/>
@@ -21,7 +21,7 @@
<bean class="org.springframework.integration.bus.MessageBusAwareBeanPostProcessor">
<constructor-arg ref="bus"/>
</bean>
<bean id= "messageBusAwareBean" class="org.springframework.integration.bus.TestMessageBusAwareImpl"/>
</beans>

View File

@@ -26,7 +26,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.springframework.integration.endpoint.DefaultEndpoint;
import org.springframework.integration.endpoint.AbstractInOutEndpoint;
import org.springframework.integration.endpoint.ServiceActivatorEndpoint;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.handler.TestHandlers;
import org.springframework.integration.message.Message;
@@ -131,9 +132,9 @@ public class SimpleDispatcherTests {
final AtomicInteger counter2 = new AtomicInteger();
final AtomicInteger counter3 = new AtomicInteger();
final AtomicInteger selectorCounter = new AtomicInteger();
DefaultEndpoint<?> endpoint1 = createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch));
DefaultEndpoint<?> endpoint2 = createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch));
DefaultEndpoint<?> endpoint3 = createEndpoint(TestHandlers.countingCountDownHandler(counter3, latch));
AbstractInOutEndpoint endpoint1 = createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch));
AbstractInOutEndpoint endpoint2 = createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch));
AbstractInOutEndpoint endpoint3 = createEndpoint(TestHandlers.countingCountDownHandler(counter3, latch));
endpoint1.setSelector(new TestMessageSelector(selectorCounter, false));
endpoint2.setSelector(new TestMessageSelector(selectorCounter, false));
endpoint3.setSelector(new TestMessageSelector(selectorCounter, true));
@@ -156,9 +157,9 @@ public class SimpleDispatcherTests {
final AtomicInteger counter2 = new AtomicInteger();
final AtomicInteger counter3 = new AtomicInteger();
final AtomicInteger selectorCounter = new AtomicInteger();
DefaultEndpoint<?> endpoint1 = createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch));
DefaultEndpoint<?> endpoint2 = createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch));
DefaultEndpoint<?> endpoint3 = createEndpoint(TestHandlers.countingCountDownHandler(counter3, latch));
AbstractInOutEndpoint endpoint1 = createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch));
AbstractInOutEndpoint endpoint2 = createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch));
AbstractInOutEndpoint endpoint3 = createEndpoint(TestHandlers.countingCountDownHandler(counter3, latch));
endpoint1.setSelector(new TestMessageSelector(selectorCounter, false));
endpoint2.setSelector(new TestMessageSelector(selectorCounter, false));
endpoint3.setSelector(new TestMessageSelector(selectorCounter, false));
@@ -222,8 +223,8 @@ public class SimpleDispatcherTests {
}
private static DefaultEndpoint<MessageHandler> createEndpoint(MessageHandler handler) {
return new DefaultEndpoint<MessageHandler>(handler);
private static ServiceActivatorEndpoint createEndpoint(MessageHandler handler) {
return new ServiceActivatorEndpoint(handler);
}

View File

@@ -16,16 +16,16 @@
package org.springframework.integration.endpoint;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.springframework.integration.bus.DefaultMessageBus;
@@ -38,6 +38,7 @@ import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageHandlingException;
import org.springframework.integration.message.MessageRejectedException;
import org.springframework.integration.message.MessagingException;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.message.selector.MessageSelector;
import org.springframework.integration.message.selector.MessageSelectorChain;
@@ -46,13 +47,13 @@ import org.springframework.integration.message.selector.MessageSelectorChain;
* @author Mark Fisher
* @author Marius Bogoevici
*/
public class DefaultEndpointTests {
public class ServiceActivatorEndpointTests {
@Test
public void outputChannel() {
QueueChannel channel = new QueueChannel(1);
MessageHandler handler = new TestHandler();
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(handler);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(handler);
endpoint.setTarget(channel);
Message<?> message = MessageBuilder.fromPayload("foo").build();
endpoint.send(message);
@@ -65,7 +66,7 @@ public class DefaultEndpointTests {
public void outputChannelTakesPrecedence() {
QueueChannel channel1 = new QueueChannel(1);
QueueChannel channel2 = new QueueChannel(1);
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(new TestHandler());
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestHandler());
endpoint.setTarget(channel1);
Message<?> message = MessageBuilder.fromPayload("foo").setReturnAddress(channel2).build();
endpoint.send(message);
@@ -80,7 +81,7 @@ public class DefaultEndpointTests {
public void returnAddressHeader() {
QueueChannel channel = new QueueChannel(1);
MessageHandler handler = new TestHandler();
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(handler);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(handler);
Message<?> message = MessageBuilder.fromPayload("foo").setReturnAddress(channel).build();
endpoint.send(message);
Message<?> reply = channel.receive(0);
@@ -95,7 +96,7 @@ public class DefaultEndpointTests {
ChannelRegistry channelRegistry = new DefaultMessageBus();
channelRegistry.registerChannel(channel);
MessageHandler handler = new TestHandler();
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(handler);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(handler);
endpoint.setChannelRegistry(channelRegistry);
Message<?> message = MessageBuilder.fromPayload("foo").setReturnAddress("testChannel").build();
endpoint.send(message);
@@ -116,7 +117,7 @@ public class DefaultEndpointTests {
return new StringMessage("foo" + message.getPayload());
}
};
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(handler);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(handler);
endpoint.setChannelRegistry(channelRegistry);
Message<String> testMessage1 = MessageBuilder.fromPayload("bar")
.setReturnAddress(replyChannel1).build();
@@ -140,7 +141,7 @@ public class DefaultEndpointTests {
public void noOutputChannelFallsBackToReturnAddress() {
QueueChannel channel = new QueueChannel(1);
MessageHandler handler = new TestHandler();
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(handler);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(handler);
Message<?> message = MessageBuilder.fromPayload("foo").setReturnAddress(channel).build();
endpoint.send(message);
Message<?> reply = channel.receive(0);
@@ -148,10 +149,10 @@ public class DefaultEndpointTests {
assertEquals("FOO", reply.getPayload());
}
@Test(expected = MessageEndpointReplyException.class)
@Test(expected = MessagingException.class)
public void noReplyTarget() {
MessageHandler handler = new TestHandler();
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(handler);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(handler);
Message<?> message = MessageBuilder.fromPayload("foo").build();
endpoint.send(message);
}
@@ -160,7 +161,7 @@ public class DefaultEndpointTests {
public void noReplyMessage() {
QueueChannel channel = new QueueChannel(1);
MessageHandler handler = new TestNullReplyHandler();
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(handler);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(handler);
endpoint.setTarget(channel);
Message<?> message = MessageBuilder.fromPayload("foo").build();
endpoint.send(message);
@@ -171,7 +172,7 @@ public class DefaultEndpointTests {
public void noReplyMessageWithRequiresReply() {
QueueChannel channel = new QueueChannel(1);
MessageHandler handler = new TestNullReplyHandler();
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(handler);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(handler);
endpoint.setRequiresReply(true);
endpoint.setTarget(channel);
Message<?> message = MessageBuilder.fromPayload("foo").build();
@@ -180,7 +181,7 @@ public class DefaultEndpointTests {
@Test(expected=MessageRejectedException.class)
public void endpointWithSelectorRejecting() {
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(TestHandlers.nullHandler());
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(TestHandlers.nullHandler());
endpoint.setSelector(new MessageSelector() {
public boolean accept(Message<?> message) {
return false;
@@ -192,7 +193,7 @@ public class DefaultEndpointTests {
@Test
public void endpointWithSelectorAccepting() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(TestHandlers.countDownHandler(latch));
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(TestHandlers.countDownHandler(latch));
endpoint.setSelector(new MessageSelector() {
public boolean accept(Message<?> message) {
return true;
@@ -206,7 +207,7 @@ public class DefaultEndpointTests {
@Test
public void endpointWithMultipleSelectorsAndFirstRejects() {
final AtomicInteger counter = new AtomicInteger();
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(TestHandlers.countingHandler(counter));
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(TestHandlers.countingHandler(counter));
MessageSelectorChain selectorChain = new MessageSelectorChain();
selectorChain.add(new MessageSelector() {
public boolean accept(Message<?> message) {
@@ -236,7 +237,7 @@ public class DefaultEndpointTests {
public void endpointWithMultipleSelectorsAndFirstAccepts() {
final AtomicInteger selectorCounter = new AtomicInteger();
AtomicInteger handlerCounter = new AtomicInteger();
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(TestHandlers.countingHandler(handlerCounter));
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(TestHandlers.countingHandler(handlerCounter));
MessageSelectorChain selectorChain = new MessageSelectorChain();
selectorChain.add(new MessageSelector() {
public boolean accept(Message<?> message) {
@@ -266,7 +267,7 @@ public class DefaultEndpointTests {
@Test
public void endpointWithMultipleSelectorsAndBothAccept() {
final AtomicInteger counter = new AtomicInteger();
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(TestHandlers.countingHandler(counter));
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(TestHandlers.countingHandler(counter));
MessageSelectorChain selectorChain = new MessageSelectorChain();
selectorChain.add(new MessageSelector() {
public boolean accept(Message<?> message) {
@@ -288,7 +289,7 @@ public class DefaultEndpointTests {
@Test
public void correlationId() {
QueueChannel replyChannel = new QueueChannel(1);
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(new MessageHandler() {
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new MessageHandler() {
public Message<?> handle(Message<?> message) {
return message;
}
@@ -303,7 +304,7 @@ public class DefaultEndpointTests {
@Test
public void correlationIdSetByHandlerTakesPrecedence() {
QueueChannel replyChannel = new QueueChannel(1);
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(new MessageHandler() {
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new MessageHandler() {
public Message<?> handle(Message<?> message) {
return MessageBuilder.fromMessage(message)
.setCorrelationId("ABC-123").build();

View File

@@ -30,8 +30,7 @@ import org.junit.Test;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.endpoint.DefaultEndpoint;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.endpoint.AbstractInOutEndpoint;
/**
* @author Mark Fisher
@@ -45,14 +44,13 @@ public class MessageExchangeTemplateTests {
public void setUp() {
this.requestChannel = new QueueChannel();
this.requestChannel.setBeanName("requestChannel");
MessageHandler testHandler = new MessageHandler() {
AbstractInOutEndpoint endpoint = new AbstractInOutEndpoint() {
public Message<?> handle(Message<?> message) {
return new StringMessage(message.getPayload().toString().toUpperCase());
}
};
MessageBus bus = new DefaultMessageBus();
bus.registerChannel(requestChannel);
DefaultEndpoint<MessageHandler> endpoint = new DefaultEndpoint<MessageHandler>(testHandler);
endpoint.setBeanName("testEndpoint");
endpoint.setSource(requestChannel);
bus.registerEndpoint(endpoint);