From 5622a9e2cb337b43cac1f5e5af0818fa4e13ce4c Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Tue, 8 Aug 2017 09:27:11 +0100 Subject: [PATCH] Remove custom conditions from spring-cloud-function-stream It didn't really make any sense to have custom conditions that depend on the presence or absence of beans of type Function, Supplier, Consumer because the actual endpoints are derived from the FunctionCatalog (which might not be based on bean definitions). This approach is far simpler, and reduces the amount of custom code in the stream binder. The spring.cloud.function.stream.supplier.enabled flag is awkward, so we should try and find a way to avoid that. There's also no reason it should need to be set in the deployer tests. --- spring-cloud-function-deployer/pom.xml | 2 +- .../deployer/FunctionAppDeployerTests.java | 5 +- .../function/stream/StreamConfiguration.java | 171 +++--------------- .../StreamListeningConsumerInvoker.java | 128 ------------- ...xPojoStreamingFunctionConversionTests.java | 2 +- 5 files changed, 26 insertions(+), 282 deletions(-) delete mode 100644 spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java diff --git a/spring-cloud-function-deployer/pom.xml b/spring-cloud-function-deployer/pom.xml index a3e12d90e..4bffead12 100644 --- a/spring-cloud-function-deployer/pom.xml +++ b/spring-cloud-function-deployer/pom.xml @@ -15,7 +15,7 @@ - 1.0.5.RELEASE + 1.0.6.RELEASE diff --git a/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionAppDeployerTests.java b/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionAppDeployerTests.java index 3442616d3..e4a3d476b 100644 --- a/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionAppDeployerTests.java +++ b/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionAppDeployerTests.java @@ -76,7 +76,7 @@ public class FunctionAppDeployerTests { @Test public void web() throws Exception { String first = deploy("maven://com.example:function-sample:1.0.0.BUILD-SNAPSHOT", - ""); + "", "--spring.cloud.function.stream.supplier.enabled=false"); // Deployment is blocking so it either failed or succeeded. assertThat(deployer.status(first).getState()).isEqualTo(DeploymentState.deployed); deployer.undeploy(first); @@ -85,7 +85,8 @@ public class FunctionAppDeployerTests { @Test public void stream() throws Exception { String first = deploy("maven://com.example:function-sample:1.0.0.BUILD-SNAPSHOT", - "spring.cloud.deployer.thin.profile=stream"); + "spring.cloud.deployer.thin.profile=stream", + "--spring.cloud.function.stream.supplier.enabled=false", "--debug=true"); // Deployment is blocking so it either failed or succeeded. assertThat(deployer.status(first).getState()).isEqualTo(DeploymentState.deployed); deployer.undeploy(first); diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java index 1b592f599..09c47e9ad 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java @@ -16,21 +16,10 @@ package org.springframework.cloud.function.stream; -import java.lang.annotation.Documented; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; - import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionOutcome; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.boot.autoconfigure.condition.SpringBootCondition; -import org.springframework.boot.bind.RelaxedPropertyResolver; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.function.context.FunctionInspector; import org.springframework.cloud.function.registry.FunctionCatalog; @@ -38,158 +27,40 @@ import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.binder.Binder; import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; import org.springframework.cloud.stream.messaging.Processor; -import org.springframework.cloud.stream.messaging.Sink; -import org.springframework.cloud.stream.messaging.Source; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ConditionContext; -import org.springframework.context.annotation.Conditional; -import org.springframework.context.annotation.ConfigurationCondition; import org.springframework.context.annotation.Lazy; -import org.springframework.core.type.AnnotatedTypeMetadata; /** * @author Mark Fisher * @author Marius Bogoevici */ @EnableConfigurationProperties(StreamConfigurationProperties.class) -@ConditionalOnClass({ Binder.class }) +@ConditionalOnClass(Binder.class) +@ConditionalOnBean(FunctionCatalog.class) @ConditionalOnProperty(name = "spring.cloud.stream.enabled", havingValue = "true", matchIfMissing = true) +@EnableBinding(Processor.class) public class StreamConfiguration { - @ConditionalOnSupplier - @EnableBinding(Source.class) - protected static class SupplierConfiguration { + @Autowired + private StreamConfigurationProperties properties; - @Bean - public SupplierInvokingMessageProducer supplierInvoker( - FunctionCatalog registry) { - return new SupplierInvokingMessageProducer(registry); - } + @Bean + // Because of the underlying behaviour of Spring AMQP etc., sinks do not start + // up and fail gracefully if the broker is down. So we need a flag to be able to + // switch this off and stop the app failing on startup. + // TODO: find a slicker way to do it (e.g. backoff if the broker is down) + @ConditionalOnProperty(name = "spring.cloud.function.stream.supplier.enabled", havingValue = "true", matchIfMissing = true) + public SupplierInvokingMessageProducer supplierInvoker( + FunctionCatalog registry) { + return new SupplierInvokingMessageProducer(registry); } - @ConditionalOnFunction - @EnableBinding(Processor.class) - protected static class FunctionConfiguration { - - @Autowired - private StreamConfigurationProperties properties; - - @Bean - public StreamListeningFunctionInvoker functionInvoker(FunctionCatalog registry, - FunctionInspector functionInspector, - @Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) { - return new StreamListeningFunctionInvoker(registry, functionInspector, - compositeMessageConverterFactory, properties.getEndpoint()); - } + @Bean + public StreamListeningFunctionInvoker functionInvoker(FunctionCatalog registry, + FunctionInspector functionInspector, + @Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) { + return new StreamListeningFunctionInvoker(registry, functionInspector, + compositeMessageConverterFactory, properties.getEndpoint()); } - @ConditionalOnConsumer - @EnableBinding(Sink.class) - protected static class ConsumerConfiguration { - - @Autowired - private StreamConfigurationProperties properties; - - @Bean - public StreamListeningConsumerInvoker consumerInvoker(FunctionCatalog registry, - FunctionInspector functionInspector, - @Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) { - return new StreamListeningConsumerInvoker(registry, functionInspector, - compositeMessageConverterFactory, properties.getEndpoint()); - } - } - - @Conditional(SupplierCondition.class) - @Target(ElementType.TYPE) - @Retention(RetentionPolicy.RUNTIME) - @Documented - private @interface ConditionalOnSupplier { - } - - @Conditional(FunctionCondition.class) - @Target(ElementType.TYPE) - @Retention(RetentionPolicy.RUNTIME) - @Documented - private @interface ConditionalOnFunction { - } - - @Conditional(ConsumerCondition.class) - @Target(ElementType.TYPE) - @Retention(RetentionPolicy.RUNTIME) - @Documented - private @interface ConditionalOnConsumer { - } - - private static abstract class AbstractFunctionCondition extends SpringBootCondition - implements ConfigurationCondition { - - private final Class type; - - private AbstractFunctionCondition(Class type) { - this.type = type; - } - - @Override - public ConditionOutcome getMatchOutcome(ConditionContext context, - AnnotatedTypeMetadata metadata) { - return getMatchOutcomeForType(this.type, context, metadata); - - } - - protected ConditionOutcome getMatchOutcomeForType(Class type, - ConditionContext context, AnnotatedTypeMetadata metadata) { - if (context.getBeanFactory().getBeanNamesForType(type, false, - false).length > 0) { - String endpoint = new RelaxedPropertyResolver(context.getEnvironment(), - "spring.cloud.function.stream.").getProperty("endpoint"); - if (endpoint != null && !type - .isAssignableFrom(context.getBeanFactory().getType(endpoint))) { - return ConditionOutcome.noMatch(String.format( - "explicit endpoint of type other than %s detected", type)); - } - return ConditionOutcome - .match(String.format("bean of type %s detected", type)); - - } - return ConditionOutcome - .noMatch(String.format("no bean of type %s detected", type)); - - } - - @Override - public ConfigurationPhase getConfigurationPhase() { - return ConfigurationPhase.REGISTER_BEAN; - } - } - - private static class SupplierCondition extends AbstractFunctionCondition { - - public SupplierCondition() { - super(Supplier.class); - } - } - - private static class FunctionCondition extends AbstractFunctionCondition { - - public FunctionCondition() { - super(Function.class); - } - } - - private static class ConsumerCondition extends AbstractFunctionCondition { - - public ConsumerCondition() { - super(Consumer.class); - } - - @Override - public ConditionOutcome getMatchOutcome(ConditionContext context, - AnnotatedTypeMetadata metadata) { - if (getMatchOutcomeForType(Function.class, context, metadata).isMatch()) { - return ConditionOutcome - .noMatch(String.format("bean of type Function detected")); - } - return super.getMatchOutcome(context, metadata); - } - } } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java deleted file mode 100644 index d28a5e586..000000000 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright 2017 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.stream; - -import java.util.Set; -import java.util.function.Function; - -import org.springframework.beans.factory.SmartInitializingSingleton; -import org.springframework.cloud.function.context.FunctionInspector; -import org.springframework.cloud.function.registry.FunctionCatalog; -import org.springframework.cloud.stream.annotation.Input; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; -import org.springframework.cloud.stream.messaging.Sink; -import org.springframework.messaging.Message; -import org.springframework.messaging.converter.MessageConverter; - -import reactor.core.publisher.Flux; - -/** - * @author Mark Fisher - * @author Marius Bogoevici - */ -public class StreamListeningConsumerInvoker implements SmartInitializingSingleton { - - private final FunctionInspector functionInspector; - - private final CompositeMessageConverterFactory converterFactory; - - private MessageConverter converter; - - private final FunctionCatalog functionCatalog; - - private final String defaultEndpoint; - - private static final String NOENDPOINT = "__NOENDPOINT__"; - - public StreamListeningConsumerInvoker(FunctionCatalog functionCatalog, - FunctionInspector functionInspector, - CompositeMessageConverterFactory converterFactory, String defaultEndpoint) { - this.functionCatalog = functionCatalog; - this.functionInspector = functionInspector; - this.converterFactory = converterFactory; - this.defaultEndpoint = defaultEndpoint; - } - - @Override - public void afterSingletonsInstantiated() { - this.converter = this.converterFactory.getMessageConverterForAllRegistered(); - } - - @StreamListener - public void handle(@Input(Sink.INPUT) Flux> input) { - input.groupBy(this::select) - .filter(group -> functionCatalog.lookupConsumer(group.key()) != null) - .subscribe(group -> process(group.key(), group)); - } - - private void process(String name, Flux> flux) { - functionCatalog.lookupConsumer(name) - .accept(flux.map(message -> convertInput(name).apply(message))); - } - - private String select(Message input) { - String name = defaultEndpoint; - if (name == null) { - Set names = functionCatalog.getConsumerNames(); - if (names.size() == 1) { - name = names.iterator().next(); - } - else { - if (input.getHeaders() - .containsKey(StreamConfigurationProperties.ROUTE_KEY)) { - String key = (String) input.getHeaders() - .get(StreamConfigurationProperties.ROUTE_KEY); - if (functionCatalog.lookupConsumer(key) != null) { - return key; - } - } - else { - for (String candidate : names) { - Class inputType = functionInspector - .getInputType(functionCatalog.lookupConsumer(candidate)); - Object value = this.converter.fromMessage(input, inputType); - if (value != null && inputType.isInstance(value)) { - name = candidate; - break; - } - } - } - } - } - if (name == null) { - return NOENDPOINT; - } - return name; - } - - private Function, Object> convertInput(String name) { - Class inputType = functionInspector - .getInputType(functionCatalog.lookupConsumer(name)); - return m -> { - if (Message.class.isAssignableFrom(inputType)) { - return m; - } - else if (inputType.isAssignableFrom(m.getPayload().getClass())) { - return m.getPayload(); - } - else { - return this.converter.fromMessage(m, inputType); - } - }; - } -} diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionConversionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionConversionTests.java index 36a8e458a..63fe3ca8e 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionConversionTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionConversionTests.java @@ -43,7 +43,7 @@ import reactor.core.publisher.Flux; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = FluxPojoStreamingFunctionConversionTests.StreamingFunctionApplication.class) +@SpringBootTest(classes = FluxPojoStreamingFunctionConversionTests.StreamingFunctionApplication.class, properties = "logging.level.org.springframework.integration=DEBUG") public class FluxPojoStreamingFunctionConversionTests { @Autowired