diff --git a/pom.xml b/pom.xml index 5b2732d65..c8bceeefe 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 org.springframework.cloud spring-cloud-stream-parent @@ -9,7 +10,7 @@ org.springframework.cloud spring-cloud-build 1.1.2.BUILD-SNAPSHOT - + https://github.com/spring-cloud/spring-cloud-stream @@ -42,6 +43,7 @@ spring-cloud-stream-test-support-internal spring-cloud-stream-integration-tests spring-cloud-stream-docs + spring-cloud-stream-reactive diff --git a/spring-cloud-stream-dependencies/pom.xml b/spring-cloud-stream-dependencies/pom.xml index 907507a92..7ea393551 100644 --- a/spring-cloud-stream-dependencies/pom.xml +++ b/spring-cloud-stream-dependencies/pom.xml @@ -17,6 +17,7 @@ 1.1.5 1.0.0.RELEASE 1.0.0.RELEASE + 3.0.0.BUILD-SNAPSHOT @@ -65,6 +66,11 @@ kryo-shaded 3.0.3 + + io.projectreactor + reactor-core + ${reactor.version} + io.reactivex rxjava diff --git a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerWithAnnotatedArgsTests.java b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerWithAnnotatedArgsTests.java new file mode 100644 index 000000000..47052f2fb --- /dev/null +++ b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerWithAnnotatedArgsTests.java @@ -0,0 +1,79 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.config; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.support.MessageBuilder; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Marius Bogoevici + */ +public class StreamListenerWithAnnotatedArgsTests { + + @Test + public void testInputOutputArgs() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestInputOutputArgs.class, "--server.port=0"); + sendMessageAndValidate(context); + } + + private void sendMessageAndValidate(ConfigurableApplicationContext context) throws InterruptedException { + @SuppressWarnings("unchecked") + Processor processor = context.getBean(Processor.class); + processor.input().send(MessageBuilder.withPayload("hello").setHeader("contentType", "text/plain").build()); + MessageCollector messageCollector = context.getBean(MessageCollector.class); + Message result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS); + assertThat(result).isNotNull(); + assertThat(result.getPayload()).isEqualTo("HELLO"); + context.close(); + } + + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestInputOutputArgs { + + @StreamListener + public void receive(@Input(Processor.INPUT) SubscribableChannel input, @Output(Processor.OUTPUT) final MessageChannel output) { + input.subscribe(new MessageHandler() { + @Override + public void handleMessage(Message message) throws MessagingException { + output.send(MessageBuilder.withPayload(message.getPayload().toString().toUpperCase()).build()); + } + }); + } + } + +} diff --git a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerTests.java b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerWithHandlerTests.java similarity index 96% rename from spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerTests.java rename to spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerWithHandlerTests.java index 696f3d8ab..690121523 100644 --- a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerTests.java +++ b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerWithHandlerTests.java @@ -51,7 +51,7 @@ import static org.junit.Assert.fail; /** * @author Marius Bogoevici */ -public class StreamListenerTests { +public class StreamListenerWithHandlerTests { @Test public void testContentTypeConversion() throws Exception { @@ -240,8 +240,8 @@ public class StreamListenerTests { @StreamListener(Sink.INPUT) public void receive(FooPojo fooPojo) { - receivedArguments.add(fooPojo); - latch.countDown(); + this.receivedArguments.add(fooPojo); + this.latch.countDown(); } } @@ -254,7 +254,7 @@ public class StreamListenerTests { @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public String receive(FooPojo fooPojo) { - receivedPojos.add(fooPojo); + this.receivedPojos.add(fooPojo); return fooPojo.getBar(); } } @@ -268,7 +268,7 @@ public class StreamListenerTests { @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public BazPojo receive(FooPojo fooPojo) { - receivedPojos.add(fooPojo); + this.receivedPojos.add(fooPojo); BazPojo bazPojo = new BazPojo(); bazPojo.setQux(fooPojo.getBar()); return bazPojo; @@ -285,9 +285,9 @@ public class StreamListenerTests { public void receive(@Payload FooPojo fooPojo, @Headers Map headers, @Header(MessageHeaders.CONTENT_TYPE) String contentType) { - receivedArguments.add(fooPojo); - receivedArguments.add(headers); - receivedArguments.add(contentType); + this.receivedArguments.add(fooPojo); + this.receivedArguments.add(headers); + this.receivedArguments.add(contentType); } } @@ -300,7 +300,7 @@ public class StreamListenerTests { @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public Message receive(FooPojo fooPojo) { - receivedPojos.add(fooPojo); + this.receivedPojos.add(fooPojo); BazPojo bazPojo = new BazPojo(); bazPojo.setQux(fooPojo.getBar()); return MessageBuilder.withPayload(bazPojo).setHeader("foo", "bar").build(); @@ -316,7 +316,7 @@ public class StreamListenerTests { @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public BazPojo receive(Message fooMessage) { - receivedMessages.add(fooMessage); + this.receivedMessages.add(fooMessage); BazPojo bazPojo = new BazPojo(); bazPojo.setQux(fooMessage.getPayload()); return bazPojo; @@ -353,7 +353,7 @@ public class StreamListenerTests { @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public BazPojo receive(FooPojo fooMessage) { - receivedPojos.add(fooMessage); + this.receivedPojos.add(fooMessage); BazPojo bazPojo = new BazPojo(); bazPojo.setQux(fooMessage.getBar()); return bazPojo; @@ -365,7 +365,7 @@ public class StreamListenerTests { private String bar; public String getBar() { - return bar; + return this.bar; } public void setBar(String bar) { @@ -378,7 +378,7 @@ public class StreamListenerTests { private String qux; public String getQux() { - return qux; + return this.qux; } public void setQux(String qux) { diff --git a/spring-cloud-stream-reactive/pom.xml b/spring-cloud-stream-reactive/pom.xml new file mode 100644 index 000000000..43f1d3601 --- /dev/null +++ b/spring-cloud-stream-reactive/pom.xml @@ -0,0 +1,49 @@ + + + + org.springframework.cloud + spring-cloud-stream-parent + 1.1.0.BUILD-SNAPSHOT + + 4.0.0 + + spring-cloud-stream-reactive + + + 1.8 + + + + + org.springframework.cloud + spring-cloud-stream + + + io.projectreactor + reactor-core + + + io.reactivex + rxjava + true + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.cloud + spring-cloud-stream-test-support + test + + + org.springframework.cloud + spring-cloud-stream-test-support-internal + test + + + + diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/FluxSender.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/FluxSender.java new file mode 100644 index 000000000..f02cdfbe3 --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/FluxSender.java @@ -0,0 +1,37 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Used for {@link org.springframework.cloud.stream.annotation.StreamListener} arguments annotated with {@link + * org.springframework.cloud.stream.annotation.Output}. + * @author Marius Bogoevici + */ +public interface FluxSender { + + /** + * Streams the {@link reactor.core.publisher.Flux} through the bound + * element corresponding to the {@link org.springframework.cloud.stream.annotation.Output} annotation of the + * argument. + * @param flux a {@link Flux} that will be streamed through the bound element + * @return a {@link Mono} representing the result of sending the flux (completion or error) + */ + Mono send(Flux flux); +} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/FluxToMessageChannelResultAdapter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/FluxToMessageChannelResultAdapter.java new file mode 100644 index 000000000..689d4a28c --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/FluxToMessageChannelResultAdapter.java @@ -0,0 +1,51 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import reactor.core.publisher.Flux; + +import org.springframework.cloud.stream.binding.StreamListenerResultAdapter; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; + +/** + * A {@link org.springframework.cloud.stream.binding.StreamListenerResultAdapter} from a {@link Flux} + * return type to a bound {@link MessageChannel}. + * @author Marius Bogoevici + */ +public class FluxToMessageChannelResultAdapter + implements StreamListenerResultAdapter, MessageChannel> { + + private Log log = LogFactory.getLog(FluxToMessageChannelResultAdapter.class); + + @Override + public boolean supports(Class resultType, Class boundType) { + return Flux.class.isAssignableFrom(resultType) && MessageChannel.class.isAssignableFrom(boundType); + } + + public void adapt(Flux streamListenerResult, MessageChannel boundElement) { + streamListenerResult + .doOnError(e -> this.log.error("Error while processing result", e)) + .retry() + .subscribe( + result -> boundElement.send(result instanceof Message ? (Message) result + : MessageBuilder.withPayload(result).build())); + } +} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToFluxSenderParameterAdapter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToFluxSenderParameterAdapter.java new file mode 100644 index 000000000..caae0ad71 --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToFluxSenderParameterAdapter.java @@ -0,0 +1,64 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import reactor.core.publisher.MonoProcessor; + +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter; +import org.springframework.core.MethodParameter; +import org.springframework.core.ResolvableType; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; + +/** + * Adapts an {@link org.springframework.cloud.stream.annotation.Output} annotated + * {@link FluxSender} to an outbound {@link MessageChannel}. + * @author Marius Bogoevici + */ +public class MessageChannelToFluxSenderParameterAdapter + implements StreamListenerParameterAdapter { + + private Log log = LogFactory.getLog(MessageChannelToFluxSenderParameterAdapter.class); + + @Override + public boolean supports(Class boundElementType, MethodParameter methodParameter) { + ResolvableType type = ResolvableType.forMethodParameter(methodParameter); + return MessageChannel.class.isAssignableFrom(boundElementType) + && methodParameter.getParameterAnnotation(Output.class) != null + && FluxSender.class.isAssignableFrom(type.getRawClass()); + } + + @Override + public FluxSender adapt(MessageChannel boundElement, MethodParameter parameter) { + return resultPublisher -> { + MonoProcessor sendResult = MonoProcessor.create(); + // add error handling and reconnect in the event of an error + resultPublisher + .doOnError(e -> this.log.error("Error during processing: ", e)) + .retry() + .subscribe( + result -> boundElement.send(result instanceof Message ? (Message) result : + MessageBuilder.withPayload(result).build()), e -> sendResult.onError(e), + () -> sendResult.onComplete()); + return sendResult; + }; + } +} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputFluxParameterAdapter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputFluxParameterAdapter.java new file mode 100644 index 000000000..f211922f9 --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputFluxParameterAdapter.java @@ -0,0 +1,87 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive; + +import reactor.core.publisher.Flux; + +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter; +import org.springframework.core.MethodParameter; +import org.springframework.core.ResolvableType; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.util.Assert; + +/** + * Adapts an {@link org.springframework.cloud.stream.annotation.Input} annotated + * {@link MessageChannel} to a {@link Flux}. + * @author Marius Bogoevici + */ +public class MessageChannelToInputFluxParameterAdapter + implements StreamListenerParameterAdapter, SubscribableChannel> { + + private final CompositeMessageConverter messageConverter; + + public MessageChannelToInputFluxParameterAdapter(CompositeMessageConverter messageConverter) { + Assert.notNull(messageConverter, "cannot not be null"); + this.messageConverter = messageConverter; + } + + @Override + public boolean supports(Class boundElementType, MethodParameter methodParameter) { + return SubscribableChannel.class.isAssignableFrom(boundElementType) + && methodParameter.getParameterAnnotation(Input.class) != null + && Flux.class.isAssignableFrom(methodParameter.getParameterType()); + } + + @Override + public Flux adapt(final SubscribableChannel boundElement, MethodParameter parameter) { + ResolvableType resolvableType = ResolvableType.forMethodParameter(parameter); + Class argumentClass = resolvableType.getGeneric(0).getRawClass(); + final Object monitor = new Object(); + if (Message.class.isAssignableFrom(argumentClass)) { + return Flux.create(emitter -> { + MessageHandler messageHandler = message -> { + synchronized (monitor) { + emitter.next(message); + } + }; + boundElement.subscribe(messageHandler); + emitter.setCancellation(() -> boundElement.unsubscribe(messageHandler)); + }); + } + else { + return Flux.create(emitter -> { + MessageHandler messageHandler = message -> { + synchronized (monitor) { + if (argumentClass.isAssignableFrom(message.getPayload().getClass())) { + emitter.next(message.getPayload()); + } + else { + emitter.next(this.messageConverter.fromMessage(message, argumentClass)); + } + } + }; + boundElement.subscribe(messageHandler); + emitter.setCancellation(() -> boundElement.unsubscribe(messageHandler)); + }); + } + } +} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputObservableParameterAdapter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputObservableParameterAdapter.java new file mode 100644 index 000000000..266f0d624 --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputObservableParameterAdapter.java @@ -0,0 +1,56 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive; + +import reactor.adapter.RxJava1Adapter; +import rx.Observable; + +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter; +import org.springframework.core.MethodParameter; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.util.Assert; + +/** + * Adapts an {@link org.springframework.cloud.stream.annotation.Input} annotated + * {@link MessageChannel} to an {@link Observable}. + * @author Marius Bogoevici + */ +public class MessageChannelToInputObservableParameterAdapter + implements StreamListenerParameterAdapter, SubscribableChannel> { + + private final MessageChannelToInputFluxParameterAdapter messageChannelToInputFluxArgumentAdapter; + + public MessageChannelToInputObservableParameterAdapter( + MessageChannelToInputFluxParameterAdapter messageChannelToInputFluxArgumentAdapter) { + Assert.notNull(messageChannelToInputFluxArgumentAdapter, "cannot be null"); + this.messageChannelToInputFluxArgumentAdapter = messageChannelToInputFluxArgumentAdapter; + } + + public boolean supports(Class boundElementType, MethodParameter methodParameter) { + return SubscribableChannel.class.isAssignableFrom(boundElementType) + && methodParameter.getParameterAnnotation(Input.class) != null + && Observable.class.isAssignableFrom(methodParameter.getParameterType()); + } + + @Override + public Observable adapt(final SubscribableChannel boundElement, MethodParameter parameter) { + return RxJava1Adapter.publisherToObservable( + this.messageChannelToInputFluxArgumentAdapter.adapt(boundElement, parameter)); + } +} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToObservableSenderParameterAdapter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToObservableSenderParameterAdapter.java new file mode 100644 index 000000000..365b80c83 --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToObservableSenderParameterAdapter.java @@ -0,0 +1,69 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive; + +import reactor.adapter.RxJava1Adapter; +import rx.Observable; +import rx.Single; + +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter; +import org.springframework.core.MethodParameter; +import org.springframework.core.ResolvableType; +import org.springframework.messaging.MessageChannel; +import org.springframework.util.Assert; + +/** + * Adapts an {@link org.springframework.cloud.stream.annotation.Output} annotated + * {@link ObservableSender} to an outbound {@link MessageChannel}. + * @author Marius Bogoevici + */ +public class MessageChannelToObservableSenderParameterAdapter implements + StreamListenerParameterAdapter { + + private final MessageChannelToFluxSenderParameterAdapter messageChannelToFluxSenderArgumentAdapter; + + public MessageChannelToObservableSenderParameterAdapter( + MessageChannelToFluxSenderParameterAdapter messageChannelToFluxSenderArgumentAdapter) { + Assert.notNull(messageChannelToFluxSenderArgumentAdapter, "cannot be null"); + this.messageChannelToFluxSenderArgumentAdapter = messageChannelToFluxSenderArgumentAdapter; + } + + @Override + public boolean supports(Class boundElementType, MethodParameter methodParameter) { + ResolvableType type = ResolvableType.forMethodParameter(methodParameter); + return MessageChannel.class.isAssignableFrom(boundElementType) + && methodParameter.getParameterAnnotation(Output.class) != null + && ObservableSender.class.isAssignableFrom(type.getRawClass()); + } + + @Override + public ObservableSender adapt(MessageChannel boundElement, MethodParameter parameter) { + return new ObservableSender() { + + private FluxSender fluxSender = MessageChannelToObservableSenderParameterAdapter.this + .messageChannelToFluxSenderArgumentAdapter + .adapt(boundElement, parameter); + + @Override + public Single send(Observable observable) { + return RxJava1Adapter.publisherToSingle( + this.fluxSender.send(RxJava1Adapter.observableToFlux(observable))); + } + }; + } +} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ObservableSender.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ObservableSender.java new file mode 100644 index 000000000..dd0051d90 --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ObservableSender.java @@ -0,0 +1,37 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive; + +import rx.Observable; +import rx.Single; + +/** + * Used for {@link org.springframework.cloud.stream.annotation.StreamListener} arguments annotated with {@link + * org.springframework.cloud.stream.annotation.Output}. + * @author Marius Bogoevici + */ +public interface ObservableSender { + + /** + * Streams the {@link Observable} through the bound + * element corresponding to the {@link org.springframework.cloud.stream.annotation.Output} annotation of the + * argument. + * @param observable an {@link Observable} that will be streamed through the bound element + * @return a {@link Single} representing the result of an operation + */ + Single send(Observable observable); +} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ObservableToMessageChannelResultAdapter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ObservableToMessageChannelResultAdapter.java new file mode 100644 index 000000000..b67ca19c9 --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ObservableToMessageChannelResultAdapter.java @@ -0,0 +1,52 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive; + +import reactor.adapter.RxJava1Adapter; +import rx.Observable; + +import org.springframework.cloud.stream.binding.StreamListenerResultAdapter; +import org.springframework.messaging.MessageChannel; +import org.springframework.util.Assert; + +/** + * A {@link StreamListenerResultAdapter} from an {@link Observable} + * return type to a bound {@link MessageChannel}. + * @author Marius Bogoevici + */ +public class ObservableToMessageChannelResultAdapter + implements StreamListenerResultAdapter, MessageChannel> { + + private FluxToMessageChannelResultAdapter fluxToMessageChannelResultAdapter; + + public ObservableToMessageChannelResultAdapter( + FluxToMessageChannelResultAdapter fluxToMessageChannelResultAdapter) { + Assert.notNull(fluxToMessageChannelResultAdapter, "cannot be null"); + this.fluxToMessageChannelResultAdapter = fluxToMessageChannelResultAdapter; + } + + @Override + public boolean supports(Class resultType, Class boundType) { + return Observable.class.isAssignableFrom(resultType) + && MessageChannel.class.isAssignableFrom(boundType); + } + + public void adapt(Observable streamListenerResult, MessageChannel boundElement) { + this.fluxToMessageChannelResultAdapter.adapt(RxJava1Adapter.observableToFlux(streamListenerResult), + boundElement); + } +} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java new file mode 100644 index 000000000..c1ff3945f --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java @@ -0,0 +1,70 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author Marius Bogoevici + */ +@Configuration +public class ReactiveSupportAutoConfiguration { + + @Bean + public MessageChannelToInputFluxParameterAdapter messageChannelToInputFluxArgumentAdapter( + CompositeMessageConverterFactory compositeMessageConverterFactory) { + return new MessageChannelToInputFluxParameterAdapter( + compositeMessageConverterFactory.getMessageConverterForAllRegistered()); + } + + @Bean + public MessageChannelToFluxSenderParameterAdapter messageChannelToFluxSenderArgumentAdapter() { + return new MessageChannelToFluxSenderParameterAdapter(); + } + + @Bean + public FluxToMessageChannelResultAdapter fluxToMessageChannelResultAdapter() { + return new FluxToMessageChannelResultAdapter(); + } + + @Configuration + @ConditionalOnClass(name = "rx.Observable") + public static class RxJava1SupportConfiguration { + + @Bean + public MessageChannelToInputObservableParameterAdapter messageChannelToInputObservableArgumentAdapter( + MessageChannelToInputFluxParameterAdapter messageChannelToFluxArgumentAdapter) { + return new MessageChannelToInputObservableParameterAdapter(messageChannelToFluxArgumentAdapter); + } + + @Bean + public MessageChannelToObservableSenderParameterAdapter messageChannelToObservableSenderArgumentAdapter( + MessageChannelToFluxSenderParameterAdapter messageChannelToFluxSenderArgumentAdapter) { + return new MessageChannelToObservableSenderParameterAdapter(messageChannelToFluxSenderArgumentAdapter); + } + + @Bean + public ObservableToMessageChannelResultAdapter + observableToMessageChannelResultAdapter( + FluxToMessageChannelResultAdapter fluxToMessageChannelResultAdapter) { + return new ObservableToMessageChannelResultAdapter(fluxToMessageChannelResultAdapter); + } + } +} diff --git a/spring-cloud-stream-reactive/src/main/resources/META-INF/spring.factories b/spring-cloud-stream-reactive/src/main/resources/META-INF/spring.factories new file mode 100644 index 000000000..a8ce9d92c --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ +org.springframework.cloud.stream.reactive.ReactiveSupportAutoConfiguration diff --git a/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamListenerReactorTests.java b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamListenerReactorTests.java new file mode 100644 index 000000000..af268222a --- /dev/null +++ b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamListenerReactorTests.java @@ -0,0 +1,274 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import reactor.core.publisher.Flux; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Marius Bogoevici + */ +public class StreamListenerReactorTests { + + @Test + public void testInputOutputArgs() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestInputOutputArgs.class, "--server.port=0"); + sendMessageAndValidate(context); + context.close(); + } + + private void sendMessageAndValidate(ConfigurableApplicationContext context) throws InterruptedException { + @SuppressWarnings("unchecked") + Processor processor = context.getBean(Processor.class); + String sentPayload = "hello " + UUID.randomUUID().toString(); + processor.input().send(MessageBuilder.withPayload(sentPayload).setHeader("contentType", "text/plain").build()); + MessageCollector messageCollector = context.getBean(MessageCollector.class); + Message result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS); + assertThat(result).isNotNull(); + assertThat(result.getPayload()).isEqualTo(sentPayload.toUpperCase()); + } + + private void sendFailingMessage(ConfigurableApplicationContext context) throws InterruptedException { + @SuppressWarnings("unchecked") + Processor processor = context.getBean(Processor.class); + processor.input().send(MessageBuilder.withPayload("fail").setHeader("contentType", "text/plain").build()); + } + + @Test + public void testInputOutputArgsWithMessage() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestInputOutputArgsWithMessage.class, + "--server.port=0"); + sendMessageAndValidate(context); + context.close(); + } + + @Test + public void testInputOutputArgsWithFluxSender() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestInputOutputArgsWithFluxSender.class, + "--server.port=0"); + // send multiple message + sendMessageAndValidate(context); + sendMessageAndValidate(context); + sendMessageAndValidate(context); + context.close(); + } + + @Test + public void testInputOutputArgsWithFluxSenderAndFailure() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestInputOutputArgsWithFluxSenderAndFailure.class, "--server.port=0"); + sendMessageAndValidate(context); + sendFailingMessage(context); + sendMessageAndValidate(context); + context.close(); + } + + @Test + public void testReturn() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestReturn.class, "--server.port=0"); + sendMessageAndValidate(context); + sendMessageAndValidate(context); + sendMessageAndValidate(context); + context.close(); + } + + @Test + public void testReturnWithFailure() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestReturnWithFailure.class, "--server.port=0"); + sendMessageAndValidate(context); + sendFailingMessage(context); + sendMessageAndValidate(context); + sendFailingMessage(context); + sendMessageAndValidate(context); + context.close(); + } + + @Test + public void testReturnWithMessage() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestReturnWithMessage.class, "--server.port=0"); + sendMessageAndValidate(context); + context.close(); + } + + @Test + public void testReturnWithPojo() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestReturnWithPojo.class, "--server.port=0"); + @SuppressWarnings("unchecked") + Processor processor = context.getBean(Processor.class); + processor.input().send(MessageBuilder.withPayload("{\"message\":\"helloPojo\"}") + .setHeader("contentType", "application/json").build()); + MessageCollector messageCollector = context.getBean(MessageCollector.class); + Message result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS); + assertThat(result).isNotNull(); + assertThat(result.getPayload()).isInstanceOf(BarPojo.class); + assertThat(((BarPojo) result.getPayload()).getBarMessage()).isEqualTo("helloPojo"); + context.close(); + } + + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestInputOutputArgs { + + @StreamListener + public void receive(@Input(Processor.INPUT) Flux input, @Output(Processor.OUTPUT) FluxSender output) { + output.send(input.map(m -> m.toUpperCase())); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestInputOutputArgsWithMessage { + + @StreamListener + public void receive(@Input(Processor.INPUT) Flux> input, + @Output(Processor.OUTPUT) FluxSender output) { + output.send(input.map(m -> MessageBuilder.withPayload(m.getPayload().toUpperCase()).build())); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestInputOutputArgsWithFluxSender { + @StreamListener + public void receive(@Input(Processor.INPUT) Flux> input, @Output(Processor.OUTPUT) FluxSender output) { + output.send(input + .map(m -> m.getPayload().toString().toUpperCase()) + .map(o -> MessageBuilder.withPayload(o).build())); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestInputOutputArgsWithFluxSenderAndFailure { + @StreamListener + public void receive(@Input(Processor.INPUT) Flux> input, @Output(Processor.OUTPUT) FluxSender output) { + output.send(input + .map(m -> m.getPayload().toString()) + .map(m -> { + if (!m.equals("fail")) { + return m.toUpperCase(); + } + else { + throw new RuntimeException(); + } + }) + .map(o -> MessageBuilder.withPayload(o).build())); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestReturn { + + @StreamListener + public + @Output(Processor.OUTPUT) + Flux receive(@Input(Processor.INPUT) Flux input) { + return input.map(m -> m.toUpperCase()); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestReturnWithFailure { + + @StreamListener + public + @Output(Processor.OUTPUT) + Flux receive(@Input(Processor.INPUT) Flux input) { + return input.map(m -> { + if (!m.equals("fail")) { + return m.toUpperCase(); + } + else { + throw new RuntimeException(); + } + }); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestReturnWithMessage { + + @StreamListener + public + @Output(Processor.OUTPUT) + Flux receive(@Input(Processor.INPUT) Flux> input) { + return input.map(m -> m.getPayload().toUpperCase()); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestReturnWithPojo { + + @StreamListener + public + @Output(Processor.OUTPUT) + Flux receive(@Input(Processor.INPUT) Flux input) { + return input.map(m -> new BarPojo(m.getMessage())); + } + } + + public static class FooPojo { + + private String message; + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + } + + public static class BarPojo { + + private String barMessage; + + public BarPojo(String barMessage) { + this.barMessage = barMessage; + } + + public String getBarMessage() { + return barMessage; + } + + public void setBarMessage(String barMessage) { + this.barMessage = barMessage; + } + } +} diff --git a/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamListenerRxJava1Tests.java b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamListenerRxJava1Tests.java new file mode 100644 index 000000000..e9f6816c1 --- /dev/null +++ b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamListenerRxJava1Tests.java @@ -0,0 +1,245 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import rx.Observable; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.Output; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Marius Bogoevici + */ +public class StreamListenerRxJava1Tests { + + @Test + public void testInputOutputArgs() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestInputOutputArgs.class, "--server.port=0"); + sendMessageAndValidate(context); + context.close(); + } + + private void sendMessageAndValidate(ConfigurableApplicationContext context) throws InterruptedException { + @SuppressWarnings("unchecked") + Processor processor = context.getBean(Processor.class); + String sentPayload = "hello " + UUID.randomUUID().toString(); + processor.input().send(MessageBuilder.withPayload(sentPayload).setHeader("contentType", "text/plain").build()); + MessageCollector messageCollector = context.getBean(MessageCollector.class); + Message result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS); + assertThat(result).isNotNull(); + assertThat(result.getPayload()).isEqualTo(sentPayload.toUpperCase()); + } + + private void sendFailingMessage(ConfigurableApplicationContext context) throws InterruptedException { + @SuppressWarnings("unchecked") + Processor processor = context.getBean(Processor.class); + processor.input().send(MessageBuilder.withPayload("fail").setHeader("contentType", "text/plain").build()); + } + + @Test + public void testInputOutputArgsWithMessage() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestInputOutputArgsWithMessage.class, + "--server.port=0"); + sendMessageAndValidate(context); + context.close(); + } + + @Test + public void testInputOutputArgsWithObservableSender() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestInputOutputArgsWithObservableSender.class, + "--server.port=0"); + // send multiple message + sendMessageAndValidate(context); + sendMessageAndValidate(context); + sendMessageAndValidate(context); + context.close(); + } + + @Test + public void testReturn() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestReturn.class, "--server.port=0"); + sendMessageAndValidate(context); + sendMessageAndValidate(context); + sendMessageAndValidate(context); + context.close(); + } + + @Test + public void testReturnWithFailure() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestReturnWithFailure.class, "--server.port=0"); + sendMessageAndValidate(context); + sendFailingMessage(context); + sendMessageAndValidate(context); + context.close(); + } + + @Test + public void testReturnWithMessage() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestReturnWithMessage.class, "--server.port=0"); + sendMessageAndValidate(context); + context.close(); + } + + @Test + public void testReturnWithPojo() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestReturnWithPojo.class, "--server.port=0"); + @SuppressWarnings("unchecked") + Processor processor = context.getBean(Processor.class); + processor.input().send(MessageBuilder.withPayload("{\"message\":\"helloPojo\"}") + .setHeader("contentType", "application/json").build()); + MessageCollector messageCollector = context.getBean(MessageCollector.class); + Message result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS); + assertThat(result).isNotNull(); + assertThat(result.getPayload()).isInstanceOf(BarPojo.class); + assertThat(((BarPojo) result.getPayload()).getBarMessage()).isEqualTo("helloPojo"); + context.close(); + } + + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestInputOutputArgs { + + @StreamListener + public void receive(@Input(Processor.INPUT) Observable input, @Output(Processor.OUTPUT) ObservableSender output) { + output.send(input.map(m -> m.toUpperCase())); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestInputOutputArgsWithMessage { + + @StreamListener + public void receive(@Input(Processor.INPUT) Observable> input, + @Output(Processor.OUTPUT) ObservableSender output) { + output.send(input.map(m -> MessageBuilder.withPayload(m.getPayload().toUpperCase()).build())); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestInputOutputArgsWithObservableSender { + @StreamListener + public void receive(@Input(Processor.INPUT) Observable> input, @Output(Processor.OUTPUT) + ObservableSender output) { + output.send(input + .map(m -> m.getPayload().toString().toUpperCase()) + .map(o -> MessageBuilder.withPayload(o).build())); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestReturn { + + @StreamListener + public + @Output(Processor.OUTPUT) + Observable receive(@Input(Processor.INPUT) Observable input) { + return input.map(m -> m.toUpperCase()); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestReturnWithFailure { + + @StreamListener + public + @Output(Processor.OUTPUT) + Observable receive(@Input(Processor.INPUT) Observable input) { + return input.map(m -> { + if (!m.equals("fail")) { + return m.toUpperCase(); + } + else { + throw new RuntimeException(); + } + }); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestReturnWithMessage { + + @StreamListener + public + @Output(Processor.OUTPUT) + Observable receive(@Input(Processor.INPUT) Observable> input) { + return input.map(m -> m.getPayload().toUpperCase()); + } + } + + @EnableBinding(Processor.class) + @EnableAutoConfiguration + public static class TestReturnWithPojo { + + @StreamListener + public + @Output(Processor.OUTPUT) + Observable receive(@Input(Processor.INPUT) Observable input) { + return input.map(m -> new BarPojo(m.getMessage())); + } + } + + public static class FooPojo { + + private String message; + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + } + + public static class BarPojo { + + private String barMessage; + + public BarPojo(String barMessage) { + this.barMessage = barMessage; + } + + public String getBarMessage() { + return barMessage; + } + + public void setBarMessage(String barMessage) { + this.barMessage = barMessage; + } + } +} diff --git a/spring-cloud-stream-rxjava/src/main/java/org/springframework/cloud/stream/annotation/rxjava/EnableRxJavaProcessor.java b/spring-cloud-stream-rxjava/src/main/java/org/springframework/cloud/stream/annotation/rxjava/EnableRxJavaProcessor.java index 186359da8..e53ec0292 100644 --- a/spring-cloud-stream-rxjava/src/main/java/org/springframework/cloud/stream/annotation/rxjava/EnableRxJavaProcessor.java +++ b/spring-cloud-stream-rxjava/src/main/java/org/springframework/cloud/stream/annotation/rxjava/EnableRxJavaProcessor.java @@ -32,6 +32,7 @@ import org.springframework.context.annotation.Import; * annotated is expected to provide a bean that implements {@link RxJavaProcessor}. * * @author Ilayaperumal Gopinathan + * @deprecated in favor of {@link org.springframework.cloud.stream.annotation.StreamListener} with reactive types */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @@ -39,5 +40,6 @@ import org.springframework.context.annotation.Import; @Inherited @EnableBinding(Processor.class) @Import(RxJavaProcessorConfiguration.class) +@Deprecated public @interface EnableRxJavaProcessor { } diff --git a/spring-cloud-stream-rxjava/src/main/java/org/springframework/cloud/stream/annotation/rxjava/RxJavaProcessor.java b/spring-cloud-stream-rxjava/src/main/java/org/springframework/cloud/stream/annotation/rxjava/RxJavaProcessor.java index a10de2c86..d8cd919d7 100644 --- a/spring-cloud-stream-rxjava/src/main/java/org/springframework/cloud/stream/annotation/rxjava/RxJavaProcessor.java +++ b/spring-cloud-stream-rxjava/src/main/java/org/springframework/cloud/stream/annotation/rxjava/RxJavaProcessor.java @@ -23,7 +23,9 @@ import rx.Observable; * * @author Mark Pollack * @author Ilayaperumal Gopinathan + * @deprecated in favor of {@link org.springframework.cloud.stream.annotation.StreamListener} with reactive types */ +@Deprecated public interface RxJavaProcessor { Observable process(Observable input); diff --git a/spring-cloud-stream-rxjava/src/main/java/org/springframework/cloud/stream/annotation/rxjava/RxJavaProcessorConfiguration.java b/spring-cloud-stream-rxjava/src/main/java/org/springframework/cloud/stream/annotation/rxjava/RxJavaProcessorConfiguration.java index 177a41057..30ac35329 100644 --- a/spring-cloud-stream-rxjava/src/main/java/org/springframework/cloud/stream/annotation/rxjava/RxJavaProcessorConfiguration.java +++ b/spring-cloud-stream-rxjava/src/main/java/org/springframework/cloud/stream/annotation/rxjava/RxJavaProcessorConfiguration.java @@ -30,6 +30,7 @@ import org.springframework.messaging.MessageHandler; * @author Marius Bogoevici */ @Configuration +@Deprecated public class RxJavaProcessorConfiguration { @Autowired @@ -38,7 +39,7 @@ public class RxJavaProcessorConfiguration { @ServiceActivator(inputChannel = Processor.INPUT, phase = "0") @Bean public MessageHandler subjectMessageHandler() { - SubjectMessageHandler messageHandler = new SubjectMessageHandler(processor); + SubjectMessageHandler messageHandler = new SubjectMessageHandler(this.processor); messageHandler.setOutputChannelName(Processor.OUTPUT); return messageHandler; } diff --git a/spring-cloud-stream-rxjava/src/main/java/org/springframework/cloud/stream/annotation/rxjava/SubjectMessageHandler.java b/spring-cloud-stream-rxjava/src/main/java/org/springframework/cloud/stream/annotation/rxjava/SubjectMessageHandler.java index 26cc95d4b..ea81df0a7 100644 --- a/spring-cloud-stream-rxjava/src/main/java/org/springframework/cloud/stream/annotation/rxjava/SubjectMessageHandler.java +++ b/spring-cloud-stream-rxjava/src/main/java/org/springframework/cloud/stream/annotation/rxjava/SubjectMessageHandler.java @@ -62,6 +62,7 @@ import org.springframework.util.ClassUtils; * @author Marius Bogoevici */ @SuppressWarnings({"unchecked", "rawtypes"}) +@Deprecated public class SubjectMessageHandler extends AbstractMessageProducingHandler implements SmartLifecycle { private final Log logger = LogFactory.getLog(getClass()); @@ -83,10 +84,10 @@ public class SubjectMessageHandler extends AbstractMessageProducingHandler imple @Override public synchronized void start() { - if (!running) { - subject = new SerializedSubject(PublishSubject.create()); - Observable outputStream = processor.process(subject); - subscription = outputStream.subscribe(new Action1() { + if (!this.running) { + this.subject = new SerializedSubject(PublishSubject.create()); + Observable outputStream = this.processor.process(this.subject); + this.subscription = outputStream.subscribe(new Action1() { @Override public void call(Object outputObject) { @@ -101,22 +102,23 @@ public class SubjectMessageHandler extends AbstractMessageProducingHandler imple @Override public void call(Throwable throwable) { - logger.error(throwable.getMessage(), throwable); + SubjectMessageHandler.this.logger.error(throwable.getMessage(), throwable); } }, new Action0() { @Override public void call() { - logger.info("Subscription close for [" + subscription + "]"); + SubjectMessageHandler.this.logger + .info("Subscription close for [" + SubjectMessageHandler.this.subscription + "]"); } }); - running = true; + this.running = true; } } @Override public synchronized boolean isRunning() { - return running; + return this.running; } @Override @@ -126,7 +128,7 @@ public class SubjectMessageHandler extends AbstractMessageProducingHandler imple @Override public void stop(Runnable callback) { - if (running) { + if (this.running) { stop(); if (callback != null) { callback.run(); @@ -141,17 +143,17 @@ public class SubjectMessageHandler extends AbstractMessageProducingHandler imple @Override protected void handleMessageInternal(Message message) throws Exception { - subject.onNext(message.getPayload()); + this.subject.onNext(message.getPayload()); } @Override public synchronized void stop() { - if (running) { - subject.onCompleted(); - subscription.unsubscribe(); - subscription = null; - subject = null; - running = false; + if (this.running) { + this.subject.onCompleted(); + this.subscription.unsubscribe(); + this.subscription = null; + this.subject = null; + this.running = false; } } } diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/annotation/StreamListener.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/annotation/StreamListener.java index cd55a11e0..6acb59bc6 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/annotation/StreamListener.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/annotation/StreamListener.java @@ -22,13 +22,23 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter; import org.springframework.messaging.handler.annotation.MessageMapping; /** - * Annotation that marks a method to be a listener to an input component declared through {@link EnableBinding} - * (e.g. a channel). + * Annotation that marks a method to be a listener to the inputs declared through {@link EnableBinding} + * (e.g. channels). * - * Annotated methods are allowed to have flexible signatures, as described by {@link MessageMapping}. + * Annotated methods are allowed to have flexible signatures, which determine how + * the method is invoked and how their return results are processed. This annotation + * can be applied for two separate classes of methods. + * + *

Individual message handler mode

+ * + * Methods where the annotation has a value, are treated as message handlers, and are invoked for each + * incoming message received from that target. In this case, the + * method can have a flexible signature, as described by {@link MessageMapping}. + * The value must be the name of an {@link Input} bound target. * * If the method returns a {@link org.springframework.messaging.Message}, the result will be automatically sent * to a channel, as follows: @@ -43,10 +53,17 @@ import org.springframework.messaging.handler.annotation.MessageMapping; *
  • The value set on the {@link org.springframework.messaging.handler.annotation.SendTo} annotation, if present
  • * * + *

    Declarative mode

    + * + * If the annotation has an empty value (the default), the method is a declarative + * pipeline definition and will be invoked once, when the application starts. + * All parameters must be annotated with either {@link Input} or {@link Output} and can + * be either bound elements (e.g. channels) or conversion targets from bound elements + * via a registered {@link StreamListenerParameterAdapter}. + * @author Marius Bogoevici * @see {@link MessageMapping} * @see {@link EnableBinding} * @see {@link org.springframework.messaging.handler.annotation.SendTo} - * @author Marius Bogoevici */ @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @@ -56,7 +73,6 @@ public @interface StreamListener { /** * The name of the bound component (e.g. channel) that the method subscribes to. - * */ String value() default ""; diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageChannelStreamListenerResultAdapter.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageChannelStreamListenerResultAdapter.java new file mode 100644 index 000000000..f085b4cc1 --- /dev/null +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/MessageChannelStreamListenerResultAdapter.java @@ -0,0 +1,44 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binding; + +import org.springframework.integration.handler.BridgeHandler; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.SubscribableChannel; + +/** + * A {@link StreamListenerResultAdapter} used for bridging an {@link org.springframework.cloud.stream.annotation.Output} + * {@link MessageChannel} to a bound {@link MessageChannel}. + * @author Marius Bogoevici + */ +public class MessageChannelStreamListenerResultAdapter + implements StreamListenerResultAdapter { + + @Override + public boolean supports(Class resultType, Class boundElement) { + return MessageChannel.class.isAssignableFrom(resultType) + && MessageChannel.class.isAssignableFrom(boundElement); + } + + @Override + public void adapt(MessageChannel streamListenerResult, MessageChannel boundElement) { + BridgeHandler handler = new BridgeHandler(); + handler.setOutputChannel(boundElement); + handler.afterPropertiesSet(); + ((SubscribableChannel) streamListenerResult).subscribe(handler); + } +} diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java index 327521d33..49cf68af2 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java @@ -16,8 +16,11 @@ package org.springframework.cloud.stream.binding; +import java.lang.annotation.Annotation; import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.springframework.aop.framework.Advised; @@ -26,10 +29,13 @@ import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanInitializationException; import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.core.MethodParameter; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; import org.springframework.messaging.Message; @@ -50,7 +56,8 @@ import org.springframework.util.StringUtils; * * @author Marius Bogoevici */ -public class StreamListenerAnnotationBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware, SmartInitializingSingleton { +public class StreamListenerAnnotationBeanPostProcessor + implements BeanPostProcessor, ApplicationContextAware, SmartInitializingSingleton { private final DestinationResolver binderAwareChannelResolver; @@ -60,7 +67,12 @@ public class StreamListenerAnnotationBeanPostProcessor implements BeanPostProces private ConfigurableApplicationContext applicationContext; - public StreamListenerAnnotationBeanPostProcessor(DestinationResolver binderAwareChannelResolver, MessageHandlerMethodFactory messageHandlerMethodFactory) { + private final List> streamListenerParameterAdapters = new ArrayList<>(); + + private final List> streamListenerResultAdapters = new ArrayList<>(); + + public StreamListenerAnnotationBeanPostProcessor(DestinationResolver binderAwareChannelResolver, + MessageHandlerMethodFactory messageHandlerMethodFactory) { Assert.notNull(binderAwareChannelResolver, "Destination resolver cannot be null"); Assert.notNull(messageHandlerMethodFactory, "Message handler method factory cannot be null"); this.binderAwareChannelResolver = binderAwareChannelResolver; @@ -68,8 +80,20 @@ public class StreamListenerAnnotationBeanPostProcessor implements BeanPostProces } @Override + @SuppressWarnings({"rawtypes", "unchecked"}) public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = (ConfigurableApplicationContext) applicationContext; + Map parameterAdapterMap = + this.applicationContext.getBeansOfType(StreamListenerParameterAdapter.class); + for (StreamListenerParameterAdapter parameterAdapter : parameterAdapterMap.values()) { + this.streamListenerParameterAdapters.add(parameterAdapter); + } + Map resultAdapterMap = + this.applicationContext.getBeansOfType(StreamListenerResultAdapter.class); + this.streamListenerResultAdapters.add(new MessageChannelStreamListenerResultAdapter()); + for (StreamListenerResultAdapter resultAdapter : resultAdapterMap.values()) { + this.streamListenerResultAdapters.add(resultAdapter); + } } @Override @@ -85,48 +109,140 @@ public class StreamListenerAnnotationBeanPostProcessor implements BeanPostProces public void doWith(final Method method) throws IllegalArgumentException, IllegalAccessException { StreamListener streamListener = AnnotationUtils.findAnnotation(method, StreamListener.class); if (streamListener != null) { - Method targetMethod = checkProxy(method, bean); - Assert.hasText(streamListener.value(), "The binding name cannot be null"); - final InvocableHandlerMethod invocableHandlerMethod = messageHandlerMethodFactory.createInvocableHandlerMethod(bean, targetMethod); - if (!StringUtils.hasText(streamListener.value())) { - throw new BeanInitializationException("A bound component name must be specified"); + if (!method.getReturnType().equals(Void.TYPE)) { + Assert.isTrue(method.getAnnotation(Input.class) == null, + "A @StreamListener may never be annotated with @Input." + + "If it should listen to a specific input, use the value of @StreamListener " + + "instead."); } - if (mappedBindings.containsKey(streamListener.value())) { - throw new BeanInitializationException("Duplicate @" + StreamListener.class.getSimpleName() + - " mapping for '" + streamListener.value() + "' on " + invocableHandlerMethod.getShortLogMessage() + - " already existing for " + mappedBindings.get(streamListener.value()).getShortLogMessage()); - } - mappedBindings.put(streamListener.value(), invocableHandlerMethod); - SubscribableChannel channel = applicationContext.getBean(streamListener.value(), - SubscribableChannel.class); - final String defaultOutputChannel = extractDefaultOutput(method); - if (invocableHandlerMethod.isVoid()) { - Assert.isTrue(StringUtils.isEmpty(defaultOutputChannel), "An output channel cannot be specified for a method that " + - "does not return a value"); + Class[] parameterTypes = method.getParameterTypes(); + if (StringUtils.hasText(streamListener.value())) { + for (int i = 0; i < parameterTypes.length; i++) { + MethodParameter methodParameter = MethodParameter.forMethodOrConstructor(method, i); + Assert.isTrue(methodParameter.getParameterAnnotation(Input.class) == null && + methodParameter.getParameterAnnotation(Output.class) == null, + "A message handling @StreamListener method cannot have parameters annotated " + + "with @Input or @Output"); + } + if (!method.getReturnType().equals(Void.TYPE)) { + Assert.isTrue(method.getAnnotation(Output.class) == null, + "A message handling @StreamListener method cannot be annotated with @Output"); + } + registerHandlerMethodOnListenedChannel(method, streamListener, bean); } else { - Assert.isTrue(!StringUtils.isEmpty(defaultOutputChannel), "An output channel must be specified for a method that " + - "can return a value"); + for (int i = 0; i < parameterTypes.length; i++) { + MethodParameter methodParameter = MethodParameter.forMethodOrConstructor(method, i); + Assert.isTrue(methodParameter.getParameterAnnotation(Input.class) != null ^ + methodParameter.getParameterAnnotation(Output.class) != null, + "A declarative @StreamListener method must have its parameters annotated" + + "with @Input or @Output, but not with both."); + } + if (!method.getReturnType().equals(Void.TYPE)) { + Assert.isTrue(method.getAnnotation(Output.class) != null, + "A declarative @StreamListener method must be annotated with @Output"); + } + invokeSetupMethodOnListenedChannel(method, bean); } - StreamListenerMessageHandler handler = new StreamListenerMessageHandler(invocableHandlerMethod); - handler.setApplicationContext(applicationContext); - handler.setChannelResolver(binderAwareChannelResolver); - if (!StringUtils.isEmpty(defaultOutputChannel)) { - handler.setOutputChannelName(defaultOutputChannel); - } - handler.afterPropertiesSet(); - channel.subscribe(handler); } } }); return bean; } + @SuppressWarnings({"rawtypes", "unchecked"}) + private void invokeSetupMethodOnListenedChannel(Method method, Object bean) { + Object[] arguments = new Object[method.getParameterTypes().length]; + for (int parameterIndex = 0; parameterIndex < arguments.length; parameterIndex++) { + MethodParameter methodParameter = MethodParameter.forMethodOrConstructor(method, parameterIndex); + Class parameterType = methodParameter.getParameterType(); + Annotation targetReferenceAnnotation = methodParameter.hasParameterAnnotation(Input.class) ? + methodParameter.getParameterAnnotation(Input.class) : methodParameter.getParameterAnnotation( + Output.class); + Object targetReferenceAnnotationValue = AnnotationUtils.getValue(targetReferenceAnnotation); + Assert.isInstanceOf(String.class, targetReferenceAnnotationValue, "Annotation value must be a String"); + Assert.hasText((String) targetReferenceAnnotationValue, "Annotation value must not be empty"); + Object targetBean = this.applicationContext.getBean((String) targetReferenceAnnotationValue); + if (parameterType.isAssignableFrom(targetBean.getClass())) { + arguments[parameterIndex] = targetBean; + } + else { + for (StreamListenerParameterAdapter streamListenerParameterAdapter : + this.streamListenerParameterAdapters) { + if (streamListenerParameterAdapter.supports(targetBean.getClass(), methodParameter)) { + arguments[parameterIndex] = streamListenerParameterAdapter.adapt(targetBean, methodParameter); + break; + } + } + } + Assert.notNull(arguments[parameterIndex], + "Cannot convert argument " + parameterIndex + " of " + method + "from " + targetBean.getClass() + .toString() + " to " + parameterType.toString()); + } + try { + if (method.getReturnType().equals(Void.TYPE)) { + method.invoke(bean, arguments); + } + else { + Object result = method.invoke(bean, arguments); + Output output = AnnotationUtils.getAnnotation(method, Output.class); + Object targetBean = this.applicationContext.getBean(output.value()); + for (StreamListenerResultAdapter streamListenerResultAdapter : this + .streamListenerResultAdapters) { + if (streamListenerResultAdapter.supports(result.getClass(), targetBean.getClass())) { + streamListenerResultAdapter.adapt(result, targetBean); + break; + } + } + } + } + catch (Exception e) { + throw new BeanInitializationException("Cannot setup StreamListener for " + method, e); + } + } + + protected void registerHandlerMethodOnListenedChannel(Method method, StreamListener streamListener, Object bean) { + Method targetMethod = checkProxy(method, bean); + Assert.hasText(streamListener.value(), "The binding name cannot be null"); + final InvocableHandlerMethod invocableHandlerMethod = + this.messageHandlerMethodFactory.createInvocableHandlerMethod(bean, targetMethod); + if (!StringUtils.hasText(streamListener.value())) { + throw new BeanInitializationException("A bound component name must be specified"); + } + if (this.mappedBindings.containsKey(streamListener.value())) { + throw new BeanInitializationException("Duplicate @" + StreamListener.class.getSimpleName() + + " mapping for '" + streamListener.value() + "' on " + invocableHandlerMethod.getShortLogMessage() + + " already existing for " + this.mappedBindings.get(streamListener.value()).getShortLogMessage()); + } + this.mappedBindings.put(streamListener.value(), invocableHandlerMethod); + SubscribableChannel channel = this.applicationContext.getBean(streamListener.value(), + SubscribableChannel.class); + final String defaultOutputChannel = extractDefaultOutput(method); + if (invocableHandlerMethod.isVoid()) { + Assert.isTrue(StringUtils.isEmpty(defaultOutputChannel), + "An output channel cannot be specified for a method that " + + "does not return a value"); + } + else { + Assert.isTrue(!StringUtils.isEmpty(defaultOutputChannel), + "An output channel must be specified for a method that " + + "can return a value"); + } + StreamListenerMessageHandler handler = new StreamListenerMessageHandler(invocableHandlerMethod); + handler.setApplicationContext(this.applicationContext); + handler.setChannelResolver(this.binderAwareChannelResolver); + if (!StringUtils.isEmpty(defaultOutputChannel)) { + handler.setOutputChannelName(defaultOutputChannel); + } + handler.afterPropertiesSet(); + channel.subscribe(handler); + } + @Override public void afterSingletonsInstantiated() { // Dump the mappings after the context has been created, ensuring that beans can be processed correctly // again. - mappedBindings.clear(); + this.mappedBindings.clear(); } private String extractDefaultOutput(Method method) { @@ -188,16 +304,19 @@ public class StreamListenerAnnotationBeanPostProcessor implements BeanPostProces @Override protected Object handleRequestMessage(Message requestMessage) { try { - return invocableHandlerMethod.invoke(requestMessage); + return this.invocableHandlerMethod.invoke(requestMessage); } catch (Exception e) { if (e instanceof MessagingException) { throw (MessagingException) e; } else { - throw new MessagingException(requestMessage, "Exception thrown while invoking " + invocableHandlerMethod.getShortLogMessage(), e); + throw new MessagingException(requestMessage, "Exception thrown while invoking " + this + .invocableHandlerMethod + .getShortLogMessage(), e); } } } } + } diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerParameterAdapter.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerParameterAdapter.java new file mode 100644 index 000000000..e147a5eab --- /dev/null +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerParameterAdapter.java @@ -0,0 +1,51 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binding; + +import org.springframework.core.MethodParameter; + +/** + * Strategy for adapting a method argument type annotated with + * {@link org.springframework.cloud.stream.annotation.Input} or + * {@link org.springframework.cloud.stream.annotation.Output} from a bound element + * (e.g. {@link org.springframework.messaging.MessageChannel}) supported by an + * existing binder. + * + * This is a framework extension and is not primarily intended for use by end-users. + * @author Marius Bogoevici + */ +public interface StreamListenerParameterAdapter { + + /** + * Return true if the conversion from the bound element type to the argument type + * is supported. + * @param boundElementType the bound element type + * @param methodParameter the method parameter for which the conversion is performed + * @return true if the conversion is supported + */ + boolean supports(Class boundElementType, MethodParameter methodParameter); + + /** + * Adapts the bound element to the argument type. The result will be passed + * as argument to a method annotated with {@link org.springframework.cloud.stream.annotation.StreamListener} + * when used for setting up a pipeline. + * @param boundElement the bound element + * @param parameter the method parameter for which the conversion is performed + * @return an instance of the parameter type, which will be passed to the method + */ + A adapt(B boundElement, MethodParameter parameter); +} diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerResultAdapter.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerResultAdapter.java new file mode 100644 index 000000000..c8f370c93 --- /dev/null +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerResultAdapter.java @@ -0,0 +1,44 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binding; + +/** + * A strategy for adapting the result of a {@link org.springframework.cloud.stream.annotation.StreamListener} + * annotated method to a bound element annotated with {@link org.springframework.cloud.stream.annotation.Output}. + * + * Used when the {@link org.springframework.cloud.stream.annotation.StreamListener} annotated method is operating in + * declarative mode. + * @author Marius Bogoevici + */ +public interface StreamListenerResultAdapter { + + /** + * Return true if the result type can be converted to the bound element. + * @param resultType the result type. + * @param boundElement the bound element. + * @return true if the conversion can take place. + */ + boolean supports(Class resultType, Class boundElement); + + /** + * Adapts the result to the bound element. + * @param streamListenerResult the result of invoking the method. + * @param boundElement the bound element + */ + void adapt(R streamListenerResult, B boundElement); + +}