diff --git a/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java b/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java index d19517a008..fc368cd3c8 100644 --- a/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java +++ b/spring-web/src/main/java/org/springframework/web/context/request/async/WebAsyncManager.java @@ -30,10 +30,12 @@ import org.apache.commons.logging.LogFactory; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.core.task.SyncTaskExecutor; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.web.context.request.RequestAttributes; import org.springframework.web.context.request.async.DeferredResult.DeferredResultHandler; +import org.springframework.web.server.adapter.WebHttpHandlerBuilder; /** * The central class for managing asynchronous request processing, mainly intended @@ -61,6 +63,9 @@ public final class WebAsyncManager { private static final Object RESULT_NONE = new Object(); + private static final AsyncTaskExecutor DEFAULT_TASK_EXECUTOR = + new SimpleAsyncTaskExecutor(WebHttpHandlerBuilder.class.getSimpleName()); + private static final Log logger = LogFactory.getLog(WebAsyncManager.class); private static final CallableProcessingInterceptor timeoutCallableInterceptor = @@ -69,10 +74,12 @@ public final class WebAsyncManager { private static final DeferredResultProcessingInterceptor timeoutDeferredResultInterceptor = new TimeoutDeferredResultProcessingInterceptor(); + private static Boolean taskExecutorWarning = true; + private AsyncWebRequest asyncWebRequest; - private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(this.getClass().getSimpleName()); + private AsyncTaskExecutor taskExecutor = DEFAULT_TASK_EXECUTOR; private volatile Object concurrentResult = RESULT_NONE; @@ -277,6 +284,9 @@ public final class WebAsyncManager { if (executor != null) { this.taskExecutor = executor; } + else { + logExecutorWarning(); + } List interceptors = new ArrayList<>(); interceptors.add(webAsyncTask.getInterceptor()); @@ -330,6 +340,27 @@ public final class WebAsyncManager { } } + @SuppressWarnings("ConstantConditions") + private void logExecutorWarning() { + if (taskExecutorWarning && logger.isWarnEnabled()) { + synchronized (DEFAULT_TASK_EXECUTOR) { + AsyncTaskExecutor executor = this.taskExecutor; + if (taskExecutorWarning && + (executor instanceof SimpleAsyncTaskExecutor || executor instanceof SyncTaskExecutor)) { + String executorTypeName = executor.getClass().getSimpleName(); + logger.warn("\n!!!\n" + + "An Executor is required to handle java.util.concurrent.Callable return values.\n" + + "Please, configure a TaskExecutor in the MVC config under \"async support\".\n" + + "The " + executorTypeName + " currently in use is not suitable under load.\n" + + "-------------------------------\n" + + "Request URI: '" + formatRequestUri() + "'\n" + + "!!!"); + taskExecutorWarning = false; + } + } + } + } + private String formatRequestUri() { HttpServletRequest request = this.asyncWebRequest.getNativeRequest(HttpServletRequest.class); return request != null ? request.getRequestURI() : "servlet container"; diff --git a/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerTests.java b/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerTests.java index 91f9741938..4a1b65eb42 100644 --- a/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerTests.java +++ b/spring-web/src/test/java/org/springframework/web/context/request/async/WebAsyncManagerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -115,7 +115,7 @@ public class WebAsyncManagerTests { verifyDefaultAsyncScenario(); verify(interceptor).beforeConcurrentHandling(this.asyncWebRequest, task); verify(interceptor).preProcess(this.asyncWebRequest, task); - verify(interceptor).postProcess(this.asyncWebRequest, task, new Integer(concurrentResult)); + verify(interceptor).postProcess(this.asyncWebRequest, task, concurrentResult); } @Test @@ -161,9 +161,9 @@ public class WebAsyncManagerTests { assertFalse(this.asyncManager.hasConcurrentResult()); - verify(this.asyncWebRequest).addTimeoutHandler((Runnable) notNull()); - verify(this.asyncWebRequest).addErrorHandler((Consumer) notNull()); - verify(this.asyncWebRequest).addCompletionHandler((Runnable) notNull()); + verify(this.asyncWebRequest).addTimeoutHandler(notNull()); + verify(this.asyncWebRequest).addErrorHandler(notNull()); + verify(this.asyncWebRequest).addCompletionHandler(notNull()); } @Test @@ -303,9 +303,9 @@ public class WebAsyncManagerTests { assertFalse(this.asyncManager.hasConcurrentResult()); - verify(this.asyncWebRequest).addTimeoutHandler((Runnable) notNull()); - verify(this.asyncWebRequest).addErrorHandler((Consumer) notNull()); - verify(this.asyncWebRequest).addCompletionHandler((Runnable) notNull()); + verify(this.asyncWebRequest).addTimeoutHandler(notNull()); + verify(this.asyncWebRequest).addErrorHandler(notNull()); + verify(this.asyncWebRequest).addCompletionHandler(notNull()); } @Test @@ -353,7 +353,7 @@ public class WebAsyncManagerTests { @Test public void startDeferredResultProcessingNullInput() throws Exception { try { - this.asyncManager.startDeferredResultProcessing((DeferredResult) null); + this.asyncManager.startDeferredResultProcessing(null); fail("Expected exception"); } catch (IllegalArgumentException ex) { @@ -368,9 +368,9 @@ public class WebAsyncManagerTests { @SuppressWarnings("unchecked") private void verifyDefaultAsyncScenario() { - verify(this.asyncWebRequest).addTimeoutHandler((Runnable) notNull()); - verify(this.asyncWebRequest).addErrorHandler((Consumer) notNull()); - verify(this.asyncWebRequest).addCompletionHandler((Runnable) notNull()); + verify(this.asyncWebRequest).addTimeoutHandler(notNull()); + verify(this.asyncWebRequest).addErrorHandler(notNull()); + verify(this.asyncWebRequest).addCompletionHandler(notNull()); verify(this.asyncWebRequest).startAsync(); verify(this.asyncWebRequest).dispatch(); } diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/config/annotation/AsyncSupportConfigurer.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/config/annotation/AsyncSupportConfigurer.java index 5a25caff7b..2d5e4747d8 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/config/annotation/AsyncSupportConfigurer.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/config/annotation/AsyncSupportConfigurer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,10 +24,10 @@ import java.util.concurrent.Callable; import org.springframework.core.task.AsyncTaskExecutor; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.lang.Nullable; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.web.context.request.async.CallableProcessingInterceptor; import org.springframework.web.context.request.async.DeferredResult; import org.springframework.web.context.request.async.DeferredResultProcessingInterceptor; -import org.springframework.web.context.request.async.WebAsyncTask; /** * Helps with configuring options for asynchronous request processing. @@ -49,16 +49,15 @@ public class AsyncSupportConfigurer { /** - * Set the default {@link AsyncTaskExecutor} to use when a controller method - * returns a {@link Callable}. Controller methods can override this default on - * a per-request basis by returning a {@link WebAsyncTask}. - *

By default a {@link SimpleAsyncTaskExecutor} instance is used, and it's - * highly recommended to change that default in production since the simple - * executor does not re-use threads. - *

As of 5.0 this executor is also used when a controller returns a reactive - * type that does streaming (e.g. "text/event-stream" or - * "application/stream+json") for the blocking writes to the - * {@link javax.servlet.ServletOutputStream}. + * The provided task executor is used to: + *

    + *
  1. Handle {@link Callable} controller method return values. + *
  2. Perform blocking writes when streaming to the response + * through a reactive (e.g. Reactor, RxJava) controller method return value. + *
+ *

By default only a {@link SimpleAsyncTaskExecutor} is used. However when + * using the above two use cases, it's recommended to configure an executor + * backed by a thread pool such as {@link ThreadPoolTaskExecutor}. * @param taskExecutor the task executor instance to use by default */ public AsyncSupportConfigurer setTaskExecutor(AsyncTaskExecutor taskExecutor) { diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java index e645e86fb4..3606c16ef8 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandler.java @@ -35,6 +35,7 @@ import org.springframework.core.MethodParameter; import org.springframework.core.ReactiveAdapter; import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.core.ResolvableType; +import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.core.task.SyncTaskExecutor; import org.springframework.core.task.TaskExecutor; import org.springframework.http.MediaType; @@ -79,6 +80,8 @@ class ReactiveTypeHandler { private final TaskExecutor taskExecutor; + private Boolean taskExecutorWarning; + private final ContentNegotiationManager contentNegotiationManager; @@ -92,6 +95,7 @@ class ReactiveTypeHandler { Assert.notNull(manager, "ContentNegotiationManager is required"); this.reactiveRegistry = registry; this.taskExecutor = executor; + this.taskExecutorWarning = executor instanceof SimpleAsyncTaskExecutor || executor instanceof SyncTaskExecutor; this.contentNegotiationManager = manager; } @@ -127,16 +131,19 @@ class ReactiveTypeHandler { if (adapter.isMultiValue()) { if (mediaTypes.stream().anyMatch(MediaType.TEXT_EVENT_STREAM::includes) || ServerSentEvent.class.isAssignableFrom(elementClass)) { + logExecutorWarning(returnType); SseEmitter emitter = new SseEmitter(STREAMING_TIMEOUT_VALUE); new SseEmitterSubscriber(emitter, this.taskExecutor).connect(adapter, returnValue); return emitter; } if (CharSequence.class.isAssignableFrom(elementClass)) { + logExecutorWarning(returnType); ResponseBodyEmitter emitter = getEmitter(mediaType.orElse(MediaType.TEXT_PLAIN)); new TextEmitterSubscriber(emitter, this.taskExecutor).connect(adapter, returnValue); return emitter; } if (mediaTypes.stream().anyMatch(MediaType.APPLICATION_STREAM_JSON::includes)) { + logExecutorWarning(returnType); ResponseBodyEmitter emitter = getEmitter(MediaType.APPLICATION_STREAM_JSON); new JsonEmitterSubscriber(emitter, this.taskExecutor).connect(adapter, returnValue); return emitter; @@ -171,6 +178,27 @@ class ReactiveTypeHandler { }; } + @SuppressWarnings("ConstantConditions") + private void logExecutorWarning(MethodParameter returnType) { + if (this.taskExecutorWarning && logger.isWarnEnabled()) { + synchronized (this) { + if (this.taskExecutorWarning) { + String executorTypeName = this.taskExecutor.getClass().getSimpleName(); + logger.warn("\n!!!\n" + + "Streaming through a reactive type requires an Executor to write to the response.\n" + + "Please, configure a TaskExecutor in the MVC config under \"async support\".\n" + + "The " + executorTypeName + " currently in use is not suitable under load.\n" + + "-------------------------------\n" + + "Controller:\t" + returnType.getContainingClass().getName() + "\n" + + "Method:\t\t" + returnType.getMethod().getName() + "\n" + + "Returning:\t" + ResolvableType.forMethodParameter(returnType).toString() + "\n" + + "!!!"); + this.taskExecutorWarning = false; + } + } + } + } + private abstract static class AbstractEmitterSubscriber implements Subscriber, Runnable { diff --git a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java index f14c87a4c9..45d3b95c5e 100644 --- a/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java +++ b/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java @@ -81,7 +81,8 @@ public class ReactiveTypeHandlerTests { ContentNegotiationManagerFactoryBean factoryBean = new ContentNegotiationManagerFactoryBean(); factoryBean.afterPropertiesSet(); ContentNegotiationManager manager = factoryBean.getObject(); - this.handler = new ReactiveTypeHandler(ReactiveAdapterRegistry.getSharedInstance(), new SyncTaskExecutor(), manager); + ReactiveAdapterRegistry adapterRegistry = ReactiveAdapterRegistry.getSharedInstance(); + this.handler = new ReactiveTypeHandler(adapterRegistry, new SyncTaskExecutor(), manager); resetRequest(); } @@ -90,8 +91,8 @@ public class ReactiveTypeHandlerTests { this.servletResponse = new MockHttpServletResponse(); this.webRequest = new ServletWebRequest(this.servletRequest, this.servletResponse); - AsyncWebRequest asyncWebRequest = new StandardServletAsyncWebRequest(this.servletRequest, this.servletResponse); - WebAsyncUtils.getAsyncManager(this.webRequest).setAsyncWebRequest(asyncWebRequest); + AsyncWebRequest webRequest = new StandardServletAsyncWebRequest(this.servletRequest, this.servletResponse); + WebAsyncUtils.getAsyncManager(this.webRequest).setAsyncWebRequest(webRequest); this.servletRequest.setAsyncSupported(true); } @@ -122,7 +123,8 @@ public class ReactiveTypeHandlerTests { // RxJava 1 Single AtomicReference> ref = new AtomicReference<>(); Single single = Single.fromEmitter(ref::set); - testDeferredResultSubscriber(single, Single.class, forClass(String.class), () -> ref.get().onSuccess("foo"), "foo"); + testDeferredResultSubscriber(single, Single.class, forClass(String.class), + () -> ref.get().onSuccess("foo"), "foo"); // RxJava 2 Single AtomicReference> ref2 = new AtomicReference<>(); diff --git a/src/docs/asciidoc/web/webmvc.adoc b/src/docs/asciidoc/web/webmvc.adoc index a830d34a22..d3c0e5765a 100644 --- a/src/docs/asciidoc/web/webmvc.adoc +++ b/src/docs/asciidoc/web/webmvc.adoc @@ -3629,14 +3629,13 @@ Spring MVC supports Reactor and RxJava through the `spring-core` which allows it to adapt from multiple reactive libraries. ==== -When streaming to the response via reactive types, Spring MVC supports reactive back -pressure, but still needs to use blocking I/O to perform actual writes. This is done -through the <> MVC `TaskExecutor` on -a separate thread in order to avoid blocking the upstream source (e.g. a `Flux` returned -from the `WebClient`). By default a `SyncTaskExecutor` is used which is not suitable for -production. https://jira.spring.io/browse/SPR-16203[SPR-16203] will provide better -defaults in Spring Framework 5.1. In the mean time please configure the executor through -the <>. +For streaming to the response, reactive back pressure is supported, but writes to the +response are still blocking, and are executed on a separate thread through the +<> `TaskExecutor` in order to avoid +blocking the upstream source (e.g. a `Flux` returned from the `WebClient`). +By default `SimpleAsyncTaskExecutor` is used for the blocking writes but that is not +suitable under load. If you plan to stream with a reactive type, please use the +<> to configure a task executor.