From e310f18f17760955edcff7ebc25f79ed7f5e919f Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 22 Feb 2023 17:50:47 -0500 Subject: [PATCH] Refactor ReactivePulsarListenerAnnotationBeanPostProcessor - Extend ReactivePulsarListenerAnnotationBeanPostProcessor from AbstractPulsarAnnotationsBeanPostProcessor. --- ...arListenerAnnotationBeanPostProcessor.java | 412 +----------------- ...actPulsarAnnotationsBeanPostProcessor.java | 9 +- 2 files changed, 12 insertions(+), 409 deletions(-) diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListenerAnnotationBeanPostProcessor.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListenerAnnotationBeanPostProcessor.java index e6849177..01f39bf7 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListenerAnnotationBeanPostProcessor.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListenerAnnotationBeanPostProcessor.java @@ -16,64 +16,36 @@ package org.springframework.pulsar.reactive.config.annotation; -import java.io.IOException; -import java.io.StringReader; -import java.lang.reflect.AnnotatedElement; import java.lang.reflect.Method; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.BiFunction; import org.apache.pulsar.client.api.DeadLetterPolicy; -import org.springframework.aop.framework.Advised; import org.springframework.aop.support.AopUtils; import org.springframework.beans.BeansException; -import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanInitializationException; -import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.NoSuchBeanDefinitionException; -import org.springframework.beans.factory.ObjectFactory; import org.springframework.beans.factory.SmartInitializingSingleton; -import org.springframework.beans.factory.config.BeanExpressionContext; -import org.springframework.beans.factory.config.BeanExpressionResolver; -import org.springframework.beans.factory.config.BeanPostProcessor; -import org.springframework.beans.factory.config.ConfigurableBeanFactory; -import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; -import org.springframework.beans.factory.config.Scope; import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.core.MethodIntrospector; -import org.springframework.core.Ordered; import org.springframework.core.annotation.AnnotatedElementUtils; import org.springframework.core.annotation.AnnotationUtils; -import org.springframework.core.convert.TypeDescriptor; -import org.springframework.core.convert.converter.ConditionalGenericConverter; import org.springframework.core.convert.converter.Converter; import org.springframework.core.convert.converter.GenericConverter; -import org.springframework.core.log.LogAccessor; import org.springframework.format.Formatter; import org.springframework.format.FormatterRegistry; -import org.springframework.format.support.DefaultFormattingConversionService; import org.springframework.lang.Nullable; -import org.springframework.messaging.converter.GenericMessageConverter; -import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; -import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory; -import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; +import org.springframework.pulsar.annotation.AbstractPulsarAnnotationsBeanPostProcessor; import org.springframework.pulsar.annotation.PulsarListenerConfigurer; import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames; import org.springframework.pulsar.config.PulsarListenerEndpointRegistrar; @@ -83,7 +55,6 @@ import org.springframework.pulsar.reactive.config.ReactivePulsarListenerEndpoint import org.springframework.pulsar.reactive.config.ReactivePulsarListenerEndpointRegistry; import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer; import org.springframework.util.Assert; -import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; /** @@ -108,6 +79,7 @@ import org.springframework.util.StringUtils; * * @param the payload type. * @author Christophe Bornet + * @author Soby Chacko * @see ReactivePulsarListener * @see EnableReactivePulsar * @see PulsarListenerConfigurer @@ -116,32 +88,16 @@ import org.springframework.util.StringUtils; * @see ReactivePulsarListenerEndpoint * @see MethodReactivePulsarListenerEndpoint */ -public class ReactivePulsarListenerAnnotationBeanPostProcessor - implements BeanPostProcessor, Ordered, ApplicationContextAware, InitializingBean, SmartInitializingSingleton { - - private final LogAccessor logger = new LogAccessor(this.getClass()); +public class ReactivePulsarListenerAnnotationBeanPostProcessor extends AbstractPulsarAnnotationsBeanPostProcessor + implements SmartInitializingSingleton { /** * The bean name of the default {@link ReactivePulsarListenerContainerFactory}. */ public static final String DEFAULT_REACTIVE_PULSAR_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "reactivePulsarListenerContainerFactory"; - private static final String THE_LEFT = "The ["; - - private static final String RESOLVED_TO_LEFT = "Resolved to ["; - - private static final String RIGHT_FOR_LEFT = "] for ["; - private static final String GENERATED_ID_PREFIX = "org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#"; - private ApplicationContext applicationContext; - - private BeanFactory beanFactory; - - private BeanExpressionResolver resolver; - - private BeanExpressionContext expressionContext; - private ReactivePulsarListenerEndpointRegistry endpointRegistry; private String defaultContainerFactoryBeanName = DEFAULT_REACTIVE_PULSAR_LISTENER_CONTAINER_FACTORY_BEAN_NAME; @@ -149,56 +105,12 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor private final PulsarListenerEndpointRegistrar registrar = new PulsarListenerEndpointRegistrar( ReactivePulsarListenerContainerFactory.class); - private final PulsarHandlerMethodFactoryAdapter messageHandlerMethodFactory = new PulsarHandlerMethodFactoryAdapter(); - - private Charset charset = StandardCharsets.UTF_8; - private final Set> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64)); private final ListenerScope listenerScope = new ListenerScope(); - private AnnotationEnhancer enhancer; - private final AtomicInteger counter = new AtomicInteger(); - @Override - public int getOrder() { - return LOWEST_PRECEDENCE; - } - - public void setEndpointRegistry(ReactivePulsarListenerEndpointRegistry endpointRegistry) { - this.endpointRegistry = endpointRegistry; - } - - public void setDefaultContainerFactoryBeanName(String containerFactoryBeanName) { - this.defaultContainerFactoryBeanName = containerFactoryBeanName; - } - - public void setCharset(Charset charset) { - Assert.notNull(charset, "'charset' cannot be null"); - this.charset = charset; - } - - @Override - public void afterPropertiesSet() { - buildEnhancer(); - } - - private void buildEnhancer() { - if (this.applicationContext != null) { - List enhancers = this.applicationContext - .getBeanProvider(AnnotationEnhancer.class, false).orderedStream().toList(); - if (!enhancers.isEmpty()) { - this.enhancer = (attrs, element) -> { - for (AnnotationEnhancer enh : enhancers) { - attrs = enh.apply(attrs, element); - } - return attrs; - }; - } - } - } - @Override public void afterSingletonsInstantiated() { this.registrar.setBeanFactory(this.beanFactory); @@ -220,7 +132,7 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor if (this.defaultContainerFactoryBeanName != null) { this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName); } - addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService); + addFormatters(this.messageHandlerMethodFactory.getDefaultFormattingConversionService()); // Actually register all listeners this.registrar.afterPropertiesSet(); @@ -316,18 +228,6 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor return factory; } - protected void assertBeanFactory() { - Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name"); - } - - protected String noBeanFoundMessage(Object target, String listenerBeanName, String requestedBeanName, - Class expectedClass) { - - return "Could not register Pulsar listener endpoint on [" + target + "] for bean " + listenerBeanName + ", no '" - + expectedClass.getSimpleName() + "' with id '" + requestedBeanName - + "' was found in the application context"; - } - private void processReactivePulsarListenerAnnotation(MethodReactivePulsarListenerEndpoint endpoint, ReactivePulsarListener reactivePulsarListener, Object bean, String[] topics, String topicPattern) { @@ -394,49 +294,6 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor } } - private Integer resolveExpressionAsInteger(String value, String attribute) { - Object resolved = resolveExpression(value); - Integer result = null; - if (resolved instanceof String) { - result = Integer.parseInt((String) resolved); - } - else if (resolved instanceof Number) { - result = ((Number) resolved).intValue(); - } - else if (resolved != null) { - throw new IllegalStateException( - THE_LEFT + attribute + "] must resolve to an Number or a String that can be parsed as an Integer. " - + RESOLVED_TO_LEFT + resolved.getClass() + RIGHT_FOR_LEFT + value + "]"); - } - return result; - } - - private Boolean resolveExpressionAsBoolean(String value, String attribute) { - Object resolved = resolveExpression(value); - Boolean result = null; - if (resolved instanceof Boolean) { - result = (Boolean) resolved; - } - else if (resolved instanceof String) { - result = Boolean.parseBoolean((String) resolved); - } - else if (resolved != null) { - throw new IllegalStateException( - THE_LEFT + attribute + "] must resolve to a Boolean or a String that can be parsed as a Boolean. " - + RESOLVED_TO_LEFT + resolved.getClass() + RIGHT_FOR_LEFT + value + "]"); - } - return result; - } - - private void loadProperty(Properties properties, String property, Object value) { - try { - properties.load(new StringReader((String) value)); - } - catch (IOException e) { - this.logger.error(e, () -> "Failed to load property " + property + ", continuing..."); - } - } - private String getEndpointSubscriptionName(ReactivePulsarListener reactivePulsarListener) { if (StringUtils.hasText(reactivePulsarListener.subscriptionName())) { return resolveExpressionAsString(reactivePulsarListener.subscriptionName(), "subscriptionName"); @@ -455,18 +312,6 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor return resolveExpressionAsString(reactivePulsarListener.topicPattern(), "topicPattern"); } - private String resolveExpressionAsString(String value, String attribute) { - Object resolved = resolveExpression(value); - if (resolved instanceof String) { - return (String) resolved; - } - else if (resolved != null) { - throw new IllegalStateException(THE_LEFT + attribute + "] must resolve to a String. " + RESOLVED_TO_LEFT - + resolved.getClass() + RIGHT_FOR_LEFT + value + "]"); - } - return null; - } - private String[] resolveTopics(ReactivePulsarListener ReactivePulsarListener) { String[] topics = ReactivePulsarListener.topics(); List result = new ArrayList<>(); @@ -479,83 +324,15 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor return result.toArray(new String[0]); } - private Object resolveExpression(String value) { - return this.resolver.evaluate(resolve(value), this.expressionContext); - } - - private String resolve(String value) { - if (this.beanFactory != null && this.beanFactory instanceof ConfigurableBeanFactory) { - return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value); - } - return value; - } - - @SuppressWarnings("unchecked") - private void resolveAsString(Object resolvedValue, List result) { - if (resolvedValue instanceof String[]) { - for (Object object : (String[]) resolvedValue) { - resolveAsString(object, result); - } - } - else if (resolvedValue instanceof String) { - result.add((String) resolvedValue); - } - else if (resolvedValue instanceof Iterable) { - for (Object object : (Iterable) resolvedValue) { - resolveAsString(object, result); - } - } - else { - throw new IllegalArgumentException( - "@ReactivePulsarListener can't resolve '%s' as a String".formatted(resolvedValue)); - } - } - - private Method checkProxy(Method methodArg, Object bean) { - Method method = methodArg; - if (AopUtils.isJdkDynamicProxy(bean)) { - try { - // Found a @ReactivePulsarListener method on the target class for this JDK - // proxy - // -> - // is it also present on the proxy itself? - method = bean.getClass().getMethod(method.getName(), method.getParameterTypes()); - Class[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces(); - for (Class iface : proxiedInterfaces) { - try { - method = iface.getMethod(method.getName(), method.getParameterTypes()); - break; - } - catch (@SuppressWarnings("unused") NoSuchMethodException noMethod) { - // NOSONAR - } - } - } - catch (SecurityException ex) { - ReflectionUtils.handleReflectionException(ex); - } - catch (NoSuchMethodException ex) { - throw new IllegalStateException("@ReactivePulsarListener method '%s' found on bean target class '%s', " - + "but not found in any interface(s) for bean JDK proxy. Either " - + "pull the method up to an interface or switch to subclass (CGLIB) " - + "proxies by setting proxy-target-class/proxyTargetClass " - + "attribute to 'true'".formatted(method.getName(), method.getDeclaringClass().getSimpleName()), - ex); - } - } - return method; - } - private Collection findListenerAnnotations(Class clazz) { Set listeners = new HashSet<>(); ReactivePulsarListener ann = AnnotatedElementUtils.findMergedAnnotation(clazz, ReactivePulsarListener.class); if (ann != null) { - ann = enhance(clazz, ann); listeners.add(ann); } ReactivePulsarListeners anns = AnnotationUtils.findAnnotation(clazz, ReactivePulsarListeners.class); if (anns != null) { - listeners.addAll(Arrays.stream(anns.value()).map(anno -> enhance(clazz, anno)).toList()); + listeners.addAll(Arrays.stream(anns.value()).toList()); } return listeners; } @@ -564,25 +341,15 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor Set listeners = new HashSet<>(); ReactivePulsarListener ann = AnnotatedElementUtils.findMergedAnnotation(method, ReactivePulsarListener.class); if (ann != null) { - ann = enhance(method, ann); listeners.add(ann); } ReactivePulsarListeners anns = AnnotationUtils.findAnnotation(method, ReactivePulsarListeners.class); if (anns != null) { - listeners.addAll(Arrays.stream(anns.value()).map(anno -> enhance(method, anno)).toList()); + listeners.addAll(Arrays.stream(anns.value()).toList()); } return listeners; } - private ReactivePulsarListener enhance(AnnotatedElement element, ReactivePulsarListener ann) { - if (this.enhancer == null) { - return ann; - } - return AnnotationUtils.synthesizeAnnotation( - this.enhancer.apply(AnnotationUtils.getAnnotationAttributes(ann), element), - ReactivePulsarListener.class, null); - } - private void addFormatters(FormatterRegistry registry) { this.beanFactory.getBeanProvider(Converter.class).forEach(registry::addConverter); this.beanFactory.getBeanProvider(GenericConverter.class).forEach(registry::addConverter); @@ -600,169 +367,4 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor } } - public void setBeanFactory(BeanFactory beanFactory) { - this.beanFactory = beanFactory; - if (beanFactory instanceof ConfigurableListableBeanFactory) { - this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver(); - this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory, - this.listenerScope); - } - } - - private class PulsarHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory { - - private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService(); - - private MessageHandlerMethodFactory handlerMethodFactory; - - public void setHandlerMethodFactory(MessageHandlerMethodFactory pulsarHandlerMethodFactory1) { - this.handlerMethodFactory = pulsarHandlerMethodFactory1; - } - - @Override - public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) { - return getHandlerMethodFactory().createInvocableHandlerMethod(bean, method); - } - - private MessageHandlerMethodFactory getHandlerMethodFactory() { - if (this.handlerMethodFactory == null) { - this.handlerMethodFactory = createDefaultMessageHandlerMethodFactory(); - } - return this.handlerMethodFactory; - } - - private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() { - DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory(); - defaultFactory.setBeanFactory(ReactivePulsarListenerAnnotationBeanPostProcessor.this.beanFactory); - this.defaultFormattingConversionService.addConverter( - new BytesToStringConverter(ReactivePulsarListenerAnnotationBeanPostProcessor.this.charset)); - this.defaultFormattingConversionService.addConverter(new BytesToNumberConverter()); - defaultFactory.setConversionService(this.defaultFormattingConversionService); - GenericMessageConverter messageConverter = new GenericMessageConverter( - this.defaultFormattingConversionService); - defaultFactory.setMessageConverter(messageConverter); - - defaultFactory.afterPropertiesSet(); - - return defaultFactory; - } - - } - - private static class BytesToStringConverter implements Converter { - - private final Charset charset; - - BytesToStringConverter(Charset charset) { - this.charset = charset; - } - - @Override - public String convert(byte[] source) { - return new String(source, this.charset); - } - - } - - private final class BytesToNumberConverter implements ConditionalGenericConverter { - - BytesToNumberConverter() { - } - - @Override - @Nullable - public Set getConvertibleTypes() { - HashSet pairs = new HashSet<>(); - pairs.add(new ConvertiblePair(byte[].class, long.class)); - pairs.add(new ConvertiblePair(byte[].class, int.class)); - pairs.add(new ConvertiblePair(byte[].class, short.class)); - pairs.add(new ConvertiblePair(byte[].class, byte.class)); - pairs.add(new ConvertiblePair(byte[].class, Long.class)); - pairs.add(new ConvertiblePair(byte[].class, Integer.class)); - pairs.add(new ConvertiblePair(byte[].class, Short.class)); - pairs.add(new ConvertiblePair(byte[].class, Byte.class)); - return pairs; - } - - @Override - @Nullable - public Object convert(@Nullable Object source, TypeDescriptor sourceType, TypeDescriptor targetType) { - byte[] bytes = (byte[]) source; - if (targetType.getType().equals(long.class) || targetType.getType().equals(Long.class)) { - Assert.state(bytes.length >= 8, "At least 8 bytes needed to convert a byte[] to a long"); // NOSONAR - return ByteBuffer.wrap(bytes).getLong(); - } - else if (targetType.getType().equals(int.class) || targetType.getType().equals(Integer.class)) { - Assert.state(bytes.length >= 4, "At least 4 bytes needed to convert a byte[] to an integer"); // NOSONAR - return ByteBuffer.wrap(bytes).getInt(); - } - else if (targetType.getType().equals(short.class) || targetType.getType().equals(Short.class)) { - Assert.state(bytes.length >= 2, "At least 2 bytes needed to convert a byte[] to a short"); - return ByteBuffer.wrap(bytes).getShort(); - } - else if (targetType.getType().equals(byte.class) || targetType.getType().equals(Byte.class)) { - Assert.state(bytes.length >= 1, "At least 1 byte needed to convert a byte[] to a byte"); - return ByteBuffer.wrap(bytes).get(); - } - return null; - } - - @Override - public boolean matches(TypeDescriptor sourceType, TypeDescriptor targetType) { - if (sourceType.getType().equals(byte[].class)) { - Class target = targetType.getType(); - return target.equals(long.class) || target.equals(int.class) || target.equals(short.class) // NOSONAR - || target.equals(byte.class) || target.equals(Long.class) || target.equals(Integer.class) - || target.equals(Short.class) || target.equals(Byte.class); - } - return false; - } - - } - - static class ListenerScope implements Scope { - - private final Map listeners = new HashMap<>(); - - ListenerScope() { - } - - public void addListener(String key, Object bean) { - this.listeners.put(key, bean); - } - - public void removeListener(String key) { - this.listeners.remove(key); - } - - @Override - public Object get(String name, ObjectFactory objectFactory) { - return this.listeners.get(name); - } - - @Override - public Object remove(String name) { - return null; - } - - @Override - public void registerDestructionCallback(String name, Runnable callback) { - } - - @Override - public Object resolveContextualObject(String key) { - return this.listeners.get(key); - } - - @Override - public String getConversationId() { - return null; - } - - } - - public interface AnnotationEnhancer extends BiFunction, AnnotatedElement, Map> { - - } - } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/AbstractPulsarAnnotationsBeanPostProcessor.java b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/AbstractPulsarAnnotationsBeanPostProcessor.java index a387b38d..d4df961a 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/AbstractPulsarAnnotationsBeanPostProcessor.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/AbstractPulsarAnnotationsBeanPostProcessor.java @@ -256,13 +256,10 @@ public class AbstractPulsarAnnotationsBeanPostProcessor return result; } - protected static class ListenerScope implements Scope { + public static class ListenerScope implements Scope { private final Map listeners = new HashMap<>(); - ListenerScope() { - } - public void addListener(String key, Object bean) { this.listeners.put(key, bean); } @@ -335,6 +332,10 @@ public class AbstractPulsarAnnotationsBeanPostProcessor return defaultFactory; } + public DefaultFormattingConversionService getDefaultFormattingConversionService() { + return this.defaultFormattingConversionService; + } + } private static class BytesToStringConverter implements Converter {