From 2ae7789cd1023b7c3564b7d6206a6ae506bbc40b Mon Sep 17 00:00:00 2001 From: markfisher Date: Fri, 24 Feb 2017 12:44:14 -0500 Subject: [PATCH] add interval for non-Flux stream Suppliers --- scripts/stream.sh | 8 +++++++- .../cloud/function/support/FluxSupplier.java | 11 ++++++++++- .../cloud/function/stream/StreamConfiguration.java | 3 ++- .../stream/StreamConfigurationProperties.java | 14 ++++++++++++++ .../stream/SupplierInvokingMessageProducer.java | 7 +++++-- 5 files changed, 38 insertions(+), 5 deletions(-) diff --git a/scripts/stream.sh b/scripts/stream.sh index 81a5a1f40..fa4dc2f97 100755 --- a/scripts/stream.sh +++ b/scripts/stream.sh @@ -9,7 +9,9 @@ tokenize() { echo ${TOKENS[@]} } -while getopts ":i:s:f:c:o:p:" opt; do +DURATION=0 + +while getopts ":i:s:f:c:o:p:d:" opt; do case $opt in i) IN=--spring.cloud.stream.bindings.input.destination=$OPTARG @@ -37,6 +39,9 @@ while getopts ":i:s:f:c:o:p:" opt; do p) PORT=$OPTARG ;; + d) + DURATION=$OPTARG + ;; esac done @@ -44,6 +49,7 @@ java -jar ../spring-cloud-function-samples/spring-cloud-function-sample-compiler --management.security.enabled=false\ --server.port=$PORT\ --spring.cloud.function.stream.endpoint=$FUNC\ + --spring.cloud.function.stream.interval=$DURATION\ $IN\ $OUT\ $RESOURCE\ diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxSupplier.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxSupplier.java index 86e6cbe76..183568331 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxSupplier.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxSupplier.java @@ -18,6 +18,7 @@ package org.springframework.cloud.function.support; import java.time.Duration; import java.util.function.Supplier; +import java.util.stream.Stream; import reactor.core.publisher.Flux; @@ -48,10 +49,18 @@ public class FluxSupplier implements Supplier> { } @Override + @SuppressWarnings({ "unchecked", "rawtypes" }) public Flux get() { if (this.period != null) { return Flux.interval(this.period).map(i->this.supplier.get()); } - return Flux.just(this.supplier.get()); + Object result = this.supplier.get(); + if (result instanceof Stream) { + return Flux.fromStream((Stream) result); + } + if (result instanceof Iterable) { + return Flux.fromIterable((Iterable) result); + } + return Flux.just((T) result); } } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java index f34f6849b..2b2c8b629 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java @@ -69,8 +69,9 @@ public class StreamConfiguration { @ConditionalOnProperty("spring.cloud.stream.bindings.output.destination") public SupplierInvokingMessageProducer invoker(FunctionCatalog registry) { String name = properties.getEndpoint(); + long interval = properties.getInterval(); Supplier> supplier = registry.lookupSupplier(name); - return new SupplierInvokingMessageProducer(supplier); + return new SupplierInvokingMessageProducer(supplier, interval); } } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfigurationProperties.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfigurationProperties.java index bdabb4705..b922128d3 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfigurationProperties.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfigurationProperties.java @@ -26,6 +26,12 @@ public class StreamConfigurationProperties { private String endpoint; + /** + * Interval to be used for the Duration (in milliseconds) of a non-Flux producing Supplier. + * Default is 0, which means the Supplier will only be invoked once. + */ + private long interval = 0L; + public String getEndpoint() { return endpoint; } @@ -33,4 +39,12 @@ public class StreamConfigurationProperties { public void setEndpoint(String endpoint) { this.endpoint = endpoint; } + + public long getInterval() { + return interval; + } + + public void setInterval(long interval) { + this.interval = interval; + } } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java index cd2eb6f4b..ef3c41409 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java @@ -16,6 +16,7 @@ package org.springframework.cloud.function.stream; +import java.time.Duration; import java.util.function.Supplier; import org.springframework.cloud.function.support.FluxSupplier; @@ -34,10 +35,12 @@ public class SupplierInvokingMessageProducer extends MessageProducerSupport { private final Supplier> supplier; - public SupplierInvokingMessageProducer(Supplier supplier) { + public SupplierInvokingMessageProducer(Supplier supplier, long interval) { Assert.notNull(supplier, "Supplier must not be null"); if (!FunctionUtils.isFluxSupplier(supplier)) { - supplier = new FluxSupplier<>(supplier); + supplier = (interval > 0) + ? new FluxSupplier<>(supplier, Duration.ofMillis(interval)) + : new FluxSupplier<>(supplier); } this.supplier = (Supplier>) supplier; this.setOutputChannelName(Source.OUTPUT);