diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierHandler.java index c6aebff690..7e2e7ad2d2 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierHandler.java @@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.endpoint.AbstractInOutEndpoint; import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.message.BlockingTarget; import org.springframework.integration.message.Message; @@ -38,6 +39,7 @@ import org.springframework.integration.message.MessageHandlingException; import org.springframework.integration.message.MessageTarget; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; +import org.springframework.util.ObjectUtils; /** * Base class for {@link MessageBarrier}-based MessageHandlers. @@ -60,7 +62,7 @@ import org.springframework.util.CollectionUtils; * @author Mark Fisher * @author Marius Bogoevici */ -public abstract class AbstractMessageBarrierHandler implements MessageHandler, InitializingBean { +public abstract class AbstractMessageBarrierHandler extends AbstractInOutEndpoint implements InitializingBean { public final static long DEFAULT_SEND_TIMEOUT = 1000; @@ -72,8 +74,6 @@ public abstract class AbstractMessageBarrierHandler implements MessageHandler, I protected final Log logger = LogFactory.getLog(this.getClass()); - protected volatile MessageChannel outputChannel; - private volatile MessageChannel discardChannel; protected volatile long sendTimeout = DEFAULT_SEND_TIMEOUT; @@ -101,15 +101,6 @@ public abstract class AbstractMessageBarrierHandler implements MessageHandler, I } - /** - * Set the output channel for sending aggregated Messages. Note that - * precedence will be given to the 'returnAddress' of the aggregated - * message itself, then to the 'returnAddress' of the original message. - */ - public void setOutputChannel(MessageChannel outputChannel) { - this.outputChannel = outputChannel; - } - /** * Specify a channel for sending Messages that arrive after their aggregation * group has either completed or timed-out. @@ -149,16 +140,6 @@ public abstract class AbstractMessageBarrierHandler implements MessageHandler, I this.trackedCorrelationIdCapacity = trackedCorrelationIdCapacity; } - /** - * Initialize this handler. - */ - public void afterPropertiesSet() { - this.trackedCorrelationIds = new ArrayBlockingQueue(this.trackedCorrelationIdCapacity); - this.executor.scheduleWithFixedDelay(new ReaperTask(), - this.reaperInterval, this.reaperInterval, TimeUnit.MILLISECONDS); - this.initialized = true; - } - /** * Maximum time to wait (in milliseconds) for the completion strategy to * become true. The default is 60000 (1 minute). @@ -168,7 +149,18 @@ public abstract class AbstractMessageBarrierHandler implements MessageHandler, I this.timeout = timeout; } - public Message handle(Message message) { + /** + * Initialize this endpoint. + */ + public void afterPropertiesSet() { + this.trackedCorrelationIds = new ArrayBlockingQueue(this.trackedCorrelationIdCapacity); + this.executor.scheduleWithFixedDelay(new ReaperTask(), + this.reaperInterval, this.reaperInterval, TimeUnit.MILLISECONDS); + this.initialized = true; + } + + @Override + protected final Message handle(Message message) { if (!this.initialized) { this.afterPropertiesSet(); } @@ -196,6 +188,10 @@ public abstract class AbstractMessageBarrierHandler implements MessageHandler, I if (isBarrierRemovable(correlationId, releasedMessages)) { this.removeBarrier(correlationId); } + Message[] processedMessages = this.processReleasedMessages(correlationId, releasedMessages); + if (ObjectUtils.isEmpty(processedMessages)) { + return null; + } afterRelease(correlationId, releasedMessages); return null; } @@ -203,7 +199,7 @@ public abstract class AbstractMessageBarrierHandler implements MessageHandler, I private void afterRelease(Object correlationId, List> releasedMessages) { Message[] processedMessages = this.processReleasedMessages(correlationId, releasedMessages); for (Message result : processedMessages) { - MessageTarget replyTarget = this.outputChannel; + MessageTarget replyTarget = this.getTarget(); if (replyTarget == null) { replyTarget = this.resolveReplyTargetFromMessage(result); if (replyTarget == null) { @@ -283,15 +279,14 @@ public abstract class AbstractMessageBarrierHandler implements MessageHandler, I } /** - * Factory method for creating a suitable MessageBarrier - * implementation. + * Factory method for creating a suitable MessageBarrier implementation. */ protected abstract MessageBarrier createMessageBarrier(); /** * Implements the logic for deciding whether, based on what the * MessageBarrier has released so far, work for the correlationId - * can be considered done and the barrier can be released. + * can be considered complete and the barrier can be released. */ protected abstract boolean isBarrierRemovable(Object correlationId, List> releasedMessages); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java index 9477c8a43e..5fc0250f9d 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java @@ -20,7 +20,7 @@ import java.util.List; import java.util.concurrent.ScheduledExecutorService; import org.springframework.integration.message.Message; - +import org.springframework.integration.message.MessageHeaders; /** * An {@link AbstractMessageBarrierHandler} that waits for a group of @@ -36,10 +36,11 @@ import org.springframework.integration.message.Message; * * @author Marius Bogoevici */ -public class ResequencingMessageHandler extends AbstractMessageBarrierHandler{ +public class ResequencingMessageHandler extends AbstractMessageBarrierHandler { private volatile boolean releasePartialSequences = true; + public ResequencingMessageHandler() { this(null); } @@ -47,14 +48,14 @@ public class ResequencingMessageHandler extends AbstractMessageBarrierHandler{ public ResequencingMessageHandler(ScheduledExecutorService executor) { super(executor); } - - + + public void setReleasePartialSequences(boolean releasePartialSequences) { this.releasePartialSequences = releasePartialSequences; } protected MessageBarrier createMessageBarrier() { - return new ResequencingMessageBarrier(releasePartialSequences); + return new ResequencingMessageBarrier(this.releasePartialSequences); } protected Message[] processReleasedMessages(Object correlationId, List> messages) { @@ -62,8 +63,8 @@ public class ResequencingMessageHandler extends AbstractMessageBarrierHandler{ } protected boolean isBarrierRemovable(Object correlationId, List> releasedMessages) { - return (releasedMessages.get(releasedMessages.size() - 1).getHeaders().getSequenceNumber() == - releasedMessages.get(releasedMessages.size() - 1).getHeaders().getSequenceSize()); + MessageHeaders lastMessageHeaders = releasedMessages.get(releasedMessages.size() - 1).getHeaders(); + return (lastMessageHeaders.getSequenceNumber() == lastMessageHeaders.getSequenceSize()); } - + } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java index 0b1d7db31a..e5db8827e3 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java @@ -24,7 +24,7 @@ import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.aggregator.AggregatingMessageHandler; import org.springframework.integration.aggregator.AggregatorAdapter; import org.springframework.integration.aggregator.CompletionStrategyAdapter; -import org.springframework.integration.handler.MessageHandler; +import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.util.StringUtils; /** @@ -34,7 +34,7 @@ import org.springframework.util.StringUtils; * @author Marius Bogoevici * @author Mark Fisher */ -public class AggregatorParser extends AbstractMessageEndpointParser { +public class AggregatorParser extends AbstractEndpointParser { public static final String COMPLETION_STRATEGY_REF_ATTRIBUTE = "completion-strategy"; @@ -58,53 +58,43 @@ public class AggregatorParser extends AbstractMessageEndpointParser { @Override - protected Class getHandlerAdapterClass() { + protected Class getEndpointClass() { return AggregatingMessageHandler.class; } @Override - protected boolean shouldCreateAdapter(Element element) { - return true; + protected Class getMethodInvokingAdapterClass() { + return AggregatorAdapter.class; } @Override - protected String parseAdapter(String ref, String method, Element element, ParserContext parserContext) { - BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(this.getHandlerAdapterClass()); - if (StringUtils.hasText(method)) { - String aggregatorAdapterBeanName = this.createAdapter(ref, method, parserContext, AggregatorAdapter.class); - builder.addConstructorArgReference(aggregatorAdapterBeanName); - } - else { - builder.addConstructorArgReference(ref); - } + protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { + super.doParse(element, parserContext, builder); final String completionStrategyRef = element.getAttribute(COMPLETION_STRATEGY_REF_ATTRIBUTE); final String completionStrategyMethod = element.getAttribute(COMPLETION_STRATEGY_METHOD_ATTRIBUTE); if (StringUtils.hasText(completionStrategyRef)) { if (StringUtils.hasText(completionStrategyMethod)) { - String adapterBeanName = this.createAdapter(completionStrategyRef, - completionStrategyMethod, parserContext, CompletionStrategyAdapter.class); + String adapterBeanName = this.createCompletionStrategyAdapter( + completionStrategyRef, completionStrategyMethod, parserContext); builder.addPropertyReference(COMPLETION_STRATEGY_PROPERTY, adapterBeanName); } else { builder.addPropertyReference(COMPLETION_STRATEGY_PROPERTY, completionStrategyRef); } } - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, OUTPUT_CHANNEL_ATTRIBUTE); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_TIMEOUT_ATTRIBUTE); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, REAPER_INTERVAL_ATTRIBUTE); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TIMEOUT_ATTRIBUTE); + } + + private String createCompletionStrategyAdapter(String ref, String method, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(CompletionStrategyAdapter.class); + builder.addConstructorArgReference(ref); + builder.addConstructorArgValue(method); return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry()); } - private String createAdapter(String ref, String method, ParserContext parserContext, Class adapterClass) { - BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(adapterClass); - builder.addConstructorArgReference(ref); - builder.addConstructorArgValue(method); - return BeanDefinitionReaderUtils.registerWithGeneratedName( - builder.getBeanDefinition(), parserContext.getRegistry()); - } - } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java index 11bcede659..b5351822d1 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java @@ -29,6 +29,8 @@ import org.springframework.integration.aggregator.ResequencingMessageHandler; */ public class ResequencerParser extends AbstractSimpleBeanDefinitionParser { + public static final String INPUT_CHANNEL_ATTRIBUTE = "input-channel"; + public static final String OUTPUT_CHANNEL_ATTRIBUTE = "output-channel"; public static final String DISCARD_CHANNEL_ATTRIBUTE = "discard-channel"; @@ -41,14 +43,16 @@ public class ResequencerParser extends AbstractSimpleBeanDefinitionParser { @Override protected boolean isEligibleAttribute(String attributeName) { - return !OUTPUT_CHANNEL_ATTRIBUTE.equals(attributeName) + return !INPUT_CHANNEL_ATTRIBUTE.equals(attributeName) + &&!OUTPUT_CHANNEL_ATTRIBUTE.equals(attributeName) && !DISCARD_CHANNEL_ATTRIBUTE.equals(attributeName) && super.isEligibleAttribute(attributeName); } @Override protected void postProcess(BeanDefinitionBuilder builder, Element element) { - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, OUTPUT_CHANNEL_ATTRIBUTE); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, INPUT_CHANNEL_ATTRIBUTE, "source"); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, OUTPUT_CHANNEL_ATTRIBUTE, "target"); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java index d0a7648298..5c8c52a53e 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java @@ -61,7 +61,7 @@ public abstract class AbstractMethodAnnotationPostProcessor((MessageHandler) adapter); + protected AbstractEndpoint createEndpoint(Object originalBean, Object adapter) { + if (adapter instanceof org.springframework.integration.aggregator.Aggregator) { + AggregatingMessageHandler endpoint = new AggregatingMessageHandler((org.springframework.integration.aggregator.Aggregator) adapter); + this.configureCompletionStrategy(originalBean, endpoint); + return endpoint; } return null; } + @Override + protected void configureEndpoint(AbstractEndpoint endpoint, Aggregator annotation, Poller pollerAnnotation) { + super.configureEndpoint(endpoint, annotation, pollerAnnotation); + AggregatingMessageHandler aggregatorEndpoint = (AggregatingMessageHandler) endpoint; + String discardChannelName = annotation.discardChannel(); + if (StringUtils.hasText(discardChannelName)) { + MessageChannel discardChannel = this.getChannelRegistry().lookupChannel(discardChannelName); + if (discardChannel == null) { + throw new ConfigurationException("unable to resolve discardChannel '" + discardChannelName + "'"); + } + aggregatorEndpoint.setDiscardChannel(discardChannel); + } + aggregatorEndpoint.setSendTimeout(annotation.sendTimeout()); + aggregatorEndpoint.setSendPartialResultOnTimeout(annotation.sendPartialResultsOnTimeout()); + aggregatorEndpoint.setReaperInterval(annotation.reaperInterval()); + aggregatorEndpoint.setTimeout(annotation.timeout()); + aggregatorEndpoint.setTrackedCorrelationIdCapacity(annotation.trackedCorrelationIdCapacity()); + aggregatorEndpoint.afterPropertiesSet(); + } + private void configureCompletionStrategy(final Object object, final AggregatingMessageHandler handler) { ReflectionUtils.doWithMethods(object.getClass(), new ReflectionUtils.MethodCallback() { public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessor.java index 5a6ec8508b..b17a6885a8 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessor.java @@ -46,7 +46,7 @@ public class RouterAnnotationPostProcessor extends AbstractMethodAnnotationPostP } @Override - protected AbstractEndpoint createEndpoint(Object adapter) { + protected AbstractEndpoint createEndpoint(Object originalBean, Object adapter) { if (adapter instanceof MethodInvokingChannelResolver) { return new RouterEndpoint((MethodInvokingChannelResolver) adapter); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/ServiceActivatorAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/ServiceActivatorAnnotationPostProcessor.java index a92d61b585..7d480127a3 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/ServiceActivatorAnnotationPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/ServiceActivatorAnnotationPostProcessor.java @@ -43,7 +43,7 @@ public class ServiceActivatorAnnotationPostProcessor extends AbstractMethodAnnot } @Override - protected AbstractEndpoint createEndpoint(Object adapter) { + protected AbstractEndpoint createEndpoint(Object originalBean, Object adapter) { if (adapter instanceof ServiceInvoker) { return new ServiceActivatorEndpoint((ServiceInvoker) adapter); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessor.java index 3d34e453d2..b8e384e5d5 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessor.java @@ -42,7 +42,7 @@ public class SplitterAnnotationPostProcessor extends AbstractMethodAnnotationPos } @Override - protected AbstractEndpoint createEndpoint(Object adapter) { + protected AbstractEndpoint createEndpoint(Object originalBean, Object adapter) { if (adapter instanceof MethodInvokingSplitter) { return new SplitterEndpoint((MethodInvokingSplitter) adapter); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/TransformerAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/TransformerAnnotationPostProcessor.java index 1e8a92b63c..712fd0e28d 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/TransformerAnnotationPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/TransformerAnnotationPostProcessor.java @@ -42,7 +42,7 @@ public class TransformerAnnotationPostProcessor extends AbstractMethodAnnotation } @Override - protected AbstractEndpoint createEndpoint(Object adapter) { + protected AbstractEndpoint createEndpoint(Object originalBean, Object adapter) { if (adapter instanceof MethodInvokingTransformer) { return new TransformerEndpoint((MethodInvokingTransformer) adapter); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd index 6cc0bb16aa..f4061f007f 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd @@ -353,7 +353,7 @@ - Defines an aggregating message handler. + Defines an aggregating message endpoint. @@ -375,12 +375,11 @@ - Defines a resequencing message handler. + Defines a resequencing message endpoint. - - + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java index e657ff4d3f..4d5eab8ede 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java @@ -33,7 +33,6 @@ import org.springframework.integration.aggregator.CompletionStrategy; import org.springframework.integration.aggregator.CompletionStrategyAdapter; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; -import org.springframework.integration.endpoint.DefaultEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.util.MethodInvoker; @@ -54,16 +53,15 @@ public class AggregatorParserTests { @Test public void testAggregation() { - DefaultEndpoint endpoint = (DefaultEndpoint) context.getBean("aggregatorWithReference"); AggregatingMessageHandler aggregatingHandler = - (AggregatingMessageHandler) new DirectFieldAccessor(endpoint).getPropertyValue("handler"); + (AggregatingMessageHandler) context.getBean("aggregatorWithReference"); TestAggregator aggregatorBean = (TestAggregator) context.getBean("aggregatorBean"); List> outboundMessages = new ArrayList>(); outboundMessages.add(createMessage("123", "id1", 3, 1, null)); outboundMessages.add(createMessage("789", "id1", 3, 3, null)); outboundMessages.add(createMessage("456", "id1", 3, 2, null)); for (Message message : outboundMessages) { - aggregatingHandler.handle(message); + aggregatingHandler.send(message); } Assert.assertEquals("One and only one message must have been aggregated", 1, aggregatorBean .getAggregatedMessages().size()); @@ -74,9 +72,8 @@ public class AggregatorParserTests { @Test public void testPropertyAssignment() throws Exception { - DefaultEndpoint endpoint = (DefaultEndpoint) context.getBean("completelyDefinedAggregator"); AggregatingMessageHandler completeAggregatingMessageHandler = - (AggregatingMessageHandler) new DirectFieldAccessor(endpoint).getPropertyValue("handler"); + (AggregatingMessageHandler) context.getBean("completelyDefinedAggregator"); TestAggregator testAggregator = (TestAggregator) context.getBean("aggregatorBean"); CompletionStrategy completionStrategy = (CompletionStrategy) context.getBean("completionStrategy"); MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); @@ -88,7 +85,7 @@ public class AggregatorParserTests { "The AggregatingMessageHandler is not injected with the appropriate CompletionStrategy instance", completionStrategy, messageHandlerFieldAccessor.getPropertyValue("completionStrategy")); Assert.assertEquals("The AggregatingMessageHandler is not injected with the appropriate output channel", - outputChannel, messageHandlerFieldAccessor.getPropertyValue("outputChannel")); + outputChannel, messageHandlerFieldAccessor.getPropertyValue("target")); Assert.assertEquals("The AggregatingMessageHandler is not injected with the appropriate discard channel", discardChannel, messageHandlerFieldAccessor.getPropertyValue("discardChannel")); Assert.assertEquals("The AggregatingMessageHandler is not set with the appropriate timeout value", 86420000l, @@ -108,14 +105,13 @@ public class AggregatorParserTests { @Test public void testSimpleJavaBeanAggregator() { List> outboundMessages = new ArrayList>(); - DefaultEndpoint endpoint = (DefaultEndpoint) context.getBean("aggregatorWithReferenceAndMethod"); AggregatingMessageHandler addingAggregator = - (AggregatingMessageHandler) new DirectFieldAccessor(endpoint).getPropertyValue("handler"); + (AggregatingMessageHandler) context.getBean("aggregatorWithReferenceAndMethod"); outboundMessages.add(createMessage(1l, "id1", 3, 1, null)); outboundMessages.add(createMessage(2l, "id1", 3, 3, null)); outboundMessages.add(createMessage(3l, "id1", 3, 2, null)); for (Message message : outboundMessages) { - addingAggregator.handle(message); + addingAggregator.send(message); } PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel"); Message response = outputChannel.receive(); @@ -135,9 +131,8 @@ public class AggregatorParserTests { @Test public void testAggregatorWithPojoCompletionStrategy(){ - DefaultEndpoint endpoint = (DefaultEndpoint) context.getBean("aggregatorWithPojoCompletionStrategy"); AggregatingMessageHandler aggregatorWithPojoCompletionStrategy = - (AggregatingMessageHandler) new DirectFieldAccessor(endpoint).getPropertyValue("handler"); + (AggregatingMessageHandler) context.getBean("aggregatorWithPojoCompletionStrategy"); CompletionStrategy completionStrategy = (CompletionStrategy) new DirectFieldAccessor(aggregatorWithPojoCompletionStrategy).getPropertyValue("completionStrategy"); Assert.assertTrue(completionStrategy instanceof CompletionStrategyAdapter); @@ -145,13 +140,13 @@ public class AggregatorParserTests { MethodInvoker invoker = (MethodInvoker) completionStrategyAccessor.getPropertyValue("invoker"); Assert.assertTrue(new DirectFieldAccessor(invoker).getPropertyValue("object") instanceof MaxValueCompletionStrategy); Assert.assertTrue(((Method)completionStrategyAccessor.getPropertyValue("method")).getName().equals("checkCompleteness")); - aggregatorWithPojoCompletionStrategy.handle(createMessage(1l, "id1", 0 , 0, null)); - aggregatorWithPojoCompletionStrategy.handle(createMessage(2l, "id1", 0 , 0, null)); - aggregatorWithPojoCompletionStrategy.handle(createMessage(3l, "id1", 0 , 0, null)); + aggregatorWithPojoCompletionStrategy.send(createMessage(1l, "id1", 0 , 0, null)); + aggregatorWithPojoCompletionStrategy.send(createMessage(2l, "id1", 0 , 0, null)); + aggregatorWithPojoCompletionStrategy.send(createMessage(3l, "id1", 0 , 0, null)); PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel"); Message reply = outputChannel.receive(0); Assert.assertNull(reply); - aggregatorWithPojoCompletionStrategy.handle(createMessage(5l, "id1", 0 , 0, null)); + aggregatorWithPojoCompletionStrategy.send(createMessage(5l, "id1", 0 , 0, null)); reply = outputChannel.receive(0); Assert.assertNotNull(reply); Assert.assertEquals(11l, reply.getPayload()); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java index d63692dc4b..904433bfc0 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java @@ -48,15 +48,14 @@ public class ResequencerParserTests { @Test public void testResequencing() { - ResequencingMessageHandler resequencingHandler = (ResequencingMessageHandler) context - .getBean("defaultResequencer"); + MessageChannel inputChannel = (MessageChannel) context.getBean("inputChannel"); PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel"); List> outboundMessages = new ArrayList>(); outboundMessages.add(createMessage("123", "id1", 3, 3, outputChannel)); outboundMessages.add(createMessage("789", "id1", 3, 1, outputChannel)); outboundMessages.add(createMessage("456", "id1", 3, 2, outputChannel)); for (Message message : outboundMessages) { - resequencingHandler.handle(message); + inputChannel.send(message); } Message message1 = outputChannel.receive(500); Message message2 = outputChannel.receive(500); @@ -74,7 +73,7 @@ public class ResequencerParserTests { ResequencingMessageHandler resequencingHandler = (ResequencingMessageHandler) context .getBean("defaultResequencer"); DirectFieldAccessor messageHandlerFieldAccessor = new DirectFieldAccessor(resequencingHandler); - Assert.assertNull(messageHandlerFieldAccessor.getPropertyValue("outputChannel")); + Assert.assertNull(messageHandlerFieldAccessor.getPropertyValue("target")); Assert.assertNull(messageHandlerFieldAccessor.getPropertyValue("discardChannel")); Assert.assertEquals("The ResequencingMessageHandler is not set with the appropriate timeout value", 1000l, messageHandlerFieldAccessor.getPropertyValue("sendTimeout")); @@ -100,7 +99,7 @@ public class ResequencerParserTests { MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel"); DirectFieldAccessor messageHandlerFieldAccessor = new DirectFieldAccessor(completeResequencingMessageHandler); Assert.assertEquals("The ResequencingMessageHandler is not injected with the appropriate output channel", - outputChannel, messageHandlerFieldAccessor.getPropertyValue("outputChannel")); + outputChannel, messageHandlerFieldAccessor.getPropertyValue("target")); Assert.assertEquals("The ResequencingMessageHandler is not injected with the appropriate discard channel", discardChannel, messageHandlerFieldAccessor.getPropertyValue("discardChannel")); Assert.assertEquals("The ResequencingMessageHandler is not set with the appropriate timeout value", 86420000l, diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java index ee5c3a691e..6d42dd2f8e 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java @@ -21,9 +21,6 @@ import java.lang.reflect.Method; import org.junit.Assert; import org.junit.Test; -import org.springframework.aop.framework.Advised; -import org.springframework.aop.support.AopUtils; -import org.springframework.aop.support.DelegatingIntroductionInterceptor; import org.springframework.beans.DirectFieldAccessor; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; @@ -32,8 +29,6 @@ import org.springframework.integration.aggregator.CompletionStrategyAdapter; import org.springframework.integration.aggregator.SequenceSizeCompletionStrategy; import org.springframework.integration.bus.MessageBus; import org.springframework.integration.config.MessageBusParser; -import org.springframework.integration.endpoint.DefaultEndpoint; -import org.springframework.integration.handler.MessageHandler; /** * @author Marius Bogoevici @@ -49,7 +44,7 @@ public class AggregatorAnnotationTests { DirectFieldAccessor aggregatingMessageHandlerAccessor = getDirectFieldAccessorForAggregatingHandler(context, endpointName); Assert.assertTrue(aggregatingMessageHandlerAccessor.getPropertyValue("completionStrategy") instanceof SequenceSizeCompletionStrategy); - Assert.assertNull(aggregatingMessageHandlerAccessor.getPropertyValue("outputChannel")); + Assert.assertNull(aggregatingMessageHandlerAccessor.getPropertyValue("target")); Assert.assertNull(aggregatingMessageHandlerAccessor.getPropertyValue("discardChannel")); Assert.assertEquals(AggregatingMessageHandler.DEFAULT_SEND_TIMEOUT, aggregatingMessageHandlerAccessor .getPropertyValue("sendTimeout")); @@ -71,7 +66,7 @@ public class AggregatorAnnotationTests { endpointName); Assert.assertTrue(aggregatingMessageHandlerAccessor.getPropertyValue("completionStrategy") instanceof SequenceSizeCompletionStrategy); Assert.assertEquals(getMessageBus(context).lookupChannel("outputChannel"), aggregatingMessageHandlerAccessor - .getPropertyValue("outputChannel")); + .getPropertyValue("target")); Assert.assertEquals(getMessageBus(context).lookupChannel("discardChannel"), aggregatingMessageHandlerAccessor .getPropertyValue("discardChannel")); Assert.assertEquals(98765432l, aggregatingMessageHandlerAccessor @@ -106,20 +101,8 @@ public class AggregatorAnnotationTests { @SuppressWarnings("unchecked") private DirectFieldAccessor getDirectFieldAccessorForAggregatingHandler(ApplicationContext context, final String endpointName) { MessageBus messageBus = this.getMessageBus(context); - DefaultEndpoint endpoint = (DefaultEndpoint) messageBus.lookupEndpoint(endpointName + ".aggregator"); - MessageHandler handler = (MessageHandler) new DirectFieldAccessor(endpoint).getPropertyValue("handler"); - try { - if (AopUtils.isAopProxy(handler)) { - DelegatingIntroductionInterceptor interceptor = (DelegatingIntroductionInterceptor) - ((Advised) handler).getAdvisors()[0].getAdvice(); - Object delegate = new DirectFieldAccessor(interceptor).getPropertyValue("delegate"); - return new DirectFieldAccessor(delegate); - } - } - catch (Exception e) { - // will return the accessor for the handler - } - return new DirectFieldAccessor(handler); + AggregatingMessageHandler endpoint = (AggregatingMessageHandler) messageBus.lookupEndpoint(endpointName + ".aggregator"); + return new DirectFieldAccessor(endpoint); } private MessageBus getMessageBus(ApplicationContext context) { diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/resequencerParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/resequencerParserTests.xml index 748f10e525..a6594b3b7a 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/resequencerParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/resequencerParserTests.xml @@ -9,6 +9,8 @@ + + @@ -17,9 +19,12 @@ - + + + - \ No newline at end of file +