GH-722: Add support for Reactive Sources

Fixes spring-cloud/spring-cloud-stream#722

Addressing PR review comments

Further addressing PR review comments
Degenericizing `StreamListenerResultAdapter` with the 3rd generic argument
added for returning `Disposable` `StreamListenerResultAdapter` adapt method
now returns `java.io.Closeable`

Addressing PR review comments

Addressing PR review comments

Converting `FluxToMessageChannelResultAdapter` to `PublisherToMessageChannelResultAdapter`

Add `logback.xml` for reactive tests

* Revert `AopUtils.getTargetClass(bean)` to really deal with target classes only.
Although it does not make any effect on the `@Configuration` classes
* Use `AnnotatedElementUtils.isAnnotated()` instead of `getMergedAnnotation()`
to avoid synthetic methods in the `@Configuration` classes
* Add artificial `.transform()` to the `IntegrationFlow` test to overcome
SI Java DSL bug
* Apply similar `.map()` for `Flux` tests configs for proper assertions

* Simple code style polishing
This commit is contained in:
Soby Chacko
2017-05-22 15:59:40 -04:00
committed by Artem Bilan
parent 51deb128f0
commit c3246f3257
17 changed files with 1307 additions and 48 deletions

View File

@@ -47,6 +47,11 @@
<artifactId>spring-cloud-stream-test-support-internal</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@@ -0,0 +1,73 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.reactive;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import org.springframework.util.Assert;
/**
* Default {@link org.springframework.cloud.stream.reactive.FluxSender} implementation.
* This implementation may be used for cancelling a subscription on the underlying
* {@link reactor.core.publisher.Flux}.
*
* @author Soby Chacko
*
* @since 1.3.0
*/
class DefaultFluxSender implements FluxSender {
private Log log = LogFactory.getLog(DefaultFluxSender.class);
private volatile Disposable disposable;
private final Consumer<Object> consumer;
DefaultFluxSender(Consumer<Object> consumer) {
Assert.notNull(consumer, "Consumer must not be null");
this.consumer = consumer;
}
@Override
public Mono<Void> send(Flux<?> flux) {
MonoProcessor<Void> sendResult = MonoProcessor.create();
// add error handling and reconnect in the event of an error
this.disposable = flux
.doOnError(e -> this.log.error("Error during processing: ", e))
.retry()
.subscribe(
this.consumer,
sendResult::onError,
sendResult::onComplete);
return sendResult;
}
@Override
public void close() {
if (this.disposable != null) {
this.disposable.dispose();
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,15 +16,21 @@
package org.springframework.cloud.stream.reactive;
import java.io.Closeable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Used for {@link org.springframework.cloud.stream.annotation.StreamListener} arguments
* Used for {@link org.springframework.cloud.stream.annotation.StreamListener}
* and {@link org.springframework.cloud.stream.reactive.StreamEmitter} arguments
* annotated with {@link org.springframework.cloud.stream.annotation.Output}.
*
* @author Marius Bogoevici
*
* @see reactor.core.Disposable
*/
public interface FluxSender {
public interface FluxSender extends Closeable {
/**
* Streams the {@link reactor.core.publisher.Flux} through the binding target
@@ -35,4 +41,5 @@ public interface FluxSender {
* error)
*/
Mono<Void> send(Flux<?> flux);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,10 +16,6 @@
package org.springframework.cloud.stream.reactive;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.MonoProcessor;
import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
@@ -30,13 +26,13 @@ import org.springframework.messaging.MessageChannel;
/**
* Adapts an {@link org.springframework.cloud.stream.annotation.Output} annotated
* {@link FluxSender} to an outbound {@link MessageChannel}.
*
* @author Marius Bogoevici
* @author Soby Chacko
*/
public class MessageChannelToFluxSenderParameterAdapter
implements StreamListenerParameterAdapter<FluxSender, MessageChannel> {
private Log log = LogFactory.getLog(MessageChannelToFluxSenderParameterAdapter.class);
@Override
public boolean supports(Class<?> bindingTargetType, MethodParameter methodParameter) {
ResolvableType type = ResolvableType.forMethodParameter(methodParameter);
@@ -46,18 +42,10 @@ public class MessageChannelToFluxSenderParameterAdapter
@Override
public FluxSender adapt(MessageChannel bindingTarget, MethodParameter parameter) {
return resultPublisher -> {
MonoProcessor<Void> sendResult = MonoProcessor.create();
// add error handling and reconnect in the event of an error
resultPublisher
.doOnError(e -> this.log.error("Error during processing: ", e))
.retry()
.subscribe(
result -> bindingTarget.send(result instanceof Message<?> ? (Message<?>) result
: MessageBuilder.withPayload(result).build()),
e -> sendResult.onError(e),
() -> sendResult.onComplete());
return sendResult;
};
return new DefaultFluxSender(result ->
bindingTarget.send(result instanceof Message<?>
? (Message<?>) result
: MessageBuilder.withPayload(result).build()));
}
}

View File

@@ -16,8 +16,9 @@
package org.springframework.cloud.stream.reactive;
import java.io.Closeable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import rx.Observable;
import rx.RxReactiveStreams;
@@ -34,12 +35,12 @@ import org.springframework.util.Assert;
public class ObservableToMessageChannelResultAdapter
implements StreamListenerResultAdapter<Observable<?>, MessageChannel> {
private FluxToMessageChannelResultAdapter fluxToMessageChannelResultAdapter;
private PublisherToMessageChannelResultAdapter publisherToMessageChannelResultAdapter;
public ObservableToMessageChannelResultAdapter(
FluxToMessageChannelResultAdapter fluxToMessageChannelResultAdapter) {
Assert.notNull(fluxToMessageChannelResultAdapter, "cannot be null");
this.fluxToMessageChannelResultAdapter = fluxToMessageChannelResultAdapter;
PublisherToMessageChannelResultAdapter publisherToMessageChannelResultAdapter) {
Assert.notNull(publisherToMessageChannelResultAdapter, "cannot be null");
this.publisherToMessageChannelResultAdapter = publisherToMessageChannelResultAdapter;
}
@Override
@@ -48,8 +49,9 @@ public class ObservableToMessageChannelResultAdapter
&& MessageChannel.class.isAssignableFrom(bindingTarget);
}
public void adapt(Observable<?> streamListenerResult, MessageChannel bindingTarget) {
public Closeable adapt(Observable<?> streamListenerResult, MessageChannel bindingTarget) {
Publisher<?> adaptedPublisher = RxReactiveStreams.toPublisher(streamListenerResult);
this.fluxToMessageChannelResultAdapter.adapt(Flux.from(adaptedPublisher), bindingTarget);
return this.publisherToMessageChannelResultAdapter.adapt(adaptedPublisher, bindingTarget);
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,8 +16,13 @@
package org.springframework.cloud.stream.reactive;
import java.io.Closeable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
@@ -27,25 +32,34 @@ import org.springframework.messaging.MessageChannel;
/**
* A {@link org.springframework.cloud.stream.binding.StreamListenerResultAdapter} from a
* {@link Flux} return type to a bound {@link MessageChannel}.
* {@link Publisher} return type to a bound {@link MessageChannel}.
*
* @author Marius Bogoevici
* @author Soby Chacko
*
*/
public class FluxToMessageChannelResultAdapter
implements StreamListenerResultAdapter<Flux<?>, MessageChannel> {
public class PublisherToMessageChannelResultAdapter
implements StreamListenerResultAdapter<Publisher<?>, MessageChannel> {
private Log log = LogFactory.getLog(FluxToMessageChannelResultAdapter.class);
private Log log = LogFactory.getLog(PublisherToMessageChannelResultAdapter.class);
@Override
public boolean supports(Class<?> resultType, Class<?> bindingTarget) {
return Flux.class.isAssignableFrom(resultType) && MessageChannel.class.isAssignableFrom(bindingTarget);
return Publisher.class.isAssignableFrom(resultType)
&& MessageChannel.class.isAssignableFrom(bindingTarget);
}
public void adapt(Flux<?> streamListenerResult, MessageChannel bindingTarget) {
streamListenerResult
public Closeable adapt(Publisher<?> streamListenerResult, MessageChannel bindingTarget) {
Disposable disposable = Flux.from(streamListenerResult)
.doOnError(e -> this.log.error("Error while processing result", e))
.retry()
.subscribe(
result -> bindingTarget.send(result instanceof Message<?> ? (Message<?>) result
: MessageBuilder.withPayload(result).build()));
result ->
bindingTarget.send(result instanceof Message<?>
? (Message<?>) result
: MessageBuilder.withPayload(result).build()));
return disposable::dispose;
}
}

View File

@@ -46,9 +46,14 @@ public class ReactiveSupportAutoConfiguration {
}
@Bean
@ConditionalOnMissingBean(FluxToMessageChannelResultAdapter.class)
public FluxToMessageChannelResultAdapter fluxToMessageChannelResultAdapter() {
return new FluxToMessageChannelResultAdapter();
@ConditionalOnMissingBean(PublisherToMessageChannelResultAdapter.class)
public PublisherToMessageChannelResultAdapter fluxToMessageChannelResultAdapter() {
return new PublisherToMessageChannelResultAdapter();
}
@Bean
public static StreamEmitterAnnotationBeanPostProcessor streamEmitterAnnotationBeanPostProcessor() {
return new StreamEmitterAnnotationBeanPostProcessor();
}
@Configuration
@@ -72,8 +77,8 @@ public class ReactiveSupportAutoConfiguration {
@Bean
@ConditionalOnMissingBean(ObservableToMessageChannelResultAdapter.class)
public ObservableToMessageChannelResultAdapter observableToMessageChannelResultAdapter(
FluxToMessageChannelResultAdapter fluxToMessageChannelResultAdapter) {
return new ObservableToMessageChannelResultAdapter(fluxToMessageChannelResultAdapter);
PublisherToMessageChannelResultAdapter publisherToMessageChannelResultAdapter) {
return new ObservableToMessageChannelResultAdapter(publisherToMessageChannelResultAdapter);
}
}
}

View File

@@ -0,0 +1,94 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.reactive;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
/**
* Method level annotation that marks a method to be an emitter to outputs declared via
* {@link EnableBinding} (e.g. channels).
*
* This annotation is intended to be used in a Spring Cloud Stream application that requires
* a source to write to one or more {@link Output}s using the reactive paradigm.
*
* No {@link Input}s are allowed on a method that is annotated with StreamEmitter.
*
* Depending on how the method is structured, there are some flexibility in how the {@link Output} may be used.
*
* Here are some supported usage patterns:
*
* A StreamEmitter method that has a return type cannot take any method parameters.
*
* <pre class="code">
* &#064;StreamEmitter
* &#064;Output(Source.OUTPUT)
* public Flux<String> emit() {
* return Flux.intervalMillis(1000)
* .map(l -> "Hello World!!");
* }
* </pre>
*
* The following examples show how a void return type can be used on a method with StreamEmitter and how the
* method signatures could be used in a flexible manner.
*
* <pre class="code">
* &#064;StreamEmitter
* public void emit(&#064;Output(Source.OUTPUT) FluxSender output) {
* output.send(Flux.intervalMillis(1000)
* .map(l -> "Hello World!!"));
* }
* </pre>
*
* <pre class="code">
* &#064;StreamEmitter
* &#064;Output(Source.OUTPUT)
* public void emit(FluxSender output) {
* output.send(Flux.intervalMillis(1000)
* .map(l -> "Hello World!!"));
* }
* </pre>
*
* <pre class="code">
* &#064;StreamEmitter
* public void emit(&#064;Output("OUTPUT1") FluxSender output1,
* &#064;Output("OUTPUT2") FluxSender output2,
* &#064;Output("OUTPUT3)" FluxSender output3) {
* output1.send(Flux.intervalMillis(1000)
* .map(l -> "Hello World!!"));
* output2.send(Flux.intervalMillis(1000)
* .map(l -> "Hello World!!"));
* output3.send(Flux.intervalMillis(1000)
* .map(l -> "Hello World!!"));
* }
*</pre>
*
* @author Soby Chacko
*
* @since 1.3.0
*/
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface StreamEmitter {}

View File

@@ -0,0 +1,282 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.reactive;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactoryUtils;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.binding.MessageChannelStreamListenerResultAdapter;
import org.springframework.cloud.stream.binding.StreamAnnotationCommonMethodUtils;
import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.MethodParameter;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.annotation.SynthesizingMethodParameter;
import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
/**
* {@link BeanPostProcessor} that handles {@link StreamEmitter} annotations found on bean methods.
*
* @author Soby Chacko
* @author Artem Bilan
*
* @since 1.3.0
*/
public class StreamEmitterAnnotationBeanPostProcessor
implements BeanPostProcessor, InitializingBean, ApplicationContextAware, SmartLifecycle {
private static final Log log = LogFactory.getLog(StreamEmitterAnnotationBeanPostProcessor.class);
private final List<StreamListenerParameterAdapter<?, Object>> streamListenerParameterAdapters = new ArrayList<>();
private final List<StreamListenerResultAdapter<?, ?>> streamListenerResultAdapters = new ArrayList<>();
private final List<Closeable> closeableFluxResources = new ArrayList<>();
private ConfigurableApplicationContext applicationContext;
private MultiValueMap<Object, Method> mappedStreamEmitterMethods = new LinkedMultiValueMap<>();
private volatile boolean running;
private final Lock lock = new ReentrantLock();
@Override
public final void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Assert.isTrue(applicationContext instanceof ConfigurableApplicationContext,
"ConfigurableApplicationContext is required");
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
@Override
@SuppressWarnings("unchecked")
public void afterPropertiesSet() throws Exception {
Map<String, StreamListenerParameterAdapter> parameterAdapterMap = BeanFactoryUtils
.beansOfTypeIncludingAncestors(this.applicationContext, StreamListenerParameterAdapter.class);
parameterAdapterMap.values().iterator().forEachRemaining(this.streamListenerParameterAdapters::add);
Map<String, StreamListenerResultAdapter> resultAdapterMap = BeanFactoryUtils
.beansOfTypeIncludingAncestors(this.applicationContext, StreamListenerResultAdapter.class);
this.streamListenerResultAdapters.add(new MessageChannelStreamListenerResultAdapter());
resultAdapterMap.values().iterator().forEachRemaining(this.streamListenerResultAdapters::add);
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
Class<?> targetClass = AopUtils.getTargetClass(bean);
ReflectionUtils.doWithMethods(targetClass,
method -> {
if (AnnotatedElementUtils.isAnnotated(method, StreamEmitter.class)) {
mappedStreamEmitterMethods.add(bean, method);
}
}, ReflectionUtils.USER_DECLARED_METHODS);
return bean;
}
@Override
public void start() {
try {
lock.lock();
if (!running) {
mappedStreamEmitterMethods.forEach((k, v) -> v.forEach(item -> {
Assert.isTrue(item.getAnnotation(Input.class) == null,
StreamEmitterErrorMessages.INPUT_ANNOTATIONS_ARE_NOT_ALLOWED);
String methodAnnotatedOutboundName =
StreamAnnotationCommonMethodUtils.getOutboundBindingTargetName(item);
int outputAnnotationCount = StreamAnnotationCommonMethodUtils.outputAnnotationCount(item);
validateStreamEmitterMethod(item, outputAnnotationCount, methodAnnotatedOutboundName);
invokeSetupMethodOnToTargetChannel(item, k, methodAnnotatedOutboundName);
}
));
this.running = true;
}
}
finally {
lock.unlock();
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private void invokeSetupMethodOnToTargetChannel(Method method, Object bean, String outboundName) {
Object[] arguments = new Object[method.getParameterCount()];
Object targetBean = null;
for (int parameterIndex = 0; parameterIndex < arguments.length; parameterIndex++) {
MethodParameter methodParameter = new SynthesizingMethodParameter(method, parameterIndex);
Class<?> parameterType = methodParameter.getParameterType();
Object targetReferenceValue = null;
if (methodParameter.hasParameterAnnotation(Output.class)) {
targetReferenceValue = AnnotationUtils.getValue(methodParameter.getParameterAnnotation(Output.class));
}
else if (arguments.length == 1 && StringUtils.hasText(outboundName)) {
targetReferenceValue = outboundName;
}
if (targetReferenceValue != null) {
targetBean = this.applicationContext.getBean((String) targetReferenceValue);
for (StreamListenerParameterAdapter<?, Object> streamListenerParameterAdapter : this.streamListenerParameterAdapters) {
if (streamListenerParameterAdapter.supports(targetBean.getClass(), methodParameter)) {
arguments[parameterIndex] = streamListenerParameterAdapter.adapt(targetBean,
methodParameter);
if (arguments[parameterIndex] instanceof FluxSender) {
closeableFluxResources.add((FluxSender) arguments[parameterIndex]);
}
break;
}
}
Assert.notNull(arguments[parameterIndex], "Cannot convert argument " + parameterIndex + " of " + method
+ "from " + targetBean.getClass() + " to " + parameterType);
}
else {
throw new IllegalStateException(StreamEmitterErrorMessages.ATLEAST_ONE_OUTPUT);
}
}
Object result;
try {
result = method.invoke(bean, arguments);
}
catch (Exception e) {
throw new BeanInitializationException("Cannot setup StreamEmitter for " + method, e);
}
if (!Void.TYPE.equals(method.getReturnType())) {
if (targetBean == null) {
targetBean = this.applicationContext.getBean(outboundName);
}
boolean streamListenerResultAdapterFound = false;
for (StreamListenerResultAdapter streamListenerResultAdapter : this.streamListenerResultAdapters) {
if (streamListenerResultAdapter.supports(result.getClass(), targetBean.getClass())) {
Closeable fluxDisposable = streamListenerResultAdapter.adapt(result, targetBean);
closeableFluxResources.add(fluxDisposable);
streamListenerResultAdapterFound = true;
break;
}
}
Assert.state(streamListenerResultAdapterFound,
StreamEmitterErrorMessages.CANNOT_CONVERT_RETURN_TYPE_TO_ANY_AVAILABLE_RESULT_ADAPTERS);
}
}
private static void validateStreamEmitterMethod(Method method, int outputAnnotationCount,
String methodAnnotatedOutboundName) {
if (StringUtils.hasText(methodAnnotatedOutboundName)) {
Assert.isTrue(outputAnnotationCount == 0,
StreamEmitterErrorMessages.INVALID_OUTPUT_METHOD_PARAMETERS);
}
else {
Assert.isTrue(outputAnnotationCount > 0, StreamEmitterErrorMessages.NO_OUTPUT_SPECIFIED);
}
if (!method.getReturnType().equals(Void.TYPE)) {
Assert.isTrue(StringUtils.hasText(methodAnnotatedOutboundName),
StreamEmitterErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED);
Assert.isTrue(method.getParameterCount() == 0, StreamEmitterErrorMessages.RETURN_TYPE_METHOD_ARGUMENTS);
}
else {
if (!StringUtils.hasText(methodAnnotatedOutboundName)) {
int methodArgumentsLength = method.getParameterTypes().length;
for (int parameterIndex = 0; parameterIndex < methodArgumentsLength; parameterIndex++) {
MethodParameter methodParameter = new MethodParameter(method, parameterIndex);
if (methodParameter.hasParameterAnnotation(Output.class)) {
String outboundName = (String) AnnotationUtils
.getValue(methodParameter.getParameterAnnotation(Output.class));
Assert.isTrue(StringUtils.hasText(outboundName),
StreamEmitterErrorMessages.INVALID_OUTBOUND_NAME);
}
else {
throw new IllegalArgumentException(
StreamEmitterErrorMessages.OUTPUT_ANNOTATION_MISSING_ON_METHOD_PARAMETERS_VOID_RETURN_TYPE);
}
}
}
}
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable callback) {
stop();
if (callback != null) {
callback.run();
}
}
@Override
public void stop() {
try {
this.lock.lock();
if (this.running) {
for (Closeable closeable : closeableFluxResources) {
try {
closeable.close();
}
catch (IOException e) {
log.error("Error closing reactive source", e);
}
}
this.running = false;
}
}
finally {
this.lock.unlock();
}
}
@Override
public boolean isRunning() {
return this.running;
}
@Override
public int getPhase() {
return 0;
}
}

View File

@@ -0,0 +1,50 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.reactive;
import org.springframework.cloud.stream.binding.StreamAnnotationErrorMessages;
/**
* @author Soby Chacko
* @since 1.3.0
*/
abstract class StreamEmitterErrorMessages extends StreamAnnotationErrorMessages {
private static final String PREFIX = "A method annotated with @StreamEmitter ";
static final String RETURN_TYPE_NO_OUTBOUND_SPECIFIED = PREFIX
+ "having a return type should also have an outbound target specified at the method level.";
static final String RETURN_TYPE_METHOD_ARGUMENTS = PREFIX
+ "having a return type should not have any method arguments";
static final String INVALID_OUTPUT_METHOD_PARAMETERS = "@Output annotations are not permitted on "
+ "method parameters while using the @StreamEmitter and a method-level output specification";
static final String NO_OUTPUT_SPECIFIED = "No method level or parameter level @Output annotations are detected. "
+ "@StreamEmitter requires a method or parameter level @Output annotation.";
static final String OUTPUT_ANNOTATION_MISSING_ON_METHOD_PARAMETERS_VOID_RETURN_TYPE = PREFIX
+ "and void return type without method level @Output annotation requires @Output on each of the method parameter";
static final String INPUT_ANNOTATIONS_ARE_NOT_ALLOWED = PREFIX
+ "cannot contain @Input annotations";
static final String CANNOT_CONVERT_RETURN_TYPE_TO_ANY_AVAILABLE_RESULT_ADAPTERS =
"No suitable adapters are found that can convert the return type";
}

View File

@@ -0,0 +1,308 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.reactive;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Soby Chacko
* @author Artem Bilan
*/
public class StreamEmitterBasicTests {
@Test
public void testFluxReturnAndOutputMethodLevel() throws Exception {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(TestFluxReturnAndOutputMethodLevel.class);
context.refresh();
receiveAndValidate(context);
context.close();
}
@Test
public void testVoidReturnAndOutputMethodParameter() throws Exception {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(TestVoidReturnAndOutputMethodParameter.class);
context.refresh();
receiveAndValidate(context);
context.close();
}
@Test
public void testVoidReturnAndOutputAtMethodLevel() throws Exception {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(TestVoidReturnAndOutputAtMethodLevel.class);
context.refresh();
receiveAndValidate(context);
context.close();
}
@Test
public void testVoidReturnAndMultipleOutputMethodParameters() throws Exception {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(TestVoidReturnAndMultipleOutputMethodParameters.class);
context.refresh();
receiveAndValidateMultipleOutputs(context);
context.close();
}
@Test
public void testMultipleStreamEmitterMethods() throws Exception {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(TestMultipleStreamEmitterMethods.class);
context.refresh();
receiveAndValidateMultipleOutputs(context);
context.close();
}
@Test
public void testSameAppContextWithMultipleStreamEmitters() throws Exception {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(TestSameAppContextWithMultipleStreamEmitters.class);
context.refresh();
receiveAndValidateMultiStreamEmittersInSameContext(context);
context.close();
}
@SuppressWarnings("unchecked")
private static void receiveAndValidate(ConfigurableApplicationContext context) throws InterruptedException {
Source source = context.getBean(Source.class);
MessageCollector messageCollector = context.getBean(MessageCollector.class);
List<String> messages = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
messages.add((String) messageCollector.forChannel(source.output()).poll(5000, TimeUnit.MILLISECONDS).getPayload());
}
for (int i = 0; i < 1000; i++) {
assertThat(messages.get(i)).isEqualTo("HELLO WORLD!!" + i);
}
}
@SuppressWarnings("unchecked")
private static void receiveAndValidateMultipleOutputs(ConfigurableApplicationContext context) throws InterruptedException {
TestMultiOutboundChannels source = context.getBean(TestMultiOutboundChannels.class);
MessageCollector messageCollector = context.getBean(MessageCollector.class);
List<String> messages = new ArrayList<>();
assertMessages(source.output1(), messageCollector, messages);
messages.clear();
assertMessages(source.output2(), messageCollector, messages);
messages.clear();
assertMessages(source.output3(), messageCollector, messages);
messages.clear();
}
@SuppressWarnings("unchecked")
private static void receiveAndValidateMultiStreamEmittersInSameContext(ConfigurableApplicationContext context1) throws InterruptedException {
TestMultiOutboundChannels source1 = context1.getBean(TestMultiOutboundChannels.class);
MessageCollector messageCollector = context1.getBean(MessageCollector.class);
List<String> messages = new ArrayList<>();
assertMessagesX(source1.output1(), messageCollector, messages);
messages.clear();
assertMessagesY(source1.output2(), messageCollector, messages);
messages.clear();
}
private static void assertMessages(MessageChannel channel, MessageCollector messageCollector, List<String> messages) throws InterruptedException {
for (int i = 0; i < 1000; i++) {
messages.add((String) messageCollector.forChannel(channel).poll(5000, TimeUnit.MILLISECONDS).getPayload());
}
for (int i = 0; i < 1000; i++) {
assertThat(messages.get(i)).isEqualTo("Hello World!!" + i);
}
}
private static void assertMessagesX(MessageChannel channel, MessageCollector messageCollector, List<String> messages) throws InterruptedException {
for (int i = 0; i < 1000; i++) {
messages.add((String) messageCollector.forChannel(channel).poll(5000, TimeUnit.MILLISECONDS).getPayload());
}
for (int i = 0; i < 1000; i++) {
assertThat(messages.get(i)).isEqualTo("Hello World!!" + i);
}
}
private static void assertMessagesY(MessageChannel channel, MessageCollector messageCollector, List<String> messages) throws InterruptedException {
for (int i = 0; i < 1000; i++) {
messages.add((String) messageCollector.forChannel(channel).poll(5000, TimeUnit.MILLISECONDS).getPayload());
}
for (int i = 0; i < 1000; i++) {
assertThat(messages.get(i)).isEqualTo("Hello FooBar!!" + i);
}
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class TestFluxReturnAndOutputMethodLevel {
@StreamEmitter
@Output(Source.OUTPUT)
@Bean
public Publisher<Message<String>> emit() {
AtomicInteger atomicInteger = new AtomicInteger();
return IntegrationFlows.from(() ->
new GenericMessage<>("Hello World!!" + atomicInteger.getAndIncrement()),
e -> e.poller(p -> p.fixedDelay(1)))
.<String, String>transform(String::toUpperCase)
.toReactivePublisher();
}
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class TestVoidReturnAndOutputMethodParameter {
@StreamEmitter
public void emit(@Output(Source.OUTPUT) FluxSender output) {
output.send(Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l)
.map(String::toUpperCase));
}
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class TestVoidReturnAndOutputAtMethodLevel {
@StreamEmitter
@Output(Source.OUTPUT)
public void emit(FluxSender output) {
output.send(Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l)
.map(String::toUpperCase));
}
}
@EnableBinding(TestMultiOutboundChannels.class)
@EnableAutoConfiguration
public static class TestVoidReturnAndMultipleOutputMethodParameters {
@StreamEmitter
public void emit(@Output(TestMultiOutboundChannels.OUTPUT1) FluxSender output1,
@Output(TestMultiOutboundChannels.OUTPUT2) FluxSender output2,
@Output(TestMultiOutboundChannels.OUTPUT3) FluxSender output3) {
output1.send(Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l));
output2.send(Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l));
output3.send(Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l));
}
}
@EnableBinding(TestMultiOutboundChannels.class)
@EnableAutoConfiguration
public static class TestMultipleStreamEmitterMethods {
@StreamEmitter
@Output(TestMultiOutboundChannels.OUTPUT1)
public Flux<String> emit1() {
return Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l);
}
@StreamEmitter
@Output(TestMultiOutboundChannels.OUTPUT2)
public Flux<String> emit2() {
return Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l);
}
@StreamEmitter
public void emit3(@Output(TestMultiOutboundChannels.OUTPUT3) FluxSender outputX) {
outputX.send(Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l));
}
}
@EnableBinding(TestMultiOutboundChannels.class)
@EnableAutoConfiguration
public static class TestSameAppContextWithMultipleStreamEmitters {
@Bean
public Foo foo() {
return new Foo();
}
@Bean
public Bar bar() {
return new Bar();
}
static class Foo {
@StreamEmitter
@Output(TestMultiOutboundChannels.OUTPUT1)
public Flux<String> emit1() {
return Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l);
}
}
static class Bar {
@StreamEmitter
@Output(TestMultiOutboundChannels.OUTPUT2)
public Flux<String> emit2() {
return Flux.intervalMillis(1)
.map(l -> "Hello FooBar!!" + l);
}
}
}
interface TestMultiOutboundChannels {
String OUTPUT1 = "output1";
String OUTPUT2 = "output2";
String OUTPUT3 = "output3";
@Output(TestMultiOutboundChannels.OUTPUT1)
MessageChannel output1();
@Output(TestMultiOutboundChannels.OUTPUT2)
MessageChannel output2();
@Output(TestMultiOutboundChannels.OUTPUT3)
MessageChannel output3();
}
}

View File

@@ -0,0 +1,307 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.reactive;
import org.junit.Test;
import reactor.core.publisher.Flux;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.springframework.cloud.stream.binding.StreamAnnotationErrorMessages.ATLEAST_ONE_OUTPUT;
import static org.springframework.cloud.stream.binding.StreamAnnotationErrorMessages.INVALID_OUTBOUND_NAME;
import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.CANNOT_CONVERT_RETURN_TYPE_TO_ANY_AVAILABLE_RESULT_ADAPTERS;
import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.INPUT_ANNOTATIONS_ARE_NOT_ALLOWED;
import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.INVALID_OUTPUT_METHOD_PARAMETERS;
import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.NO_OUTPUT_SPECIFIED;
import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.OUTPUT_ANNOTATION_MISSING_ON_METHOD_PARAMETERS_VOID_RETURN_TYPE;
import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.RETURN_TYPE_METHOD_ARGUMENTS;
import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED;
/**
* @author Soby Chacko
*/
public class StreamEmitterValidationTests {
@Test
public void testOutputAsMethodandMethodParameter() {
try {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(TestOutputAsMethodandMethodParameter.class);
context.refresh();
context.close();
fail("Expected exception: " + INVALID_OUTPUT_METHOD_PARAMETERS);
}
catch (Exception e) {
assertThat(e.getMessage()).contains(INVALID_OUTPUT_METHOD_PARAMETERS);
}
}
@Test
public void testFluxReturnTypeNoOutputGiven() {
try {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(TestFluxReturnTypeNoOutputGiven.class);
context.refresh();
context.close();
fail("Expected exception: " + NO_OUTPUT_SPECIFIED);
}
catch (Exception e) {
assertThat(e.getMessage()).contains(NO_OUTPUT_SPECIFIED);
}
}
@Test
public void testVoidReturnTypeNoOutputGiven() {
try {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(TestVoidReturnTypeNoOutputGiven.class);
context.refresh();
context.close();
fail("Expected exception: " + NO_OUTPUT_SPECIFIED);
}
catch (Exception e) {
assertThat(e.getMessage()).contains(NO_OUTPUT_SPECIFIED);
}
}
@Test
public void testNonVoidReturnButOutputAsMethodParameter() {
try {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(TestNonVoidReturnButOutputAsMethodParameter.class);
context.refresh();
context.close();
fail("Expected exception: " + RETURN_TYPE_NO_OUTBOUND_SPECIFIED);
}
catch (Exception e) {
assertThat(e.getMessage()).contains(RETURN_TYPE_NO_OUTBOUND_SPECIFIED);
}
}
@Test
public void testNonVoidReturnButMethodArguments() {
try {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(TestNonVoidReturnButMethodArguments.class);
context.refresh();
context.close();
fail("Expected exception: " + RETURN_TYPE_METHOD_ARGUMENTS);
}
catch (Exception e) {
assertThat(e.getMessage()).contains(RETURN_TYPE_METHOD_ARGUMENTS);
}
}
@Test
public void testVoidReturnTypeMultipleMethodParametersWithOneMissingOutput() {
try {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(TestVoidReturnTypeMultipleMethodParametersWithOneMissingOutput.class);
context.refresh();
context.close();
fail("Expected exception: " + OUTPUT_ANNOTATION_MISSING_ON_METHOD_PARAMETERS_VOID_RETURN_TYPE);
}
catch (Exception e) {
assertThat(e.getMessage()).contains(OUTPUT_ANNOTATION_MISSING_ON_METHOD_PARAMETERS_VOID_RETURN_TYPE);
}
}
@Test
public void testOutputAtCorrectLocationButNameMissing1() {
try {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(TestOutputAtCorrectLocationButNameMissing1.class);
context.refresh();
context.close();
fail("Expected exception: " + ATLEAST_ONE_OUTPUT);
}
catch (Exception e) {
assertThat(e.getMessage()).contains(ATLEAST_ONE_OUTPUT);
}
}
@Test
public void testOutputAtCorrectLocationButNameMissing2() {
try {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(TestOutputAtCorrectLocationButNameMissing2.class);
context.refresh();
context.close();
fail("Expected exception: " + INVALID_OUTBOUND_NAME);
}
catch (Exception e) {
assertThat(e.getMessage()).contains(INVALID_OUTBOUND_NAME);
}
}
@Test
public void testInputAnnotationsAreNotPermitted() {
try {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(TestInputAnnotationsAreNotPermitted.class);
context.refresh();
context.close();
fail("Expected exception: " + INPUT_ANNOTATIONS_ARE_NOT_ALLOWED);
}
catch (Exception e) {
assertThat(e.getMessage()).contains(INPUT_ANNOTATIONS_ARE_NOT_ALLOWED);
}
}
@Test
public void testReturnTypeNotSupported() {
try {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.register(TestReturnTypeNotSupported.class);
context.refresh();
context.close();
fail("Expected exception: " + CANNOT_CONVERT_RETURN_TYPE_TO_ANY_AVAILABLE_RESULT_ADAPTERS);
}
catch (Exception e) {
assertThat(e.getMessage()).contains(CANNOT_CONVERT_RETURN_TYPE_TO_ANY_AVAILABLE_RESULT_ADAPTERS);
}
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class TestOutputAsMethodandMethodParameter {
@StreamEmitter
@Output(Source.OUTPUT)
public void receive(@Output(Source.OUTPUT) FluxSender output) {
output.send(Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l));
}
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class TestFluxReturnTypeNoOutputGiven {
@StreamEmitter
public Flux<String> emit() {
return Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l);
}
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class TestVoidReturnTypeNoOutputGiven {
@StreamEmitter
public void emit(FluxSender output) {
output.send(Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l));
}
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class TestNonVoidReturnButOutputAsMethodParameter {
@StreamEmitter
public Flux<String> emit(@Output(Source.OUTPUT) FluxSender output) {
return Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l);
}
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class TestNonVoidReturnButMethodArguments {
@StreamEmitter
@Output(Source.OUTPUT)
public Flux<String> receive(FluxSender output) {
return Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l);
}
}
@EnableBinding(StreamEmitterBasicTests.TestMultiOutboundChannels.class)
@EnableAutoConfiguration
public static class TestVoidReturnTypeMultipleMethodParametersWithOneMissingOutput {
@StreamEmitter
public void emit(@Output(StreamEmitterBasicTests.TestMultiOutboundChannels.OUTPUT1) FluxSender output1,
@Output(StreamEmitterBasicTests.TestMultiOutboundChannels.OUTPUT2) FluxSender output2,
FluxSender output3) {
output1.send(Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l));
output2.send(Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l));
output3.send(Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l));
}
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class TestOutputAtCorrectLocationButNameMissing1 {
@StreamEmitter
@Output("")
public void receive(FluxSender output) {
output.send(Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l));
}
}
@EnableBinding(StreamEmitterBasicTests.TestMultiOutboundChannels.class)
@EnableAutoConfiguration
public static class TestOutputAtCorrectLocationButNameMissing2 {
@StreamEmitter
public void emit(@Output("") FluxSender output1) {
output1.send(Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l));
}
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class TestInputAnnotationsAreNotPermitted {
@StreamEmitter
@Output(Source.OUTPUT)
@Input(Processor.INPUT)
public Flux<String> emit() {
return Flux.intervalMillis(1)
.map(l -> "Hello World!!" + l);
}
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class TestReturnTypeNotSupported {
@StreamEmitter
@Output(Source.OUTPUT)
public String emit() {
return "hello";
}
}
}

View File

@@ -0,0 +1,10 @@
<configuration>
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} %5p %t %c{2}:%L - %m%n</pattern>
</encoder>
</appender>
<root level="WARN">
<appender-ref ref="stdout"/>
</root>
</configuration>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,9 @@
package org.springframework.cloud.stream.binding;
import java.io.Closeable;
import java.io.IOException;
import org.springframework.integration.handler.BridgeHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
@@ -24,7 +27,9 @@ import org.springframework.messaging.SubscribableChannel;
* A {@link StreamListenerResultAdapter} used for bridging an
* {@link org.springframework.cloud.stream.annotation.Output} {@link MessageChannel} to a
* bound {@link MessageChannel}.
*
* @author Marius Bogoevici
* @author Soby Chacko
*/
public class MessageChannelStreamListenerResultAdapter
implements StreamListenerResultAdapter<MessageChannel, MessageChannel> {
@@ -36,10 +41,22 @@ public class MessageChannelStreamListenerResultAdapter
}
@Override
public void adapt(MessageChannel streamListenerResult, MessageChannel bindingTarget) {
public Closeable adapt(MessageChannel streamListenerResult, MessageChannel bindingTarget) {
BridgeHandler handler = new BridgeHandler();
handler.setOutputChannel(bindingTarget);
handler.afterPropertiesSet();
((SubscribableChannel) streamListenerResult).subscribe(handler);
return new NoOpCloseeable();
}
private static final class NoOpCloseeable implements Closeable {
@Override
public void close() throws IOException {
}
}
}

View File

@@ -0,0 +1,63 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binding;
import java.lang.reflect.Method;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.core.MethodParameter;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/**
* Common methods that can be used across various Stream annotations.
*
* @author Soby Chacko
* @since 1.3.0
*/
public abstract class StreamAnnotationCommonMethodUtils {
public static String getOutboundBindingTargetName(Method method) {
SendTo sendTo = AnnotationUtils.findAnnotation(method, SendTo.class);
if (sendTo != null) {
Assert.isTrue(!ObjectUtils.isEmpty(sendTo.value()), StreamAnnotationErrorMessages.ATLEAST_ONE_OUTPUT);
Assert.isTrue(sendTo.value().length == 1, StreamAnnotationErrorMessages.SEND_TO_MULTIPLE_DESTINATIONS);
Assert.hasText(sendTo.value()[0], StreamAnnotationErrorMessages.SEND_TO_EMPTY_DESTINATION);
return sendTo.value()[0];
}
Output output = AnnotationUtils.findAnnotation(method, Output.class);
if (output != null) {
Assert.isTrue(StringUtils.hasText(output.value()), StreamAnnotationErrorMessages.ATLEAST_ONE_OUTPUT);
return output.value();
}
return null;
}
public static int outputAnnotationCount(Method method) {
int outputAnnotationCount = 0;
for (int parameterIndex = 0; parameterIndex < method.getParameterTypes().length; parameterIndex++) {
MethodParameter methodParameter = MethodParameter.forMethodOrConstructor(method, parameterIndex);
if (methodParameter.hasParameterAnnotation(Output.class)) {
outputAnnotationCount++;
}
}
return outputAnnotationCount;
}
}

View File

@@ -0,0 +1,32 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binding;
/**
* @author Soby Chacko
*/
public abstract class StreamAnnotationErrorMessages {
public static final String ATLEAST_ONE_OUTPUT = "At least one output must be specified";
public static final String SEND_TO_MULTIPLE_DESTINATIONS = "Multiple destinations cannot be specified";
public static final String SEND_TO_EMPTY_DESTINATION = "An empty destination cannot be specified";
public static final String INVALID_OUTBOUND_NAME = "The @Output annotation must have the name of an input as value";
}

View File

@@ -16,6 +16,8 @@
package org.springframework.cloud.stream.binding;
import java.io.Closeable;
/**
* A strategy for adapting the result of a
* {@link org.springframework.cloud.stream.annotation.StreamListener} annotated method to
@@ -41,6 +43,6 @@ public interface StreamListenerResultAdapter<R, B> {
* @param streamListenerResult the result of invoking the method.
* @param bindingTarget the binding target.
*/
void adapt(R streamListenerResult, B bindingTarget);
Closeable adapt(R streamListenerResult, B bindingTarget);
}