Remove custom conditions from spring-cloud-function-stream

It didn't really make any sense to have custom conditions that
depend on the presence or absence of beans of type Function,
Supplier, Consumer because the actual endpoints are derived
from the FunctionCatalog (which might not be based on
bean definitions). This approach is far simpler, and
reduces the amount of custom code in the stream binder.

The spring.cloud.function.stream.supplier.enabled flag
is awkward, so we should try and find a way to avoid that.
There's also no reason it should need to be set in the
deployer tests.
This commit is contained in:
Dave Syer
2017-08-08 09:27:11 +01:00
parent 3c412e04f5
commit 5622a9e2cb
5 changed files with 26 additions and 282 deletions

View File

@@ -15,7 +15,7 @@
</parent>
<properties>
<spring-cloud-deployer-thin.version>1.0.5.RELEASE</spring-cloud-deployer-thin.version>
<spring-cloud-deployer-thin.version>1.0.6.RELEASE</spring-cloud-deployer-thin.version>
</properties>
<dependencies>

View File

@@ -76,7 +76,7 @@ public class FunctionAppDeployerTests {
@Test
public void web() throws Exception {
String first = deploy("maven://com.example:function-sample:1.0.0.BUILD-SNAPSHOT",
"");
"", "--spring.cloud.function.stream.supplier.enabled=false");
// Deployment is blocking so it either failed or succeeded.
assertThat(deployer.status(first).getState()).isEqualTo(DeploymentState.deployed);
deployer.undeploy(first);
@@ -85,7 +85,8 @@ public class FunctionAppDeployerTests {
@Test
public void stream() throws Exception {
String first = deploy("maven://com.example:function-sample:1.0.0.BUILD-SNAPSHOT",
"spring.cloud.deployer.thin.profile=stream");
"spring.cloud.deployer.thin.profile=stream",
"--spring.cloud.function.stream.supplier.enabled=false", "--debug=true");
// Deployment is blocking so it either failed or succeeded.
assertThat(deployer.status(first).getState()).isEqualTo(DeploymentState.deployed);
deployer.undeploy(first);

View File

@@ -16,21 +16,10 @@
package org.springframework.cloud.function.stream;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
import org.springframework.boot.bind.RelaxedPropertyResolver;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.cloud.function.registry.FunctionCatalog;
@@ -38,158 +27,40 @@ import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.ConfigurationCondition;
import org.springframework.context.annotation.Lazy;
import org.springframework.core.type.AnnotatedTypeMetadata;
/**
* @author Mark Fisher
* @author Marius Bogoevici
*/
@EnableConfigurationProperties(StreamConfigurationProperties.class)
@ConditionalOnClass({ Binder.class })
@ConditionalOnClass(Binder.class)
@ConditionalOnBean(FunctionCatalog.class)
@ConditionalOnProperty(name = "spring.cloud.stream.enabled", havingValue = "true", matchIfMissing = true)
@EnableBinding(Processor.class)
public class StreamConfiguration {
@ConditionalOnSupplier
@EnableBinding(Source.class)
protected static class SupplierConfiguration {
@Autowired
private StreamConfigurationProperties properties;
@Bean
public SupplierInvokingMessageProducer<Object> supplierInvoker(
FunctionCatalog registry) {
return new SupplierInvokingMessageProducer<Object>(registry);
}
@Bean
// Because of the underlying behaviour of Spring AMQP etc., sinks do not start
// up and fail gracefully if the broker is down. So we need a flag to be able to
// switch this off and stop the app failing on startup.
// TODO: find a slicker way to do it (e.g. backoff if the broker is down)
@ConditionalOnProperty(name = "spring.cloud.function.stream.supplier.enabled", havingValue = "true", matchIfMissing = true)
public SupplierInvokingMessageProducer<Object> supplierInvoker(
FunctionCatalog registry) {
return new SupplierInvokingMessageProducer<Object>(registry);
}
@ConditionalOnFunction
@EnableBinding(Processor.class)
protected static class FunctionConfiguration {
@Autowired
private StreamConfigurationProperties properties;
@Bean
public StreamListeningFunctionInvoker functionInvoker(FunctionCatalog registry,
FunctionInspector functionInspector,
@Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) {
return new StreamListeningFunctionInvoker(registry, functionInspector,
compositeMessageConverterFactory, properties.getEndpoint());
}
@Bean
public StreamListeningFunctionInvoker functionInvoker(FunctionCatalog registry,
FunctionInspector functionInspector,
@Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) {
return new StreamListeningFunctionInvoker(registry, functionInspector,
compositeMessageConverterFactory, properties.getEndpoint());
}
@ConditionalOnConsumer
@EnableBinding(Sink.class)
protected static class ConsumerConfiguration {
@Autowired
private StreamConfigurationProperties properties;
@Bean
public StreamListeningConsumerInvoker consumerInvoker(FunctionCatalog registry,
FunctionInspector functionInspector,
@Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) {
return new StreamListeningConsumerInvoker(registry, functionInspector,
compositeMessageConverterFactory, properties.getEndpoint());
}
}
@Conditional(SupplierCondition.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
private @interface ConditionalOnSupplier {
}
@Conditional(FunctionCondition.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
private @interface ConditionalOnFunction {
}
@Conditional(ConsumerCondition.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
private @interface ConditionalOnConsumer {
}
private static abstract class AbstractFunctionCondition extends SpringBootCondition
implements ConfigurationCondition {
private final Class<?> type;
private AbstractFunctionCondition(Class<?> type) {
this.type = type;
}
@Override
public ConditionOutcome getMatchOutcome(ConditionContext context,
AnnotatedTypeMetadata metadata) {
return getMatchOutcomeForType(this.type, context, metadata);
}
protected ConditionOutcome getMatchOutcomeForType(Class<?> type,
ConditionContext context, AnnotatedTypeMetadata metadata) {
if (context.getBeanFactory().getBeanNamesForType(type, false,
false).length > 0) {
String endpoint = new RelaxedPropertyResolver(context.getEnvironment(),
"spring.cloud.function.stream.").getProperty("endpoint");
if (endpoint != null && !type
.isAssignableFrom(context.getBeanFactory().getType(endpoint))) {
return ConditionOutcome.noMatch(String.format(
"explicit endpoint of type other than %s detected", type));
}
return ConditionOutcome
.match(String.format("bean of type %s detected", type));
}
return ConditionOutcome
.noMatch(String.format("no bean of type %s detected", type));
}
@Override
public ConfigurationPhase getConfigurationPhase() {
return ConfigurationPhase.REGISTER_BEAN;
}
}
private static class SupplierCondition extends AbstractFunctionCondition {
public SupplierCondition() {
super(Supplier.class);
}
}
private static class FunctionCondition extends AbstractFunctionCondition {
public FunctionCondition() {
super(Function.class);
}
}
private static class ConsumerCondition extends AbstractFunctionCondition {
public ConsumerCondition() {
super(Consumer.class);
}
@Override
public ConditionOutcome getMatchOutcome(ConditionContext context,
AnnotatedTypeMetadata metadata) {
if (getMatchOutcomeForType(Function.class, context, metadata).isMatch()) {
return ConditionOutcome
.noMatch(String.format("bean of type Function detected"));
}
return super.getMatchOutcome(context, metadata);
}
}
}

View File

@@ -1,128 +0,0 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.stream;
import java.util.Set;
import java.util.function.Function;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.cloud.function.context.FunctionInspector;
import org.springframework.cloud.function.registry.FunctionCatalog;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import reactor.core.publisher.Flux;
/**
* @author Mark Fisher
* @author Marius Bogoevici
*/
public class StreamListeningConsumerInvoker implements SmartInitializingSingleton {
private final FunctionInspector functionInspector;
private final CompositeMessageConverterFactory converterFactory;
private MessageConverter converter;
private final FunctionCatalog functionCatalog;
private final String defaultEndpoint;
private static final String NOENDPOINT = "__NOENDPOINT__";
public StreamListeningConsumerInvoker(FunctionCatalog functionCatalog,
FunctionInspector functionInspector,
CompositeMessageConverterFactory converterFactory, String defaultEndpoint) {
this.functionCatalog = functionCatalog;
this.functionInspector = functionInspector;
this.converterFactory = converterFactory;
this.defaultEndpoint = defaultEndpoint;
}
@Override
public void afterSingletonsInstantiated() {
this.converter = this.converterFactory.getMessageConverterForAllRegistered();
}
@StreamListener
public void handle(@Input(Sink.INPUT) Flux<Message<?>> input) {
input.groupBy(this::select)
.filter(group -> functionCatalog.lookupConsumer(group.key()) != null)
.subscribe(group -> process(group.key(), group));
}
private void process(String name, Flux<Message<?>> flux) {
functionCatalog.lookupConsumer(name)
.accept(flux.map(message -> convertInput(name).apply(message)));
}
private String select(Message<?> input) {
String name = defaultEndpoint;
if (name == null) {
Set<String> names = functionCatalog.getConsumerNames();
if (names.size() == 1) {
name = names.iterator().next();
}
else {
if (input.getHeaders()
.containsKey(StreamConfigurationProperties.ROUTE_KEY)) {
String key = (String) input.getHeaders()
.get(StreamConfigurationProperties.ROUTE_KEY);
if (functionCatalog.lookupConsumer(key) != null) {
return key;
}
}
else {
for (String candidate : names) {
Class<?> inputType = functionInspector
.getInputType(functionCatalog.lookupConsumer(candidate));
Object value = this.converter.fromMessage(input, inputType);
if (value != null && inputType.isInstance(value)) {
name = candidate;
break;
}
}
}
}
}
if (name == null) {
return NOENDPOINT;
}
return name;
}
private Function<Message<?>, Object> convertInput(String name) {
Class<?> inputType = functionInspector
.getInputType(functionCatalog.lookupConsumer(name));
return m -> {
if (Message.class.isAssignableFrom(inputType)) {
return m;
}
else if (inputType.isAssignableFrom(m.getPayload().getClass())) {
return m.getPayload();
}
else {
return this.converter.fromMessage(m, inputType);
}
};
}
}

View File

@@ -43,7 +43,7 @@ import reactor.core.publisher.Flux;
* @author Marius Bogoevici
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = FluxPojoStreamingFunctionConversionTests.StreamingFunctionApplication.class)
@SpringBootTest(classes = FluxPojoStreamingFunctionConversionTests.StreamingFunctionApplication.class, properties = "logging.level.org.springframework.integration=DEBUG")
public class FluxPojoStreamingFunctionConversionTests {
@Autowired