From ae07a13d037f15f718e9327b17e141ff80cbcf84 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 15 Oct 2018 09:34:06 -0400 Subject: [PATCH] GH-56 Added support for Function and Resolves #56 Resolves #218 --- ...ntextFunctionCatalogAutoConfiguration.java | 25 ++++++++- .../BeanFactoryFunctionCatalogTests.java | 43 +++++++++++++++ .../function/core/FluxToMonoFunction.java | 53 ++++++++++++++++++ .../function/core/MonoToFluxFunction.java | 54 +++++++++++++++++++ 4 files changed, 174 insertions(+), 1 deletion(-) create mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxToMonoFunction.java create mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoToFluxFunction.java diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 321bbea0b..8d24b54b4 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -52,9 +52,11 @@ import org.springframework.cloud.function.context.catalog.FunctionUnregistration import org.springframework.cloud.function.core.FluxConsumer; import org.springframework.cloud.function.core.FluxFunction; import org.springframework.cloud.function.core.FluxSupplier; +import org.springframework.cloud.function.core.FluxToMonoFunction; import org.springframework.cloud.function.core.IsolatedConsumer; import org.springframework.cloud.function.core.IsolatedFunction; import org.springframework.cloud.function.core.IsolatedSupplier; +import org.springframework.cloud.function.core.MonoToFluxFunction; import org.springframework.cloud.function.json.GsonMapper; import org.springframework.cloud.function.json.JacksonMapper; import org.springframework.context.ApplicationEventPublisher; @@ -349,7 +351,22 @@ public class ContextFunctionCatalogAutoConfiguration { else if (a instanceof Function && b instanceof Function) { Function function1 = (Function) a; Function function2 = (Function) b; - return function1.andThen(function2); + if (function1 instanceof FluxToMonoFunction) { + if (function2 instanceof MonoToFluxFunction) { + return function1.andThen(function2); + } + else { + throw new IllegalStateException("The provided function is finite (i.e., returns Mono) " + + "therefore it can *only* be composed with compatible function (i.e., Function"); + } + } + else if (function2 instanceof FluxToMonoFunction) { + return new FluxToMonoFunction(((Function, Flux>)a) + .andThen(((FluxToMonoFunction) b).getTarget())); + } + else { + return function1.andThen(function2); + } } else if (a instanceof Function && b instanceof Consumer) { Function function = (Function) a; @@ -490,6 +507,12 @@ public class ContextFunctionCatalogAutoConfiguration { } registration.target(target); } + if (Mono.class.isAssignableFrom(type.getOutputWrapper())) { + registration.target(new FluxToMonoFunction<>((Function) target)); + } + else if (Mono.class.isAssignableFrom(type.getInputWrapper())) { + registration.target(new MonoToFluxFunction<>((Function) target)); + } return registration; } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java index 708360c47..87bb5e378 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java @@ -120,6 +120,33 @@ public class BeanFactoryFunctionCatalogTests { assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("Hello 4"); } + @Test + public void composeWithFiniteFunction() { + Function func1 = x -> x.toUpperCase(); + processor.register(new FunctionRegistration<>(func1, "func1")); + processor.register(new FunctionRegistration<>(new FluxThenMonoFunction(), "func2")); + Function, Mono> foos = processor.lookup(Function.class, "func1,func2"); + assertThat(foos.apply(Flux.fromArray(new String[] {"a", "b", "c"})).block()).isEqualTo(3); + } + + @Test + public void composeWithFiniteFunctionAndContinueWithCompatible() { + Function func1 = x -> x.toUpperCase(); + processor.register(new FunctionRegistration<>(func1, "func1")); + processor.register(new FunctionRegistration<>(new FluxThenMonoFunction(), "func2")); + processor.register(new FunctionRegistration<>(new MonoThenFluxFunction(), "func3")); + Function, Flux> foos = processor.lookup(Function.class, "func1,func2,func3"); + assertThat(foos.apply(Flux.fromArray(new String[] {"a", "b", "c"})).collectList().block().size()).isEqualTo(3); + } + + @Test(expected=IllegalStateException.class) + public void composeIncompatibleFunctions() { + Function func1 = x -> x.toUpperCase(); + processor.register(new FunctionRegistration<>(func1, "func1")); + processor.register(new FunctionRegistration<>(new FluxThenMonoFunction(), "func2")); + processor.lookup(Function.class, "func2,func1"); + } + @Test public void composeSupplier() { processor.register(new FunctionRegistration<>(new Source(), "numbers")); @@ -231,4 +258,20 @@ public class BeanFactoryFunctionCatalogTests { } + protected static class FluxThenMonoFunction implements Function, Mono> { + + @Override + public Mono apply(Flux t) { + return t.count(); + } + } + + protected static class MonoThenFluxFunction implements Function, Flux> { + + @Override + public Flux apply(Mono t) { + return Flux.range(0, Integer.parseInt(t.block().toString())); + } + } + } diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxToMonoFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxToMonoFunction.java new file mode 100644 index 000000000..c0e7a422f --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxToMonoFunction.java @@ -0,0 +1,53 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.core; + +import java.util.function.Function; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Marker wrapper for target Function<Flux, Mono> + * + * @author Oleg Zhurakousky + * @since 2.0 + * + * @param type of {@link Flux} input of the target function + * @param type of {@link Mono} output of the target function + */ +public class FluxToMonoFunction implements Function, Mono>, FluxWrapper, Mono>> { + + private final Function, Mono> function; + + /** + * @param function target function + */ + public FluxToMonoFunction(Function, Mono> function) { + this.function = function; + } + + @Override + public Function, Mono> getTarget() { + return function; + } + + @Override + public Mono apply(Flux input) { + return function.apply(input); + } +} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoToFluxFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoToFluxFunction.java new file mode 100644 index 000000000..ad62e7dd6 --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoToFluxFunction.java @@ -0,0 +1,54 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.core; + +import java.util.function.Function; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Marker wrapper for target Function<Mono, Flux> + * + * @author Oleg Zhurakousky + * @since 2.0 + * + * @param type of {@link Mono} input of the target function + * @param type of {@link Flux} output of the target function + */ +public class MonoToFluxFunction implements Function, Flux>, FluxWrapper, Flux>> { + + private final Function, Flux> function; + + /** + * @param function target function + * @param names name(s) of the target function (optional) + */ + public MonoToFluxFunction(Function, Flux> function) { + this.function = function; + } + + @Override + public Function, Flux> getTarget() { + return function; + } + + @Override + public Flux apply(Mono input) { + return function.apply(input); + } +}