Removed SourceEndpoint. Also, MessageExchangeTemplate now wraps any Exception thrown in source.receive() in a MessagingException.

This commit is contained in:
Mark Fisher
2008-08-05 21:11:43 +00:00
parent 8274dfc428
commit 1e82a8d568
3 changed files with 22 additions and 60 deletions

View File

@@ -30,18 +30,18 @@ import org.springframework.beans.factory.BeanCreationException;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.PollableChannelAdapter;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dispatcher.PublishSubscribeChannel;
import org.springframework.integration.endpoint.SourceEndpoint;
import org.springframework.integration.endpoint.HandlerEndpoint;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.message.ErrorMessage;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.MessagingException;
import org.springframework.integration.message.PollableSource;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.scheduling.PollingSchedule;
/**
* @author Mark Fisher
@@ -193,18 +193,26 @@ public class DefaultMessageBusTests {
public void testErrorChannelWithFailedDispatch() throws InterruptedException {
MessageBus bus = new DefaultMessageBus();
CountDownLatch latch = new CountDownLatch(1);
SourceEndpoint sourceEndpoint = new SourceEndpoint(new FailingSource(latch));
sourceEndpoint.setTarget(new QueueChannel());
sourceEndpoint.setSchedule(new PollingSchedule(1000));
sourceEndpoint.setName("testEndpoint");
bus.registerEndpoint(sourceEndpoint);
PollableChannelAdapter channelAdapter = new PollableChannelAdapter(
"testChannel", new FailingSource(latch), null);
MessageHandler handler = new MessageHandler() {
public Message<?> handle(Message<?> message) {
return message;
}
};
HandlerEndpoint endpoint = new HandlerEndpoint(handler);
endpoint.setName("testEndpoint");
endpoint.setSource(channelAdapter);
bus.registerEndpoint(endpoint);
bus.start();
latch.await(2000, TimeUnit.MILLISECONDS);
Message<?> message = ((PollableChannel) bus.getErrorChannel()).receive(5000);
bus.stop();
assertNotNull("message should not be null", message);
assertTrue(message instanceof ErrorMessage);
assertEquals("intentional test failure", ((ErrorMessage) message).getPayload().getMessage());
bus.stop();
Throwable exception = ((ErrorMessage) message).getPayload();
assertTrue(exception instanceof MessagingException);
assertEquals("intentional test failure", exception.getCause().getMessage());
}
@Test(expected = BeanCreationException.class)