partitioning custom app changes

This commit is contained in:
Soby Chacko
2018-08-21 22:33:43 -04:00
parent 81753e15f1
commit 0b8155b245
2 changed files with 25 additions and 29 deletions

View File

@@ -2,21 +2,15 @@ package sample.acceptance.tests;
import org.junit.Test;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class PartitioningKafkaAcceptanceTests extends AbstractAcceptanceTests {
Set<String> partitions = new HashSet<>();
@Test
public void testPartitioningWith3ConsumersKafka() throws Exception {
@@ -34,15 +28,15 @@ public class PartitioningKafkaAcceptanceTests extends AbstractAcceptanceTests {
String consumer3Url = System.getProperty("partitioning.consumer3.route");
//String consumer4Url = System.getProperty("partitioning.consumer4.route");
Future<?> future1 = verifyPartitions("Partitioning Consumer-1", consumer1Url, consumer2Url, consumer3Url,
Future<?> future1 = verifyPartitions("Partitioning Consumer-1", consumer1Url,
"f received from partition 0",
"g received from partition 0",
"h received from partition 0");
Future<?> future2 = verifyPartitions("Partitioning Consumer-2", consumer1Url, consumer2Url, consumer3Url,
Future<?> future2 = verifyPartitions("Partitioning Consumer-2", consumer2Url,
"fo received from 1",
"go received from partition 1",
"ho received from partition 1");
Future<?> future3 = verifyPartitions("Partitioning Consumer-3", consumer1Url, consumer2Url, consumer3Url,
Future<?> future3 = verifyPartitions("Partitioning Consumer-3", consumer3Url,
"foo received from 2",
"goo received from 2",
"hoo received from partition 2");
@@ -54,28 +48,28 @@ public class PartitioningKafkaAcceptanceTests extends AbstractAcceptanceTests {
verifyResults(future1, future2, future3);
}
private Future<?> verifyPartitions(String consumerMsg, String consumer1Route, String consumer2Route, String consumer3Route,
private Future<?> verifyPartitions(String consumerMsg, String consumerRoute,
String... entries) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<?> submit = executorService.submit(() -> {
boolean found = waitForLogEntry(consumerMsg, consumer1Route, entries);
if (found) {
partitions.add(consumer1Route);
}
if (!found) {
found = waitForLogEntry(consumerMsg, consumer2Route, entries);
if (found) {
partitions.add(consumer2Route);
}
}
if (!found) {
found = waitForLogEntry(consumerMsg, consumer3Route, entries);
if (found) {
partitions.add(consumer3Route);
}
}
boolean found = waitForLogEntry(consumerMsg, consumerRoute, entries);
// if (found) {
// partitions.add(consumerRoute);
// }
// if (!found) {
// found = waitForLogEntry(consumerMsg, consumer2Route, entries);
// if (found) {
// partitions.add(consumer2Route);
// }
// }
// if (!found) {
// found = waitForLogEntry(consumerMsg, consumer3Route, entries);
// if (found) {
// partitions.add(consumer3Route);
// }
// }
if (!found) {
fail("Could not find the test data in the logs");
@@ -95,6 +89,5 @@ public class PartitioningKafkaAcceptanceTests extends AbstractAcceptanceTests {
throw e;
}
}
assertEquals("Not all app instances received data equally", 3, partitions.size());
}
}