diff --git a/spring-cloud-function-deployer/pom.xml b/spring-cloud-function-deployer/pom.xml index a3e12d90e..4bffead12 100644 --- a/spring-cloud-function-deployer/pom.xml +++ b/spring-cloud-function-deployer/pom.xml @@ -15,7 +15,7 @@ - 1.0.5.RELEASE + 1.0.6.RELEASE diff --git a/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionAppDeployerTests.java b/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionAppDeployerTests.java index 3442616d3..e4a3d476b 100644 --- a/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionAppDeployerTests.java +++ b/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionAppDeployerTests.java @@ -76,7 +76,7 @@ public class FunctionAppDeployerTests { @Test public void web() throws Exception { String first = deploy("maven://com.example:function-sample:1.0.0.BUILD-SNAPSHOT", - ""); + "", "--spring.cloud.function.stream.supplier.enabled=false"); // Deployment is blocking so it either failed or succeeded. assertThat(deployer.status(first).getState()).isEqualTo(DeploymentState.deployed); deployer.undeploy(first); @@ -85,7 +85,8 @@ public class FunctionAppDeployerTests { @Test public void stream() throws Exception { String first = deploy("maven://com.example:function-sample:1.0.0.BUILD-SNAPSHOT", - "spring.cloud.deployer.thin.profile=stream"); + "spring.cloud.deployer.thin.profile=stream", + "--spring.cloud.function.stream.supplier.enabled=false", "--debug=true"); // Deployment is blocking so it either failed or succeeded. assertThat(deployer.status(first).getState()).isEqualTo(DeploymentState.deployed); deployer.undeploy(first); diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java index 1b592f599..09c47e9ad 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java @@ -16,21 +16,10 @@ package org.springframework.cloud.function.stream; -import java.lang.annotation.Documented; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; - import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionOutcome; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.boot.autoconfigure.condition.SpringBootCondition; -import org.springframework.boot.bind.RelaxedPropertyResolver; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.function.context.FunctionInspector; import org.springframework.cloud.function.registry.FunctionCatalog; @@ -38,158 +27,40 @@ import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.binder.Binder; import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; import org.springframework.cloud.stream.messaging.Processor; -import org.springframework.cloud.stream.messaging.Sink; -import org.springframework.cloud.stream.messaging.Source; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.ConditionContext; -import org.springframework.context.annotation.Conditional; -import org.springframework.context.annotation.ConfigurationCondition; import org.springframework.context.annotation.Lazy; -import org.springframework.core.type.AnnotatedTypeMetadata; /** * @author Mark Fisher * @author Marius Bogoevici */ @EnableConfigurationProperties(StreamConfigurationProperties.class) -@ConditionalOnClass({ Binder.class }) +@ConditionalOnClass(Binder.class) +@ConditionalOnBean(FunctionCatalog.class) @ConditionalOnProperty(name = "spring.cloud.stream.enabled", havingValue = "true", matchIfMissing = true) +@EnableBinding(Processor.class) public class StreamConfiguration { - @ConditionalOnSupplier - @EnableBinding(Source.class) - protected static class SupplierConfiguration { + @Autowired + private StreamConfigurationProperties properties; - @Bean - public SupplierInvokingMessageProducer supplierInvoker( - FunctionCatalog registry) { - return new SupplierInvokingMessageProducer(registry); - } + @Bean + // Because of the underlying behaviour of Spring AMQP etc., sinks do not start + // up and fail gracefully if the broker is down. So we need a flag to be able to + // switch this off and stop the app failing on startup. + // TODO: find a slicker way to do it (e.g. backoff if the broker is down) + @ConditionalOnProperty(name = "spring.cloud.function.stream.supplier.enabled", havingValue = "true", matchIfMissing = true) + public SupplierInvokingMessageProducer supplierInvoker( + FunctionCatalog registry) { + return new SupplierInvokingMessageProducer(registry); } - @ConditionalOnFunction - @EnableBinding(Processor.class) - protected static class FunctionConfiguration { - - @Autowired - private StreamConfigurationProperties properties; - - @Bean - public StreamListeningFunctionInvoker functionInvoker(FunctionCatalog registry, - FunctionInspector functionInspector, - @Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) { - return new StreamListeningFunctionInvoker(registry, functionInspector, - compositeMessageConverterFactory, properties.getEndpoint()); - } + @Bean + public StreamListeningFunctionInvoker functionInvoker(FunctionCatalog registry, + FunctionInspector functionInspector, + @Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) { + return new StreamListeningFunctionInvoker(registry, functionInspector, + compositeMessageConverterFactory, properties.getEndpoint()); } - @ConditionalOnConsumer - @EnableBinding(Sink.class) - protected static class ConsumerConfiguration { - - @Autowired - private StreamConfigurationProperties properties; - - @Bean - public StreamListeningConsumerInvoker consumerInvoker(FunctionCatalog registry, - FunctionInspector functionInspector, - @Lazy CompositeMessageConverterFactory compositeMessageConverterFactory) { - return new StreamListeningConsumerInvoker(registry, functionInspector, - compositeMessageConverterFactory, properties.getEndpoint()); - } - } - - @Conditional(SupplierCondition.class) - @Target(ElementType.TYPE) - @Retention(RetentionPolicy.RUNTIME) - @Documented - private @interface ConditionalOnSupplier { - } - - @Conditional(FunctionCondition.class) - @Target(ElementType.TYPE) - @Retention(RetentionPolicy.RUNTIME) - @Documented - private @interface ConditionalOnFunction { - } - - @Conditional(ConsumerCondition.class) - @Target(ElementType.TYPE) - @Retention(RetentionPolicy.RUNTIME) - @Documented - private @interface ConditionalOnConsumer { - } - - private static abstract class AbstractFunctionCondition extends SpringBootCondition - implements ConfigurationCondition { - - private final Class type; - - private AbstractFunctionCondition(Class type) { - this.type = type; - } - - @Override - public ConditionOutcome getMatchOutcome(ConditionContext context, - AnnotatedTypeMetadata metadata) { - return getMatchOutcomeForType(this.type, context, metadata); - - } - - protected ConditionOutcome getMatchOutcomeForType(Class type, - ConditionContext context, AnnotatedTypeMetadata metadata) { - if (context.getBeanFactory().getBeanNamesForType(type, false, - false).length > 0) { - String endpoint = new RelaxedPropertyResolver(context.getEnvironment(), - "spring.cloud.function.stream.").getProperty("endpoint"); - if (endpoint != null && !type - .isAssignableFrom(context.getBeanFactory().getType(endpoint))) { - return ConditionOutcome.noMatch(String.format( - "explicit endpoint of type other than %s detected", type)); - } - return ConditionOutcome - .match(String.format("bean of type %s detected", type)); - - } - return ConditionOutcome - .noMatch(String.format("no bean of type %s detected", type)); - - } - - @Override - public ConfigurationPhase getConfigurationPhase() { - return ConfigurationPhase.REGISTER_BEAN; - } - } - - private static class SupplierCondition extends AbstractFunctionCondition { - - public SupplierCondition() { - super(Supplier.class); - } - } - - private static class FunctionCondition extends AbstractFunctionCondition { - - public FunctionCondition() { - super(Function.class); - } - } - - private static class ConsumerCondition extends AbstractFunctionCondition { - - public ConsumerCondition() { - super(Consumer.class); - } - - @Override - public ConditionOutcome getMatchOutcome(ConditionContext context, - AnnotatedTypeMetadata metadata) { - if (getMatchOutcomeForType(Function.class, context, metadata).isMatch()) { - return ConditionOutcome - .noMatch(String.format("bean of type Function detected")); - } - return super.getMatchOutcome(context, metadata); - } - } } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java deleted file mode 100644 index d28a5e586..000000000 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningConsumerInvoker.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright 2017 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.stream; - -import java.util.Set; -import java.util.function.Function; - -import org.springframework.beans.factory.SmartInitializingSingleton; -import org.springframework.cloud.function.context.FunctionInspector; -import org.springframework.cloud.function.registry.FunctionCatalog; -import org.springframework.cloud.stream.annotation.Input; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; -import org.springframework.cloud.stream.messaging.Sink; -import org.springframework.messaging.Message; -import org.springframework.messaging.converter.MessageConverter; - -import reactor.core.publisher.Flux; - -/** - * @author Mark Fisher - * @author Marius Bogoevici - */ -public class StreamListeningConsumerInvoker implements SmartInitializingSingleton { - - private final FunctionInspector functionInspector; - - private final CompositeMessageConverterFactory converterFactory; - - private MessageConverter converter; - - private final FunctionCatalog functionCatalog; - - private final String defaultEndpoint; - - private static final String NOENDPOINT = "__NOENDPOINT__"; - - public StreamListeningConsumerInvoker(FunctionCatalog functionCatalog, - FunctionInspector functionInspector, - CompositeMessageConverterFactory converterFactory, String defaultEndpoint) { - this.functionCatalog = functionCatalog; - this.functionInspector = functionInspector; - this.converterFactory = converterFactory; - this.defaultEndpoint = defaultEndpoint; - } - - @Override - public void afterSingletonsInstantiated() { - this.converter = this.converterFactory.getMessageConverterForAllRegistered(); - } - - @StreamListener - public void handle(@Input(Sink.INPUT) Flux> input) { - input.groupBy(this::select) - .filter(group -> functionCatalog.lookupConsumer(group.key()) != null) - .subscribe(group -> process(group.key(), group)); - } - - private void process(String name, Flux> flux) { - functionCatalog.lookupConsumer(name) - .accept(flux.map(message -> convertInput(name).apply(message))); - } - - private String select(Message input) { - String name = defaultEndpoint; - if (name == null) { - Set names = functionCatalog.getConsumerNames(); - if (names.size() == 1) { - name = names.iterator().next(); - } - else { - if (input.getHeaders() - .containsKey(StreamConfigurationProperties.ROUTE_KEY)) { - String key = (String) input.getHeaders() - .get(StreamConfigurationProperties.ROUTE_KEY); - if (functionCatalog.lookupConsumer(key) != null) { - return key; - } - } - else { - for (String candidate : names) { - Class inputType = functionInspector - .getInputType(functionCatalog.lookupConsumer(candidate)); - Object value = this.converter.fromMessage(input, inputType); - if (value != null && inputType.isInstance(value)) { - name = candidate; - break; - } - } - } - } - } - if (name == null) { - return NOENDPOINT; - } - return name; - } - - private Function, Object> convertInput(String name) { - Class inputType = functionInspector - .getInputType(functionCatalog.lookupConsumer(name)); - return m -> { - if (Message.class.isAssignableFrom(inputType)) { - return m; - } - else if (inputType.isAssignableFrom(m.getPayload().getClass())) { - return m.getPayload(); - } - else { - return this.converter.fromMessage(m, inputType); - } - }; - } -} diff --git a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionConversionTests.java b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionConversionTests.java index 36a8e458a..63fe3ca8e 100644 --- a/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionConversionTests.java +++ b/spring-cloud-function-stream/src/test/java/org/springframework/cloud/function/stream/function/FluxPojoStreamingFunctionConversionTests.java @@ -43,7 +43,7 @@ import reactor.core.publisher.Flux; * @author Marius Bogoevici */ @RunWith(SpringRunner.class) -@SpringBootTest(classes = FluxPojoStreamingFunctionConversionTests.StreamingFunctionApplication.class) +@SpringBootTest(classes = FluxPojoStreamingFunctionConversionTests.StreamingFunctionApplication.class, properties = "logging.level.org.springframework.integration=DEBUG") public class FluxPojoStreamingFunctionConversionTests { @Autowired