Add interceptors for async processing

This change introduces two new interceptors with callback methods
for concurrent request handling. These interfaces are
CallableProcessingInterceptor and DeferredResultProcessingInterceptor.

Unlike a HandlerInterceptor, and its AsyncHandlerInterceptor sub-type,
which intercepts the invocation of a handler in he main request
processing thread, the two new interfaces are aimed at intercepting the
asynchronous execution of a Callable or a DeferredResult.

This allows for the registration of thread initialization logic in the
case of Callable executed with an AsyncTaskExecutor, or for centralized
tracking of the completion and/or expiration of a DeferredResult.
This commit is contained in:
Rossen Stoyanchev
2012-09-19 09:25:50 -04:00
parent 30bce7fa16
commit 57c36dd395
24 changed files with 735 additions and 267 deletions

View File

@@ -17,6 +17,7 @@
package org.springframework.orm.hibernate3.support;
import java.io.IOException;
import java.util.concurrent.Callable;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
@@ -32,9 +33,10 @@ import org.springframework.orm.hibernate3.SessionHolder;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.context.request.async.CallableProcessingInterceptor;
import org.springframework.web.context.request.async.WebAsyncManager;
import org.springframework.web.context.request.async.WebAsyncManager.WebAsyncThreadInitializer;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.context.support.WebApplicationContextUtils;
import org.springframework.web.filter.OncePerRequestFilter;
@@ -195,14 +197,14 @@ public class OpenSessionInViewFilter extends OncePerRequestFilter {
participate = true;
}
else {
if (isFirstRequest || !asyncManager.initializeAsyncThread(key)) {
if (isFirstRequest || !applySessionBindingInterceptor(asyncManager, key)) {
logger.debug("Opening single Hibernate Session in OpenSessionInViewFilter");
Session session = getSession(sessionFactory);
SessionHolder sessionHolder = new SessionHolder(session);
TransactionSynchronizationManager.bindResource(sessionFactory, sessionHolder);
WebAsyncThreadInitializer initializer = createAsyncThreadInitializer(sessionFactory, sessionHolder);
asyncManager.registerAsyncThreadInitializer(key, initializer);
asyncManager.registerCallableInterceptor(key,
new SessionBindingCallableInterceptor(sessionFactory, sessionHolder));
}
}
}
@@ -304,17 +306,40 @@ public class OpenSessionInViewFilter extends OncePerRequestFilter {
SessionFactoryUtils.closeSession(session);
}
private WebAsyncThreadInitializer createAsyncThreadInitializer(final SessionFactory sessionFactory,
final SessionHolder sessionHolder) {
private boolean applySessionBindingInterceptor(WebAsyncManager asyncManager, String key) {
if (asyncManager.getCallableInterceptor(key) == null) {
return false;
}
((SessionBindingCallableInterceptor) asyncManager.getCallableInterceptor(key)).initializeThread();
return true;
}
return new WebAsyncThreadInitializer() {
public void initialize() {
TransactionSynchronizationManager.bindResource(sessionFactory, sessionHolder);
}
public void reset() {
TransactionSynchronizationManager.unbindResource(sessionFactory);
}
};
/**
* Bind and unbind the Hibernate {@code Session} to the current thread.
*/
private static class SessionBindingCallableInterceptor implements CallableProcessingInterceptor {
private final SessionFactory sessionFactory;
private final SessionHolder sessionHolder;
public SessionBindingCallableInterceptor(SessionFactory sessionFactory, SessionHolder sessionHolder) {
this.sessionFactory = sessionFactory;
this.sessionHolder = sessionHolder;
}
public void preProcess(NativeWebRequest request, Callable<?> task) {
initializeThread();
}
private void initializeThread() {
TransactionSynchronizationManager.bindResource(this.sessionFactory, this.sessionHolder);
}
public void postProcess(NativeWebRequest request, Callable<?> task, Object concurrentResult) {
TransactionSynchronizationManager.unbindResource(this.sessionFactory);
}
}
}

View File

@@ -16,6 +16,8 @@
package org.springframework.orm.hibernate3.support;
import java.util.concurrent.Callable;
import org.hibernate.HibernateException;
import org.hibernate.Session;
import org.springframework.dao.DataAccessException;
@@ -25,10 +27,11 @@ import org.springframework.orm.hibernate3.SessionHolder;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.ui.ModelMap;
import org.springframework.web.context.request.AsyncWebRequestInterceptor;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.context.request.WebRequest;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.context.request.async.CallableProcessingInterceptor;
import org.springframework.web.context.request.async.WebAsyncManager;
import org.springframework.web.context.request.async.WebAsyncManager.WebAsyncThreadInitializer;
import org.springframework.web.context.request.async.WebAsyncUtils;
/**
* Spring web request interceptor that binds a Hibernate <code>Session</code> to the
@@ -147,7 +150,7 @@ public class OpenSessionInViewInterceptor extends HibernateAccessor implements A
String participateAttributeName = getParticipateAttributeName();
if (asyncManager.hasConcurrentResult()) {
if (asyncManager.initializeAsyncThread(participateAttributeName)) {
if (applySessionBindingInterceptor(asyncManager, participateAttributeName)) {
return;
}
}
@@ -169,8 +172,8 @@ public class OpenSessionInViewInterceptor extends HibernateAccessor implements A
SessionHolder sessionHolder = new SessionHolder(session);
TransactionSynchronizationManager.bindResource(getSessionFactory(), sessionHolder);
WebAsyncThreadInitializer asyncThreadInitializer = createThreadInitializer(sessionHolder);
asyncManager.registerAsyncThreadInitializer(participateAttributeName, asyncThreadInitializer);
asyncManager.registerCallableInterceptor(participateAttributeName,
new SessionBindingCallableInterceptor(sessionHolder));
}
else {
// deferred close mode
@@ -261,15 +264,36 @@ public class OpenSessionInViewInterceptor extends HibernateAccessor implements A
return getSessionFactory().toString() + PARTICIPATE_SUFFIX;
}
private WebAsyncThreadInitializer createThreadInitializer(final SessionHolder sessionHolder) {
return new WebAsyncThreadInitializer() {
public void initialize() {
TransactionSynchronizationManager.bindResource(getSessionFactory(), sessionHolder);
}
public void reset() {
TransactionSynchronizationManager.unbindResource(getSessionFactory());
}
};
private boolean applySessionBindingInterceptor(WebAsyncManager asyncManager, String key) {
if (asyncManager.getCallableInterceptor(key) == null) {
return false;
}
((SessionBindingCallableInterceptor) asyncManager.getCallableInterceptor(key)).initializeThread();
return true;
}
/**
* Bind and unbind the Hibernate {@code Session} to the current thread.
*/
private class SessionBindingCallableInterceptor implements CallableProcessingInterceptor {
private final SessionHolder sessionHolder;
public SessionBindingCallableInterceptor(SessionHolder sessionHolder) {
this.sessionHolder = sessionHolder;
}
public void preProcess(NativeWebRequest request, Callable<?> task) {
initializeThread();
}
private void initializeThread() {
TransactionSynchronizationManager.bindResource(getSessionFactory(), this.sessionHolder);
}
public void postProcess(NativeWebRequest request, Callable<?> task, Object concurrentResult) {
TransactionSynchronizationManager.unbindResource(getSessionFactory());
}
}
}

View File

@@ -17,6 +17,7 @@
package org.springframework.orm.hibernate4.support;
import java.io.IOException;
import java.util.concurrent.Callable;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
@@ -32,9 +33,10 @@ import org.springframework.orm.hibernate4.SessionFactoryUtils;
import org.springframework.orm.hibernate4.SessionHolder;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.context.request.async.CallableProcessingInterceptor;
import org.springframework.web.context.request.async.WebAsyncManager;
import org.springframework.web.context.request.async.WebAsyncManager.WebAsyncThreadInitializer;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.context.support.WebApplicationContextUtils;
import org.springframework.web.filter.OncePerRequestFilter;
@@ -126,14 +128,14 @@ public class OpenSessionInViewFilter extends OncePerRequestFilter {
participate = true;
}
else {
if (isFirstRequest || !asyncManager.initializeAsyncThread(key)) {
if (isFirstRequest || !applySessionBindingInterceptor(asyncManager, key)) {
logger.debug("Opening Hibernate Session in OpenSessionInViewFilter");
Session session = openSession(sessionFactory);
SessionHolder sessionHolder = new SessionHolder(session);
TransactionSynchronizationManager.bindResource(sessionFactory, sessionHolder);
WebAsyncThreadInitializer initializer = createAsyncThreadInitializer(sessionFactory, sessionHolder);
asyncManager.registerAsyncThreadInitializer(key, initializer);
asyncManager.registerCallableInterceptor(key,
new SessionBindingCallableInterceptor(sessionFactory, sessionHolder));
}
}
@@ -201,17 +203,39 @@ public class OpenSessionInViewFilter extends OncePerRequestFilter {
}
}
private WebAsyncThreadInitializer createAsyncThreadInitializer(final SessionFactory sessionFactory,
final SessionHolder sessionHolder) {
return new WebAsyncThreadInitializer() {
public void initialize() {
TransactionSynchronizationManager.bindResource(sessionFactory, sessionHolder);
}
public void reset() {
TransactionSynchronizationManager.unbindResource(sessionFactory);
}
};
private boolean applySessionBindingInterceptor(WebAsyncManager asyncManager, String key) {
if (asyncManager.getCallableInterceptor(key) == null) {
return false;
}
((SessionBindingCallableInterceptor) asyncManager.getCallableInterceptor(key)).initializeThread();
return true;
}
/**
* Bind and unbind the Hibernate {@code Session} to the current thread.
*/
private static class SessionBindingCallableInterceptor implements CallableProcessingInterceptor {
private final SessionFactory sessionFactory;
private final SessionHolder sessionHolder;
public SessionBindingCallableInterceptor(SessionFactory sessionFactory, SessionHolder sessionHolder) {
this.sessionFactory = sessionFactory;
this.sessionHolder = sessionHolder;
}
public void preProcess(NativeWebRequest request, Callable<?> task) {
initializeThread();
}
private void initializeThread() {
TransactionSynchronizationManager.bindResource(this.sessionFactory, this.sessionHolder);
}
public void postProcess(NativeWebRequest request, Callable<?> task, Object concurrentResult) {
TransactionSynchronizationManager.unbindResource(this.sessionFactory);
}
}
}

View File

@@ -16,6 +16,8 @@
package org.springframework.orm.hibernate4.support;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hibernate.FlushMode;
@@ -29,10 +31,11 @@ import org.springframework.orm.hibernate4.SessionHolder;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.ui.ModelMap;
import org.springframework.web.context.request.AsyncWebRequestInterceptor;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.context.request.WebRequest;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.context.request.async.CallableProcessingInterceptor;
import org.springframework.web.context.request.async.WebAsyncManager;
import org.springframework.web.context.request.async.WebAsyncManager.WebAsyncThreadInitializer;
import org.springframework.web.context.request.async.WebAsyncUtils;
/**
* Spring web request interceptor that binds a Hibernate <code>Session</code> to the
@@ -109,7 +112,7 @@ public class OpenSessionInViewInterceptor implements AsyncWebRequestInterceptor
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
if (asyncManager.hasConcurrentResult()) {
if (asyncManager.initializeAsyncThread(participateAttributeName)) {
if (applySessionBindingInterceptor(asyncManager, participateAttributeName)) {
return;
}
}
@@ -126,8 +129,8 @@ public class OpenSessionInViewInterceptor implements AsyncWebRequestInterceptor
SessionHolder sessionHolder = new SessionHolder(session);
TransactionSynchronizationManager.bindResource(getSessionFactory(), sessionHolder);
WebAsyncThreadInitializer asyncThreadInitializer = createThreadInitializer(sessionHolder);
asyncManager.registerAsyncThreadInitializer(participateAttributeName, asyncThreadInitializer);
asyncManager.registerCallableInterceptor(participateAttributeName,
new SessionBindingCallableInterceptor(sessionHolder));
}
}
@@ -200,15 +203,37 @@ public class OpenSessionInViewInterceptor implements AsyncWebRequestInterceptor
return getSessionFactory().toString() + PARTICIPATE_SUFFIX;
}
private WebAsyncThreadInitializer createThreadInitializer(final SessionHolder sessionHolder) {
return new WebAsyncThreadInitializer() {
public void initialize() {
TransactionSynchronizationManager.bindResource(getSessionFactory(), sessionHolder);
}
public void reset() {
TransactionSynchronizationManager.unbindResource(getSessionFactory());
}
};
private boolean applySessionBindingInterceptor(WebAsyncManager asyncManager, String key) {
if (asyncManager.getCallableInterceptor(key) == null) {
return false;
}
((SessionBindingCallableInterceptor) asyncManager.getCallableInterceptor(key)).initializeThread();
return true;
}
/**
* Bind and unbind the Hibernate {@code Session} to the current thread.
*/
private class SessionBindingCallableInterceptor implements CallableProcessingInterceptor {
private final SessionHolder sessionHolder;
public SessionBindingCallableInterceptor(SessionHolder sessionHolder) {
this.sessionHolder = sessionHolder;
}
public void preProcess(NativeWebRequest request, Callable<?> task) {
initializeThread();
}
private void initializeThread() {
TransactionSynchronizationManager.bindResource(getSessionFactory(), this.sessionHolder);
}
public void postProcess(NativeWebRequest request, Callable<?> task, Object concurrentResult) {
TransactionSynchronizationManager.unbindResource(getSessionFactory());
}
}
}

View File

@@ -17,6 +17,7 @@
package org.springframework.orm.jpa.support;
import java.io.IOException;
import java.util.concurrent.Callable;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
@@ -32,9 +33,10 @@ import org.springframework.orm.jpa.EntityManagerHolder;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.StringUtils;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.context.request.async.CallableProcessingInterceptor;
import org.springframework.web.context.request.async.WebAsyncManager;
import org.springframework.web.context.request.async.WebAsyncManager.WebAsyncThreadInitializer;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.context.support.WebApplicationContextUtils;
import org.springframework.web.filter.OncePerRequestFilter;
@@ -150,15 +152,14 @@ public class OpenEntityManagerInViewFilter extends OncePerRequestFilter {
participate = true;
}
else {
if (isFirstRequest || !asyncManager.initializeAsyncThread(key)) {
if (isFirstRequest || !applyEntityManagerBindingInterceptor(asyncManager, key)) {
logger.debug("Opening JPA EntityManager in OpenEntityManagerInViewFilter");
try {
EntityManager em = createEntityManager(emf);
EntityManagerHolder emHolder = new EntityManagerHolder(em);
TransactionSynchronizationManager.bindResource(emf, emHolder);
WebAsyncThreadInitializer initializer = createAsyncThreadInitializer(emf, emHolder);
asyncManager.registerAsyncThreadInitializer(key, initializer);
asyncManager.registerCallableInterceptor(key, new EntityManagerBindingCallableInterceptor(emf, emHolder));
}
catch (PersistenceException ex) {
throw new DataAccessResourceFailureException("Could not create JPA EntityManager", ex);
@@ -230,17 +231,40 @@ public class OpenEntityManagerInViewFilter extends OncePerRequestFilter {
return emf.createEntityManager();
}
private WebAsyncThreadInitializer createAsyncThreadInitializer(final EntityManagerFactory emFactory,
final EntityManagerHolder emHolder) {
private boolean applyEntityManagerBindingInterceptor(WebAsyncManager asyncManager, String key) {
if (asyncManager.getCallableInterceptor(key) == null) {
return false;
}
((EntityManagerBindingCallableInterceptor) asyncManager.getCallableInterceptor(key)).initializeThread();
return true;
}
return new WebAsyncThreadInitializer() {
public void initialize() {
TransactionSynchronizationManager.bindResource(emFactory, emHolder);
}
public void reset() {
TransactionSynchronizationManager.unbindResource(emFactory);
}
};
/**
* Bind and unbind the {@code EntityManager} to the current thread.
*/
private static class EntityManagerBindingCallableInterceptor implements CallableProcessingInterceptor {
private final EntityManagerFactory emFactory;
private final EntityManagerHolder emHolder;
public EntityManagerBindingCallableInterceptor(EntityManagerFactory emFactory, EntityManagerHolder emHolder) {
this.emFactory = emFactory;
this.emHolder = emHolder;
}
public void preProcess(NativeWebRequest request, Callable<?> task) {
initializeThread();
}
private void initializeThread() {
TransactionSynchronizationManager.bindResource(this.emFactory, this.emHolder);
}
public void postProcess(NativeWebRequest request, Callable<?> task, Object concurrentResult) {
TransactionSynchronizationManager.unbindResource(this.emFactory);
}
}
}

View File

@@ -16,6 +16,8 @@
package org.springframework.orm.jpa.support;
import java.util.concurrent.Callable;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceException;
@@ -27,10 +29,11 @@ import org.springframework.orm.jpa.EntityManagerHolder;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.ui.ModelMap;
import org.springframework.web.context.request.AsyncWebRequestInterceptor;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.context.request.WebRequest;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.context.request.async.CallableProcessingInterceptor;
import org.springframework.web.context.request.async.WebAsyncManager;
import org.springframework.web.context.request.async.WebAsyncManager.WebAsyncThreadInitializer;
import org.springframework.web.context.request.async.WebAsyncUtils;
/**
* Spring web request interceptor that binds a JPA EntityManager to the
@@ -76,7 +79,7 @@ public class OpenEntityManagerInViewInterceptor extends EntityManagerFactoryAcce
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
if (asyncManager.hasConcurrentResult()) {
if (asyncManager.initializeAsyncThread(participateAttributeName)) {
if (applyCallableInterceptor(asyncManager, participateAttributeName)) {
return;
}
}
@@ -94,8 +97,8 @@ public class OpenEntityManagerInViewInterceptor extends EntityManagerFactoryAcce
EntityManagerHolder emHolder = new EntityManagerHolder(em);
TransactionSynchronizationManager.bindResource(getEntityManagerFactory(), emHolder);
WebAsyncThreadInitializer asyncThreadInitializer = createThreadInitializer(emHolder);
asyncManager.registerAsyncThreadInitializer(participateAttributeName, asyncThreadInitializer);
asyncManager.registerCallableInterceptor(participateAttributeName,
new EntityManagerBindingCallableInterceptor(emHolder));
}
catch (PersistenceException ex) {
throw new DataAccessResourceFailureException("Could not create JPA EntityManager", ex);
@@ -147,15 +150,39 @@ public class OpenEntityManagerInViewInterceptor extends EntityManagerFactoryAcce
return getEntityManagerFactory().toString() + PARTICIPATE_SUFFIX;
}
private WebAsyncThreadInitializer createThreadInitializer(final EntityManagerHolder emHolder) {
return new WebAsyncThreadInitializer() {
public void initialize() {
TransactionSynchronizationManager.bindResource(getEntityManagerFactory(), emHolder);
}
public void reset() {
TransactionSynchronizationManager.unbindResource(getEntityManagerFactory());
}
};
private boolean applyCallableInterceptor(WebAsyncManager asyncManager, String key) {
if (asyncManager.getCallableInterceptor(key) == null) {
return false;
}
((EntityManagerBindingCallableInterceptor) asyncManager.getCallableInterceptor(key)).initializeThread();
return true;
}
/**
* Bind and unbind the Hibernate {@code Session} to the current thread.
*/
private class EntityManagerBindingCallableInterceptor implements CallableProcessingInterceptor {
private final EntityManagerHolder emHolder;
public EntityManagerBindingCallableInterceptor(EntityManagerHolder emHolder) {
this.emHolder = emHolder;
}
public void preProcess(NativeWebRequest request, Callable<?> task) {
initializeThread();
}
private void initializeThread() {
TransactionSynchronizationManager.bindResource(getEntityManagerFactory(), this.emHolder);
}
public void postProcess(NativeWebRequest request, Callable<?> task, Object concurrentResult) {
TransactionSynchronizationManager.unbindResource(getEntityManagerFactory());
}
}
}