Refactor Servlet 3 async support

As a result of the refactoring, the AsyncContext dispatch mechanism is
used much more centrally. Effectively every asynchronously processed
request involves one initial (container) thread, a second thread to
produce the handler return value asynchronously, and a third thread
as a result of a dispatch back to the container to resume processing
of the asynchronous resuilt.

Other updates include the addition of a MockAsyncContext and support
of related request method in the test packages of spring-web and
spring-webmvc. Also an upgrade of a Jetty test dependency required
to make tests pass.

Issue: SPR-9433
This commit is contained in:
Rossen Stoyanchev
2012-07-24 16:00:05 -04:00
parent 026ee846c7
commit 529e62921d
47 changed files with 1837 additions and 1741 deletions

View File

@@ -17,6 +17,7 @@
package org.springframework.orm.hibernate3.support;
import java.io.IOException;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@@ -25,14 +26,15 @@ import javax.servlet.http.HttpServletResponse;
import org.hibernate.FlushMode;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.orm.hibernate3.SessionFactoryUtils;
import org.springframework.orm.hibernate3.SessionHolder;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.request.async.AbstractDelegatingCallable;
import org.springframework.web.context.request.async.AsyncExecutionChain;
import org.springframework.web.context.request.async.AsyncWebUtils;
import org.springframework.web.context.request.async.WebAsyncManager;
import org.springframework.web.context.request.async.WebAsyncManager.AsyncThreadInitializer;
import org.springframework.web.context.support.WebApplicationContextUtils;
import org.springframework.web.filter.OncePerRequestFilter;
@@ -165,16 +167,27 @@ public class OpenSessionInViewFilter extends OncePerRequestFilter {
}
/**
* The default value is "true" so that the filter may re-bind the opened
* {@code Session} to each asynchronously dispatched thread and postpone
* closing it until the very last asynchronous dispatch.
*/
@Override
protected boolean shouldFilterAsyncDispatches() {
return true;
}
@Override
protected void doFilterInternal(
HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
throws ServletException, IOException {
AsyncExecutionChain chain = AsyncExecutionChain.getForCurrentRequest(request);
SessionFactory sessionFactory = lookupSessionFactory(request);
boolean participate = false;
WebAsyncManager asyncManager = AsyncWebUtils.getAsyncManager(request);
String key = getAlreadyFilteredAttributeName();
if (isSingleSession()) {
// single session mode
if (TransactionSynchronizationManager.hasResource(sessionFactory)) {
@@ -182,16 +195,20 @@ public class OpenSessionInViewFilter extends OncePerRequestFilter {
participate = true;
}
else {
logger.debug("Opening single Hibernate Session in OpenSessionInViewFilter");
Session session = getSession(sessionFactory);
SessionHolder sessionHolder = new SessionHolder(session);
TransactionSynchronizationManager.bindResource(sessionFactory, sessionHolder);
if (!isAsyncDispatch(request) || !asyncManager.applyAsyncThreadInitializer(key)) {
logger.debug("Opening single Hibernate Session in OpenSessionInViewFilter");
Session session = getSession(sessionFactory);
SessionHolder sessionHolder = new SessionHolder(session);
TransactionSynchronizationManager.bindResource(sessionFactory, sessionHolder);
chain.push(getAsyncCallable(request, sessionFactory, sessionHolder));
AsyncThreadInitializer initializer = createAsyncThreadInitializer(sessionFactory, sessionHolder);
asyncManager.registerAsyncThreadInitializer(key, initializer);
}
}
}
else {
// deferred close mode
Assert.state(isLastRequestThread(request), "Deferred close mode is not supported on async dispatches");
if (SessionFactoryUtils.isDeferredCloseActive(sessionFactory)) {
// Do not modify deferred close: just set the participate flag.
participate = true;
@@ -210,16 +227,12 @@ public class OpenSessionInViewFilter extends OncePerRequestFilter {
// single session mode
SessionHolder sessionHolder =
(SessionHolder) TransactionSynchronizationManager.unbindResource(sessionFactory);
if (!chain.pop()) {
return;
if (isLastRequestThread(request)) {
logger.debug("Closing single Hibernate Session in OpenSessionInViewFilter");
closeSession(sessionHolder.getSession(), sessionFactory);
}
logger.debug("Closing single Hibernate Session in OpenSessionInViewFilter");
closeSession(sessionHolder.getSession(), sessionFactory);
}
else {
if (chain.isAsyncStarted()) {
throw new IllegalStateException("Deferred close is not supported with async requests.");
}
// deferred close mode
SessionFactoryUtils.processDeferredClose(sessionFactory);
}
@@ -227,6 +240,19 @@ public class OpenSessionInViewFilter extends OncePerRequestFilter {
}
}
private AsyncThreadInitializer createAsyncThreadInitializer(final SessionFactory sessionFactory,
final SessionHolder sessionHolder) {
return new AsyncThreadInitializer() {
public void initialize() {
TransactionSynchronizationManager.bindResource(sessionFactory, sessionHolder);
}
public void reset() {
TransactionSynchronizationManager.unbindResource(sessionFactory);
}
};
}
/**
* Look up the SessionFactory that this filter should use,
* taking the current HTTP request as argument.
@@ -291,28 +317,4 @@ public class OpenSessionInViewFilter extends OncePerRequestFilter {
SessionFactoryUtils.closeSession(session);
}
/**
* Create a Callable to extend the use of the open Hibernate Session to the
* async thread completing the request.
*/
private AbstractDelegatingCallable getAsyncCallable(final HttpServletRequest request,
final SessionFactory sessionFactory, final SessionHolder sessionHolder) {
return new AbstractDelegatingCallable() {
public Object call() throws Exception {
TransactionSynchronizationManager.bindResource(sessionFactory, sessionHolder);
try {
getNext().call();
}
finally {
SessionHolder sessionHolder =
(SessionHolder) TransactionSynchronizationManager.unbindResource(sessionFactory);
logger.debug("Closing Hibernate Session in OpenSessionInViewFilter");
SessionFactoryUtils.closeSession(sessionHolder.getSession());
}
return null;
}
};
}
}

View File

@@ -25,8 +25,10 @@ import org.springframework.orm.hibernate3.SessionHolder;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.ui.ModelMap;
import org.springframework.web.context.request.WebRequest;
import org.springframework.web.context.request.async.AbstractDelegatingCallable;
import org.springframework.web.context.request.async.AsyncWebRequestInterceptor;
import org.springframework.web.context.request.async.AsyncWebUtils;
import org.springframework.web.context.request.async.WebAsyncManager;
import org.springframework.web.context.request.async.WebAsyncManager.AsyncThreadInitializer;
/**
* Spring web request interceptor that binds a Hibernate <code>Session</code> to the
@@ -140,10 +142,19 @@ public class OpenSessionInViewInterceptor extends HibernateAccessor implements A
* @see org.springframework.orm.hibernate3.SessionFactoryUtils#getSession
*/
public void preHandle(WebRequest request) throws DataAccessException {
WebAsyncManager asyncManager = AsyncWebUtils.getAsyncManager(request);
String participateAttributeName = getParticipateAttributeName();
if (asyncManager.hasConcurrentResult()) {
if (asyncManager.applyAsyncThreadInitializer(participateAttributeName)) {
return;
}
}
if ((isSingleSession() && TransactionSynchronizationManager.hasResource(getSessionFactory())) ||
SessionFactoryUtils.isDeferredCloseActive(getSessionFactory())) {
// Do not modify the Session: just mark the request accordingly.
String participateAttributeName = getParticipateAttributeName();
Integer count = (Integer) request.getAttribute(participateAttributeName, WebRequest.SCOPE_REQUEST);
int newCount = (count != null ? count + 1 : 1);
request.setAttribute(getParticipateAttributeName(), newCount, WebRequest.SCOPE_REQUEST);
@@ -157,6 +168,9 @@ public class OpenSessionInViewInterceptor extends HibernateAccessor implements A
applyFlushMode(session, false);
SessionHolder sessionHolder = new SessionHolder(session);
TransactionSynchronizationManager.bindResource(getSessionFactory(), sessionHolder);
AsyncThreadInitializer asyncThreadInitializer = createThreadInitializer(sessionHolder);
asyncManager.registerAsyncThreadInitializer(participateAttributeName, asyncThreadInitializer);
}
else {
// deferred close mode
@@ -165,44 +179,6 @@ public class OpenSessionInViewInterceptor extends HibernateAccessor implements A
}
}
/**
* Create a <code>Callable</code> to bind the <code>Hibernate</code> session
* to the async request thread.
*/
public AbstractDelegatingCallable getAsyncCallable(WebRequest request) {
String attributeName = getParticipateAttributeName();
if ((request.getAttribute(attributeName, WebRequest.SCOPE_REQUEST) != null) || !isSingleSession()) {
return null;
}
final SessionHolder sessionHolder =
(SessionHolder) TransactionSynchronizationManager.getResource(getSessionFactory());
return new AbstractDelegatingCallable() {
public Object call() throws Exception {
TransactionSynchronizationManager.bindResource(getSessionFactory(), sessionHolder);
getNext().call();
return null;
}
};
}
/**
* Unbind the Hibernate <code>Session</code> from the main thread but leave
* the <code>Session</code> open for further use from the async thread.
*/
public void postHandleAsyncStarted(WebRequest request) {
String attributeName = getParticipateAttributeName();
if (request.getAttribute(attributeName, WebRequest.SCOPE_REQUEST) == null) {
if (isSingleSession()) {
TransactionSynchronizationManager.unbindResource(getSessionFactory());
}
else {
throw new IllegalStateException("Deferred close is not supported with async requests.");
}
}
}
/**
* Flush the Hibernate <code>Session</code> before view rendering, if necessary.
* <p>Note that this just applies in {@link #isSingleSession() single session mode}!
@@ -232,18 +208,7 @@ public class OpenSessionInViewInterceptor extends HibernateAccessor implements A
* @see org.springframework.transaction.support.TransactionSynchronizationManager
*/
public void afterCompletion(WebRequest request, Exception ex) throws DataAccessException {
String participateAttributeName = getParticipateAttributeName();
Integer count = (Integer) request.getAttribute(participateAttributeName, WebRequest.SCOPE_REQUEST);
if (count != null) {
// Do not modify the Session: just clear the marker.
if (count > 1) {
request.setAttribute(participateAttributeName, count - 1, WebRequest.SCOPE_REQUEST);
}
else {
request.removeAttribute(participateAttributeName, WebRequest.SCOPE_REQUEST);
}
}
else {
if (!decrementParticipateCount(request)) {
if (isSingleSession()) {
// single session mode
SessionHolder sessionHolder =
@@ -258,6 +223,34 @@ public class OpenSessionInViewInterceptor extends HibernateAccessor implements A
}
}
public void afterConcurrentHandlingStarted(WebRequest request) {
if (!decrementParticipateCount(request)) {
if (isSingleSession()) {
TransactionSynchronizationManager.unbindResource(getSessionFactory());
}
else {
throw new IllegalStateException("Deferred close mode is not supported with async requests.");
}
}
}
private boolean decrementParticipateCount(WebRequest request) {
String participateAttributeName = getParticipateAttributeName();
Integer count = (Integer) request.getAttribute(participateAttributeName, WebRequest.SCOPE_REQUEST);
if (count == null) {
return false;
}
// Do not modify the Session: just clear the marker.
if (count > 1) {
request.setAttribute(participateAttributeName, count - 1, WebRequest.SCOPE_REQUEST);
}
else {
request.removeAttribute(participateAttributeName, WebRequest.SCOPE_REQUEST);
}
return true;
}
/**
* Return the name of the request attribute that identifies that a request is
* already intercepted.
@@ -268,4 +261,15 @@ public class OpenSessionInViewInterceptor extends HibernateAccessor implements A
return getSessionFactory().toString() + PARTICIPATE_SUFFIX;
}
private AsyncThreadInitializer createThreadInitializer(final SessionHolder sessionHolder) {
return new AsyncThreadInitializer() {
public void initialize() {
TransactionSynchronizationManager.bindResource(getSessionFactory(), sessionHolder);
}
public void reset() {
TransactionSynchronizationManager.unbindResource(getSessionFactory());
}
};
}
}

View File

@@ -17,6 +17,7 @@
package org.springframework.orm.hibernate4.support;
import java.io.IOException;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@@ -26,14 +27,14 @@ import org.hibernate.FlushMode;
import org.hibernate.HibernateException;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.orm.hibernate4.SessionFactoryUtils;
import org.springframework.orm.hibernate4.SessionHolder;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.request.async.AbstractDelegatingCallable;
import org.springframework.web.context.request.async.AsyncExecutionChain;
import org.springframework.web.context.request.async.AsyncWebUtils;
import org.springframework.web.context.request.async.WebAsyncManager;
import org.springframework.web.context.request.async.WebAsyncManager.AsyncThreadInitializer;
import org.springframework.web.context.support.WebApplicationContextUtils;
import org.springframework.web.filter.OncePerRequestFilter;
@@ -99,27 +100,41 @@ public class OpenSessionInViewFilter extends OncePerRequestFilter {
}
/**
* The default value is "true" so that the filter may re-bind the opened
* {@code Session} to each asynchronously dispatched thread and postpone
* closing it until the very last asynchronous dispatch.
*/
@Override
protected boolean shouldFilterAsyncDispatches() {
return true;
}
@Override
protected void doFilterInternal(
HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
throws ServletException, IOException {
AsyncExecutionChain chain = AsyncExecutionChain.getForCurrentRequest(request);
SessionFactory sessionFactory = lookupSessionFactory(request);
boolean participate = false;
WebAsyncManager asyncManager = AsyncWebUtils.getAsyncManager(request);
String key = getAlreadyFilteredAttributeName();
if (TransactionSynchronizationManager.hasResource(sessionFactory)) {
// Do not modify the Session: just set the participate flag.
participate = true;
}
else {
logger.debug("Opening Hibernate Session in OpenSessionInViewFilter");
Session session = openSession(sessionFactory);
SessionHolder sessionHolder = new SessionHolder(session);
TransactionSynchronizationManager.bindResource(sessionFactory, sessionHolder);
if (!isAsyncDispatch(request) || !asyncManager.applyAsyncThreadInitializer(key)) {
logger.debug("Opening Hibernate Session in OpenSessionInViewFilter");
Session session = openSession(sessionFactory);
SessionHolder sessionHolder = new SessionHolder(session);
TransactionSynchronizationManager.bindResource(sessionFactory, sessionHolder);
chain.push(getAsyncCallable(request, sessionFactory, sessionHolder));
AsyncThreadInitializer initializer = createAsyncThreadInitializer(sessionFactory, sessionHolder);
asyncManager.registerAsyncThreadInitializer(key, initializer);
}
}
try {
@@ -130,15 +145,27 @@ public class OpenSessionInViewFilter extends OncePerRequestFilter {
if (!participate) {
SessionHolder sessionHolder =
(SessionHolder) TransactionSynchronizationManager.unbindResource(sessionFactory);
if (!chain.pop()) {
return;
if (isLastRequestThread(request)) {
logger.debug("Closing Hibernate Session in OpenSessionInViewFilter");
SessionFactoryUtils.closeSession(sessionHolder.getSession());
}
logger.debug("Closing Hibernate Session in OpenSessionInViewFilter");
SessionFactoryUtils.closeSession(sessionHolder.getSession());
}
}
}
private AsyncThreadInitializer createAsyncThreadInitializer(final SessionFactory sessionFactory,
final SessionHolder sessionHolder) {
return new AsyncThreadInitializer() {
public void initialize() {
TransactionSynchronizationManager.bindResource(sessionFactory, sessionHolder);
}
public void reset() {
TransactionSynchronizationManager.unbindResource(sessionFactory);
}
};
}
/**
* Look up the SessionFactory that this filter should use,
* taking the current HTTP request as argument.
@@ -187,28 +214,4 @@ public class OpenSessionInViewFilter extends OncePerRequestFilter {
}
}
/**
* Create a Callable to extend the use of the open Hibernate Session to the
* async thread completing the request.
*/
private AbstractDelegatingCallable getAsyncCallable(final HttpServletRequest request,
final SessionFactory sessionFactory, final SessionHolder sessionHolder) {
return new AbstractDelegatingCallable() {
public Object call() throws Exception {
TransactionSynchronizationManager.bindResource(sessionFactory, sessionHolder);
try {
getNext().call();
}
finally {
SessionHolder sessionHolder =
(SessionHolder) TransactionSynchronizationManager.unbindResource(sessionFactory);
logger.debug("Closing Hibernate Session in OpenSessionInViewFilter");
SessionFactoryUtils.closeSession(sessionHolder.getSession());
}
return null;
}
};
}
}

View File

@@ -29,9 +29,10 @@ import org.springframework.orm.hibernate4.SessionHolder;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.ui.ModelMap;
import org.springframework.web.context.request.WebRequest;
import org.springframework.web.context.request.async.AbstractDelegatingCallable;
import org.springframework.web.context.request.async.AsyncExecutionChain;
import org.springframework.web.context.request.async.AsyncWebRequestInterceptor;
import org.springframework.web.context.request.async.AsyncWebUtils;
import org.springframework.web.context.request.async.WebAsyncManager;
import org.springframework.web.context.request.async.WebAsyncManager.AsyncThreadInitializer;
/**
* Spring web request interceptor that binds a Hibernate <code>Session</code> to the
@@ -103,9 +104,18 @@ public class OpenSessionInViewInterceptor implements AsyncWebRequestInterceptor
* {@link org.springframework.transaction.support.TransactionSynchronizationManager}.
*/
public void preHandle(WebRequest request) throws DataAccessException {
WebAsyncManager asyncManager = AsyncWebUtils.getAsyncManager(request);
String participateAttributeName = getParticipateAttributeName();
if (asyncManager.hasConcurrentResult()) {
if (asyncManager.applyAsyncThreadInitializer(participateAttributeName)) {
return;
}
}
if (TransactionSynchronizationManager.hasResource(getSessionFactory())) {
// Do not modify the Session: just mark the request accordingly.
String participateAttributeName = getParticipateAttributeName();
Integer count = (Integer) request.getAttribute(participateAttributeName, WebRequest.SCOPE_REQUEST);
int newCount = (count != null ? count + 1 : 1);
request.setAttribute(getParticipateAttributeName(), newCount, WebRequest.SCOPE_REQUEST);
@@ -115,69 +125,51 @@ public class OpenSessionInViewInterceptor implements AsyncWebRequestInterceptor
Session session = openSession();
SessionHolder sessionHolder = new SessionHolder(session);
TransactionSynchronizationManager.bindResource(getSessionFactory(), sessionHolder);
AsyncThreadInitializer asyncThreadInitializer = createThreadInitializer(sessionHolder);
asyncManager.registerAsyncThreadInitializer(participateAttributeName, asyncThreadInitializer);
}
}
public void postHandle(WebRequest request, ModelMap model) {
}
/**
* Create a <code>Callable</code> to bind the <code>Hibernate</code> session
* to the async request thread.
*/
public AbstractDelegatingCallable getAsyncCallable(WebRequest request) {
String attributeName = getParticipateAttributeName();
if (request.getAttribute(attributeName, WebRequest.SCOPE_REQUEST) != null) {
return null;
}
final SessionHolder sessionHolder =
(SessionHolder) TransactionSynchronizationManager.getResource(getSessionFactory());
return new AbstractDelegatingCallable() {
public Object call() throws Exception {
TransactionSynchronizationManager.bindResource(getSessionFactory(), sessionHolder);
getNext().call();
return null;
}
};
}
/**
* Unbind the Hibernate <code>Session</code> from the main thread leaving
* it open for further use from an async thread.
*/
public void postHandleAsyncStarted(WebRequest request) {
String attributeName = getParticipateAttributeName();
if (request.getAttribute(attributeName, WebRequest.SCOPE_REQUEST) == null) {
TransactionSynchronizationManager.unbindResource(getSessionFactory());
}
}
/**
* Unbind the Hibernate <code>Session</code> from the thread and close it).
* @see org.springframework.transaction.support.TransactionSynchronizationManager
*/
public void afterCompletion(WebRequest request, Exception ex) throws DataAccessException {
String participateAttributeName = getParticipateAttributeName();
Integer count = (Integer) request.getAttribute(participateAttributeName, WebRequest.SCOPE_REQUEST);
if (count != null) {
// Do not modify the Session: just clear the marker.
if (count > 1) {
request.setAttribute(participateAttributeName, count - 1, WebRequest.SCOPE_REQUEST);
}
else {
request.removeAttribute(participateAttributeName, WebRequest.SCOPE_REQUEST);
}
}
else {
if (!decrementParticipateCount(request)) {
SessionHolder sessionHolder =
(SessionHolder) TransactionSynchronizationManager.unbindResource(getSessionFactory());
logger.debug("Closing Hibernate Session in OpenSessionInViewInterceptor");
SessionFactoryUtils.closeSession(sessionHolder.getSession());
}
}
public void afterConcurrentHandlingStarted(WebRequest request) {
if (!decrementParticipateCount(request)) {
TransactionSynchronizationManager.unbindResource(getSessionFactory());
}
}
private boolean decrementParticipateCount(WebRequest request) {
String participateAttributeName = getParticipateAttributeName();
Integer count = (Integer) request.getAttribute(participateAttributeName, WebRequest.SCOPE_REQUEST);
if (count == null) {
return false;
}
// Do not modify the Session: just clear the marker.
if (count > 1) {
request.setAttribute(participateAttributeName, count - 1, WebRequest.SCOPE_REQUEST);
}
else {
request.removeAttribute(participateAttributeName, WebRequest.SCOPE_REQUEST);
}
return true;
}
/**
* Open a Session for the SessionFactory that this interceptor uses.
* <p>The default implementation delegates to the
@@ -208,4 +200,15 @@ public class OpenSessionInViewInterceptor implements AsyncWebRequestInterceptor
return getSessionFactory().toString() + PARTICIPATE_SUFFIX;
}
private AsyncThreadInitializer createThreadInitializer(final SessionHolder sessionHolder) {
return new AsyncThreadInitializer() {
public void initialize() {
TransactionSynchronizationManager.bindResource(getSessionFactory(), sessionHolder);
}
public void reset() {
TransactionSynchronizationManager.unbindResource(getSessionFactory());
}
};
}
}