Commit ce2c26e9 authored by Brian Clozel's avatar Brian Clozel

Use Reactor's new Schedulers.boundedElastic()

Prior to this commit, Spring Boot would use `Schedulers.elastic()` when
required to process blocking tasks in a reactive environment.
reactor/reactor-core#1804 introduced a new scheduler,
`Schedulers.boundedElastic()` that behaves quite similarly but:

* will limit the number of workers thread
* will queue tasks if no worker thread is available and reject them is
the queue is exceeds a limit

This allows Spring Boot to schedule blocking tasks as before and allows
greater flexibility.

Fixes gh-18269
See gh-18276
parent 9377b9a9
...@@ -223,7 +223,8 @@ public abstract class AbstractWebFluxEndpointHandlerMapping extends RequestMappi ...@@ -223,7 +223,8 @@ public abstract class AbstractWebFluxEndpointHandlerMapping extends RequestMappi
/** /**
* An {@link OperationInvoker} that performs the invocation of a blocking operation on * An {@link OperationInvoker} that performs the invocation of a blocking operation on
* a separate thread using Reactor's {@link Schedulers#elastic() elastic scheduler}. * a separate thread using Reactor's {@link Schedulers#boundedElastic() bounded
* elastic scheduler}.
*/ */
protected static final class ElasticSchedulerInvoker implements OperationInvoker { protected static final class ElasticSchedulerInvoker implements OperationInvoker {
...@@ -235,7 +236,7 @@ public abstract class AbstractWebFluxEndpointHandlerMapping extends RequestMappi ...@@ -235,7 +236,7 @@ public abstract class AbstractWebFluxEndpointHandlerMapping extends RequestMappi
@Override @Override
public Object invoke(InvocationContext context) { public Object invoke(InvocationContext context) {
return Mono.fromCallable(() -> this.invoker.invoke(context)).subscribeOn(Schedulers.elastic()); return Mono.fromCallable(() -> this.invoker.invoke(context)).subscribeOn(Schedulers.boundedElastic());
} }
} }
......
...@@ -43,7 +43,7 @@ public class HealthIndicatorReactiveAdapter implements ReactiveHealthIndicator { ...@@ -43,7 +43,7 @@ public class HealthIndicatorReactiveAdapter implements ReactiveHealthIndicator {
@Override @Override
public Mono<Health> health() { public Mono<Health> health() {
return Mono.fromCallable(this.delegate::health).subscribeOn(Schedulers.elastic()); return Mono.fromCallable(this.delegate::health).subscribeOn(Schedulers.boundedElastic());
} }
} }
...@@ -56,7 +56,8 @@ public class RedisReactiveHealthIndicator extends AbstractReactiveHealthIndicato ...@@ -56,7 +56,8 @@ public class RedisReactiveHealthIndicator extends AbstractReactiveHealthIndicato
} }
private Mono<ReactiveRedisConnection> getConnection() { private Mono<ReactiveRedisConnection> getConnection() {
return Mono.fromSupplier(this.connectionFactory::getReactiveConnection).subscribeOn(Schedulers.parallel()); return Mono.fromSupplier(this.connectionFactory::getReactiveConnection)
.subscribeOn(Schedulers.boundedElastic());
} }
private Health up(Health.Builder builder, Properties info) { private Health up(Health.Builder builder, Properties info) {
......
...@@ -171,7 +171,7 @@ ...@@ -171,7 +171,7 @@
<quartz.version>2.3.1</quartz.version> <quartz.version>2.3.1</quartz.version>
<querydsl.version>4.2.1</querydsl.version> <querydsl.version>4.2.1</querydsl.version>
<rabbit-amqp-client.version>5.7.3</rabbit-amqp-client.version> <rabbit-amqp-client.version>5.7.3</rabbit-amqp-client.version>
<reactor-bom.version>Dysprosium-RC1</reactor-bom.version> <reactor-bom.version>Dysprosium-BUILD-SNAPSHOT</reactor-bom.version>
<rest-assured.version>3.3.0</rest-assured.version> <rest-assured.version>3.3.0</rest-assured.version>
<reactive-streams.version>1.0.3</reactive-streams.version> <reactive-streams.version>1.0.3</reactive-streams.version>
<rsocket.version>1.0.0-RC3</rsocket.version> <rsocket.version>1.0.0-RC3</rsocket.version>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment