diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java index 9ec23ed87..c19e9b1de 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java @@ -37,6 +37,7 @@ import org.springframework.cloud.function.core.FluxSupplier; import org.springframework.cloud.function.core.FluxToMonoFunction; import org.springframework.cloud.function.core.FluxedConsumer; import org.springframework.cloud.function.core.FluxedFunction; +import org.springframework.cloud.function.core.MonoSupplier; import org.springframework.cloud.function.core.MonoToFluxFunction; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; @@ -155,17 +156,19 @@ public class FunctionRegistration implements BeanNameAware { result = new FunctionRegistration(target); result.type(this.type.getType()); - if (!type.isWrapper()) { + if (!this.type.isWrapper()) { target = target instanceof Supplier ? (S) new FluxSupplier((Supplier) target) : target instanceof Function ? (S) new FluxFunction((Function) target) : (S) new FluxConsumer((Consumer) target); } - else if (Mono.class.isAssignableFrom(type.getOutputWrapper())) { - target = (S) new FluxToMonoFunction((Function) target); + else if (Mono.class.isAssignableFrom(this.type.getOutputWrapper())) { + target = target instanceof Supplier + ? (S) new MonoSupplier((Supplier) target) + : (S) new FluxToMonoFunction((Function) target); } - else if (Mono.class.isAssignableFrom(type.getInputWrapper())) { + else if (Mono.class.isAssignableFrom(this.type.getInputWrapper())) { target = (S) new MonoToFluxFunction((Function) target); } else if (target instanceof Consumer) { diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoSupplier.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoSupplier.java new file mode 100644 index 000000000..76362478b --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoSupplier.java @@ -0,0 +1,50 @@ +/* + * Copyright 2012-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.core; + +import java.util.function.Supplier; + +import reactor.core.publisher.Mono; + +/** + * {@link Supplier} implementation that wraps a target Supplier so that the target's + * simple output type will be wrapped in a {@link Mono} instance. + * + * @param output type of target supplier + * @author Mark Fisher + */ +public class MonoSupplier implements Supplier>, FluxWrapper> { + + private final Supplier supplier; + + public MonoSupplier(Supplier supplier) { + this.supplier = supplier; + } + + @Override + public Supplier getTarget() { + return this.supplier; + } + + @Override + @SuppressWarnings({ "unchecked", "rawtypes" }) + public Mono get() { + Object result = this.supplier.get(); + return Mono.just((T) result); + } + +}