From c3246f32571b4e64065d7fd489ea70909d2be272 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Mon, 22 May 2017 15:59:40 -0400 Subject: [PATCH] GH-722: Add support for Reactive Sources Fixes spring-cloud/spring-cloud-stream#722 Addressing PR review comments Further addressing PR review comments Degenericizing `StreamListenerResultAdapter` with the 3rd generic argument added for returning `Disposable` `StreamListenerResultAdapter` adapt method now returns `java.io.Closeable` Addressing PR review comments Addressing PR review comments Converting `FluxToMessageChannelResultAdapter` to `PublisherToMessageChannelResultAdapter` Add `logback.xml` for reactive tests * Revert `AopUtils.getTargetClass(bean)` to really deal with target classes only. Although it does not make any effect on the `@Configuration` classes * Use `AnnotatedElementUtils.isAnnotated()` instead of `getMergedAnnotation()` to avoid synthetic methods in the `@Configuration` classes * Add artificial `.transform()` to the `IntegrationFlow` test to overcome SI Java DSL bug * Apply similar `.map()` for `Flux` tests configs for proper assertions * Simple code style polishing --- spring-cloud-stream-reactive/pom.xml | 5 + .../stream/reactive/DefaultFluxSender.java | 73 +++++ .../cloud/stream/reactive/FluxSender.java | 13 +- ...geChannelToFluxSenderParameterAdapter.java | 28 +- ...servableToMessageChannelResultAdapter.java | 16 +- ...blisherToMessageChannelResultAdapter.java} | 34 +- .../ReactiveSupportAutoConfiguration.java | 15 +- .../cloud/stream/reactive/StreamEmitter.java | 94 ++++++ ...eamEmitterAnnotationBeanPostProcessor.java | 282 ++++++++++++++++ .../reactive/StreamEmitterErrorMessages.java | 50 +++ .../reactive/StreamEmitterBasicTests.java | 308 ++++++++++++++++++ .../StreamEmitterValidationTests.java | 307 +++++++++++++++++ .../src/test/resources/logback.xml | 10 + ...ageChannelStreamListenerResultAdapter.java | 21 +- .../StreamAnnotationCommonMethodUtils.java | 63 ++++ .../StreamAnnotationErrorMessages.java | 32 ++ .../binding/StreamListenerResultAdapter.java | 4 +- 17 files changed, 1307 insertions(+), 48 deletions(-) create mode 100644 spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/DefaultFluxSender.java rename spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/{FluxToMessageChannelResultAdapter.java => PublisherToMessageChannelResultAdapter.java} (58%) create mode 100644 spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitter.java create mode 100644 spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitterAnnotationBeanPostProcessor.java create mode 100644 spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitterErrorMessages.java create mode 100644 spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterBasicTests.java create mode 100644 spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterValidationTests.java create mode 100644 spring-cloud-stream-reactive/src/test/resources/logback.xml create mode 100644 spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamAnnotationCommonMethodUtils.java create mode 100644 spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamAnnotationErrorMessages.java diff --git a/spring-cloud-stream-reactive/pom.xml b/spring-cloud-stream-reactive/pom.xml index 1385ecb2e..cdadda4e6 100644 --- a/spring-cloud-stream-reactive/pom.xml +++ b/spring-cloud-stream-reactive/pom.xml @@ -47,6 +47,11 @@ spring-cloud-stream-test-support-internal test + + org.springframework.integration + spring-integration-java-dsl + test + diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/DefaultFluxSender.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/DefaultFluxSender.java new file mode 100644 index 000000000..2eb453c07 --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/DefaultFluxSender.java @@ -0,0 +1,73 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive; + +import java.util.function.Consumer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; + +import org.springframework.util.Assert; + +/** + * Default {@link org.springframework.cloud.stream.reactive.FluxSender} implementation. + * This implementation may be used for cancelling a subscription on the underlying + * {@link reactor.core.publisher.Flux}. + * + * @author Soby Chacko + * + * @since 1.3.0 + */ +class DefaultFluxSender implements FluxSender { + + private Log log = LogFactory.getLog(DefaultFluxSender.class); + + private volatile Disposable disposable; + + private final Consumer consumer; + + DefaultFluxSender(Consumer consumer) { + Assert.notNull(consumer, "Consumer must not be null"); + this.consumer = consumer; + } + + @Override + public Mono send(Flux flux) { + MonoProcessor sendResult = MonoProcessor.create(); + // add error handling and reconnect in the event of an error + this.disposable = flux + .doOnError(e -> this.log.error("Error during processing: ", e)) + .retry() + .subscribe( + this.consumer, + sendResult::onError, + sendResult::onComplete); + return sendResult; + } + + @Override + public void close() { + if (this.disposable != null) { + this.disposable.dispose(); + } + } + +} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/FluxSender.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/FluxSender.java index d171b15b7..d70052ab0 100644 --- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/FluxSender.java +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/FluxSender.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,15 +16,21 @@ package org.springframework.cloud.stream.reactive; +import java.io.Closeable; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Used for {@link org.springframework.cloud.stream.annotation.StreamListener} arguments + * Used for {@link org.springframework.cloud.stream.annotation.StreamListener} + * and {@link org.springframework.cloud.stream.reactive.StreamEmitter} arguments * annotated with {@link org.springframework.cloud.stream.annotation.Output}. + * * @author Marius Bogoevici + * + * @see reactor.core.Disposable */ -public interface FluxSender { +public interface FluxSender extends Closeable { /** * Streams the {@link reactor.core.publisher.Flux} through the binding target @@ -35,4 +41,5 @@ public interface FluxSender { * error) */ Mono send(Flux flux); + } diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToFluxSenderParameterAdapter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToFluxSenderParameterAdapter.java index 9467cddf7..c71303127 100644 --- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToFluxSenderParameterAdapter.java +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToFluxSenderParameterAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,10 +16,6 @@ package org.springframework.cloud.stream.reactive; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import reactor.core.publisher.MonoProcessor; - import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter; import org.springframework.core.MethodParameter; import org.springframework.core.ResolvableType; @@ -30,13 +26,13 @@ import org.springframework.messaging.MessageChannel; /** * Adapts an {@link org.springframework.cloud.stream.annotation.Output} annotated * {@link FluxSender} to an outbound {@link MessageChannel}. + * * @author Marius Bogoevici + * @author Soby Chacko */ public class MessageChannelToFluxSenderParameterAdapter implements StreamListenerParameterAdapter { - private Log log = LogFactory.getLog(MessageChannelToFluxSenderParameterAdapter.class); - @Override public boolean supports(Class bindingTargetType, MethodParameter methodParameter) { ResolvableType type = ResolvableType.forMethodParameter(methodParameter); @@ -46,18 +42,10 @@ public class MessageChannelToFluxSenderParameterAdapter @Override public FluxSender adapt(MessageChannel bindingTarget, MethodParameter parameter) { - return resultPublisher -> { - MonoProcessor sendResult = MonoProcessor.create(); - // add error handling and reconnect in the event of an error - resultPublisher - .doOnError(e -> this.log.error("Error during processing: ", e)) - .retry() - .subscribe( - result -> bindingTarget.send(result instanceof Message ? (Message) result - : MessageBuilder.withPayload(result).build()), - e -> sendResult.onError(e), - () -> sendResult.onComplete()); - return sendResult; - }; + return new DefaultFluxSender(result -> + bindingTarget.send(result instanceof Message + ? (Message) result + : MessageBuilder.withPayload(result).build())); } + } diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ObservableToMessageChannelResultAdapter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ObservableToMessageChannelResultAdapter.java index 85dde03af..c9507e2fd 100644 --- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ObservableToMessageChannelResultAdapter.java +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ObservableToMessageChannelResultAdapter.java @@ -16,8 +16,9 @@ package org.springframework.cloud.stream.reactive; +import java.io.Closeable; + import org.reactivestreams.Publisher; -import reactor.core.publisher.Flux; import rx.Observable; import rx.RxReactiveStreams; @@ -34,12 +35,12 @@ import org.springframework.util.Assert; public class ObservableToMessageChannelResultAdapter implements StreamListenerResultAdapter, MessageChannel> { - private FluxToMessageChannelResultAdapter fluxToMessageChannelResultAdapter; + private PublisherToMessageChannelResultAdapter publisherToMessageChannelResultAdapter; public ObservableToMessageChannelResultAdapter( - FluxToMessageChannelResultAdapter fluxToMessageChannelResultAdapter) { - Assert.notNull(fluxToMessageChannelResultAdapter, "cannot be null"); - this.fluxToMessageChannelResultAdapter = fluxToMessageChannelResultAdapter; + PublisherToMessageChannelResultAdapter publisherToMessageChannelResultAdapter) { + Assert.notNull(publisherToMessageChannelResultAdapter, "cannot be null"); + this.publisherToMessageChannelResultAdapter = publisherToMessageChannelResultAdapter; } @Override @@ -48,8 +49,9 @@ public class ObservableToMessageChannelResultAdapter && MessageChannel.class.isAssignableFrom(bindingTarget); } - public void adapt(Observable streamListenerResult, MessageChannel bindingTarget) { + public Closeable adapt(Observable streamListenerResult, MessageChannel bindingTarget) { Publisher adaptedPublisher = RxReactiveStreams.toPublisher(streamListenerResult); - this.fluxToMessageChannelResultAdapter.adapt(Flux.from(adaptedPublisher), bindingTarget); + return this.publisherToMessageChannelResultAdapter.adapt(adaptedPublisher, bindingTarget); } + } diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/FluxToMessageChannelResultAdapter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/PublisherToMessageChannelResultAdapter.java similarity index 58% rename from spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/FluxToMessageChannelResultAdapter.java rename to spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/PublisherToMessageChannelResultAdapter.java index 61d1652d9..9f143346d 100644 --- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/FluxToMessageChannelResultAdapter.java +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/PublisherToMessageChannelResultAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,8 +16,13 @@ package org.springframework.cloud.stream.reactive; +import java.io.Closeable; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; + +import reactor.core.Disposable; import reactor.core.publisher.Flux; import org.springframework.cloud.stream.binding.StreamListenerResultAdapter; @@ -27,25 +32,34 @@ import org.springframework.messaging.MessageChannel; /** * A {@link org.springframework.cloud.stream.binding.StreamListenerResultAdapter} from a - * {@link Flux} return type to a bound {@link MessageChannel}. + * {@link Publisher} return type to a bound {@link MessageChannel}. + * * @author Marius Bogoevici + * @author Soby Chacko + * */ -public class FluxToMessageChannelResultAdapter - implements StreamListenerResultAdapter, MessageChannel> { +public class PublisherToMessageChannelResultAdapter + implements StreamListenerResultAdapter, MessageChannel> { - private Log log = LogFactory.getLog(FluxToMessageChannelResultAdapter.class); + private Log log = LogFactory.getLog(PublisherToMessageChannelResultAdapter.class); @Override public boolean supports(Class resultType, Class bindingTarget) { - return Flux.class.isAssignableFrom(resultType) && MessageChannel.class.isAssignableFrom(bindingTarget); + return Publisher.class.isAssignableFrom(resultType) + && MessageChannel.class.isAssignableFrom(bindingTarget); } - public void adapt(Flux streamListenerResult, MessageChannel bindingTarget) { - streamListenerResult + public Closeable adapt(Publisher streamListenerResult, MessageChannel bindingTarget) { + Disposable disposable = Flux.from(streamListenerResult) .doOnError(e -> this.log.error("Error while processing result", e)) .retry() .subscribe( - result -> bindingTarget.send(result instanceof Message ? (Message) result - : MessageBuilder.withPayload(result).build())); + result -> + bindingTarget.send(result instanceof Message + ? (Message) result + : MessageBuilder.withPayload(result).build())); + + return disposable::dispose; } + } diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java index 89f748dd5..92f1b5835 100644 --- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java @@ -46,9 +46,14 @@ public class ReactiveSupportAutoConfiguration { } @Bean - @ConditionalOnMissingBean(FluxToMessageChannelResultAdapter.class) - public FluxToMessageChannelResultAdapter fluxToMessageChannelResultAdapter() { - return new FluxToMessageChannelResultAdapter(); + @ConditionalOnMissingBean(PublisherToMessageChannelResultAdapter.class) + public PublisherToMessageChannelResultAdapter fluxToMessageChannelResultAdapter() { + return new PublisherToMessageChannelResultAdapter(); + } + + @Bean + public static StreamEmitterAnnotationBeanPostProcessor streamEmitterAnnotationBeanPostProcessor() { + return new StreamEmitterAnnotationBeanPostProcessor(); } @Configuration @@ -72,8 +77,8 @@ public class ReactiveSupportAutoConfiguration { @Bean @ConditionalOnMissingBean(ObservableToMessageChannelResultAdapter.class) public ObservableToMessageChannelResultAdapter observableToMessageChannelResultAdapter( - FluxToMessageChannelResultAdapter fluxToMessageChannelResultAdapter) { - return new ObservableToMessageChannelResultAdapter(fluxToMessageChannelResultAdapter); + PublisherToMessageChannelResultAdapter publisherToMessageChannelResultAdapter) { + return new ObservableToMessageChannelResultAdapter(publisherToMessageChannelResultAdapter); } } } diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitter.java new file mode 100644 index 000000000..16202d9f4 --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitter.java @@ -0,0 +1,94 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.Output; + +/** + * Method level annotation that marks a method to be an emitter to outputs declared via + * {@link EnableBinding} (e.g. channels). + * + * This annotation is intended to be used in a Spring Cloud Stream application that requires + * a source to write to one or more {@link Output}s using the reactive paradigm. + * + * No {@link Input}s are allowed on a method that is annotated with StreamEmitter. + * + * Depending on how the method is structured, there are some flexibility in how the {@link Output} may be used. + * + * Here are some supported usage patterns: + * + * A StreamEmitter method that has a return type cannot take any method parameters. + * + *
+ * @StreamEmitter
+ * @Output(Source.OUTPUT)
+ * public Flux emit() {
+ * 	return Flux.intervalMillis(1000)
+ * 		.map(l -> "Hello World!!");
+ * }
+ * 
+ * + * The following examples show how a void return type can be used on a method with StreamEmitter and how the + * method signatures could be used in a flexible manner. + * + *
+ * @StreamEmitter
+ * public void emit(@Output(Source.OUTPUT) FluxSender output) {
+ * 	output.send(Flux.intervalMillis(1000)
+ *		.map(l -> "Hello World!!"));
+ * }
+ * 
+ * + *
+ * @StreamEmitter
+ * @Output(Source.OUTPUT)
+ * public void emit(FluxSender output) {
+ * 	output.send(Flux.intervalMillis(1000)
+ *		.map(l -> "Hello World!!"));
+ * }
+ * 
+ * + *
+ * @StreamEmitter
+ * public void emit(@Output("OUTPUT1") FluxSender output1,
+ *					@Output("OUTPUT2") FluxSender output2,
+ *					@Output("OUTPUT3)" FluxSender output3) {
+ *	output1.send(Flux.intervalMillis(1000)
+ *		.map(l -> "Hello World!!"));
+ *	output2.send(Flux.intervalMillis(1000)
+ *		.map(l -> "Hello World!!"));
+ *	output3.send(Flux.intervalMillis(1000)
+ *		.map(l -> "Hello World!!"));
+ *	}
+ *
+ * + * @author Soby Chacko + * + * @since 1.3.0 + */ +@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE }) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface StreamEmitter {} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitterAnnotationBeanPostProcessor.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitterAnnotationBeanPostProcessor.java new file mode 100644 index 000000000..762f9f450 --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitterAnnotationBeanPostProcessor.java @@ -0,0 +1,282 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.aop.support.AopUtils; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactoryUtils; +import org.springframework.beans.factory.BeanInitializationException; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.cloud.stream.binding.MessageChannelStreamListenerResultAdapter; +import org.springframework.cloud.stream.binding.StreamAnnotationCommonMethodUtils; +import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter; +import org.springframework.cloud.stream.binding.StreamListenerResultAdapter; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.SmartLifecycle; +import org.springframework.core.MethodParameter; +import org.springframework.core.annotation.AnnotatedElementUtils; +import org.springframework.core.annotation.AnnotationUtils; +import org.springframework.core.annotation.SynthesizingMethodParameter; +import org.springframework.util.Assert; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.util.ReflectionUtils; +import org.springframework.util.StringUtils; + +/** + * {@link BeanPostProcessor} that handles {@link StreamEmitter} annotations found on bean methods. + * + * @author Soby Chacko + * @author Artem Bilan + * + * @since 1.3.0 + */ +public class StreamEmitterAnnotationBeanPostProcessor + implements BeanPostProcessor, InitializingBean, ApplicationContextAware, SmartLifecycle { + + private static final Log log = LogFactory.getLog(StreamEmitterAnnotationBeanPostProcessor.class); + + private final List> streamListenerParameterAdapters = new ArrayList<>(); + + private final List> streamListenerResultAdapters = new ArrayList<>(); + + private final List closeableFluxResources = new ArrayList<>(); + + private ConfigurableApplicationContext applicationContext; + + private MultiValueMap mappedStreamEmitterMethods = new LinkedMultiValueMap<>(); + + private volatile boolean running; + + private final Lock lock = new ReentrantLock(); + + @Override + public final void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + Assert.isTrue(applicationContext instanceof ConfigurableApplicationContext, + "ConfigurableApplicationContext is required"); + this.applicationContext = (ConfigurableApplicationContext) applicationContext; + } + + @Override + @SuppressWarnings("unchecked") + public void afterPropertiesSet() throws Exception { + Map parameterAdapterMap = BeanFactoryUtils + .beansOfTypeIncludingAncestors(this.applicationContext, StreamListenerParameterAdapter.class); + parameterAdapterMap.values().iterator().forEachRemaining(this.streamListenerParameterAdapters::add); + Map resultAdapterMap = BeanFactoryUtils + .beansOfTypeIncludingAncestors(this.applicationContext, StreamListenerResultAdapter.class); + this.streamListenerResultAdapters.add(new MessageChannelStreamListenerResultAdapter()); + resultAdapterMap.values().iterator().forEachRemaining(this.streamListenerResultAdapters::add); + } + + @Override + public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { + return bean; + } + + @Override + public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { + Class targetClass = AopUtils.getTargetClass(bean); + ReflectionUtils.doWithMethods(targetClass, + method -> { + if (AnnotatedElementUtils.isAnnotated(method, StreamEmitter.class)) { + mappedStreamEmitterMethods.add(bean, method); + } + }, ReflectionUtils.USER_DECLARED_METHODS); + return bean; + } + + @Override + public void start() { + try { + lock.lock(); + if (!running) { + mappedStreamEmitterMethods.forEach((k, v) -> v.forEach(item -> { + Assert.isTrue(item.getAnnotation(Input.class) == null, + StreamEmitterErrorMessages.INPUT_ANNOTATIONS_ARE_NOT_ALLOWED); + String methodAnnotatedOutboundName = + StreamAnnotationCommonMethodUtils.getOutboundBindingTargetName(item); + int outputAnnotationCount = StreamAnnotationCommonMethodUtils.outputAnnotationCount(item); + validateStreamEmitterMethod(item, outputAnnotationCount, methodAnnotatedOutboundName); + invokeSetupMethodOnToTargetChannel(item, k, methodAnnotatedOutboundName); + } + )); + this.running = true; + } + } + finally { + lock.unlock(); + } + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private void invokeSetupMethodOnToTargetChannel(Method method, Object bean, String outboundName) { + Object[] arguments = new Object[method.getParameterCount()]; + Object targetBean = null; + for (int parameterIndex = 0; parameterIndex < arguments.length; parameterIndex++) { + MethodParameter methodParameter = new SynthesizingMethodParameter(method, parameterIndex); + Class parameterType = methodParameter.getParameterType(); + Object targetReferenceValue = null; + if (methodParameter.hasParameterAnnotation(Output.class)) { + targetReferenceValue = AnnotationUtils.getValue(methodParameter.getParameterAnnotation(Output.class)); + } + else if (arguments.length == 1 && StringUtils.hasText(outboundName)) { + targetReferenceValue = outboundName; + } + if (targetReferenceValue != null) { + targetBean = this.applicationContext.getBean((String) targetReferenceValue); + for (StreamListenerParameterAdapter streamListenerParameterAdapter : this.streamListenerParameterAdapters) { + if (streamListenerParameterAdapter.supports(targetBean.getClass(), methodParameter)) { + arguments[parameterIndex] = streamListenerParameterAdapter.adapt(targetBean, + methodParameter); + if (arguments[parameterIndex] instanceof FluxSender) { + closeableFluxResources.add((FluxSender) arguments[parameterIndex]); + } + break; + } + } + Assert.notNull(arguments[parameterIndex], "Cannot convert argument " + parameterIndex + " of " + method + + "from " + targetBean.getClass() + " to " + parameterType); + } + else { + throw new IllegalStateException(StreamEmitterErrorMessages.ATLEAST_ONE_OUTPUT); + } + } + Object result; + try { + result = method.invoke(bean, arguments); + } + catch (Exception e) { + throw new BeanInitializationException("Cannot setup StreamEmitter for " + method, e); + } + + if (!Void.TYPE.equals(method.getReturnType())) { + if (targetBean == null) { + targetBean = this.applicationContext.getBean(outboundName); + } + boolean streamListenerResultAdapterFound = false; + for (StreamListenerResultAdapter streamListenerResultAdapter : this.streamListenerResultAdapters) { + if (streamListenerResultAdapter.supports(result.getClass(), targetBean.getClass())) { + Closeable fluxDisposable = streamListenerResultAdapter.adapt(result, targetBean); + closeableFluxResources.add(fluxDisposable); + streamListenerResultAdapterFound = true; + break; + } + } + Assert.state(streamListenerResultAdapterFound, + StreamEmitterErrorMessages.CANNOT_CONVERT_RETURN_TYPE_TO_ANY_AVAILABLE_RESULT_ADAPTERS); + } + } + + private static void validateStreamEmitterMethod(Method method, int outputAnnotationCount, + String methodAnnotatedOutboundName) { + + if (StringUtils.hasText(methodAnnotatedOutboundName)) { + Assert.isTrue(outputAnnotationCount == 0, + StreamEmitterErrorMessages.INVALID_OUTPUT_METHOD_PARAMETERS); + } + else { + Assert.isTrue(outputAnnotationCount > 0, StreamEmitterErrorMessages.NO_OUTPUT_SPECIFIED); + } + + if (!method.getReturnType().equals(Void.TYPE)) { + Assert.isTrue(StringUtils.hasText(methodAnnotatedOutboundName), + StreamEmitterErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED); + Assert.isTrue(method.getParameterCount() == 0, StreamEmitterErrorMessages.RETURN_TYPE_METHOD_ARGUMENTS); + } + else { + if (!StringUtils.hasText(methodAnnotatedOutboundName)) { + int methodArgumentsLength = method.getParameterTypes().length; + for (int parameterIndex = 0; parameterIndex < methodArgumentsLength; parameterIndex++) { + MethodParameter methodParameter = new MethodParameter(method, parameterIndex); + if (methodParameter.hasParameterAnnotation(Output.class)) { + String outboundName = (String) AnnotationUtils + .getValue(methodParameter.getParameterAnnotation(Output.class)); + Assert.isTrue(StringUtils.hasText(outboundName), + StreamEmitterErrorMessages.INVALID_OUTBOUND_NAME); + } + else { + throw new IllegalArgumentException( + StreamEmitterErrorMessages.OUTPUT_ANNOTATION_MISSING_ON_METHOD_PARAMETERS_VOID_RETURN_TYPE); + } + } + } + } + } + + @Override + public boolean isAutoStartup() { + return true; + } + + @Override + public void stop(Runnable callback) { + stop(); + if (callback != null) { + callback.run(); + } + } + + @Override + public void stop() { + try { + this.lock.lock(); + if (this.running) { + for (Closeable closeable : closeableFluxResources) { + try { + closeable.close(); + } + catch (IOException e) { + log.error("Error closing reactive source", e); + } + } + this.running = false; + } + } + finally { + this.lock.unlock(); + } + } + + @Override + public boolean isRunning() { + return this.running; + } + + @Override + public int getPhase() { + return 0; + } + +} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitterErrorMessages.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitterErrorMessages.java new file mode 100644 index 000000000..35965426f --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitterErrorMessages.java @@ -0,0 +1,50 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive; + +import org.springframework.cloud.stream.binding.StreamAnnotationErrorMessages; + +/** + * @author Soby Chacko + * @since 1.3.0 + */ +abstract class StreamEmitterErrorMessages extends StreamAnnotationErrorMessages { + + private static final String PREFIX = "A method annotated with @StreamEmitter "; + + static final String RETURN_TYPE_NO_OUTBOUND_SPECIFIED = PREFIX + + "having a return type should also have an outbound target specified at the method level."; + + static final String RETURN_TYPE_METHOD_ARGUMENTS = PREFIX + + "having a return type should not have any method arguments"; + + static final String INVALID_OUTPUT_METHOD_PARAMETERS = "@Output annotations are not permitted on " + + "method parameters while using the @StreamEmitter and a method-level output specification"; + + static final String NO_OUTPUT_SPECIFIED = "No method level or parameter level @Output annotations are detected. " + + "@StreamEmitter requires a method or parameter level @Output annotation."; + + static final String OUTPUT_ANNOTATION_MISSING_ON_METHOD_PARAMETERS_VOID_RETURN_TYPE = PREFIX + + "and void return type without method level @Output annotation requires @Output on each of the method parameter"; + + static final String INPUT_ANNOTATIONS_ARE_NOT_ALLOWED = PREFIX + + "cannot contain @Input annotations"; + + static final String CANNOT_CONVERT_RETURN_TYPE_TO_ANY_AVAILABLE_RESULT_ADAPTERS = + "No suitable adapters are found that can convert the return type"; + +} diff --git a/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterBasicTests.java b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterBasicTests.java new file mode 100644 index 000000000..a6c477c1a --- /dev/null +++ b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterBasicTests.java @@ -0,0 +1,308 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive; + + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.GenericMessage; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Soby Chacko + * @author Artem Bilan + */ +public class StreamEmitterBasicTests { + + @Test + public void testFluxReturnAndOutputMethodLevel() throws Exception { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.register(TestFluxReturnAndOutputMethodLevel.class); + context.refresh(); + receiveAndValidate(context); + context.close(); + } + + @Test + public void testVoidReturnAndOutputMethodParameter() throws Exception { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.register(TestVoidReturnAndOutputMethodParameter.class); + context.refresh(); + receiveAndValidate(context); + context.close(); + } + + @Test + public void testVoidReturnAndOutputAtMethodLevel() throws Exception { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.register(TestVoidReturnAndOutputAtMethodLevel.class); + context.refresh(); + receiveAndValidate(context); + context.close(); + } + + @Test + public void testVoidReturnAndMultipleOutputMethodParameters() throws Exception { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.register(TestVoidReturnAndMultipleOutputMethodParameters.class); + context.refresh(); + receiveAndValidateMultipleOutputs(context); + context.close(); + } + + @Test + public void testMultipleStreamEmitterMethods() throws Exception { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.register(TestMultipleStreamEmitterMethods.class); + context.refresh(); + receiveAndValidateMultipleOutputs(context); + context.close(); + } + + @Test + public void testSameAppContextWithMultipleStreamEmitters() throws Exception { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.register(TestSameAppContextWithMultipleStreamEmitters.class); + context.refresh(); + receiveAndValidateMultiStreamEmittersInSameContext(context); + context.close(); + } + + @SuppressWarnings("unchecked") + private static void receiveAndValidate(ConfigurableApplicationContext context) throws InterruptedException { + Source source = context.getBean(Source.class); + MessageCollector messageCollector = context.getBean(MessageCollector.class); + List messages = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + messages.add((String) messageCollector.forChannel(source.output()).poll(5000, TimeUnit.MILLISECONDS).getPayload()); + } + for (int i = 0; i < 1000; i++) { + assertThat(messages.get(i)).isEqualTo("HELLO WORLD!!" + i); + } + } + + @SuppressWarnings("unchecked") + private static void receiveAndValidateMultipleOutputs(ConfigurableApplicationContext context) throws InterruptedException { + TestMultiOutboundChannels source = context.getBean(TestMultiOutboundChannels.class); + MessageCollector messageCollector = context.getBean(MessageCollector.class); + List messages = new ArrayList<>(); + assertMessages(source.output1(), messageCollector, messages); + messages.clear(); + assertMessages(source.output2(), messageCollector, messages); + messages.clear(); + assertMessages(source.output3(), messageCollector, messages); + messages.clear(); + } + + @SuppressWarnings("unchecked") + private static void receiveAndValidateMultiStreamEmittersInSameContext(ConfigurableApplicationContext context1) throws InterruptedException { + TestMultiOutboundChannels source1 = context1.getBean(TestMultiOutboundChannels.class); + MessageCollector messageCollector = context1.getBean(MessageCollector.class); + + List messages = new ArrayList<>(); + assertMessagesX(source1.output1(), messageCollector, messages); + messages.clear(); + assertMessagesY(source1.output2(), messageCollector, messages); + messages.clear(); + } + + private static void assertMessages(MessageChannel channel, MessageCollector messageCollector, List messages) throws InterruptedException { + for (int i = 0; i < 1000; i++) { + messages.add((String) messageCollector.forChannel(channel).poll(5000, TimeUnit.MILLISECONDS).getPayload()); + } + for (int i = 0; i < 1000; i++) { + assertThat(messages.get(i)).isEqualTo("Hello World!!" + i); + } + } + + private static void assertMessagesX(MessageChannel channel, MessageCollector messageCollector, List messages) throws InterruptedException { + for (int i = 0; i < 1000; i++) { + messages.add((String) messageCollector.forChannel(channel).poll(5000, TimeUnit.MILLISECONDS).getPayload()); + } + for (int i = 0; i < 1000; i++) { + assertThat(messages.get(i)).isEqualTo("Hello World!!" + i); + } + } + + private static void assertMessagesY(MessageChannel channel, MessageCollector messageCollector, List messages) throws InterruptedException { + for (int i = 0; i < 1000; i++) { + messages.add((String) messageCollector.forChannel(channel).poll(5000, TimeUnit.MILLISECONDS).getPayload()); + } + for (int i = 0; i < 1000; i++) { + assertThat(messages.get(i)).isEqualTo("Hello FooBar!!" + i); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestFluxReturnAndOutputMethodLevel { + + @StreamEmitter + @Output(Source.OUTPUT) + @Bean + public Publisher> emit() { + AtomicInteger atomicInteger = new AtomicInteger(); + return IntegrationFlows.from(() -> + new GenericMessage<>("Hello World!!" + atomicInteger.getAndIncrement()), + e -> e.poller(p -> p.fixedDelay(1))) + .transform(String::toUpperCase) + .toReactivePublisher(); + } + + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestVoidReturnAndOutputMethodParameter { + + @StreamEmitter + public void emit(@Output(Source.OUTPUT) FluxSender output) { + output.send(Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l) + .map(String::toUpperCase)); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestVoidReturnAndOutputAtMethodLevel { + + @StreamEmitter + @Output(Source.OUTPUT) + public void emit(FluxSender output) { + output.send(Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l) + .map(String::toUpperCase)); + } + } + + @EnableBinding(TestMultiOutboundChannels.class) + @EnableAutoConfiguration + public static class TestVoidReturnAndMultipleOutputMethodParameters { + + @StreamEmitter + public void emit(@Output(TestMultiOutboundChannels.OUTPUT1) FluxSender output1, + @Output(TestMultiOutboundChannels.OUTPUT2) FluxSender output2, + @Output(TestMultiOutboundChannels.OUTPUT3) FluxSender output3) { + output1.send(Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l)); + output2.send(Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l)); + output3.send(Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l)); + } + } + + @EnableBinding(TestMultiOutboundChannels.class) + @EnableAutoConfiguration + public static class TestMultipleStreamEmitterMethods { + + @StreamEmitter + @Output(TestMultiOutboundChannels.OUTPUT1) + public Flux emit1() { + return Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l); + } + + @StreamEmitter + @Output(TestMultiOutboundChannels.OUTPUT2) + public Flux emit2() { + return Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l); + } + + @StreamEmitter + public void emit3(@Output(TestMultiOutboundChannels.OUTPUT3) FluxSender outputX) { + outputX.send(Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l)); + } + } + + @EnableBinding(TestMultiOutboundChannels.class) + @EnableAutoConfiguration + public static class TestSameAppContextWithMultipleStreamEmitters { + + @Bean + public Foo foo() { + return new Foo(); + } + + @Bean + public Bar bar() { + return new Bar(); + } + + static class Foo { + + @StreamEmitter + @Output(TestMultiOutboundChannels.OUTPUT1) + public Flux emit1() { + return Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l); + } + } + + static class Bar { + + @StreamEmitter + @Output(TestMultiOutboundChannels.OUTPUT2) + public Flux emit2() { + return Flux.intervalMillis(1) + .map(l -> "Hello FooBar!!" + l); + } + } + } + + interface TestMultiOutboundChannels { + + String OUTPUT1 = "output1"; + + String OUTPUT2 = "output2"; + + String OUTPUT3 = "output3"; + + @Output(TestMultiOutboundChannels.OUTPUT1) + MessageChannel output1(); + + @Output(TestMultiOutboundChannels.OUTPUT2) + MessageChannel output2(); + + @Output(TestMultiOutboundChannels.OUTPUT3) + MessageChannel output3(); + + } +} diff --git a/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterValidationTests.java b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterValidationTests.java new file mode 100644 index 000000000..c5d1be2cf --- /dev/null +++ b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterValidationTests.java @@ -0,0 +1,307 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive; + +import org.junit.Test; +import reactor.core.publisher.Flux; + +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.springframework.cloud.stream.binding.StreamAnnotationErrorMessages.ATLEAST_ONE_OUTPUT; +import static org.springframework.cloud.stream.binding.StreamAnnotationErrorMessages.INVALID_OUTBOUND_NAME; +import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.CANNOT_CONVERT_RETURN_TYPE_TO_ANY_AVAILABLE_RESULT_ADAPTERS; +import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.INPUT_ANNOTATIONS_ARE_NOT_ALLOWED; +import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.INVALID_OUTPUT_METHOD_PARAMETERS; +import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.NO_OUTPUT_SPECIFIED; +import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.OUTPUT_ANNOTATION_MISSING_ON_METHOD_PARAMETERS_VOID_RETURN_TYPE; +import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.RETURN_TYPE_METHOD_ARGUMENTS; +import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED; + +/** + * @author Soby Chacko + */ +public class StreamEmitterValidationTests { + + @Test + public void testOutputAsMethodandMethodParameter() { + try { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.register(TestOutputAsMethodandMethodParameter.class); + context.refresh(); + context.close(); + fail("Expected exception: " + INVALID_OUTPUT_METHOD_PARAMETERS); + } + catch (Exception e) { + assertThat(e.getMessage()).contains(INVALID_OUTPUT_METHOD_PARAMETERS); + } + } + + @Test + public void testFluxReturnTypeNoOutputGiven() { + try { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.register(TestFluxReturnTypeNoOutputGiven.class); + context.refresh(); + context.close(); + fail("Expected exception: " + NO_OUTPUT_SPECIFIED); + } + catch (Exception e) { + assertThat(e.getMessage()).contains(NO_OUTPUT_SPECIFIED); + } + } + + @Test + public void testVoidReturnTypeNoOutputGiven() { + try { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.register(TestVoidReturnTypeNoOutputGiven.class); + context.refresh(); + context.close(); + fail("Expected exception: " + NO_OUTPUT_SPECIFIED); + } + catch (Exception e) { + assertThat(e.getMessage()).contains(NO_OUTPUT_SPECIFIED); + } + } + + @Test + public void testNonVoidReturnButOutputAsMethodParameter() { + try { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.register(TestNonVoidReturnButOutputAsMethodParameter.class); + context.refresh(); + context.close(); + fail("Expected exception: " + RETURN_TYPE_NO_OUTBOUND_SPECIFIED); + } + catch (Exception e) { + assertThat(e.getMessage()).contains(RETURN_TYPE_NO_OUTBOUND_SPECIFIED); + } + } + + @Test + public void testNonVoidReturnButMethodArguments() { + try { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.register(TestNonVoidReturnButMethodArguments.class); + context.refresh(); + context.close(); + fail("Expected exception: " + RETURN_TYPE_METHOD_ARGUMENTS); + } + catch (Exception e) { + assertThat(e.getMessage()).contains(RETURN_TYPE_METHOD_ARGUMENTS); + } + } + + @Test + public void testVoidReturnTypeMultipleMethodParametersWithOneMissingOutput() { + try { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.register(TestVoidReturnTypeMultipleMethodParametersWithOneMissingOutput.class); + context.refresh(); + context.close(); + fail("Expected exception: " + OUTPUT_ANNOTATION_MISSING_ON_METHOD_PARAMETERS_VOID_RETURN_TYPE); + } + catch (Exception e) { + assertThat(e.getMessage()).contains(OUTPUT_ANNOTATION_MISSING_ON_METHOD_PARAMETERS_VOID_RETURN_TYPE); + } + } + + @Test + public void testOutputAtCorrectLocationButNameMissing1() { + try { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.register(TestOutputAtCorrectLocationButNameMissing1.class); + context.refresh(); + context.close(); + fail("Expected exception: " + ATLEAST_ONE_OUTPUT); + } + catch (Exception e) { + assertThat(e.getMessage()).contains(ATLEAST_ONE_OUTPUT); + } + } + + @Test + public void testOutputAtCorrectLocationButNameMissing2() { + try { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.register(TestOutputAtCorrectLocationButNameMissing2.class); + context.refresh(); + context.close(); + fail("Expected exception: " + INVALID_OUTBOUND_NAME); + } + catch (Exception e) { + assertThat(e.getMessage()).contains(INVALID_OUTBOUND_NAME); + } + } + + @Test + public void testInputAnnotationsAreNotPermitted() { + try { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.register(TestInputAnnotationsAreNotPermitted.class); + context.refresh(); + context.close(); + fail("Expected exception: " + INPUT_ANNOTATIONS_ARE_NOT_ALLOWED); + } + catch (Exception e) { + assertThat(e.getMessage()).contains(INPUT_ANNOTATIONS_ARE_NOT_ALLOWED); + } + } + + @Test + public void testReturnTypeNotSupported() { + try { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.register(TestReturnTypeNotSupported.class); + context.refresh(); + context.close(); + fail("Expected exception: " + CANNOT_CONVERT_RETURN_TYPE_TO_ANY_AVAILABLE_RESULT_ADAPTERS); + } + catch (Exception e) { + assertThat(e.getMessage()).contains(CANNOT_CONVERT_RETURN_TYPE_TO_ANY_AVAILABLE_RESULT_ADAPTERS); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestOutputAsMethodandMethodParameter { + + @StreamEmitter + @Output(Source.OUTPUT) + public void receive(@Output(Source.OUTPUT) FluxSender output) { + output.send(Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l)); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestFluxReturnTypeNoOutputGiven { + + @StreamEmitter + public Flux emit() { + return Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestVoidReturnTypeNoOutputGiven { + + @StreamEmitter + public void emit(FluxSender output) { + output.send(Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l)); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestNonVoidReturnButOutputAsMethodParameter { + + @StreamEmitter + public Flux emit(@Output(Source.OUTPUT) FluxSender output) { + return Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestNonVoidReturnButMethodArguments { + + @StreamEmitter + @Output(Source.OUTPUT) + public Flux receive(FluxSender output) { + return Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l); + } + } + + @EnableBinding(StreamEmitterBasicTests.TestMultiOutboundChannels.class) + @EnableAutoConfiguration + public static class TestVoidReturnTypeMultipleMethodParametersWithOneMissingOutput { + + @StreamEmitter + public void emit(@Output(StreamEmitterBasicTests.TestMultiOutboundChannels.OUTPUT1) FluxSender output1, + @Output(StreamEmitterBasicTests.TestMultiOutboundChannels.OUTPUT2) FluxSender output2, + FluxSender output3) { + output1.send(Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l)); + output2.send(Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l)); + output3.send(Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l)); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestOutputAtCorrectLocationButNameMissing1 { + + @StreamEmitter + @Output("") + public void receive(FluxSender output) { + output.send(Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l)); + } + } + + @EnableBinding(StreamEmitterBasicTests.TestMultiOutboundChannels.class) + @EnableAutoConfiguration + public static class TestOutputAtCorrectLocationButNameMissing2 { + + @StreamEmitter + public void emit(@Output("") FluxSender output1) { + output1.send(Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l)); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestInputAnnotationsAreNotPermitted { + + @StreamEmitter + @Output(Source.OUTPUT) + @Input(Processor.INPUT) + public Flux emit() { + return Flux.intervalMillis(1) + .map(l -> "Hello World!!" + l); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestReturnTypeNotSupported { + + @StreamEmitter + @Output(Source.OUTPUT) + public String emit() { + return "hello"; + } + } +} diff --git a/spring-cloud-stream-reactive/src/test/resources/logback.xml b/spring-cloud-stream-reactive/src/test/resources/logback.xml new file mode 100644 index 000000000..412f0d7d9 --- /dev/null +++ b/spring-cloud-stream-reactive/src/test/resources/logback.xml @@ -0,0 +1,10 @@ + + + + %d{ISO8601} %5p %t %c{2}:%L - %m%n + + + + + + diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageChannelStreamListenerResultAdapter.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageChannelStreamListenerResultAdapter.java index e6d6bb629..15052613b 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageChannelStreamListenerResultAdapter.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageChannelStreamListenerResultAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,9 @@ package org.springframework.cloud.stream.binding; +import java.io.Closeable; +import java.io.IOException; + import org.springframework.integration.handler.BridgeHandler; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; @@ -24,7 +27,9 @@ import org.springframework.messaging.SubscribableChannel; * A {@link StreamListenerResultAdapter} used for bridging an * {@link org.springframework.cloud.stream.annotation.Output} {@link MessageChannel} to a * bound {@link MessageChannel}. + * * @author Marius Bogoevici + * @author Soby Chacko */ public class MessageChannelStreamListenerResultAdapter implements StreamListenerResultAdapter { @@ -36,10 +41,22 @@ public class MessageChannelStreamListenerResultAdapter } @Override - public void adapt(MessageChannel streamListenerResult, MessageChannel bindingTarget) { + public Closeable adapt(MessageChannel streamListenerResult, MessageChannel bindingTarget) { BridgeHandler handler = new BridgeHandler(); handler.setOutputChannel(bindingTarget); handler.afterPropertiesSet(); ((SubscribableChannel) streamListenerResult).subscribe(handler); + + return new NoOpCloseeable(); } + + private static final class NoOpCloseeable implements Closeable { + + @Override + public void close() throws IOException { + + } + + } + } diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamAnnotationCommonMethodUtils.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamAnnotationCommonMethodUtils.java new file mode 100644 index 000000000..1efa65583 --- /dev/null +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamAnnotationCommonMethodUtils.java @@ -0,0 +1,63 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binding; + +import java.lang.reflect.Method; + +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.core.MethodParameter; +import org.springframework.core.annotation.AnnotationUtils; +import org.springframework.messaging.handler.annotation.SendTo; +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; +import org.springframework.util.StringUtils; + +/** + * Common methods that can be used across various Stream annotations. + * + * @author Soby Chacko + * @since 1.3.0 + */ +public abstract class StreamAnnotationCommonMethodUtils { + + public static String getOutboundBindingTargetName(Method method) { + SendTo sendTo = AnnotationUtils.findAnnotation(method, SendTo.class); + if (sendTo != null) { + Assert.isTrue(!ObjectUtils.isEmpty(sendTo.value()), StreamAnnotationErrorMessages.ATLEAST_ONE_OUTPUT); + Assert.isTrue(sendTo.value().length == 1, StreamAnnotationErrorMessages.SEND_TO_MULTIPLE_DESTINATIONS); + Assert.hasText(sendTo.value()[0], StreamAnnotationErrorMessages.SEND_TO_EMPTY_DESTINATION); + return sendTo.value()[0]; + } + Output output = AnnotationUtils.findAnnotation(method, Output.class); + if (output != null) { + Assert.isTrue(StringUtils.hasText(output.value()), StreamAnnotationErrorMessages.ATLEAST_ONE_OUTPUT); + return output.value(); + } + return null; + } + + public static int outputAnnotationCount(Method method) { + int outputAnnotationCount = 0; + for (int parameterIndex = 0; parameterIndex < method.getParameterTypes().length; parameterIndex++) { + MethodParameter methodParameter = MethodParameter.forMethodOrConstructor(method, parameterIndex); + if (methodParameter.hasParameterAnnotation(Output.class)) { + outputAnnotationCount++; + } + } + return outputAnnotationCount; + } +} diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamAnnotationErrorMessages.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamAnnotationErrorMessages.java new file mode 100644 index 000000000..01e38d635 --- /dev/null +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamAnnotationErrorMessages.java @@ -0,0 +1,32 @@ +/* + * Copyright 2016-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binding; + +/** + * @author Soby Chacko + */ +public abstract class StreamAnnotationErrorMessages { + + public static final String ATLEAST_ONE_OUTPUT = "At least one output must be specified"; + + public static final String SEND_TO_MULTIPLE_DESTINATIONS = "Multiple destinations cannot be specified"; + + public static final String SEND_TO_EMPTY_DESTINATION = "An empty destination cannot be specified"; + + public static final String INVALID_OUTBOUND_NAME = "The @Output annotation must have the name of an input as value"; + +} diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerResultAdapter.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerResultAdapter.java index 3c1355ae9..5283ad428 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerResultAdapter.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerResultAdapter.java @@ -16,6 +16,8 @@ package org.springframework.cloud.stream.binding; +import java.io.Closeable; + /** * A strategy for adapting the result of a * {@link org.springframework.cloud.stream.annotation.StreamListener} annotated method to @@ -41,6 +43,6 @@ public interface StreamListenerResultAdapter { * @param streamListenerResult the result of invoking the method. * @param bindingTarget the binding target. */ - void adapt(R streamListenerResult, B bindingTarget); + Closeable adapt(R streamListenerResult, B bindingTarget); }