Update to Spring Cloud Stream Brooklyn.BUILD-SNAPSHOT

This commit is contained in:
Marius Bogoevici
2016-08-25 22:56:58 -04:00
parent 95c8f9afdd
commit 0b00a2361d
26 changed files with 328 additions and 201 deletions

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -29,10 +29,10 @@ import org.springframework.context.ConfigurableApplicationContext;
public class DoubleApplication {
public static void main(String[] args) {
new AggregateApplicationBuilder().
from(SourceApplication.class).args("--fixedDelay=5000")
new AggregateApplicationBuilder(DoubleApplication.class, args)
.from(SourceApplication.class).args("--fixedDelay=5000")
.via(ProcessorApplication.class)
.to(SinkApplication.class).args("--debug=true").run("--spring.application.name=aggregate-test");
.to(SinkApplication.class).args("--debug=true").run();
}
}

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -12,7 +12,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -42,14 +42,14 @@
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support-internal</artifactId>
<artifactId>spring-cloud-stream-binder-kafka-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<artifactId>kafka_2.11</artifactId>
<classifier>test</classifier>
<version>0.8.2.1</version>
<version>0.9.0.1</version>
<scope>test</scope>
<exclusions>
<exclusion>

View File

@@ -16,10 +16,6 @@
package multibinder;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;
import java.util.List;
import java.util.UUID;
import org.hamcrest.CoreMatchers;
@@ -32,7 +28,9 @@ import org.junit.runner.RunWith;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
@@ -40,12 +38,10 @@ import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.KafkaProducerProperties;
import org.springframework.cloud.stream.test.junit.kafka.KafkaTestSupport;
import org.springframework.cloud.stream.test.junit.redis.RedisTestSupport;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.Configuration;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
@@ -53,27 +49,28 @@ import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = MultibinderApplication.class)
@SpringBootTest(classes = MultibinderApplication.class)
@WebAppConfiguration
@DirtiesContext
public class TwoKafkaBindersApplicationTest {
@ClassRule
public static KafkaTestSupport kafkaTestSupport1 = new KafkaTestSupport(true);
public static KafkaEmbedded kafkaTestSupport1 = new KafkaEmbedded(1);
@ClassRule
public static KafkaTestSupport kafkaTestSupport2 = new KafkaTestSupport(true);
public static KafkaEmbedded kafkaTestSupport2 = new KafkaEmbedded(1);
@ClassRule
public static RedisTestSupport redisTestSupport = new RedisTestSupport();
@BeforeClass
public static void setupEnvironment() {
System.setProperty("kafkaBroker1", kafkaTestSupport1.getBrokerAddress());
System.setProperty("zk1", kafkaTestSupport1.getZkConnectString());
System.setProperty("kafkaBroker2", kafkaTestSupport2.getBrokerAddress());
System.setProperty("zk2", kafkaTestSupport2.getZkConnectString());
System.setProperty("kafkaBroker1", kafkaTestSupport1.getBrokersAsString());
System.setProperty("zk1", kafkaTestSupport1.getZookeeperConnectionString());
System.setProperty("kafkaBroker2", kafkaTestSupport2.getBrokersAsString());
System.setProperty("zk2", kafkaTestSupport2.getZookeeperConnectionString());
}
@Autowired
@@ -83,18 +80,19 @@ public class TwoKafkaBindersApplicationTest {
public void contextLoads() {
Binder<MessageChannel, ?, ?> binder1 = binderFactory.getBinder("kafka1");
KafkaMessageChannelBinder kafka1 = (KafkaMessageChannelBinder) binder1;
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(kafka1.getConnectionFactory());
Configuration configuration = (Configuration) directFieldAccessor.getPropertyValue("configuration");
List<BrokerAddress> brokerAddresses = configuration.getBrokerAddresses();
Assert.assertThat(brokerAddresses, hasSize(1));
Assert.assertThat(brokerAddresses, contains(BrokerAddress.fromAddress(kafkaTestSupport1.getBrokerAddress())));
DirectFieldAccessor directFieldAccessor1 = new DirectFieldAccessor(kafka1);
KafkaBinderConfigurationProperties configuration1 =
(KafkaBinderConfigurationProperties) directFieldAccessor1.getPropertyValue("configurationProperties");
Assert.assertThat(configuration1.getBrokers(), arrayWithSize(1));
Assert.assertThat(configuration1.getBrokers()[0], equalTo(kafkaTestSupport1.getBrokersAsString()));
Binder<MessageChannel, ?, ?> binder2 = binderFactory.getBinder("kafka2");
KafkaMessageChannelBinder kafka2 = (KafkaMessageChannelBinder) binder2;
DirectFieldAccessor directFieldAccessor2 = new DirectFieldAccessor(kafka2.getConnectionFactory());
Configuration configuration2 = (Configuration) directFieldAccessor2.getPropertyValue("configuration");
List<BrokerAddress> brokerAddresses2 = configuration2.getBrokerAddresses();
Assert.assertThat(brokerAddresses2, hasSize(1));
Assert.assertThat(brokerAddresses2, contains(BrokerAddress.fromAddress(kafkaTestSupport2.getBrokerAddress())));
DirectFieldAccessor directFieldAccessor2 = new DirectFieldAccessor(kafka2);
KafkaBinderConfigurationProperties configuration2 =
(KafkaBinderConfigurationProperties) directFieldAccessor2.getPropertyValue("configurationProperties");
Assert.assertThat(configuration2.getBrokers(), arrayWithSize(1));
Assert.assertThat(configuration2.getBrokers()[0], equalTo(kafkaTestSupport2.getBrokersAsString()));
}
@Test
@@ -110,7 +108,7 @@ public class TwoKafkaBindersApplicationTest {
String testPayload = "testFoo" + UUID.randomUUID().toString();
dataProducer.send(MessageBuilder.withPayload(testPayload).build());
Message<?> receive = dataConsumer.receive(2000);
Message<?> receive = dataConsumer.receive(5000);
Assert.assertThat(receive, Matchers.notNullValue());
Assert.assertThat(receive.getPayload(), CoreMatchers.equalTo(testPayload));
}

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -34,23 +34,23 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-sink</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-redis</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support-internal</artifactId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@@ -6,7 +6,7 @@ spring:
bindings:
input:
destination: dataIn
binder: redis
binder: kafka
output:
destination: dataOut
binder: rabbit

View File

@@ -24,47 +24,37 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.rabbit.RabbitConsumerProperties;
import org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder;
import org.springframework.cloud.stream.binder.redis.RedisMessageChannelBinder;
import org.springframework.cloud.stream.test.junit.rabbit.RabbitTestSupport;
import org.springframework.cloud.stream.test.junit.redis.RedisTestSupport;
import org.springframework.cloud.stream.binder.test.junit.rabbit.RabbitTestSupport;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
/**
* @author Marius Bogoevici
* @author Gary Russell
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = MultibinderApplication.class)
@WebAppConfiguration
@DirtiesContext
public class RabbitAndRedisBinderApplicationTests {
public class RabbitAndKafkaBinderApplicationTests {
@ClassRule
public static RabbitTestSupport rabbitTestSupport = new RabbitTestSupport();
@ClassRule
public static RedisTestSupport redisTestSupport = new RedisTestSupport();
@Autowired
private BinderFactory<MessageChannel> binderFactory;
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, "test");
private final String randomGroup = UUID.randomUUID().toString();
@@ -77,25 +67,39 @@ public class RabbitAndRedisBinderApplicationTests {
}
@Test
public void contextLoads() {
public void contextLoads() throws Exception {
// passing connection arguments arguments to the embedded Kafka instance
ConfigurableApplicationContext context = SpringApplication.run(MultibinderApplication.class,
"--spring.cloud.stream.kafka.binder.brokers=" + kafkaEmbedded.getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + kafkaEmbedded.getZookeeperConnectionString());
context.close();
}
@Test
public void messagingWorks() {
public void messagingWorks() throws Exception {
// passing connection arguments arguments to the embedded Kafka instance
ConfigurableApplicationContext context = SpringApplication.run(MultibinderApplication.class,
"--spring.cloud.stream.kafka.binder.brokers=" + kafkaEmbedded.getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + kafkaEmbedded.getZookeeperConnectionString(),
"--spring.cloud.stream.bindings.output.producer.requiredGroups=" + this.randomGroup);
DirectChannel dataProducer = new DirectChannel();
((RedisMessageChannelBinder)binderFactory.getBinder("redis"))
.bindProducer("dataIn", dataProducer, new ExtendedProducerProperties<>(new ProducerProperties()));
BinderFactory<?> binderFactory = context.getBean(BinderFactory.class);
QueueChannel dataConsumer = new QueueChannel();
((RabbitMessageChannelBinder)binderFactory.getBinder("rabbit")).bindConsumer("dataOut", this.randomGroup,
((RabbitMessageChannelBinder) binderFactory.getBinder("rabbit")).bindConsumer("dataOut", this.randomGroup,
dataConsumer, new ExtendedConsumerProperties<>(new RabbitConsumerProperties()));
((KafkaMessageChannelBinder) binderFactory.getBinder("kafka"))
.bindProducer("dataIn", dataProducer, new ExtendedProducerProperties<>(new KafkaProducerProperties()));
String testPayload = "testFoo" + this.randomGroup;
dataProducer.send(MessageBuilder.withPayload(testPayload).build());
Message<?> receive = dataConsumer.receive(2000);
Message<?> receive = dataConsumer.receive(10000);
Assert.assertThat(receive, Matchers.notNullValue());
Assert.assertThat(receive.getPayload(), CoreMatchers.equalTo(testPayload));
context.close();
}
}

81
pom.xml
View File

@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
<packaging>pom</packaging>
<url>https://github.com/spring-cloud/spring-cloud-stream-samples</url>
<organization>
@@ -13,11 +13,11 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>1.1.1.RELEASE</version>
<version>1.2.0.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-cloud-stream.version>1.0.2.RELEASE</spring-cloud-stream.version>
<spring-cloud-stream.version>Brooklyn.BUILD-SNAPSHOT</spring-cloud-stream.version>
<java.version>1.8</java.version>
</properties>
<modules>
@@ -30,6 +30,7 @@
<module>rxjava-processor</module>
<module>multi-io</module>
<module>stream-listener</module>
<module>reactive-processor-kafka</module>
</modules>
<dependencyManagement>
<dependencies>
@@ -43,22 +44,17 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-source</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-sink</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-sample-transform</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
<version>${spring-cloud-stream.version}</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>
@@ -75,4 +71,67 @@
</plugins>
</pluginManagement>
</build>
<profiles>
<profile>
<id>spring</id>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>http://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>http://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>http://repo.spring.io/release</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>http://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>http://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>http://repo.spring.io/libs-release-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</profile>
</profiles>
</project>

View File

View File

@@ -0,0 +1,62 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-cloud-stream-samples</artifactId>
<groupId>org.springframework.cloud</groupId>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>reactive-processor-kafka</artifactId>
<properties>
<java.version>1.8</java.version>
<start-class>reactive.kafka.ReactiveKafkaProcessorApplication</start-class>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.0.1.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<classifier>exec</classifier>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,32 @@
package reactive.kafka;
import java.time.Duration;
import reactor.core.publisher.Flux;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
@SpringBootApplication
@EnableBinding(Processor.class)
public class ReactiveKafkaProcessorApplication {
public static void main(String[] args) {
SpringApplication.run(ReactiveKafkaProcessorApplication.class, args);
}
@StreamListener
@Output(Processor.OUTPUT)
public Flux<String> toUpperCase(@Input(Processor.INPUT) Flux<String> inbound) {
return inbound.
log()
.window(Duration.ofSeconds(10), Duration.ofSeconds(5))
.flatMap(w -> w.reduce("", (s1,s2)->s1+s2))
.log();
}
}

View File

@@ -0,0 +1,10 @@
server:
port: 8082
spring:
cloud:
stream:
bindings:
output:
destination: transformed
input:
destination: testtock

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -24,7 +24,6 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-rxjava</artifactId>
<version>${spring-cloud-stream.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -5,7 +5,7 @@ spring:
stream:
bindings:
input:
destination: testtock
destination: transformed
# uncomment below to have this module consume from a specific partition
# in the range of 0 to N-1, where N is the upstream module's partitionCount
#consumerProperties:

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>
@@ -31,10 +31,6 @@
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -0,0 +1,33 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
/**
* @author Marius Bogoevici
*/
public class Bar {
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}

View File

@@ -1,101 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
import org.springframework.cloud.stream.converter.AbstractFromMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.util.MimeType;
/**
* @author Ilayaperumal Gopinathan
*/
@Configuration
public class Converters {
//Register custom converter
@Bean
public AbstractFromMessageConverter fooConverter() {
return new FooToBarConverter();
}
public static class Foo {
private String value = "foo";
public String getValue() {
return this.value;
}
public void setValue(String value) {
this.value = value;
}
}
public static class Bar {
private String value = "init";
public Bar(String value) {
this.value = value;
}
public String getValue() {
return this.value;
}
public void setValue(String value) {
this.value = value;
}
}
public static class FooToBarConverter extends AbstractFromMessageConverter {
public FooToBarConverter() {
super(MimeType.valueOf("test/bar"));
}
@Override
protected Class<?>[] supportedTargetTypes() {
return new Class[] {Bar.class};
}
@Override
protected Class<?>[] supportedPayloadTypes() {
return new Class<?>[] {Foo.class};
}
@Override
public Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object result = null;
try {
if (message.getPayload() instanceof Foo) {
Foo fooPayload = (Foo) message.getPayload();
result = new Bar(fooPayload.getValue());
}
}
catch (Exception e) {
logger.error(e.getMessage(), e);
throw new MessageConversionException(e.getMessage());
}
return result;
}
}
}

View File

@@ -0,0 +1,33 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package demo;
/**
* @author Marius Bogoevici
*/
public class Foo {
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}

View File

@@ -29,11 +29,11 @@ public class SampleSink {
// Sink application definition
@StreamListener(Sink.SAMPLE)
public void receive(Converters.Bar barMessage) {
public void receive(Foo fooMessage) {
System.out.println("******************");
System.out.println("At the Sink");
System.out.println("******************");
System.out.println("Received transformed message " + barMessage.getValue() + " of type " + barMessage.getClass());
System.out.println("Received transformed message " + fooMessage.getValue() + " of type " + fooMessage.getClass());
}
public interface Sink {

View File

@@ -24,7 +24,9 @@ import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
/**
* @author Ilayaperumal Gopinathan
@@ -40,10 +42,9 @@ public class SampleSource {
System.out.println("******************");
System.out.println("At the Source");
System.out.println("******************");
Converters.Foo foo = new Converters.Foo();
foo.setValue("hi");
System.out.println("Sending value: " + foo.getValue() + " of type " + foo.getClass());
return new GenericMessage(foo);
String value = "{\"value\":\"hi\"}";
System.out.println("Sending value: " + value);
return MessageBuilder.withPayload(value).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build();
}
};
}

View File

@@ -33,7 +33,7 @@ public class SampleTransformer {
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Converters.Bar receive(Converters.Bar barMessage) {
public Bar receive(Bar barMessage) {
System.out.println("******************");
System.out.println("At the transformer");
System.out.println("******************");

View File

@@ -10,5 +10,6 @@ spring:
destination: testtock
output:
destination: xformed
content-type: application/json
sample-sink:
destination: xformed

View File

@@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-samples</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<version>1.1.0.BUILD-SNAPSHOT</version>
</parent>
<properties>