diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/annotation/Aggregator.java b/org.springframework.integration/src/main/java/org/springframework/integration/annotation/Aggregator.java index 11b19670a3..52095370b5 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/annotation/Aggregator.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/annotation/Aggregator.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2007 the original author or authors. + * Copyright 2002-2008 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,8 +35,6 @@ import org.springframework.integration.router.AggregatingMessageHandler; @Handler public @interface Aggregator { - String defaultReplyChannel() default ""; - String discardChannel() default ""; long sendTimeout() default AggregatingMessageHandler.DEFAULT_SEND_TIMEOUT; diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/config/AggregatorMessageHandlerCreator.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/config/AggregatorMessageHandlerCreator.java index 1439fa3c04..a236d89d4f 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/router/config/AggregatorMessageHandlerCreator.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/config/AggregatorMessageHandlerCreator.java @@ -20,8 +20,10 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.util.Map; +import org.springframework.aop.support.AopUtils; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.integration.annotation.CompletionStrategy; +import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.handler.config.AbstractMessageHandlerCreator; @@ -29,6 +31,7 @@ import org.springframework.integration.router.AggregatingMessageHandler; import org.springframework.integration.router.AggregatorAdapter; import org.springframework.integration.router.CompletionStrategyAdapter; import org.springframework.util.ReflectionUtils; +import org.springframework.util.StringUtils; /** * Creates an {@link AggregatorAdapter AggregatorAdapter} for methods that aggregate messages. @@ -37,8 +40,6 @@ import org.springframework.util.ReflectionUtils; */ public class AggregatorMessageHandlerCreator extends AbstractMessageHandlerCreator { - private static final String DEFAULT_REPLY_CHANNEL = "defaultReplyChannel"; - private static final String DISCARD_CHANNEL = "discardChannel"; private static final String SEND_TIMEOUT = "sendTimeout"; @@ -62,10 +63,7 @@ public class AggregatorMessageHandlerCreator extends AbstractMessageHandlerCreat public MessageHandler doCreateHandler(Object object, Method method, Map attributes) { AggregatingMessageHandler messageHandler = new AggregatingMessageHandler(new AggregatorAdapter(object, method)); - if (attributes.containsKey(DEFAULT_REPLY_CHANNEL)) { - messageHandler.setDefaultReplyChannel(this.channelRegistry.lookupChannel( - (String) attributes.get(DEFAULT_REPLY_CHANNEL))); - } + this.configureDefaultReplyChannel(messageHandler, object); if (attributes.containsKey(DISCARD_CHANNEL)) { messageHandler.setDiscardChannel(this.channelRegistry.lookupChannel( (String) attributes.get(DISCARD_CHANNEL))); @@ -91,6 +89,17 @@ public class AggregatorMessageHandlerCreator extends AbstractMessageHandlerCreat return messageHandler; } + private void configureDefaultReplyChannel(AggregatingMessageHandler handler, Object originalObject) { + MessageEndpoint endpointAnnotation = AnnotationUtils.findAnnotation( + AopUtils.getTargetClass(originalObject), MessageEndpoint.class); + if (endpointAnnotation != null) { + String outputChannelName = endpointAnnotation.output(); + if (StringUtils.hasText(outputChannelName)) { + handler.setDefaultReplyChannel(this.channelRegistry.lookupChannel(outputChannelName)); + } + } + } + private void configureCompletionStrategy(final Object object, final AggregatingMessageHandler handler) { ReflectionUtils.doWithMethods(object.getClass(), new ReflectionUtils.MethodCallback() { public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException { diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/TestAnnotatedEndpointWithCustomizedAggregator.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/TestAnnotatedEndpointWithCustomizedAggregator.java index 2a5881869d..8c4dff8aa1 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/TestAnnotatedEndpointWithCustomizedAggregator.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/TestAnnotatedEndpointWithCustomizedAggregator.java @@ -32,13 +32,13 @@ import org.springframework.stereotype.Component; /** * @author Marius Bogoevici */ -@MessageEndpoint(input = "inputChannel") +@MessageEndpoint(input = "inputChannel", output= "replyChannel") @Component("endpointWithCustomizedAnnotation") public class TestAnnotatedEndpointWithCustomizedAggregator { private final ConcurrentMap> aggregatedMessages = new ConcurrentHashMap>(); - @Aggregator(defaultReplyChannel = "replyChannel", discardChannel = "discardChannel", + @Aggregator(discardChannel = "discardChannel", reaperInterval = 1234, sendPartialResultsOnTimeout = true, sendTimeout = 98765432, timeout = 4567890, trackedCorrelationIdCapacity = 42) public Message aggregatingMethod(List> messages) {