diff --git a/docs/src/main/asciidoc/spring-cloud-function.adoc b/docs/src/main/asciidoc/spring-cloud-function.adoc index 5bcff77e9..ee6358a03 100644 --- a/docs/src/main/asciidoc/spring-cloud-function.adoc +++ b/docs/src/main/asciidoc/spring-cloud-function.adoc @@ -59,6 +59,43 @@ transmit headers from any adapter that supports key-value metadata |=== +==== Supplier +As you can see from the table above Supplier can be _reactive_ - `Supplier>` +or _imperative_ - `Supplier`. From the invocation standpoint this should make no difference +to the implementor of such Supplier. However, when used within frameworks +(e.g., https://spring.io/projects/spring-cloud-stream[Spring Cloud Stream]), Suppliers, especially reactive, +often used to represent the source of the stream, therefore they are invoked once to get the stream (e.g., Flux) +to which consumers can subscribe to. In other words such suppliers represent an equivalent of an _infinite stream_. +However, the same reactive suppliers can also represent _finite_ stream(s) (e.g., result set on the polled JDBC data). +In those cases such reactive suppliers must be hooked up to some polling mechanism of the underlying framework. + +To assist with that Spring Cloud Function provides a marker annotation +`org.springframework.cloud.function.context.PollableSupplier` to signal that such supplier produces a +finite stream and may need to be polled again. That said, it is important to understand that Spring Cloud Function itself +provides no behavior for this annotation. + +In addition `PollableSupplier` annotation exposes a _splittable_ attribute to signal that produced stream +needs to be split (see https://www.enterpriseintegrationpatterns.com/patterns/messaging/Sequencer.html[Splitter EIP]) + +Here is the example: + +[source, java] +---- +@PollableSupplier(splittable = true) +public Supplier> someSupplier() { + return () -> { + String v1 = String.valueOf(System.nanoTime()); + String v2 = String.valueOf(System.nanoTime()); + String v3 = String.valueOf(System.nanoTime()); + return Flux.just(v1, v2, v3); + }; +} +---- + +==== Function +TBD + +==== Consumer Consumer is a little bit special because it has a `void` return type, which implies blocking, at least potentially. Most likely you will not need to write `Consumer>`, but if you do need to do that, diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/PollableSupplier.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/PollableSupplier.java new file mode 100644 index 000000000..7b76a6c8b --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/PollableSupplier.java @@ -0,0 +1,64 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.util.function.Supplier; + +import org.springframework.context.annotation.Bean; + +/** + * + * A marker annotation to signal to the consumers of the + * annotated {@link Supplier} method that regardless of its type signature + * (reactive or imperative), such supplier needs to be polled + * periodically. This has special significance to the reactive suppliers (e.g., {@code Supplier}), + * since in most cases they are treated as producers of an infinite stream + * that is managed independently once produced. However if such suppliers produce a stream hat is finite + * they may need to be called again. + * + *
+ * NOTE: Given that polling behavior is specific to the users (consumers) of the annotated supplier, + * spring-cloud-function provides no default post processing behavior which means that annotating a + * factory method with this annotation will not have any effect without some application/framework + * specific post processing. + * + * + * @author Oleg Zhurakousky + * @since 3.0 + * + */ +@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Bean +@Documented +public @interface PollableSupplier { + + /** + * Signals to the post processors of this annotation that the result produced by the + * annotated {@link Supplier} has to be split. Specifics on how to split and what + * to split are left to the underlying framework. + * + * @return true if the resulting stream produced by the + * annotated {@link Supplier} has to be split. + */ + boolean splittable() default false; +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java index f4caae2cb..320e999e2 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java @@ -240,8 +240,21 @@ public final class FunctionTypeUtils { return Publisher.class.isAssignableFrom(rawType); } + public static boolean isSupplier(Type type) { + return isOfType(type, Supplier.class); + } + + public static boolean isFunction(Type type) { + return isOfType(type, Function.class); + } + public static boolean isConsumer(Type type) { - return type.getTypeName().startsWith("java.util.function.Consumer"); + return isOfType(type, Consumer.class); + } + + public static boolean isOfType(Type type, Class cls) { + Class c = type instanceof ParameterizedType ? (Class) ((ParameterizedType) type).getRawType() : (Class) type; + return cls.isAssignableFrom(c); } public static boolean isMono(Type type) { @@ -275,14 +288,6 @@ public final class FunctionTypeUtils { return argument != null && argument.getClass().getName().startsWith("reactor.util.function.Tuple"); } - public static boolean isSupplier(Type type) { - return type.getTypeName().startsWith("java.util.function.Supplier"); - } - - public static boolean isFunction(Type type) { - return type.getTypeName().startsWith("java.util.function.Function"); - } - public static Type compose(Type originType, Type composedType) { ResolvableType resolvableOriginType = ResolvableType.forType(originType); ResolvableType resolvableComposedType = ResolvableType.forType(composedType);