, MessageChannel> {
- private Log log = LogFactory.getLog(FluxToMessageChannelResultAdapter.class);
+ private Log log = LogFactory.getLog(PublisherToMessageChannelResultAdapter.class);
@Override
public boolean supports(Class> resultType, Class> bindingTarget) {
- return Flux.class.isAssignableFrom(resultType) && MessageChannel.class.isAssignableFrom(bindingTarget);
+ return Publisher.class.isAssignableFrom(resultType)
+ && MessageChannel.class.isAssignableFrom(bindingTarget);
}
- public void adapt(Flux> streamListenerResult, MessageChannel bindingTarget) {
- streamListenerResult
+ public Closeable adapt(Publisher> streamListenerResult, MessageChannel bindingTarget) {
+ Disposable disposable = Flux.from(streamListenerResult)
.doOnError(e -> this.log.error("Error while processing result", e))
.retry()
.subscribe(
- result -> bindingTarget.send(result instanceof Message> ? (Message>) result
- : MessageBuilder.withPayload(result).build()));
+ result ->
+ bindingTarget.send(result instanceof Message>
+ ? (Message>) result
+ : MessageBuilder.withPayload(result).build()));
+
+ return disposable::dispose;
}
+
}
diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java
index 89f748dd5..92f1b5835 100644
--- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java
+++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java
@@ -46,9 +46,14 @@ public class ReactiveSupportAutoConfiguration {
}
@Bean
- @ConditionalOnMissingBean(FluxToMessageChannelResultAdapter.class)
- public FluxToMessageChannelResultAdapter fluxToMessageChannelResultAdapter() {
- return new FluxToMessageChannelResultAdapter();
+ @ConditionalOnMissingBean(PublisherToMessageChannelResultAdapter.class)
+ public PublisherToMessageChannelResultAdapter fluxToMessageChannelResultAdapter() {
+ return new PublisherToMessageChannelResultAdapter();
+ }
+
+ @Bean
+ public static StreamEmitterAnnotationBeanPostProcessor streamEmitterAnnotationBeanPostProcessor() {
+ return new StreamEmitterAnnotationBeanPostProcessor();
}
@Configuration
@@ -72,8 +77,8 @@ public class ReactiveSupportAutoConfiguration {
@Bean
@ConditionalOnMissingBean(ObservableToMessageChannelResultAdapter.class)
public ObservableToMessageChannelResultAdapter observableToMessageChannelResultAdapter(
- FluxToMessageChannelResultAdapter fluxToMessageChannelResultAdapter) {
- return new ObservableToMessageChannelResultAdapter(fluxToMessageChannelResultAdapter);
+ PublisherToMessageChannelResultAdapter publisherToMessageChannelResultAdapter) {
+ return new ObservableToMessageChannelResultAdapter(publisherToMessageChannelResultAdapter);
}
}
}
diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitter.java
new file mode 100644
index 000000000..16202d9f4
--- /dev/null
+++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitter.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright 2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.reactive;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.Input;
+import org.springframework.cloud.stream.annotation.Output;
+
+/**
+ * Method level annotation that marks a method to be an emitter to outputs declared via
+ * {@link EnableBinding} (e.g. channels).
+ *
+ * This annotation is intended to be used in a Spring Cloud Stream application that requires
+ * a source to write to one or more {@link Output}s using the reactive paradigm.
+ *
+ * No {@link Input}s are allowed on a method that is annotated with StreamEmitter.
+ *
+ * Depending on how the method is structured, there are some flexibility in how the {@link Output} may be used.
+ *
+ * Here are some supported usage patterns:
+ *
+ * A StreamEmitter method that has a return type cannot take any method parameters.
+ *
+ *
+ * @StreamEmitter
+ * @Output(Source.OUTPUT)
+ * public Flux emit() {
+ * return Flux.intervalMillis(1000)
+ * .map(l -> "Hello World!!");
+ * }
+ *
+ *
+ * The following examples show how a void return type can be used on a method with StreamEmitter and how the
+ * method signatures could be used in a flexible manner.
+ *
+ *
+ * @StreamEmitter
+ * public void emit(@Output(Source.OUTPUT) FluxSender output) {
+ * output.send(Flux.intervalMillis(1000)
+ * .map(l -> "Hello World!!"));
+ * }
+ *
+ *
+ *
+ * @StreamEmitter
+ * @Output(Source.OUTPUT)
+ * public void emit(FluxSender output) {
+ * output.send(Flux.intervalMillis(1000)
+ * .map(l -> "Hello World!!"));
+ * }
+ *
+ *
+ *
+ * @StreamEmitter
+ * public void emit(@Output("OUTPUT1") FluxSender output1,
+ * @Output("OUTPUT2") FluxSender output2,
+ * @Output("OUTPUT3)" FluxSender output3) {
+ * output1.send(Flux.intervalMillis(1000)
+ * .map(l -> "Hello World!!"));
+ * output2.send(Flux.intervalMillis(1000)
+ * .map(l -> "Hello World!!"));
+ * output3.send(Flux.intervalMillis(1000)
+ * .map(l -> "Hello World!!"));
+ * }
+ *
+ *
+ * @author Soby Chacko
+ *
+ * @since 1.3.0
+ */
+@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface StreamEmitter {}
diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitterAnnotationBeanPostProcessor.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitterAnnotationBeanPostProcessor.java
new file mode 100644
index 000000000..762f9f450
--- /dev/null
+++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitterAnnotationBeanPostProcessor.java
@@ -0,0 +1,282 @@
+/*
+ * Copyright 2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.reactive;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.springframework.aop.support.AopUtils;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.BeanFactoryUtils;
+import org.springframework.beans.factory.BeanInitializationException;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.cloud.stream.annotation.Input;
+import org.springframework.cloud.stream.annotation.Output;
+import org.springframework.cloud.stream.binding.MessageChannelStreamListenerResultAdapter;
+import org.springframework.cloud.stream.binding.StreamAnnotationCommonMethodUtils;
+import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
+import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.SmartLifecycle;
+import org.springframework.core.MethodParameter;
+import org.springframework.core.annotation.AnnotatedElementUtils;
+import org.springframework.core.annotation.AnnotationUtils;
+import org.springframework.core.annotation.SynthesizingMethodParameter;
+import org.springframework.util.Assert;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+import org.springframework.util.ReflectionUtils;
+import org.springframework.util.StringUtils;
+
+/**
+ * {@link BeanPostProcessor} that handles {@link StreamEmitter} annotations found on bean methods.
+ *
+ * @author Soby Chacko
+ * @author Artem Bilan
+ *
+ * @since 1.3.0
+ */
+public class StreamEmitterAnnotationBeanPostProcessor
+ implements BeanPostProcessor, InitializingBean, ApplicationContextAware, SmartLifecycle {
+
+ private static final Log log = LogFactory.getLog(StreamEmitterAnnotationBeanPostProcessor.class);
+
+ private final List> streamListenerParameterAdapters = new ArrayList<>();
+
+ private final List> streamListenerResultAdapters = new ArrayList<>();
+
+ private final List closeableFluxResources = new ArrayList<>();
+
+ private ConfigurableApplicationContext applicationContext;
+
+ private MultiValueMap mappedStreamEmitterMethods = new LinkedMultiValueMap<>();
+
+ private volatile boolean running;
+
+ private final Lock lock = new ReentrantLock();
+
+ @Override
+ public final void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ Assert.isTrue(applicationContext instanceof ConfigurableApplicationContext,
+ "ConfigurableApplicationContext is required");
+ this.applicationContext = (ConfigurableApplicationContext) applicationContext;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void afterPropertiesSet() throws Exception {
+ Map parameterAdapterMap = BeanFactoryUtils
+ .beansOfTypeIncludingAncestors(this.applicationContext, StreamListenerParameterAdapter.class);
+ parameterAdapterMap.values().iterator().forEachRemaining(this.streamListenerParameterAdapters::add);
+ Map resultAdapterMap = BeanFactoryUtils
+ .beansOfTypeIncludingAncestors(this.applicationContext, StreamListenerResultAdapter.class);
+ this.streamListenerResultAdapters.add(new MessageChannelStreamListenerResultAdapter());
+ resultAdapterMap.values().iterator().forEachRemaining(this.streamListenerResultAdapters::add);
+ }
+
+ @Override
+ public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
+ return bean;
+ }
+
+ @Override
+ public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
+ Class> targetClass = AopUtils.getTargetClass(bean);
+ ReflectionUtils.doWithMethods(targetClass,
+ method -> {
+ if (AnnotatedElementUtils.isAnnotated(method, StreamEmitter.class)) {
+ mappedStreamEmitterMethods.add(bean, method);
+ }
+ }, ReflectionUtils.USER_DECLARED_METHODS);
+ return bean;
+ }
+
+ @Override
+ public void start() {
+ try {
+ lock.lock();
+ if (!running) {
+ mappedStreamEmitterMethods.forEach((k, v) -> v.forEach(item -> {
+ Assert.isTrue(item.getAnnotation(Input.class) == null,
+ StreamEmitterErrorMessages.INPUT_ANNOTATIONS_ARE_NOT_ALLOWED);
+ String methodAnnotatedOutboundName =
+ StreamAnnotationCommonMethodUtils.getOutboundBindingTargetName(item);
+ int outputAnnotationCount = StreamAnnotationCommonMethodUtils.outputAnnotationCount(item);
+ validateStreamEmitterMethod(item, outputAnnotationCount, methodAnnotatedOutboundName);
+ invokeSetupMethodOnToTargetChannel(item, k, methodAnnotatedOutboundName);
+ }
+ ));
+ this.running = true;
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private void invokeSetupMethodOnToTargetChannel(Method method, Object bean, String outboundName) {
+ Object[] arguments = new Object[method.getParameterCount()];
+ Object targetBean = null;
+ for (int parameterIndex = 0; parameterIndex < arguments.length; parameterIndex++) {
+ MethodParameter methodParameter = new SynthesizingMethodParameter(method, parameterIndex);
+ Class> parameterType = methodParameter.getParameterType();
+ Object targetReferenceValue = null;
+ if (methodParameter.hasParameterAnnotation(Output.class)) {
+ targetReferenceValue = AnnotationUtils.getValue(methodParameter.getParameterAnnotation(Output.class));
+ }
+ else if (arguments.length == 1 && StringUtils.hasText(outboundName)) {
+ targetReferenceValue = outboundName;
+ }
+ if (targetReferenceValue != null) {
+ targetBean = this.applicationContext.getBean((String) targetReferenceValue);
+ for (StreamListenerParameterAdapter, Object> streamListenerParameterAdapter : this.streamListenerParameterAdapters) {
+ if (streamListenerParameterAdapter.supports(targetBean.getClass(), methodParameter)) {
+ arguments[parameterIndex] = streamListenerParameterAdapter.adapt(targetBean,
+ methodParameter);
+ if (arguments[parameterIndex] instanceof FluxSender) {
+ closeableFluxResources.add((FluxSender) arguments[parameterIndex]);
+ }
+ break;
+ }
+ }
+ Assert.notNull(arguments[parameterIndex], "Cannot convert argument " + parameterIndex + " of " + method
+ + "from " + targetBean.getClass() + " to " + parameterType);
+ }
+ else {
+ throw new IllegalStateException(StreamEmitterErrorMessages.ATLEAST_ONE_OUTPUT);
+ }
+ }
+ Object result;
+ try {
+ result = method.invoke(bean, arguments);
+ }
+ catch (Exception e) {
+ throw new BeanInitializationException("Cannot setup StreamEmitter for " + method, e);
+ }
+
+ if (!Void.TYPE.equals(method.getReturnType())) {
+ if (targetBean == null) {
+ targetBean = this.applicationContext.getBean(outboundName);
+ }
+ boolean streamListenerResultAdapterFound = false;
+ for (StreamListenerResultAdapter streamListenerResultAdapter : this.streamListenerResultAdapters) {
+ if (streamListenerResultAdapter.supports(result.getClass(), targetBean.getClass())) {
+ Closeable fluxDisposable = streamListenerResultAdapter.adapt(result, targetBean);
+ closeableFluxResources.add(fluxDisposable);
+ streamListenerResultAdapterFound = true;
+ break;
+ }
+ }
+ Assert.state(streamListenerResultAdapterFound,
+ StreamEmitterErrorMessages.CANNOT_CONVERT_RETURN_TYPE_TO_ANY_AVAILABLE_RESULT_ADAPTERS);
+ }
+ }
+
+ private static void validateStreamEmitterMethod(Method method, int outputAnnotationCount,
+ String methodAnnotatedOutboundName) {
+
+ if (StringUtils.hasText(methodAnnotatedOutboundName)) {
+ Assert.isTrue(outputAnnotationCount == 0,
+ StreamEmitterErrorMessages.INVALID_OUTPUT_METHOD_PARAMETERS);
+ }
+ else {
+ Assert.isTrue(outputAnnotationCount > 0, StreamEmitterErrorMessages.NO_OUTPUT_SPECIFIED);
+ }
+
+ if (!method.getReturnType().equals(Void.TYPE)) {
+ Assert.isTrue(StringUtils.hasText(methodAnnotatedOutboundName),
+ StreamEmitterErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED);
+ Assert.isTrue(method.getParameterCount() == 0, StreamEmitterErrorMessages.RETURN_TYPE_METHOD_ARGUMENTS);
+ }
+ else {
+ if (!StringUtils.hasText(methodAnnotatedOutboundName)) {
+ int methodArgumentsLength = method.getParameterTypes().length;
+ for (int parameterIndex = 0; parameterIndex < methodArgumentsLength; parameterIndex++) {
+ MethodParameter methodParameter = new MethodParameter(method, parameterIndex);
+ if (methodParameter.hasParameterAnnotation(Output.class)) {
+ String outboundName = (String) AnnotationUtils
+ .getValue(methodParameter.getParameterAnnotation(Output.class));
+ Assert.isTrue(StringUtils.hasText(outboundName),
+ StreamEmitterErrorMessages.INVALID_OUTBOUND_NAME);
+ }
+ else {
+ throw new IllegalArgumentException(
+ StreamEmitterErrorMessages.OUTPUT_ANNOTATION_MISSING_ON_METHOD_PARAMETERS_VOID_RETURN_TYPE);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean isAutoStartup() {
+ return true;
+ }
+
+ @Override
+ public void stop(Runnable callback) {
+ stop();
+ if (callback != null) {
+ callback.run();
+ }
+ }
+
+ @Override
+ public void stop() {
+ try {
+ this.lock.lock();
+ if (this.running) {
+ for (Closeable closeable : closeableFluxResources) {
+ try {
+ closeable.close();
+ }
+ catch (IOException e) {
+ log.error("Error closing reactive source", e);
+ }
+ }
+ this.running = false;
+ }
+ }
+ finally {
+ this.lock.unlock();
+ }
+ }
+
+ @Override
+ public boolean isRunning() {
+ return this.running;
+ }
+
+ @Override
+ public int getPhase() {
+ return 0;
+ }
+
+}
diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitterErrorMessages.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitterErrorMessages.java
new file mode 100644
index 000000000..35965426f
--- /dev/null
+++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/StreamEmitterErrorMessages.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2016-2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.reactive;
+
+import org.springframework.cloud.stream.binding.StreamAnnotationErrorMessages;
+
+/**
+ * @author Soby Chacko
+ * @since 1.3.0
+ */
+abstract class StreamEmitterErrorMessages extends StreamAnnotationErrorMessages {
+
+ private static final String PREFIX = "A method annotated with @StreamEmitter ";
+
+ static final String RETURN_TYPE_NO_OUTBOUND_SPECIFIED = PREFIX
+ + "having a return type should also have an outbound target specified at the method level.";
+
+ static final String RETURN_TYPE_METHOD_ARGUMENTS = PREFIX
+ + "having a return type should not have any method arguments";
+
+ static final String INVALID_OUTPUT_METHOD_PARAMETERS = "@Output annotations are not permitted on "
+ + "method parameters while using the @StreamEmitter and a method-level output specification";
+
+ static final String NO_OUTPUT_SPECIFIED = "No method level or parameter level @Output annotations are detected. "
+ + "@StreamEmitter requires a method or parameter level @Output annotation.";
+
+ static final String OUTPUT_ANNOTATION_MISSING_ON_METHOD_PARAMETERS_VOID_RETURN_TYPE = PREFIX
+ + "and void return type without method level @Output annotation requires @Output on each of the method parameter";
+
+ static final String INPUT_ANNOTATIONS_ARE_NOT_ALLOWED = PREFIX
+ + "cannot contain @Input annotations";
+
+ static final String CANNOT_CONVERT_RETURN_TYPE_TO_ANY_AVAILABLE_RESULT_ADAPTERS =
+ "No suitable adapters are found that can convert the return type";
+
+}
diff --git a/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterBasicTests.java b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterBasicTests.java
new file mode 100644
index 000000000..a6c477c1a
--- /dev/null
+++ b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterBasicTests.java
@@ -0,0 +1,308 @@
+/*
+ * Copyright 2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.reactive;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.Output;
+import org.springframework.cloud.stream.messaging.Processor;
+import org.springframework.cloud.stream.messaging.Source;
+import org.springframework.cloud.stream.test.binder.MessageCollector;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.AnnotationConfigApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.integration.dsl.IntegrationFlows;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.support.GenericMessage;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * @author Soby Chacko
+ * @author Artem Bilan
+ */
+public class StreamEmitterBasicTests {
+
+ @Test
+ public void testFluxReturnAndOutputMethodLevel() throws Exception {
+ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
+ context.register(TestFluxReturnAndOutputMethodLevel.class);
+ context.refresh();
+ receiveAndValidate(context);
+ context.close();
+ }
+
+ @Test
+ public void testVoidReturnAndOutputMethodParameter() throws Exception {
+ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
+ context.register(TestVoidReturnAndOutputMethodParameter.class);
+ context.refresh();
+ receiveAndValidate(context);
+ context.close();
+ }
+
+ @Test
+ public void testVoidReturnAndOutputAtMethodLevel() throws Exception {
+ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
+ context.register(TestVoidReturnAndOutputAtMethodLevel.class);
+ context.refresh();
+ receiveAndValidate(context);
+ context.close();
+ }
+
+ @Test
+ public void testVoidReturnAndMultipleOutputMethodParameters() throws Exception {
+ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
+ context.register(TestVoidReturnAndMultipleOutputMethodParameters.class);
+ context.refresh();
+ receiveAndValidateMultipleOutputs(context);
+ context.close();
+ }
+
+ @Test
+ public void testMultipleStreamEmitterMethods() throws Exception {
+ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
+ context.register(TestMultipleStreamEmitterMethods.class);
+ context.refresh();
+ receiveAndValidateMultipleOutputs(context);
+ context.close();
+ }
+
+ @Test
+ public void testSameAppContextWithMultipleStreamEmitters() throws Exception {
+ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
+ context.register(TestSameAppContextWithMultipleStreamEmitters.class);
+ context.refresh();
+ receiveAndValidateMultiStreamEmittersInSameContext(context);
+ context.close();
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void receiveAndValidate(ConfigurableApplicationContext context) throws InterruptedException {
+ Source source = context.getBean(Source.class);
+ MessageCollector messageCollector = context.getBean(MessageCollector.class);
+ List messages = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ messages.add((String) messageCollector.forChannel(source.output()).poll(5000, TimeUnit.MILLISECONDS).getPayload());
+ }
+ for (int i = 0; i < 1000; i++) {
+ assertThat(messages.get(i)).isEqualTo("HELLO WORLD!!" + i);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void receiveAndValidateMultipleOutputs(ConfigurableApplicationContext context) throws InterruptedException {
+ TestMultiOutboundChannels source = context.getBean(TestMultiOutboundChannels.class);
+ MessageCollector messageCollector = context.getBean(MessageCollector.class);
+ List messages = new ArrayList<>();
+ assertMessages(source.output1(), messageCollector, messages);
+ messages.clear();
+ assertMessages(source.output2(), messageCollector, messages);
+ messages.clear();
+ assertMessages(source.output3(), messageCollector, messages);
+ messages.clear();
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void receiveAndValidateMultiStreamEmittersInSameContext(ConfigurableApplicationContext context1) throws InterruptedException {
+ TestMultiOutboundChannels source1 = context1.getBean(TestMultiOutboundChannels.class);
+ MessageCollector messageCollector = context1.getBean(MessageCollector.class);
+
+ List messages = new ArrayList<>();
+ assertMessagesX(source1.output1(), messageCollector, messages);
+ messages.clear();
+ assertMessagesY(source1.output2(), messageCollector, messages);
+ messages.clear();
+ }
+
+ private static void assertMessages(MessageChannel channel, MessageCollector messageCollector, List messages) throws InterruptedException {
+ for (int i = 0; i < 1000; i++) {
+ messages.add((String) messageCollector.forChannel(channel).poll(5000, TimeUnit.MILLISECONDS).getPayload());
+ }
+ for (int i = 0; i < 1000; i++) {
+ assertThat(messages.get(i)).isEqualTo("Hello World!!" + i);
+ }
+ }
+
+ private static void assertMessagesX(MessageChannel channel, MessageCollector messageCollector, List messages) throws InterruptedException {
+ for (int i = 0; i < 1000; i++) {
+ messages.add((String) messageCollector.forChannel(channel).poll(5000, TimeUnit.MILLISECONDS).getPayload());
+ }
+ for (int i = 0; i < 1000; i++) {
+ assertThat(messages.get(i)).isEqualTo("Hello World!!" + i);
+ }
+ }
+
+ private static void assertMessagesY(MessageChannel channel, MessageCollector messageCollector, List messages) throws InterruptedException {
+ for (int i = 0; i < 1000; i++) {
+ messages.add((String) messageCollector.forChannel(channel).poll(5000, TimeUnit.MILLISECONDS).getPayload());
+ }
+ for (int i = 0; i < 1000; i++) {
+ assertThat(messages.get(i)).isEqualTo("Hello FooBar!!" + i);
+ }
+ }
+
+ @EnableBinding(Processor.class)
+ @EnableAutoConfiguration
+ public static class TestFluxReturnAndOutputMethodLevel {
+
+ @StreamEmitter
+ @Output(Source.OUTPUT)
+ @Bean
+ public Publisher> emit() {
+ AtomicInteger atomicInteger = new AtomicInteger();
+ return IntegrationFlows.from(() ->
+ new GenericMessage<>("Hello World!!" + atomicInteger.getAndIncrement()),
+ e -> e.poller(p -> p.fixedDelay(1)))
+ .transform(String::toUpperCase)
+ .toReactivePublisher();
+ }
+
+ }
+
+ @EnableBinding(Processor.class)
+ @EnableAutoConfiguration
+ public static class TestVoidReturnAndOutputMethodParameter {
+
+ @StreamEmitter
+ public void emit(@Output(Source.OUTPUT) FluxSender output) {
+ output.send(Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l)
+ .map(String::toUpperCase));
+ }
+ }
+
+ @EnableBinding(Processor.class)
+ @EnableAutoConfiguration
+ public static class TestVoidReturnAndOutputAtMethodLevel {
+
+ @StreamEmitter
+ @Output(Source.OUTPUT)
+ public void emit(FluxSender output) {
+ output.send(Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l)
+ .map(String::toUpperCase));
+ }
+ }
+
+ @EnableBinding(TestMultiOutboundChannels.class)
+ @EnableAutoConfiguration
+ public static class TestVoidReturnAndMultipleOutputMethodParameters {
+
+ @StreamEmitter
+ public void emit(@Output(TestMultiOutboundChannels.OUTPUT1) FluxSender output1,
+ @Output(TestMultiOutboundChannels.OUTPUT2) FluxSender output2,
+ @Output(TestMultiOutboundChannels.OUTPUT3) FluxSender output3) {
+ output1.send(Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l));
+ output2.send(Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l));
+ output3.send(Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l));
+ }
+ }
+
+ @EnableBinding(TestMultiOutboundChannels.class)
+ @EnableAutoConfiguration
+ public static class TestMultipleStreamEmitterMethods {
+
+ @StreamEmitter
+ @Output(TestMultiOutboundChannels.OUTPUT1)
+ public Flux emit1() {
+ return Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l);
+ }
+
+ @StreamEmitter
+ @Output(TestMultiOutboundChannels.OUTPUT2)
+ public Flux emit2() {
+ return Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l);
+ }
+
+ @StreamEmitter
+ public void emit3(@Output(TestMultiOutboundChannels.OUTPUT3) FluxSender outputX) {
+ outputX.send(Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l));
+ }
+ }
+
+ @EnableBinding(TestMultiOutboundChannels.class)
+ @EnableAutoConfiguration
+ public static class TestSameAppContextWithMultipleStreamEmitters {
+
+ @Bean
+ public Foo foo() {
+ return new Foo();
+ }
+
+ @Bean
+ public Bar bar() {
+ return new Bar();
+ }
+
+ static class Foo {
+
+ @StreamEmitter
+ @Output(TestMultiOutboundChannels.OUTPUT1)
+ public Flux emit1() {
+ return Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l);
+ }
+ }
+
+ static class Bar {
+
+ @StreamEmitter
+ @Output(TestMultiOutboundChannels.OUTPUT2)
+ public Flux emit2() {
+ return Flux.intervalMillis(1)
+ .map(l -> "Hello FooBar!!" + l);
+ }
+ }
+ }
+
+ interface TestMultiOutboundChannels {
+
+ String OUTPUT1 = "output1";
+
+ String OUTPUT2 = "output2";
+
+ String OUTPUT3 = "output3";
+
+ @Output(TestMultiOutboundChannels.OUTPUT1)
+ MessageChannel output1();
+
+ @Output(TestMultiOutboundChannels.OUTPUT2)
+ MessageChannel output2();
+
+ @Output(TestMultiOutboundChannels.OUTPUT3)
+ MessageChannel output3();
+
+ }
+}
diff --git a/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterValidationTests.java b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterValidationTests.java
new file mode 100644
index 000000000..c5d1be2cf
--- /dev/null
+++ b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterValidationTests.java
@@ -0,0 +1,307 @@
+/*
+ * Copyright 2016-2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.reactive;
+
+import org.junit.Test;
+import reactor.core.publisher.Flux;
+
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.Input;
+import org.springframework.cloud.stream.annotation.Output;
+import org.springframework.cloud.stream.messaging.Processor;
+import org.springframework.cloud.stream.messaging.Source;
+import org.springframework.context.annotation.AnnotationConfigApplicationContext;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import static org.springframework.cloud.stream.binding.StreamAnnotationErrorMessages.ATLEAST_ONE_OUTPUT;
+import static org.springframework.cloud.stream.binding.StreamAnnotationErrorMessages.INVALID_OUTBOUND_NAME;
+import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.CANNOT_CONVERT_RETURN_TYPE_TO_ANY_AVAILABLE_RESULT_ADAPTERS;
+import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.INPUT_ANNOTATIONS_ARE_NOT_ALLOWED;
+import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.INVALID_OUTPUT_METHOD_PARAMETERS;
+import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.NO_OUTPUT_SPECIFIED;
+import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.OUTPUT_ANNOTATION_MISSING_ON_METHOD_PARAMETERS_VOID_RETURN_TYPE;
+import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.RETURN_TYPE_METHOD_ARGUMENTS;
+import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED;
+
+/**
+ * @author Soby Chacko
+ */
+public class StreamEmitterValidationTests {
+
+ @Test
+ public void testOutputAsMethodandMethodParameter() {
+ try {
+ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
+ context.register(TestOutputAsMethodandMethodParameter.class);
+ context.refresh();
+ context.close();
+ fail("Expected exception: " + INVALID_OUTPUT_METHOD_PARAMETERS);
+ }
+ catch (Exception e) {
+ assertThat(e.getMessage()).contains(INVALID_OUTPUT_METHOD_PARAMETERS);
+ }
+ }
+
+ @Test
+ public void testFluxReturnTypeNoOutputGiven() {
+ try {
+ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
+ context.register(TestFluxReturnTypeNoOutputGiven.class);
+ context.refresh();
+ context.close();
+ fail("Expected exception: " + NO_OUTPUT_SPECIFIED);
+ }
+ catch (Exception e) {
+ assertThat(e.getMessage()).contains(NO_OUTPUT_SPECIFIED);
+ }
+ }
+
+ @Test
+ public void testVoidReturnTypeNoOutputGiven() {
+ try {
+ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
+ context.register(TestVoidReturnTypeNoOutputGiven.class);
+ context.refresh();
+ context.close();
+ fail("Expected exception: " + NO_OUTPUT_SPECIFIED);
+ }
+ catch (Exception e) {
+ assertThat(e.getMessage()).contains(NO_OUTPUT_SPECIFIED);
+ }
+ }
+
+ @Test
+ public void testNonVoidReturnButOutputAsMethodParameter() {
+ try {
+ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
+ context.register(TestNonVoidReturnButOutputAsMethodParameter.class);
+ context.refresh();
+ context.close();
+ fail("Expected exception: " + RETURN_TYPE_NO_OUTBOUND_SPECIFIED);
+ }
+ catch (Exception e) {
+ assertThat(e.getMessage()).contains(RETURN_TYPE_NO_OUTBOUND_SPECIFIED);
+ }
+ }
+
+ @Test
+ public void testNonVoidReturnButMethodArguments() {
+ try {
+ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
+ context.register(TestNonVoidReturnButMethodArguments.class);
+ context.refresh();
+ context.close();
+ fail("Expected exception: " + RETURN_TYPE_METHOD_ARGUMENTS);
+ }
+ catch (Exception e) {
+ assertThat(e.getMessage()).contains(RETURN_TYPE_METHOD_ARGUMENTS);
+ }
+ }
+
+ @Test
+ public void testVoidReturnTypeMultipleMethodParametersWithOneMissingOutput() {
+ try {
+ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
+ context.register(TestVoidReturnTypeMultipleMethodParametersWithOneMissingOutput.class);
+ context.refresh();
+ context.close();
+ fail("Expected exception: " + OUTPUT_ANNOTATION_MISSING_ON_METHOD_PARAMETERS_VOID_RETURN_TYPE);
+ }
+ catch (Exception e) {
+ assertThat(e.getMessage()).contains(OUTPUT_ANNOTATION_MISSING_ON_METHOD_PARAMETERS_VOID_RETURN_TYPE);
+ }
+ }
+
+ @Test
+ public void testOutputAtCorrectLocationButNameMissing1() {
+ try {
+ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
+ context.register(TestOutputAtCorrectLocationButNameMissing1.class);
+ context.refresh();
+ context.close();
+ fail("Expected exception: " + ATLEAST_ONE_OUTPUT);
+ }
+ catch (Exception e) {
+ assertThat(e.getMessage()).contains(ATLEAST_ONE_OUTPUT);
+ }
+ }
+
+ @Test
+ public void testOutputAtCorrectLocationButNameMissing2() {
+ try {
+ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
+ context.register(TestOutputAtCorrectLocationButNameMissing2.class);
+ context.refresh();
+ context.close();
+ fail("Expected exception: " + INVALID_OUTBOUND_NAME);
+ }
+ catch (Exception e) {
+ assertThat(e.getMessage()).contains(INVALID_OUTBOUND_NAME);
+ }
+ }
+
+ @Test
+ public void testInputAnnotationsAreNotPermitted() {
+ try {
+ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
+ context.register(TestInputAnnotationsAreNotPermitted.class);
+ context.refresh();
+ context.close();
+ fail("Expected exception: " + INPUT_ANNOTATIONS_ARE_NOT_ALLOWED);
+ }
+ catch (Exception e) {
+ assertThat(e.getMessage()).contains(INPUT_ANNOTATIONS_ARE_NOT_ALLOWED);
+ }
+ }
+
+ @Test
+ public void testReturnTypeNotSupported() {
+ try {
+ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
+ context.register(TestReturnTypeNotSupported.class);
+ context.refresh();
+ context.close();
+ fail("Expected exception: " + CANNOT_CONVERT_RETURN_TYPE_TO_ANY_AVAILABLE_RESULT_ADAPTERS);
+ }
+ catch (Exception e) {
+ assertThat(e.getMessage()).contains(CANNOT_CONVERT_RETURN_TYPE_TO_ANY_AVAILABLE_RESULT_ADAPTERS);
+ }
+ }
+
+ @EnableBinding(Processor.class)
+ @EnableAutoConfiguration
+ public static class TestOutputAsMethodandMethodParameter {
+
+ @StreamEmitter
+ @Output(Source.OUTPUT)
+ public void receive(@Output(Source.OUTPUT) FluxSender output) {
+ output.send(Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l));
+ }
+ }
+
+ @EnableBinding(Processor.class)
+ @EnableAutoConfiguration
+ public static class TestFluxReturnTypeNoOutputGiven {
+
+ @StreamEmitter
+ public Flux emit() {
+ return Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l);
+ }
+ }
+
+ @EnableBinding(Processor.class)
+ @EnableAutoConfiguration
+ public static class TestVoidReturnTypeNoOutputGiven {
+
+ @StreamEmitter
+ public void emit(FluxSender output) {
+ output.send(Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l));
+ }
+ }
+
+ @EnableBinding(Processor.class)
+ @EnableAutoConfiguration
+ public static class TestNonVoidReturnButOutputAsMethodParameter {
+
+ @StreamEmitter
+ public Flux emit(@Output(Source.OUTPUT) FluxSender output) {
+ return Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l);
+ }
+ }
+
+ @EnableBinding(Processor.class)
+ @EnableAutoConfiguration
+ public static class TestNonVoidReturnButMethodArguments {
+
+ @StreamEmitter
+ @Output(Source.OUTPUT)
+ public Flux receive(FluxSender output) {
+ return Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l);
+ }
+ }
+
+ @EnableBinding(StreamEmitterBasicTests.TestMultiOutboundChannels.class)
+ @EnableAutoConfiguration
+ public static class TestVoidReturnTypeMultipleMethodParametersWithOneMissingOutput {
+
+ @StreamEmitter
+ public void emit(@Output(StreamEmitterBasicTests.TestMultiOutboundChannels.OUTPUT1) FluxSender output1,
+ @Output(StreamEmitterBasicTests.TestMultiOutboundChannels.OUTPUT2) FluxSender output2,
+ FluxSender output3) {
+ output1.send(Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l));
+ output2.send(Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l));
+ output3.send(Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l));
+ }
+ }
+
+ @EnableBinding(Processor.class)
+ @EnableAutoConfiguration
+ public static class TestOutputAtCorrectLocationButNameMissing1 {
+
+ @StreamEmitter
+ @Output("")
+ public void receive(FluxSender output) {
+ output.send(Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l));
+ }
+ }
+
+ @EnableBinding(StreamEmitterBasicTests.TestMultiOutboundChannels.class)
+ @EnableAutoConfiguration
+ public static class TestOutputAtCorrectLocationButNameMissing2 {
+
+ @StreamEmitter
+ public void emit(@Output("") FluxSender output1) {
+ output1.send(Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l));
+ }
+ }
+
+ @EnableBinding(Processor.class)
+ @EnableAutoConfiguration
+ public static class TestInputAnnotationsAreNotPermitted {
+
+ @StreamEmitter
+ @Output(Source.OUTPUT)
+ @Input(Processor.INPUT)
+ public Flux emit() {
+ return Flux.intervalMillis(1)
+ .map(l -> "Hello World!!" + l);
+ }
+ }
+
+ @EnableBinding(Processor.class)
+ @EnableAutoConfiguration
+ public static class TestReturnTypeNotSupported {
+
+ @StreamEmitter
+ @Output(Source.OUTPUT)
+ public String emit() {
+ return "hello";
+ }
+ }
+}
diff --git a/spring-cloud-stream-reactive/src/test/resources/logback.xml b/spring-cloud-stream-reactive/src/test/resources/logback.xml
new file mode 100644
index 000000000..412f0d7d9
--- /dev/null
+++ b/spring-cloud-stream-reactive/src/test/resources/logback.xml
@@ -0,0 +1,10 @@
+
+
+
+ %d{ISO8601} %5p %t %c{2}:%L - %m%n
+
+
+
+
+
+
diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageChannelStreamListenerResultAdapter.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageChannelStreamListenerResultAdapter.java
index e6d6bb629..15052613b 100644
--- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageChannelStreamListenerResultAdapter.java
+++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageChannelStreamListenerResultAdapter.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016 the original author or authors.
+ * Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,9 @@
package org.springframework.cloud.stream.binding;
+import java.io.Closeable;
+import java.io.IOException;
+
import org.springframework.integration.handler.BridgeHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
@@ -24,7 +27,9 @@ import org.springframework.messaging.SubscribableChannel;
* A {@link StreamListenerResultAdapter} used for bridging an
* {@link org.springframework.cloud.stream.annotation.Output} {@link MessageChannel} to a
* bound {@link MessageChannel}.
+ *
* @author Marius Bogoevici
+ * @author Soby Chacko
*/
public class MessageChannelStreamListenerResultAdapter
implements StreamListenerResultAdapter {
@@ -36,10 +41,22 @@ public class MessageChannelStreamListenerResultAdapter
}
@Override
- public void adapt(MessageChannel streamListenerResult, MessageChannel bindingTarget) {
+ public Closeable adapt(MessageChannel streamListenerResult, MessageChannel bindingTarget) {
BridgeHandler handler = new BridgeHandler();
handler.setOutputChannel(bindingTarget);
handler.afterPropertiesSet();
((SubscribableChannel) streamListenerResult).subscribe(handler);
+
+ return new NoOpCloseeable();
}
+
+ private static final class NoOpCloseeable implements Closeable {
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ }
+
}
diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamAnnotationCommonMethodUtils.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamAnnotationCommonMethodUtils.java
new file mode 100644
index 000000000..1efa65583
--- /dev/null
+++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamAnnotationCommonMethodUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2016-2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.binding;
+
+import java.lang.reflect.Method;
+
+import org.springframework.cloud.stream.annotation.Output;
+import org.springframework.core.MethodParameter;
+import org.springframework.core.annotation.AnnotationUtils;
+import org.springframework.messaging.handler.annotation.SendTo;
+import org.springframework.util.Assert;
+import org.springframework.util.ObjectUtils;
+import org.springframework.util.StringUtils;
+
+/**
+ * Common methods that can be used across various Stream annotations.
+ *
+ * @author Soby Chacko
+ * @since 1.3.0
+ */
+public abstract class StreamAnnotationCommonMethodUtils {
+
+ public static String getOutboundBindingTargetName(Method method) {
+ SendTo sendTo = AnnotationUtils.findAnnotation(method, SendTo.class);
+ if (sendTo != null) {
+ Assert.isTrue(!ObjectUtils.isEmpty(sendTo.value()), StreamAnnotationErrorMessages.ATLEAST_ONE_OUTPUT);
+ Assert.isTrue(sendTo.value().length == 1, StreamAnnotationErrorMessages.SEND_TO_MULTIPLE_DESTINATIONS);
+ Assert.hasText(sendTo.value()[0], StreamAnnotationErrorMessages.SEND_TO_EMPTY_DESTINATION);
+ return sendTo.value()[0];
+ }
+ Output output = AnnotationUtils.findAnnotation(method, Output.class);
+ if (output != null) {
+ Assert.isTrue(StringUtils.hasText(output.value()), StreamAnnotationErrorMessages.ATLEAST_ONE_OUTPUT);
+ return output.value();
+ }
+ return null;
+ }
+
+ public static int outputAnnotationCount(Method method) {
+ int outputAnnotationCount = 0;
+ for (int parameterIndex = 0; parameterIndex < method.getParameterTypes().length; parameterIndex++) {
+ MethodParameter methodParameter = MethodParameter.forMethodOrConstructor(method, parameterIndex);
+ if (methodParameter.hasParameterAnnotation(Output.class)) {
+ outputAnnotationCount++;
+ }
+ }
+ return outputAnnotationCount;
+ }
+}
diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamAnnotationErrorMessages.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamAnnotationErrorMessages.java
new file mode 100644
index 000000000..01e38d635
--- /dev/null
+++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamAnnotationErrorMessages.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2016-2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.binding;
+
+/**
+ * @author Soby Chacko
+ */
+public abstract class StreamAnnotationErrorMessages {
+
+ public static final String ATLEAST_ONE_OUTPUT = "At least one output must be specified";
+
+ public static final String SEND_TO_MULTIPLE_DESTINATIONS = "Multiple destinations cannot be specified";
+
+ public static final String SEND_TO_EMPTY_DESTINATION = "An empty destination cannot be specified";
+
+ public static final String INVALID_OUTBOUND_NAME = "The @Output annotation must have the name of an input as value";
+
+}
diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerResultAdapter.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerResultAdapter.java
index 3c1355ae9..5283ad428 100644
--- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerResultAdapter.java
+++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerResultAdapter.java
@@ -16,6 +16,8 @@
package org.springframework.cloud.stream.binding;
+import java.io.Closeable;
+
/**
* A strategy for adapting the result of a
* {@link org.springframework.cloud.stream.annotation.StreamListener} annotated method to
@@ -41,6 +43,6 @@ public interface StreamListenerResultAdapter {
* @param streamListenerResult the result of invoking the method.
* @param bindingTarget the binding target.
*/
- void adapt(R streamListenerResult, B bindingTarget);
+ Closeable adapt(R streamListenerResult, B bindingTarget);
}