diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractSubscribableChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractSubscribableChannel.java index 5129c1e14c..88b6662cc2 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractSubscribableChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractSubscribableChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2008 the original author or authors. + * Copyright 2002-2009 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,32 +28,27 @@ import org.springframework.util.Assert; * * @author Mark Fisher */ -public class AbstractSubscribableChannel extends AbstractMessageChannel implements SubscribableChannel { - - private final T dispatcher; - - - public AbstractSubscribableChannel(T dispatcher) { - Assert.notNull(dispatcher, "dispatcher must not be null"); - this.dispatcher = dispatcher; - } - - - protected T getDispatcher() { - return this.dispatcher; - } +public abstract class AbstractSubscribableChannel extends AbstractMessageChannel implements SubscribableChannel { public boolean subscribe(MessageHandler handler) { - return this.dispatcher.addHandler(handler); + return this.getRequiredDispatcher().addHandler(handler); } public boolean unsubscribe(MessageHandler handle) { - return this.dispatcher.removeHandler(handle); + return this.getRequiredDispatcher().removeHandler(handle); } @Override protected boolean doSend(Message message, long timeout) { - return this.dispatcher.dispatch(message); + return this.getRequiredDispatcher().dispatch(message); } + private MessageDispatcher getRequiredDispatcher() { + MessageDispatcher dispatcher = this.getDispatcher(); + Assert.state(dispatcher != null, "dispatcher must not be null"); + return dispatcher; + } + + protected abstract MessageDispatcher getDispatcher(); + } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java index b4a0363e41..6496aea328 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2008 the original author or authors. + * Copyright 2002-2009 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ package org.springframework.integration.channel; import org.springframework.integration.dispatcher.AbstractUnicastDispatcher; import org.springframework.integration.dispatcher.RoundRobinDispatcher; +import org.springframework.util.Assert; /** * A channel that invokes a single subscriber for each sent Message. @@ -27,14 +28,24 @@ import org.springframework.integration.dispatcher.RoundRobinDispatcher; * @author Mark Fisher * @author Iwein Fuld */ -public class DirectChannel extends AbstractSubscribableChannel { +public class DirectChannel extends AbstractSubscribableChannel { + + private final AbstractUnicastDispatcher dispatcher; + public DirectChannel() { - super(new RoundRobinDispatcher()); + this.dispatcher = new RoundRobinDispatcher(); } - - public DirectChannel(AbstractUnicastDispatcher dispatcher){ - super(dispatcher); + + public DirectChannel(AbstractUnicastDispatcher dispatcher) { + Assert.notNull(dispatcher, "dispatcher must not be null"); + this.dispatcher = dispatcher; + } + + + @Override + protected AbstractUnicastDispatcher getDispatcher() { + return this.dispatcher; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java index 37068dfd62..98630a6888 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2008 the original author or authors. + * Copyright 2002-2009 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,9 @@ import org.springframework.integration.util.ErrorHandlingTaskExecutor; * * @author Mark Fisher */ -public class PublishSubscribeChannel extends AbstractSubscribableChannel implements BeanFactoryAware { +public class PublishSubscribeChannel extends AbstractSubscribableChannel implements BeanFactoryAware { + + private volatile BroadcastingDispatcher dispatcher; private volatile TaskExecutor taskExecutor; @@ -37,13 +39,18 @@ public class PublishSubscribeChannel extends AbstractSubscribableChannelThe {@link Comparator} that determines the order may be provided via the - * {@link #setComparator(Comparator)} method. If none is provided, the default - * will be an instance of {@link OrderComparator}, and any {@link MessageHandler} - * that implements {@link org.springframework.core.Ordered} and has an order - * value other than LOWEST_PRECEDENCE will be ordered accordingly. Any other - * handlers will be placed at the end of the list in their original - * insertion-order. + *

+ * The subclasses implement the actual dispatching strategy, but this base + * class manages the registration of {@link MessageHandler}s. Although the + * implemented dispatching strategies may invoke handles in different ways + * (e.g. round-robin vs. failover), this class does maintain the order of the + * underlying collection. See the {@link OrderedAwareLinkedHashSet} for more + * detail. * * @author Mark Fisher * @author Iwein Fuld @@ -53,24 +50,9 @@ public abstract class AbstractDispatcher implements MessageDispatcher { private final Set handlers = new OrderedAwareLinkedHashSet(); - private volatile TaskExecutor taskExecutor; - private final Object handlerListMonitor = new Object(); - /** - * Specify a {@link TaskExecutor} for invoking the handlers. If none is - * provided, the invocation will occur in the thread that runs this polling - * dispatcher. - */ - public void setTaskExecutor(TaskExecutor taskExecutor) { - this.taskExecutor = taskExecutor; - } - - protected TaskExecutor getTaskExecutor() { - return this.taskExecutor; - } - @SuppressWarnings("unchecked") protected List getHandlers() { return Collections.unmodifiableList(new ArrayList(this.handlers)); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java index 73c6cbdc09..7c1a931ba7 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java @@ -38,6 +38,17 @@ public class BroadcastingDispatcher extends AbstractDispatcher { private volatile boolean applySequence; + private final TaskExecutor taskExecutor; + + + public BroadcastingDispatcher() { + this.taskExecutor = null; + } + + public BroadcastingDispatcher(TaskExecutor taskExecutor) { + this.taskExecutor = taskExecutor; + } + /** * Specify whether to apply sequence numbers to the messages @@ -48,6 +59,7 @@ public class BroadcastingDispatcher extends AbstractDispatcher { this.applySequence = applySequence; } + public boolean dispatch(Message message) { int sequenceNumber = 1; List handlers = this.getHandlers(); @@ -60,9 +72,8 @@ public class BroadcastingDispatcher extends AbstractDispatcher { .setCorrelationId(message.getHeaders().getId()) .setHeader(MessageHeaders.ID, UUID.randomUUID()) .build(); - TaskExecutor executor = this.getTaskExecutor(); - if (executor != null) { - executor.execute(new Runnable() { + if (this.taskExecutor != null) { + this.taskExecutor.execute(new Runnable() { public void run() { BroadcastingDispatcher.this.sendMessageToHandler(messageToSend, handler); } @@ -74,4 +85,5 @@ public class BroadcastingDispatcher extends AbstractDispatcher { } return true; } + } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java index 1be714f0f2..d628b3ac3b 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java @@ -62,8 +62,6 @@ public class BroadcastingDispatcherTests { @Before public void init() { - dispatcher = new BroadcastingDispatcher(); - dispatcher.setTaskExecutor(taskExecutorMock); reset(globalMocks); defaultTaskExecutorMock(); } @@ -71,7 +69,7 @@ public class BroadcastingDispatcherTests { @Test public void singleTargetWithoutTaskExecutor() throws Exception { - dispatcher.setTaskExecutor(null); + dispatcher = new BroadcastingDispatcher(); dispatcher.addHandler(targetMock1); targetMock1.handleMessage(messageMock); expectLastCall(); @@ -82,6 +80,7 @@ public class BroadcastingDispatcherTests { @Test public void singleTargetWithTaskExecutor() throws Exception { + dispatcher = new BroadcastingDispatcher(taskExecutorMock); dispatcher.addHandler(targetMock1); targetMock1.handleMessage(messageMock); expectLastCall(); @@ -92,7 +91,7 @@ public class BroadcastingDispatcherTests { @Test public void multipleTargetsWithoutTaskExecutor() { - dispatcher.setTaskExecutor(null); + dispatcher = new BroadcastingDispatcher(); dispatcher.addHandler(targetMock1); dispatcher.addHandler(targetMock2); dispatcher.addHandler(targetMock3); @@ -109,6 +108,7 @@ public class BroadcastingDispatcherTests { @Test public void multipleTargetsWithTaskExecutor() { + dispatcher = new BroadcastingDispatcher(taskExecutorMock); dispatcher.addHandler(targetMock1); dispatcher.addHandler(targetMock2); dispatcher.addHandler(targetMock3); @@ -125,6 +125,7 @@ public class BroadcastingDispatcherTests { @Test public void multipleTargetsPartialFailureFirst() { + dispatcher = new BroadcastingDispatcher(taskExecutorMock); reset(taskExecutorMock); dispatcher.addHandler(targetMock1); dispatcher.addHandler(targetMock2); @@ -141,6 +142,7 @@ public class BroadcastingDispatcherTests { @Test public void multipleTargetsPartialFailureMiddle() { + dispatcher = new BroadcastingDispatcher(taskExecutorMock); reset(taskExecutorMock); dispatcher.addHandler(targetMock1); dispatcher.addHandler(targetMock2); @@ -157,6 +159,7 @@ public class BroadcastingDispatcherTests { @Test public void multipleTargetsPartialFailureLast() { + dispatcher = new BroadcastingDispatcher(taskExecutorMock); reset(taskExecutorMock); dispatcher.addHandler(targetMock1); dispatcher.addHandler(targetMock2); @@ -173,6 +176,7 @@ public class BroadcastingDispatcherTests { @Test public void multipleTargetsAllFail() { + dispatcher = new BroadcastingDispatcher(taskExecutorMock); reset(taskExecutorMock); dispatcher.addHandler(targetMock1); dispatcher.addHandler(targetMock2); @@ -185,6 +189,7 @@ public class BroadcastingDispatcherTests { @Test public void noDuplicateSubscription() { + dispatcher = new BroadcastingDispatcher(taskExecutorMock); dispatcher.addHandler(targetMock1); dispatcher.addHandler(targetMock1); dispatcher.addHandler(targetMock1); @@ -197,6 +202,7 @@ public class BroadcastingDispatcherTests { @Test public void removeConsumerBeforeSend() { + dispatcher = new BroadcastingDispatcher(taskExecutorMock); dispatcher.addHandler(targetMock1); dispatcher.addHandler(targetMock2); dispatcher.addHandler(targetMock3); @@ -212,6 +218,7 @@ public class BroadcastingDispatcherTests { @Test public void removeConsumerBetweenSends() { + dispatcher = new BroadcastingDispatcher(taskExecutorMock); dispatcher.addHandler(targetMock1); dispatcher.addHandler(targetMock2); dispatcher.addHandler(targetMock3);