From 2b3d2b63204c4a20ac8d813ea33c5fa87241c528 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 26 Feb 2020 19:13:46 -0500 Subject: [PATCH] Update stream samples to latest model * Update Stream applicaiton Kafka samples to use the latest functional model * Use the new test binder to test the components Resolves https://github.com/spring-cloud/spring-cloud-dataflow-samples/issues/131 --- .../streams/standalone-stream-kafka/pom.xml | 4 +- .../usage-cost-logger/pom.xml | 24 ++++-- .../usagecostlogger/UsageCostLogger.java | 18 +++-- .../src/main/resources/application.properties | 1 + .../UsageCostLoggerApplicationTests.java | 76 ++++++++++--------- .../usage-cost-processor/pom.xml | 9 +-- .../UsageCostProcessor.java | 28 +++---- .../src/main/resources/application.properties | 2 + .../UsageCostProcessorApplicationTests.java | 56 ++++++++++++-- .../usage-detail-sender/pom.xml | 10 +-- .../usagedetailsender/UsageDetailSender.java | 32 ++++---- .../src/main/resources/application.properties | 1 + .../UsageDetailSenderApplicationTests.java | 56 +++++++------- 13 files changed, 183 insertions(+), 134 deletions(-) diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/pom.xml b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/pom.xml index 1a726f8..b05ba98 100644 --- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/pom.xml +++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/pom.xml @@ -9,7 +9,7 @@ org.springframework.boot spring-boot-starter-parent - 2.1.4.RELEASE + 2.2.4.RELEASE @@ -22,7 +22,7 @@ springcloudstream ${project.version} 1.8 - Greenwich.SR1 + Hoxton.SR2 diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/pom.xml b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/pom.xml index 90c3f0f..4a58fb9 100644 --- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/pom.xml +++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/pom.xml @@ -45,17 +45,27 @@ test - org.springframework.cloud - spring-cloud-stream-test-support + org.springframework.boot + spring-boot-starter-test test - org.springframework.kafka - spring-kafka-test + org.springframework.cloud + spring-cloud-stream + test-jar + test-binder test + + org.awaitility + awaitility + test + + + junit + junit + + + - - - diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/main/java/io/spring/dataflow/sample/usagecostlogger/UsageCostLogger.java b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/main/java/io/spring/dataflow/sample/usagecostlogger/UsageCostLogger.java index 7ddd7ed..a2eedb9 100644 --- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/main/java/io/spring/dataflow/sample/usagecostlogger/UsageCostLogger.java +++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/main/java/io/spring/dataflow/sample/usagecostlogger/UsageCostLogger.java @@ -1,20 +1,22 @@ package io.spring.dataflow.sample.usagecostlogger; +import java.util.function.Consumer; + import io.spring.dataflow.sample.UsageCostDetail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.messaging.Sink; - -@EnableBinding(Sink.class) +@Configuration public class UsageCostLogger { private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerApplication.class); - @StreamListener(Sink.INPUT) - public void process(UsageCostDetail usageCostDetail) { - logger.info(usageCostDetail.toString()); + @Bean + public Consumer process() { + return usageCostDetail -> { + logger.info(usageCostDetail.toString()); + }; } } \ No newline at end of file diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/main/resources/application.properties b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/main/resources/application.properties index f44486f..4b70e74 100644 --- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/main/resources/application.properties +++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/main/resources/application.properties @@ -1 +1,2 @@ +spring.cloud.stream.function.bindings.process-in-0=input spring.cloud.stream.bindings.input.destination=usage-cost diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/test/java/io/spring/dataflow/sample/usagecostlogger/UsageCostLoggerApplicationTests.java b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/test/java/io/spring/dataflow/sample/usagecostlogger/UsageCostLoggerApplicationTests.java index bd8a599..cd09948 100644 --- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/test/java/io/spring/dataflow/sample/usagecostlogger/UsageCostLoggerApplicationTests.java +++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-logger/src/test/java/io/spring/dataflow/sample/usagecostlogger/UsageCostLoggerApplicationTests.java @@ -1,53 +1,55 @@ package io.spring.dataflow.sample.usagecostlogger; +import java.util.HashMap; +import java.util.Map; + import io.spring.dataflow.sample.UsageCostDetail; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.boot.test.system.CapturedOutput; +import org.springframework.boot.test.system.OutputCaptureExtension; +import org.springframework.cloud.stream.binder.test.InputDestination; +import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.MessageConverter; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.messaging.Sink; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.test.context.junit4.SpringRunner; - -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; - -@RunWith(SpringRunner.class) -@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@ExtendWith(OutputCaptureExtension.class) public class UsageCostLoggerApplicationTests { - @Autowired - protected Sink sink; - - @Autowired - protected UsageCostLogger usageCostLogger; - @Test public void contextLoads() { } @Test - public void testUsageCostLogger() throws Exception { - ArgumentCaptor captor = ArgumentCaptor.forClass(UsageCostDetail.class); - this.sink.input().send(MessageBuilder.withPayload("{\"userId\":\"user3\",\"callCost\":10.100000000000001,\"dataCost\":25.1}").build()); - verify(this.usageCostLogger).process(captor.capture()); - } + public void testUsageCostLogger(CapturedOutput output) { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( + TestChannelBinderConfiguration + .getCompleteConfiguration(UsageCostLoggerApplication.class)) + .web(WebApplicationType.NONE) + .run()) { - @EnableAutoConfiguration - @EnableBinding(Sink.class) - static class TestConfig { + InputDestination source = context.getBean(InputDestination.class); - // Override `UsageCostLogger` bean for spying. - @Bean - @Primary - public UsageCostLogger usageCostLogger() { - return spy(new UsageCostLogger()); + UsageCostDetail usageCostDetail = new UsageCostDetail(); + usageCostDetail.setUserId("user1"); + usageCostDetail.setCallCost(3.0); + usageCostDetail.setDataCost(5.0); + + final MessageConverter converter = context.getBean(CompositeMessageConverter.class); + Map headers = new HashMap<>(); + headers.put("contentType", "application/json"); + MessageHeaders messageHeaders = new MessageHeaders(headers); + final Message message = converter.toMessage(usageCostDetail, messageHeaders); + + source.send(message); + + Awaitility.await().until(output::getOut, value -> value.contains("{\"userId\": \"user1\", \"callCost\": \"3.0\", \"dataCost\": \"5.0\" }")); } } } diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/pom.xml b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/pom.xml index 0c6e34e..8a4c0b6 100644 --- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/pom.xml +++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/pom.xml @@ -45,12 +45,9 @@ org.springframework.cloud - spring-cloud-stream-test-support - test - - - org.springframework.kafka - spring-kafka-test + spring-cloud-stream + test-jar + test-binder test diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/main/java/io/spring/dataflow/sample/usagecostprocessor/UsageCostProcessor.java b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/main/java/io/spring/dataflow/sample/usagecostprocessor/UsageCostProcessor.java index 327e438..181374f 100644 --- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/main/java/io/spring/dataflow/sample/usagecostprocessor/UsageCostProcessor.java +++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/main/java/io/spring/dataflow/sample/usagecostprocessor/UsageCostProcessor.java @@ -1,27 +1,27 @@ package io.spring.dataflow.sample.usagecostprocessor; +import java.util.function.Function; + import io.spring.dataflow.sample.UsageCostDetail; import io.spring.dataflow.sample.UsageDetail; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.annotation.StreamListener; -import org.springframework.cloud.stream.messaging.Processor; -import org.springframework.messaging.handler.annotation.SendTo; - -@EnableBinding(Processor.class) +@Configuration public class UsageCostProcessor { private double ratePerSecond = 0.1; private double ratePerMB = 0.05; - @StreamListener(Processor.INPUT) - @SendTo(Processor.OUTPUT) - public UsageCostDetail processUsageCost(UsageDetail usageDetail) { - UsageCostDetail usageCostDetail = new UsageCostDetail(); - usageCostDetail.setUserId(usageDetail.getUserId()); - usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond); - usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB); - return usageCostDetail; + @Bean + public Function processUsageCost() { + return usageDetail -> { + UsageCostDetail usageCostDetail = new UsageCostDetail(); + usageCostDetail.setUserId(usageDetail.getUserId()); + usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond); + usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB); + return usageCostDetail; + }; } } diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/main/resources/application.properties b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/main/resources/application.properties index 4ded97a..03e7ec0 100644 --- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/main/resources/application.properties +++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/main/resources/application.properties @@ -1,2 +1,4 @@ +spring.cloud.stream.function.bindings.processUsageCost-in-0=input +spring.cloud.stream.function.bindings.processUsageCost-out-0=output spring.cloud.stream.bindings.input.destination=usage-detail spring.cloud.stream.bindings.output.destination=usage-cost diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/test/java/io/spring/dataflow/sample/usagecostprocessor/UsageCostProcessorApplicationTests.java b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/test/java/io/spring/dataflow/sample/usagecostprocessor/UsageCostProcessorApplicationTests.java index 3f3b37b..61be6ff 100644 --- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/test/java/io/spring/dataflow/sample/usagecostprocessor/UsageCostProcessorApplicationTests.java +++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-cost-processor/src/test/java/io/spring/dataflow/sample/usagecostprocessor/UsageCostProcessorApplicationTests.java @@ -1,16 +1,60 @@ package io.spring.dataflow.sample.usagecostprocessor; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.test.context.junit4.SpringRunner; +import java.util.HashMap; +import java.util.Map; + +import io.spring.dataflow.sample.UsageCostDetail; +import io.spring.dataflow.sample.UsageDetail; +import org.junit.jupiter.api.Test; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.stream.binder.test.InputDestination; +import org.springframework.cloud.stream.binder.test.OutputDestination; +import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.MessageConverter; + +import static org.assertj.core.api.Assertions.assertThat; -@RunWith(SpringRunner.class) -@SpringBootTest public class UsageCostProcessorApplicationTests { @Test public void contextLoads() { } + @Test + public void testUsageCostProcessor() { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( + TestChannelBinderConfiguration.getCompleteConfiguration( + UsageCostProcessorApplication.class)).web(WebApplicationType.NONE) + .run()) { + + InputDestination source = context.getBean(InputDestination.class); + + UsageDetail usageDetail = new UsageDetail(); + usageDetail.setUserId("user1"); + usageDetail.setDuration(30L); + usageDetail.setData(100L); + + final MessageConverter converter = context.getBean(CompositeMessageConverter.class); + Map headers = new HashMap<>(); + headers.put("contentType", "application/json"); + MessageHeaders messageHeaders = new MessageHeaders(headers); + final Message message = converter.toMessage(usageDetail, messageHeaders); + + source.send(message); + + OutputDestination target = context.getBean(OutputDestination.class); + Message sourceMessage = target.receive(10000); + + final UsageCostDetail usageCostDetail = (UsageCostDetail) converter + .fromMessage(sourceMessage, UsageCostDetail.class); + + assertThat(usageCostDetail.getCallCost()).isEqualTo(3.0); + assertThat(usageCostDetail.getDataCost()).isEqualTo(5.0); + } + } } diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/pom.xml b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/pom.xml index 4c80245..589ac63 100644 --- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/pom.xml +++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/pom.xml @@ -43,14 +43,12 @@ spring-boot-starter-test test + org.springframework.cloud - spring-cloud-stream-test-support - test - - - org.springframework.kafka - spring-kafka-test + spring-cloud-stream + test-jar + test-binder test diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/main/java/io/spring/dataflow/sample/usagedetailsender/UsageDetailSender.java b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/main/java/io/spring/dataflow/sample/usagedetailsender/UsageDetailSender.java index cd3aafb..1eb7393 100644 --- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/main/java/io/spring/dataflow/sample/usagedetailsender/UsageDetailSender.java +++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/main/java/io/spring/dataflow/sample/usagedetailsender/UsageDetailSender.java @@ -1,31 +1,25 @@ package io.spring.dataflow.sample.usagedetailsender; import java.util.Random; +import java.util.function.Supplier; import io.spring.dataflow.sample.UsageDetail; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.stream.annotation.EnableBinding; -import org.springframework.cloud.stream.messaging.Source; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.annotation.Scheduled; - -@EnableScheduling -@EnableBinding(Source.class) +@Configuration public class UsageDetailSender { - @Autowired - private Source source; - private String[] users = {"user1", "user2", "user3", "user4", "user5"}; - @Scheduled(fixedDelay = 1000) - public void sendEvents() { - UsageDetail usageDetail = new UsageDetail(); - usageDetail.setUserId(this.users[new Random().nextInt(5)]); - usageDetail.setDuration(new Random().nextInt(300)); - usageDetail.setData(new Random().nextInt(700)); - this.source.output().send(MessageBuilder.withPayload(usageDetail).build()); + @Bean + public Supplier sendEvents() { + return () -> { + UsageDetail usageDetail = new UsageDetail(); + usageDetail.setUserId(this.users[new Random().nextInt(5)]); + usageDetail.setDuration(new Random().nextInt(300)); + usageDetail.setData(new Random().nextInt(700)); + return usageDetail; + }; } } diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/main/resources/application.properties b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/main/resources/application.properties index 98f2de8..70311f6 100644 --- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/main/resources/application.properties +++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/main/resources/application.properties @@ -1 +1,2 @@ +spring.cloud.stream.function.bindings.sendEvents-out-0=output spring.cloud.stream.bindings.output.destination=usage-detail diff --git a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/test/java/io/spring/dataflow/sample/usagedetailsender/UsageDetailSenderApplicationTests.java b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/test/java/io/spring/dataflow/sample/usagedetailsender/UsageDetailSenderApplicationTests.java index c90e139..c1a8cdc 100644 --- a/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/test/java/io/spring/dataflow/sample/usagedetailsender/UsageDetailSenderApplicationTests.java +++ b/dataflow-website/stream-developer-guides/streams/standalone-stream-kafka/usage-detail-sender/src/test/java/io/spring/dataflow/sample/usagedetailsender/UsageDetailSenderApplicationTests.java @@ -1,44 +1,42 @@ package io.spring.dataflow.sample.usagedetailsender; -import java.util.concurrent.TimeUnit; - -import com.fasterxml.jackson.databind.ObjectMapper; import io.spring.dataflow.sample.UsageDetail; -import org.json.JSONObject; -import org.junit.Test; -import org.junit.runner.RunWith; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.cloud.stream.messaging.Source; -import org.springframework.cloud.stream.test.binder.MessageCollector; +import org.junit.jupiter.api.Test; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.stream.binder.test.OutputDestination; +import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.messaging.Message; -import org.springframework.test.context.junit4.SpringRunner; -import org.springframework.util.Assert; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.MessageConverter; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; -@RunWith(SpringRunner.class) -@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class UsageDetailSenderApplicationTests { - @Autowired - private MessageCollector messageCollector; - - @Autowired - private Source source; - @Test public void contextLoads() { } @Test - public void testUsageDetailSender() throws Exception { - Message message = this.messageCollector.forChannel(this.source.output()).poll(1, TimeUnit.SECONDS); - String usageDetailJSON = message.getPayload().toString(); - assertTrue(usageDetailJSON.contains("userId")); - assertTrue(usageDetailJSON.contains("duration")); - assertTrue(usageDetailJSON.contains("data")); - } + public void testUsageDetailSender() { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( + TestChannelBinderConfiguration + .getCompleteConfiguration(UsageDetailSenderApplication.class)) + .web(WebApplicationType.NONE) + .run()) { + OutputDestination target = context.getBean(OutputDestination.class); + Message sourceMessage = target.receive(10000); + + final MessageConverter converter = context.getBean(CompositeMessageConverter.class); + UsageDetail usageDetail = (UsageDetail) converter + .fromMessage(sourceMessage, UsageDetail.class); + + assertThat(usageDetail.getUserId()).isBetween("user1", "user5"); + assertThat(usageDetail.getData()).isBetween(0L, 700L); + assertThat(usageDetail.getDuration()).isBetween(0L, 300L); + } + } }