Add code formatting guidelines

Add 'eclipse' folder containing Eclipse code
formatter configuration and instructions how to use
 it.

Update rule for join_wrapped_lines

 - Set to `false`

Resolves #930

Update README

Address review comments
This commit is contained in:
Ilayaperumal Gopinathan
2017-05-12 21:34:39 +05:30
committed by Marius Bogoevici
parent 9632546f03
commit bd002e4aaf
159 changed files with 1770 additions and 1283 deletions

View File

@@ -26,8 +26,8 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
/**
* A {@link org.springframework.cloud.stream.binding.StreamListenerResultAdapter} from a {@link Flux}
* return type to a bound {@link MessageChannel}.
* A {@link org.springframework.cloud.stream.binding.StreamListenerResultAdapter} from a
* {@link Flux} return type to a bound {@link MessageChannel}.
* @author Marius Bogoevici
*/
public class FluxToMessageChannelResultAdapter

View File

@@ -53,8 +53,9 @@ public class MessageChannelToFluxSenderParameterAdapter
.doOnError(e -> this.log.error("Error during processing: ", e))
.retry()
.subscribe(
result -> bindingTarget.send(result instanceof Message<?> ? (Message<?>) result :
MessageBuilder.withPayload(result).build()), e -> sendResult.onError(e),
result -> bindingTarget.send(result instanceof Message<?> ? (Message<?>) result
: MessageBuilder.withPayload(result).build()),
e -> sendResult.onError(e),
() -> sendResult.onComplete());
return sendResult;
};

View File

@@ -55,8 +55,7 @@ public class MessageChannelToObservableSenderParameterAdapter implements
public ObservableSender adapt(MessageChannel bindingTarget, MethodParameter parameter) {
return new ObservableSender() {
private FluxSender fluxSender = MessageChannelToObservableSenderParameterAdapter.this
.messageChannelToFluxSenderArgumentAdapter
private FluxSender fluxSender = MessageChannelToObservableSenderParameterAdapter.this.messageChannelToFluxSenderArgumentAdapter
.adapt(bindingTarget, parameter);
@Override

View File

@@ -26,8 +26,8 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert;
/**
* A {@link StreamListenerResultAdapter} from an {@link Observable}
* return type to a bound {@link MessageChannel}.
* A {@link StreamListenerResultAdapter} from an {@link Observable} return type to a bound
* {@link MessageChannel}.
*
* @author Marius Bogoevici
*/

View File

@@ -71,8 +71,7 @@ public class ReactiveSupportAutoConfiguration {
@Bean
@ConditionalOnMissingBean(ObservableToMessageChannelResultAdapter.class)
public ObservableToMessageChannelResultAdapter
observableToMessageChannelResultAdapter(
public ObservableToMessageChannelResultAdapter observableToMessageChannelResultAdapter(
FluxToMessageChannelResultAdapter fluxToMessageChannelResultAdapter) {
return new ObservableToMessageChannelResultAdapter(fluxToMessageChannelResultAdapter);
}

View File

@@ -44,9 +44,21 @@ import static org.springframework.cloud.stream.binding.StreamListenerErrorMessag
@SuppressWarnings("unchecked")
public class StreamListenerGenericFluxInputOutputArgsWithMessageTests {
@SuppressWarnings("unchecked")
private static void sendMessageAndValidate(ConfigurableApplicationContext context) throws InterruptedException {
Processor processor = context.getBean(Processor.class);
String sentPayload = "hello " + UUID.randomUUID().toString();
processor.input().send(MessageBuilder.withPayload(sentPayload).setHeader("contentType", "text/plain").build());
MessageCollector messageCollector = context.getBean(MessageCollector.class);
Message<?> result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS);
assertThat(result).isNotNull();
assertThat(result.getPayload()).isEqualTo(sentPayload.toUpperCase());
}
@Test
public void testGenericFluxInputOutputArgsWithMessage() throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(TestGenericStringFluxInputOutputArgsWithMessageImpl1.class, "--server.port=0");
ConfigurableApplicationContext context = SpringApplication
.run(TestGenericStringFluxInputOutputArgsWithMessageImpl1.class, "--server.port=0");
sendMessageAndValidate(context);
context.close();
}
@@ -62,21 +74,12 @@ public class StreamListenerGenericFluxInputOutputArgsWithMessageTests {
}
}
@SuppressWarnings("unchecked")
private static void sendMessageAndValidate(ConfigurableApplicationContext context) throws InterruptedException {
Processor processor = context.getBean(Processor.class);
String sentPayload = "hello " + UUID.randomUUID().toString();
processor.input().send(MessageBuilder.withPayload(sentPayload).setHeader("contentType", "text/plain").build());
MessageCollector messageCollector = context.getBean(MessageCollector.class);
Message<?> result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS);
assertThat(result).isNotNull();
assertThat(result.getPayload()).isEqualTo(sentPayload.toUpperCase());
public static class TestGenericStringFluxInputOutputArgsWithMessageImpl1
extends TestGenericFluxInputOutputArgsWithMessage1<String> {
}
public static class TestGenericStringFluxInputOutputArgsWithMessageImpl1 extends TestGenericFluxInputOutputArgsWithMessage1<String> {
}
public static class TestGenericStringFluxInputOutputArgsWithMessageImpl2 extends TestGenericFluxInputOutputArgsWithMessage2<String> {
public static class TestGenericStringFluxInputOutputArgsWithMessageImpl2
extends TestGenericFluxInputOutputArgsWithMessage2<String> {
}
@EnableBinding(Processor.class)

View File

@@ -35,8 +35,8 @@ import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Test validating that a fix for <a href="https://github.com/reactor/reactor-core/issues/159"/>
* is present.
* Test validating that a fix for
* <a href="https://github.com/reactor/reactor-core/issues/159"/> is present.
*
* @author Marius Bogoevici
*/

View File

@@ -56,14 +56,7 @@ public class StreamListenerReactiveInputOutputArgsTests {
@Parameterized.Parameters
public static Collection InputConfigs() {
return Arrays.asList(new Class[]{ReactorTestInputOutputArgs.class, RxJava1TestInputOutputArgs.class});
}
@Test
public void testInputOutputArgs() throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(this.configClass, "--server.port=0");
sendMessageAndValidate(context);
context.close();
return Arrays.asList(new Class[] { ReactorTestInputOutputArgs.class, RxJava1TestInputOutputArgs.class });
}
private static void sendMessageAndValidate(ConfigurableApplicationContext context) throws InterruptedException {
@@ -77,6 +70,13 @@ public class StreamListenerReactiveInputOutputArgsTests {
assertThat(result.getPayload()).isEqualTo(sentPayload.toUpperCase());
}
@Test
public void testInputOutputArgs() throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(this.configClass, "--server.port=0");
sendMessageAndValidate(context);
context.close();
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class ReactorTestInputOutputArgs {
@@ -92,7 +92,8 @@ public class StreamListenerReactiveInputOutputArgsTests {
public static class RxJava1TestInputOutputArgs {
@StreamListener
public void receive(@Input(Processor.INPUT) Observable<String> input, @Output(Processor.OUTPUT) ObservableSender output) {
public void receive(@Input(Processor.INPUT) Observable<String> input,
@Output(Processor.OUTPUT) ObservableSender output) {
output.send(input.map(m -> m.toUpperCase()));
}
}

View File

@@ -56,14 +56,8 @@ public class StreamListenerReactiveInputOutputArgsWithMessageTests {
@Parameterized.Parameters
public static Collection InputConfigs() {
return Arrays.asList(new Class[]{ReactorTestInputOutputArgsWithMessage.class, RxJava1TestInputOutputArgsWithMessage.class});
}
@Test
public void testInputOutputArgs() throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(this.configClass, "--server.port=0");
sendMessageAndValidate(context);
context.close();
return Arrays.asList(new Class[] { ReactorTestInputOutputArgsWithMessage.class,
RxJava1TestInputOutputArgsWithMessage.class });
}
private static void sendMessageAndValidate(ConfigurableApplicationContext context) throws InterruptedException {
@@ -77,6 +71,13 @@ public class StreamListenerReactiveInputOutputArgsWithMessageTests {
assertThat(result.getPayload()).isEqualTo(sentPayload.toUpperCase());
}
@Test
public void testInputOutputArgs() throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(this.configClass, "--server.port=0");
sendMessageAndValidate(context);
context.close();
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class ReactorTestInputOutputArgsWithMessage {

View File

@@ -56,19 +56,10 @@ public class StreamListenerReactiveInputOutputArgsWithSenderAndFailureTests {
@Parameterized.Parameters
public static Collection InputConfigs() {
return Arrays.asList(new Class[]{TestInputOutputArgsWithFluxSenderAndFailure.class, TestInputOutputArgsWithObservableSenderAndFailure.class});
return Arrays.asList(new Class[] { TestInputOutputArgsWithFluxSenderAndFailure.class,
TestInputOutputArgsWithObservableSenderAndFailure.class });
}
@Test
public void testInputOutputArgsWithFluxSenderAndFailure() throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(this.configClass, "--server.port=0");
sendMessageAndValidate(context);
sendFailingMessage(context);
sendMessageAndValidate(context);
context.close();
}
private static void sendMessageAndValidate(ConfigurableApplicationContext context) throws InterruptedException {
@SuppressWarnings("unchecked")
Processor processor = context.getBean(Processor.class);
@@ -86,11 +77,21 @@ public class StreamListenerReactiveInputOutputArgsWithSenderAndFailureTests {
processor.input().send(MessageBuilder.withPayload("fail").setHeader("contentType", "text/plain").build());
}
@Test
public void testInputOutputArgsWithFluxSenderAndFailure() throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(this.configClass, "--server.port=0");
sendMessageAndValidate(context);
sendFailingMessage(context);
sendMessageAndValidate(context);
context.close();
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class TestInputOutputArgsWithFluxSenderAndFailure {
@StreamListener
public void receive(@Input(Processor.INPUT) Flux<Message<String>> input, @Output(Processor.OUTPUT) FluxSender output) {
public void receive(@Input(Processor.INPUT) Flux<Message<String>> input,
@Output(Processor.OUTPUT) FluxSender output) {
output.send(input
.map(m -> m.getPayload().toString())
.map(m -> {
@@ -109,7 +110,8 @@ public class StreamListenerReactiveInputOutputArgsWithSenderAndFailureTests {
@EnableAutoConfiguration
public static class TestInputOutputArgsWithObservableSenderAndFailure {
@StreamListener
public void receive(@Input(Processor.INPUT) Observable<Message<String>> input, @Output(Processor.OUTPUT) ObservableSender output) {
public void receive(@Input(Processor.INPUT) Observable<Message<String>> input,
@Output(Processor.OUTPUT) ObservableSender output) {
output.send(input
.map(m -> m.getPayload().toString())
.map(m -> {

View File

@@ -56,7 +56,19 @@ public class StreamListenerReactiveInputOutputArgsWithSenderTests {
@Parameterized.Parameters
public static Collection InputConfigs() {
return Arrays.asList(new Class[]{ReactorTestInputOutputArgsWithFluxSender.class, RxJava1TestInputOutputArgsWithObservableSender.class});
return Arrays.asList(new Class[] { ReactorTestInputOutputArgsWithFluxSender.class,
RxJava1TestInputOutputArgsWithObservableSender.class });
}
private static void sendMessageAndValidate(ConfigurableApplicationContext context) throws InterruptedException {
@SuppressWarnings("unchecked")
Processor processor = context.getBean(Processor.class);
String sentPayload = "hello " + UUID.randomUUID().toString();
processor.input().send(MessageBuilder.withPayload(sentPayload).setHeader("contentType", "text/plain").build());
MessageCollector messageCollector = context.getBean(MessageCollector.class);
Message<?> result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS);
assertThat(result).isNotNull();
assertThat(result.getPayload()).isEqualTo(sentPayload.toUpperCase());
}
@Test
@@ -70,23 +82,12 @@ public class StreamListenerReactiveInputOutputArgsWithSenderTests {
context.close();
}
private static void sendMessageAndValidate(ConfigurableApplicationContext context) throws InterruptedException {
@SuppressWarnings("unchecked")
Processor processor = context.getBean(Processor.class);
String sentPayload = "hello " + UUID.randomUUID().toString();
processor.input().send(MessageBuilder.withPayload(sentPayload).setHeader("contentType", "text/plain").build());
MessageCollector messageCollector = context.getBean(MessageCollector.class);
Message<?> result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS);
assertThat(result).isNotNull();
assertThat(result.getPayload()).isEqualTo(sentPayload.toUpperCase());
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class ReactorTestInputOutputArgsWithFluxSender {
@StreamListener
public void receive(@Input(Processor.INPUT) Flux<Message<String>> input, @Output(Processor.OUTPUT) FluxSender output) {
public void receive(@Input(Processor.INPUT) Flux<Message<String>> input,
@Output(Processor.OUTPUT) FluxSender output) {
output.send(input
.map(m -> m.getPayload().toString().toUpperCase())
.map(o -> MessageBuilder.withPayload(o).build()));
@@ -97,8 +98,8 @@ public class StreamListenerReactiveInputOutputArgsWithSenderTests {
@EnableAutoConfiguration
public static class RxJava1TestInputOutputArgsWithObservableSender {
@StreamListener
public void receive(@Input(Processor.INPUT) Observable<Message<?>> input, @Output(Processor.OUTPUT)
ObservableSender output) {
public void receive(@Input(Processor.INPUT) Observable<Message<?>> input,
@Output(Processor.OUTPUT) ObservableSender output) {
output.send(input
.map(m -> m.getPayload().toString().toUpperCase())
.map(o -> MessageBuilder.withPayload(o).build()));

View File

@@ -49,16 +49,6 @@ public class StreamListenerReactiveMethodTests {
}
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class ReactorTestInputOutputArgs {
@StreamListener(Processor.INPUT)
public void receive(Flux<String> input, @Output(Processor.OUTPUT) FluxSender output) {
output.send(input.map(m -> m.toUpperCase()));
}
}
@Test
public void testRxJava1InvalidInputValueWithOutputMethodParameters() {
try {
@@ -70,16 +60,6 @@ public class StreamListenerReactiveMethodTests {
}
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class RxJava1TestInputOutputArgs {
@StreamListener(Processor.INPUT)
public void receive(Observable<String> input, @Output(Processor.OUTPUT) ObservableSender output) {
output.send(input.map(m -> m.toUpperCase()));
}
}
@Test
public void testMethodReturnTypeWithNoOutboundSpecified() {
try {
@@ -91,6 +71,26 @@ public class StreamListenerReactiveMethodTests {
}
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class ReactorTestInputOutputArgs {
@StreamListener(Processor.INPUT)
public void receive(Flux<String> input, @Output(Processor.OUTPUT) FluxSender output) {
output.send(input.map(m -> m.toUpperCase()));
}
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class RxJava1TestInputOutputArgs {
@StreamListener(Processor.INPUT)
public void receive(Observable<String> input, @Output(Processor.OUTPUT) ObservableSender output) {
output.send(input.map(m -> m.toUpperCase()));
}
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class ReactorTestReturn5 {

View File

@@ -57,17 +57,10 @@ public class StreamListenerReactiveMethodWithReturnTypeTests {
@Parameterized.Parameters
public static Collection InputConfigs() {
return Arrays.asList(new Class[]{ReactorTestReturn1.class, ReactorTestReturn2.class, ReactorTestReturn3.class, ReactorTestReturn4.class,
RxJava1TestReturn1.class, RxJava1TestReturn2.class, RxJava1TestReturn3.class, RxJava1TestReturn4.class});
}
@Test
public void testReturn() throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(this.configClass, "--server.port=0");
sendMessageAndValidate(context);
sendMessageAndValidate(context);
sendMessageAndValidate(context);
context.close();
return Arrays.asList(new Class[] { ReactorTestReturn1.class, ReactorTestReturn2.class, ReactorTestReturn3.class,
ReactorTestReturn4.class,
RxJava1TestReturn1.class, RxJava1TestReturn2.class, RxJava1TestReturn3.class,
RxJava1TestReturn4.class });
}
private static void sendMessageAndValidate(ConfigurableApplicationContext context) throws InterruptedException {
@@ -81,14 +74,21 @@ public class StreamListenerReactiveMethodWithReturnTypeTests {
assertThat(result.getPayload()).isEqualTo(sentPayload.toUpperCase());
}
@Test
public void testReturn() throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(this.configClass, "--server.port=0");
sendMessageAndValidate(context);
sendMessageAndValidate(context);
sendMessageAndValidate(context);
context.close();
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class ReactorTestReturn1 {
@StreamListener
public
@Output(Processor.OUTPUT)
Flux<String> receive(@Input(Processor.INPUT) Flux<String> input) {
public @Output(Processor.OUTPUT) Flux<String> receive(@Input(Processor.INPUT) Flux<String> input) {
return input.map(m -> m.toUpperCase());
}
}
@@ -131,9 +131,7 @@ public class StreamListenerReactiveMethodWithReturnTypeTests {
public static class RxJava1TestReturn1 {
@StreamListener
public
@Output(Processor.OUTPUT)
Observable<String> receive(@Input(Processor.INPUT) Observable<String> input) {
public @Output(Processor.OUTPUT) Observable<String> receive(@Input(Processor.INPUT) Observable<String> input) {
return input.map(m -> m.toUpperCase());
}
}
@@ -143,9 +141,7 @@ public class StreamListenerReactiveMethodWithReturnTypeTests {
public static class RxJava1TestReturn2 {
@StreamListener(Processor.INPUT)
public
@Output(Processor.OUTPUT)
Observable<String> receive(Observable<String> input) {
public @Output(Processor.OUTPUT) Observable<String> receive(Observable<String> input) {
return input.map(m -> m.toUpperCase());
}
}
@@ -155,9 +151,7 @@ public class StreamListenerReactiveMethodWithReturnTypeTests {
public static class RxJava1TestReturn3 {
@StreamListener(Processor.INPUT)
public
@SendTo(Processor.OUTPUT)
Observable<String> receive(Observable<String> input) {
public @SendTo(Processor.OUTPUT) Observable<String> receive(Observable<String> input) {
return input.map(m -> m.toUpperCase());
}
}
@@ -167,9 +161,7 @@ public class StreamListenerReactiveMethodWithReturnTypeTests {
public static class RxJava1TestReturn4 {
@StreamListener
public
@SendTo(Processor.OUTPUT)
Observable<String> receive(@Input(Processor.INPUT) Observable<String> input) {
public @SendTo(Processor.OUTPUT) Observable<String> receive(@Input(Processor.INPUT) Observable<String> input) {
return input.map(m -> m.toUpperCase());
}
}

View File

@@ -57,20 +57,11 @@ public class StreamListenerReactiveReturnWithFailureTests {
@Parameterized.Parameters
public static Collection InputConfigs() {
return Arrays.asList(new Class[] {ReactorTestReturnWithFailure1.class, ReactorTestReturnWithFailure2.class,
ReactorTestReturnWithFailure3.class, ReactorTestReturnWithFailure4.class, RxJava1TestReturnWithFailure1.class,
RxJava1TestReturnWithFailure2.class, RxJava1TestReturnWithFailure3.class, RxJava1TestReturnWithFailure4.class});
}
@Test
public void testReturnWithFailure() throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(this.configClass, "--server.port=0");
sendMessageAndValidate(context);
sendFailingMessage(context);
sendMessageAndValidate(context);
sendFailingMessage(context);
sendMessageAndValidate(context);
context.close();
return Arrays.asList(new Class[] { ReactorTestReturnWithFailure1.class, ReactorTestReturnWithFailure2.class,
ReactorTestReturnWithFailure3.class, ReactorTestReturnWithFailure4.class,
RxJava1TestReturnWithFailure1.class,
RxJava1TestReturnWithFailure2.class, RxJava1TestReturnWithFailure3.class,
RxJava1TestReturnWithFailure4.class });
}
private static void sendMessageAndValidate(ConfigurableApplicationContext context) throws InterruptedException {
@@ -90,14 +81,23 @@ public class StreamListenerReactiveReturnWithFailureTests {
processor.input().send(MessageBuilder.withPayload("fail").setHeader("contentType", "text/plain").build());
}
@Test
public void testReturnWithFailure() throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(this.configClass, "--server.port=0");
sendMessageAndValidate(context);
sendFailingMessage(context);
sendMessageAndValidate(context);
sendFailingMessage(context);
sendMessageAndValidate(context);
context.close();
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class ReactorTestReturnWithFailure1 {
@StreamListener
public
@Output(Processor.OUTPUT)
Flux<String> receive(@Input(Processor.INPUT) Flux<String> input) {
public @Output(Processor.OUTPUT) Flux<String> receive(@Input(Processor.INPUT) Flux<String> input) {
return input.map(m -> {
if (!m.equals("fail")) {
return m.toUpperCase();
@@ -114,9 +114,7 @@ public class StreamListenerReactiveReturnWithFailureTests {
public static class ReactorTestReturnWithFailure2 {
@StreamListener(Processor.INPUT)
public
@Output(Processor.OUTPUT)
Flux<String> receive(Flux<String> input) {
public @Output(Processor.OUTPUT) Flux<String> receive(Flux<String> input) {
return input.map(m -> {
if (!m.equals("fail")) {
return m.toUpperCase();
@@ -133,9 +131,7 @@ public class StreamListenerReactiveReturnWithFailureTests {
public static class ReactorTestReturnWithFailure3 {
@StreamListener(Processor.INPUT)
public
@SendTo(Processor.OUTPUT)
Flux<String> receive(Flux<String> input) {
public @SendTo(Processor.OUTPUT) Flux<String> receive(Flux<String> input) {
return input.map(m -> {
if (!m.equals("fail")) {
return m.toUpperCase();
@@ -152,9 +148,7 @@ public class StreamListenerReactiveReturnWithFailureTests {
public static class ReactorTestReturnWithFailure4 {
@StreamListener
public
@SendTo(Processor.OUTPUT)
Flux<String> receive(@Input(Processor.INPUT) Flux<String> input) {
public @SendTo(Processor.OUTPUT) Flux<String> receive(@Input(Processor.INPUT) Flux<String> input) {
return input.map(m -> {
if (!m.equals("fail")) {
return m.toUpperCase();
@@ -171,9 +165,7 @@ public class StreamListenerReactiveReturnWithFailureTests {
public static class RxJava1TestReturnWithFailure1 {
@StreamListener
public
@Output(Processor.OUTPUT)
Observable<String> receive(@Input(Processor.INPUT) Observable<String> input) {
public @Output(Processor.OUTPUT) Observable<String> receive(@Input(Processor.INPUT) Observable<String> input) {
return input.map(m -> {
if (!m.equals("fail")) {
return m.toUpperCase();
@@ -190,9 +182,7 @@ public class StreamListenerReactiveReturnWithFailureTests {
public static class RxJava1TestReturnWithFailure2 {
@StreamListener
public
@SendTo(Processor.OUTPUT)
Observable<String> receive(@Input(Processor.INPUT) Observable<String> input) {
public @SendTo(Processor.OUTPUT) Observable<String> receive(@Input(Processor.INPUT) Observable<String> input) {
return input.map(m -> {
if (!m.equals("fail")) {
return m.toUpperCase();
@@ -209,9 +199,7 @@ public class StreamListenerReactiveReturnWithFailureTests {
public static class RxJava1TestReturnWithFailure3 {
@StreamListener(Processor.INPUT)
public
@SendTo(Processor.OUTPUT)
Observable<String> receive(Observable<String> input) {
public @SendTo(Processor.OUTPUT) Observable<String> receive(Observable<String> input) {
return input.map(m -> {
if (!m.equals("fail")) {
return m.toUpperCase();
@@ -228,9 +216,7 @@ public class StreamListenerReactiveReturnWithFailureTests {
public static class RxJava1TestReturnWithFailure4 {
@StreamListener(Processor.INPUT)
public
@Output(Processor.OUTPUT)
Observable<String> receive(Observable<String> input) {
public @Output(Processor.OUTPUT) Observable<String> receive(Observable<String> input) {
return input.map(m -> {
if (!m.equals("fail")) {
return m.toUpperCase();

View File

@@ -57,16 +57,11 @@ public class StreamListenerReactiveReturnWithMessageTests {
@Parameterized.Parameters
public static Collection InputConfigs() {
return Arrays.asList(new Class[] {ReactorTestReturnWithMessage1.class, ReactorTestReturnWithMessage2.class,
ReactorTestReturnWithMessage3.class, ReactorTestReturnWithMessage4.class, RxJava1TestReturnWithMessage1.class,
RxJava1TestReturnWithMessage2.class, RxJava1TestReturnWithMessage3.class, RxJava1TestReturnWithMessage4.class});
}
@Test
public void testReturnWithMessage() throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(this.configClass, "--server.port=0");
sendMessageAndValidate(context);
context.close();
return Arrays.asList(new Class[] { ReactorTestReturnWithMessage1.class, ReactorTestReturnWithMessage2.class,
ReactorTestReturnWithMessage3.class, ReactorTestReturnWithMessage4.class,
RxJava1TestReturnWithMessage1.class,
RxJava1TestReturnWithMessage2.class, RxJava1TestReturnWithMessage3.class,
RxJava1TestReturnWithMessage4.class });
}
private static void sendMessageAndValidate(ConfigurableApplicationContext context) throws InterruptedException {
@@ -80,14 +75,19 @@ public class StreamListenerReactiveReturnWithMessageTests {
assertThat(result.getPayload()).isEqualTo(sentPayload.toUpperCase());
}
@Test
public void testReturnWithMessage() throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(this.configClass, "--server.port=0");
sendMessageAndValidate(context);
context.close();
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class ReactorTestReturnWithMessage1 {
@StreamListener
public
@Output(Processor.OUTPUT)
Flux<String> receive(@Input(Processor.INPUT) Flux<Message<String>> input) {
public @Output(Processor.OUTPUT) Flux<String> receive(@Input(Processor.INPUT) Flux<Message<String>> input) {
return input.map(m -> m.getPayload().toUpperCase());
}
}
@@ -97,9 +97,7 @@ public class StreamListenerReactiveReturnWithMessageTests {
public static class ReactorTestReturnWithMessage2 {
@StreamListener(Processor.INPUT)
public
@Output(Processor.OUTPUT)
Flux<String> receive(Flux<Message<String>> input) {
public @Output(Processor.OUTPUT) Flux<String> receive(Flux<Message<String>> input) {
return input.map(m -> m.getPayload().toUpperCase());
}
}
@@ -109,22 +107,17 @@ public class StreamListenerReactiveReturnWithMessageTests {
public static class ReactorTestReturnWithMessage3 {
@StreamListener(Processor.INPUT)
public
@SendTo(Processor.OUTPUT)
Flux<String> receive(Flux<Message<String>> input) {
public @SendTo(Processor.OUTPUT) Flux<String> receive(Flux<Message<String>> input) {
return input.map(m -> m.getPayload().toUpperCase());
}
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class ReactorTestReturnWithMessage4 {
@StreamListener
public
@SendTo(Processor.OUTPUT)
Flux<String> receive(@Input(Processor.INPUT) Flux<Message<String>> input) {
public @SendTo(Processor.OUTPUT) Flux<String> receive(@Input(Processor.INPUT) Flux<Message<String>> input) {
return input.map(m -> m.getPayload().toUpperCase());
}
}
@@ -134,9 +127,8 @@ public class StreamListenerReactiveReturnWithMessageTests {
public static class RxJava1TestReturnWithMessage1 {
@StreamListener
public
@Output(Processor.OUTPUT)
Observable<String> receive(@Input(Processor.INPUT) Observable<Message<String>> input) {
public @Output(Processor.OUTPUT) Observable<String> receive(
@Input(Processor.INPUT) Observable<Message<String>> input) {
return input.map(m -> m.getPayload().toUpperCase());
}
}
@@ -146,9 +138,8 @@ public class StreamListenerReactiveReturnWithMessageTests {
public static class RxJava1TestReturnWithMessage2 {
@StreamListener
public
@SendTo(Processor.OUTPUT)
Observable<String> receive(@Input(Processor.INPUT) Observable<Message<String>> input) {
public @SendTo(Processor.OUTPUT) Observable<String> receive(
@Input(Processor.INPUT) Observable<Message<String>> input) {
return input.map(m -> m.getPayload().toUpperCase());
}
}
@@ -158,9 +149,7 @@ public class StreamListenerReactiveReturnWithMessageTests {
public static class RxJava1TestReturnWithMessage3 {
@StreamListener(Processor.INPUT)
public
@Output(Processor.OUTPUT)
Observable<String> receive(Observable<Message<String>> input) {
public @Output(Processor.OUTPUT) Observable<String> receive(Observable<Message<String>> input) {
return input.map(m -> m.getPayload().toUpperCase());
}
}
@@ -170,9 +159,7 @@ public class StreamListenerReactiveReturnWithMessageTests {
public static class RxJava1TestReturnWithMessage4 {
@StreamListener(Processor.INPUT)
public
@SendTo(Processor.OUTPUT)
Observable<String> receive(Observable<Message<String>> input) {
public @SendTo(Processor.OUTPUT) Observable<String> receive(Observable<Message<String>> input) {
return input.map(m -> m.getPayload().toUpperCase());
}
}

View File

@@ -56,9 +56,9 @@ public class StreamListenerReactiveReturnWithPojoTests {
@Parameterized.Parameters
public static Collection InputConfigs() {
return Arrays.asList(new Class[] {ReactorTestReturnWithPojo1.class, ReactorTestReturnWithPojo2.class,
return Arrays.asList(new Class[] { ReactorTestReturnWithPojo1.class, ReactorTestReturnWithPojo2.class,
ReactorTestReturnWithPojo3.class, ReactorTestReturnWithPojo4.class, RxJava1TestReturnWithPojo1.class,
RxJava1TestReturnWithPojo2.class, RxJava1TestReturnWithPojo3.class, RxJava1TestReturnWithPojo4.class});
RxJava1TestReturnWithPojo2.class, RxJava1TestReturnWithPojo3.class, RxJava1TestReturnWithPojo4.class });
}
@Test
@@ -81,9 +81,7 @@ public class StreamListenerReactiveReturnWithPojoTests {
public static class ReactorTestReturnWithPojo1 {
@StreamListener
public
@Output(Processor.OUTPUT)
Flux<BarPojo> receive(@Input(Processor.INPUT) Flux<FooPojo> input) {
public @Output(Processor.OUTPUT) Flux<BarPojo> receive(@Input(Processor.INPUT) Flux<FooPojo> input) {
return input.map(m -> new BarPojo(m.getMessage()));
}
}
@@ -93,9 +91,7 @@ public class StreamListenerReactiveReturnWithPojoTests {
public static class ReactorTestReturnWithPojo2 {
@StreamListener(Processor.INPUT)
public
@Output(Processor.OUTPUT)
Flux<BarPojo> receive(Flux<FooPojo> input) {
public @Output(Processor.OUTPUT) Flux<BarPojo> receive(Flux<FooPojo> input) {
return input.map(m -> new BarPojo(m.getMessage()));
}
}
@@ -105,9 +101,7 @@ public class StreamListenerReactiveReturnWithPojoTests {
public static class ReactorTestReturnWithPojo3 {
@StreamListener(Processor.INPUT)
public
@SendTo(Processor.OUTPUT)
Flux<BarPojo> receive(Flux<FooPojo> input) {
public @SendTo(Processor.OUTPUT) Flux<BarPojo> receive(Flux<FooPojo> input) {
return input.map(m -> new BarPojo(m.getMessage()));
}
}
@@ -117,9 +111,7 @@ public class StreamListenerReactiveReturnWithPojoTests {
public static class ReactorTestReturnWithPojo4 {
@StreamListener
public
@SendTo(Processor.OUTPUT)
Flux<BarPojo> receive(@Input(Processor.INPUT) Flux<FooPojo> input) {
public @SendTo(Processor.OUTPUT) Flux<BarPojo> receive(@Input(Processor.INPUT) Flux<FooPojo> input) {
return input.map(m -> new BarPojo(m.getMessage()));
}
}
@@ -129,9 +121,8 @@ public class StreamListenerReactiveReturnWithPojoTests {
public static class RxJava1TestReturnWithPojo1 {
@StreamListener
public
@Output(Processor.OUTPUT)
Observable<BarPojo> receive(@Input(Processor.INPUT) Observable<FooPojo> input) {
public @Output(Processor.OUTPUT) Observable<BarPojo> receive(
@Input(Processor.INPUT) Observable<FooPojo> input) {
return input.map(m -> new BarPojo(m.getMessage()));
}
}
@@ -141,9 +132,8 @@ public class StreamListenerReactiveReturnWithPojoTests {
public static class RxJava1TestReturnWithPojo2 {
@StreamListener
public
@SendTo(Processor.OUTPUT)
Observable<BarPojo> receive(@Input(Processor.INPUT) Observable<FooPojo> input) {
public @SendTo(Processor.OUTPUT) Observable<BarPojo> receive(
@Input(Processor.INPUT) Observable<FooPojo> input) {
return input.map(m -> new BarPojo(m.getMessage()));
}
}
@@ -153,9 +143,7 @@ public class StreamListenerReactiveReturnWithPojoTests {
public static class RxJava1TestReturnWithPojo3 {
@StreamListener(Processor.INPUT)
public
@Output(Processor.OUTPUT)
Observable<BarPojo> receive(Observable<FooPojo> input) {
public @Output(Processor.OUTPUT) Observable<BarPojo> receive(Observable<FooPojo> input) {
return input.map(m -> new BarPojo(m.getMessage()));
}
}
@@ -165,9 +153,7 @@ public class StreamListenerReactiveReturnWithPojoTests {
public static class RxJava1TestReturnWithPojo4 {
@StreamListener(Processor.INPUT)
public
@SendTo(Processor.OUTPUT)
Observable<BarPojo> receive(Observable<FooPojo> input) {
public @SendTo(Processor.OUTPUT) Observable<BarPojo> receive(Observable<FooPojo> input) {
return input.map(m -> new BarPojo(m.getMessage()));
}
}

View File

@@ -45,9 +45,21 @@ import static org.springframework.cloud.stream.binding.StreamListenerErrorMessag
*/
public class StreamListenerWildCardFluxInputOutputArgsWithMessageTests {
private static void sendMessageAndValidate(ConfigurableApplicationContext context) throws InterruptedException {
@SuppressWarnings("unchecked")
Processor processor = context.getBean(Processor.class);
String sentPayload = "hello " + UUID.randomUUID().toString();
processor.input().send(MessageBuilder.withPayload(sentPayload).setHeader("contentType", "text/plain").build());
MessageCollector messageCollector = context.getBean(MessageCollector.class);
Message<?> result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS);
assertThat(result).isNotNull();
assertThat(result.getPayload()).isEqualTo(sentPayload.toUpperCase());
}
@Test
public void testWildCardFluxInputOutputArgsWithMessage() throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(TestWildCardFluxInputOutputArgsWithMessage1.class, "--server.port=0");
ConfigurableApplicationContext context = SpringApplication
.run(TestWildCardFluxInputOutputArgsWithMessage1.class, "--server.port=0");
sendMessageAndValidate(context);
context.close();
}
@@ -74,17 +86,6 @@ public class StreamListenerWildCardFluxInputOutputArgsWithMessageTests {
}
}
private static void sendMessageAndValidate(ConfigurableApplicationContext context) throws InterruptedException {
@SuppressWarnings("unchecked")
Processor processor = context.getBean(Processor.class);
String sentPayload = "hello " + UUID.randomUUID().toString();
processor.input().send(MessageBuilder.withPayload(sentPayload).setHeader("contentType", "text/plain").build());
MessageCollector messageCollector = context.getBean(MessageCollector.class);
Message<?> result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS);
assertThat(result).isNotNull();
assertThat(result.getPayload()).isEqualTo(sentPayload.toUpperCase());
}
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class TestWildCardFluxInputOutputArgsWithMessage1 {