GH-118 - Reworked infrastructure to complete event publications.
Previously we used a dedicated BeanPostProcessor to decorate beans that expose transactional event listeners with an interceptor to mark event publications as completed if the method invocation completes successfully. We now have simplified the arrangement by using Spring's auto-proxy creation and registering an Advisor implementation. This allows us to properly and reliably order the interceptor between the async one (goes before ours) and the transactional one (goes after ours). Also, this significantly simplifies the implementation of the interceptor as we can assume only commit listener methods being intercepted in the first place.
This commit is contained in:
@@ -32,6 +32,14 @@
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-aop</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Test -->
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- Logging -->
|
||||
<dependency>
|
||||
|
||||
@@ -16,12 +16,14 @@
|
||||
package org.springframework.modulith.events.config;
|
||||
|
||||
import org.springframework.beans.factory.ObjectFactory;
|
||||
import org.springframework.beans.factory.config.BeanDefinition;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Role;
|
||||
import org.springframework.modulith.events.DefaultEventPublicationRegistry;
|
||||
import org.springframework.modulith.events.EventPublicationRegistry;
|
||||
import org.springframework.modulith.events.EventPublicationRepository;
|
||||
import org.springframework.modulith.events.support.CompletionRegisteringBeanPostProcessor;
|
||||
import org.springframework.modulith.events.support.CompletionRegisteringAdvisor;
|
||||
import org.springframework.modulith.events.support.PersistentApplicationEventMulticaster;
|
||||
|
||||
/**
|
||||
@@ -45,7 +47,8 @@ class EventPublicationConfiguration {
|
||||
}
|
||||
|
||||
@Bean
|
||||
static CompletionRegisteringBeanPostProcessor bpp(ObjectFactory<EventPublicationRegistry> store) {
|
||||
return new CompletionRegisteringBeanPostProcessor(store::getObject);
|
||||
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
|
||||
static CompletionRegisteringAdvisor completionRegisteringAdvisor(ObjectFactory<EventPublicationRegistry> registry) {
|
||||
return new CompletionRegisteringAdvisor(registry::getObject);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,204 @@
|
||||
/*
|
||||
* Copyright 2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.modulith.events.support;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.aopalliance.aop.Advice;
|
||||
import org.aopalliance.intercept.MethodInterceptor;
|
||||
import org.aopalliance.intercept.MethodInvocation;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.aop.Advisor;
|
||||
import org.springframework.aop.MethodMatcher;
|
||||
import org.springframework.aop.Pointcut;
|
||||
import org.springframework.aop.support.AbstractPointcutAdvisor;
|
||||
import org.springframework.aop.support.StaticMethodMatcher;
|
||||
import org.springframework.aop.support.annotation.AnnotationMatchingPointcut;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.annotation.AnnotatedElementUtils;
|
||||
import org.springframework.lang.NonNull;
|
||||
import org.springframework.modulith.events.EventPublicationRegistry;
|
||||
import org.springframework.modulith.events.PublicationTargetIdentifier;
|
||||
import org.springframework.transaction.event.TransactionPhase;
|
||||
import org.springframework.transaction.event.TransactionalApplicationListenerMethodAdapter;
|
||||
import org.springframework.transaction.event.TransactionalEventListener;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ConcurrentLruCache;
|
||||
|
||||
/**
|
||||
* An {@link Advisor} to decorate {@link TransactionalEventListener} annotated methods to mark the previously registered
|
||||
* event publications as completed on successful method execution.
|
||||
*
|
||||
* @author Oliver Drotbohm
|
||||
*/
|
||||
public class CompletionRegisteringAdvisor extends AbstractPointcutAdvisor {
|
||||
|
||||
private static final long serialVersionUID = 5649563426118669238L;
|
||||
|
||||
private final Pointcut pointcut;
|
||||
private final Advice advice;
|
||||
|
||||
/**
|
||||
* Creates a new {@link CompletionRegisteringAdvisor} for the given {@link EventPublicationRegistry}.
|
||||
*
|
||||
* @param registry must not be {@literal null}.
|
||||
*/
|
||||
public CompletionRegisteringAdvisor(Supplier<EventPublicationRegistry> registry) {
|
||||
|
||||
Assert.notNull(registry, "EventPublicationRegistry must not be null!");
|
||||
|
||||
this.pointcut = new AnnotationMatchingPointcut(null, TransactionalEventListener.class, true) {
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.aop.support.annotation.AnnotationMatchingPointcut#getMethodMatcher()
|
||||
*/
|
||||
@Override
|
||||
public MethodMatcher getMethodMatcher() {
|
||||
return new CommitListenerMethodMatcher(super.getMethodMatcher());
|
||||
}
|
||||
};
|
||||
|
||||
this.advice = new CompletionRegisteringMethodInterceptor(registry);
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.aop.PointcutAdvisor#getPointcut()
|
||||
*/
|
||||
public Pointcut getPointcut() {
|
||||
return pointcut;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.aop.Advisor#getAdvice()
|
||||
*/
|
||||
@Override
|
||||
public Advice getAdvice() {
|
||||
return advice;
|
||||
}
|
||||
|
||||
/**
|
||||
* An adapter for a delegating {@link MethodMatcher} to verify the
|
||||
*
|
||||
* @author Oliver Drotbohm
|
||||
*/
|
||||
private static class CommitListenerMethodMatcher extends StaticMethodMatcher {
|
||||
|
||||
private final MethodMatcher delegate;
|
||||
|
||||
/**
|
||||
* Creates a new {@link CommitListenerMethodMatcher} with the given delegate {@link MethodMatcher}.
|
||||
*
|
||||
* @param delegate must not be {@literal null}.
|
||||
*/
|
||||
public CommitListenerMethodMatcher(MethodMatcher delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.aop.MethodMatcher#matches(java.lang.reflect.Method, java.lang.Class)
|
||||
*/
|
||||
@Override
|
||||
public boolean matches(Method method, Class<?> targetClass) {
|
||||
|
||||
if (!delegate.matches(method, targetClass)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
var annotation = AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class);
|
||||
|
||||
return annotation != null && annotation.phase().equals(TransactionPhase.AFTER_COMMIT);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link MethodInterceptor} to trigger the completion of an event publication after a transaction event listener
|
||||
* method has been completed successfully.
|
||||
*
|
||||
* @author Oliver Drotbohm
|
||||
*/
|
||||
static class CompletionRegisteringMethodInterceptor implements MethodInterceptor, Ordered {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(CompletionRegisteringMethodInterceptor.class);
|
||||
|
||||
private static final ConcurrentLruCache<Method, TransactionalApplicationListenerMethodAdapter> ADAPTERS = new ConcurrentLruCache<>(
|
||||
100, CompletionRegisteringMethodInterceptor::createAdapter);
|
||||
|
||||
private final @NonNull Supplier<EventPublicationRegistry> registry;
|
||||
|
||||
/**
|
||||
* Creates a new {@link CompletionRegisteringMethodInterceptor} for the given {@link EventPublicationRegistry}.
|
||||
*
|
||||
* @param registry must not be {@literal null}.
|
||||
*/
|
||||
CompletionRegisteringMethodInterceptor(Supplier<EventPublicationRegistry> registry) {
|
||||
|
||||
Assert.notNull(registry, "EventPublicationRegistry must not be null!");
|
||||
|
||||
this.registry = registry;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.aopalliance.intercept.MethodInterceptor#invoke(org.aopalliance.intercept.MethodInvocation)
|
||||
*/
|
||||
@Override
|
||||
public Object invoke(MethodInvocation invocation) throws Throwable {
|
||||
|
||||
Object result = null;
|
||||
var method = invocation.getMethod();
|
||||
|
||||
try {
|
||||
result = invocation.proceed();
|
||||
} catch (Exception o_O) {
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Invocation of listener {} failed. Leaving event publication uncompleted.", method, o_O);
|
||||
} else {
|
||||
LOG.info("Invocation of listener {} failed with message {}. Leaving event publication uncompleted.",
|
||||
method, o_O.getMessage());
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// Mark publication complete if the method is a transactional event listener.
|
||||
String adapterId = ADAPTERS.get(method).getListenerId();
|
||||
PublicationTargetIdentifier identifier = PublicationTargetIdentifier.of(adapterId);
|
||||
registry.get().markCompleted(invocation.getArguments()[0], identifier);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.core.Ordered#getOrder()
|
||||
*/
|
||||
@Override
|
||||
public int getOrder() {
|
||||
return Ordered.HIGHEST_PRECEDENCE + 10;
|
||||
}
|
||||
|
||||
private static TransactionalApplicationListenerMethodAdapter createAdapter(Method method) {
|
||||
return new TransactionalApplicationListenerMethodAdapter(null, method.getDeclaringClass(), method);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,257 +0,0 @@
|
||||
/*
|
||||
* Copyright 2017-2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.modulith.events.support;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.aopalliance.aop.Advice;
|
||||
import org.aopalliance.intercept.MethodInterceptor;
|
||||
import org.aopalliance.intercept.MethodInvocation;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.aop.framework.Advised;
|
||||
import org.springframework.aop.framework.AopProxyUtils;
|
||||
import org.springframework.aop.framework.ProxyFactory;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.config.BeanPostProcessor;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.annotation.AnnotatedElementUtils;
|
||||
import org.springframework.lang.NonNull;
|
||||
import org.springframework.modulith.events.EventPublicationRegistry;
|
||||
import org.springframework.modulith.events.PublicationTargetIdentifier;
|
||||
import org.springframework.transaction.event.TransactionPhase;
|
||||
import org.springframework.transaction.event.TransactionalApplicationListenerMethodAdapter;
|
||||
import org.springframework.transaction.event.TransactionalEventListener;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ConcurrentLruCache;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
import org.springframework.util.ReflectionUtils.MethodCallback;
|
||||
|
||||
/**
|
||||
* {@link BeanPostProcessor} that will add a {@link CompletionRegisteringMethodInterceptor} to the bean in case it
|
||||
* carries a {@link TransactionalEventListener} annotation so that the successful invocation of those methods mark the
|
||||
* event publication to those listeners as completed.
|
||||
*
|
||||
* @author Oliver Drotbohm
|
||||
*/
|
||||
public class CompletionRegisteringBeanPostProcessor implements BeanPostProcessor {
|
||||
|
||||
private final Supplier<EventPublicationRegistry> registry;
|
||||
|
||||
/**
|
||||
* Creates a new {@link CompletionRegisteringBeanPostProcessor} for the given {@link EventPublicationRegistry}.
|
||||
*
|
||||
* @param registry must not be {@literal null}.
|
||||
*/
|
||||
public CompletionRegisteringBeanPostProcessor(Supplier<EventPublicationRegistry> registry) {
|
||||
|
||||
Assert.notNull(registry, "EventPublicationRegistry must not be null!");
|
||||
|
||||
this.registry = registry;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.beans.factory.config.BeanPostProcessor#postProcessAfterInitialization(java.lang.Object, java.lang.String)
|
||||
*/
|
||||
@Override
|
||||
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
|
||||
|
||||
ProxyCreatingMethodCallback callback = new ProxyCreatingMethodCallback(registry, beanName, bean);
|
||||
|
||||
ReflectionUtils.doWithMethods(AopProxyUtils.ultimateTargetClass(bean), callback);
|
||||
|
||||
return callback.methodFound ? callback.bean : bean;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Method callback to find a {@link TransactionalEventListener} method and creating a proxy including an
|
||||
* {@link CompletionRegisteringBeanPostProcessor} for it or adding the latter to the already existing advisor chain.
|
||||
*
|
||||
* @author Oliver Drotbohm
|
||||
*/
|
||||
private static class ProxyCreatingMethodCallback implements MethodCallback {
|
||||
|
||||
private final Supplier<EventPublicationRegistry> registry;
|
||||
private final String beanName;
|
||||
private Object bean;
|
||||
private boolean methodFound;
|
||||
|
||||
/**
|
||||
* Creates a new {@link ProxyCreatingMethodCallback} for the given {@link EventPublicationRegistry}, bean name, bean
|
||||
* and whether a completing method has been found.
|
||||
*
|
||||
* @param registry must not be {@literal null}.
|
||||
* @param beanName must not be {@literal null} or empty.
|
||||
* @param bean must not be {@literal null}.
|
||||
*/
|
||||
ProxyCreatingMethodCallback(Supplier<EventPublicationRegistry> registry, String beanName, Object bean) {
|
||||
|
||||
Assert.notNull(registry, "EventPublicationRegistry must not be null!");
|
||||
Assert.hasText(beanName, "Bean name must not be null or empty!");
|
||||
Assert.notNull(bean, "Bean must not be null!");
|
||||
|
||||
this.registry = registry;
|
||||
this.beanName = beanName;
|
||||
this.bean = bean;
|
||||
this.methodFound = false;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.util.ReflectionUtils.MethodCallback#doWith(java.lang.reflect.Method)
|
||||
*/
|
||||
@Override
|
||||
public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
|
||||
|
||||
if (methodFound || !CompletionRegisteringMethodInterceptor.isCompletingMethod(method)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.methodFound = true;
|
||||
this.bean = createCompletionRegisteringProxy(bean,
|
||||
new CompletionRegisteringMethodInterceptor(registry, beanName));
|
||||
}
|
||||
|
||||
private static Object createCompletionRegisteringProxy(Object bean, Advice interceptor) {
|
||||
|
||||
if (bean instanceof Advised) {
|
||||
|
||||
Advised advised = (Advised) bean;
|
||||
advised.addAdvice(advised.getAdvisors().length, interceptor);
|
||||
|
||||
return bean;
|
||||
}
|
||||
|
||||
ProxyFactory factory = new ProxyFactory(bean);
|
||||
factory.setProxyTargetClass(true);
|
||||
factory.addAdvice(interceptor);
|
||||
|
||||
return factory.getProxy();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link MethodInterceptor} to trigger the completion of an event publication after a transaction event listener
|
||||
* method has been completed successfully.
|
||||
*
|
||||
* @author Oliver Drotbohm
|
||||
*/
|
||||
private static class CompletionRegisteringMethodInterceptor implements MethodInterceptor, Ordered {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(CompletionRegisteringMethodInterceptor.class);
|
||||
private static final ConcurrentLruCache<Method, Boolean> COMPLETING_METHOD = new ConcurrentLruCache<>(100,
|
||||
CompletionRegisteringMethodInterceptor::calculateIsCompletingMethod);
|
||||
private static final ConcurrentLruCache<CacheKey, TransactionalApplicationListenerMethodAdapter> ADAPTERS = new ConcurrentLruCache<>(
|
||||
100, CompletionRegisteringMethodInterceptor::createAdapter);
|
||||
|
||||
private final @NonNull Supplier<EventPublicationRegistry> registry;
|
||||
private final @NonNull String beanName;
|
||||
|
||||
/**
|
||||
* @param registry
|
||||
* @param beanName
|
||||
*/
|
||||
CompletionRegisteringMethodInterceptor(Supplier<EventPublicationRegistry> registry, String beanName) {
|
||||
|
||||
Assert.notNull(registry, "EventPublicationRegistry must not be null!");
|
||||
Assert.hasText(beanName, "Bean name must not be null or empty!");
|
||||
|
||||
this.registry = registry;
|
||||
this.beanName = beanName;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.aopalliance.intercept.MethodInterceptor#invoke(org.aopalliance.intercept.MethodInvocation)
|
||||
*/
|
||||
@Override
|
||||
public Object invoke(MethodInvocation invocation) throws Throwable {
|
||||
|
||||
Object result = null;
|
||||
Method method = invocation.getMethod();
|
||||
|
||||
try {
|
||||
result = invocation.proceed();
|
||||
} catch (Exception o_O) {
|
||||
|
||||
if (!isCompletingMethod(method)) {
|
||||
throw o_O;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Invocation of listener {} failed. Leaving event publication uncompleted.", method, o_O);
|
||||
} else {
|
||||
LOG.info("Invocation of listener {} failed with message {}. Leaving event publication uncompleted.",
|
||||
method, o_O.getMessage());
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
if (!isCompletingMethod(method)) {
|
||||
return result;
|
||||
}
|
||||
|
||||
// Mark publication complete if the method is a transactional event listener.
|
||||
String adapterId = ADAPTERS.get(new CacheKey(beanName, method)).getListenerId();
|
||||
PublicationTargetIdentifier identifier = PublicationTargetIdentifier.of(adapterId);
|
||||
registry.get().markCompleted(invocation.getArguments()[0], identifier);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
* @see org.springframework.core.Ordered#getOrder()
|
||||
*/
|
||||
@Override
|
||||
public int getOrder() {
|
||||
return Ordered.HIGHEST_PRECEDENCE - 10;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the given method is one that requires publication completion.
|
||||
*
|
||||
* @param method must not be {@literal null}.
|
||||
* @return
|
||||
*/
|
||||
static boolean isCompletingMethod(Method method) {
|
||||
|
||||
Assert.notNull(method, "Method must not be null!");
|
||||
|
||||
return COMPLETING_METHOD.get(method);
|
||||
}
|
||||
|
||||
private static boolean calculateIsCompletingMethod(Method method) {
|
||||
|
||||
TransactionalEventListener annotation = AnnotatedElementUtils.getMergedAnnotation(method,
|
||||
TransactionalEventListener.class);
|
||||
|
||||
return annotation == null ? false : annotation.phase().equals(TransactionPhase.AFTER_COMMIT);
|
||||
}
|
||||
|
||||
private static TransactionalApplicationListenerMethodAdapter createAdapter(CacheKey key) {
|
||||
return new TransactionalApplicationListenerMethodAdapter(key.beanName, key.method.getDeclaringClass(),
|
||||
key.method);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static record CacheKey(String beanName, Method method) {}
|
||||
}
|
||||
@@ -49,7 +49,7 @@ import org.springframework.util.Assert;
|
||||
* for incomplete publications and
|
||||
*
|
||||
* @author Oliver Drotbohm
|
||||
* @see CompletionRegisteringBeanPostProcessor
|
||||
* @see CompletionRegisteringAdvisor
|
||||
*/
|
||||
public class PersistentApplicationEventMulticaster extends AbstractApplicationEventMulticaster
|
||||
implements SmartInitializingSingleton {
|
||||
|
||||
@@ -0,0 +1,97 @@
|
||||
/*
|
||||
* Copyright 2023 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.modulith.events.support;
|
||||
|
||||
import static org.assertj.core.api.Assertions.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.springframework.aop.Advisor;
|
||||
import org.springframework.aop.framework.Advised;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.config.BeanDefinition;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Role;
|
||||
import org.springframework.modulith.events.EventPublicationRegistry;
|
||||
import org.springframework.modulith.events.support.CompletionRegisteringAdvisor.CompletionRegisteringMethodInterceptor;
|
||||
import org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.scheduling.annotation.EnableAsync;
|
||||
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.annotation.EnableTransactionManagement;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.transaction.event.TransactionalEventListener;
|
||||
import org.springframework.transaction.interceptor.TransactionInterceptor;
|
||||
|
||||
/**
|
||||
* @author Oliver Drotbohm
|
||||
*/
|
||||
@ExtendWith(SpringExtension.class)
|
||||
class CompletionRegisteringAdvisorIntegrationTests {
|
||||
|
||||
@Autowired SampleListener listener;
|
||||
|
||||
@EnableAsync
|
||||
@EnableTransactionManagement
|
||||
@Configuration
|
||||
static class TestConfiguration {
|
||||
|
||||
@Bean
|
||||
SampleListener listener() {
|
||||
return new SampleListener();
|
||||
}
|
||||
|
||||
@Bean
|
||||
PlatformTransactionManager transactionManager() {
|
||||
return mock(PlatformTransactionManager.class);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
|
||||
static CompletionRegisteringAdvisor completionRegisteringAdvisor() {
|
||||
|
||||
var publicationRegistry = mock(EventPublicationRegistry.class);
|
||||
|
||||
return new CompletionRegisteringAdvisor(() -> publicationRegistry);
|
||||
}
|
||||
}
|
||||
|
||||
static class SampleListener {
|
||||
|
||||
@Async
|
||||
@Transactional
|
||||
@TransactionalEventListener
|
||||
void on(Object event) {}
|
||||
}
|
||||
|
||||
@Test // #118
|
||||
void addsCompletionRegisteringInterceptor() throws Exception {
|
||||
|
||||
assertThat(listener).isInstanceOfSatisfying(Advised.class, it -> {
|
||||
|
||||
assertThat(it.getAdvisors())
|
||||
.extracting(Advisor::getAdvice)
|
||||
.<Class<?>> extracting(Object::getClass)
|
||||
.startsWith(
|
||||
AnnotationAsyncExecutionInterceptor.class,
|
||||
CompletionRegisteringMethodInterceptor.class,
|
||||
TransactionInterceptor.class);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -23,7 +23,7 @@ import java.util.function.BiConsumer;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.aop.framework.Advised;
|
||||
import org.springframework.beans.factory.config.BeanPostProcessor;
|
||||
import org.springframework.aop.framework.ProxyFactory;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.modulith.events.EventPublicationRegistry;
|
||||
import org.springframework.transaction.event.TransactionPhase;
|
||||
@@ -34,20 +34,11 @@ import org.springframework.transaction.event.TransactionalEventListener;
|
||||
*
|
||||
* @author Oliver Drotbohm
|
||||
*/
|
||||
class CompletionRegisteringBeanPostProcessorUnitTests {
|
||||
class CompletionRegisteringAdvisorUnitTests {
|
||||
|
||||
EventPublicationRegistry registry = mock(EventPublicationRegistry.class);
|
||||
BeanPostProcessor processor = new CompletionRegisteringBeanPostProcessor(() -> registry);
|
||||
SomeEventListener bean = new SomeEventListener();
|
||||
|
||||
@Test
|
||||
void doesNotProxyNonTransactionalEventListenerClass() {
|
||||
|
||||
NoEventListener bean = new NoEventListener();
|
||||
|
||||
assertThat(bean).isSameAs(processor.postProcessBeforeInitialization(bean, "bean"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void triggersCompletionForAfterCommitEventListener() throws Exception {
|
||||
assertCompletion(SomeEventListener::onAfterCommit);
|
||||
@@ -78,7 +69,7 @@ class CompletionRegisteringBeanPostProcessorUnitTests {
|
||||
|
||||
private void assertCompletion(BiConsumer<SomeEventListener, Object> consumer, boolean expected) {
|
||||
|
||||
Object processed = processor.postProcessAfterInitialization(bean, "listener");
|
||||
Object processed = createProxyFor(bean);
|
||||
|
||||
assertThat(processed).isInstanceOf(Advised.class);
|
||||
assertThat(processed).isInstanceOfSatisfying(SomeEventListener.class, //
|
||||
@@ -87,6 +78,13 @@ class CompletionRegisteringBeanPostProcessorUnitTests {
|
||||
verify(registry, times(expected ? 1 : 0)).markCompleted(any(), any());
|
||||
}
|
||||
|
||||
private Object createProxyFor(Object bean) {
|
||||
|
||||
ProxyFactory factory = new ProxyFactory(bean);
|
||||
factory.addAdvisor(new CompletionRegisteringAdvisor(() -> registry));
|
||||
return factory.getProxy();
|
||||
}
|
||||
|
||||
static class SomeEventListener {
|
||||
|
||||
@TransactionalEventListener
|
||||
@@ -100,6 +98,4 @@ class CompletionRegisteringBeanPostProcessorUnitTests {
|
||||
|
||||
void nonEventListener(Object object) {}
|
||||
}
|
||||
|
||||
static class NoEventListener {}
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<configuration>
|
||||
|
||||
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
|
||||
<encoder>
|
||||
<pattern>%d %5p %40.40c:%4L - %m%n</pattern>
|
||||
</encoder>
|
||||
</appender>
|
||||
|
||||
<root level="error">
|
||||
<appender-ref ref="console" />
|
||||
</root>
|
||||
|
||||
</configuration>
|
||||
Reference in New Issue
Block a user