Endpoints now expose a setter for a Poller strategy and no longer implement Lifecycle.

This commit is contained in:
Mark Fisher
2008-07-14 18:29:07 +00:00
parent 9bcd9e12c8
commit ca7dc3922e
19 changed files with 169 additions and 329 deletions

View File

@@ -189,7 +189,7 @@ public class DefaultMessageBusTests {
MessageBus bus = new DefaultMessageBus();
CountDownLatch latch = new CountDownLatch(1);
SourceEndpoint sourceEndpoint = new SourceEndpoint(new FailingSource(latch));
sourceEndpoint.setOutputChannel(new QueueChannel());
sourceEndpoint.setTarget(new QueueChannel());
sourceEndpoint.setSchedule(new PollingSchedule(1000));
sourceEndpoint.setName("testEndpoint");
bus.registerEndpoint(sourceEndpoint);

View File

@@ -12,7 +12,7 @@
<bean id="endpoint" class="org.springframework.integration.endpoint.HandlerEndpoint">
<constructor-arg ref="handler"/>
<property name="inputChannel" ref="sourceChannel"/>
<property name="source" ref="sourceChannel"/>
<property name="outputChannelName" value="targetChannel"/>
</bean>

View File

@@ -18,11 +18,14 @@ package org.springframework.integration.config;
import static org.junit.Assert.assertEquals;
import java.util.List;
import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.EndpointInterceptor;
import org.springframework.integration.endpoint.EndpointPoller;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.endpoint.SourceEndpoint;
@@ -83,12 +86,15 @@ public class EndpointInterceptorTests {
}
@SuppressWarnings("unchecked")
private static void testInterceptors(MessageEndpoint endpoint, ClassPathXmlApplicationContext context, boolean innerBeans) {
TestPreSendInterceptor preInterceptor = null;
TestAroundSendEndpointInterceptor aroundInterceptor = null;
if (innerBeans) {
preInterceptor = (TestPreSendInterceptor) ((AbstractEndpoint) endpoint).getInterceptors().get(0);
aroundInterceptor = (TestAroundSendEndpointInterceptor) ((AbstractEndpoint) endpoint).getInterceptors().get(1);
DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint);
List<EndpointInterceptor> interceptors = (List<EndpointInterceptor>) accessor.getPropertyValue("interceptors");
preInterceptor = (TestPreSendInterceptor) interceptors.get(0);
aroundInterceptor = (TestAroundSendEndpointInterceptor) interceptors.get(1);
}
else {
preInterceptor = (TestPreSendInterceptor) context.getBean("preInterceptor");

View File

@@ -25,6 +25,7 @@ import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -175,6 +176,7 @@ public class MessagingAnnotationPostProcessorTests {
}
@Test
@SuppressWarnings("unchecked")
public void testConcurrencyAnnotationWithValues() {
MessageBus messageBus = new DefaultMessageBus();
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
@@ -182,9 +184,10 @@ public class MessagingAnnotationPostProcessorTests {
ConcurrencyAnnotationTestBean testBean = new ConcurrencyAnnotationTestBean();
postProcessor.postProcessAfterInitialization(testBean, "testBean");
HandlerEndpoint endpoint = (HandlerEndpoint) messageBus.lookupEndpoint("testBean.MessageHandler.endpoint");
assertEquals(1, endpoint.getInterceptors().size());
EndpointInterceptor interceptor = endpoint.getInterceptors().get(0);
DirectFieldAccessor accessor = new DirectFieldAccessor(interceptor);
DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint);
List<EndpointInterceptor> interceptors = (List<EndpointInterceptor>) accessor.getPropertyValue("interceptors");
assertEquals(1, interceptors.size());
EndpointInterceptor interceptor = interceptors.get(0);
accessor = new DirectFieldAccessor(interceptor);
ConcurrentTaskExecutor cte = (ConcurrentTaskExecutor) accessor.getPropertyValue("executor");
ThreadPoolExecutor executor = (ThreadPoolExecutor) cte.getConcurrentExecutor();

View File

@@ -52,9 +52,7 @@ public class BroadcastingDispatcherTests {
private static MessageTarget createEndpoint(MessageHandler handler) {
HandlerEndpoint endpoint = new HandlerEndpoint(handler);
endpoint.start();
return endpoint;
return new HandlerEndpoint(handler);
}
}

View File

@@ -61,9 +61,7 @@ public class SimpleDispatcherTests {
private static MessageTarget createEndpoint(MessageHandler handler) {
HandlerEndpoint endpoint = new HandlerEndpoint(handler);
endpoint.start();
return endpoint;
return new HandlerEndpoint(handler);
}
}

View File

@@ -33,7 +33,6 @@ import org.springframework.integration.channel.DefaultChannelRegistry;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.handler.MessageHandlerNotRunningException;
import org.springframework.integration.handler.TestHandlers;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageRejectedException;
@@ -59,9 +58,7 @@ public class HandlerEndpointTests {
HandlerEndpoint endpoint = new HandlerEndpoint(handler);
endpoint.setChannelRegistry(channelRegistry);
endpoint.setOutputChannelName("replyChannel");
endpoint.start();
endpoint.send(new StringMessage(1, "test"));
endpoint.stop();
Message<?> reply = replyChannel.receive(50);
assertNotNull(reply);
assertEquals("hello test", reply.getPayload());
@@ -76,11 +73,9 @@ public class HandlerEndpointTests {
}
};
HandlerEndpoint endpoint = new HandlerEndpoint(handler);
endpoint.start();
StringMessage testMessage = new StringMessage(1, "test");
testMessage.getHeader().setReturnAddress(replyChannel);
endpoint.send(testMessage);
endpoint.stop();
Message<?> reply = replyChannel.receive(50);
assertNotNull(reply);
assertEquals("hello test", reply.getPayload());
@@ -98,11 +93,9 @@ public class HandlerEndpointTests {
};
HandlerEndpoint endpoint = new HandlerEndpoint(handler);
endpoint.setChannelRegistry(channelRegistry);
endpoint.start();
StringMessage testMessage = new StringMessage(1, "test");
testMessage.getHeader().setReturnAddress("replyChannel");
endpoint.send(testMessage);
endpoint.stop();
Message<?> reply = replyChannel.receive(50);
assertNotNull(reply);
assertEquals("hello test", reply.getPayload());
@@ -121,7 +114,6 @@ public class HandlerEndpointTests {
};
HandlerEndpoint endpoint = new HandlerEndpoint(handler);
endpoint.setChannelRegistry(channelRegistry);
endpoint.start();
StringMessage testMessage = new StringMessage("test");
testMessage.getHeader().setReturnAddress(replyChannel1);
endpoint.send(testMessage);
@@ -137,7 +129,6 @@ public class HandlerEndpointTests {
reply2 = replyChannel2.receive(0);
assertNotNull(reply2);
assertEquals("hello test", reply2.getPayload());
endpoint.stop();
}
@Test
@@ -155,39 +146,13 @@ public class HandlerEndpointTests {
HandlerEndpoint endpoint = new HandlerEndpoint(handler);
endpoint.setChannelRegistry(channelRegistry);
endpoint.setOutputChannelName("replyChannel");
endpoint.start();
endpoint.send(new StringMessage(1, "test"));
endpoint.stop();
latch.await(500, TimeUnit.MILLISECONDS);
assertEquals("handler should have been invoked within allotted time", 0, latch.getCount());
Message<?> reply = replyChannel.receive(0);
assertNull(reply);
}
@Test(expected=MessageHandlerNotRunningException.class)
public void testEndpointDoesNotHandleMessagesWhenNotYetStarted() {
HandlerEndpoint endpoint = new HandlerEndpoint(TestHandlers.nullHandler());
endpoint.send(new StringMessage("test"));
}
@Test
public void testEndpointDoesNotHandleMessagesAfterBeingStopped() {
AtomicInteger counter = new AtomicInteger();
HandlerEndpoint endpoint = new HandlerEndpoint(TestHandlers.countingHandler(counter));
boolean exceptionThrown = false;
try {
endpoint.start();
endpoint.send(new StringMessage("test1"));
endpoint.stop();
endpoint.send(new StringMessage("test2"));
}
catch (MessageHandlerNotRunningException e) {
exceptionThrown = true;
}
assertEquals("handler should have been invoked exactly once", 1, counter.get());
assertTrue(exceptionThrown);
}
@Test(expected=MessageRejectedException.class)
public void testEndpointWithSelectorRejecting() {
HandlerEndpoint endpoint = new HandlerEndpoint(TestHandlers.nullHandler());
@@ -196,7 +161,6 @@ public class HandlerEndpointTests {
return false;
}
});
endpoint.start();
endpoint.send(new StringMessage("test"));
}
@@ -209,11 +173,9 @@ public class HandlerEndpointTests {
return true;
}
});
endpoint.start();
endpoint.send(new StringMessage("test"));
latch.await(100, TimeUnit.MILLISECONDS);
assertEquals("handler should have been invoked", 0, latch.getCount());
endpoint.stop();
}
@Test
@@ -234,7 +196,6 @@ public class HandlerEndpointTests {
}
});
endpoint.setMessageSelector(selectorChain);
endpoint.start();
boolean exceptionWasThrown = false;
try {
endpoint.send(new StringMessage("test"));
@@ -244,7 +205,6 @@ public class HandlerEndpointTests {
}
assertTrue(exceptionWasThrown);
assertEquals("only the first selector should have been invoked", 1, counter.get());
endpoint.stop();
}
@Test
@@ -266,7 +226,6 @@ public class HandlerEndpointTests {
}
});
endpoint.setMessageSelector(selectorChain);
endpoint.start();
boolean exceptionWasThrown = false;
try {
endpoint.send(new StringMessage("test"));
@@ -277,7 +236,6 @@ public class HandlerEndpointTests {
assertTrue(exceptionWasThrown);
assertEquals("both selectors should have been invoked", 2, selectorCounter.get());
assertEquals("the handler should not have been invoked", 0, handlerCounter.get());
endpoint.stop();
}
@Test
@@ -298,10 +256,8 @@ public class HandlerEndpointTests {
}
});
endpoint.setMessageSelector(selectorChain);
endpoint.start();
assertTrue(endpoint.send(new StringMessage("test")));
assertEquals("both selectors and handler should have been invoked", 3, counter.get());
endpoint.stop();
}
@Test
@@ -312,7 +268,6 @@ public class HandlerEndpointTests {
return message;
}
});
endpoint.start();
Message<?> message = new StringMessage("test");
message.getHeader().setReturnAddress(replyChannel);
endpoint.send(message);
@@ -329,7 +284,6 @@ public class HandlerEndpointTests {
return message;
}
});
endpoint.start();
Message<?> message = new StringMessage("test");
message.getHeader().setReturnAddress(replyChannel);
endpoint.send(message);

View File

@@ -26,7 +26,6 @@ import org.junit.Test;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageHandlingException;
import org.springframework.integration.message.MessageSource;
/**
@@ -39,7 +38,7 @@ public class SourceEndpointTests {
TestSource source = new TestSource("testing", 1);
QueueChannel channel = new QueueChannel();
SourceEndpoint endpoint = new SourceEndpoint(source);
endpoint.setOutputChannel(channel);
endpoint.setTarget(channel);
endpoint.afterPropertiesSet();
endpoint.send(new GenericMessage<EndpointPoller>(new EndpointPoller()));
Message<?> message = channel.receive(1000);
@@ -47,16 +46,6 @@ public class SourceEndpointTests {
assertEquals("testing.1", message.getPayload());
}
@Test(expected = MessageHandlingException.class)
public void testAutoStartupDisabled() {
TestSource source = new TestSource("testing", 1);
QueueChannel channel = new QueueChannel();
SourceEndpoint endpoint = new SourceEndpoint(source);
endpoint.setOutputChannel(channel);
endpoint.setAutoStartup(false);
endpoint.afterPropertiesSet();
endpoint.send(new GenericMessage<EndpointPoller>(new EndpointPoller()));
}
private static class TestSource implements MessageSource<String> {

View File

@@ -19,7 +19,7 @@
<property name="methodName" value="foo"/>
</bean>
</constructor-arg>
<property name="outputChannel" ref="channel"/>
<property name="target" ref="channel"/>
<property name="schedule">
<bean class="org.springframework.integration.scheduling.PollingSchedule">
<constructor-arg value="1000"/>
@@ -34,7 +34,7 @@
<bean id="targetEndpoint" class="org.springframework.integration.endpoint.TargetEndpoint">
<constructor-arg ref="target"/>
<property name="inputChannel" ref="channel"/>
<property name="source" ref="channel"/>
</bean>
<bean id="sink" class="org.springframework.integration.handler.TestSink"/>