Support executor qualification with @Async#value

Prior to this change, Spring's @Async annotation support was tied to a
single AsyncTaskExecutor bean, meaning that all methods marked with
@Async were forced to use the same executor. This is an undesirable
limitation, given that certain methods may have different priorities,
etc. This leads to the need to (optionally) qualify which executor
should handle each method.

This is similar to the way that Spring's @Transactional annotation was
originally tied to a single PlatformTransactionManager, but in Spring
3.0 was enhanced to allow for a qualifier via the #value attribute, e.g.

  @Transactional(ptm1)
  public void m() { ... }

where ptm1 is either the name of a PlatformTransactionManager bean or
a qualifier value associated with a PlatformTransactionManager bean,
e.g. via the <qualifier> element in XML or the @Qualifier annotation.

This commit introduces the same approach to @Async and its relationship
to underlying executor beans. As always, the following syntax remains
supported

  @Async
  public void m() { ... }

indicating that calls to #m will be delegated to the default executor,
i.e. the executor provided to

  <task:annotation-driven executor=.../>

or the executor specified when authoring a @Configuration class that
implements AsyncConfigurer and its #getAsyncExecutor method.

However, it now also possible to qualify which executor should be used
on a method-by-method basis, e.g.

  @Async(e1)
  public void m() { ... }

indicating that calls to #m will be delegated to the executor bean
named or otherwise qualified as e1. Unlike the default executor
which is specified up front at configuration time as described above,
the e1 executor bean is looked up within the container on the first
execution of #m and then cached in association with that method for the
lifetime of the container.

Class-level use of Async#value behaves as expected, indicating that all
methods within the annotated class should be executed with the named
executor. In the case of both method- and class-level annotations, any
method-level #value overrides any class level #value.

This commit introduces the following major changes:

 - Add @Async#value attribute for executor qualification

 - Introduce AsyncExecutionAspectSupport as a common base class for
   both MethodInterceptor- and AspectJ-based async aspects. This base
   class provides common structure for specifying the default executor
   (#setExecutor) as well as logic for determining (and caching) which
   executor should execute a given method (#determineAsyncExecutor) and
   an abstract method to allow subclasses to provide specific strategies
   for executor qualification (#getExecutorQualifier).

 - Introduce AnnotationAsyncExecutionInterceptor as a specialization of
   the existing AsyncExecutionInterceptor to allow for introspection of
   the @Async annotation and its #value attribute for a given method.
   Note that this new subclass was necessary for packaging reasons -
   the original AsyncExecutionInterceptor lives in
   org.springframework.aop and therefore does not have visibility to
   the @Async annotation in org.springframework.scheduling.annotation.
   This new subclass replaces usage of AsyncExecutionInterceptor
   throughout the framework, though the latter remains usable and
   undeprecated for compatibility with any existing third-party
   extensions.

 - Add documentation to spring-task-3.2.xsd and reference manual
   explaining @Async executor qualification

 - Add tests covering all new functionality

Note that the public API of all affected components remains backward-
compatible.

Issue: SPR-9443
Backport-Issue: SPR-6847
Backport-Commit: ed0576c181
This commit is contained in:
Chris Beams
2012-06-27 21:00:33 +02:00
parent e8006bdf78
commit 8bab873107
16 changed files with 564 additions and 48 deletions

View File

@@ -0,0 +1,53 @@
/*
* Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.scheduling.annotation;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
/**
* Unit tests for {@link AnnotationAsyncExecutionInterceptor}.
*
* @author Chris Beams
* @since 3.1.2
*/
public class AnnotationAsyncExecutionInterceptorTests {
@Test
@SuppressWarnings("unused")
public void testGetExecutorQualifier() throws SecurityException, NoSuchMethodException {
AnnotationAsyncExecutionInterceptor i = new AnnotationAsyncExecutionInterceptor(null);
{
class C { @Async("qMethod") void m() { } }
assertThat(i.getExecutorQualifier(C.class.getDeclaredMethod("m")), is("qMethod"));
}
{
@Async("qClass") class C { void m() { } }
assertThat(i.getExecutorQualifier(C.class.getDeclaredMethod("m")), is("qClass"));
}
{
@Async("qClass") class C { @Async("qMethod") void m() { } }
assertThat(i.getExecutorQualifier(C.class.getDeclaredMethod("m")), is("qMethod"));
}
{
@Async("qClass") class C { @Async void m() { } }
assertThat(i.getExecutorQualifier(C.class.getDeclaredMethod("m")), is(""));
}
}
}

View File

@@ -25,11 +25,13 @@ import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static org.junit.Assert.*;
/**
* @author Juergen Hoeller
* @author Chris Beams
*/
public class AsyncExecutionTests {
@@ -55,6 +57,26 @@ public class AsyncExecutionTests {
assertEquals("20", future.get());
}
@Test
public void asyncMethodsWithQualifier() throws Exception {
originalThreadName = Thread.currentThread().getName();
GenericApplicationContext context = new GenericApplicationContext();
context.registerBeanDefinition("asyncTest", new RootBeanDefinition(AsyncMethodWithQualifierBean.class));
context.registerBeanDefinition("autoProxyCreator", new RootBeanDefinition(DefaultAdvisorAutoProxyCreator.class));
context.registerBeanDefinition("asyncAdvisor", new RootBeanDefinition(AsyncAnnotationAdvisor.class));
context.registerBeanDefinition("e0", new RootBeanDefinition(ThreadPoolTaskExecutor.class));
context.registerBeanDefinition("e1", new RootBeanDefinition(ThreadPoolTaskExecutor.class));
context.registerBeanDefinition("e2", new RootBeanDefinition(ThreadPoolTaskExecutor.class));
context.refresh();
AsyncMethodWithQualifierBean asyncTest = context.getBean("asyncTest", AsyncMethodWithQualifierBean.class);
asyncTest.doNothing(5);
asyncTest.doSomething(10);
Future<String> future = asyncTest.returnSomething(20);
assertEquals("20", future.get());
Future<String> future2 = asyncTest.returnSomething2(30);
assertEquals("30", future2.get());
}
@Test
public void asyncClass() throws Exception {
originalThreadName = Thread.currentThread().getName();
@@ -165,6 +187,34 @@ public class AsyncExecutionTests {
}
@Async("e0")
public static class AsyncMethodWithQualifierBean {
public void doNothing(int i) {
assertTrue(Thread.currentThread().getName().equals(originalThreadName));
}
@Async("e1")
public void doSomething(int i) {
assertTrue(!Thread.currentThread().getName().equals(originalThreadName));
assertTrue(Thread.currentThread().getName().startsWith("e1-"));
}
@Async("e2")
public Future<String> returnSomething(int i) {
assertTrue(!Thread.currentThread().getName().equals(originalThreadName));
assertTrue(Thread.currentThread().getName().startsWith("e2-"));
return new AsyncResult<String>(Integer.toString(i));
}
public Future<String> returnSomething2(int i) {
assertTrue(!Thread.currentThread().getName().equals(originalThreadName));
assertTrue(Thread.currentThread().getName().startsWith("e0-"));
return new AsyncResult<String>(Integer.toString(i));
}
}
@Async
public static class AsyncClassBean {

View File

@@ -21,7 +21,9 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import org.junit.Test;
@@ -29,6 +31,7 @@ import org.springframework.aop.Advisor;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.BeanDefinitionStoreException;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.AdviceMode;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
@@ -71,6 +74,48 @@ public class EnableAsyncTests {
}
@SuppressWarnings("unchecked")
@Test
public void withAsyncBeanWithExecutorQualifiedByName() throws ExecutionException, InterruptedException {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
ctx.register(AsyncWithExecutorQualifiedByNameConfig.class);
ctx.refresh();
AsyncBeanWithExecutorQualifiedByName asyncBean = ctx.getBean(AsyncBeanWithExecutorQualifiedByName.class);
Future<Thread> workerThread0 = asyncBean.work0();
assertThat(workerThread0.get().getName(), not(anyOf(startsWith("e1-"), startsWith("otherExecutor-"))));
Future<Thread> workerThread = asyncBean.work();
assertThat(workerThread.get().getName(), startsWith("e1-"));
Future<Thread> workerThread2 = asyncBean.work2();
assertThat(workerThread2.get().getName(), startsWith("otherExecutor-"));
Future<Thread> workerThread3 = asyncBean.work3();
assertThat(workerThread3.get().getName(), startsWith("otherExecutor-"));
}
static class AsyncBeanWithExecutorQualifiedByName {
@Async
public Future<Thread> work0() {
return new AsyncResult<Thread>(Thread.currentThread());
}
@Async("e1")
public Future<Thread> work() {
return new AsyncResult<Thread>(Thread.currentThread());
}
@Async("otherExecutor")
public Future<Thread> work2() {
return new AsyncResult<Thread>(Thread.currentThread());
}
@Async("e2")
public Future<Thread> work3() {
return new AsyncResult<Thread>(Thread.currentThread());
}
}
static class AsyncBean {
private Thread threadOfExecution;
@@ -208,6 +253,28 @@ public class EnableAsyncTests {
executor.initialize();
return executor;
}
}
@Configuration
@EnableAsync
static class AsyncWithExecutorQualifiedByNameConfig {
@Bean
public AsyncBeanWithExecutorQualifiedByName asyncBean() {
return new AsyncBeanWithExecutorQualifiedByName();
}
@Bean
public Executor e1() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
return executor;
}
@Bean
@Qualifier("e2")
public Executor otherExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
return executor;
}
}
}