Refactor ReactivePulsarListenerAnnotationBeanPostProcessor

- Extend ReactivePulsarListenerAnnotationBeanPostProcessor
   from AbstractPulsarAnnotationsBeanPostProcessor.
This commit is contained in:
Soby Chacko
2023-02-22 17:50:47 -05:00
parent 6cf83a4296
commit e310f18f17
2 changed files with 12 additions and 409 deletions

View File

@@ -16,64 +16,36 @@
package org.springframework.pulsar.reactive.config.annotation;
import java.io.IOException;
import java.io.StringReader;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.BeanExpressionContext;
import org.springframework.beans.factory.config.BeanExpressionResolver;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.Scope;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.convert.TypeDescriptor;
import org.springframework.core.convert.converter.ConditionalGenericConverter;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.convert.converter.GenericConverter;
import org.springframework.core.log.LogAccessor;
import org.springframework.format.Formatter;
import org.springframework.format.FormatterRegistry;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.pulsar.annotation.AbstractPulsarAnnotationsBeanPostProcessor;
import org.springframework.pulsar.annotation.PulsarListenerConfigurer;
import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames;
import org.springframework.pulsar.config.PulsarListenerEndpointRegistrar;
@@ -83,7 +55,6 @@ import org.springframework.pulsar.reactive.config.ReactivePulsarListenerEndpoint
import org.springframework.pulsar.reactive.config.ReactivePulsarListenerEndpointRegistry;
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
/**
@@ -108,6 +79,7 @@ import org.springframework.util.StringUtils;
*
* @param <V> the payload type.
* @author Christophe Bornet
* @author Soby Chacko
* @see ReactivePulsarListener
* @see EnableReactivePulsar
* @see PulsarListenerConfigurer
@@ -116,32 +88,16 @@ import org.springframework.util.StringUtils;
* @see ReactivePulsarListenerEndpoint
* @see MethodReactivePulsarListenerEndpoint
*/
public class ReactivePulsarListenerAnnotationBeanPostProcessor<V>
implements BeanPostProcessor, Ordered, ApplicationContextAware, InitializingBean, SmartInitializingSingleton {
private final LogAccessor logger = new LogAccessor(this.getClass());
public class ReactivePulsarListenerAnnotationBeanPostProcessor<V> extends AbstractPulsarAnnotationsBeanPostProcessor
implements SmartInitializingSingleton {
/**
* The bean name of the default {@link ReactivePulsarListenerContainerFactory}.
*/
public static final String DEFAULT_REACTIVE_PULSAR_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "reactivePulsarListenerContainerFactory";
private static final String THE_LEFT = "The [";
private static final String RESOLVED_TO_LEFT = "Resolved to [";
private static final String RIGHT_FOR_LEFT = "] for [";
private static final String GENERATED_ID_PREFIX = "org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#";
private ApplicationContext applicationContext;
private BeanFactory beanFactory;
private BeanExpressionResolver resolver;
private BeanExpressionContext expressionContext;
private ReactivePulsarListenerEndpointRegistry<?> endpointRegistry;
private String defaultContainerFactoryBeanName = DEFAULT_REACTIVE_PULSAR_LISTENER_CONTAINER_FACTORY_BEAN_NAME;
@@ -149,56 +105,12 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor<V>
private final PulsarListenerEndpointRegistrar registrar = new PulsarListenerEndpointRegistrar(
ReactivePulsarListenerContainerFactory.class);
private final PulsarHandlerMethodFactoryAdapter messageHandlerMethodFactory = new PulsarHandlerMethodFactoryAdapter();
private Charset charset = StandardCharsets.UTF_8;
private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
private final ListenerScope listenerScope = new ListenerScope();
private AnnotationEnhancer enhancer;
private final AtomicInteger counter = new AtomicInteger();
@Override
public int getOrder() {
return LOWEST_PRECEDENCE;
}
public void setEndpointRegistry(ReactivePulsarListenerEndpointRegistry<?> endpointRegistry) {
this.endpointRegistry = endpointRegistry;
}
public void setDefaultContainerFactoryBeanName(String containerFactoryBeanName) {
this.defaultContainerFactoryBeanName = containerFactoryBeanName;
}
public void setCharset(Charset charset) {
Assert.notNull(charset, "'charset' cannot be null");
this.charset = charset;
}
@Override
public void afterPropertiesSet() {
buildEnhancer();
}
private void buildEnhancer() {
if (this.applicationContext != null) {
List<AnnotationEnhancer> enhancers = this.applicationContext
.getBeanProvider(AnnotationEnhancer.class, false).orderedStream().toList();
if (!enhancers.isEmpty()) {
this.enhancer = (attrs, element) -> {
for (AnnotationEnhancer enh : enhancers) {
attrs = enh.apply(attrs, element);
}
return attrs;
};
}
}
}
@Override
public void afterSingletonsInstantiated() {
this.registrar.setBeanFactory(this.beanFactory);
@@ -220,7 +132,7 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor<V>
if (this.defaultContainerFactoryBeanName != null) {
this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
}
addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
addFormatters(this.messageHandlerMethodFactory.getDefaultFormattingConversionService());
// Actually register all listeners
this.registrar.afterPropertiesSet();
@@ -316,18 +228,6 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor<V>
return factory;
}
protected void assertBeanFactory() {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
}
protected String noBeanFoundMessage(Object target, String listenerBeanName, String requestedBeanName,
Class<?> expectedClass) {
return "Could not register Pulsar listener endpoint on [" + target + "] for bean " + listenerBeanName + ", no '"
+ expectedClass.getSimpleName() + "' with id '" + requestedBeanName
+ "' was found in the application context";
}
private void processReactivePulsarListenerAnnotation(MethodReactivePulsarListenerEndpoint<?> endpoint,
ReactivePulsarListener reactivePulsarListener, Object bean, String[] topics, String topicPattern) {
@@ -394,49 +294,6 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor<V>
}
}
private Integer resolveExpressionAsInteger(String value, String attribute) {
Object resolved = resolveExpression(value);
Integer result = null;
if (resolved instanceof String) {
result = Integer.parseInt((String) resolved);
}
else if (resolved instanceof Number) {
result = ((Number) resolved).intValue();
}
else if (resolved != null) {
throw new IllegalStateException(
THE_LEFT + attribute + "] must resolve to an Number or a String that can be parsed as an Integer. "
+ RESOLVED_TO_LEFT + resolved.getClass() + RIGHT_FOR_LEFT + value + "]");
}
return result;
}
private Boolean resolveExpressionAsBoolean(String value, String attribute) {
Object resolved = resolveExpression(value);
Boolean result = null;
if (resolved instanceof Boolean) {
result = (Boolean) resolved;
}
else if (resolved instanceof String) {
result = Boolean.parseBoolean((String) resolved);
}
else if (resolved != null) {
throw new IllegalStateException(
THE_LEFT + attribute + "] must resolve to a Boolean or a String that can be parsed as a Boolean. "
+ RESOLVED_TO_LEFT + resolved.getClass() + RIGHT_FOR_LEFT + value + "]");
}
return result;
}
private void loadProperty(Properties properties, String property, Object value) {
try {
properties.load(new StringReader((String) value));
}
catch (IOException e) {
this.logger.error(e, () -> "Failed to load property " + property + ", continuing...");
}
}
private String getEndpointSubscriptionName(ReactivePulsarListener reactivePulsarListener) {
if (StringUtils.hasText(reactivePulsarListener.subscriptionName())) {
return resolveExpressionAsString(reactivePulsarListener.subscriptionName(), "subscriptionName");
@@ -455,18 +312,6 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor<V>
return resolveExpressionAsString(reactivePulsarListener.topicPattern(), "topicPattern");
}
private String resolveExpressionAsString(String value, String attribute) {
Object resolved = resolveExpression(value);
if (resolved instanceof String) {
return (String) resolved;
}
else if (resolved != null) {
throw new IllegalStateException(THE_LEFT + attribute + "] must resolve to a String. " + RESOLVED_TO_LEFT
+ resolved.getClass() + RIGHT_FOR_LEFT + value + "]");
}
return null;
}
private String[] resolveTopics(ReactivePulsarListener ReactivePulsarListener) {
String[] topics = ReactivePulsarListener.topics();
List<String> result = new ArrayList<>();
@@ -479,83 +324,15 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor<V>
return result.toArray(new String[0]);
}
private Object resolveExpression(String value) {
return this.resolver.evaluate(resolve(value), this.expressionContext);
}
private String resolve(String value) {
if (this.beanFactory != null && this.beanFactory instanceof ConfigurableBeanFactory) {
return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);
}
return value;
}
@SuppressWarnings("unchecked")
private void resolveAsString(Object resolvedValue, List<String> result) {
if (resolvedValue instanceof String[]) {
for (Object object : (String[]) resolvedValue) {
resolveAsString(object, result);
}
}
else if (resolvedValue instanceof String) {
result.add((String) resolvedValue);
}
else if (resolvedValue instanceof Iterable) {
for (Object object : (Iterable<Object>) resolvedValue) {
resolveAsString(object, result);
}
}
else {
throw new IllegalArgumentException(
"@ReactivePulsarListener can't resolve '%s' as a String".formatted(resolvedValue));
}
}
private Method checkProxy(Method methodArg, Object bean) {
Method method = methodArg;
if (AopUtils.isJdkDynamicProxy(bean)) {
try {
// Found a @ReactivePulsarListener method on the target class for this JDK
// proxy
// ->
// is it also present on the proxy itself?
method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
for (Class<?> iface : proxiedInterfaces) {
try {
method = iface.getMethod(method.getName(), method.getParameterTypes());
break;
}
catch (@SuppressWarnings("unused") NoSuchMethodException noMethod) {
// NOSONAR
}
}
}
catch (SecurityException ex) {
ReflectionUtils.handleReflectionException(ex);
}
catch (NoSuchMethodException ex) {
throw new IllegalStateException("@ReactivePulsarListener method '%s' found on bean target class '%s', "
+ "but not found in any interface(s) for bean JDK proxy. Either "
+ "pull the method up to an interface or switch to subclass (CGLIB) "
+ "proxies by setting proxy-target-class/proxyTargetClass "
+ "attribute to 'true'".formatted(method.getName(), method.getDeclaringClass().getSimpleName()),
ex);
}
}
return method;
}
private Collection<ReactivePulsarListener> findListenerAnnotations(Class<?> clazz) {
Set<ReactivePulsarListener> listeners = new HashSet<>();
ReactivePulsarListener ann = AnnotatedElementUtils.findMergedAnnotation(clazz, ReactivePulsarListener.class);
if (ann != null) {
ann = enhance(clazz, ann);
listeners.add(ann);
}
ReactivePulsarListeners anns = AnnotationUtils.findAnnotation(clazz, ReactivePulsarListeners.class);
if (anns != null) {
listeners.addAll(Arrays.stream(anns.value()).map(anno -> enhance(clazz, anno)).toList());
listeners.addAll(Arrays.stream(anns.value()).toList());
}
return listeners;
}
@@ -564,25 +341,15 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor<V>
Set<ReactivePulsarListener> listeners = new HashSet<>();
ReactivePulsarListener ann = AnnotatedElementUtils.findMergedAnnotation(method, ReactivePulsarListener.class);
if (ann != null) {
ann = enhance(method, ann);
listeners.add(ann);
}
ReactivePulsarListeners anns = AnnotationUtils.findAnnotation(method, ReactivePulsarListeners.class);
if (anns != null) {
listeners.addAll(Arrays.stream(anns.value()).map(anno -> enhance(method, anno)).toList());
listeners.addAll(Arrays.stream(anns.value()).toList());
}
return listeners;
}
private ReactivePulsarListener enhance(AnnotatedElement element, ReactivePulsarListener ann) {
if (this.enhancer == null) {
return ann;
}
return AnnotationUtils.synthesizeAnnotation(
this.enhancer.apply(AnnotationUtils.getAnnotationAttributes(ann), element),
ReactivePulsarListener.class, null);
}
private void addFormatters(FormatterRegistry registry) {
this.beanFactory.getBeanProvider(Converter.class).forEach(registry::addConverter);
this.beanFactory.getBeanProvider(GenericConverter.class).forEach(registry::addConverter);
@@ -600,169 +367,4 @@ public class ReactivePulsarListenerAnnotationBeanPostProcessor<V>
}
}
public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
if (beanFactory instanceof ConfigurableListableBeanFactory) {
this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
this.listenerScope);
}
}
private class PulsarHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {
private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService();
private MessageHandlerMethodFactory handlerMethodFactory;
public void setHandlerMethodFactory(MessageHandlerMethodFactory pulsarHandlerMethodFactory1) {
this.handlerMethodFactory = pulsarHandlerMethodFactory1;
}
@Override
public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
return getHandlerMethodFactory().createInvocableHandlerMethod(bean, method);
}
private MessageHandlerMethodFactory getHandlerMethodFactory() {
if (this.handlerMethodFactory == null) {
this.handlerMethodFactory = createDefaultMessageHandlerMethodFactory();
}
return this.handlerMethodFactory;
}
private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
defaultFactory.setBeanFactory(ReactivePulsarListenerAnnotationBeanPostProcessor.this.beanFactory);
this.defaultFormattingConversionService.addConverter(
new BytesToStringConverter(ReactivePulsarListenerAnnotationBeanPostProcessor.this.charset));
this.defaultFormattingConversionService.addConverter(new BytesToNumberConverter());
defaultFactory.setConversionService(this.defaultFormattingConversionService);
GenericMessageConverter messageConverter = new GenericMessageConverter(
this.defaultFormattingConversionService);
defaultFactory.setMessageConverter(messageConverter);
defaultFactory.afterPropertiesSet();
return defaultFactory;
}
}
private static class BytesToStringConverter implements Converter<byte[], String> {
private final Charset charset;
BytesToStringConverter(Charset charset) {
this.charset = charset;
}
@Override
public String convert(byte[] source) {
return new String(source, this.charset);
}
}
private final class BytesToNumberConverter implements ConditionalGenericConverter {
BytesToNumberConverter() {
}
@Override
@Nullable
public Set<ConvertiblePair> getConvertibleTypes() {
HashSet<ConvertiblePair> pairs = new HashSet<>();
pairs.add(new ConvertiblePair(byte[].class, long.class));
pairs.add(new ConvertiblePair(byte[].class, int.class));
pairs.add(new ConvertiblePair(byte[].class, short.class));
pairs.add(new ConvertiblePair(byte[].class, byte.class));
pairs.add(new ConvertiblePair(byte[].class, Long.class));
pairs.add(new ConvertiblePair(byte[].class, Integer.class));
pairs.add(new ConvertiblePair(byte[].class, Short.class));
pairs.add(new ConvertiblePair(byte[].class, Byte.class));
return pairs;
}
@Override
@Nullable
public Object convert(@Nullable Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {
byte[] bytes = (byte[]) source;
if (targetType.getType().equals(long.class) || targetType.getType().equals(Long.class)) {
Assert.state(bytes.length >= 8, "At least 8 bytes needed to convert a byte[] to a long"); // NOSONAR
return ByteBuffer.wrap(bytes).getLong();
}
else if (targetType.getType().equals(int.class) || targetType.getType().equals(Integer.class)) {
Assert.state(bytes.length >= 4, "At least 4 bytes needed to convert a byte[] to an integer"); // NOSONAR
return ByteBuffer.wrap(bytes).getInt();
}
else if (targetType.getType().equals(short.class) || targetType.getType().equals(Short.class)) {
Assert.state(bytes.length >= 2, "At least 2 bytes needed to convert a byte[] to a short");
return ByteBuffer.wrap(bytes).getShort();
}
else if (targetType.getType().equals(byte.class) || targetType.getType().equals(Byte.class)) {
Assert.state(bytes.length >= 1, "At least 1 byte needed to convert a byte[] to a byte");
return ByteBuffer.wrap(bytes).get();
}
return null;
}
@Override
public boolean matches(TypeDescriptor sourceType, TypeDescriptor targetType) {
if (sourceType.getType().equals(byte[].class)) {
Class<?> target = targetType.getType();
return target.equals(long.class) || target.equals(int.class) || target.equals(short.class) // NOSONAR
|| target.equals(byte.class) || target.equals(Long.class) || target.equals(Integer.class)
|| target.equals(Short.class) || target.equals(Byte.class);
}
return false;
}
}
static class ListenerScope implements Scope {
private final Map<String, Object> listeners = new HashMap<>();
ListenerScope() {
}
public void addListener(String key, Object bean) {
this.listeners.put(key, bean);
}
public void removeListener(String key) {
this.listeners.remove(key);
}
@Override
public Object get(String name, ObjectFactory<?> objectFactory) {
return this.listeners.get(name);
}
@Override
public Object remove(String name) {
return null;
}
@Override
public void registerDestructionCallback(String name, Runnable callback) {
}
@Override
public Object resolveContextualObject(String key) {
return this.listeners.get(key);
}
@Override
public String getConversationId() {
return null;
}
}
public interface AnnotationEnhancer extends BiFunction<Map<String, Object>, AnnotatedElement, Map<String, Object>> {
}
}

View File

@@ -256,13 +256,10 @@ public class AbstractPulsarAnnotationsBeanPostProcessor
return result;
}
protected static class ListenerScope implements Scope {
public static class ListenerScope implements Scope {
private final Map<String, Object> listeners = new HashMap<>();
ListenerScope() {
}
public void addListener(String key, Object bean) {
this.listeners.put(key, bean);
}
@@ -335,6 +332,10 @@ public class AbstractPulsarAnnotationsBeanPostProcessor
return defaultFactory;
}
public DefaultFormattingConversionService getDefaultFormattingConversionService() {
return this.defaultFormattingConversionService;
}
}
private static class BytesToStringConverter implements Converter<byte[], String> {