diff --git a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerAnnotatedMethodArgumentsTests.java b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerAnnotatedMethodArgumentsTests.java index d2412e423..59470d7c7 100644 --- a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerAnnotatedMethodArgumentsTests.java +++ b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerAnnotatedMethodArgumentsTests.java @@ -39,7 +39,7 @@ import org.springframework.messaging.handler.annotation.Payload; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.fail; -import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_MESSAGE_HANDLER_METHOD_PARAMS; +import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS; /** * @author Marius Bogoevici @@ -50,7 +50,7 @@ public class StreamListenerAnnotatedMethodArgumentsTests { @Test @SuppressWarnings("unchecked") public void testAnnotatedArguments() throws Exception { - ConfigurableApplicationContext context = SpringApplication.run(TestPojoWithAnnotatedArguments1.class, + ConfigurableApplicationContext context = SpringApplication.run(TestPojoWithAnnotatedArguments.class, "--server.port=0"); TestPojoWithAnnotatedArguments testPojoWithAnnotatedArguments = context @@ -60,7 +60,7 @@ public class StreamListenerAnnotatedMethodArgumentsTests { sink.input().send(MessageBuilder.withPayload("{\"foo\":\"barbar" + id + "\"}") .setHeader("contentType", "application/json").setHeader("testHeader", "testValue").build()); assertThat(testPojoWithAnnotatedArguments.receivedArguments).hasSize(3); - assertThat(testPojoWithAnnotatedArguments.receivedArguments.get(0)).isInstanceOf(StreamListenerTestInterfaces.FooPojo.class); + assertThat(testPojoWithAnnotatedArguments.receivedArguments.get(0)).isInstanceOf(StreamListenerTestUtils.FooPojo.class); assertThat(testPojoWithAnnotatedArguments.receivedArguments.get(0)).hasFieldOrPropertyWithValue("foo", "barbar" + id); assertThat(testPojoWithAnnotatedArguments.receivedArguments.get(1)).isInstanceOf(Map.class); @@ -75,20 +75,22 @@ public class StreamListenerAnnotatedMethodArgumentsTests { @Test public void testInputAnnotationAtMethodParameter() throws Exception { try { - SpringApplication.run(TestPojoWithAnnotatedArguments2.class, "--server.port=0"); - fail("Exception expected: "+ INVALID_MESSAGE_HANDLER_METHOD_PARAMS); + SpringApplication.run(TestPojoWithInvalidInputAnnotatedArgument.class, "--server.port=0"); + fail("Exception expected: "+ INVALID_DECLARATIVE_METHOD_PARAMETERS); } catch (BeanCreationException e) { - assertThat(e.getCause().getMessage()).contains(INVALID_MESSAGE_HANDLER_METHOD_PARAMS); + assertThat(e.getCause().getMessage()).contains(INVALID_DECLARATIVE_METHOD_PARAMETERS); } } @EnableBinding(Processor.class) @EnableAutoConfiguration - public static class TestPojoWithAnnotatedArguments1 extends TestPojoWithAnnotatedArguments { + public static class TestPojoWithAnnotatedArguments { + + List receivedArguments = new ArrayList<>(); @StreamListener(Processor.INPUT) - public void receive(@Payload StreamListenerTestInterfaces.FooPojo fooPojo, + public void receive(@Payload StreamListenerTestUtils.FooPojo fooPojo, @Headers Map headers, @Header(MessageHeaders.CONTENT_TYPE) String contentType) { this.receivedArguments.add(fooPojo); @@ -99,10 +101,13 @@ public class StreamListenerAnnotatedMethodArgumentsTests { @EnableBinding(Processor.class) @EnableAutoConfiguration - public static class TestPojoWithAnnotatedArguments2 extends TestPojoWithAnnotatedArguments { + public static class TestPojoWithInvalidInputAnnotatedArgument { + + List receivedArguments = new ArrayList<>(); @StreamListener - public void receive(@Input(Processor.INPUT) @Payload StreamListenerTestInterfaces.FooPojo fooPojo, + public void receive( + @Input(Processor.INPUT) @Payload StreamListenerTestUtils.FooPojo fooPojo, @Headers Map headers, @Header(MessageHeaders.CONTENT_TYPE) String contentType) { this.receivedArguments.add(fooPojo); @@ -110,8 +115,4 @@ public class StreamListenerAnnotatedMethodArgumentsTests { this.receivedArguments.add(contentType); } } - - public static class TestPojoWithAnnotatedArguments { - List receivedArguments = new ArrayList<>(); - } } diff --git a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerContentTypeConversionTests.java b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerContentTypeConversionTests.java index 9d24040c8..52e6cec80 100644 --- a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerContentTypeConversionTests.java +++ b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerContentTypeConversionTests.java @@ -41,10 +41,10 @@ public class StreamListenerContentTypeConversionTests { @Test public void testContentTypeConversion() throws Exception { - ConfigurableApplicationContext context = SpringApplication.run(TestSink1.class, + ConfigurableApplicationContext context = SpringApplication.run(TestSinkWithContentTypeConversion.class, "--server.port=0"); @SuppressWarnings("unchecked") - TestSink1 testSink = context.getBean(TestSink1.class); + TestSinkWithContentTypeConversion testSink = context.getBean(TestSinkWithContentTypeConversion.class); Sink sink = context.getBean(Sink.class); String id = UUID.randomUUID().toString(); sink.input().send( @@ -59,13 +59,13 @@ public class StreamListenerContentTypeConversionTests { @EnableBinding(Sink.class) @EnableAutoConfiguration - public static class TestSink1 { + public static class TestSinkWithContentTypeConversion { - List receivedArguments = new ArrayList<>(); + List receivedArguments = new ArrayList<>(); CountDownLatch latch = new CountDownLatch(1); @StreamListener(Sink.INPUT) - public void receive(StreamListenerTestInterfaces.FooPojo fooPojo) { + public void receive(StreamListenerTestUtils.FooPojo fooPojo) { this.receivedArguments.add(fooPojo); this.latch.countDown(); } diff --git a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerDuplicateMappingTests.java b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerDuplicateMappingTests.java index dba60176a..252892cf5 100644 --- a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerDuplicateMappingTests.java +++ b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerDuplicateMappingTests.java @@ -39,7 +39,7 @@ public class StreamListenerDuplicateMappingTests { @SuppressWarnings("unchecked") public void testDuplicateMapping() throws Exception { try { - ConfigurableApplicationContext context = SpringApplication.run(TestDuplicateMapping1.class, + ConfigurableApplicationContext context = SpringApplication.run(TestDuplicateMapping.class, "--server.port=0"); fail("Exception expected on duplicate mapping"); } @@ -50,14 +50,14 @@ public class StreamListenerDuplicateMappingTests { @EnableBinding(Processor.class) @EnableAutoConfiguration - public static class TestDuplicateMapping1 { + public static class TestDuplicateMapping { @StreamListener(Processor.INPUT) public void receive(Message fooMessage) { } @StreamListener(Processor.INPUT) - public void receive2(Message fooMessage) { + public void receiveDuplicateMapping(Message fooMessage) { } } } diff --git a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerHandlerBeanTests.java b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerHandlerBeanTests.java index ca5138363..b9af0bb08 100644 --- a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerHandlerBeanTests.java +++ b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerHandlerBeanTests.java @@ -59,7 +59,7 @@ public class StreamListenerHandlerBeanTests { @Parameterized.Parameters public static Collection InputConfigs() { - return Arrays.asList(new Class[] { TestHandlerBean1.class, TestHandlerBean2.class }); + return Arrays.asList(TestHandlerBeanWithSendTo.class, TestHandlerBean2.class); } @Test @@ -89,11 +89,11 @@ public class StreamListenerHandlerBeanTests { @EnableBinding(Processor.class) @EnableAutoConfiguration - public static class TestHandlerBean1 { + public static class TestHandlerBeanWithSendTo { @Bean - public HandlerBean1 handlerBean() { - return new HandlerBean1(); + public HandlerBeanWithSendTo handlerBean() { + return new HandlerBeanWithSendTo(); } } @@ -102,30 +102,30 @@ public class StreamListenerHandlerBeanTests { public static class TestHandlerBean2 { @Bean - public HandlerBean2 handlerBean() { - return new HandlerBean2(); + public HandlerBeanWithOutput handlerBean() { + return new HandlerBeanWithOutput(); } } - public static class HandlerBean1 extends HandlerBean { + public static class HandlerBeanWithSendTo extends HandlerBean { @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) - public StreamListenerTestInterfaces.BarPojo receive(StreamListenerTestInterfaces.FooPojo fooMessage) { + public StreamListenerTestUtils.BarPojo receive(StreamListenerTestUtils.FooPojo fooMessage) { this.receivedPojos.add(fooMessage); - StreamListenerTestInterfaces.BarPojo barPojo = new StreamListenerTestInterfaces.BarPojo(); + StreamListenerTestUtils.BarPojo barPojo = new StreamListenerTestUtils.BarPojo(); barPojo.setBar(fooMessage.getFoo()); return barPojo; } } - public static class HandlerBean2 extends HandlerBean { + public static class HandlerBeanWithOutput extends HandlerBean { @StreamListener(Processor.INPUT) @Output(Processor.OUTPUT) - public StreamListenerTestInterfaces.BarPojo receive(StreamListenerTestInterfaces.FooPojo fooMessage) { + public StreamListenerTestUtils.BarPojo receive(StreamListenerTestUtils.FooPojo fooMessage) { this.receivedPojos.add(fooMessage); - StreamListenerTestInterfaces.BarPojo barPojo = new StreamListenerTestInterfaces.BarPojo(); + StreamListenerTestUtils.BarPojo barPojo = new StreamListenerTestUtils.BarPojo(); barPojo.setBar(fooMessage.getFoo()); return barPojo; } @@ -133,7 +133,7 @@ public class StreamListenerHandlerBeanTests { public static class HandlerBean { - List receivedPojos = new ArrayList<>(); + List receivedPojos = new ArrayList<>(); } diff --git a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerHandlerMethodTests.java b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerHandlerMethodTests.java index fcd9aeebe..f83a45c6b 100644 --- a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerHandlerMethodTests.java +++ b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerHandlerMethodTests.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import org.junit.Test; import org.springframework.beans.factory.BeanCreationException; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.cloud.stream.annotation.EnableBinding; @@ -45,14 +46,13 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.fail; import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.AMBIGUOUS_MESSAGE_HANDLER_METHOD_ARGUMENTS; import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER; +import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS; import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_INBOUND_NAME; -import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_MESSAGE_HANDLER_METHOD_PARAMS; import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_OUTBOUND_NAME; import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_OUTPUT_VALUES; import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.NO_INPUT_DESTINATION; import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.RETURN_TYPE_MULTIPLE_OUTBOUND_SPECIFIED; import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED; -import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.TARGET_BEAN_NOT_EXISTS; /** * @author Marius Bogoevici @@ -61,9 +61,9 @@ import static org.springframework.cloud.stream.binding.StreamListenerErrorMessag public class StreamListenerHandlerMethodTests { @Test - public void testMethodWIthInputOnStreamListener() throws Exception { + public void testInvalidInputOnMethod() throws Exception { try { - SpringApplication.run(TestMethodWIthInputOnStreamListener.class, "--server.port=0"); + SpringApplication.run(TestInvalidInputOnMethod.class, "--server.port=0"); fail("Exception expected: "+ INPUT_AT_STREAM_LISTENER); } catch (BeanCreationException e) { @@ -72,7 +72,7 @@ public class StreamListenerHandlerMethodTests { } @Test - public void testReturnTypeWithMultipleOutput() throws Exception { + public void testInvalidReturnTypeWithSendToAndOutput() throws Exception { try { SpringApplication.run(TestReturnTypeWithMultipleOutput.class, "--server.port=0"); fail("Exception expected: "+ RETURN_TYPE_MULTIPLE_OUTBOUND_SPECIFIED); @@ -83,9 +83,9 @@ public class StreamListenerHandlerMethodTests { } @Test - public void testReturnTypeWithNoOutput() throws Exception { + public void testInvalidReturnTypeWithNoOutput() throws Exception { try { - SpringApplication.run(TestReturnTypeWithNoOutput.class, "--server.port=0"); + SpringApplication.run(TestInvalidReturnTypeWithNoOutput.class, "--server.port=0"); fail("Exception expected: " + RETURN_TYPE_NO_OUTBOUND_SPECIFIED); } catch (BeanCreationException e) { @@ -94,9 +94,9 @@ public class StreamListenerHandlerMethodTests { } @Test - public void testMethodInputAnnotationWithNoValue() throws Exception { + public void testInvalidInputAnnotationWithNoValue() throws Exception { try { - SpringApplication.run(TestMethodInputAnnotationWithNoValue.class, "--server.port=0"); + SpringApplication.run(TestInvalidInputAnnotationWithNoValue.class, "--server.port=0"); fail("Exception expected: "+ INVALID_INBOUND_NAME); } catch (BeanCreationException e) { @@ -105,9 +105,9 @@ public class StreamListenerHandlerMethodTests { } @Test - public void testMethodOutputAnnotationWithNoValue() throws Exception { + public void testInvalidOutputAnnotationWithNoValue() throws Exception { try { - SpringApplication.run(TestMethodOutputAnnotationWithNoValue.class, "--server.port=0"); + SpringApplication.run(TestInvalidOutputAnnotationWithNoValue.class, "--server.port=0"); fail("Exception expected: "+ INVALID_OUTBOUND_NAME); } catch (BeanCreationException e) { @@ -122,7 +122,8 @@ public class StreamListenerHandlerMethodTests { fail("Exception expected on using invalid inbound name"); } catch (BeanCreationException e) { - assertThat(e.getCause().getMessage()).contains(TARGET_BEAN_NOT_EXISTS + ": invalid"); + assertThat(e.getCause()).isInstanceOf(NoSuchBeanDefinitionException.class); + assertThat(e.getCause()).hasMessageContaining("'invalid'"); } } @@ -133,7 +134,8 @@ public class StreamListenerHandlerMethodTests { fail("Exception expected on using invalid outbound name"); } catch (BeanCreationException e) { - assertThat(e.getCause().getMessage()).contains(TARGET_BEAN_NOT_EXISTS + ": invalid"); + assertThat(e.getCause()).isInstanceOf(NoSuchBeanDefinitionException.class); + assertThat(e.getCause()).hasMessageContaining("'invalid'"); } } @@ -163,10 +165,10 @@ public class StreamListenerHandlerMethodTests { public void testMethodWithInputAsMethodAndParameter() throws Exception { try { SpringApplication.run(TestMethodWithInputAsMethodAndParameter.class, "--server.port=0"); - fail("Exception expected: " + INVALID_MESSAGE_HANDLER_METHOD_PARAMS); + fail("Exception expected: " + INVALID_DECLARATIVE_METHOD_PARAMETERS); } catch (BeanCreationException e) { - assertThat(e.getCause().getMessage()).contains(INVALID_MESSAGE_HANDLER_METHOD_PARAMS); + assertThat(e.getCause().getMessage()).contains(INVALID_DECLARATIVE_METHOD_PARAMETERS); } } @@ -196,7 +198,7 @@ public class StreamListenerHandlerMethodTests { public void testMethodWithMultipleInputParameters() throws Exception { ConfigurableApplicationContext context = SpringApplication.run(TestMethodWithMultipleInputParameters.class, "--server.port=0"); Processor processor = context.getBean(Processor.class); - StreamListenerTestInterfaces.FooInboundChannel1 inboundChannel2 = context.getBean(StreamListenerTestInterfaces.FooInboundChannel1.class); + StreamListenerTestUtils.FooInboundChannel1 inboundChannel2 = context.getBean(StreamListenerTestUtils.FooInboundChannel1.class); String id = UUID.randomUUID().toString(); final CountDownLatch latch = new CountDownLatch(2); ((SubscribableChannel) processor.output()).subscribe(new MessageHandler() { @@ -219,7 +221,7 @@ public class StreamListenerHandlerMethodTests { ConfigurableApplicationContext context = SpringApplication.run(TestMethodWithMultipleOutputParameters.class, "--server.port=0"); Processor processor = context.getBean(Processor.class); String id = UUID.randomUUID().toString(); - StreamListenerTestInterfaces.FooOutboundChannel1 source2 = context.getBean(StreamListenerTestInterfaces.FooOutboundChannel1.class); + StreamListenerTestUtils.FooOutboundChannel1 source2 = context.getBean(StreamListenerTestUtils.FooOutboundChannel1.class); final CountDownLatch latch = new CountDownLatch(2); ((SubscribableChannel) processor.output()).subscribe(new MessageHandler() { @Override @@ -243,13 +245,13 @@ public class StreamListenerHandlerMethodTests { context.close(); } - @EnableBinding({Processor.class, StreamListenerTestInterfaces.FooOutboundChannel1.class}) + @EnableBinding({Processor.class, StreamListenerTestUtils.FooOutboundChannel1.class}) @EnableAutoConfiguration public static class TestMethodWithMultipleOutputParameters { @StreamListener public void receive(@Input(Processor.INPUT) SubscribableChannel input, @Output(Processor.OUTPUT) final MessageChannel output1, - @Output(StreamListenerTestInterfaces.FooOutboundChannel1.OUTPUT) final MessageChannel output2) { + @Output(StreamListenerTestUtils.FooOutboundChannel1.OUTPUT) final MessageChannel output2) { input.subscribe(new MessageHandler() { @Override public void handleMessage(Message message) throws MessagingException { @@ -269,17 +271,17 @@ public class StreamListenerHandlerMethodTests { public static class TestMethodWithoutInput { @StreamListener - public void receive(StreamListenerTestInterfaces.FooPojo fooPojo) { + public void receive(StreamListenerTestUtils.FooPojo fooPojo) { } } @EnableBinding({Sink.class}) @EnableAutoConfiguration - public static class TestMethodWIthInputOnStreamListener { + public static class TestInvalidInputOnMethod { @StreamListener @Input(Sink.INPUT) - public void receive(StreamListenerTestInterfaces.FooPojo fooPojo) { + public void receive(StreamListenerTestUtils.FooPojo fooPojo) { } } @@ -288,7 +290,7 @@ public class StreamListenerHandlerMethodTests { public static class TestAmbiguousMethodArguments1 { @StreamListener(Processor.INPUT) - public void receive(@Payload StreamListenerTestInterfaces.FooPojo fooPojo, String value) { + public void receive(@Payload StreamListenerTestUtils.FooPojo fooPojo, String value) { } } @@ -297,24 +299,24 @@ public class StreamListenerHandlerMethodTests { public static class TestAmbiguousMethodArguments2 { @StreamListener(Processor.INPUT) - public void receive(@Payload StreamListenerTestInterfaces.FooPojo fooPojo, @Payload StreamListenerTestInterfaces.BarPojo barPojo) { + public void receive(@Payload StreamListenerTestUtils.FooPojo fooPojo, @Payload StreamListenerTestUtils.BarPojo barPojo) { } } - @EnableBinding({Processor.class, StreamListenerTestInterfaces.FooOutboundChannel1.class}) + @EnableBinding({Processor.class, StreamListenerTestUtils.FooOutboundChannel1.class}) @EnableAutoConfiguration public static class TestReturnTypeWithMultipleOutput { @StreamListener public String receive(@Input(Processor.INPUT) SubscribableChannel input1, @Output(Processor.OUTPUT) MessageChannel output1, - @Output(StreamListenerTestInterfaces.FooOutboundChannel1.OUTPUT) MessageChannel output2) { + @Output(StreamListenerTestUtils.FooOutboundChannel1.OUTPUT) MessageChannel output2) { return "foo"; } } - @EnableBinding({Processor.class, StreamListenerTestInterfaces.FooOutboundChannel1.class}) + @EnableBinding({Processor.class, StreamListenerTestUtils.FooOutboundChannel1.class}) @EnableAutoConfiguration - public static class TestReturnTypeWithNoOutput { + public static class TestInvalidReturnTypeWithNoOutput { @StreamListener public String receive(@Input(Processor.INPUT) SubscribableChannel input1) { @@ -324,7 +326,7 @@ public class StreamListenerHandlerMethodTests { @EnableBinding({Processor.class}) @EnableAutoConfiguration - public static class TestMethodInputAnnotationWithNoValue { + public static class TestInvalidInputAnnotationWithNoValue { @StreamListener public void receive(@Input SubscribableChannel input) { @@ -333,7 +335,7 @@ public class StreamListenerHandlerMethodTests { @EnableBinding({Processor.class}) @EnableAutoConfiguration - public static class TestMethodOutputAnnotationWithNoValue { + public static class TestInvalidOutputAnnotationWithNoValue { @StreamListener public void receive(@Input(Processor.OUTPUT) SubscribableChannel input, @Output MessageChannel output) { @@ -363,16 +365,16 @@ public class StreamListenerHandlerMethodTests { public static class TestMethodWithInputAsMethodAndParameter { @StreamListener - public void receive(@Input(Sink.INPUT) StreamListenerTestInterfaces.FooPojo fooPojo) { + public void receive(@Input(Sink.INPUT) StreamListenerTestUtils.FooPojo fooPojo) { } } - @EnableBinding({Processor.class, StreamListenerTestInterfaces.FooOutboundChannel1.class}) + @EnableBinding({Processor.class, StreamListenerTestUtils.FooOutboundChannel1.class}) @EnableAutoConfiguration public static class TestMethodWithOutputAsMethodAndParameter { @StreamListener - @Output(StreamListenerTestInterfaces.FooOutboundChannel1.OUTPUT) + @Output(StreamListenerTestUtils.FooOutboundChannel1.OUTPUT) public void receive(@Input(Processor.INPUT) SubscribableChannel input, @Output(Processor.OUTPUT) final MessageChannel output1) { input.subscribe(new MessageHandler() { @Override @@ -383,12 +385,12 @@ public class StreamListenerHandlerMethodTests { } } - @EnableBinding({Processor.class, StreamListenerTestInterfaces.FooInboundChannel1.class}) + @EnableBinding({Processor.class, StreamListenerTestUtils.FooInboundChannel1.class}) @EnableAutoConfiguration public static class TestMethodWithMultipleInputParameters { @StreamListener - public void receive(@Input(Processor.INPUT) SubscribableChannel input1, @Input(StreamListenerTestInterfaces.FooInboundChannel1.INPUT) SubscribableChannel input2, + public void receive(@Input(Processor.INPUT) SubscribableChannel input1, @Input(StreamListenerTestUtils.FooInboundChannel1.INPUT) SubscribableChannel input2, final @Output(Processor.OUTPUT) MessageChannel output) { input1.subscribe(new MessageHandler() { @Override diff --git a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMessageArgumentTests.java b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMessageArgumentTests.java index 33f4aa3de..a4e6c13eb 100644 --- a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMessageArgumentTests.java +++ b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMessageArgumentTests.java @@ -72,7 +72,7 @@ public class StreamListenerMessageArgumentTests { .getBean(TestPojoWithMessageArgument.class); assertThat(testPojoWithMessageArgument.receivedMessages).hasSize(1); assertThat(testPojoWithMessageArgument.receivedMessages.get(0).getPayload()).isEqualTo("barbar" + id); - Message message = (Message) collector + Message message = (Message) collector .forChannel(processor.output()).poll(1, TimeUnit.SECONDS); assertThat(message).isNotNull(); assertThat(message.getPayload().getBar()).isEqualTo("barbar" + id); @@ -85,9 +85,9 @@ public class StreamListenerMessageArgumentTests { @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) - public StreamListenerTestInterfaces.BarPojo receive(Message fooMessage) { + public StreamListenerTestUtils.BarPojo receive(Message fooMessage) { this.receivedMessages.add(fooMessage); - StreamListenerTestInterfaces.BarPojo barPojo = new StreamListenerTestInterfaces.BarPojo(); + StreamListenerTestUtils.BarPojo barPojo = new StreamListenerTestUtils.BarPojo(); barPojo.setBar(fooMessage.getPayload()); return barPojo; } @@ -99,9 +99,9 @@ public class StreamListenerMessageArgumentTests { @StreamListener(Processor.INPUT) @Output(Processor.OUTPUT) - public StreamListenerTestInterfaces.BarPojo receive(Message fooMessage) { + public StreamListenerTestUtils.BarPojo receive(Message fooMessage) { this.receivedMessages.add(fooMessage); - StreamListenerTestInterfaces.BarPojo barPojo = new StreamListenerTestInterfaces.BarPojo(); + StreamListenerTestUtils.BarPojo barPojo = new StreamListenerTestUtils.BarPojo(); barPojo.setBar(fooMessage.getPayload()); return barPojo; } diff --git a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMethodReturnWithConversionTests.java b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMethodReturnWithConversionTests.java index b5787555e..80bc47ed9 100644 --- a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMethodReturnWithConversionTests.java +++ b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMethodReturnWithConversionTests.java @@ -120,7 +120,7 @@ public class StreamListenerMethodReturnWithConversionTests extends Suite { TestPojoWithMimeType testPojoWithMimeType = context.getBean(TestPojoWithMimeType.class); assertThat(testPojoWithMimeType.receivedPojos).hasSize(1); assertThat(testPojoWithMimeType.receivedPojos.get(0)).hasFieldOrPropertyWithValue("foo", "barbar" + id); - Message message = (Message) collector.forChannel(processor.output()).poll(1, + Message message = (Message) collector.forChannel(processor.output()).poll(1, TimeUnit.SECONDS); assertThat(message).isNotNull(); assertThat(message.getPayload().getBar()).isEqualTo("barbar" + id); @@ -135,9 +135,9 @@ public class StreamListenerMethodReturnWithConversionTests extends Suite { @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) - public StreamListenerTestInterfaces.BarPojo receive(StreamListenerTestInterfaces.FooPojo fooPojo) { + public StreamListenerTestUtils.BarPojo receive(StreamListenerTestUtils.FooPojo fooPojo) { this.receivedPojos.add(fooPojo); - StreamListenerTestInterfaces.BarPojo barPojo = new StreamListenerTestInterfaces.BarPojo(); + StreamListenerTestUtils.BarPojo barPojo = new StreamListenerTestUtils.BarPojo(); barPojo.setBar(fooPojo.getFoo()); return barPojo; } @@ -149,16 +149,16 @@ public class StreamListenerMethodReturnWithConversionTests extends Suite { @StreamListener(Processor.INPUT) @Output(Processor.OUTPUT) - public StreamListenerTestInterfaces.BarPojo receive(StreamListenerTestInterfaces.FooPojo fooPojo) { + public StreamListenerTestUtils.BarPojo receive(StreamListenerTestUtils.FooPojo fooPojo) { this.receivedPojos.add(fooPojo); - StreamListenerTestInterfaces.BarPojo barPojo = new StreamListenerTestInterfaces.BarPojo(); + StreamListenerTestUtils.BarPojo barPojo = new StreamListenerTestUtils.BarPojo(); barPojo.setBar(fooPojo.getFoo()); return barPojo; } } public static class TestPojoWithMimeType { - List receivedPojos = new ArrayList<>(); + List receivedPojos = new ArrayList<>(); } } diff --git a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMethodWithReturnMessageTests.java b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMethodWithReturnMessageTests.java index 9501dd440..4631eb1a5 100644 --- a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMethodWithReturnMessageTests.java +++ b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMethodWithReturnMessageTests.java @@ -73,7 +73,7 @@ public class StreamListenerMethodWithReturnMessageTests { .getBean(TestPojoWithMessageReturn.class); assertThat(testPojoWithMessageReturn.receivedPojos).hasSize(1); assertThat(testPojoWithMessageReturn.receivedPojos.get(0)).hasFieldOrPropertyWithValue("foo", "barbar" + id); - Message message = (Message) collector + Message message = (Message) collector .forChannel(processor.output()).poll(1, TimeUnit.SECONDS); assertThat(message).isNotNull(); assertThat(message.getPayload().getBar()).isEqualTo("barbar" + id); @@ -86,9 +86,9 @@ public class StreamListenerMethodWithReturnMessageTests { @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) - public Message receive(StreamListenerTestInterfaces.FooPojo fooPojo) { + public Message receive(StreamListenerTestUtils.FooPojo fooPojo) { this.receivedPojos.add(fooPojo); - StreamListenerTestInterfaces.BarPojo barPojo = new StreamListenerTestInterfaces.BarPojo(); + StreamListenerTestUtils.BarPojo barPojo = new StreamListenerTestUtils.BarPojo(); barPojo.setBar(fooPojo.getFoo()); return MessageBuilder.withPayload(barPojo).setHeader("foo", "bar").build(); } @@ -100,16 +100,16 @@ public class StreamListenerMethodWithReturnMessageTests { @StreamListener(Processor.INPUT) @Output(Processor.OUTPUT) - public Message receive(StreamListenerTestInterfaces.FooPojo fooPojo) { + public Message receive(StreamListenerTestUtils.FooPojo fooPojo) { this.receivedPojos.add(fooPojo); - StreamListenerTestInterfaces.BarPojo bazPojo = new StreamListenerTestInterfaces.BarPojo(); + StreamListenerTestUtils.BarPojo bazPojo = new StreamListenerTestUtils.BarPojo(); bazPojo.setBar(fooPojo.getFoo()); return MessageBuilder.withPayload(bazPojo).setHeader("foo", "bar").build(); } } public static class TestPojoWithMessageReturn { - List receivedPojos = new ArrayList<>(); + List receivedPojos = new ArrayList<>(); } } diff --git a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMethodWithReturnValueTests.java b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMethodWithReturnValueTests.java index 89658e114..59f66838d 100644 --- a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMethodWithReturnValueTests.java +++ b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerMethodWithReturnValueTests.java @@ -86,7 +86,7 @@ public class StreamListenerMethodWithReturnValueTests { @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) - public String receive(StreamListenerTestInterfaces.FooPojo fooPojo) { + public String receive(StreamListenerTestUtils.FooPojo fooPojo) { this.receivedPojos.add(fooPojo); return fooPojo.getFoo(); } @@ -98,14 +98,14 @@ public class StreamListenerMethodWithReturnValueTests { @StreamListener(Processor.INPUT) @Output(Processor.OUTPUT) - public String receive(StreamListenerTestInterfaces.FooPojo fooPojo) { + public String receive(StreamListenerTestUtils.FooPojo fooPojo) { this.receivedPojos.add(fooPojo); return fooPojo.getFoo(); } } public static class TestStringProcessor { - List receivedPojos = new ArrayList<>(); + List receivedPojos = new ArrayList<>(); } } diff --git a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerTestInterfaces.java b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerTestUtils.java similarity index 97% rename from spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerTestInterfaces.java rename to spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerTestUtils.java index 98d377e9f..8ef914dfd 100644 --- a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerTestInterfaces.java +++ b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerTestUtils.java @@ -23,7 +23,7 @@ import org.springframework.messaging.SubscribableChannel; /** * @author Ilayaperumal Gopinathan */ -public class StreamListenerTestInterfaces { +public class StreamListenerTestUtils { public static class FooPojo { diff --git a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerWithAnnotatedInputOutputArgsTests.java b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerWithAnnotatedInputOutputArgsTests.java index 32a81fdc6..974678d26 100644 --- a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerWithAnnotatedInputOutputArgsTests.java +++ b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerWithAnnotatedInputOutputArgsTests.java @@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit; import org.junit.Test; +import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.cloud.stream.annotation.EnableBinding; @@ -39,7 +40,6 @@ import org.springframework.messaging.support.MessageBuilder; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS; -import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.TARGET_BEAN_NOT_EXISTS; /** * @author Marius Bogoevici @@ -71,7 +71,8 @@ public class StreamListenerWithAnnotatedInputOutputArgsTests { fail("Exception expected on using invalid bindable target as method parameter"); } catch (Exception e) { - assertThat(e.getMessage()).contains(TARGET_BEAN_NOT_EXISTS + ": invalid"); + assertThat(e.getCause()).isInstanceOf(NoSuchBeanDefinitionException.class); + assertThat(e.getCause()).hasMessageContaining("'invalid'"); } } diff --git a/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/MessageChannelToInputFluxParameterAdapterTests.java b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/MessageChannelToInputFluxParameterAdapterTests.java index 65c012058..504a2bce3 100644 --- a/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/MessageChannelToInputFluxParameterAdapterTests.java +++ b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/MessageChannelToInputFluxParameterAdapterTests.java @@ -40,18 +40,22 @@ import static org.assertj.core.api.Assertions.assertThat; /** * @author Marius Bogoevici */ -@SuppressWarnings("unchecked") public class MessageChannelToInputFluxParameterAdapterTests { @Test public void testWrapperFluxSupportsMultipleSubscriptions() throws Exception { List results = Collections.synchronizedList(new ArrayList<>()); CountDownLatch latch = new CountDownLatch(4); - final MessageChannelToInputFluxParameterAdapter messageChannelToInputFluxParameterAdapter - = new MessageChannelToInputFluxParameterAdapter(new CompositeMessageConverter(Collections.singleton(new MappingJackson2MessageConverter()))); - final Method processMethod = ReflectionUtils.findMethod(MessageChannelToInputFluxParameterAdapterTests.class, "process", Flux.class); + final MessageChannelToInputFluxParameterAdapter messageChannelToInputFluxParameterAdapter = new MessageChannelToInputFluxParameterAdapter( + new CompositeMessageConverter( + Collections.singleton(new MappingJackson2MessageConverter()))); + final Method processMethod = ReflectionUtils.findMethod( + MessageChannelToInputFluxParameterAdapterTests.class, "process", + Flux.class); final DirectChannel adaptedChannel = new DirectChannel(); - final Flux> adapterFlux = (Flux>) messageChannelToInputFluxParameterAdapter.adapt(adaptedChannel, new MethodParameter(processMethod, 0)); + @SuppressWarnings("unchecked") + final Flux> adapterFlux = (Flux>) messageChannelToInputFluxParameterAdapter + .adapt(adaptedChannel, new MethodParameter(processMethod, 0)); String uuid1 = UUID.randomUUID().toString(); String uuid2 = UUID.randomUUID().toString(); adapterFlux.map(m -> m.getPayload() + uuid1).subscribe(s -> { @@ -67,11 +71,12 @@ public class MessageChannelToInputFluxParameterAdapterTests { adaptedChannel.send(MessageBuilder.withPayload("B").build()); assertThat(latch.await(5000, TimeUnit.MILLISECONDS)).isTrue(); - assertThat(results).containsExactlyInAnyOrder("A" + uuid1, "B" + uuid1, "A" + uuid2, "B" + uuid2); + assertThat(results).containsExactlyInAnyOrder("A" + uuid1, "B" + uuid1, + "A" + uuid2, "B" + uuid2); } public void process(Flux> message) { - // do nothing + // do nothing - we just reference this method from the test } } diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/annotation/StreamListener.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/annotation/StreamListener.java index a3649109f..ee5e8714c 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/annotation/StreamListener.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/annotation/StreamListener.java @@ -26,39 +26,98 @@ import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter; import org.springframework.messaging.handler.annotation.MessageMapping; /** - * Annotation that marks a method to be a listener to the inputs declared through {@link EnableBinding} - * (e.g. channels). + * Annotation that marks a method to be a listener to inputs declared via + * {@link EnableBinding} (e.g. channels). * - * Annotated methods are allowed to have flexible signatures, which determine how - * the method is invoked and how their return results are processed. This annotation - * can be applied for two separate classes of methods. + * Annotated methods are allowed to have flexible signatures, which determine how the + * method is invoked and how their return results are processed. This annotation can be + * applied for two separate classes of methods. * *

Declarative mode

* - * A method is considered as declarative if its method parameters are annotated with {@link Input} and/or {@link Output} - * which have either bound elements (e.g. channels) or conversion targets from bound elements via a registered - * {@link StreamListenerParameterAdapter}. In this case, the method is invoked once when the application starts. + * A method is considered declarative if all its method parameter types and return type + * (if not void) are bound elements or conversion targets from bound elements via a + * registered {@link StreamListenerParameterAdapter}. + * + * Only declarative methods can have bound elements or conversion targets as arguments and + * return type. + * + * Declarative methods must specify what inputs and outputs correspond to their arguments + * and return type, and can do this in one of the following ways. + * + *
    + *
  • By using either the {@link Input} or {@link Output} annotation for each of the + * parameters and the {@link Output} annotation on the method for the return type (if applicable). The use + * of annotations in this case is mandatory. In this case the {@link StreamListener} + * annotation must not specify a value.
  • + *
  • By setting an {@link Input} bound target as the annotation value of {@link StreamListener} + * and using {@link org.springframework.messaging.handler.annotation.SendTo}
  • on the method + * for the return type (if applicable). In this case the method must have exactly one parameter, corresponding + * to an input. + *
+ * + * An example of declarative method signature using the former idiom is as follows: + * + *
+ * {@code
+ * @StreamListener
+ * public @Output("joined") Flux join(
+ *       @Input("input1") Flux input1,
+ *       @Input("input2") Flux input2) {
+ *   // ... join the two input streams via functional operators
+ * }
+ * }
+ * 
+ * + * An example of declarative method signature using the latter idiom is as follows: + * + *
+ * {@code
+ * @StreamListener(Processor.INPUT)
+ * @SendTo(Processor.OUTPUT)
+ * public Flux convert(Flux input) {
+ *     return input.map(String::toUppercase);
+ * }
+ * }
+ * 
+ * + * Declarative methods are invoked only once, when the context is refreshed. * *

Individual message handler mode

* - * Non declarative method is treated as message handler based, and is invoked for each - * incoming message received from that target. In this case, the - * method can have a flexible signature, as described by {@link MessageMapping}. + * Non declarative methods are treated as message handler based, and are invoked for each + * incoming message received from that target. In this case, the method can have a + * flexible signature, as described by {@link MessageMapping}. * - * If the method returns a {@link org.springframework.messaging.Message}, the result will be automatically sent - * to a channel, as follows: + * If the method returns a {@link org.springframework.messaging.Message}, the result will + * be automatically sent to a channel, as follows: *
    - *
  • A result of the type {@link org.springframework.messaging.Message} will be sent as-is
  • - *
  • All other results will become the payload of a {@link org.springframework.messaging.Message}
  • + *
  • A result of the type {@link org.springframework.messaging.Message} will be sent + * as-is
  • + *
  • All other results will become the payload of a + * {@link org.springframework.messaging.Message}
  • *
* - * The target channel of the return message is determined by consulting in the following order: + * The target channel of the return message is determined by consulting in the following + * order: *
    - *
  • The {@link org.springframework.messaging.MessageHeaders} of the resulting message.
  • - *
  • The value set on the {@link org.springframework.messaging.handler.annotation.SendTo} annotation, if present
  • + *
  • The {@link org.springframework.messaging.MessageHeaders} of the resulting + * message.
  • + *
  • The value set on the + * {@link org.springframework.messaging.handler.annotation.SendTo} annotation, if + * present
  • *
* - * In both the modes, the StreamListener annotation value must be the name of an {@link Input} bound target. + * An example of individual message handler signature is as follows: + * + *
+ * {@code
+ * @StreamListener(Processor.INPUT)
+ * @SendTo(Processor.OUTPUT)
+ * public String convert(String input) {
+ *     return input.toUppercase();
+ * }
+ * }
  *
  * @author Marius Bogoevici
  * @author Ilayaperumal Gopinathan
@@ -66,7 +125,7 @@ import org.springframework.messaging.handler.annotation.MessageMapping;
  * @see {@link EnableBinding}
  * @see {@link org.springframework.messaging.handler.annotation.SendTo}
  */
-@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
+@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
 @Retention(RetentionPolicy.RUNTIME)
 @MessageMapping
 @Documented
diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java
index e0b9a0da3..94db41417 100644
--- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java
+++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java
@@ -26,7 +26,6 @@ import org.springframework.aop.framework.Advised;
 import org.springframework.aop.support.AopUtils;
 import org.springframework.beans.BeansException;
 import org.springframework.beans.factory.BeanInitializationException;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
 import org.springframework.beans.factory.SmartInitializingSingleton;
 import org.springframework.beans.factory.config.BeanPostProcessor;
 import org.springframework.cloud.stream.annotation.Input;
@@ -49,18 +48,9 @@ import org.springframework.util.Assert;
 import org.springframework.util.ReflectionUtils;
 import org.springframework.util.StringUtils;
 
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER;
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMS;
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_INBOUND_NAME;
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_OUTBOUND_NAME;
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.RETURN_TYPE_MULTIPLE_OUTBOUND_SPECIFIED;
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED;
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.TARGET_BEAN_NOT_EXISTS;
-import static org.springframework.cloud.stream.binding.StreamListenerMethodUtils.getInboundElementNameFromMethod;
-import static org.springframework.cloud.stream.binding.StreamListenerMethodUtils.getOutboundElementNameFromMethod;
-
 /**
- * {@link BeanPostProcessor} that handles {@link StreamListener} annotations found on bean methods.
+ * {@link BeanPostProcessor} that handles {@link StreamListener} annotations found on bean
+ * methods.
  *
  * @author Marius Bogoevici
  * @author Ilayaperumal Gopinathan
@@ -74,8 +64,6 @@ public class StreamListenerAnnotationBeanPostProcessor
 
 	private final Map mappedBindings = new HashMap<>();
 
-	private final Map boundElements = new HashMap<>();
-
 	private ConfigurableApplicationContext applicationContext;
 
 	private final List> streamListenerParameterAdapters = new ArrayList<>();
@@ -120,23 +108,23 @@ public class StreamListenerAnnotationBeanPostProcessor
 			public void doWith(final Method method) throws IllegalArgumentException, IllegalAccessException {
 				StreamListener streamListener = AnnotationUtils.findAnnotation(method, StreamListener.class);
 				if (streamListener != null) {
-					Assert.isTrue(method.getAnnotation(Input.class) == null, INPUT_AT_STREAM_LISTENER);
-					String methodAnnotatedInboundName = getInboundElementNameFromMethod(streamListener);
-					String methodAnnotatedOutboundName = getOutboundElementNameFromMethod(method);
+					Assert.isTrue(method.getAnnotation(Input.class) == null, StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER);
+					String methodAnnotatedInboundName = streamListener.value();
+					String methodAnnotatedOutboundName = StreamListenerMethodUtils.getOutboundElementNameFromMethod(method);
 					int inputAnnotationCount = StreamListenerMethodUtils.inputAnnotationCount(method);
 					int outputAnnotationCount = StreamListenerMethodUtils.outputAnnotationCount(method);
-					boolean isDeclarative = isDeclarativeStreamListenerMethod(method, methodAnnotatedInboundName, methodAnnotatedOutboundName);
-					StreamListenerMethodUtils.assertStreamListenerMethod(method, inputAnnotationCount, outputAnnotationCount, methodAnnotatedInboundName, methodAnnotatedOutboundName, isDeclarative);
-					Class[] parameterTypes = method.getParameterTypes();
+					boolean isDeclarative = checkDeclarativeMethod(method, methodAnnotatedInboundName, methodAnnotatedOutboundName);
+					StreamListenerMethodUtils.validateStreamListenerMethod(method, inputAnnotationCount, outputAnnotationCount,
+							methodAnnotatedInboundName, methodAnnotatedOutboundName, isDeclarative);
 					if (!method.getReturnType().equals(Void.TYPE)) {
 						if (!StringUtils.hasText(methodAnnotatedOutboundName)) {
 							if (outputAnnotationCount == 0) {
-								throw new IllegalArgumentException(RETURN_TYPE_NO_OUTBOUND_SPECIFIED);
+								throw new IllegalArgumentException(StreamListenerErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED);
 							}
-							Assert.isTrue((outputAnnotationCount == 1), RETURN_TYPE_MULTIPLE_OUTBOUND_SPECIFIED);
+							Assert.isTrue((outputAnnotationCount == 1), StreamListenerErrorMessages.RETURN_TYPE_MULTIPLE_OUTBOUND_SPECIFIED);
 						}
 					}
-					if (isDeclarative && (!StringUtils.hasText(methodAnnotatedInboundName) || parameterTypes.length == 1)) {
+					if (isDeclarative) {
 						invokeSetupMethodOnListenedChannel(method, bean, methodAnnotatedInboundName, methodAnnotatedOutboundName);
 					}
 					else {
@@ -148,25 +136,34 @@ public class StreamListenerAnnotationBeanPostProcessor
 		return bean;
 	}
 
-	private boolean isDeclarativeStreamListenerMethod(Method method, String methodAnnotatedInboundName, String methodAnnotatedOutboundName) {
+	private boolean checkDeclarativeMethod(Method method, String methodAnnotatedInboundName, String methodAnnotatedOutboundName) {
 		int methodArgumentsLength = method.getParameterTypes().length;
 		for (int parameterIndex = 0; parameterIndex < methodArgumentsLength; parameterIndex++) {
-			MethodParameter methodParameter = MethodParameter.forMethodOrConstructor(method, parameterIndex);
+			MethodParameter methodParameter = MethodParameter
+					.forMethodOrConstructor(method, parameterIndex);
 			if (methodParameter.hasParameterAnnotation(Input.class)) {
-				String inboundName = (String) AnnotationUtils.getValue(methodParameter.getParameterAnnotation(Input.class));
-				Assert.isTrue(StringUtils.hasText(inboundName), INVALID_INBOUND_NAME);
-				return isDeclarativeMethodParameter(getBindableBean(inboundName), methodParameter);
+				String inboundName = (String) AnnotationUtils
+						.getValue(methodParameter.getParameterAnnotation(Input.class));
+				Assert.isTrue(StringUtils.hasText(inboundName), StreamListenerErrorMessages.INVALID_INBOUND_NAME);
+				Assert.isTrue(isDeclarativeMethodParameter(this.applicationContext.getBean(inboundName),
+						methodParameter), StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS);
+				return true;
 			}
 			if (methodParameter.hasParameterAnnotation(Output.class)) {
-				String outboundName = (String) AnnotationUtils.getValue(methodParameter.getParameterAnnotation(Output.class));
-				Assert.isTrue(StringUtils.hasText(outboundName), INVALID_OUTBOUND_NAME);
-				return isDeclarativeMethodParameter(getBindableBean(outboundName), methodParameter);
+				String outboundName = (String) AnnotationUtils
+						.getValue(methodParameter.getParameterAnnotation(Output.class));
+				Assert.isTrue(StringUtils.hasText(outboundName), StreamListenerErrorMessages.INVALID_OUTBOUND_NAME);
+				Assert.isTrue(isDeclarativeMethodParameter(this.applicationContext.getBean(outboundName),
+						methodParameter), StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS);
+				return true;
 			}
 			if (StringUtils.hasText(methodAnnotatedOutboundName)) {
-				return isDeclarativeMethodParameter(getBindableBean(methodAnnotatedOutboundName), methodParameter);
+				return isDeclarativeMethodParameter(
+						this.applicationContext.getBean(methodAnnotatedOutboundName), methodParameter);
 			}
 			if (StringUtils.hasText(methodAnnotatedInboundName)) {
-				return isDeclarativeMethodParameter(getBindableBean(methodAnnotatedInboundName), methodParameter);
+				return isDeclarativeMethodParameter(
+						this.applicationContext.getBean(methodAnnotatedInboundName), methodParameter);
 			}
 		}
 		return false;
@@ -186,18 +183,6 @@ public class StreamListenerAnnotationBeanPostProcessor
 		return false;
 	}
 
-	private Object getBindableBean(String boundElementName) {
-		try {
-			if (!this.boundElements.containsKey(boundElementName)) {
-				this.boundElements.put(boundElementName, this.applicationContext.getBean(boundElementName));
-			}
-			return this.boundElements.get(boundElementName);
-		}
-		catch (NoSuchBeanDefinitionException e) {
-			throw new IllegalStateException(TARGET_BEAN_NOT_EXISTS + ": " + boundElementName, e);
-		}
-	}
-
 	@SuppressWarnings({"rawtypes", "unchecked"})
 	private void invokeSetupMethodOnListenedChannel(Method method, Object bean, String inboundName, String outboundName) {
 		Object[] arguments = new Object[method.getParameterTypes().length];
@@ -216,7 +201,7 @@ public class StreamListenerAnnotationBeanPostProcessor
 			}
 			if (targetReferenceValue != null) {
 				Assert.isInstanceOf(String.class, targetReferenceValue, "Annotation value must be a String");
-				Object targetBean = getBindableBean((String) targetReferenceValue);
+				Object targetBean = this.applicationContext.getBean((String) targetReferenceValue);
 				if (parameterType.isAssignableFrom(targetBean.getClass())) {
 					arguments[parameterIndex] = targetBean;
 				}
@@ -234,7 +219,7 @@ public class StreamListenerAnnotationBeanPostProcessor
 								+ " to " + parameterType);
 			}
 			else {
-				throw new IllegalStateException(INVALID_DECLARATIVE_METHOD_PARAMS);
+				throw new IllegalStateException(StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS);
 			}
 		}
 		try {
@@ -282,7 +267,7 @@ public class StreamListenerAnnotationBeanPostProcessor
 		this.mappedBindings.put(streamListener.value(), invocableHandlerMethod);
 		SubscribableChannel channel = this.applicationContext.getBean(streamListener.value(),
 				SubscribableChannel.class);
-		final String defaultOutputChannel = getOutboundElementNameFromMethod(method);
+		final String defaultOutputChannel = StreamListenerMethodUtils.getOutboundElementNameFromMethod(method);
 		if (invocableHandlerMethod.isVoid()) {
 			Assert.isTrue(StringUtils.isEmpty(defaultOutputChannel),
 					"An output channel cannot be specified for a method that " +
@@ -309,7 +294,6 @@ public class StreamListenerAnnotationBeanPostProcessor
 		// Dump the mappings after the context has been created, ensuring that beans can be processed correctly
 		// again.
 		this.mappedBindings.clear();
-		this.boundElements.clear();
 	}
 
 	private Method checkProxy(Method methodArg, Object bean) {
diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerErrorMessages.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerErrorMessages.java
index 6bae9035c..4000d236a 100644
--- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerErrorMessages.java
+++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerErrorMessages.java
@@ -18,19 +18,23 @@ package org.springframework.cloud.stream.binding;
 /**
  * @author Ilayaperumal Gopinathan
  */
-public interface StreamListenerErrorMessages {
+public abstract class StreamListenerErrorMessages {
 
+	private static final String PREFIX = "A method annotated with @StreamListener ";
 
-	public static final String INPUT_AT_STREAM_LISTENER = "A @StreamListener may never be annotated with @Input. If it should listen to a specific input, " +
-			"use the value of @StreamListener instead.";
+	public static final String INPUT_AT_STREAM_LISTENER = PREFIX
+			+ "may never be annotated with @Input. "
+			+ "If it should listen to a specific input, use the value of @StreamListener instead";
 
-	public static final String RETURN_TYPE_NO_OUTBOUND_SPECIFIED = "StreamListener method with return type should have outbound target specified";
+	public static final String RETURN_TYPE_NO_OUTBOUND_SPECIFIED = PREFIX
+			+ "having a return type should also have an outbound target specified";
 
-	public static final String RETURN_TYPE_MULTIPLE_OUTBOUND_SPECIFIED = "StreamListener method with return type should have only one outbound target specified";
+	public static final String RETURN_TYPE_MULTIPLE_OUTBOUND_SPECIFIED = PREFIX
+			+ "having a return type should have only one outbound target specified";
 
-	public static final String INVALID_INBOUND_NAME = "@Input annotation should always be associated with a valid inbound name";
+	public static final String INVALID_INBOUND_NAME = "The @Input annotation must have the name of an input as value";
 
-	public static final String INVALID_OUTBOUND_NAME = "@Output annotation should always be associated with a valid outbound name";
+	public static final String INVALID_OUTBOUND_NAME = "The @Output annotation must have the name of an input as value";
 
 	public static final String ATLEAST_ONE_OUTPUT = "At least one output must be specified";
 
@@ -38,27 +42,23 @@ public interface StreamListenerErrorMessages {
 
 	public static final String SEND_TO_EMPTY_DESTINATION = "An empty destination cannot be specified";
 
-	public static final String INVALID_MESSAGE_HANDLER_METHOD_PARAMS = "@Input or @Output annotation is not supported as method parameter in StreamListener method with " +
-			"message handler mapping";
+	public static final String INVALID_INPUT_OUTPUT_METHOD_PARAMETERS = "@Input or @Output annotations are not permitted on "
+			+ "method parameters while using the @StreamListener value and a method-level output specification";
 
-	public static final String INVALID_INPUT_OUTPUT_METHOD_PARAMETERS = "@Input or @Output annotations are not permitted as " +
-			"method parameters when both inbound and outbound values are set as method annotated values";
+	public static final String NO_INPUT_DESTINATION = "No input destination is configured. Use either the @StreamListener value or @Input";
 
-	public static final String NO_INPUT_DESTINATION = "No input destination is configured. Use either a @StreamListener attribute or @Input";
-
-	public static final String INVALID_DECLARATIVE_METHOD_PARAMETERS = "Declarative StreamListener method should only have inbound or outbound targets as method parameters";
+	public static final String INVALID_DECLARATIVE_METHOD_PARAMETERS = PREFIX
+			+ "may use @Input or @Output annotations only in declarative mode and for parameters that are bound elements or convertible from bound elements.";
 
 	public static final String AMBIGUOUS_MESSAGE_HANDLER_METHOD_ARGUMENTS = "Ambiguous method arguments for the StreamListener method";
 
-	public static final String INVALID_INPUT_VALUES = "Cannot set both StreamListener attribute and @Input annotation as method parameter";
+	public static final String INVALID_INPUT_VALUES = "Cannot set both @StreamListener value and @Input annotation as method parameter";
 
-	public static final String INVALID_INPUT_VALUE_WITH_OUTPUT_METHOD_PARAM = "Cannot set StreamListener attribute when using" +
-			" @Output annotation as method parameter. Use @Input method parameter annotation to specify inbound value instead";
+	public static final String INVALID_INPUT_VALUE_WITH_OUTPUT_METHOD_PARAM = "Setting the @StreamListener value when using"
+			+ " @Output annotation as method parameter is not permitted. Use @Input method parameter annotation to specify inbound value instead";
 
-	public static final String INVALID_OUTPUT_VALUES = "Cannot set both Output (@Output/@SendTo) method annotation value" +
-			" and @Output annotation as a method parameter";
-
-	public static final String INVALID_DECLARATIVE_METHOD_PARAMS = "Declarative StreamListener method should only have inbound or outbound targets as method parameters";
+	public static final String INVALID_OUTPUT_VALUES = "Cannot set both output (@Output/@SendTo) method annotation value"
+			+ " and @Output annotation as a method parameter";
 
 	public static final String TARGET_BEAN_NOT_EXISTS = "Target bean doesn't exist for the bound element name";
 }
diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerMethodUtils.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerMethodUtils.java
index 0e067253e..0ca3f5aa0 100644
--- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerMethodUtils.java
+++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerMethodUtils.java
@@ -28,20 +28,6 @@ import org.springframework.util.Assert;
 import org.springframework.util.ObjectUtils;
 import org.springframework.util.StringUtils;
 
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.AMBIGUOUS_MESSAGE_HANDLER_METHOD_ARGUMENTS;
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.ATLEAST_ONE_OUTPUT;
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS;
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_INBOUND_NAME;
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_INPUT_OUTPUT_METHOD_PARAMETERS;
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_INPUT_VALUES;
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_INPUT_VALUE_WITH_OUTPUT_METHOD_PARAM;
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_MESSAGE_HANDLER_METHOD_PARAMS;
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_OUTBOUND_NAME;
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.INVALID_OUTPUT_VALUES;
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.NO_INPUT_DESTINATION;
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.SEND_TO_EMPTY_DESTINATION;
-import static org.springframework.cloud.stream.binding.StreamListenerErrorMessages.SEND_TO_MULTIPLE_DESTINATIONS;
-
 /**
  * This class contains utility methods for handling {@link StreamListener} annotated bean methods.
  *
@@ -71,39 +57,38 @@ public class StreamListenerMethodUtils {
 		return outputAnnotationCount;
 	}
 
-	protected static void assertStreamListenerMethod(Method method, int inputAnnotationCount, int outputAnnotationCount,
-			String methodAnnotatedInboundName, String methodAnnotatedOutboundName, boolean isDeclarative) {
+	protected static void validateStreamListenerMethod(Method method, int inputAnnotationCount, int outputAnnotationCount, String methodAnnotatedInboundName, String methodAnnotatedOutboundName, boolean isDeclarative) {
 		int methodArgumentsLength = method.getParameterTypes().length;
 		if (!isDeclarative) {
-			Assert.isTrue(inputAnnotationCount == 0 && outputAnnotationCount == 0, INVALID_MESSAGE_HANDLER_METHOD_PARAMS);
+			Assert.isTrue(inputAnnotationCount == 0 && outputAnnotationCount == 0, StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS);
 		}
 		if (StringUtils.hasText(methodAnnotatedInboundName) && StringUtils.hasText(methodAnnotatedOutboundName)) {
-			Assert.isTrue(inputAnnotationCount == 0 && outputAnnotationCount == 0, INVALID_INPUT_OUTPUT_METHOD_PARAMETERS);
+			Assert.isTrue(inputAnnotationCount == 0 && outputAnnotationCount == 0, StreamListenerErrorMessages.INVALID_INPUT_OUTPUT_METHOD_PARAMETERS);
 		}
 		if (StringUtils.hasText(methodAnnotatedInboundName)) {
-			Assert.isTrue(inputAnnotationCount == 0, INVALID_INPUT_VALUES);
-			Assert.isTrue(outputAnnotationCount == 0, INVALID_INPUT_VALUE_WITH_OUTPUT_METHOD_PARAM);
+			Assert.isTrue(inputAnnotationCount == 0, StreamListenerErrorMessages.INVALID_INPUT_VALUES);
+			Assert.isTrue(outputAnnotationCount == 0, StreamListenerErrorMessages.INVALID_INPUT_VALUE_WITH_OUTPUT_METHOD_PARAM);
 		}
 		else {
-			Assert.isTrue(inputAnnotationCount >= 1, NO_INPUT_DESTINATION);
+			Assert.isTrue(inputAnnotationCount >= 1, StreamListenerErrorMessages.NO_INPUT_DESTINATION);
 		}
 		if (StringUtils.hasText(methodAnnotatedOutboundName)) {
-			Assert.isTrue(outputAnnotationCount == 0, INVALID_OUTPUT_VALUES);
+			Assert.isTrue(outputAnnotationCount == 0, StreamListenerErrorMessages.INVALID_OUTPUT_VALUES);
 		}
 		if (isDeclarative) {
 			for (int parameterIndex = 0; parameterIndex < methodArgumentsLength; parameterIndex++) {
 				MethodParameter methodParameter = MethodParameter.forMethodOrConstructor(method, parameterIndex);
 				if (methodParameter.hasParameterAnnotation(Input.class)) {
 					String inboundName = (String) AnnotationUtils.getValue(methodParameter.getParameterAnnotation(Input.class));
-					Assert.isTrue(StringUtils.hasText(inboundName), INVALID_INBOUND_NAME);
+					Assert.isTrue(StringUtils.hasText(inboundName), StreamListenerErrorMessages.INVALID_INBOUND_NAME);
 				}
 				if (methodParameter.hasParameterAnnotation(Output.class)) {
 					String outboundName = (String) AnnotationUtils.getValue(methodParameter.getParameterAnnotation(Output.class));
-					Assert.isTrue(StringUtils.hasText(outboundName), INVALID_OUTBOUND_NAME);
+					Assert.isTrue(StringUtils.hasText(outboundName), StreamListenerErrorMessages.INVALID_OUTBOUND_NAME);
 				}
 			}
 			if (methodArgumentsLength > 1){
-				Assert.isTrue(inputAnnotationCount + outputAnnotationCount == methodArgumentsLength, INVALID_DECLARATIVE_METHOD_PARAMETERS);
+				Assert.isTrue(inputAnnotationCount + outputAnnotationCount == methodArgumentsLength, StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS);
 			}
 		}
 	}
@@ -124,26 +109,22 @@ public class StreamListenerMethodUtils {
 			}
 			if (numPayloadAnnotations > 0) {
 				Assert.isTrue(methodArgumentsLength == numAnnotatedMethodParameters && numPayloadAnnotations <= 1,
-						AMBIGUOUS_MESSAGE_HANDLER_METHOD_ARGUMENTS);
+						StreamListenerErrorMessages.AMBIGUOUS_MESSAGE_HANDLER_METHOD_ARGUMENTS);
 			}
 		}
 	}
 
-	protected static String getInboundElementNameFromMethod(StreamListener streamListener) {
-		return StringUtils.hasText(streamListener.value()) ? streamListener.value() : null;
-	}
-
 	protected static String getOutboundElementNameFromMethod(Method method) {
 		SendTo sendTo = AnnotationUtils.findAnnotation(method, SendTo.class);
 		if (sendTo != null) {
-			Assert.isTrue(!ObjectUtils.isEmpty(sendTo.value()), ATLEAST_ONE_OUTPUT);
-			Assert.isTrue(sendTo.value().length == 1, SEND_TO_MULTIPLE_DESTINATIONS);
-			Assert.hasText(sendTo.value()[0], SEND_TO_EMPTY_DESTINATION);
+			Assert.isTrue(!ObjectUtils.isEmpty(sendTo.value()), StreamListenerErrorMessages.ATLEAST_ONE_OUTPUT);
+			Assert.isTrue(sendTo.value().length == 1, StreamListenerErrorMessages.SEND_TO_MULTIPLE_DESTINATIONS);
+			Assert.hasText(sendTo.value()[0], StreamListenerErrorMessages.SEND_TO_EMPTY_DESTINATION);
 			return sendTo.value()[0];
 		}
 		Output output = AnnotationUtils.findAnnotation(method, Output.class);
 		if (output != null) {
-			Assert.isTrue(StringUtils.hasText(output.value()), ATLEAST_ONE_OUTPUT);
+			Assert.isTrue(StringUtils.hasText(output.value()), StreamListenerErrorMessages.ATLEAST_ONE_OUTPUT);
 			return output.value();
 		}
 		return null;