More structural changes to samples repo

Remove CF Acceptance tests
Convert local sample tests as e2e tests for the various sample apps
Polishing
This commit is contained in:
Soby Chacko
2018-08-17 21:02:30 -04:00
parent 1dabe82fc9
commit e2111652fd
48 changed files with 34 additions and 1416 deletions

View File

@@ -0,0 +1,78 @@
package sample.acceptance.tests;
import org.assertj.core.util.Files;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.stream.Stream;
import static org.junit.Assert.fail;
/**
* @author Soby Chacko
*/
public abstract class AbstractSampleTests {
private static final Logger logger = LoggerFactory.getLogger(AbstractSampleTests.class);
protected boolean waitForLogEntryInFile(String app, File f, String... entries) {
logger.info("Looking for '" + StringUtils.arrayToCommaDelimitedString(entries) + "' in logfile for " + app);
long timeout = System.currentTimeMillis() + (60 * 1000);
boolean exists = false;
while (!exists && System.currentTimeMillis() < timeout) {
try {
Thread.sleep(2 * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e.getMessage(), e);
}
logger.info("Polling to get log file. Remaining poll time = "
+ (timeout - System.currentTimeMillis() + " ms."));
String log = Files.contentOf(f, StandardCharsets.UTF_8);
if (log != null) {
if (Stream.of(entries).allMatch(log::contains)) {
exists = true;
}
}
}
if (exists) {
logger.info("Matched all '" + StringUtils.arrayToCommaDelimitedString(entries) + "' in logfile for app " + app);
} else {
logger.error("ERROR: Couldn't find all '" + StringUtils.arrayToCommaDelimitedString(entries) + "' in logfile for " + app);
fail("Could not find the test data in the logs");
}
return true;
}
protected boolean waitForLogEntryInFileWithoutFailing(String app, File f, String... entries) {
logger.info("Looking for '" + StringUtils.arrayToCommaDelimitedString(entries) + "' in logfile for " + app);
long timeout = System.currentTimeMillis() + (60 * 1000);
boolean exists = false;
while (!exists && System.currentTimeMillis() < timeout) {
try {
Thread.sleep(2 * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e.getMessage(), e);
}
logger.info("Polling to get log file. Remaining poll time = "
+ (timeout - System.currentTimeMillis() + " ms."));
String log = Files.contentOf(f, StandardCharsets.UTF_8);
if (log != null) {
if (Stream.of(entries).allMatch(log::contains)) {
exists = true;
}
}
}
if (exists) {
logger.info("Matched all '" + StringUtils.arrayToCommaDelimitedString(entries) + "' in logfile for app " + app);
}
return exists;
}
}

View File

@@ -0,0 +1,201 @@
package sample.acceptance.tests;
import org.assertj.core.util.Files;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.junit.Assert.fail;
/**
* @author Soby Chacko
*/
public class PartitioningAcceptanceTests extends AbstractSampleTests {
private static final Logger logger = LoggerFactory.getLogger(PartitioningAcceptanceTests.class);
@Test
public void testPartitioningKafka() throws Exception {
Process producerProcess = null;
Process consumer1Process = null;
Process consumer2Process = null;
try {
ProcessBuilder producerProcessBuilder = new ProcessBuilder("java", "-jar", "/tmp/partitioning-producer-kafka.jar");
File producerFile = Files.newTemporaryFile();
logger.info("Output is redirected to " + producerFile.getAbsolutePath());
producerProcessBuilder.redirectOutput(producerFile);
producerProcess = producerProcessBuilder.start();
waitForLogEntryInFile("Partitioning producer", producerFile, "Started PartProducerApplication in");
ProcessBuilder consumer1Builder = new ProcessBuilder("java", "-jar", "/tmp/partitioning-consumer-kafka.jar", "--server.port=12001");
File consumer1File = Files.newTemporaryFile();
logger.info("Output is redirected to " + consumer1File.getAbsolutePath());
consumer1Builder.redirectOutput(consumer1File);
consumer1Process = consumer1Builder.start();
ProcessBuilder consumer2Builder = new ProcessBuilder("java", "-jar", "/tmp/partitioning-consumer-kafka.jar", "--server.port=12002");
File consumer2File = Files.newTemporaryFile();
logger.info("Output is redirected to " + consumer2File.getAbsolutePath());
consumer2Builder.redirectOutput(consumer2File);
consumer2Process = consumer2Builder.start();
Future<?> future1 = verifyPartitions("Partitioning Consumer-1", consumer1File, "Partitioning Consumer-2", consumer2File,
"f received from partition 0", "g received from partition 0", "h received from partition 0");
Future<?> future2 = verifyPartitions("Partitioning Consumer-1", consumer1File, "Partitioning Consumer-2", consumer2File,
"fo received from partition 1", "go received from partition 1", "ho received from partition 1");
Future<?> future3 = verifyPartitions("Partitioning Consumer-2",consumer2File, "Partitioning Consumer-1", consumer1File,
"foo received from partition 2", "goo received from partition 2", "hoo received from partition 2");
Future<?> future4 = verifyPartitions("Partitioning Consumer-2",consumer2File, "Partitioning Consumer-1", consumer1File,
"fooz received from partition 3", "gooz received from partition 3", "hooz received from partition 3");
verifyResults(future1, future2, future3, future4);
}
finally {
if (producerProcess != null) {
producerProcess.destroyForcibly();
}
if (consumer1Process != null) {
consumer1Process.destroyForcibly();
}
if (consumer2Process != null) {
consumer2Process.destroyForcibly();
}
}
}
@Test
public void testPartitioningRabbit() throws Exception {
Process producerProcess = null;
Process consumer1Process = null;
Process consumer2Process = null;
Process consumer3Process = null;
Process consumer4Process = null;
try {
ProcessBuilder producerProcessBuilder = new ProcessBuilder("java", "-jar", "/tmp/partitioning-producer-rabbit.jar");
File producerFile = Files.newTemporaryFile();
logger.info("Output is redirected to " + producerFile.getAbsolutePath());
producerProcessBuilder.redirectOutput(producerFile);
producerProcess = producerProcessBuilder.start();
waitForLogEntryInFile("Partitioning producer", producerFile, "Started PartProducerApplication in");
ProcessBuilder consumer1Builder = new ProcessBuilder("java", "-jar", "/tmp/partitioning-consumer-rabbit.jar", "--server.port=12003");
File consumer1File = Files.newTemporaryFile();
logger.info("Output is redirected to " + consumer1File.getAbsolutePath());
consumer1Builder.redirectOutput(consumer1File);
consumer1Process = consumer1Builder.start();
ProcessBuilder consumer2Builder = new ProcessBuilder("java", "-jar", "/tmp/partitioning-consumer-rabbit.jar", "--server.port=12004",
"--spring.cloud.stream.bindings.input.consumer.instanceIndex=1");
File consumer2File = Files.newTemporaryFile();
logger.info("Output is redirected to " + consumer2File.getAbsolutePath());
consumer2Builder.redirectOutput(consumer2File);
consumer2Process = consumer2Builder.start();
ProcessBuilder consumer3Builder = new ProcessBuilder("java", "-jar", "/tmp/partitioning-consumer-rabbit.jar", "--server.port=12005",
"--spring.cloud.stream.bindings.input.consumer.instanceIndex=2");
File consumer3File = Files.newTemporaryFile();
logger.info("Output is redirected to " + consumer3File.getAbsolutePath());
consumer3Builder.redirectOutput(consumer3File);
consumer3Process = consumer3Builder.start();
ProcessBuilder consumer4Builder = new ProcessBuilder("java", "-jar", "/tmp/partitioning-consumer-rabbit.jar", "--server.port=12006",
"--spring.cloud.stream.bindings.input.consumer.instanceIndex=3");
File consumer4File = Files.newTemporaryFile();
logger.info("Output is redirected to " + consumer4File.getAbsolutePath());
consumer4Builder.redirectOutput(consumer4File);
consumer4Process = consumer4Builder.start();
Future<?> future1 = verifyPartitions("Partitioning Consumer-1", consumer1File,
"f received from partition partitioned.destination.myGroup-0",
"g received from partition partitioned.destination.myGroup-0",
"h received from partition partitioned.destination.myGroup-0");
Future<?> future2 = verifyPartitions("Partitioning Consumer-2", consumer2File,
"fo received from partition partitioned.destination.myGroup-1",
"go received from partition partitioned.destination.myGroup-1",
"ho received from partition partitioned.destination.myGroup-1");
Future<?> future3 = verifyPartitions("Partitioning Consumer-3",consumer3File,
"foo received from partition partitioned.destination.myGroup-2",
"goo received from partition partitioned.destination.myGroup-2",
"hoo received from partition partitioned.destination.myGroup-2");
Future<?> future4 = verifyPartitions("Partitioning Consumer-4",consumer4File,
"fooz received from partition partitioned.destination.myGroup-3",
"gooz received from partition partitioned.destination.myGroup-3",
"hooz received from partition partitioned.destination.myGroup-3");
verifyResults(future1, future2, future3, future4);
}
finally {
if (producerProcess != null) {
producerProcess.destroyForcibly();
}
if (consumer1Process != null) {
consumer1Process.destroyForcibly();
}
if (consumer2Process != null) {
consumer2Process.destroyForcibly();
}
if (consumer3Process != null) {
consumer3Process.destroyForcibly();
}
if (consumer4Process != null) {
consumer4Process.destroyForcibly();
}
}
}
private Future<?> verifyPartitions(String consumer1Msg, File consumer1File,
String consumer2Msg, File consumer2File,
String... entries) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<?> submit = executorService.submit(() -> {
boolean found = waitForLogEntryInFileWithoutFailing(consumer1Msg, consumer1File, entries);
if (!found) {
found = waitForLogEntryInFileWithoutFailing(consumer2Msg, consumer2File, entries);
}
if (!found) {
fail("Could not find the test data in the logs");
}
});
executorService.shutdown();
return submit;
}
private Future<?> verifyPartitions(String consumer1Msg, File consumer1File,
String... entries) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<?> submit = executorService.submit(() -> {
boolean found = waitForLogEntryInFileWithoutFailing(consumer1Msg, consumer1File, entries);
if (!found) {
fail("Could not find the test data in the logs");
}
});
executorService.shutdown();
return submit;
}
private void verifyResults(Future<?>... futures) throws Exception {
for (Future<?> future : futures) {
try {
future.get();
}
catch (Exception e) {
throw e;
}
}
}
}

View File

@@ -0,0 +1,344 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sample.acceptance.tests;
import org.assertj.core.util.Files;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.SingleConnectionDataSource;
import org.springframework.web.client.RestTemplate;
import javax.sql.DataSource;
import java.io.File;
import static org.junit.Assert.fail;
/**
* Do not run these tests as part of an IDE build or individually.
* These are acceptance tests for the spring cloud stream samples.
* The recommended way to run these tests are using the runAcceptanceTests.sh script in this module.
* More about running that script can be found in the README.
*
* @author Soby Chacko
*/
public class SampleAcceptanceTests extends AbstractSampleTests {
private static final Logger logger = LoggerFactory.getLogger(SampleAcceptanceTests.class);
private Process process;
@After
public void stopTheApp() {
if (process != null) {
process.destroyForcibly();
}
}
@Test
public void testJdbcSourceSampleKafka() throws Exception {
ProcessBuilder pb = new ProcessBuilder("java", "-jar", "/tmp/jdbc-source-kafka-sample.jar");
File file = Files.newTemporaryFile();
logger.info("Output is redirected to " + file.getAbsolutePath());
pb.redirectOutput(file);
process = pb.start();
waitForLogEntryInFile("JDBC Source", file,"Started SampleJdbcSource in");
waitForLogEntryInFile("JDBC Source", file,
"Data received...[{id=1, name=Bob, tag=null}, {id=2, name=Jane, tag=null}, {id=3, name=John, tag=null}]");
}
@Test
public void testJdbcSourceSampleRabbit() throws Exception {
ProcessBuilder pb = new ProcessBuilder("java", "-jar", "/tmp/jdbc-source-rabbit-sample.jar");
File file = Files.newTemporaryFile();
logger.info("Output is redirected to " + file.getAbsolutePath());
pb.redirectOutput(file);
process = pb.start();
waitForLogEntryInFile("JDBC Source", file,"Started SampleJdbcSource in");
waitForLogEntryInFile("JDBC Source", file,
"Data received...[{id=1, name=Bob, tag=null}, {id=2, name=Jane, tag=null}, {id=3, name=John, tag=null}]");
}
@Test
public void testJdbcSinkSampleKafka() throws Exception {
ProcessBuilder pb = new ProcessBuilder("java", "-jar", "/tmp/jdbc-sink-kafka-sample.jar");
File file = Files.newTemporaryFile();
logger.info("Output is redirected to " + file.getAbsolutePath());
pb.redirectOutput(file);
process = pb.start();
waitForLogEntryInFile("JDBC Sink", file,"Started SampleJdbcSink in");
verifyJdbcSink();
}
@Test
public void testJdbcSinkSampleRabbit() throws Exception {
ProcessBuilder pb = new ProcessBuilder("java", "-jar", "/tmp/jdbc-sink-rabbit-sample.jar");
File file = Files.newTemporaryFile();
logger.info("Output is redirected to " + file.getAbsolutePath());
pb.redirectOutput(file);
process = pb.start();
waitForLogEntryInFile("JDBC Sink", file,"Started SampleJdbcSink in");
verifyJdbcSink();
}
@Test
public void testDynamicSourceSampleKafka() throws Exception {
ProcessBuilder pb = new ProcessBuilder("java", "-jar", "/tmp/dynamic-destination-source-kafka-sample.jar", "--management.endpoints.web.exposure.include=*");
File file = Files.newTemporaryFile();
logger.info("Output is redirected to " + file.getAbsolutePath());
pb.redirectOutput(file);
process = pb.start();
waitForLogEntryInFile("Dynamic Source", file,"Started SourceApplication in");
RestTemplate restTemplate = new RestTemplate();
restTemplate.postForObject(
"http://localhost:8080",
"{\"id\":\"customerId-1\",\"bill-pay\":\"100\"}", String.class);
waitForLogEntryInFile("Dynamic Source", file,
"Data received from customer-1...{\"id\":\"customerId-1\",\"bill-pay\":\"100\"}");
restTemplate.postForObject(
"http://localhost:8080",
"{\"id\":\"customerId-2\",\"bill-pay2\":\"200\"}", String.class);
waitForLogEntryInFile("Dynamic Source", file,
"Data received from customer-2...{\"id\":\"customerId-2\",\"bill-pay2\":\"200\"}");
Files.delete(file);
}
@Test
public void testDynamicSourceSampleRabbit() throws Exception {
ProcessBuilder pb = new ProcessBuilder("java", "-jar", "/tmp/dynamic-destination-source-rabbit-sample.jar", "--management.endpoints.web.exposure.include=*");
File file = Files.newTemporaryFile();
logger.info("Output is redirected to " + file.getAbsolutePath());
pb.redirectOutput(file);
process = pb.start();
waitForLogEntryInFile("Dynamic Source", file,"Started SourceApplication in");
RestTemplate restTemplate = new RestTemplate();
restTemplate.postForObject(
"http://localhost:8080",
"{\"id\":\"customerId-1\",\"bill-pay\":\"100\"}", String.class);
waitForLogEntryInFile("Dynamic Source", file,
"Data received from customer-1...{\"id\":\"customerId-1\",\"bill-pay\":\"100\"}");
restTemplate.postForObject(
"http://localhost:8080",
"{\"id\":\"customerId-2\",\"bill-pay2\":\"200\"}", String.class);
waitForLogEntryInFile("Dynamic Source", file,
"Data received from customer-2...{\"id\":\"customerId-2\",\"bill-pay2\":\"200\"}");
Files.delete(file);
}
@Test
public void testMultiBinderKafkaInputRabbitOutput() throws Exception {
ProcessBuilder pb = new ProcessBuilder("java", "-jar", "/tmp/multibinder-kafka-rabbit-sample.jar");
File file = Files.newTemporaryFile();
logger.info("Output is redirected to " + file.getAbsolutePath());
pb.redirectOutput(file);
process = pb.start();
waitForLogEntryInFile("Multibinder", file,"Started MultibinderApplication in");
waitForLogEntryInFile("Multibinder", file, "Data received...bar", "Data received...foo");
}
@Test
public void testMultiBinderTwoKafkaClusters() throws Exception {
ProcessBuilder pb = new ProcessBuilder("java", "-jar", "/tmp/multibinder-two-kafka-clusters-sample.jar",
"--kafkaBroker1=localhost:9092", "--zk1=localhost:2181",
"--kafkaBroker2=localhost:9093", "--zk2=localhost:2182");
File file = Files.newTemporaryFile();
logger.info("Output is redirected to " + file.getAbsolutePath());
pb.redirectOutput(file);
process = pb.start();
waitForLogEntryInFile("Multibinder 2 Kafka Clusters", file,"Started MultibinderApplication in");
waitForLogEntryInFile("Multibinder 2 Kafka Clusters", file, "Data received...bar", "Data received...foo");
Files.delete(file);
}
@Test
public void testStreamListenerBasicSampleKafka() throws Exception {
ProcessBuilder pb = new ProcessBuilder("java", "-jar", "/tmp/streamlistener-basic-kafka-sample.jar");
File file = Files.newTemporaryFile();
logger.info("Output is redirected to " + file.getAbsolutePath());
pb.redirectOutput(file);
process = pb.start();
waitForLogEntryInFile("Streamlistener basic", file,"Started TypeConversionApplication in");
waitForLogEntryInFile("Streamlistener basic", file,
"At the Source", "Sending value: {\"value\":\"hi\"}", "At the transformer",
"Received value hi of type class demo.Bar",
"Transforming the value to HI and with the type class demo.Bar",
"At the Sink",
"Received transformed message HI of type class demo.Foo");
}
@Test
public void testStreamListenerBasicSampleRabbit() throws Exception {
ProcessBuilder pb = new ProcessBuilder("java", "-jar", "/tmp/streamlistener-basic-rabbit-sample.jar");
File file = Files.newTemporaryFile();
logger.info("Output is redirected to " + file.getAbsolutePath());
pb.redirectOutput(file);
process = pb.start();
waitForLogEntryInFile("Streamlistener basic", file,"Started TypeConversionApplication in");
waitForLogEntryInFile("Streamlistener basic", file,
"At the Source", "Sending value: {\"value\":\"hi\"}", "At the transformer",
"Received value hi of type class demo.Bar",
"Transforming the value to HI and with the type class demo.Bar",
"At the Sink",
"Received transformed message HI of type class demo.Foo");
}
@Test
public void testReactiveProcessorSampleKafka() throws Exception {
ProcessBuilder pb = new ProcessBuilder("java", "-jar", "/tmp/reactive-processor-kafka-sample.jar");
File file = Files.newTemporaryFile();
logger.info("Output is redirected to " + file.getAbsolutePath());
pb.redirectOutput(file);
process = pb.start();
waitForLogEntryInFile("Reactive processor", file,"Started ReactiveProcessorApplication in");
waitForLogEntryInFile("Reactive processor", file,
"Data received: foobarfoobarfoo",
"Data received: barfoobarfoobar");
}
@Test
public void testReactiveProcessorSampleRabbit() throws Exception {
ProcessBuilder pb = new ProcessBuilder("java", "-jar", "/tmp/reactive-processor-rabbit-sample.jar");
File file = Files.newTemporaryFile();
logger.info("Output is redirected to " + file.getAbsolutePath());
pb.redirectOutput(file);
process = pb.start();
waitForLogEntryInFile("Reactive processor", file,"Started ReactiveProcessorApplication in");
waitForLogEntryInFile("Reactive processor", file,
"Data received: foobarfoobarfoo",
"Data received: barfoobarfoobar");
}
@Test
public void testSensorAverageReactiveSampleKafka() throws Exception {
ProcessBuilder pb = new ProcessBuilder("java", "-jar", "/tmp/sensor-average-reactive-kafka-sample.jar");
File file = Files.newTemporaryFile();
logger.info("Output is redirected to " + file.getAbsolutePath());
pb.redirectOutput(file);
process = pb.start();
waitForLogEntryInFile("Sensor average", file,"Started SensorAverageProcessorApplication in");
waitForLogEntryInFile("Sensor average", file,
"Data received: {\"id\":100100,\"average\":",
"Data received: {\"id\":100200,\"average\":", "Data received: {\"id\":100300,\"average\":");
}
@Test
public void testSensorAverageReactiveSampleRabbit() throws Exception {
ProcessBuilder pb = new ProcessBuilder("java", "-jar", "/tmp/sensor-average-reactive-rabbit-sample.jar");
File file = Files.newTemporaryFile();
logger.info("Output is redirected to " + file.getAbsolutePath());
pb.redirectOutput(file);
process = pb.start();
waitForLogEntryInFile("Sensor average", file,"Started SensorAverageProcessorApplication in");
waitForLogEntryInFile("Sensor average", file,
"Data received: {\"id\":100100,\"average\":",
"Data received: {\"id\":100200,\"average\":", "Data received: {\"id\":100300,\"average\":");
Files.delete(file);
}
@Test
public void testKafkaStreamsWordCount() throws Exception {
ProcessBuilder pb = new ProcessBuilder("java", "-jar", "/tmp/kafka-streams-word-count-sample.jar",
"--spring.cloud.stream.kafka.streams.timeWindow.length=60000");
File file = Files.newTemporaryFile();
logger.info("Output is redirected to " + file.getAbsolutePath());
pb.redirectOutput(file);
process = pb.start();
waitForLogEntryInFile("Kafka Streams WordCount", file,"Started KafkaStreamsWordCountApplication in");
waitForLogEntryInFile("Kafka Streams WordCount", file,
"Data received...{\"word\":\"foo\",\"count\":",
"Data received...{\"word\":\"bar\",\"count\":",
"Data received...{\"word\":\"foobar\",\"count\":",
"Data received...{\"word\":\"baz\",\"count\":",
"Data received...{\"word\":\"fox\",\"count\":");
Files.delete(file);
}
private void verifyJdbcSink() {
JdbcTemplate db;
DataSource dataSource = new SingleConnectionDataSource("jdbc:mariadb://localhost:3306/sample_mysql_db",
"root", "pwd", false);
db = new JdbcTemplate(dataSource);
long timeout = System.currentTimeMillis() + (30 * 1000);
boolean exists = false;
while (!exists && System.currentTimeMillis() < timeout) {
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e.getMessage(), e);
}
Integer count = db.queryForObject("select count(*) from test", Integer.class);
if (count > 0) {
exists = true;
}
}
if (!exists) {
fail("No records found in database!");
}
}
}

View File

@@ -0,0 +1,117 @@
package sample.acceptance.tests;
import org.assertj.core.util.Files;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;
import java.io.File;
import static org.junit.Assert.fail;
/**
* @author Soby Chacko
*/
public class SchemaRegistryVanillaSampleTests extends AbstractSampleTests {
private static final Logger logger = LoggerFactory.getLogger(SchemaRegistryVanillaSampleTests.class);
@Test
public void testSchemaRegistryVanillaKafka() throws Exception {
runAgainstMiddleware("/tmp/schema-registry-vanilla-registry-kafka.jar",
"/tmp/schema-registry-vanilla-consumer-kafka.jar",
"/tmp/schema-registry-vanilla-producer1-kafka.jar",
"/tmp/schema-registry-vanilla-producer2-kafka.jar");
}
@Test
public void testSchemaRegistryVanillaRabbit() throws Exception {
runAgainstMiddleware("/tmp/schema-registry-vanilla-registry-rabbit.jar",
"/tmp/schema-registry-vanilla-consumer-rabbit.jar",
"/tmp/schema-registry-vanilla-producer1-rabbit.jar",
"/tmp/schema-registry-vanilla-producer2-rabbit.jar");
}
private void runAgainstMiddleware(String registryJar, String consumerJar, String producer1Jar, String producer2Jar) throws Exception {
Process registryProcess = null;
Process consumerProcess = null;
Process producer1Process = null;
Process producer2Process = null;
try {
ProcessBuilder pbRegistry = new ProcessBuilder("java", "-jar", registryJar);
File registryFile = Files.newTemporaryFile();
logger.info("Output is redirected to " + registryFile.getAbsolutePath());
pbRegistry.redirectOutput(registryFile);
registryProcess = pbRegistry.start();
waitForLogEntryInFile("Schema Registry Vanilla Server", registryFile, "Started RegistryApplication in");
ProcessBuilder pbConsumer = new ProcessBuilder("java", "-jar", consumerJar);
File consumerFile = Files.newTemporaryFile();
logger.info("Output is redirected to " + consumerFile.getAbsolutePath());
pbConsumer.redirectOutput(consumerFile);
consumerProcess = pbConsumer.start();
waitForLogEntryInFile("Schema Registry Vanilla Consumer", consumerFile, "Started ConsumerApplication in");
ProcessBuilder pbProducer1 = new ProcessBuilder("java", "-jar", producer1Jar);
File producer1File = Files.newTemporaryFile();
logger.info("Output is redirected to " + producer1File.getAbsolutePath());
pbProducer1.redirectOutput(producer1File);
producer1Process = pbProducer1.start();
waitForLogEntryInFile("Schema Registry Vanilla Producer1", producer1File, "Started Producer1Application in");
ProcessBuilder pbProducer2 = new ProcessBuilder("java", "-jar", producer2Jar);
File producer2File = Files.newTemporaryFile();
logger.info("Output is redirected to " + producer2File.getAbsolutePath());
pbProducer2.redirectOutput(producer2File);
producer2Process = pbProducer2.start();
waitForLogEntryInFile("Schema Registry Vanilla Producer2", producer2File, "Started Producer2Application in");
RestTemplate restTemplate = new RestTemplate();
MultiValueMap<String, Object> parametersMap = new LinkedMultiValueMap<>();
parametersMap.add("id", "foobar");
parametersMap.add("temperature", 30);
parametersMap.add("acceleration", 10);
parametersMap.add("velocity", 20);
restTemplate.postForObject(
"http://localhost:9009/messagesX", parametersMap, String.class);
boolean found = waitForLogEntryInFile("Schema Registry Vanilla Consumer", consumerFile,
"{\"id\": \"foobar-v1\", \"internalTemperature\": 30.0, \"externalTemperature\": 0.0, \"acceleration\": 10.0, \"velocity\": 20.0}");
if (!found) {
fail("Could not find the test data in the logs");
}
restTemplate.postForObject(
"http://localhost:9010/messagesX", parametersMap, String.class);
waitForLogEntryInFile("Schema Registry Vanilla Consumer", consumerFile,
"{\"id\": \"foobar-v2\", \"internalTemperature\": 30.0, \"externalTemperature\": 0.0, \"acceleration\": 10.0, \"velocity\": 20.0}");
} finally {
if (registryProcess != null) {
registryProcess.destroyForcibly();
}
if (consumerProcess != null) {
consumerProcess.destroyForcibly();
}
if (producer1Process != null) {
producer1Process.destroyForcibly();
}
if (producer2Process != null) {
producer2Process.destroyForcibly();
}
}
}
}