GH-1080: Fix StreamListener methods for proxies

Fix #1080

When class with `@StreamListener` method is proxied we end up
with the double target subscribers registration because we meet
the same method multiple times during `ReflectionUtils.doWithMethods()`

* Use `ReflectionUtils.getUniqueDeclaredMethods()` instead to extract
the list of method candidates

**Cherry-pick to 1.2.x**
This commit is contained in:
Soby Chacko
2017-09-27 14:04:36 -04:00
committed by Artem Bilan
parent 3701615a67
commit 9fc51cb6e2
2 changed files with 108 additions and 34 deletions

View File

@@ -0,0 +1,75 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.config;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import static org.mockito.Mockito.verify;
/**
* See issue https://github.com/spring-cloud/spring-cloud-stream/issues/1080
*
* StreamListener method called twice when using @SpyBean
*
* @author Soby Chacko
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class StreamListenerMethodRegisteredOnlyOnceTest {
@Autowired
private SomeSink sink;
@SpyBean
private SomeHandler handler;
@Test
public void should_handleSomeMessage() {
sink.channel().send(new GenericMessage<>("Payload"));
verify(handler).handleMessage(); //should only be invoked once.
}
public interface SomeSink {
@Input(Sink.INPUT)
SubscribableChannel channel();
}
@EnableBinding(SomeSink.class)
@EnableAutoConfiguration
public static class SomeHandler {
@StreamListener(Sink.INPUT)
public void handleMessage() {
}
}
}

View File

@@ -70,6 +70,7 @@ import org.springframework.util.StringUtils;
*
* @author Marius Bogoevici
* @author Ilayaperumal Gopinathan
* @author Soby Chacko
*/
public class StreamListenerAnnotationBeanPostProcessor
implements BeanPostProcessor, ApplicationContextAware, BeanFactoryAware, SmartInitializingSingleton,
@@ -142,44 +143,42 @@ public class StreamListenerAnnotationBeanPostProcessor
@Override
public final Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass();
ReflectionUtils.doWithMethods(targetClass, new ReflectionUtils.MethodCallback() {
@Override
public void doWith(final Method method) throws IllegalArgumentException, IllegalAccessException {
StreamListener streamListener = AnnotatedElementUtils.findMergedAnnotation(method,
StreamListener.class);
if (streamListener != null && !method.isBridge()) {
streamListener = postProcessAnnotation(streamListener, method);
Assert.isTrue(method.getAnnotation(Input.class) == null,
StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER);
String methodAnnotatedInboundName = streamListener.value();
String methodAnnotatedOutboundName = StreamListenerMethodUtils.getOutboundBindingTargetName(method);
int inputAnnotationCount = StreamListenerMethodUtils.inputAnnotationCount(method);
int outputAnnotationCount = StreamListenerMethodUtils.outputAnnotationCount(method);
boolean isDeclarative = checkDeclarativeMethod(method, methodAnnotatedInboundName,
methodAnnotatedOutboundName);
StreamListenerMethodUtils.validateStreamListenerMethod(method, inputAnnotationCount,
outputAnnotationCount, methodAnnotatedInboundName, methodAnnotatedOutboundName,
isDeclarative, streamListener.condition());
if (!method.getReturnType().equals(Void.TYPE)) {
if (!StringUtils.hasText(methodAnnotatedOutboundName)) {
if (outputAnnotationCount == 0) {
throw new IllegalArgumentException(
StreamListenerErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED);
}
Assert.isTrue((outputAnnotationCount == 1),
StreamListenerErrorMessages.RETURN_TYPE_MULTIPLE_OUTBOUND_SPECIFIED);
Method[] uniqueDeclaredMethods = ReflectionUtils.getUniqueDeclaredMethods(targetClass);
for (Method method : uniqueDeclaredMethods) {
StreamListener streamListener = AnnotatedElementUtils.findMergedAnnotation(method,
StreamListener.class);
if (streamListener != null && !method.isBridge()) {
streamListener = postProcessAnnotation(streamListener, method);
Assert.isTrue(method.getAnnotation(Input.class) == null,
StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER);
String methodAnnotatedInboundName = streamListener.value();
String methodAnnotatedOutboundName = StreamListenerMethodUtils.getOutboundBindingTargetName(method);
int inputAnnotationCount = StreamListenerMethodUtils.inputAnnotationCount(method);
int outputAnnotationCount = StreamListenerMethodUtils.outputAnnotationCount(method);
boolean isDeclarative = checkDeclarativeMethod(method, methodAnnotatedInboundName,
methodAnnotatedOutboundName);
StreamListenerMethodUtils.validateStreamListenerMethod(method, inputAnnotationCount,
outputAnnotationCount, methodAnnotatedInboundName, methodAnnotatedOutboundName,
isDeclarative, streamListener.condition());
if (!method.getReturnType().equals(Void.TYPE)) {
if (!StringUtils.hasText(methodAnnotatedOutboundName)) {
if (outputAnnotationCount == 0) {
throw new IllegalArgumentException(
StreamListenerErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED);
}
}
if (isDeclarative) {
invokeSetupMethodOnListenedChannel(method, bean, methodAnnotatedInboundName,
methodAnnotatedOutboundName);
}
else {
registerHandlerMethodOnListenedChannel(method, streamListener, bean);
Assert.isTrue((outputAnnotationCount == 1),
StreamListenerErrorMessages.RETURN_TYPE_MULTIPLE_OUTBOUND_SPECIFIED);
}
}
if (isDeclarative) {
invokeSetupMethodOnListenedChannel(method, bean, methodAnnotatedInboundName,
methodAnnotatedOutboundName);
}
else {
registerHandlerMethodOnListenedChannel(method, streamListener, bean);
}
}
});
}
return bean;
}