From 805b85b102ea4eff0dfb6575ce9e19d59f33d509 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 11 Feb 2019 15:30:04 +0100 Subject: [PATCH] GH-243, GH-257 Added reactive consumer wrapper - Added wrapper for an already reactive consumer to ensure that consumers can be consistently represented as Function - Fixed the big that deal with inconsistent result in web environments due to inconsistent representation of the Consumers - Polished tests Resolves #243 Resolves #257 --- .../context/FunctionRegistration.java | 6 +++ ...ntextFunctionCatalogAutoConfiguration.java | 33 ++++++------ ...FunctionCatalogAutoConfigurationTests.java | 6 +-- .../cloud/function/core/FluxConsumer.java | 28 ++++------ .../cloud/function/core/FluxFunction.java | 20 +++---- .../cloud/function/core/FluxedConsumer.java | 47 +++++++++++++++++ .../cloud/function/core/WrappedFunction.java | 52 +++++++++++++++++++ .../cloud/function/web/RequestProcessor.java | 17 +++--- .../web/flux/HttpPostIntegrationTests.java | 32 ++++++++---- .../web/mvc/HttpPostIntegrationTests.java | 3 +- 10 files changed, 175 insertions(+), 69 deletions(-) create mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedConsumer.java create mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/WrappedFunction.java diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java index c75e40df6..8907c6ee9 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java @@ -35,6 +35,7 @@ import org.springframework.cloud.function.core.FluxConsumer; import org.springframework.cloud.function.core.FluxFunction; import org.springframework.cloud.function.core.FluxSupplier; import org.springframework.cloud.function.core.FluxToMonoFunction; +import org.springframework.cloud.function.core.FluxedConsumer; import org.springframework.cloud.function.core.MonoToFluxFunction; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; @@ -164,6 +165,11 @@ public class FunctionRegistration implements BeanNameAware { target = (S) new FluxConsumer((Consumer) target); } } + else { + if (target instanceof Consumer) { + target = (S) new FluxedConsumer((Consumer) target); + } + } if (Mono.class.isAssignableFrom(type.getOutputWrapper())) { target = (S) new FluxToMonoFunction((Function) target); diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 8325ed30e..e4b7fbf3b 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -58,7 +58,6 @@ import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; import org.springframework.cloud.function.context.FunctionType; -import org.springframework.cloud.function.context.catalog.FunctionInspector; import org.springframework.cloud.function.context.catalog.FunctionRegistrationEvent; import org.springframework.cloud.function.context.catalog.FunctionUnregistrationEvent; import org.springframework.cloud.function.core.FluxConsumer; @@ -562,22 +561,22 @@ public class ContextFunctionCatalogAutoConfiguration { } - protected class BeanFactoryFunctionInspector implements FunctionInspector { - - private ContextFunctionRegistry processor; - - public BeanFactoryFunctionInspector(ContextFunctionRegistry processor) { - this.processor = processor; - } - - @Override - public FunctionRegistration getRegistration(Object function) { - FunctionRegistration registration = this.processor - .getRegistration(function); - return registration; - } - - } + // protected class BeanFactoryFunctionInspector implements FunctionInspector { + // + // private ContextFunctionRegistry processor; + // + // public BeanFactoryFunctionInspector(ContextFunctionRegistry processor) { + // this.processor = processor; + // } + // + // @Override + // public FunctionRegistration getRegistration(Object function) { + // FunctionRegistration registration = this.processor + // .getRegistration(function); + // return registration; + // } + // + // } @Configuration @ConditionalOnClass(Gson.class) diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfigurationTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfigurationTests.java index b19b3d5e1..7ca61311a 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfigurationTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfigurationTests.java @@ -560,10 +560,10 @@ public class ContextFunctionCatalogAutoConfigurationTests { "spring.cloud.function.compile.foos.lambda=f -> f.subscribe(" + getClass().getName() + "::set)", "spring.cloud.function.compile.foos.type=consumer"); - assertThat((Consumer) this.catalog.lookup(Consumer.class, "foos")) - .isInstanceOf(Consumer.class); + assertThat((Function) this.catalog.lookup(Function.class, "foos")) + .isInstanceOf(Function.class); assertThat(this.inspector - .getInputWrapper(this.catalog.lookup(Consumer.class, "foos"))) + .getInputWrapper(this.catalog.lookup(Function.class, "foos"))) .isEqualTo(Flux.class); @SuppressWarnings("unchecked") Consumer> consumer = (Consumer>) this.context diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java index a21505c00..c0c4b8e3d 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxConsumer.java @@ -17,35 +17,29 @@ package org.springframework.cloud.function.core; import java.util.function.Consumer; -import java.util.function.Function; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Wrapper for a {@link Consumer} implementation that converts a non-reactive consumer - * into a reactive function. + * Wrapper for a {@link Consumer} implementation that converts a non-reactive + * consumer into a reactive function ({@code Function, Mono>}). * - * @param input type of target consumer + * @param input type of target consumer * @author Dave Syer + * @author Oleg Zhurakousky + * @see FluxedConsumer */ -public class FluxConsumer - implements Function, Mono>, FluxWrapper> { +public class FluxConsumer + extends WrappedFunction, Mono, Consumer> { - private final Consumer consumer; - - public FluxConsumer(Consumer consumer) { - this.consumer = consumer; + public FluxConsumer(Consumer target) { + super(target); } @Override - public Consumer getTarget() { - return this.consumer; - } - - @Override - public Mono apply(Flux input) { - return input.doOnNext(this.consumer).then(); + public Mono apply(Flux input) { + return input.doOnNext(this.getTarget()).then(); } } diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java index a45e8138d..2459159b7 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java @@ -27,24 +27,18 @@ import reactor.core.publisher.Flux; * @param input type of target function * @param output type of target function * @author Mark Fisher + * @author Oleg Zhurakousky */ -public class FluxFunction - implements Function, Flux>, FluxWrapper> { +public class FluxFunction + extends WrappedFunction, Flux, Function> { - private final Function function; - - public FluxFunction(Function function) { - this.function = function; + public FluxFunction(Function target) { + super(target); } @Override - public Function getTarget() { - return this.function; - } - - @Override - public Flux apply(Flux input) { - return input.map(i -> this.function.apply(i)); + public Flux apply(Flux input) { + return input.map(value -> this.getTarget().apply(value)); } } diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedConsumer.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedConsumer.java new file mode 100644 index 000000000..ab4006dbe --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedConsumer.java @@ -0,0 +1,47 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.core; + +import java.util.function.Consumer; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Wrapper for a {@link Consumer} implementation that converts a reactive consumer into a + * reactive function ({@code Function, Mono>}). This is primarily done for + * consistent representation of reactive and non-reactive consumers. + * + * @param input type of target consumer + * @author Oleg Zhurakousky + * @since 2.0.1 + * @see FluxConsumer + * + */ +public class FluxedConsumer + extends WrappedFunction, Mono, Consumer>> { + + public FluxedConsumer(Consumer> target) { + super(target); + } + + @Override + public Mono apply(Flux input) { + return Mono.fromRunnable(() -> this.getTarget().accept(input)); + } + +} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/WrappedFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/WrappedFunction.java new file mode 100644 index 000000000..db3d7ce36 --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/WrappedFunction.java @@ -0,0 +1,52 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.core; + +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.reactivestreams.Publisher; + +/** + * Base class for all wrappers that represent underlying functions (user defined + * suppliers, functions and/or consumers) as reactive functions. + * + * @param input type of target consumer + * @param output type of target consumer + * @param reactive input type of target function (instance of {@link Publisher} + * @param reactive output type of target function (instance of {@link Publisher} + * @param actual target function (instance of {@link Supplier}, {@link Function} or + * {@link Consumer}) + * @author Oleg Zhurakousky + * @since 2.0.1 + */ +abstract class WrappedFunction, OP extends Publisher, T> + implements Function, FluxWrapper { + + private final T target; + + WrappedFunction(T target) { + this.target = target; + } + + @Override + public T getTarget() { + return this.target; + } + +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java index 82e9ca647..373ec963f 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java @@ -40,7 +40,9 @@ import reactor.core.publisher.Mono; import org.springframework.beans.factory.ObjectProvider; import org.springframework.cloud.function.context.catalog.FunctionInspector; import org.springframework.cloud.function.context.message.MessageUtils; +import org.springframework.cloud.function.core.FluxConsumer; import org.springframework.cloud.function.core.FluxWrapper; +import org.springframework.cloud.function.core.FluxedConsumer; import org.springframework.cloud.function.json.JsonMapper; import org.springframework.cloud.function.web.util.HeaderUtils; import org.springframework.core.MethodParameter; @@ -200,7 +202,14 @@ public class RequestProcessor { flux = messages(wrapper, function == null ? consumer : function, flux); } Mono> responseEntityMono = null; - if (function != null) { + + if (function instanceof FluxedConsumer || function instanceof FluxConsumer) { + ((Mono) function.apply(flux)).subscribe(); + logger.debug("Handled POST with consumer"); + responseEntityMono = Mono + .just(ResponseEntity.status(HttpStatus.ACCEPTED).build()); + } + else { Flux result = Flux.from(function.apply(flux)); logger.debug("Handled POST with function"); if (stream) { @@ -211,12 +220,6 @@ public class RequestProcessor { body == null ? null : !(body instanceof Collection), false); } } - else if (consumer != null) { - consumer.accept(flux); - logger.debug("Handled POST with consumer"); - responseEntityMono = Mono - .just(ResponseEntity.status(HttpStatus.ACCEPTED).build()); - } return responseEntityMono; } diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HttpPostIntegrationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HttpPostIntegrationTests.java index 00f31d8c1..7719d8403 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HttpPostIntegrationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HttpPostIntegrationTests.java @@ -124,14 +124,23 @@ public class HttpPostIntegrationTests { assertThat(result.getBody()).isEqualTo(null); } + @Test + public void addFoosFlux() throws Exception { + ResponseEntity result = this.rest.exchange(RequestEntity + .post(new URI("/addFoosFlux")).contentType(MediaType.APPLICATION_JSON) + .body("[{\"value\":\"foo\"},{\"value\":\"bar\"}]"), String.class); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + assertThat(this.test.list).hasSize(2); + assertThat(result.getBody()).isEqualTo(null); + } + @Test public void bareUpdates() throws Exception { ResponseEntity result = this.rest.exchange(RequestEntity .post(new URI("/bareUpdates")).contentType(MediaType.APPLICATION_JSON) .body("[\"one\",\"two\"]"), String.class); - assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); assertThat(this.test.list).hasSize(2); - assertThat(result.getBody()).isEqualTo("[]"); } @Test @@ -399,12 +408,6 @@ public class HttpPostIntegrationTests { }; } - // @Bean - // public Function byteArrayInputFunction() { - //// return value -> new Foo(value.getValue().trim().toUpperCase()); - // throw new UnsupportedOperationException("boom?"); - // } - @Bean public Function, Flux> wrap() { return flux -> flux.log().map(value -> ".." + value + ".."); @@ -437,13 +440,22 @@ public class HttpPostIntegrationTests { } @Bean - public Consumer> addFoos() { + public Consumer> addFoosFlux() { return flux -> flux.subscribe(value -> this.list.add(value.getValue())); } + @Bean + public Consumer addFoos() { + return value -> { + this.list.add(value.getValue()); + }; + } + @Bean public Consumer bareUpdates() { - return value -> this.list.add(value); + return value -> { + this.list.add(value); + }; } @Bean("not/a") diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/HttpPostIntegrationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/HttpPostIntegrationTests.java index 396029c4e..327343d63 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/HttpPostIntegrationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/HttpPostIntegrationTests.java @@ -128,9 +128,8 @@ public class HttpPostIntegrationTests { ResponseEntity result = this.rest.exchange(RequestEntity .post(new URI("/bareUpdates")).contentType(MediaType.APPLICATION_JSON) .body("[\"one\",\"two\"]"), String.class); - assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK); + assertThat(result.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); assertThat(this.test.list).hasSize(2); - assertThat(result.getBody()).isEqualTo("[]"); } @Test