Fix deprecations

- Due to switch to boot 2.4.x, fixing various
  deprecations where things mostly come from reactor.
This commit is contained in:
Janne Valkealahti
2020-10-31 09:02:46 +00:00
parent 7526cb0f54
commit 96eab9d061
4 changed files with 33 additions and 35 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,16 +23,18 @@ import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import reactor.util.concurrent.Queues;
public class ReactiveLifecycleManager implements StateMachineReactiveLifecycle {
private static final Log log = LogFactory.getLog(ReactiveLifecycleManager.class);
private final AtomicEnum state = new AtomicEnum(LifecycleState.STOPPED);
private EmitterProcessor<Mono<Void>> startRequestsProcessor;
private EmitterProcessor<Mono<Void>> stopRequestsProcessor;
private Many<Mono<Void>> startRequestsSink;
private Many<Mono<Void>> stopRequestsSink;
private Flux<Mono<Void>> startRequests;
private Flux<Mono<Void>> stopRequests;
private Supplier<Mono<Void>> preStartRequest;
@@ -55,10 +57,10 @@ public class ReactiveLifecycleManager implements StateMachineReactiveLifecycle {
this.preStopRequest = preStopRequest;
this.postStartRequest = postStartRequest;
this.postStopRequest = postStopRequest;
this.startRequestsProcessor = EmitterProcessor.<Mono<Void>>create(false);
this.stopRequestsProcessor = EmitterProcessor.<Mono<Void>>create(false);
this.startRequests = this.startRequestsProcessor.cache(1);
this.stopRequests = this.stopRequestsProcessor.cache(1);
this.startRequestsSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
this.stopRequestsSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
this.startRequests = this.startRequestsSink.asFlux().cache(1);
this.stopRequests = this.stopRequestsSink.asFlux().cache(1);
}
@Override
@@ -69,8 +71,7 @@ public class ReactiveLifecycleManager implements StateMachineReactiveLifecycle {
.filter(owns -> owns)
.flatMap(owns -> this.startRequests.next().flatMap(Function.identity()).doOnSuccess(aVoid -> {
state.set(LifecycleState.STARTED);
}))
;
}));
})
.then(Mono.defer(postStartRequest))
.then(Mono.defer(() -> {
@@ -79,8 +80,7 @@ public class ReactiveLifecycleManager implements StateMachineReactiveLifecycle {
return stopReactively();
}
return Mono.empty();
}))
;
}));
}
@Override
@@ -145,10 +145,10 @@ public class ReactiveLifecycleManager implements StateMachineReactiveLifecycle {
log.debug("Lifecycle from " + expect + " to " + update + " in " + ReactiveLifecycleManager.this);
if (update == LifecycleState.STARTING) {
log.debug("Next start request with doStartReactively in " + ReactiveLifecycleManager.this);
startRequestsProcessor.onNext(preStartRequest.get());
startRequestsSink.emitNext(preStartRequest.get(), Sinks.EmitFailureHandler.FAIL_FAST);
} else if (update == LifecycleState.STOPPING) {
log.debug("Next stop request with doStopReactively in " + ReactiveLifecycleManager.this);
stopRequestsProcessor.onNext(preStopRequest.get());
stopRequestsSink.emitNext(preStopRequest.get(), Sinks.EmitFailureHandler.FAIL_FAST);
}
}
return set;

View File

@@ -28,6 +28,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -53,10 +54,11 @@ import org.springframework.statemachine.trigger.Trigger;
import org.springframework.statemachine.trigger.TriggerListener;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
/**
@@ -86,8 +88,7 @@ public class ReactiveStateMachineExecutor<S, E> extends LifecycleObjectSupport i
private volatile Message<E> forwardedInitialEvent;
private volatile Message<E> queuedMessage = null;
private StateMachineExecutorTransit<S, E> stateMachineExecutorTransit;
private EmitterProcessor<TriggerQueueItem> triggerProcessor = EmitterProcessor.create(false);
private FluxSink<TriggerQueueItem> triggerSink;
private Many<TriggerQueueItem> triggerSink;
private Flux<Void> triggerFlux;
private Disposable triggerDisposable;
@@ -111,8 +112,8 @@ public class ReactiveStateMachineExecutor<S, E> extends LifecycleObjectSupport i
@Override
protected void onInit() throws Exception {
triggerSink = triggerProcessor.sink();
triggerFlux = Flux.from(triggerProcessor).flatMap(trigger -> handleTrigger(trigger));
triggerSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
triggerFlux = triggerSink.asFlux().flatMap(trigger -> handleTrigger(trigger));
}
@Override
@@ -156,7 +157,7 @@ public class ReactiveStateMachineExecutor<S, E> extends LifecycleObjectSupport i
if (log.isDebugEnabled()) {
log.debug("Queue trigger " + trigger);
}
triggerSink.next(new TriggerQueueItem(trigger, message, null, null));
triggerSink.emitNext(new TriggerQueueItem(trigger, message, null, null), Sinks.EmitFailureHandler.FAIL_FAST);
}
@Override
@@ -208,10 +209,8 @@ public class ReactiveStateMachineExecutor<S, E> extends LifecycleObjectSupport i
return messages
.flatMap(m -> handleEvent(m, callback, triggerCallback))
.doOnNext(i -> {
try {
triggerSink.next(i);
} catch (Exception e) {
log.error("Unable to handle queued event", e);
while (triggerSink.tryEmitNext(i).isFailure()) {
LockSupport.parkNanos(10);
}
})
.then()
@@ -313,7 +312,7 @@ public class ReactiveStateMachineExecutor<S, E> extends LifecycleObjectSupport i
return ret;
})
.onErrorResume(resumeTriggerErrorToContext())
.and(Mono.subscriberContext()
.and(Mono.deferContextual(Mono::just)
.doOnNext(ctx -> {
if (queueItem.callback != null) {
Optional<ExecutorExceptionHolder> holder = ctx.getOrEmpty(StateMachineSystemConstants.REACTOR_CONTEXT_ERRORS);
@@ -336,11 +335,10 @@ public class ReactiveStateMachineExecutor<S, E> extends LifecycleObjectSupport i
}
});
}
}))
.subscriberContext(Context.of(StateMachineSystemConstants.REACTOR_CONTEXT_ERRORS,
new ExecutorExceptionHolder(), REACTOR_CONTEXT_TRIGGER_ERRORS, new ExecutorExceptionHolder()));
.contextWrite(Context.of(
StateMachineSystemConstants.REACTOR_CONTEXT_ERRORS, new ExecutorExceptionHolder(),
REACTOR_CONTEXT_TRIGGER_ERRORS, new ExecutorExceptionHolder()));
}
@@ -491,7 +489,7 @@ public class ReactiveStateMachineExecutor<S, E> extends LifecycleObjectSupport i
}
private static Function<? super Throwable, Mono<Void>> resumeTriggerErrorToContext() {
return t -> Mono.subscriberContext()
return t -> Mono.deferContextual(Mono::just)
.doOnNext(ctx -> {
Optional<ExecutorExceptionHolder> holder = ctx.getOrEmpty(REACTOR_CONTEXT_TRIGGER_ERRORS);
holder.ifPresent(h -> {

View File

@@ -204,7 +204,7 @@ public abstract class StateMachineUtils {
* @return mono for completion
*/
public static Function<? super Throwable, Mono<Void>> resumeErrorToContext() {
return t -> Mono.subscriberContext()
return t -> Mono.deferContextual(Mono::just)
.doOnNext(ctx -> {
Optional<ExecutorExceptionHolder> holder = ctx.getOrEmpty(StateMachineSystemConstants.REACTOR_CONTEXT_ERRORS);
holder.ifPresent(h -> {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -66,12 +66,12 @@ public class Persist {
//tag::snippetB[]
public void change(int order, String event) {
Order o = jdbcTemplate.queryForObject("select id, state from orders where id = ?", new Object[] { order },
Order o = jdbcTemplate.queryForObject("select id, state from orders where id = ?",
new RowMapper<Order>() {
public Order mapRow(ResultSet rs, int rowNum) throws SQLException {
return new Order(rs.getInt("id"), rs.getString("state"));
}
});
}, new Object[] { order });
handler.handleEventWithStateReactively(MessageBuilder
.withPayload(event).setHeader("order", order).build(), o.state)
.subscribe();