From 8205c579f21155876c8b27abaf93c8659a77a5b5 Mon Sep 17 00:00:00 2001 From: markfisher Date: Wed, 11 Jan 2017 14:48:21 -0500 Subject: [PATCH] added Supplier and Consumer support --- .../function/stream/StreamConfiguration.java | 135 ++++++++++++++++-- .../StreamListeningConsumerInvoker.java | 42 ++++++ .../SupplierInvokingMessageProducer.java | 43 ++++++ 3 files changed, 210 insertions(+), 10 deletions(-) create mode 100644 spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java create mode 100644 spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java index fa2152730..db382376d 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java @@ -16,40 +16,155 @@ package org.springframework.cloud.function.stream; +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionOutcome; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.condition.SpringBootCondition; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.function.invoker.AbstractFunctionInvoker; import org.springframework.cloud.function.registry.FunctionCatalog; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.binder.Binder; import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.cloud.stream.messaging.Source; import org.springframework.context.annotation.Bean; -import org.springframework.util.StringUtils; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.context.annotation.Conditional; +import org.springframework.context.annotation.ConfigurationCondition; +import org.springframework.core.type.AnnotatedTypeMetadata; import reactor.core.publisher.Flux; /** * @author Mark Fisher */ -@EnableBinding(Processor.class) @EnableConfigurationProperties(FunctionConfigurationProperties.class) @ConditionalOnClass({ Binder.class, AbstractFunctionInvoker.class }) @ConditionalOnProperty(name = "spring.cloud.stream.enabled", havingValue = "true", matchIfMissing = true) public class StreamConfiguration { - @Autowired - private FunctionConfigurationProperties properties; + @ConditionalOnSupplier + @EnableBinding(Source.class) + protected static class SupplierConfiguration { - @Bean - @ConditionalOnProperty("spring.cloud.stream.bindings.input.destination") - public AbstractFunctionInvoker invoker(FunctionCatalog registry) { - String name = properties.getName(); - Function, Flux> function = registry.lookupFunction(name); - return new StreamListeningFunctionInvoker(function); + @Autowired + private FunctionConfigurationProperties properties; + + @Bean + @ConditionalOnProperty("spring.cloud.stream.bindings.output.destination") + public SupplierInvokingMessageProducer invoker(FunctionCatalog registry) { + String name = properties.getName(); + Supplier> supplier = registry.lookupSupplier(name); + return new SupplierInvokingMessageProducer(supplier); + } } + @ConditionalOnFunction + @EnableBinding(Processor.class) + protected static class FunctionConfiguration { + + @Autowired + private FunctionConfigurationProperties properties; + + @Bean + @ConditionalOnProperty("spring.cloud.stream.bindings.input.destination") + public AbstractFunctionInvoker invoker(FunctionCatalog registry) { + String name = properties.getName(); + Function, Flux> function = registry.lookupFunction(name); + return new StreamListeningFunctionInvoker(function); + } + } + + @ConditionalOnConsumer + @EnableBinding(Sink.class) + protected static class ConsumerConfiguration { + + @Autowired + private FunctionConfigurationProperties properties; + + @Bean + @ConditionalOnProperty("spring.cloud.stream.bindings.input.destination") + public StreamListeningConsumerInvoker invoker(FunctionCatalog registry) { + String name = properties.getName(); + Consumer consumer = registry.lookupConsumer(name); + return new StreamListeningConsumerInvoker(consumer); + } + } + + @Conditional(SupplierCondition.class) + @Target(ElementType.TYPE) + @Retention(RetentionPolicy.RUNTIME) + @Documented + private @interface ConditionalOnSupplier { + } + + @Conditional(FunctionCondition.class) + @Target(ElementType.TYPE) + @Retention(RetentionPolicy.RUNTIME) + @Documented + private @interface ConditionalOnFunction { + } + + @Conditional(ConsumerCondition.class) + @Target(ElementType.TYPE) + @Retention(RetentionPolicy.RUNTIME) + @Documented + private @interface ConditionalOnConsumer { + } + + private static abstract class AbstractFunctionCondition extends SpringBootCondition implements ConfigurationCondition { + + private final Class type; + + private AbstractFunctionCondition(Class type) { + this.type = type; + } + + @Override + public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) { + String functionName = context.getEnvironment().getProperty("function.name"); + Class beanType = context.getBeanFactory().getType(functionName); + if (type.isAssignableFrom(beanType)) { + return ConditionOutcome.match(String.format("bean '%s' is a %s", functionName, type)); + } + return ConditionOutcome.noMatch(String.format("bean '%s' is not a %s", functionName, type)); + } + + @Override + public ConfigurationPhase getConfigurationPhase() { + return ConfigurationPhase.REGISTER_BEAN; + } + } + + private static class SupplierCondition extends AbstractFunctionCondition { + + public SupplierCondition() { + super(Supplier.class); + } + } + + private static class FunctionCondition extends AbstractFunctionCondition { + + public FunctionCondition() { + super(Function.class); + } + } + + private static class ConsumerCondition extends AbstractFunctionCondition { + + public ConsumerCondition() { + super(Consumer.class); + } + } } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java new file mode 100644 index 000000000..2ebaaa73c --- /dev/null +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java @@ -0,0 +1,42 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream; + +import java.util.function.Consumer; + +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Processor; + +import reactor.core.publisher.Flux; + +/** + * @author Mark Fisher + */ +public class StreamListeningConsumerInvoker { + + private final Consumer consumer; + + public StreamListeningConsumerInvoker(Consumer consumer) { + this.consumer = consumer; + } + + @StreamListener + public void handle(@Input(Processor.INPUT) Flux input) { + input.subscribe(this.consumer); + } +} diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java new file mode 100644 index 000000000..15d985cfb --- /dev/null +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java @@ -0,0 +1,43 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.stream; + +import java.util.function.Supplier; + +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.messaging.support.MessageBuilder; + +import reactor.core.publisher.Flux; + +/** + * @author Mark Fisher + */ +public class SupplierInvokingMessageProducer extends MessageProducerSupport { + + private final Supplier> supplier; + + public SupplierInvokingMessageProducer(Supplier> supplier) { + this.supplier = supplier; + this.setOutputChannelName(Source.OUTPUT); + } + + @Override + protected void doStart() { + this.supplier.get().subscribe(m -> this.sendMessage(MessageBuilder.withPayload(m).build())); + } +}