diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BinderAwareRouterBeanPostProcessor.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BinderAwareRouterBeanPostProcessor.java index 10f69018e..5135a1883 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BinderAwareRouterBeanPostProcessor.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BinderAwareRouterBeanPostProcessor.java @@ -16,7 +16,6 @@ package org.springframework.cloud.stream.binding; -import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.integration.router.AbstractMappingMessageRouter; import org.springframework.messaging.MessageChannel; @@ -28,26 +27,17 @@ import org.springframework.messaging.core.DestinationResolver; * * @author Mark Fisher * @author Gary Russell + * @author Oleg Zhurakousky + * */ -public class BinderAwareRouterBeanPostProcessor implements BeanPostProcessor { +public class BinderAwareRouterBeanPostProcessor { - private final DestinationResolver channelResolver; - - public BinderAwareRouterBeanPostProcessor(DestinationResolver channelResolver) { - this.channelResolver = channelResolver; - } - - @Override - public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { - return bean; - } - - @Override - public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { - if (bean instanceof AbstractMappingMessageRouter) { - ((AbstractMappingMessageRouter) bean).setChannelResolver(this.channelResolver); + public BinderAwareRouterBeanPostProcessor(AbstractMappingMessageRouter[] routers, DestinationResolver channelResolver) { + if (routers != null) { + for (AbstractMappingMessageRouter router : routers) { + router.setChannelResolver(channelResolver); + } } - return bean; } } diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java index 2a2a18f2d..e9ae7000a 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java @@ -24,14 +24,14 @@ import java.util.Map; import com.fasterxml.jackson.databind.ObjectMapper; -import org.springframework.aop.support.AopUtils; import org.springframework.beans.BeansException; -import org.springframework.beans.factory.BeanFactoryUtils; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.BeanPostProcessor; -import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -67,11 +67,12 @@ import org.springframework.integration.json.JsonPropertyAccessor; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.converter.MessageConverter; -import org.springframework.messaging.core.DestinationResolutionException; +import org.springframework.integration.router.AbstractMappingMessageRouter; import org.springframework.messaging.core.DestinationResolver; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory; import org.springframework.tuple.spel.TuplePropertyAccessor; +import org.springframework.util.ClassUtils; import org.springframework.util.CollectionUtils; /** @@ -218,12 +219,17 @@ public class BindingServiceConfiguration { return new ChannelBindingServiceProperties(bindingServiceProperties); } + @Bean + @ConditionalOnMissingBean + public BinderAwareRouterBeanPostProcessor binderAwareRouterBeanPostProcessor(@Autowired(required=false) AbstractMappingMessageRouter[] routers, + @Autowired(required=false)DestinationResolver channelResolver) { + return new BinderAwareRouterBeanPostProcessor(routers, channelResolver); + } + // IMPORTANT: Nested class to avoid instantiating all of the above early @Configuration protected static class PostProcessorConfiguration { - private BinderAwareChannelResolver binderAwareChannelResolver; - /** * Adds property accessors for use in SpEL expression evaluation */ @@ -253,51 +259,55 @@ public class BindingServiceConfiguration { public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { return bean; } + }; } @Bean - @ConditionalOnMissingBean(BinderAwareRouterBeanPostProcessor.class) - public BinderAwareRouterBeanPostProcessor binderAwareRouterBeanPostProcessor( - final ConfigurableListableBeanFactory beanFactory) { - // IMPORTANT: Lazy delegate to avoid instantiating all of the above early - return new BinderAwareRouterBeanPostProcessor(new DestinationResolver() { - - @Override - public MessageChannel resolveDestination(String name) throws DestinationResolutionException { - if (PostProcessorConfiguration.this.binderAwareChannelResolver == null) { - PostProcessorConfiguration.this.binderAwareChannelResolver = BeanFactoryUtils - .beanOfType(beanFactory, BinderAwareChannelResolver.class); - } - return PostProcessorConfiguration.this.binderAwareChannelResolver.resolveDestination(name); - } - - }); + public static BeanPostProcessor messageHandlerHeaderPropagationBeanPostProcessor() { + return new NotPropagatedHeadersBeanPostProcessor(); } - @Bean - public static BeanPostProcessor messageHandlerHeaderPropagationBeanPostProcessor( - final SpringIntegrationProperties springIntegrationProperties) { - return new BeanPostProcessor() { - @Override - public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { - // TODO: Filter out beans created by SCSt (not currently necessary) - Class beanClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass(); - if (AbstractReplyProducingMessageHandler.class.isAssignableFrom(beanClass)) { - AbstractReplyProducingMessageHandler messageHandler = (AbstractReplyProducingMessageHandler) bean; - messageHandler.addNotPropagatedHeaders( - springIntegrationProperties.getMessageHandlerNotPropagatedHeaders()); - } - return bean; + private static final class NotPropagatedHeadersBeanPostProcessor + implements BeanPostProcessor, BeanFactoryAware, SmartInitializingSingleton { + + private final List producingMessageHandlers = new ArrayList<>(); + + private BeanFactory beanFactory; + + @Override + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + this.beanFactory = beanFactory; + } + + @Override + public void afterSingletonsInstantiated() { + SpringIntegrationProperties springIntegrationProperties = + this.beanFactory.getBean(SpringIntegrationProperties.class); + + for (AbstractReplyProducingMessageHandler producingMessageHandler : producingMessageHandlers) { + producingMessageHandler.addNotPropagatedHeaders( + springIntegrationProperties.getMessageHandlerNotPropagatedHeaders()); } - @Override - public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { - return bean; - } + this.producingMessageHandlers.clear(); + } - }; + @Override + public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { + // TODO: Filter out beans created by SCSt (not currently necessary) + Class beanClass = ClassUtils.getUserClass(bean); + if (AbstractReplyProducingMessageHandler.class.isAssignableFrom(beanClass)) { + this.producingMessageHandlers.add((AbstractReplyProducingMessageHandler) bean); + } + return bean; + } + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + return bean; + } } }