diff --git a/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/ReactiveStateMachineExecutor.java b/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/ReactiveStateMachineExecutor.java index 4bb44094..26f063bf 100644 --- a/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/ReactiveStateMachineExecutor.java +++ b/spring-statemachine-core/src/main/java/org/springframework/statemachine/support/ReactiveStateMachineExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -115,7 +115,8 @@ public class ReactiveStateMachineExecutor extends LifecycleObjectSupport i @Override protected void onInit() throws Exception { triggerSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false); - triggerFlux = triggerSink.asFlux().flatMap(trigger -> handleTrigger(trigger)); + // limit concurrency so that we get one by one handling + triggerFlux = triggerSink.asFlux().flatMap(trigger -> handleTrigger(trigger), 1); } @Override @@ -328,10 +329,11 @@ public class ReactiveStateMachineExecutor extends LifecycleObjectSupport i } }); } - })) - .contextWrite(Context.of( - StateMachineSystemConstants.REACTOR_CONTEXT_ERRORS, new ExecutorExceptionHolder(), - REACTOR_CONTEXT_TRIGGER_ERRORS, new ExecutorExceptionHolder())); + }) + ) + .contextWrite(Context.of( + StateMachineSystemConstants.REACTOR_CONTEXT_ERRORS, new ExecutorExceptionHolder(), + REACTOR_CONTEXT_TRIGGER_ERRORS, new ExecutorExceptionHolder())); } diff --git a/spring-statemachine-core/src/test/java/org/springframework/statemachine/TestUtils.java b/spring-statemachine-core/src/test/java/org/springframework/statemachine/TestUtils.java index d7d2ea36..ef7b1c20 100644 --- a/spring-statemachine-core/src/test/java/org/springframework/statemachine/TestUtils.java +++ b/spring-statemachine-core/src/test/java/org/springframework/statemachine/TestUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2020 the original author or authors. + * Copyright 2015-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,6 +23,8 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; import java.time.Duration; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.BeanFactory; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; @@ -44,6 +46,8 @@ import reactor.test.StepVerifier; */ public class TestUtils { + private static Log log = LogFactory.getLog(TestUtils.class); + @SuppressWarnings("unchecked") public static StateMachine resolveMachine(BeanFactory beanFactory) { assertThat(beanFactory.containsBean(DEFAULT_ID_STATEMACHINE)).isTrue(); @@ -99,7 +103,10 @@ public class TestUtils { public static void doSendEventAndConsumeAll(StateMachine stateMachine, E event) { StepVerifier.create(stateMachine.sendEvent(eventAsMono(event))) - .thenConsumeWhile(eventResult -> true) + .thenConsumeWhile(eventResult -> { + log.debug("Consume eventResult " + eventResult); + return true; + }) .expectComplete() .verify(Duration.ofSeconds(5)); } @@ -126,6 +133,30 @@ public class TestUtils { .verifyComplete(); } + @SafeVarargs + public static void doSendEventsAndConsumeAll(StateMachine stateMachine, E... events) { + StepVerifier.create(stateMachine.sendEvents(eventsAsFlux(events))) + .thenConsumeWhile(eventResult -> { + log.debug("Consume eventResult " + eventResult); + return true; + }) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @SafeVarargs + public static void doSendEventsAndConsumeAllWithComplete(StateMachine stateMachine, E... events) { + Flux completions = stateMachine.sendEvents(eventsAsFlux(events)) + .doOnNext(result -> { + log.debug("Consume eventResult " + result); + }) + .flatMap(result -> result.complete()); + StepVerifier.create(completions) + .thenConsumeWhile(complete -> true) + .expectComplete() + .verify(Duration.ofSeconds(10)); + } + @SuppressWarnings("unchecked") public static T readField(String name, Object target) throws Exception { Field field = null; diff --git a/spring-statemachine-core/src/test/java/org/springframework/statemachine/action/ReactiveAction2Tests.java b/spring-statemachine-core/src/test/java/org/springframework/statemachine/action/ReactiveAction2Tests.java new file mode 100644 index 00000000..306495ac --- /dev/null +++ b/spring-statemachine-core/src/test/java/org/springframework/statemachine/action/ReactiveAction2Tests.java @@ -0,0 +1,131 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.statemachine.action; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.springframework.statemachine.TestUtils.doSendEventsAndConsumeAllWithComplete; +import static org.springframework.statemachine.TestUtils.doStartAndAssert; +import static org.springframework.statemachine.TestUtils.resolveMachine; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.jupiter.api.Test; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.statemachine.AbstractStateMachineTests; +import org.springframework.statemachine.StateContext; +import org.springframework.statemachine.StateMachine; +import org.springframework.statemachine.config.EnableStateMachine; +import org.springframework.statemachine.config.EnumStateMachineConfigurerAdapter; +import org.springframework.statemachine.config.builders.StateMachineStateConfigurer; +import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer; + +import reactor.core.publisher.Mono; + +/** + * Tests for state machine reactive actions. + * + * @author Janne Valkealahti + * + */ +public class ReactiveAction2Tests extends AbstractStateMachineTests { + + @Test + public void testSimpleReactiveActions() throws Exception { + context.register(Config1.class); + context.refresh(); + StateMachine machine = resolveMachine(context); + doStartAndAssert(machine); + + TestCountAction testAction3 = context.getBean("testAction3", TestCountAction.class); + TestCountAction testAction4 = context.getBean("testAction4", TestCountAction.class); + doSendEventsAndConsumeAllWithComplete(machine, TestEvents.E1, TestEvents.E2); + assertThat(testAction3.latch.await(6, TimeUnit.SECONDS)).isTrue(); + assertThat(testAction4.latch.await(6, TimeUnit.SECONDS)).isTrue(); + assertThat(testAction4.time.get() - testAction3.time.get()).isGreaterThan(1000); + } + + @Configuration + @EnableStateMachine + static class Config1 extends EnumStateMachineConfigurerAdapter { + + @Override + public void configure(StateMachineStateConfigurer states) throws Exception { + states + .withStates() + .initial(TestStates.S1) + .stateEntryFunction(TestStates.S2, testAction3()) + .stateEntryFunction(TestStates.S3, testAction4()); + } + + @Override + public void configure(StateMachineTransitionConfigurer transitions) throws Exception { + transitions + .withExternal() + .source(TestStates.S1) + .target(TestStates.S2) + .event(TestEvents.E1) + .and() + .withExternal() + .source(TestStates.S2) + .target(TestStates.S3) + .event(TestEvents.E2); + } + + @Bean + public TestCountAction testAction3() { + return new TestCountAction("ACTION3"); + } + + @Bean + public TestCountAction testAction4() { + return new TestCountAction("ACTION4"); + } + } + + @Override + protected AnnotationConfigApplicationContext buildContext() { + return new AnnotationConfigApplicationContext(); + } + + private static class TestCountAction implements ReactiveAction { + + private final String id; + int count = 0; + CountDownLatch latch = new CountDownLatch(1); + AtomicLong time = new AtomicLong(); + + TestCountAction(String id) { + this.id = id; + } + + @Override + public Mono apply(StateContext context) { + return Mono.delay(Duration.ofMillis(2000)) + .doFinally(x -> { + count++; + time.set(System.currentTimeMillis()); + latch.countDown(); + }) + .then() + .log(id); + } + } +}