diff --git a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMethodRegisteredOnlyOnceTest.java b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMethodRegisteredOnlyOnceTest.java new file mode 100644 index 000000000..457b077e5 --- /dev/null +++ b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMethodRegisteredOnlyOnceTest.java @@ -0,0 +1,75 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.config; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.SpyBean; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import static org.mockito.Mockito.verify; + +/** + * See issue https://github.com/spring-cloud/spring-cloud-stream/issues/1080 + * + * StreamListener method called twice when using @SpyBean + * + * @author Soby Chacko + */ +@RunWith(SpringJUnit4ClassRunner.class) +@SpringBootTest +public class StreamListenerMethodRegisteredOnlyOnceTest { + + @Autowired + private SomeSink sink; + + @SpyBean + private SomeHandler handler; + + @Test + public void should_handleSomeMessage() { + sink.channel().send(new GenericMessage<>("Payload")); + verify(handler).handleMessage(); //should only be invoked once. + } + + public interface SomeSink { + + @Input(Sink.INPUT) + SubscribableChannel channel(); + + } + + @EnableBinding(SomeSink.class) + @EnableAutoConfiguration + public static class SomeHandler { + + @StreamListener(Sink.INPUT) + public void handleMessage() { + } + + } +} diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java index 3ad46fe15..99edea991 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java @@ -70,6 +70,7 @@ import org.springframework.util.StringUtils; * * @author Marius Bogoevici * @author Ilayaperumal Gopinathan + * @author Soby Chacko */ public class StreamListenerAnnotationBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware, BeanFactoryAware, SmartInitializingSingleton, @@ -142,44 +143,42 @@ public class StreamListenerAnnotationBeanPostProcessor @Override public final Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { Class targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass(); - ReflectionUtils.doWithMethods(targetClass, new ReflectionUtils.MethodCallback() { - @Override - public void doWith(final Method method) throws IllegalArgumentException, IllegalAccessException { - StreamListener streamListener = AnnotatedElementUtils.findMergedAnnotation(method, - StreamListener.class); - if (streamListener != null && !method.isBridge()) { - streamListener = postProcessAnnotation(streamListener, method); - Assert.isTrue(method.getAnnotation(Input.class) == null, - StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER); - String methodAnnotatedInboundName = streamListener.value(); - String methodAnnotatedOutboundName = StreamListenerMethodUtils.getOutboundBindingTargetName(method); - int inputAnnotationCount = StreamListenerMethodUtils.inputAnnotationCount(method); - int outputAnnotationCount = StreamListenerMethodUtils.outputAnnotationCount(method); - boolean isDeclarative = checkDeclarativeMethod(method, methodAnnotatedInboundName, - methodAnnotatedOutboundName); - StreamListenerMethodUtils.validateStreamListenerMethod(method, inputAnnotationCount, - outputAnnotationCount, methodAnnotatedInboundName, methodAnnotatedOutboundName, - isDeclarative, streamListener.condition()); - if (!method.getReturnType().equals(Void.TYPE)) { - if (!StringUtils.hasText(methodAnnotatedOutboundName)) { - if (outputAnnotationCount == 0) { - throw new IllegalArgumentException( - StreamListenerErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED); - } - Assert.isTrue((outputAnnotationCount == 1), - StreamListenerErrorMessages.RETURN_TYPE_MULTIPLE_OUTBOUND_SPECIFIED); + Method[] uniqueDeclaredMethods = ReflectionUtils.getUniqueDeclaredMethods(targetClass); + for (Method method : uniqueDeclaredMethods) { + StreamListener streamListener = AnnotatedElementUtils.findMergedAnnotation(method, + StreamListener.class); + if (streamListener != null && !method.isBridge()) { + streamListener = postProcessAnnotation(streamListener, method); + Assert.isTrue(method.getAnnotation(Input.class) == null, + StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER); + String methodAnnotatedInboundName = streamListener.value(); + String methodAnnotatedOutboundName = StreamListenerMethodUtils.getOutboundBindingTargetName(method); + int inputAnnotationCount = StreamListenerMethodUtils.inputAnnotationCount(method); + int outputAnnotationCount = StreamListenerMethodUtils.outputAnnotationCount(method); + boolean isDeclarative = checkDeclarativeMethod(method, methodAnnotatedInboundName, + methodAnnotatedOutboundName); + StreamListenerMethodUtils.validateStreamListenerMethod(method, inputAnnotationCount, + outputAnnotationCount, methodAnnotatedInboundName, methodAnnotatedOutboundName, + isDeclarative, streamListener.condition()); + if (!method.getReturnType().equals(Void.TYPE)) { + if (!StringUtils.hasText(methodAnnotatedOutboundName)) { + if (outputAnnotationCount == 0) { + throw new IllegalArgumentException( + StreamListenerErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED); } - } - if (isDeclarative) { - invokeSetupMethodOnListenedChannel(method, bean, methodAnnotatedInboundName, - methodAnnotatedOutboundName); - } - else { - registerHandlerMethodOnListenedChannel(method, streamListener, bean); + Assert.isTrue((outputAnnotationCount == 1), + StreamListenerErrorMessages.RETURN_TYPE_MULTIPLE_OUTBOUND_SPECIFIED); } } + if (isDeclarative) { + invokeSetupMethodOnListenedChannel(method, bean, methodAnnotatedInboundName, + methodAnnotatedOutboundName); + } + else { + registerHandlerMethodOnListenedChannel(method, streamListener, bean); + } } - }); + } return bean; }