diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/spring-integration-adapters-1.0.xsd b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/spring-integration-adapters-1.0.xsd index ac4a1dc19b..9cb7fdaf70 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/spring-integration-adapters-1.0.xsd +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/spring-integration-adapters-1.0.xsd @@ -254,9 +254,6 @@ - - - @@ -270,20 +267,22 @@ - - - - - Configures a target that writes to stdout (System.out) or to stderr (System.err) - if the "error" attribute is set to true. - - - - - - - - + + + + + + + + Configures an outbound Channel Adapter that writes to stdout (System.out) + or to stderr (System.err) depending on the element name. + + + + + + + diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/mail/SubscribableMailSource.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/mail/SubscribableMailSource.java index 3cfd34dec5..3ab6b68982 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/mail/SubscribableMailSource.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/mail/SubscribableMailSource.java @@ -26,9 +26,9 @@ import org.springframework.beans.factory.DisposableBean; import org.springframework.context.Lifecycle; import org.springframework.core.task.TaskExecutor; import org.springframework.integration.adapter.mail.monitor.AsyncMonitoringStrategy; -import org.springframework.integration.dispatcher.BroadcastingDispatcher; +import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.message.MessageSource; import org.springframework.integration.message.MessageTarget; -import org.springframework.integration.message.SubscribableSource; import org.springframework.util.Assert; /** @@ -38,11 +38,11 @@ import org.springframework.util.Assert; * * @author Jonas Partner */ -public class SubscribableMailSource implements SubscribableSource, Lifecycle, DisposableBean { +public class SubscribableMailSource implements MessageSource, Lifecycle, DisposableBean { private final Log logger = LogFactory.getLog(this.getClass()); - private final BroadcastingDispatcher dispatcher = new BroadcastingDispatcher(); + private volatile MessageChannel outputChannel; private final TaskExecutor taskExecutor; @@ -61,16 +61,8 @@ public class SubscribableMailSource implements SubscribableSource, Lifecycle, Di } - public void setApplySequence(boolean applySequence) { - this.dispatcher.setApplySequence(applySequence); - } - - public boolean subscribe(MessageTarget target) { - return this.dispatcher.subscribe(target); - } - - public boolean unsubscribe(MessageTarget target) { - return this.dispatcher.unsubscribe(target); + public void setOutputChannel(MessageChannel outputChannel) { + this.outputChannel = outputChannel; } public void setConverter(MailMessageConverter converter) { @@ -146,7 +138,7 @@ public class SubscribableMailSource implements SubscribableSource, Lifecycle, Di while (!Thread.currentThread().isInterrupted()) { Message[] messages = this.folderConnection.receive(); for (Message message : messages) { - dispatcher.send(converter.create((MimeMessage) message)); + outputChannel.send(converter.create((MimeMessage) message)); } } } diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/ByteStreamTarget.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/ByteStreamTarget.java index b376d19784..0828ebe74f 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/ByteStreamTarget.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/ByteStreamTarget.java @@ -23,16 +23,16 @@ import java.io.OutputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessagingException; -import org.springframework.integration.message.MessageTarget; /** * A target that writes a byte array to an {@link OutputStream}. * * @author Mark Fisher */ -public class ByteStreamTarget implements MessageTarget { +public class ByteStreamTarget extends AbstractEndpoint { private final Log logger = LogFactory.getLog(this.getClass()); @@ -53,7 +53,8 @@ public class ByteStreamTarget implements MessageTarget { } - public boolean send(Message message) { + @Override + public boolean sendInternal(Message message) { Object payload = message.getPayload(); if (payload == null) { if (logger.isWarnEnabled()) { diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/CharacterStreamTarget.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/CharacterStreamTarget.java index 00356f207c..ccd310f83d 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/CharacterStreamTarget.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/CharacterStreamTarget.java @@ -27,9 +27,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.integration.ConfigurationException; +import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessagingException; -import org.springframework.integration.message.MessageTarget; import org.springframework.util.Assert; /** @@ -41,7 +41,7 @@ import org.springframework.util.Assert; * * @author Mark Fisher */ -public class CharacterStreamTarget implements MessageTarget { +public class CharacterStreamTarget extends AbstractEndpoint { private final Log logger = LogFactory.getLog(this.getClass()); @@ -117,7 +117,8 @@ public class CharacterStreamTarget implements MessageTarget { this.shouldAppendNewLine = shouldAppendNewLine; } - public boolean send(Message message) { + @Override + public boolean sendInternal(Message message) { Object payload = message.getPayload(); if (payload == null) { if (logger.isWarnEnabled()) { diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/config/ConsoleTargetParser.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/config/ConsoleTargetParser.java index 66cf004011..13550aea0d 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/config/ConsoleTargetParser.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/config/ConsoleTargetParser.java @@ -18,13 +18,20 @@ package org.springframework.integration.adapter.stream.config; import org.w3c.dom.Element; +import org.springframework.beans.factory.BeanDefinitionStoreException; +import org.springframework.beans.factory.config.BeanDefinitionHolder; +import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser; +import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.integration.ConfigurationException; import org.springframework.integration.adapter.stream.CharacterStreamTarget; +import org.springframework.integration.channel.DirectChannel; import org.springframework.util.StringUtils; /** - * Parser for the <console-target> element. + * Parser for the "stdout-" and "stderr-channel-adapter" elements. * * @author Mark Fisher */ @@ -36,18 +43,21 @@ public class ConsoleTargetParser extends AbstractSingleBeanDefinitionParser { } @Override - protected boolean shouldGenerateId() { - return false; + protected String resolveId(Element element, AbstractBeanDefinition definition, ParserContext parserContext) throws BeanDefinitionStoreException { + String id = element.getAttribute("id"); + if (!element.hasAttribute("channel")) { + // the created channel will get the 'id', so the adapter's bean name includes a suffix + id = id + ".adapter"; + } + else if (!StringUtils.hasText(id)) { + id = parserContext.getReaderContext().generateBeanName(definition); + } + return id; } @Override - protected boolean shouldGenerateIdAsFallback() { - return true; - } - - @Override - protected void doParse(Element element, BeanDefinitionBuilder builder) { - if ("true".equals(element.getAttribute("error"))) { + protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { + if (element.getLocalName().startsWith("stderr")) { builder.setFactoryMethod("stderr"); } else { @@ -60,6 +70,25 @@ public class ConsoleTargetParser extends AbstractSingleBeanDefinitionParser { if ("true".equals(element.getAttribute("append-newline"))) { builder.addPropertyValue("shouldAppendNewLine", Boolean.TRUE); } + String channelName = element.getAttribute("channel"); + if (StringUtils.hasText(channelName)) { + builder.addPropertyReference("source", channelName); + } + else { + builder.addPropertyReference("source", this.createDirectChannel(element, parserContext)); + } + } + + private String createDirectChannel(Element element, ParserContext parserContext) { + String channelId = element.getAttribute("id"); + if (!StringUtils.hasText(channelId)) { + throw new ConfigurationException("The channel-adapter's 'id' attribute is required when no 'channel' " + + "reference has been provided, because that 'id' would be used for the created channel."); + } + BeanDefinitionBuilder channelBuilder = BeanDefinitionBuilder.genericBeanDefinition(DirectChannel.class); + BeanDefinitionHolder holder = new BeanDefinitionHolder(channelBuilder.getBeanDefinition(), channelId); + BeanDefinitionReaderUtils.registerBeanDefinition(holder, parserContext.getRegistry()); + return channelId; } } diff --git a/org.springframework.integration.adapter/src/main/resources/META-INF/spring-integration.parsers b/org.springframework.integration.adapter/src/main/resources/META-INF/spring-integration.parsers index 24f2fe7899..9342b0b362 100644 --- a/org.springframework.integration.adapter/src/main/resources/META-INF/spring-integration.parsers +++ b/org.springframework.integration.adapter/src/main/resources/META-INF/spring-integration.parsers @@ -1,5 +1,4 @@ console-source=org.springframework.integration.adapter.stream.config.ConsoleSourceParser -console-target=org.springframework.integration.adapter.stream.config.ConsoleTargetParser file-source=org.springframework.integration.adapter.file.config.FileSourceParser file-target=org.springframework.integration.adapter.file.config.FileTargetParser ftp-source=org.springframework.integration.adapter.ftp.config.FtpSourceParser @@ -12,4 +11,6 @@ mail-target=org.springframework.integration.adapter.mail.config.MailTargetParser polling-mail-source=org.springframework.integration.adapter.mail.config.PollingMailSourceParser imap-idle-mail-source=org.springframework.integration.adapter.mail.config.SubscribableImapIdleMailSourceParser rmi-gateway=org.springframework.integration.adapter.rmi.config.RmiGatewayParser -rmi-handler=org.springframework.integration.adapter.rmi.config.RmiHandlerParser \ No newline at end of file +rmi-handler=org.springframework.integration.adapter.rmi.config.RmiHandlerParser +stderr-channel-adapter=org.springframework.integration.adapter.stream.config.ConsoleTargetParser +stdout-channel-adapter=org.springframework.integration.adapter.stream.config.ConsoleTargetParser \ No newline at end of file diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/mail/SubscribableMailSourceTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/mail/SubscribableMailSourceTests.java index 285fda11b0..2c5c6c43a8 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/mail/SubscribableMailSourceTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/mail/SubscribableMailSourceTests.java @@ -1,9 +1,24 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.springframework.integration.adapter.mail; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import javax.mail.internet.MimeMessage; @@ -12,15 +27,13 @@ import org.easymock.classextension.EasyMock; import org.junit.Before; import org.junit.Test; import org.springframework.core.task.TaskExecutor; +import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageTarget; import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; /** - * * @author Jonas Partner - * */ public class SubscribableMailSourceTests { @@ -34,24 +47,19 @@ public class SubscribableMailSourceTests { @Test public void testReceive() throws Exception { javax.mail.Message message = EasyMock.createMock(MimeMessage.class); - StubFolderConnection folderConnection = new StubFolderConnection( - message); - StubTarget target = new StubTarget(); - - SubscribableMailSource mailSource = new SubscribableMailSource( - folderConnection, executor); - mailSource.subscribe(target); + StubFolderConnection folderConnection = new StubFolderConnection(message); + QueueChannel channel = new QueueChannel(); + SubscribableMailSource mailSource = new SubscribableMailSource(folderConnection, executor); + mailSource.setOutputChannel(channel); mailSource.setConverter(new StubMessageConvertor()); - mailSource.start(); - Thread.sleep(1000); + Message result = channel.receive(1000); mailSource.stop(); - - assertEquals("Wrong message count", 1, target.messages.size()); - assertEquals("Wrong payload", message, target.messages.get(0) - .getPayload()); + assertNotNull(result); + assertEquals("Wrong payload", message, result.getPayload()); } + private static class StubFolderConnection implements FolderConnection { ConcurrentLinkedQueue messages = new ConcurrentLinkedQueue(); @@ -73,25 +81,12 @@ public class SubscribableMailSourceTests { } public void start() { - } public void stop() { - } - } - private static class StubTarget implements MessageTarget { - - List> messages = new ArrayList>(); - - public boolean send(Message message) { - messages.add(message); - return true; - } - - } private static class StubMessageConvertor implements MailMessageConverter { diff --git a/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/quote/quoteDemo.xml b/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/quote/quoteDemo.xml index 9e11e43b68..a54ab05221 100644 --- a/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/quote/quoteDemo.xml +++ b/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/quote/quoteDemo.xml @@ -15,8 +15,6 @@ - - - + diff --git a/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/ws/temperatureConversion.xml b/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/ws/temperatureConversion.xml index cfcec298dc..4c068991ee 100644 --- a/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/ws/temperatureConversion.xml +++ b/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/ws/temperatureConversion.xml @@ -19,8 +19,6 @@ uri="http://www.w3schools.com/webservices/tempconvert.asmx"/> - - - + diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java index 6d640a3372..2d58b02cc0 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java @@ -17,13 +17,13 @@ package org.springframework.integration.channel; import org.springframework.integration.dispatcher.SimpleDispatcher; +import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.SubscribableSource; -import org.springframework.integration.message.MessageTarget; /** - * A channel that invokes the subscribed {@link MessageTarget target(s)} in - * the sender's thread (returning after at most one handles the message). + * A channel that invokes the subscribed {@link MessageEndpoint endpoint(s)} + * in the sender's thread (returning after at most one accepts the message). * * @author Dave Syer * @author Mark Fisher @@ -33,12 +33,12 @@ public class DirectChannel extends AbstractMessageChannel implements Subscribabl private final SimpleDispatcher dispatcher = new SimpleDispatcher(); - public boolean subscribe(MessageTarget target) { - return this.dispatcher.subscribe(target); + public boolean subscribe(MessageEndpoint endpoint) { + return this.dispatcher.subscribe(endpoint); } - public boolean unsubscribe(MessageTarget target) { - return this.dispatcher.unsubscribe(target); + public boolean unsubscribe(MessageEndpoint endpoint) { + return this.dispatcher.unsubscribe(endpoint); } @Override diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java index e0cc1289c3..afcaad3c9b 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java @@ -18,8 +18,8 @@ package org.springframework.integration.channel; import org.springframework.core.task.TaskExecutor; import org.springframework.integration.dispatcher.BroadcastingDispatcher; +import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageTarget; import org.springframework.integration.message.SubscribableSource; /** @@ -48,12 +48,12 @@ public class PublishSubscribeChannel extends AbstractMessageChannel implements S this.dispatcher.setApplySequence(applySequence); } - public boolean subscribe(MessageTarget target) { - return this.dispatcher.subscribe(target); + public boolean subscribe(MessageEndpoint endpoint) { + return this.dispatcher.subscribe(endpoint); } - public boolean unsubscribe(MessageTarget target) { - return this.dispatcher.unsubscribe(target); + public boolean unsubscribe(MessageEndpoint endpoint) { + return this.dispatcher.unsubscribe(endpoint); } @Override diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractDispatcher.java index 444956c3bb..3bef059c4e 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractDispatcher.java @@ -23,8 +23,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.core.task.TaskExecutor; +import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageTarget; /** * Base class for {@link MessageDispatcher} implementations. @@ -35,7 +35,7 @@ public abstract class AbstractDispatcher implements MessageDispatcher { protected final Log logger = LogFactory.getLog(this.getClass()); - protected final Set targets = new CopyOnWriteArraySet(); + protected final Set endpoints = new CopyOnWriteArraySet(); private volatile TaskExecutor taskExecutor; @@ -46,18 +46,18 @@ public abstract class AbstractDispatcher implements MessageDispatcher { return "dispatcher"; } - public boolean subscribe(MessageTarget target) { - return this.targets.add(target); + public boolean subscribe(MessageEndpoint endpoint) { + return this.endpoints.add(endpoint); } - public boolean unsubscribe(MessageTarget target) { - return this.targets.remove(target); + public boolean unsubscribe(MessageEndpoint endpoint) { + return this.endpoints.remove(endpoint); } /** - * Specify a {@link TaskExecutor} for invoking the target endpoints. + * Specify a {@link TaskExecutor} for invoking the endpoints. * If none is provided, the invocation will occur in the thread - * that runs this polling dispatcher. + * that runs this polling dispatcher. */ public void setTaskExecutor(TaskExecutor taskExecutor) { this.taskExecutor = taskExecutor; @@ -68,14 +68,14 @@ public abstract class AbstractDispatcher implements MessageDispatcher { } /** - * A convenience method for subclasses to send a Message to a single target. + * A convenience method for subclasses to send a Message to a single endpoint. */ - protected final boolean sendMessageToTarget(Message message, MessageTarget target) { - return target.send(message); + protected final boolean sendMessageToEndpoint(Message message, MessageEndpoint endpoint) { + return endpoint.send(message); } public String toString() { - return this.getClass().getSimpleName() + " with targets: " + this.targets; + return this.getClass().getSimpleName() + " with endpoints: " + this.endpoints; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java index e6d319cef8..2b44ef4523 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java @@ -17,15 +17,15 @@ package org.springframework.integration.dispatcher; import org.springframework.core.task.TaskExecutor; +import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; -import org.springframework.integration.message.MessageTarget; /** * A broadcasting dispatcher implementation. It makes a best effort to - * send the message to each of its targets. If it fails to send to any - * one target, it will log a warn-level message but continue to send - * to the other targets. + * send the message to each of its endpoints. If it fails to send to any + * one endpoints, it will log a warn-level message but continue to send + * to the other endpoints. * * @author Mark Fisher */ @@ -36,7 +36,7 @@ public class BroadcastingDispatcher extends AbstractDispatcher { /** * Specify whether to apply sequence numbers to the messages - * prior to sending to the targets. By default, sequence + * prior to sending to the endpoints. By default, sequence * numbers will not be applied */ public void setApplySequence(boolean applySequence) { @@ -45,8 +45,8 @@ public class BroadcastingDispatcher extends AbstractDispatcher { public boolean send(Message message) { int sequenceNumber = 1; - int sequenceSize = this.targets.size(); - for (final MessageTarget target : this.targets) { + int sequenceSize = this.endpoints.size(); + for (final MessageEndpoint endpoint : this.endpoints) { final Message messageToSend = (!this.applySequence) ? message : MessageBuilder.fromMessage(message) .setSequenceNumber(sequenceNumber++) @@ -56,12 +56,12 @@ public class BroadcastingDispatcher extends AbstractDispatcher { if (executor != null) { executor.execute(new Runnable() { public void run() { - sendMessageToTarget(messageToSend, target); + sendMessageToEndpoint(messageToSend, endpoint); } }); } else { - this.sendMessageToTarget(messageToSend, target); + this.sendMessageToEndpoint(messageToSend, endpoint); } } return true; diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/MessageDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/MessageDispatcher.java index a860faf309..752044fce8 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/MessageDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/MessageDispatcher.java @@ -17,8 +17,8 @@ package org.springframework.integration.dispatcher; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageTarget; import org.springframework.integration.message.SubscribableSource; /** @@ -30,8 +30,8 @@ public interface MessageDispatcher extends MessageChannel, SubscribableSource { boolean send(Message message); - boolean subscribe(MessageTarget target); + boolean subscribe(MessageEndpoint endpoint); - boolean unsubscribe(MessageTarget target); + boolean unsubscribe(MessageEndpoint endpoint); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcher.java index c9f0629fce..9ab4e68330 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcher.java @@ -19,9 +19,9 @@ package org.springframework.integration.dispatcher; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.BlockingSource; import org.springframework.integration.message.MessageExchangeTemplate; -import org.springframework.integration.message.MessageTarget; import org.springframework.integration.message.PollableSource; import org.springframework.integration.message.SubscribableSource; import org.springframework.integration.scheduling.SchedulableTask; @@ -99,12 +99,12 @@ public class PollingDispatcher implements SchedulableTask, SubscribableSource { this.maxMessagesPerPoll = maxMessagesPerPoll; } - public boolean subscribe(MessageTarget target) { - return this.dispatcher.subscribe(target); + public boolean subscribe(MessageEndpoint endpoint) { + return this.dispatcher.subscribe(endpoint); } - public boolean unsubscribe(MessageTarget target) { - return this.dispatcher.unsubscribe(target); + public boolean unsubscribe(MessageEndpoint endpoint) { + return this.dispatcher.unsubscribe(endpoint); } public Schedule getSchedule() { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java index 4f6517af98..8f5bbe6409 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java @@ -16,19 +16,19 @@ package org.springframework.integration.dispatcher; +import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.MessageRejectedException; -import org.springframework.integration.message.MessageTarget; /** * Basic implementation of {@link MessageDispatcher} that will attempt - * to send a {@link Message} to one of its targets. As soon as one - * of the targets accepts the Message, the dispatcher will return 'true'. + * to send a {@link Message} to one of its endpoints. As soon as one + * of the endpoints accepts the Message, the dispatcher will return 'true'. *

- * If the dispatcher has no targets, a {@link MessageDeliveryException} - * will be thrown. If all targets reject the Message, the dispatcher will - * throw a MessageRejectedException. If all targets return 'false' + * If the dispatcher has no endpoints, a {@link MessageDeliveryException} + * will be thrown. If all endpoints reject the Message, the dispatcher will + * throw a MessageRejectedException. If all endpoints return 'false' * (e.g. due to a timeout), the dispatcher will return 'false'. * * @author Mark Fisher @@ -36,30 +36,30 @@ import org.springframework.integration.message.MessageTarget; public class SimpleDispatcher extends AbstractDispatcher { public boolean send(Message message) { - if (this.targets.size() == 0) { - throw new MessageDeliveryException(message, "Dispatcher has no targets."); + if (this.endpoints.size() == 0) { + throw new MessageDeliveryException(message, "Dispatcher has no subscribers."); } int count = 0; int rejectedExceptionCount = 0; - for (MessageTarget target : this.targets) { + for (MessageEndpoint endpoint : this.endpoints) { count++; try { - if (this.sendMessageToTarget(message, target)) { + if (this.sendMessageToEndpoint(message, endpoint)) { return true; } if (logger.isDebugEnabled()) { - logger.debug("Failed to send message to target, continuing with other targets if available."); + logger.debug("Failed to send message to endpoint, continuing with other endpoints if available."); } } catch (MessageRejectedException e) { rejectedExceptionCount++; if (logger.isDebugEnabled()) { - logger.debug("Target '" + target + "' rejected Message, continuing with other targets if available.", e); + logger.debug("Endpoint '" + endpoint + "' rejected Message, continuing with other endpoints if available.", e); } } } if (rejectedExceptionCount == count) { - throw new MessageRejectedException(message, "All of dispatcher's targets rejected Message."); + throw new MessageRejectedException(message, "All of dispatcher's endpoints rejected Message."); } return false; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java index ca1ba2fd41..7361a06bd8 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java @@ -16,18 +16,20 @@ package org.springframework.integration.endpoint; +import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageSource; -import org.springframework.integration.message.MessageTarget; /** * Base interface for message endpoints. * * @author Mark Fisher */ -public interface MessageEndpoint extends MessageTarget { +public interface MessageEndpoint { String getName(); MessageSource getSource(); + boolean send(Message message); + } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/message/SubscribableSource.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/SubscribableSource.java index 7722d061c9..1cc02ea853 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/message/SubscribableSource.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/message/SubscribableSource.java @@ -16,6 +16,8 @@ package org.springframework.integration.message; +import org.springframework.integration.endpoint.MessageEndpoint; + /** * Interface for any source of messages that accepts subscribers. * @@ -24,13 +26,13 @@ package org.springframework.integration.message; public interface SubscribableSource extends MessageSource { /** - * Register a {@link MessageTarget} as a subscriber to this source. + * Register a {@link MessageEndpoint} as a subscriber to this source. */ - boolean subscribe(MessageTarget target); + boolean subscribe(MessageEndpoint endpoint); /** - * Remove a {@link MessageTarget} from the subscribers of this source. + * Remove a {@link MessageEndpoint} from the subscribers of this source. */ - boolean unsubscribe(MessageTarget target); + boolean unsubscribe(MessageEndpoint endpoint); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/DirectChannelTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/DirectChannelTests.java index 5e84a93b13..a0986cd4bb 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/DirectChannelTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/DirectChannelTests.java @@ -25,8 +25,9 @@ import java.util.concurrent.TimeUnit; import org.junit.Test; import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageTarget; +import org.springframework.integration.message.MessageSource; import org.springframework.integration.message.StringMessage; /** @@ -61,7 +62,7 @@ public class DirectChannelTests { } - private static class ThreadNameExtractingTestTarget implements MessageTarget { + private static class ThreadNameExtractingTestTarget implements MessageEndpoint { private String threadName; @@ -83,6 +84,15 @@ public class DirectChannelTests { } return true; } + + // TODO: remove once this is a consumer instead of endpoint + public String getName() { + return null; + } + + public MessageSource getSource() { + return null; + } } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/TestSubscribableSource.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/TestSubscribableSource.java index 8950ff6822..78dff2df94 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/TestSubscribableSource.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/TestSubscribableSource.java @@ -19,8 +19,8 @@ package org.springframework.integration.channel.config; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageTarget; import org.springframework.integration.message.SubscribableSource; /** @@ -28,20 +28,20 @@ import org.springframework.integration.message.SubscribableSource; */ public class TestSubscribableSource implements SubscribableSource { - private final List targets = new CopyOnWriteArrayList(); + private final List endpoints = new CopyOnWriteArrayList(); - public boolean subscribe(MessageTarget target) { - return this.targets.add(target); + public boolean subscribe(MessageEndpoint endpoint) { + return this.endpoints.add(endpoint); } - public boolean unsubscribe(MessageTarget target) { - return this.targets.remove(target); + public boolean unsubscribe(MessageEndpoint endpoint) { + return this.endpoints.remove(endpoint); } public void publishMessage(Message message) { - for (MessageTarget target : this.targets) { - target.send(message); + for (MessageEndpoint endpoint : this.endpoints) { + endpoint.send(message); } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java index 99ec92e286..f42173d1fe 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java @@ -28,11 +28,11 @@ import org.junit.Test; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.MessageRejectedException; -import org.springframework.integration.message.MessageTarget; import org.springframework.integration.message.StringMessage; /** @@ -57,7 +57,7 @@ public class EndpointParserTests { public void testEndpointWithSelectorAccepts() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "endpointWithSelector.xml", this.getClass()); - MessageTarget endpoint = (MessageTarget) context.getBean("endpoint"); + MessageEndpoint endpoint = (MessageEndpoint) context.getBean("endpoint"); QueueChannel replyChannel = new QueueChannel(); Message message = MessageBuilder.fromPayload("test") .setReturnAddress(replyChannel).build(); @@ -71,7 +71,7 @@ public class EndpointParserTests { public void testEndpointWithSelectorRejects() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "endpointWithSelector.xml", this.getClass()); - MessageTarget endpoint = (MessageTarget) context.getBean("endpoint"); + MessageEndpoint endpoint = (MessageEndpoint) context.getBean("endpoint"); MessageChannel replyChannel = new QueueChannel(); Message message = MessageBuilder.fromPayload(123) .setReturnAddress(replyChannel).build(); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java index b6dca4e3de..cdf056d63f 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java @@ -318,12 +318,18 @@ public class MessagingAnnotationPostProcessorTests { DirectChannel testChannel = (DirectChannel) messageBus.lookupChannel("testChannel"); final CountDownLatch latch = new CountDownLatch(1); final AtomicReference> receivedMessage = new AtomicReference>(); - testChannel.subscribe(new org.springframework.integration.message.MessageTarget() { + testChannel.subscribe(new org.springframework.integration.endpoint.MessageEndpoint() { public boolean send(Message message) { receivedMessage.set(message); latch.countDown(); return false; } + public String getName() { + return null; + } + public MessageSource getSource() { + return null; + } }); latch.await(3, TimeUnit.SECONDS); assertEquals(0, latch.getCount()); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java index 2320f97001..990356c399 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java @@ -35,8 +35,9 @@ import org.junit.Before; import org.junit.Test; import org.springframework.core.task.TaskExecutor; +import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageTarget; +import org.springframework.integration.message.MessageSource; import org.springframework.integration.message.StringMessage; /** @@ -51,11 +52,11 @@ public class BroadcastingDispatcherTests { private Message messageMock = createMock(Message.class); - private MessageTarget targetMock1 = createMock(MessageTarget.class); + private MessageEndpoint targetMock1 = createMock(MessageEndpoint.class); - private MessageTarget targetMock2 = createMock(MessageTarget.class); + private MessageEndpoint targetMock2 = createMock(MessageEndpoint.class); - private MessageTarget targetMock3 = createMock(MessageTarget.class); + private MessageEndpoint targetMock3 = createMock(MessageEndpoint.class); private Object[] globalMocks = new Object[] { messageMock, taskExecutorMock, targetMock1, targetMock2, targetMock3 }; @@ -213,8 +214,8 @@ public class BroadcastingDispatcherTests { public void applySequenceDisabledByDefault() { BroadcastingDispatcher dispatcher = new BroadcastingDispatcher(); final List> messages = Collections.synchronizedList(new ArrayList>()); - MessageTarget target1 = new MessageStoringTestTarget(messages); - MessageTarget target2 = new MessageStoringTestTarget(messages); + MessageEndpoint target1 = new MessageStoringTestEndpoint(messages); + MessageEndpoint target2 = new MessageStoringTestEndpoint(messages); dispatcher.subscribe(target1); dispatcher.subscribe(target2); dispatcher.send(new StringMessage("test")); @@ -230,9 +231,9 @@ public class BroadcastingDispatcherTests { BroadcastingDispatcher dispatcher = new BroadcastingDispatcher(); dispatcher.setApplySequence(true); final List> messages = Collections.synchronizedList(new ArrayList>()); - MessageTarget target1 = new MessageStoringTestTarget(messages); - MessageTarget target2 = new MessageStoringTestTarget(messages); - MessageTarget target3 = new MessageStoringTestTarget(messages); + MessageEndpoint target1 = new MessageStoringTestEndpoint(messages); + MessageEndpoint target2 = new MessageStoringTestEndpoint(messages); + MessageEndpoint target3 = new MessageStoringTestEndpoint(messages); dispatcher.subscribe(target1); dispatcher.subscribe(target2); dispatcher.subscribe(target3); @@ -275,11 +276,11 @@ public class BroadcastingDispatcherTests { } - private static class MessageStoringTestTarget implements MessageTarget { + private static class MessageStoringTestEndpoint implements MessageEndpoint { private final List> messageList; - MessageStoringTestTarget(List> messageList) { + MessageStoringTestEndpoint(List> messageList) { this.messageList = messageList; } @@ -287,6 +288,14 @@ public class BroadcastingDispatcherTests { this.messageList.add(message); return true; } + + public String getName() { + return null; + } + + public MessageSource getSource() { + return null; + } }; } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java index be4c4d4323..ddfabc659a 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java @@ -27,12 +27,13 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.springframework.integration.endpoint.AbstractInOutEndpoint; +import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.endpoint.ServiceActivatorEndpoint; import org.springframework.integration.handler.TestHandlers; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.MessageRejectedException; -import org.springframework.integration.message.MessageTarget; +import org.springframework.integration.message.MessageSource; import org.springframework.integration.message.StringMessage; import org.springframework.integration.message.selector.MessageSelector; @@ -69,7 +70,7 @@ public class SimpleDispatcherTests { public void noDuplicateSubscriptions() { SimpleDispatcher dispatcher = new SimpleDispatcher(); final AtomicInteger counter = new AtomicInteger(); - MessageTarget target = new CountingTestTarget(counter, false); + MessageEndpoint target = new CountingTestEndpoint(counter, false); dispatcher.subscribe(target); dispatcher.subscribe(target); dispatcher.send(new StringMessage("test")); @@ -80,9 +81,9 @@ public class SimpleDispatcherTests { public void unsubscribeBeforeSend() { SimpleDispatcher dispatcher = new SimpleDispatcher(); final AtomicInteger counter = new AtomicInteger(); - MessageTarget target1 = new CountingTestTarget(counter, false); - MessageTarget target2 = new CountingTestTarget(counter, false); - MessageTarget target3 = new CountingTestTarget(counter, false); + MessageEndpoint target1 = new CountingTestEndpoint(counter, false); + MessageEndpoint target2 = new CountingTestEndpoint(counter, false); + MessageEndpoint target3 = new CountingTestEndpoint(counter, false); dispatcher.subscribe(target1); dispatcher.subscribe(target2); dispatcher.subscribe(target3); @@ -95,9 +96,9 @@ public class SimpleDispatcherTests { public void unsubscribeBetweenSends() { SimpleDispatcher dispatcher = new SimpleDispatcher(); final AtomicInteger counter = new AtomicInteger(); - MessageTarget target1 = new CountingTestTarget(counter, false); - MessageTarget target2 = new CountingTestTarget(counter, false); - MessageTarget target3 = new CountingTestTarget(counter, false); + MessageEndpoint target1 = new CountingTestEndpoint(counter, false); + MessageEndpoint target2 = new CountingTestEndpoint(counter, false); + MessageEndpoint target3 = new CountingTestEndpoint(counter, false); dispatcher.subscribe(target1); dispatcher.subscribe(target2); dispatcher.subscribe(target3); @@ -115,7 +116,7 @@ public class SimpleDispatcherTests { public void unsubscribeLastTargetCausesDeliveryException() { SimpleDispatcher dispatcher = new SimpleDispatcher(); final AtomicInteger counter = new AtomicInteger(); - MessageTarget target = new CountingTestTarget(counter, false); + MessageEndpoint target = new CountingTestEndpoint(counter, false); dispatcher.subscribe(target); dispatcher.send(new StringMessage("test1")); assertEquals(1, counter.get()); @@ -183,9 +184,9 @@ public class SimpleDispatcherTests { public void firstHandlerReturnsTrue() { SimpleDispatcher dispatcher = new SimpleDispatcher(); final AtomicInteger counter = new AtomicInteger(); - MessageTarget target1 = new CountingTestTarget(counter, true); - MessageTarget target2 = new CountingTestTarget(counter, false); - MessageTarget target3 = new CountingTestTarget(counter, false); + MessageEndpoint target1 = new CountingTestEndpoint(counter, true); + MessageEndpoint target2 = new CountingTestEndpoint(counter, false); + MessageEndpoint target3 = new CountingTestEndpoint(counter, false); dispatcher.subscribe(target1); dispatcher.subscribe(target2); dispatcher.subscribe(target3); @@ -197,9 +198,9 @@ public class SimpleDispatcherTests { public void middleHandlerReturnsTrue() { SimpleDispatcher dispatcher = new SimpleDispatcher(); final AtomicInteger counter = new AtomicInteger(); - MessageTarget target1 = new CountingTestTarget(counter, false); - MessageTarget target2 = new CountingTestTarget(counter, true); - MessageTarget target3 = new CountingTestTarget(counter, false); + MessageEndpoint target1 = new CountingTestEndpoint(counter, false); + MessageEndpoint target2 = new CountingTestEndpoint(counter, true); + MessageEndpoint target3 = new CountingTestEndpoint(counter, false); dispatcher.subscribe(target1); dispatcher.subscribe(target2); dispatcher.subscribe(target3); @@ -211,9 +212,9 @@ public class SimpleDispatcherTests { public void allHandlersReturnFalse() { SimpleDispatcher dispatcher = new SimpleDispatcher(); final AtomicInteger counter = new AtomicInteger(); - MessageTarget target1 = new CountingTestTarget(counter, false); - MessageTarget target2 = new CountingTestTarget(counter, false); - MessageTarget target3 = new CountingTestTarget(counter, false); + MessageEndpoint target1 = new CountingTestEndpoint(counter, false); + MessageEndpoint target2 = new CountingTestEndpoint(counter, false); + MessageEndpoint target3 = new CountingTestEndpoint(counter, false); dispatcher.subscribe(target1); dispatcher.subscribe(target2); dispatcher.subscribe(target3); @@ -245,13 +246,13 @@ public class SimpleDispatcherTests { } - private static class CountingTestTarget implements MessageTarget { + private static class CountingTestEndpoint implements MessageEndpoint { private final AtomicInteger counter; private final boolean returnValue; - CountingTestTarget(AtomicInteger counter, boolean returnValue) { + CountingTestEndpoint(AtomicInteger counter, boolean returnValue) { this.counter = counter; this.returnValue = returnValue; } @@ -260,6 +261,14 @@ public class SimpleDispatcherTests { this.counter.incrementAndGet(); return this.returnValue; } + + public String getName() { + return null; + } + + public MessageSource getSource() { + return null; + } } }