diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/pom.xml b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/pom.xml
index 1a726f8..b05ba98 100644
--- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/pom.xml
+++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/pom.xml
@@ -9,7 +9,7 @@
org.springframework.boot
spring-boot-starter-parent
- 2.1.4.RELEASE
+ 2.2.4.RELEASE
@@ -22,7 +22,7 @@
springcloudstream
${project.version}
1.8
- Greenwich.SR1
+ Hoxton.SR2
diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/pom.xml b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/pom.xml
index 90c3f0f..4a58fb9 100644
--- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/pom.xml
+++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/pom.xml
@@ -45,17 +45,27 @@
test
- org.springframework.cloud
- spring-cloud-stream-test-support
+ org.springframework.boot
+ spring-boot-starter-test
test
- org.springframework.kafka
- spring-kafka-test
+ org.springframework.cloud
+ spring-cloud-stream
+ test-jar
+ test-binder
test
+
+ org.awaitility
+ awaitility
+ test
+
+
+ junit
+ junit
+
+
+
-
-
-
diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/main/java/io/spring/dataflow/sample/usagecostlogger/UsageCostLogger.java b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/main/java/io/spring/dataflow/sample/usagecostlogger/UsageCostLogger.java
index 7ddd7ed..a2eedb9 100644
--- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/main/java/io/spring/dataflow/sample/usagecostlogger/UsageCostLogger.java
+++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/main/java/io/spring/dataflow/sample/usagecostlogger/UsageCostLogger.java
@@ -1,20 +1,22 @@
package io.spring.dataflow.sample.usagecostlogger;
+import java.util.function.Consumer;
+
import io.spring.dataflow.sample.UsageCostDetail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
-import org.springframework.cloud.stream.annotation.EnableBinding;
-import org.springframework.cloud.stream.annotation.StreamListener;
-import org.springframework.cloud.stream.messaging.Sink;
-
-@EnableBinding(Sink.class)
+@Configuration
public class UsageCostLogger {
private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerApplication.class);
- @StreamListener(Sink.INPUT)
- public void process(UsageCostDetail usageCostDetail) {
- logger.info(usageCostDetail.toString());
+ @Bean
+ public Consumer process() {
+ return usageCostDetail -> {
+ logger.info(usageCostDetail.toString());
+ };
}
}
\ No newline at end of file
diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/main/resources/application.properties b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/main/resources/application.properties
index f44486f..4b70e74 100644
--- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/main/resources/application.properties
+++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/main/resources/application.properties
@@ -1 +1,2 @@
+spring.cloud.stream.function.bindings.process-in-0=input
spring.cloud.stream.bindings.input.destination=usage-cost
diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/test/java/io/spring/dataflow/sample/usagecostlogger/UsageCostLoggerApplicationTests.java b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/test/java/io/spring/dataflow/sample/usagecostlogger/UsageCostLoggerApplicationTests.java
index bd8a599..cd09948 100644
--- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/test/java/io/spring/dataflow/sample/usagecostlogger/UsageCostLoggerApplicationTests.java
+++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/test/java/io/spring/dataflow/sample/usagecostlogger/UsageCostLoggerApplicationTests.java
@@ -1,53 +1,55 @@
package io.spring.dataflow.sample.usagecostlogger;
+import java.util.HashMap;
+import java.util.Map;
+
import io.spring.dataflow.sample.UsageCostDetail;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+import org.springframework.boot.test.system.CapturedOutput;
+import org.springframework.boot.test.system.OutputCaptureExtension;
+import org.springframework.cloud.stream.binder.test.InputDestination;
+import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.converter.CompositeMessageConverter;
+import org.springframework.messaging.converter.MessageConverter;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.cloud.stream.annotation.EnableBinding;
-import org.springframework.cloud.stream.messaging.Sink;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
-import org.springframework.messaging.support.MessageBuilder;
-import org.springframework.test.context.junit4.SpringRunner;
-
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-
-@RunWith(SpringRunner.class)
-@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+@ExtendWith(OutputCaptureExtension.class)
public class UsageCostLoggerApplicationTests {
- @Autowired
- protected Sink sink;
-
- @Autowired
- protected UsageCostLogger usageCostLogger;
-
@Test
public void contextLoads() {
}
@Test
- public void testUsageCostLogger() throws Exception {
- ArgumentCaptor captor = ArgumentCaptor.forClass(UsageCostDetail.class);
- this.sink.input().send(MessageBuilder.withPayload("{\"userId\":\"user3\",\"callCost\":10.100000000000001,\"dataCost\":25.1}").build());
- verify(this.usageCostLogger).process(captor.capture());
- }
+ public void testUsageCostLogger(CapturedOutput output) {
+ try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
+ TestChannelBinderConfiguration
+ .getCompleteConfiguration(UsageCostLoggerApplication.class))
+ .web(WebApplicationType.NONE)
+ .run()) {
- @EnableAutoConfiguration
- @EnableBinding(Sink.class)
- static class TestConfig {
+ InputDestination source = context.getBean(InputDestination.class);
- // Override `UsageCostLogger` bean for spying.
- @Bean
- @Primary
- public UsageCostLogger usageCostLogger() {
- return spy(new UsageCostLogger());
+ UsageCostDetail usageCostDetail = new UsageCostDetail();
+ usageCostDetail.setUserId("user1");
+ usageCostDetail.setCallCost(3.0);
+ usageCostDetail.setDataCost(5.0);
+
+ final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
+ Map headers = new HashMap<>();
+ headers.put("contentType", "application/json");
+ MessageHeaders messageHeaders = new MessageHeaders(headers);
+ final Message> message = converter.toMessage(usageCostDetail, messageHeaders);
+
+ source.send(message);
+
+ Awaitility.await().until(output::getOut, value -> value.contains("{\"userId\": \"user1\", \"callCost\": \"3.0\", \"dataCost\": \"5.0\" }"));
}
}
}
diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/pom.xml b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/pom.xml
index 0c6e34e..8a4c0b6 100644
--- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/pom.xml
+++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/pom.xml
@@ -45,12 +45,9 @@
org.springframework.cloud
- spring-cloud-stream-test-support
- test
-
-
- org.springframework.kafka
- spring-kafka-test
+ spring-cloud-stream
+ test-jar
+ test-binder
test
diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/main/java/io/spring/dataflow/sample/usagecostprocessor/UsageCostProcessor.java b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/main/java/io/spring/dataflow/sample/usagecostprocessor/UsageCostProcessor.java
index 327e438..181374f 100644
--- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/main/java/io/spring/dataflow/sample/usagecostprocessor/UsageCostProcessor.java
+++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/main/java/io/spring/dataflow/sample/usagecostprocessor/UsageCostProcessor.java
@@ -1,27 +1,27 @@
package io.spring.dataflow.sample.usagecostprocessor;
+import java.util.function.Function;
+
import io.spring.dataflow.sample.UsageCostDetail;
import io.spring.dataflow.sample.UsageDetail;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
-import org.springframework.cloud.stream.annotation.EnableBinding;
-import org.springframework.cloud.stream.annotation.StreamListener;
-import org.springframework.cloud.stream.messaging.Processor;
-import org.springframework.messaging.handler.annotation.SendTo;
-
-@EnableBinding(Processor.class)
+@Configuration
public class UsageCostProcessor {
private double ratePerSecond = 0.1;
private double ratePerMB = 0.05;
- @StreamListener(Processor.INPUT)
- @SendTo(Processor.OUTPUT)
- public UsageCostDetail processUsageCost(UsageDetail usageDetail) {
- UsageCostDetail usageCostDetail = new UsageCostDetail();
- usageCostDetail.setUserId(usageDetail.getUserId());
- usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond);
- usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB);
- return usageCostDetail;
+ @Bean
+ public Function processUsageCost() {
+ return usageDetail -> {
+ UsageCostDetail usageCostDetail = new UsageCostDetail();
+ usageCostDetail.setUserId(usageDetail.getUserId());
+ usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond);
+ usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB);
+ return usageCostDetail;
+ };
}
}
diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/main/resources/application.properties b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/main/resources/application.properties
index 4ded97a..03e7ec0 100644
--- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/main/resources/application.properties
+++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/main/resources/application.properties
@@ -1,2 +1,4 @@
+spring.cloud.stream.function.bindings.processUsageCost-in-0=input
+spring.cloud.stream.function.bindings.processUsageCost-out-0=output
spring.cloud.stream.bindings.input.destination=usage-detail
spring.cloud.stream.bindings.output.destination=usage-cost
diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/test/java/io/spring/dataflow/sample/usagecostprocessor/UsageCostProcessorApplicationTests.java b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/test/java/io/spring/dataflow/sample/usagecostprocessor/UsageCostProcessorApplicationTests.java
index 3f3b37b..61be6ff 100644
--- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/test/java/io/spring/dataflow/sample/usagecostprocessor/UsageCostProcessorApplicationTests.java
+++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/test/java/io/spring/dataflow/sample/usagecostprocessor/UsageCostProcessorApplicationTests.java
@@ -1,16 +1,60 @@
package io.spring.dataflow.sample.usagecostprocessor;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.junit4.SpringRunner;
+import java.util.HashMap;
+import java.util.Map;
+
+import io.spring.dataflow.sample.UsageCostDetail;
+import io.spring.dataflow.sample.UsageDetail;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+import org.springframework.cloud.stream.binder.test.InputDestination;
+import org.springframework.cloud.stream.binder.test.OutputDestination;
+import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.converter.CompositeMessageConverter;
+import org.springframework.messaging.converter.MessageConverter;
+
+import static org.assertj.core.api.Assertions.assertThat;
-@RunWith(SpringRunner.class)
-@SpringBootTest
public class UsageCostProcessorApplicationTests {
@Test
public void contextLoads() {
}
+ @Test
+ public void testUsageCostProcessor() {
+ try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
+ TestChannelBinderConfiguration.getCompleteConfiguration(
+ UsageCostProcessorApplication.class)).web(WebApplicationType.NONE)
+ .run()) {
+
+ InputDestination source = context.getBean(InputDestination.class);
+
+ UsageDetail usageDetail = new UsageDetail();
+ usageDetail.setUserId("user1");
+ usageDetail.setDuration(30L);
+ usageDetail.setData(100L);
+
+ final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
+ Map headers = new HashMap<>();
+ headers.put("contentType", "application/json");
+ MessageHeaders messageHeaders = new MessageHeaders(headers);
+ final Message> message = converter.toMessage(usageDetail, messageHeaders);
+
+ source.send(message);
+
+ OutputDestination target = context.getBean(OutputDestination.class);
+ Message sourceMessage = target.receive(10000);
+
+ final UsageCostDetail usageCostDetail = (UsageCostDetail) converter
+ .fromMessage(sourceMessage, UsageCostDetail.class);
+
+ assertThat(usageCostDetail.getCallCost()).isEqualTo(3.0);
+ assertThat(usageCostDetail.getDataCost()).isEqualTo(5.0);
+ }
+ }
}
diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/pom.xml b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/pom.xml
index 4c80245..589ac63 100644
--- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/pom.xml
+++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/pom.xml
@@ -43,14 +43,12 @@
spring-boot-starter-test
test
+
org.springframework.cloud
- spring-cloud-stream-test-support
- test
-
-
- org.springframework.kafka
- spring-kafka-test
+ spring-cloud-stream
+ test-jar
+ test-binder
test
diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/main/java/io/spring/dataflow/sample/usagedetailsender/UsageDetailSender.java b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/main/java/io/spring/dataflow/sample/usagedetailsender/UsageDetailSender.java
index cd3aafb..1eb7393 100644
--- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/main/java/io/spring/dataflow/sample/usagedetailsender/UsageDetailSender.java
+++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/main/java/io/spring/dataflow/sample/usagedetailsender/UsageDetailSender.java
@@ -1,31 +1,25 @@
package io.spring.dataflow.sample.usagedetailsender;
import java.util.Random;
+import java.util.function.Supplier;
import io.spring.dataflow.sample.UsageDetail;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.cloud.stream.annotation.EnableBinding;
-import org.springframework.cloud.stream.messaging.Source;
-import org.springframework.messaging.support.MessageBuilder;
-import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.annotation.Scheduled;
-
-@EnableScheduling
-@EnableBinding(Source.class)
+@Configuration
public class UsageDetailSender {
- @Autowired
- private Source source;
-
private String[] users = {"user1", "user2", "user3", "user4", "user5"};
- @Scheduled(fixedDelay = 1000)
- public void sendEvents() {
- UsageDetail usageDetail = new UsageDetail();
- usageDetail.setUserId(this.users[new Random().nextInt(5)]);
- usageDetail.setDuration(new Random().nextInt(300));
- usageDetail.setData(new Random().nextInt(700));
- this.source.output().send(MessageBuilder.withPayload(usageDetail).build());
+ @Bean
+ public Supplier sendEvents() {
+ return () -> {
+ UsageDetail usageDetail = new UsageDetail();
+ usageDetail.setUserId(this.users[new Random().nextInt(5)]);
+ usageDetail.setDuration(new Random().nextInt(300));
+ usageDetail.setData(new Random().nextInt(700));
+ return usageDetail;
+ };
}
}
diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/main/resources/application.properties b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/main/resources/application.properties
index 98f2de8..70311f6 100644
--- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/main/resources/application.properties
+++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/main/resources/application.properties
@@ -1 +1,2 @@
+spring.cloud.stream.function.bindings.sendEvents-out-0=output
spring.cloud.stream.bindings.output.destination=usage-detail
diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/test/java/io/spring/dataflow/sample/usagedetailsender/UsageDetailSenderApplicationTests.java b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/test/java/io/spring/dataflow/sample/usagedetailsender/UsageDetailSenderApplicationTests.java
index c90e139..c1a8cdc 100644
--- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/test/java/io/spring/dataflow/sample/usagedetailsender/UsageDetailSenderApplicationTests.java
+++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/test/java/io/spring/dataflow/sample/usagedetailsender/UsageDetailSenderApplicationTests.java
@@ -1,44 +1,42 @@
package io.spring.dataflow.sample.usagedetailsender;
-import java.util.concurrent.TimeUnit;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
import io.spring.dataflow.sample.UsageDetail;
-import org.json.JSONObject;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.cloud.stream.messaging.Source;
-import org.springframework.cloud.stream.test.binder.MessageCollector;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+import org.springframework.cloud.stream.binder.test.OutputDestination;
+import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
+import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
-import org.springframework.test.context.junit4.SpringRunner;
-import org.springframework.util.Assert;
+import org.springframework.messaging.converter.CompositeMessageConverter;
+import org.springframework.messaging.converter.MessageConverter;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
-@RunWith(SpringRunner.class)
-@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class UsageDetailSenderApplicationTests {
- @Autowired
- private MessageCollector messageCollector;
-
- @Autowired
- private Source source;
-
@Test
public void contextLoads() {
}
@Test
- public void testUsageDetailSender() throws Exception {
- Message message = this.messageCollector.forChannel(this.source.output()).poll(1, TimeUnit.SECONDS);
- String usageDetailJSON = message.getPayload().toString();
- assertTrue(usageDetailJSON.contains("userId"));
- assertTrue(usageDetailJSON.contains("duration"));
- assertTrue(usageDetailJSON.contains("data"));
- }
+ public void testUsageDetailSender() {
+ try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
+ TestChannelBinderConfiguration
+ .getCompleteConfiguration(UsageDetailSenderApplication.class))
+ .web(WebApplicationType.NONE)
+ .run()) {
+ OutputDestination target = context.getBean(OutputDestination.class);
+ Message sourceMessage = target.receive(10000);
+
+ final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
+ UsageDetail usageDetail = (UsageDetail) converter
+ .fromMessage(sourceMessage, UsageDetail.class);
+
+ assertThat(usageDetail.getUserId()).isBetween("user1", "user5");
+ assertThat(usageDetail.getData()).isBetween(0L, 700L);
+ assertThat(usageDetail.getDuration()).isBetween(0L, 300L);
+ }
+ }
}