add interval for non-Flux stream Suppliers

This commit is contained in:
markfisher
2017-02-24 12:44:14 -05:00
parent 1dc0489c23
commit 2ae7789cd1
5 changed files with 38 additions and 5 deletions

View File

@@ -69,8 +69,9 @@ public class StreamConfiguration {
@ConditionalOnProperty("spring.cloud.stream.bindings.output.destination")
public SupplierInvokingMessageProducer<Object> invoker(FunctionCatalog registry) {
String name = properties.getEndpoint();
long interval = properties.getInterval();
Supplier<Flux<Object>> supplier = registry.lookupSupplier(name);
return new SupplierInvokingMessageProducer<Object>(supplier);
return new SupplierInvokingMessageProducer<Object>(supplier, interval);
}
}

View File

@@ -26,6 +26,12 @@ public class StreamConfigurationProperties {
private String endpoint;
/**
* Interval to be used for the Duration (in milliseconds) of a non-Flux producing Supplier.
* Default is 0, which means the Supplier will only be invoked once.
*/
private long interval = 0L;
public String getEndpoint() {
return endpoint;
}
@@ -33,4 +39,12 @@ public class StreamConfigurationProperties {
public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
}
public long getInterval() {
return interval;
}
public void setInterval(long interval) {
this.interval = interval;
}
}

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.function.stream;
import java.time.Duration;
import java.util.function.Supplier;
import org.springframework.cloud.function.support.FluxSupplier;
@@ -34,10 +35,12 @@ public class SupplierInvokingMessageProducer<T> extends MessageProducerSupport {
private final Supplier<Flux<T>> supplier;
public SupplierInvokingMessageProducer(Supplier<?> supplier) {
public SupplierInvokingMessageProducer(Supplier<?> supplier, long interval) {
Assert.notNull(supplier, "Supplier must not be null");
if (!FunctionUtils.isFluxSupplier(supplier)) {
supplier = new FluxSupplier<>(supplier);
supplier = (interval > 0)
? new FluxSupplier<>(supplier, Duration.ofMillis(interval))
: new FluxSupplier<>(supplier);
}
this.supplier = (Supplier<Flux<T>>) supplier;
this.setOutputChannelName(Source.OUTPUT);