Initial support for Reactive API

Fixes #520
Fixes #458 (for non-reactive binders)

Adds spring-cloud-stream-reactive module.
Introduces support for declarative @StreamListener and @Input and @Output annotated parameters.
Add StreamListenerArgumentAdapter and StreamListenerResultAdapter for wrapping bindable inputs
and outputs when passing arguments to declarative @StreamListener.
Adds support for using reactive types (Flux/Observable) with traditional binders.
Introduce FluxSender and ObservableSender for handling multiple streaming outputs per method.
Ensure that errors are caught and logged.

Fixing constructor assertions and Javadoc

Addressing PR comments

- rework @Input/@Output parameter validation
- renamed StreamListenerArgumentAdapter to StreamListenerParameterAdapter
- ensure that parameter direction is accounted for in the current adapters
This commit is contained in:
Marius Bogoevici
2016-07-21 17:25:04 -04:00
committed by Mark Fisher
parent 55b9aa75dd
commit 08d65a92bb
26 changed files with 1530 additions and 69 deletions

View File

@@ -32,6 +32,7 @@ import org.springframework.context.annotation.Import;
* annotated is expected to provide a bean that implements {@link RxJavaProcessor}.
*
* @author Ilayaperumal Gopinathan
* @deprecated in favor of {@link org.springframework.cloud.stream.annotation.StreamListener} with reactive types
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@@ -39,5 +40,6 @@ import org.springframework.context.annotation.Import;
@Inherited
@EnableBinding(Processor.class)
@Import(RxJavaProcessorConfiguration.class)
@Deprecated
public @interface EnableRxJavaProcessor {
}

View File

@@ -23,7 +23,9 @@ import rx.Observable;
*
* @author Mark Pollack
* @author Ilayaperumal Gopinathan
* @deprecated in favor of {@link org.springframework.cloud.stream.annotation.StreamListener} with reactive types
*/
@Deprecated
public interface RxJavaProcessor<I, O> {
Observable<O> process(Observable<I> input);

View File

@@ -30,6 +30,7 @@ import org.springframework.messaging.MessageHandler;
* @author Marius Bogoevici
*/
@Configuration
@Deprecated
public class RxJavaProcessorConfiguration {
@Autowired
@@ -38,7 +39,7 @@ public class RxJavaProcessorConfiguration {
@ServiceActivator(inputChannel = Processor.INPUT, phase = "0")
@Bean
public MessageHandler subjectMessageHandler() {
SubjectMessageHandler messageHandler = new SubjectMessageHandler(processor);
SubjectMessageHandler messageHandler = new SubjectMessageHandler(this.processor);
messageHandler.setOutputChannelName(Processor.OUTPUT);
return messageHandler;
}

View File

@@ -62,6 +62,7 @@ import org.springframework.util.ClassUtils;
* @author Marius Bogoevici
*/
@SuppressWarnings({"unchecked", "rawtypes"})
@Deprecated
public class SubjectMessageHandler extends AbstractMessageProducingHandler implements SmartLifecycle {
private final Log logger = LogFactory.getLog(getClass());
@@ -83,10 +84,10 @@ public class SubjectMessageHandler extends AbstractMessageProducingHandler imple
@Override
public synchronized void start() {
if (!running) {
subject = new SerializedSubject(PublishSubject.create());
Observable<?> outputStream = processor.process(subject);
subscription = outputStream.subscribe(new Action1<Object>() {
if (!this.running) {
this.subject = new SerializedSubject(PublishSubject.create());
Observable<?> outputStream = this.processor.process(this.subject);
this.subscription = outputStream.subscribe(new Action1<Object>() {
@Override
public void call(Object outputObject) {
@@ -101,22 +102,23 @@ public class SubjectMessageHandler extends AbstractMessageProducingHandler imple
@Override
public void call(Throwable throwable) {
logger.error(throwable.getMessage(), throwable);
SubjectMessageHandler.this.logger.error(throwable.getMessage(), throwable);
}
}, new Action0() {
@Override
public void call() {
logger.info("Subscription close for [" + subscription + "]");
SubjectMessageHandler.this.logger
.info("Subscription close for [" + SubjectMessageHandler.this.subscription + "]");
}
});
running = true;
this.running = true;
}
}
@Override
public synchronized boolean isRunning() {
return running;
return this.running;
}
@Override
@@ -126,7 +128,7 @@ public class SubjectMessageHandler extends AbstractMessageProducingHandler imple
@Override
public void stop(Runnable callback) {
if (running) {
if (this.running) {
stop();
if (callback != null) {
callback.run();
@@ -141,17 +143,17 @@ public class SubjectMessageHandler extends AbstractMessageProducingHandler imple
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
subject.onNext(message.getPayload());
this.subject.onNext(message.getPayload());
}
@Override
public synchronized void stop() {
if (running) {
subject.onCompleted();
subscription.unsubscribe();
subscription = null;
subject = null;
running = false;
if (this.running) {
this.subject.onCompleted();
this.subscription.unsubscribe();
this.subscription = null;
this.subject = null;
this.running = false;
}
}
}