polishing BinderAwareRouterBeanPostProcessor
- removed BeanPostProcessor from BinderAwareRouterBeanPostProcessor - simplified BinderAwareRouterBeanPostProcessor configuration in BindingServiceConfiguration Resolves #1171 Resolves #1170
This commit is contained in:
@@ -16,7 +16,6 @@
|
||||
|
||||
package org.springframework.cloud.stream.binding;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.config.BeanPostProcessor;
|
||||
import org.springframework.integration.router.AbstractMappingMessageRouter;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
@@ -28,26 +27,17 @@ import org.springframework.messaging.core.DestinationResolver;
|
||||
*
|
||||
* @author Mark Fisher
|
||||
* @author Gary Russell
|
||||
* @author Oleg Zhurakousky
|
||||
*
|
||||
*/
|
||||
public class BinderAwareRouterBeanPostProcessor implements BeanPostProcessor {
|
||||
public class BinderAwareRouterBeanPostProcessor {
|
||||
|
||||
private final DestinationResolver<MessageChannel> channelResolver;
|
||||
|
||||
public BinderAwareRouterBeanPostProcessor(DestinationResolver<MessageChannel> channelResolver) {
|
||||
this.channelResolver = channelResolver;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
|
||||
return bean;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
||||
if (bean instanceof AbstractMappingMessageRouter) {
|
||||
((AbstractMappingMessageRouter) bean).setChannelResolver(this.channelResolver);
|
||||
public BinderAwareRouterBeanPostProcessor(AbstractMappingMessageRouter[] routers, DestinationResolver<MessageChannel> channelResolver) {
|
||||
if (routers != null) {
|
||||
for (AbstractMappingMessageRouter router : routers) {
|
||||
router.setChannelResolver(channelResolver);
|
||||
}
|
||||
}
|
||||
return bean;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -24,14 +24,14 @@ import java.util.Map;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import org.springframework.aop.support.AopUtils;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.BeanFactoryUtils;
|
||||
import org.springframework.beans.factory.BeanFactory;
|
||||
import org.springframework.beans.factory.BeanFactoryAware;
|
||||
import org.springframework.beans.factory.SmartInitializingSingleton;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.config.BeanDefinition;
|
||||
import org.springframework.beans.factory.config.BeanPostProcessor;
|
||||
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
@@ -67,11 +67,12 @@ import org.springframework.integration.json.JsonPropertyAccessor;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.converter.MessageConverter;
|
||||
import org.springframework.messaging.core.DestinationResolutionException;
|
||||
import org.springframework.integration.router.AbstractMappingMessageRouter;
|
||||
import org.springframework.messaging.core.DestinationResolver;
|
||||
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
|
||||
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
|
||||
import org.springframework.tuple.spel.TuplePropertyAccessor;
|
||||
import org.springframework.util.ClassUtils;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
/**
|
||||
@@ -218,12 +219,17 @@ public class BindingServiceConfiguration {
|
||||
return new ChannelBindingServiceProperties(bindingServiceProperties);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public BinderAwareRouterBeanPostProcessor binderAwareRouterBeanPostProcessor(@Autowired(required=false) AbstractMappingMessageRouter[] routers,
|
||||
@Autowired(required=false)DestinationResolver<MessageChannel> channelResolver) {
|
||||
return new BinderAwareRouterBeanPostProcessor(routers, channelResolver);
|
||||
}
|
||||
|
||||
// IMPORTANT: Nested class to avoid instantiating all of the above early
|
||||
@Configuration
|
||||
protected static class PostProcessorConfiguration {
|
||||
|
||||
private BinderAwareChannelResolver binderAwareChannelResolver;
|
||||
|
||||
/**
|
||||
* Adds property accessors for use in SpEL expression evaluation
|
||||
*/
|
||||
@@ -253,51 +259,55 @@ public class BindingServiceConfiguration {
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
||||
return bean;
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(BinderAwareRouterBeanPostProcessor.class)
|
||||
public BinderAwareRouterBeanPostProcessor binderAwareRouterBeanPostProcessor(
|
||||
final ConfigurableListableBeanFactory beanFactory) {
|
||||
// IMPORTANT: Lazy delegate to avoid instantiating all of the above early
|
||||
return new BinderAwareRouterBeanPostProcessor(new DestinationResolver<MessageChannel>() {
|
||||
|
||||
@Override
|
||||
public MessageChannel resolveDestination(String name) throws DestinationResolutionException {
|
||||
if (PostProcessorConfiguration.this.binderAwareChannelResolver == null) {
|
||||
PostProcessorConfiguration.this.binderAwareChannelResolver = BeanFactoryUtils
|
||||
.beanOfType(beanFactory, BinderAwareChannelResolver.class);
|
||||
}
|
||||
return PostProcessorConfiguration.this.binderAwareChannelResolver.resolveDestination(name);
|
||||
}
|
||||
|
||||
});
|
||||
public static BeanPostProcessor messageHandlerHeaderPropagationBeanPostProcessor() {
|
||||
return new NotPropagatedHeadersBeanPostProcessor();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public static BeanPostProcessor messageHandlerHeaderPropagationBeanPostProcessor(
|
||||
final SpringIntegrationProperties springIntegrationProperties) {
|
||||
return new BeanPostProcessor() {
|
||||
|
||||
@Override
|
||||
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
|
||||
// TODO: Filter out beans created by SCSt (not currently necessary)
|
||||
Class<?> beanClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass();
|
||||
if (AbstractReplyProducingMessageHandler.class.isAssignableFrom(beanClass)) {
|
||||
AbstractReplyProducingMessageHandler messageHandler = (AbstractReplyProducingMessageHandler) bean;
|
||||
messageHandler.addNotPropagatedHeaders(
|
||||
springIntegrationProperties.getMessageHandlerNotPropagatedHeaders());
|
||||
}
|
||||
return bean;
|
||||
private static final class NotPropagatedHeadersBeanPostProcessor
|
||||
implements BeanPostProcessor, BeanFactoryAware, SmartInitializingSingleton {
|
||||
|
||||
private final List<AbstractReplyProducingMessageHandler> producingMessageHandlers = new ArrayList<>();
|
||||
|
||||
private BeanFactory beanFactory;
|
||||
|
||||
@Override
|
||||
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
|
||||
this.beanFactory = beanFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterSingletonsInstantiated() {
|
||||
SpringIntegrationProperties springIntegrationProperties =
|
||||
this.beanFactory.getBean(SpringIntegrationProperties.class);
|
||||
|
||||
for (AbstractReplyProducingMessageHandler producingMessageHandler : producingMessageHandlers) {
|
||||
producingMessageHandler.addNotPropagatedHeaders(
|
||||
springIntegrationProperties.getMessageHandlerNotPropagatedHeaders());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
||||
return bean;
|
||||
}
|
||||
this.producingMessageHandlers.clear();
|
||||
}
|
||||
|
||||
};
|
||||
@Override
|
||||
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
|
||||
// TODO: Filter out beans created by SCSt (not currently necessary)
|
||||
Class<?> beanClass = ClassUtils.getUserClass(bean);
|
||||
if (AbstractReplyProducingMessageHandler.class.isAssignableFrom(beanClass)) {
|
||||
this.producingMessageHandlers.add((AbstractReplyProducingMessageHandler) bean);
|
||||
}
|
||||
return bean;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
||||
return bean;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user