From 6cb6347b82e092fe5e3e21fa96249ff5f613a2fc Mon Sep 17 00:00:00 2001 From: Mark Fisher Date: Sat, 5 Jul 2008 18:39:11 +0000 Subject: [PATCH] Endpoint now throws MessageRejectedException when a MessageSelector rejects a Message. --- .../config/EndpointInterceptorParser.java | 11 +--- .../config/IntegrationNamespaceUtils.java | 13 +++++ .../endpoint/AbstractEndpoint.java | 20 +++++--- .../integration/endpoint/SourceEndpoint.java | 31 +++++++++--- .../integration/endpoint/TargetEndpoint.java | 50 +++++++++++++------ .../message/MessageRejectedException.java | 39 +++++++++++++++ .../config/EndpointParserTests.java | 8 +-- .../endpoint/HandlerEndpointTests.java | 23 +++++++-- 8 files changed, 151 insertions(+), 44 deletions(-) create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/message/MessageRejectedException.java diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/EndpointInterceptorParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/EndpointInterceptorParser.java index ea9a827521..681fa33b36 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/EndpointInterceptorParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/EndpointInterceptorParser.java @@ -23,12 +23,9 @@ import org.w3c.dom.Element; import org.w3c.dom.Node; import org.w3c.dom.NodeList; -import org.springframework.beans.factory.config.BeanDefinitionHolder; import org.springframework.beans.factory.config.RuntimeBeanReference; -import org.springframework.beans.factory.parsing.BeanComponentDefinition; import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.ManagedList; -import org.springframework.beans.factory.xml.BeanDefinitionParserDelegate; import org.springframework.beans.factory.xml.NamespaceHandler; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.util.Assert; @@ -58,12 +55,8 @@ public class EndpointInterceptorParser { Element childElement = (Element) child; String localName = child.getLocalName(); if ("bean".equals(localName)) { - BeanDefinitionParserDelegate beanParser = - new BeanDefinitionParserDelegate(parserContext.getReaderContext()); - beanParser.initDefaults(childElement.getOwnerDocument().getDocumentElement()); - BeanDefinitionHolder beanDefinitionHolder = beanParser.parseBeanDefinitionElement(childElement); - parserContext.registerBeanComponent(new BeanComponentDefinition(beanDefinitionHolder)); - interceptors.add(new RuntimeBeanReference(beanDefinitionHolder.getBeanName())); + interceptors.add(new RuntimeBeanReference( + IntegrationNamespaceUtils.parseBeanDefinitionElement(childElement, parserContext))); } else if ("ref".equals(localName)) { String ref = childElement.getAttribute("bean"); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java index 41a44568c1..52d2875553 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java @@ -18,8 +18,12 @@ package org.springframework.integration.config; import org.w3c.dom.Element; +import org.springframework.beans.factory.config.BeanDefinitionHolder; import org.springframework.beans.factory.config.RuntimeBeanReference; +import org.springframework.beans.factory.parsing.BeanComponentDefinition; import org.springframework.beans.factory.support.RootBeanDefinition; +import org.springframework.beans.factory.xml.BeanDefinitionParserDelegate; +import org.springframework.beans.factory.xml.ParserContext; import org.springframework.util.StringUtils; /** @@ -68,4 +72,13 @@ public abstract class IntegrationNamespaceUtils { } } + public static String parseBeanDefinitionElement(Element element, ParserContext parserContext) { + BeanDefinitionParserDelegate beanParser = + new BeanDefinitionParserDelegate(parserContext.getReaderContext()); + beanParser.initDefaults(element.getOwnerDocument().getDocumentElement()); + BeanDefinitionHolder beanDefinitionHolder = beanParser.parseBeanDefinitionElement(element); + parserContext.registerBeanComponent(new BeanComponentDefinition(beanDefinitionHolder)); + return beanDefinitionHolder.getBeanName(); + } + } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java index 1e317a458a..3a30a430e0 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java @@ -30,8 +30,9 @@ import org.springframework.integration.ConfigurationException; import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.handler.MessageHandlerNotRunningException; +import org.springframework.integration.message.Command; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageHandlingException; +import org.springframework.integration.message.MessageRejectedException; import org.springframework.integration.scheduling.Schedule; /** @@ -221,8 +222,8 @@ public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware } public final boolean send(Message message) { - if (message == null) { - throw new IllegalArgumentException("Message must not be null."); + if (message == null || message.getPayload() == null) { + throw new IllegalArgumentException("Message and its payload must not be null."); } if (logger.isDebugEnabled()) { logger.debug("endpoint '" + this + "' handling message: " + message); @@ -230,14 +231,19 @@ public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware if (!this.isRunning()) { throw new MessageHandlerNotRunningException(message); } - if (!this.supports(message)) { - throw new MessageHandlingException(message, "unsupported message"); + if (message.getPayload() instanceof Command) { + return this.handleCommand((Command) message.getPayload()); } - return this.doInvoke(message); + if (!this.supports(message)) { + throw new MessageRejectedException(message, "unsupported message"); + } + return this.handleMessage(message); } protected abstract boolean supports(Message message); - protected abstract boolean doInvoke(Message message); + protected abstract boolean handleCommand(Command command); + + protected abstract boolean handleMessage(Message message); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourceEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourceEndpoint.java index 89c1ae0e3e..1bf320963b 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourceEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourceEndpoint.java @@ -18,6 +18,7 @@ package org.springframework.integration.endpoint; import org.springframework.integration.ConfigurationException; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.message.Command; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageDeliveryAware; import org.springframework.integration.message.MessageDeliveryException; @@ -42,15 +43,33 @@ public class SourceEndpoint extends AbstractEndpoint { } - protected boolean supports(Message message) { - return (message.getPayload() instanceof PollCommand); - } - - public final boolean doInvoke(Message pollCommandMessage) { - if (this.getOutputChannel() == null) { + @Override + public void initialize() { + if (this.getOutputChannelName() == null && this.getOutputChannel() == null) { throw new ConfigurationException( "no output channel has been configured for source endpoint '" + this.getName() + "'"); } + } + + @Override + protected final boolean supports(Message message) { + return false; + } + + @Override + protected final boolean handleMessage(Message message) { + return false; + } + + @Override + protected final boolean handleCommand(Command command) { + if (command instanceof PollCommand) { + return this.poll(); + } + return false; + } + + private boolean poll() { Message message = this.source.receive(); if (message == null) { return false; diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/TargetEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/TargetEndpoint.java index e5814595e1..f27205a9f1 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/TargetEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/TargetEndpoint.java @@ -18,6 +18,8 @@ package org.springframework.integration.endpoint; import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.message.BlockingTarget; +import org.springframework.integration.message.Command; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageTarget; import org.springframework.integration.message.PollCommand; @@ -35,6 +37,10 @@ public class TargetEndpoint extends AbstractEndpoint { private volatile MessageSelector selector; + private volatile long receiveTimeout = 5000; + + private volatile long sendTimeout = 0; + private volatile boolean initialized; private final Object initializationMonitor = new Object(); @@ -62,6 +68,14 @@ public class TargetEndpoint extends AbstractEndpoint { this.selector = selector; } + public void setReceiveTimeout(long receiveTimeout) { + this.receiveTimeout = receiveTimeout; + } + + public void setSendTimeout(long sendTimeout) { + this.sendTimeout = sendTimeout; + } + protected void initialize() { synchronized (this.initializationMonitor) { if (this.initialized) { @@ -75,29 +89,37 @@ public class TargetEndpoint extends AbstractEndpoint { } @Override - protected final boolean doInvoke(Message message) { - if (message.getPayload() instanceof PollCommand) { + protected boolean supports(Message message) { + if (this.selector != null && !this.selector.accept(message)) { + if (logger.isDebugEnabled()) { + logger.debug("selector for endpoint '" + this + "' rejected message: " + message); + } + return false; + } + return true; + } + + @Override + protected final boolean handleMessage(Message message) { + return (this.sendTimeout >= 0 && this.target instanceof BlockingTarget) ? + ((BlockingTarget) this.target).send(message) : this.target.send(message); + } + + @Override + protected final boolean handleCommand(Command command) { + if (command instanceof PollCommand) { MessageChannel channel = this.getInputChannel(); if (channel != null) { - Message receivedMessage = channel.receive(5000); + Message receivedMessage = channel.receive(this.receiveTimeout); if (receivedMessage != null) { - return this.doInvoke(receivedMessage); + return this.handleMessage(receivedMessage); } } else if (logger.isDebugEnabled()) { logger.debug("TargetEndpoint unable to resolve channel '" + this.getInputChannelName() + "'"); } - return false; } - if (this.selector != null && !this.selector.accept(message)) { - return false; - } - return this.target.send(message); - } - - @Override - protected final boolean supports(Message message) { - return true; + return false; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageRejectedException.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageRejectedException.java new file mode 100644 index 0000000000..4258d67df4 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageRejectedException.java @@ -0,0 +1,39 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.message; + +/** + * Exception that indicates a message has been rejected by a selector. + * + * @author Mark Fisher + */ +@SuppressWarnings("serial") +public class MessageRejectedException extends MessageHandlingException { + + public MessageRejectedException(Message failedMessage) { + super(failedMessage); + } + + public MessageRejectedException(Message failedMessage, String description) { + super(failedMessage, description); + } + + public MessageRejectedException(Message failedMessage, String description, Throwable cause) { + super(failedMessage, description, cause); + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java index 0033623222..8bfa14389c 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java @@ -17,7 +17,6 @@ package org.springframework.integration.config; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -31,8 +30,9 @@ import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; -import org.springframework.integration.message.StringMessage; +import org.springframework.integration.message.MessageRejectedException; import org.springframework.integration.message.MessageTarget; +import org.springframework.integration.message.StringMessage; /** * @author Mark Fisher @@ -91,7 +91,7 @@ public class EndpointParserTests { assertEquals("foo", reply.getPayload()); } - @Test + @Test(expected=MessageRejectedException.class) public void testEndpointWithSelectorRejects() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "endpointWithSelector.xml", this.getClass()); @@ -99,7 +99,7 @@ public class EndpointParserTests { Message message = new GenericMessage(123); MessageChannel replyChannel = new QueueChannel(); message.getHeader().setReturnAddress(replyChannel); - assertFalse(endpoint.send(message)); + endpoint.send(message); } @Test diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/HandlerEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/HandlerEndpointTests.java index d2d73d6f93..702664ec9e 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/HandlerEndpointTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/HandlerEndpointTests.java @@ -36,6 +36,7 @@ import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.handler.MessageHandlerNotRunningException; import org.springframework.integration.handler.TestHandlers; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageRejectedException; import org.springframework.integration.message.StringMessage; import org.springframework.integration.message.selector.MessageSelector; import org.springframework.integration.message.selector.MessageSelectorChain; @@ -187,7 +188,7 @@ public class HandlerEndpointTests { assertTrue(exceptionThrown); } - @Test + @Test(expected=MessageRejectedException.class) public void testEndpointWithSelectorRejecting() { HandlerEndpoint endpoint = new HandlerEndpoint(TestHandlers.nullHandler()); endpoint.setMessageSelector(new MessageSelector() { @@ -196,7 +197,7 @@ public class HandlerEndpointTests { } }); endpoint.start(); - assertFalse(endpoint.send(new StringMessage("test"))); + endpoint.send(new StringMessage("test")); } @Test @@ -234,7 +235,14 @@ public class HandlerEndpointTests { }); endpoint.setMessageSelector(selectorChain); endpoint.start(); - assertFalse(endpoint.send(new StringMessage("test"))); + boolean exceptionWasThrown = false; + try { + endpoint.send(new StringMessage("test")); + } + catch (MessageRejectedException e) { + exceptionWasThrown = true; + } + assertTrue(exceptionWasThrown); assertEquals("only the first selector should have been invoked", 1, counter.get()); endpoint.stop(); } @@ -259,7 +267,14 @@ public class HandlerEndpointTests { }); endpoint.setMessageSelector(selectorChain); endpoint.start(); - assertFalse(endpoint.send(new StringMessage("test"))); + boolean exceptionWasThrown = false; + try { + endpoint.send(new StringMessage("test")); + } + catch (MessageRejectedException e) { + exceptionWasThrown = true; + } + assertTrue(exceptionWasThrown); assertEquals("both selectors should have been invoked", 2, selectorCounter.get()); assertEquals("the handler should not have been invoked", 0, handlerCounter.get()); endpoint.stop();