diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java index cebae71dcc..bb981b82b9 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java @@ -54,20 +54,6 @@ public class AggregatorParser extends AbstractHandlerEndpointParser { private static final String COMPLETION_STRATEGY_PROPERTY = "completionStrategy"; - private static final String DEFAULT_REPLY_CHANNEL_PROPERTY = "defaultReplyChannel"; - - private static final String DISCARD_CHANNEL_PROPERTY = "discardChannel"; - - private static final String SEND_TIMEOUT_PROPERTY = "sendTimeout"; - - private static final String SEND_PARTIAL_RESULT_ON_TIMEOUT_PROPERTY = "sendPartialResultOnTimeout"; - - private static final String REAPER_INTERVAL_PROPERTY = "reaperInterval"; - - public static final String TRACKED_CORRELATION_ID_CAPACITY_PROPERTY = "trackedCorrelationIdCapacity"; - - public static final String TIMEOUT = "timeout"; - public static final String AGGREGATOR_ELEMENT = "aggregator"; @@ -103,19 +89,13 @@ public class AggregatorParser extends AbstractHandlerEndpointParser { builder.addPropertyReference(COMPLETION_STRATEGY_PROPERTY, completionStrategyRef); } } - IntegrationNamespaceUtils.setBeanReferenceIfAttributeDefined(builder, DEFAULT_REPLY_CHANNEL_PROPERTY, - element, OUTPUT_CHANNEL_ATTRIBUTE); - IntegrationNamespaceUtils.setBeanReferenceIfAttributeDefined(builder, DISCARD_CHANNEL_PROPERTY, element, - DISCARD_CHANNEL_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, SEND_TIMEOUT_PROPERTY, element, - SEND_TIMEOUT_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, SEND_PARTIAL_RESULT_ON_TIMEOUT_PROPERTY, - element, SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, REAPER_INTERVAL_PROPERTY, element, - REAPER_INTERVAL_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, TRACKED_CORRELATION_ID_CAPACITY_PROPERTY, - element, TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, TIMEOUT, element, TIMEOUT_ATTRIBUTE); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, OUTPUT_CHANNEL_ATTRIBUTE); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_TIMEOUT_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, REAPER_INTERVAL_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TIMEOUT_ATTRIBUTE); return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry()); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java index 9f66b8ba71..639a87dcb8 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java @@ -24,6 +24,7 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.xml.BeanDefinitionParserDelegate; import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.core.Conventions; import org.springframework.integration.message.AsyncMessageExchangeTemplate; import org.springframework.integration.message.MessageExchangeTemplate; import org.springframework.transaction.support.DefaultTransactionDefinition; @@ -39,38 +40,47 @@ import org.springframework.util.xml.DomUtils; public abstract class IntegrationNamespaceUtils { /** - * Populates the property identified by propertyName on the bean definition - * to the value of the attribute specified by attributeName, if that - * attribute is defined in the element + * Populates the bean definition property corresponding to the specified + * attributeName with the value of that attribute if it is defined in the + * given element. The property name will be the camel-case equivalent of + * the lower case hyphen separated attribute (e.g. the "foo-bar" attribute + * would match the "fooBar" property). * * @param beanDefinition - the bean definition to be configured - * @param propertyName - the name of the bean property to be set * @param element - the XML element where the attribute should be defined * @param attributeName - the name of the attribute whose value will be set * on the property + * + * @see Conventions#attributeNameToPropertyName(String) */ - public static void setValueIfAttributeDefined(BeanDefinitionBuilder builder, String propertyName, + public static void setValueIfAttributeDefined(BeanDefinitionBuilder builder, Element element, String attributeName) { - final String attributeValue = element.getAttribute(attributeName); + String attributeValue = element.getAttribute(attributeName); + String propertyName = Conventions.attributeNameToPropertyName(attributeName); if (StringUtils.hasText(attributeValue)) { builder.addPropertyValue(propertyName, attributeValue); } } /** - * Populates the property given by propertyName on the given bean definition - * to a reference to a bean identified by the value of the attribute - * specified by attributeName, if that attribute is defined in the element + * Populates the bean definition property corresponding to the specified + * attributeName with the reference to a bean identified by the value of + * that attribute if the attribute is defined in the given element. The + * property name will be the camel-case equivalent of the lower case + * hyphen separated attribute (e.g. the "foo-bar" attribute would match + * the "fooBar" property). * * @param beanDefinition - the bean definition to be configured - * @param propertyName - the name of the bean property to be set * @param element - the XML element where the attribute should be defined - * @param attributeName - the id of the bean which will be used to populate - * the property + * @param attributeName - the name of the attribute whose value will be + * used as a bean reference to populate the property + * + * @see Conventions#attributeNameToPropertyName(String) */ - public static void setBeanReferenceIfAttributeDefined(BeanDefinitionBuilder builder, String propertyName, + public static void setReferenceIfAttributeDefined(BeanDefinitionBuilder builder, Element element, String attributeName) { - final String attributeValue = element.getAttribute(attributeName); + String attributeValue = element.getAttribute(attributeName); + String propertyName = Conventions.attributeNameToPropertyName(attributeName); if (StringUtils.hasText(attributeValue)) { builder.addPropertyReference(propertyName, attributeValue); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd index 1f6dc3576c..c4a700b2a2 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd @@ -409,7 +409,7 @@ - + diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractMessageBarrierHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractMessageBarrierHandler.java index bc7784045f..62dd197df3 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractMessageBarrierHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractMessageBarrierHandler.java @@ -69,7 +69,7 @@ public abstract class AbstractMessageBarrierHandler implements MessageHandler, I protected final Log logger = LogFactory.getLog(this.getClass()); - protected volatile MessageChannel defaultReplyChannel; + protected volatile MessageChannel outputChannel; private volatile MessageChannel discardChannel; @@ -99,12 +99,12 @@ public abstract class AbstractMessageBarrierHandler implements MessageHandler, I /** - * Set the default channel for sending aggregated Messages. Note that + * Set the output channel for sending aggregated Messages. Note that * precedence will be given to the 'returnAddress' of the aggregated * message itself, then to the 'returnAddress' of the original message. */ - public void setDefaultReplyChannel(MessageChannel defaultReplyChannel) { - this.defaultReplyChannel = defaultReplyChannel; + public void setOutputChannel(MessageChannel outputChannel) { + this.outputChannel = outputChannel; } /** @@ -204,7 +204,7 @@ public abstract class AbstractMessageBarrierHandler implements MessageHandler, I if (replyChannel == null) { replyChannel = this.resolveReplyChannelFromMessage(releasedMessages.get(0)); if (replyChannel == null) { - replyChannel = this.defaultReplyChannel; + replyChannel = this.outputChannel; } } if (replyChannel != null) { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/config/AggregatorMessageHandlerCreator.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/config/AggregatorMessageHandlerCreator.java index a236d89d4f..e3a8215004 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/router/config/AggregatorMessageHandlerCreator.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/config/AggregatorMessageHandlerCreator.java @@ -95,7 +95,7 @@ public class AggregatorMessageHandlerCreator extends AbstractMessageHandlerCreat if (endpointAnnotation != null) { String outputChannelName = endpointAnnotation.output(); if (StringUtils.hasText(outputChannelName)) { - handler.setDefaultReplyChannel(this.channelRegistry.lookupChannel(outputChannelName)); + handler.setOutputChannel(this.channelRegistry.lookupChannel(outputChannelName)); } } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/config/ResequencerParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/config/ResequencerParser.java index f7304ec60a..16b5565168 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/router/config/ResequencerParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/config/ResequencerParser.java @@ -20,8 +20,8 @@ import org.w3c.dom.Element; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.xml.AbstractSimpleBeanDefinitionParser; +import org.springframework.integration.config.IntegrationNamespaceUtils; import org.springframework.integration.router.ResequencingMessageHandler; -import org.springframework.util.StringUtils; /** * Parser for the <resequencer> tag. @@ -30,14 +30,10 @@ import org.springframework.util.StringUtils; */ public class ResequencerParser extends AbstractSimpleBeanDefinitionParser { - public static final String DEFAULT_REPLY_CHANNEL_ATTRIBUTE = "default-reply-channel"; + public static final String OUTPUT_CHANNEL_ATTRIBUTE = "output-channel"; public static final String DISCARD_CHANNEL_ATTRIBUTE = "discard-channel"; - private static final String DEFAULT_REPLY_CHANNEL_PROPERTY = "defaultReplyChannel"; - - private static final String DISCARD_CHANNEL_PROPERTY = "discardChannel"; - @Override protected Class getBeanClass(Element element) { @@ -46,21 +42,15 @@ public class ResequencerParser extends AbstractSimpleBeanDefinitionParser { @Override protected boolean isEligibleAttribute(String attributeName) { - return !DEFAULT_REPLY_CHANNEL_ATTRIBUTE.equals(attributeName) + return !OUTPUT_CHANNEL_ATTRIBUTE.equals(attributeName) && !DISCARD_CHANNEL_ATTRIBUTE.equals(attributeName) && super.isEligibleAttribute(attributeName); } @Override - protected void postProcess(BeanDefinitionBuilder beanDefinition, Element element) { - if (StringUtils.hasText(element.getAttribute(DEFAULT_REPLY_CHANNEL_ATTRIBUTE))) { - beanDefinition.addPropertyReference(DEFAULT_REPLY_CHANNEL_PROPERTY, - element.getAttribute(DEFAULT_REPLY_CHANNEL_ATTRIBUTE)); - } - if (StringUtils.hasText(element.getAttribute(DISCARD_CHANNEL_ATTRIBUTE))) { - beanDefinition.addPropertyReference(DISCARD_CHANNEL_PROPERTY, - element.getAttribute(DISCARD_CHANNEL_ATTRIBUTE)); - } + protected void postProcess(BeanDefinitionBuilder builder, Element element) { + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, OUTPUT_CHANNEL_ATTRIBUTE); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE); } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java index 105db142ed..49ec17ab0e 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java @@ -77,7 +77,7 @@ public class AggregatorParserTests { (AggregatingMessageHandler) endpoint.getHandler(); TestAggregator testAggregator = (TestAggregator) context.getBean("aggregatorBean"); CompletionStrategy completionStrategy = (CompletionStrategy) context.getBean("completionStrategy"); - MessageChannel defaultReplyChannel = (MessageChannel) context.getBean("replyChannel"); + MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel"); DirectFieldAccessor messageHandlerFieldAccessor = new DirectFieldAccessor(completeAggregatingMessageHandler); Assert.assertEquals("The AggregatingMessageHandler is not injected with the appropriate Aggregator instance", @@ -85,8 +85,8 @@ public class AggregatorParserTests { Assert.assertEquals( "The AggregatingMessageHandler is not injected with the appropriate CompletionStrategy instance", completionStrategy, messageHandlerFieldAccessor.getPropertyValue("completionStrategy")); - Assert.assertEquals("The AggregatingMessageHandler is not injected with the appropriate default reply channel", - defaultReplyChannel, messageHandlerFieldAccessor.getPropertyValue("defaultReplyChannel")); + Assert.assertEquals("The AggregatingMessageHandler is not injected with the appropriate output channel", + outputChannel, messageHandlerFieldAccessor.getPropertyValue("outputChannel")); Assert.assertEquals("The AggregatingMessageHandler is not injected with the appropriate discard channel", discardChannel, messageHandlerFieldAccessor.getPropertyValue("discardChannel")); Assert.assertEquals("The AggregatingMessageHandler is not set with the appropriate timeout value", 86420000l, @@ -114,8 +114,8 @@ public class AggregatorParserTests { for (Message message : outboundMessages) { addingAggregator.handle(message); } - MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel"); - Message response = replyChannel.receive(); + MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); + Message response = outputChannel.receive(); Assert.assertEquals(6l, response.getPayload()); } @@ -144,11 +144,11 @@ public class AggregatorParserTests { aggregatorWithPojoCompletionStrategy.handle(createMessage(1l, "id1", 0 , 0, null)); aggregatorWithPojoCompletionStrategy.handle(createMessage(2l, "id1", 0 , 0, null)); aggregatorWithPojoCompletionStrategy.handle(createMessage(3l, "id1", 0 , 0, null)); - MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel"); - Message reply = replyChannel.receive(0); + MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); + Message reply = outputChannel.receive(0); Assert.assertNull(reply); aggregatorWithPojoCompletionStrategy.handle(createMessage(5l, "id1", 0 , 0, null)); - reply = replyChannel.receive(0); + reply = outputChannel.receive(0); Assert.assertNotNull(reply); Assert.assertEquals(11l, reply.getPayload()); } @@ -160,12 +160,12 @@ public class AggregatorParserTests { private static Message createMessage(T payload, Object correlationId, int sequenceSize, int sequenceNumber, - MessageChannel replyChannel) { + MessageChannel outputChannel) { return MessageBuilder.fromPayload(payload) .setCorrelationId(correlationId) .setSequenceSize(sequenceSize) .setSequenceNumber(sequenceNumber) - .setReturnAddress(replyChannel).build(); + .setReturnAddress(outputChannel).build(); } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java index b88f6a9c62..eb89835092 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java @@ -49,17 +49,17 @@ public class ResequencerParserTests { public void testResequencing() { ResequencingMessageHandler resequencingHandler = (ResequencingMessageHandler) context .getBean("defaultResequencer"); - MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel"); + MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); List> outboundMessages = new ArrayList>(); - outboundMessages.add(createMessage("123", "id1", 3, 3, replyChannel)); - outboundMessages.add(createMessage("789", "id1", 3, 1, replyChannel)); - outboundMessages.add(createMessage("456", "id1", 3, 2, replyChannel)); + outboundMessages.add(createMessage("123", "id1", 3, 3, outputChannel)); + outboundMessages.add(createMessage("789", "id1", 3, 1, outputChannel)); + outboundMessages.add(createMessage("456", "id1", 3, 2, outputChannel)); for (Message message : outboundMessages) { resequencingHandler.handle(message); } - Message message1 = replyChannel.receive(500); - Message message2 = replyChannel.receive(500); - Message message3 = replyChannel.receive(500); + Message message1 = outputChannel.receive(500); + Message message2 = outputChannel.receive(500); + Message message3 = outputChannel.receive(500); Assert.assertNotNull(message1); Assert.assertEquals(new Integer(1), message1.getHeaders().getSequenceNumber()); Assert.assertNotNull(message2); @@ -73,7 +73,7 @@ public class ResequencerParserTests { ResequencingMessageHandler resequencingHandler = (ResequencingMessageHandler) context .getBean("defaultResequencer"); DirectFieldAccessor messageHandlerFieldAccessor = new DirectFieldAccessor(resequencingHandler); - Assert.assertNull(messageHandlerFieldAccessor.getPropertyValue("defaultReplyChannel")); + Assert.assertNull(messageHandlerFieldAccessor.getPropertyValue("outputChannel")); Assert.assertNull(messageHandlerFieldAccessor.getPropertyValue("discardChannel")); Assert.assertEquals("The ResequencingMessageHandler is not set with the appropriate timeout value", 1000l, messageHandlerFieldAccessor.getPropertyValue("sendTimeout")); @@ -95,11 +95,11 @@ public class ResequencerParserTests { public void testPropertyAssignment() throws Exception { ResequencingMessageHandler completeResequencingMessageHandler = (ResequencingMessageHandler) context .getBean("completelyDefinedResequencer"); - MessageChannel defaultReplyChannel = (MessageChannel) context.getBean("replyChannel"); + MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel"); DirectFieldAccessor messageHandlerFieldAccessor = new DirectFieldAccessor(completeResequencingMessageHandler); - Assert.assertEquals("The ResequencingMessageHandler is not injected with the appropriate default reply channel", - defaultReplyChannel, messageHandlerFieldAccessor.getPropertyValue("defaultReplyChannel")); + Assert.assertEquals("The ResequencingMessageHandler is not injected with the appropriate output channel", + outputChannel, messageHandlerFieldAccessor.getPropertyValue("outputChannel")); Assert.assertEquals("The ResequencingMessageHandler is not injected with the appropriate discard channel", discardChannel, messageHandlerFieldAccessor.getPropertyValue("discardChannel")); Assert.assertEquals("The ResequencingMessageHandler is not set with the appropriate timeout value", 86420000l, @@ -119,12 +119,12 @@ public class ResequencerParserTests { } private static Message createMessage(T payload, Object correlationId, int sequenceSize, int sequenceNumber, - MessageChannel replyChannel) { + MessageChannel outputChannel) { return MessageBuilder.fromPayload(payload) .setCorrelationId(correlationId) .setSequenceSize(sequenceSize) .setSequenceNumber(sequenceNumber) - .setReturnAddress(replyChannel) + .setReturnAddress(outputChannel) .build(); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml index 12dcb99d5e..7f48e2764c 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml @@ -10,14 +10,14 @@ - + + output-channel="outputChannel"/> - + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/resequencerParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/resequencerParserTests.xml index 234fd028ce..1ca55033e6 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/resequencerParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/resequencerParserTests.xml @@ -9,14 +9,14 @@ - +