Update stream samples to latest model

* Update Stream applicaiton Kafka samples to use the latest functional model
* Use the new test binder to test the components

Resolves https://github.com/spring-cloud/spring-cloud-dataflow-samples/issues/131
This commit is contained in:
Soby Chacko
2020-02-26 19:13:46 -05:00
parent 158f455ffc
commit 2b3d2b6320
13 changed files with 183 additions and 134 deletions

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<version>2.2.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
@@ -22,7 +22,7 @@
<docker.org>springcloudstream</docker.org>
<docker.version>${project.version}</docker.version>
<java.version>1.8</java.version>
<spring-cloud.version>Greenwich.SR1</spring-cloud.version>
<spring-cloud.version>Hoxton.SR2</spring-cloud.version>
</properties>
<dependencyManagement>

View File

@@ -45,17 +45,27 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

View File

@@ -1,20 +1,22 @@
package io.spring.dataflow.sample.usagecostlogger;
import java.util.function.Consumer;
import io.spring.dataflow.sample.UsageCostDetail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding(Sink.class)
@Configuration
public class UsageCostLogger {
private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerApplication.class);
@StreamListener(Sink.INPUT)
public void process(UsageCostDetail usageCostDetail) {
logger.info(usageCostDetail.toString());
@Bean
public Consumer<UsageCostDetail> process() {
return usageCostDetail -> {
logger.info(usageCostDetail.toString());
};
}
}

View File

@@ -1 +1,2 @@
spring.cloud.stream.function.bindings.process-in-0=input
spring.cloud.stream.bindings.input.destination=usage-cost

View File

@@ -1,53 +1,55 @@
package io.spring.dataflow.sample.usagecostlogger;
import java.util.HashMap;
import java.util.Map;
import io.spring.dataflow.sample.UsageCostDetail;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ExtendWith(OutputCaptureExtension.class)
public class UsageCostLoggerApplicationTests {
@Autowired
protected Sink sink;
@Autowired
protected UsageCostLogger usageCostLogger;
@Test
public void contextLoads() {
}
@Test
public void testUsageCostLogger() throws Exception {
ArgumentCaptor<UsageCostDetail> captor = ArgumentCaptor.forClass(UsageCostDetail.class);
this.sink.input().send(MessageBuilder.withPayload("{\"userId\":\"user3\",\"callCost\":10.100000000000001,\"dataCost\":25.1}").build());
verify(this.usageCostLogger).process(captor.capture());
}
public void testUsageCostLogger(CapturedOutput output) {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(UsageCostLoggerApplication.class))
.web(WebApplicationType.NONE)
.run()) {
@EnableAutoConfiguration
@EnableBinding(Sink.class)
static class TestConfig {
InputDestination source = context.getBean(InputDestination.class);
// Override `UsageCostLogger` bean for spying.
@Bean
@Primary
public UsageCostLogger usageCostLogger() {
return spy(new UsageCostLogger());
UsageCostDetail usageCostDetail = new UsageCostDetail();
usageCostDetail.setUserId("user1");
usageCostDetail.setCallCost(3.0);
usageCostDetail.setDataCost(5.0);
final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
Map<String, Object> headers = new HashMap<>();
headers.put("contentType", "application/json");
MessageHeaders messageHeaders = new MessageHeaders(headers);
final Message<?> message = converter.toMessage(usageCostDetail, messageHeaders);
source.send(message);
Awaitility.await().until(output::getOut, value -> value.contains("{\"userId\": \"user1\", \"callCost\": \"3.0\", \"dataCost\": \"5.0\" }"));
}
}
}

View File

@@ -45,12 +45,9 @@
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>
</dependencies>

View File

@@ -1,27 +1,27 @@
package io.spring.dataflow.sample.usagecostprocessor;
import java.util.function.Function;
import io.spring.dataflow.sample.UsageCostDetail;
import io.spring.dataflow.sample.UsageDetail;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;
@EnableBinding(Processor.class)
@Configuration
public class UsageCostProcessor {
private double ratePerSecond = 0.1;
private double ratePerMB = 0.05;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public UsageCostDetail processUsageCost(UsageDetail usageDetail) {
UsageCostDetail usageCostDetail = new UsageCostDetail();
usageCostDetail.setUserId(usageDetail.getUserId());
usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond);
usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB);
return usageCostDetail;
@Bean
public Function<UsageDetail, UsageCostDetail> processUsageCost() {
return usageDetail -> {
UsageCostDetail usageCostDetail = new UsageCostDetail();
usageCostDetail.setUserId(usageDetail.getUserId());
usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond);
usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB);
return usageCostDetail;
};
}
}

View File

@@ -1,2 +1,4 @@
spring.cloud.stream.function.bindings.processUsageCost-in-0=input
spring.cloud.stream.function.bindings.processUsageCost-out-0=output
spring.cloud.stream.bindings.input.destination=usage-detail
spring.cloud.stream.bindings.output.destination=usage-cost

View File

@@ -1,16 +1,60 @@
package io.spring.dataflow.sample.usagecostprocessor;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.HashMap;
import java.util.Map;
import io.spring.dataflow.sample.UsageCostDetail;
import io.spring.dataflow.sample.UsageDetail;
import org.junit.jupiter.api.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest
public class UsageCostProcessorApplicationTests {
@Test
public void contextLoads() {
}
@Test
public void testUsageCostProcessor() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
UsageCostProcessorApplication.class)).web(WebApplicationType.NONE)
.run()) {
InputDestination source = context.getBean(InputDestination.class);
UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId("user1");
usageDetail.setDuration(30L);
usageDetail.setData(100L);
final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
Map<String, Object> headers = new HashMap<>();
headers.put("contentType", "application/json");
MessageHeaders messageHeaders = new MessageHeaders(headers);
final Message<?> message = converter.toMessage(usageDetail, messageHeaders);
source.send(message);
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> sourceMessage = target.receive(10000);
final UsageCostDetail usageCostDetail = (UsageCostDetail) converter
.fromMessage(sourceMessage, UsageCostDetail.class);
assertThat(usageCostDetail.getCallCost()).isEqualTo(3.0);
assertThat(usageCostDetail.getDataCost()).isEqualTo(5.0);
}
}
}

View File

@@ -43,14 +43,12 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>
</dependencies>

View File

@@ -1,31 +1,25 @@
package io.spring.dataflow.sample.usagedetailsender;
import java.util.Random;
import java.util.function.Supplier;
import io.spring.dataflow.sample.UsageDetail;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
@EnableScheduling
@EnableBinding(Source.class)
@Configuration
public class UsageDetailSender {
@Autowired
private Source source;
private String[] users = {"user1", "user2", "user3", "user4", "user5"};
@Scheduled(fixedDelay = 1000)
public void sendEvents() {
UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId(this.users[new Random().nextInt(5)]);
usageDetail.setDuration(new Random().nextInt(300));
usageDetail.setData(new Random().nextInt(700));
this.source.output().send(MessageBuilder.withPayload(usageDetail).build());
@Bean
public Supplier<UsageDetail> sendEvents() {
return () -> {
UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId(this.users[new Random().nextInt(5)]);
usageDetail.setDuration(new Random().nextInt(300));
usageDetail.setData(new Random().nextInt(700));
return usageDetail;
};
}
}

View File

@@ -1 +1,2 @@
spring.cloud.stream.function.bindings.sendEvents-out-0=output
spring.cloud.stream.bindings.output.destination=usage-detail

View File

@@ -1,44 +1,42 @@
package io.spring.dataflow.sample.usagedetailsender;
import java.util.concurrent.TimeUnit;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.spring.dataflow.sample.UsageDetail;
import org.json.JSONObject;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.junit.jupiter.api.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.Assert;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class UsageDetailSenderApplicationTests {
@Autowired
private MessageCollector messageCollector;
@Autowired
private Source source;
@Test
public void contextLoads() {
}
@Test
public void testUsageDetailSender() throws Exception {
Message message = this.messageCollector.forChannel(this.source.output()).poll(1, TimeUnit.SECONDS);
String usageDetailJSON = message.getPayload().toString();
assertTrue(usageDetailJSON.contains("userId"));
assertTrue(usageDetailJSON.contains("duration"));
assertTrue(usageDetailJSON.contains("data"));
}
public void testUsageDetailSender() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(UsageDetailSenderApplication.class))
.web(WebApplicationType.NONE)
.run()) {
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> sourceMessage = target.receive(10000);
final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
UsageDetail usageDetail = (UsageDetail) converter
.fromMessage(sourceMessage, UsageDetail.class);
assertThat(usageDetail.getUserId()).isBetween("user1", "user5");
assertThat(usageDetail.getData()).isBetween(0L, 700L);
assertThat(usageDetail.getDuration()).isBetween(0L, 300L);
}
}
}