Limit trigger handling concurrency
- Change flatMap to use concurrency 1 in a trigger handling which should fix issues when events are sent fast. - Backport #942 - Fixes #943
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2019-2020 the original author or authors.
|
||||
* Copyright 2019-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -115,7 +115,8 @@ public class ReactiveStateMachineExecutor<S, E> extends LifecycleObjectSupport i
|
||||
@Override
|
||||
protected void onInit() throws Exception {
|
||||
triggerSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
|
||||
triggerFlux = triggerSink.asFlux().flatMap(trigger -> handleTrigger(trigger));
|
||||
// limit concurrency so that we get one by one handling
|
||||
triggerFlux = triggerSink.asFlux().flatMap(trigger -> handleTrigger(trigger), 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -328,10 +329,11 @@ public class ReactiveStateMachineExecutor<S, E> extends LifecycleObjectSupport i
|
||||
}
|
||||
});
|
||||
}
|
||||
}))
|
||||
.contextWrite(Context.of(
|
||||
StateMachineSystemConstants.REACTOR_CONTEXT_ERRORS, new ExecutorExceptionHolder(),
|
||||
REACTOR_CONTEXT_TRIGGER_ERRORS, new ExecutorExceptionHolder()));
|
||||
})
|
||||
)
|
||||
.contextWrite(Context.of(
|
||||
StateMachineSystemConstants.REACTOR_CONTEXT_ERRORS, new ExecutorExceptionHolder(),
|
||||
REACTOR_CONTEXT_TRIGGER_ERRORS, new ExecutorExceptionHolder()));
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2015-2020 the original author or authors.
|
||||
* Copyright 2015-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -23,6 +23,8 @@ import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Method;
|
||||
import java.time.Duration;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.beans.factory.BeanFactory;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
@@ -44,6 +46,8 @@ import reactor.test.StepVerifier;
|
||||
*/
|
||||
public class TestUtils {
|
||||
|
||||
private static Log log = LogFactory.getLog(TestUtils.class);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <S, E> StateMachine<S, E> resolveMachine(BeanFactory beanFactory) {
|
||||
assertThat(beanFactory.containsBean(DEFAULT_ID_STATEMACHINE)).isTrue();
|
||||
@@ -99,7 +103,10 @@ public class TestUtils {
|
||||
|
||||
public static <S, E> void doSendEventAndConsumeAll(StateMachine<S, E> stateMachine, E event) {
|
||||
StepVerifier.create(stateMachine.sendEvent(eventAsMono(event)))
|
||||
.thenConsumeWhile(eventResult -> true)
|
||||
.thenConsumeWhile(eventResult -> {
|
||||
log.debug("Consume eventResult " + eventResult);
|
||||
return true;
|
||||
})
|
||||
.expectComplete()
|
||||
.verify(Duration.ofSeconds(5));
|
||||
}
|
||||
@@ -126,6 +133,30 @@ public class TestUtils {
|
||||
.verifyComplete();
|
||||
}
|
||||
|
||||
@SafeVarargs
|
||||
public static <S, E> void doSendEventsAndConsumeAll(StateMachine<S, E> stateMachine, E... events) {
|
||||
StepVerifier.create(stateMachine.sendEvents(eventsAsFlux(events)))
|
||||
.thenConsumeWhile(eventResult -> {
|
||||
log.debug("Consume eventResult " + eventResult);
|
||||
return true;
|
||||
})
|
||||
.expectComplete()
|
||||
.verify(Duration.ofSeconds(5));
|
||||
}
|
||||
|
||||
@SafeVarargs
|
||||
public static <S, E> void doSendEventsAndConsumeAllWithComplete(StateMachine<S, E> stateMachine, E... events) {
|
||||
Flux<Void> completions = stateMachine.sendEvents(eventsAsFlux(events))
|
||||
.doOnNext(result -> {
|
||||
log.debug("Consume eventResult " + result);
|
||||
})
|
||||
.flatMap(result -> result.complete());
|
||||
StepVerifier.create(completions)
|
||||
.thenConsumeWhile(complete -> true)
|
||||
.expectComplete()
|
||||
.verify(Duration.ofSeconds(10));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> T readField(String name, Object target) throws Exception {
|
||||
Field field = null;
|
||||
|
||||
@@ -0,0 +1,131 @@
|
||||
/*
|
||||
* Copyright 2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.statemachine.action;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.springframework.statemachine.TestUtils.doSendEventsAndConsumeAllWithComplete;
|
||||
import static org.springframework.statemachine.TestUtils.doStartAndAssert;
|
||||
import static org.springframework.statemachine.TestUtils.resolveMachine;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.statemachine.AbstractStateMachineTests;
|
||||
import org.springframework.statemachine.StateContext;
|
||||
import org.springframework.statemachine.StateMachine;
|
||||
import org.springframework.statemachine.config.EnableStateMachine;
|
||||
import org.springframework.statemachine.config.EnumStateMachineConfigurerAdapter;
|
||||
import org.springframework.statemachine.config.builders.StateMachineStateConfigurer;
|
||||
import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer;
|
||||
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
* Tests for state machine reactive actions.
|
||||
*
|
||||
* @author Janne Valkealahti
|
||||
*
|
||||
*/
|
||||
public class ReactiveAction2Tests extends AbstractStateMachineTests {
|
||||
|
||||
@Test
|
||||
public void testSimpleReactiveActions() throws Exception {
|
||||
context.register(Config1.class);
|
||||
context.refresh();
|
||||
StateMachine<TestStates, TestEvents> machine = resolveMachine(context);
|
||||
doStartAndAssert(machine);
|
||||
|
||||
TestCountAction testAction3 = context.getBean("testAction3", TestCountAction.class);
|
||||
TestCountAction testAction4 = context.getBean("testAction4", TestCountAction.class);
|
||||
doSendEventsAndConsumeAllWithComplete(machine, TestEvents.E1, TestEvents.E2);
|
||||
assertThat(testAction3.latch.await(6, TimeUnit.SECONDS)).isTrue();
|
||||
assertThat(testAction4.latch.await(6, TimeUnit.SECONDS)).isTrue();
|
||||
assertThat(testAction4.time.get() - testAction3.time.get()).isGreaterThan(1000);
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@EnableStateMachine
|
||||
static class Config1 extends EnumStateMachineConfigurerAdapter<TestStates, TestEvents> {
|
||||
|
||||
@Override
|
||||
public void configure(StateMachineStateConfigurer<TestStates, TestEvents> states) throws Exception {
|
||||
states
|
||||
.withStates()
|
||||
.initial(TestStates.S1)
|
||||
.stateEntryFunction(TestStates.S2, testAction3())
|
||||
.stateEntryFunction(TestStates.S3, testAction4());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(StateMachineTransitionConfigurer<TestStates, TestEvents> transitions) throws Exception {
|
||||
transitions
|
||||
.withExternal()
|
||||
.source(TestStates.S1)
|
||||
.target(TestStates.S2)
|
||||
.event(TestEvents.E1)
|
||||
.and()
|
||||
.withExternal()
|
||||
.source(TestStates.S2)
|
||||
.target(TestStates.S3)
|
||||
.event(TestEvents.E2);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public TestCountAction testAction3() {
|
||||
return new TestCountAction("ACTION3");
|
||||
}
|
||||
|
||||
@Bean
|
||||
public TestCountAction testAction4() {
|
||||
return new TestCountAction("ACTION4");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AnnotationConfigApplicationContext buildContext() {
|
||||
return new AnnotationConfigApplicationContext();
|
||||
}
|
||||
|
||||
private static class TestCountAction implements ReactiveAction<TestStates, TestEvents> {
|
||||
|
||||
private final String id;
|
||||
int count = 0;
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
AtomicLong time = new AtomicLong();
|
||||
|
||||
TestCountAction(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> apply(StateContext<TestStates, TestEvents> context) {
|
||||
return Mono.delay(Duration.ofMillis(2000))
|
||||
.doFinally(x -> {
|
||||
count++;
|
||||
time.set(System.currentTimeMillis());
|
||||
latch.countDown();
|
||||
})
|
||||
.then()
|
||||
.log(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user