Add StreamListener Usage Sample

This resolves #8

Doc updates

Address review comments
This commit is contained in:
Ilayaperumal Gopinathan
2016-03-22 00:52:52 +05:30
committed by Gary Russell
parent 78a5e4f5d9
commit 2414a2f1ef
10 changed files with 429 additions and 0 deletions

View File

@@ -26,6 +26,7 @@
<module>multibinder-differentsystems</module>
<module>rxjava-processor</module>
<module>multi-io</module>
<module>stream-listener</module>
</modules>
<dependencyManagement>
<dependencies>

42
stream-listener/README.md Normal file
View File

@@ -0,0 +1,42 @@
Spring Cloud Stream Stream Listener Sample
=============================
In this *Spring Cloud Stream* sample, the application shows how to use StreamListener support to enable message mapping and automatic type conversion.
## Requirements
To run this sample, you will need to have installed:
* Java 8 or Above
This example requires Redis to be running on localhost.
## Code Tour
This sample is a Spring Boot application that bundles multiple application together to showcase how to use StreamListener to enable
message mapping and automatic type conversion.
* TypeConversionApplication - the Spring Boot Main Application
* Converters - the class that holds the required custom message converter that converts POJO of type `Foo` to `Bar`
* SampleSource - the app that generates a message of type `Foo` that has the value `hi`
* SampleTransformer - the app that has the message handler method annotated with @StreamListener to map the process input channel to a type `Bar`.
This will make sure to apply `FooToBarConverter` automatically without having a need to specify `content-type` for the channel explicitly.
The annotation @SendTo on the message handler method will make sure to send the output to the provided output channel.
* SampleSink - the app that receives the converted message from the transformer output.
Please note that the applications (SampleSource, SampleTransformer and SampleSink) are bundled inside the single application for the demo
purpose only. In practice, these applications run on their own. If at all they need to be bundled together, the best practice is to use
`AggregateApplicationBuilder`. Refer the sample `double` for more info on aggregate application.
## Building with Maven
Build the sample by executing:
>$ mvn clean package
## Running the Sample
To start the source module execute the following:
>$ java -jar target/spring-cloud-stream-sample-stream-listener-1.0.0.BUILD-SNAPSHOT-exec.jar

59
stream-listener/pom.xml Normal file
View File

@@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-sample-stream-listener</artifactId>
<packaging>jar</packaging>
<name>spring-cloud-stream-sample-stream-listener</name>
<description>Demo project for stream listener</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
<start-class>demo.TypeConversionApplication</start-class>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<classifier>exec</classifier>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,101 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.springframework.cloud.stream.converter.AbstractFromMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.util.MimeType;
/**
* @author Ilayaperumal Gopinathan
*/
@Configuration
public class Converters {
//Register custom converter
@Bean
public AbstractFromMessageConverter fooConverter() {
return new FooToBarConverter();
}
public static class Foo {
private String value = "foo";
public String getValue() {
return this.value;
}
public void setValue(String value) {
this.value = value;
}
}
public static class Bar {
private String value = "init";
public Bar(String value) {
this.value = value;
}
public String getValue() {
return this.value;
}
public void setValue(String value) {
this.value = value;
}
}
public static class FooToBarConverter extends AbstractFromMessageConverter {
public FooToBarConverter() {
super(MimeType.valueOf("test/bar"));
}
@Override
protected Class<?>[] supportedTargetTypes() {
return new Class[] {Bar.class};
}
@Override
protected Class<?>[] supportedPayloadTypes() {
return new Class<?>[] {Foo.class};
}
@Override
public Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object result = null;
try {
if (message.getPayload() instanceof Foo) {
Foo fooPayload = (Foo) message.getPayload();
result = new Bar(fooPayload.getValue());
}
}
catch (Exception e) {
logger.error(e.getMessage(), e);
throw new MessageConversionException(e.getMessage());
}
return result;
}
}
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;
/**
* @author Ilayaperumal Gopinathan
*/
@EnableBinding(SampleSink.Sink.class)
public class SampleSink {
// Sink application definition
@StreamListener(Sink.SAMPLE)
public void receive(Converters.Bar barMessage) {
System.out.println("******************");
System.out.println("At the Sink");
System.out.println("******************");
System.out.println("Received transformed message " + barMessage.getValue() + " of type " + barMessage.getClass());
}
public interface Sink {
String SAMPLE = "sample-sink";
@Input(SAMPLE)
SubscribableChannel sampleSink();
}
}

View File

@@ -0,0 +1,57 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
/**
* @author Ilayaperumal Gopinathan
*/
@EnableBinding(SampleSource.Source.class)
public class SampleSource {
@Bean
@InboundChannelAdapter(value = Source.SAMPLE, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource<String> timerMessageSource() {
return new MessageSource<String>() {
public Message<String> receive() {
System.out.println("******************");
System.out.println("At the Source");
System.out.println("******************");
Converters.Foo foo = new Converters.Foo();
foo.setValue("hi");
System.out.println("Sending value: " + foo.getValue() + " of type " + foo.getClass());
return new GenericMessage(foo);
}
};
}
public interface Source {
String SAMPLE = "sample-source";
@Output(SAMPLE)
MessageChannel sampleSource();
}
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;
/**
* @author Ilayaperumal Gopinathan
*/
@EnableBinding(Processor.class)
public class SampleTransformer {
private static final String TRANSFORMATION_VALUE = "HI";
// Transformer application definition
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Converters.Bar receive(Converters.Bar barMessage) {
System.out.println("******************");
System.out.println("At the transformer");
System.out.println("******************");
System.out.println("Received value "+ barMessage.getValue() + " of type " + barMessage.getClass());
System.out.println("Transforming the value to " + TRANSFORMATION_VALUE + " and with the type " + barMessage.getClass());
barMessage.setValue(TRANSFORMATION_VALUE);
return barMessage;
}
}

View File

@@ -0,0 +1,29 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class TypeConversionApplication {
public static void main(String[] args) {
SpringApplication.run(TypeConversionApplication.class, args);
}
}

View File

@@ -0,0 +1,14 @@
server:
port: 8082
spring:
cloud:
stream:
bindings:
sample-source:
destination: testtock
input:
destination: testtock
output:
destination: xformed
sample-sink:
destination: xformed

View File

@@ -0,0 +1,36 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.web.WebAppConfiguration;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = TypeConversionApplication.class)
@WebAppConfiguration
@DirtiesContext
public class ModuleApplicationTests {
@Test
public void contextLoads() {
}
}