added Supplier and Consumer support

This commit is contained in:
markfisher
2017-01-11 14:48:21 -05:00
parent 18fe932aa6
commit 8205c579f2
3 changed files with 210 additions and 10 deletions

View File

@@ -16,40 +16,155 @@
package org.springframework.cloud.function.stream;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.invoker.AbstractFunctionInvoker;
import org.springframework.cloud.function.registry.FunctionCatalog;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.util.StringUtils;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.ConfigurationCondition;
import org.springframework.core.type.AnnotatedTypeMetadata;
import reactor.core.publisher.Flux;
/**
* @author Mark Fisher
*/
@EnableBinding(Processor.class)
@EnableConfigurationProperties(FunctionConfigurationProperties.class)
@ConditionalOnClass({ Binder.class, AbstractFunctionInvoker.class })
@ConditionalOnProperty(name = "spring.cloud.stream.enabled", havingValue = "true", matchIfMissing = true)
public class StreamConfiguration {
@Autowired
private FunctionConfigurationProperties properties;
@ConditionalOnSupplier
@EnableBinding(Source.class)
protected static class SupplierConfiguration {
@Bean
@ConditionalOnProperty("spring.cloud.stream.bindings.input.destination")
public AbstractFunctionInvoker<?, ?> invoker(FunctionCatalog registry) {
String name = properties.getName();
Function<Flux<Object>, Flux<Object>> function = registry.lookupFunction(name);
return new StreamListeningFunctionInvoker(function);
@Autowired
private FunctionConfigurationProperties properties;
@Bean
@ConditionalOnProperty("spring.cloud.stream.bindings.output.destination")
public SupplierInvokingMessageProducer<Object> invoker(FunctionCatalog registry) {
String name = properties.getName();
Supplier<Flux<Object>> supplier = registry.lookupSupplier(name);
return new SupplierInvokingMessageProducer<Object>(supplier);
}
}
@ConditionalOnFunction
@EnableBinding(Processor.class)
protected static class FunctionConfiguration {
@Autowired
private FunctionConfigurationProperties properties;
@Bean
@ConditionalOnProperty("spring.cloud.stream.bindings.input.destination")
public AbstractFunctionInvoker<?, ?> invoker(FunctionCatalog registry) {
String name = properties.getName();
Function<Flux<Object>, Flux<Object>> function = registry.lookupFunction(name);
return new StreamListeningFunctionInvoker(function);
}
}
@ConditionalOnConsumer
@EnableBinding(Sink.class)
protected static class ConsumerConfiguration {
@Autowired
private FunctionConfigurationProperties properties;
@Bean
@ConditionalOnProperty("spring.cloud.stream.bindings.input.destination")
public StreamListeningConsumerInvoker<Object> invoker(FunctionCatalog registry) {
String name = properties.getName();
Consumer<Object> consumer = registry.lookupConsumer(name);
return new StreamListeningConsumerInvoker<Object>(consumer);
}
}
@Conditional(SupplierCondition.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
private @interface ConditionalOnSupplier {
}
@Conditional(FunctionCondition.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
private @interface ConditionalOnFunction {
}
@Conditional(ConsumerCondition.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
private @interface ConditionalOnConsumer {
}
private static abstract class AbstractFunctionCondition extends SpringBootCondition implements ConfigurationCondition {
private final Class<?> type;
private AbstractFunctionCondition(Class<?> type) {
this.type = type;
}
@Override
public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) {
String functionName = context.getEnvironment().getProperty("function.name");
Class<?> beanType = context.getBeanFactory().getType(functionName);
if (type.isAssignableFrom(beanType)) {
return ConditionOutcome.match(String.format("bean '%s' is a %s", functionName, type));
}
return ConditionOutcome.noMatch(String.format("bean '%s' is not a %s", functionName, type));
}
@Override
public ConfigurationPhase getConfigurationPhase() {
return ConfigurationPhase.REGISTER_BEAN;
}
}
private static class SupplierCondition extends AbstractFunctionCondition {
public SupplierCondition() {
super(Supplier.class);
}
}
private static class FunctionCondition extends AbstractFunctionCondition {
public FunctionCondition() {
super(Function.class);
}
}
private static class ConsumerCondition extends AbstractFunctionCondition {
public ConsumerCondition() {
super(Consumer.class);
}
}
}

View File

@@ -0,0 +1,42 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream;
import java.util.function.Consumer;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import reactor.core.publisher.Flux;
/**
* @author Mark Fisher
*/
public class StreamListeningConsumerInvoker<T> {
private final Consumer<T> consumer;
public StreamListeningConsumerInvoker(Consumer<T> consumer) {
this.consumer = consumer;
}
@StreamListener
public void handle(@Input(Processor.INPUT) Flux<T> input) {
input.subscribe(this.consumer);
}
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream;
import java.util.function.Supplier;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.support.MessageBuilder;
import reactor.core.publisher.Flux;
/**
* @author Mark Fisher
*/
public class SupplierInvokingMessageProducer<T> extends MessageProducerSupport {
private final Supplier<Flux<T>> supplier;
public SupplierInvokingMessageProducer(Supplier<Flux<T>> supplier) {
this.supplier = supplier;
this.setOutputChannelName(Source.OUTPUT);
}
@Override
protected void doStart() {
this.supplier.get().subscribe(m -> this.sendMessage(MessageBuilder.withPayload(m).build()));
}
}