diff --git a/custom-stream-apps/partitioning-consumer-sample-kafka/src/main/resources/application.yml b/custom-stream-apps/partitioning-consumer-sample-kafka/src/main/resources/application.yml index 1c0dda7..d22a0d7 100644 --- a/custom-stream-apps/partitioning-consumer-sample-kafka/src/main/resources/application.yml +++ b/custom-stream-apps/partitioning-consumer-sample-kafka/src/main/resources/application.yml @@ -5,6 +5,10 @@ spring: binder: autoAddPartitions: true minPartitionCount: 3 + bindings: + input: + consumer: + autoRebalanceEnabled: false bindings: input: destination: partitioned.destination @@ -12,5 +16,4 @@ spring: consumer: partitioned: true instanceIndex: 0 - instanceCount: 3 - autoRebalanceEnabled: false \ No newline at end of file + instanceCount: 3 \ No newline at end of file diff --git a/spring-cloud-stream-acceptance-tests/src/test/java/sample/acceptance/tests/PartitioningKafkaAcceptanceTests.java b/spring-cloud-stream-acceptance-tests/src/test/java/sample/acceptance/tests/PartitioningKafkaAcceptanceTests.java index 9153e5d..c3cf55a 100644 --- a/spring-cloud-stream-acceptance-tests/src/test/java/sample/acceptance/tests/PartitioningKafkaAcceptanceTests.java +++ b/spring-cloud-stream-acceptance-tests/src/test/java/sample/acceptance/tests/PartitioningKafkaAcceptanceTests.java @@ -2,21 +2,15 @@ package sample.acceptance.tests; import org.junit.Test; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; public class PartitioningKafkaAcceptanceTests extends AbstractAcceptanceTests { - Set partitions = new HashSet<>(); - - @Test public void testPartitioningWith3ConsumersKafka() throws Exception { @@ -34,15 +28,15 @@ public class PartitioningKafkaAcceptanceTests extends AbstractAcceptanceTests { String consumer3Url = System.getProperty("partitioning.consumer3.route"); //String consumer4Url = System.getProperty("partitioning.consumer4.route"); - Future future1 = verifyPartitions("Partitioning Consumer-1", consumer1Url, consumer2Url, consumer3Url, + Future future1 = verifyPartitions("Partitioning Consumer-1", consumer1Url, "f received from partition 0", "g received from partition 0", "h received from partition 0"); - Future future2 = verifyPartitions("Partitioning Consumer-2", consumer1Url, consumer2Url, consumer3Url, + Future future2 = verifyPartitions("Partitioning Consumer-2", consumer2Url, "fo received from 1", "go received from partition 1", "ho received from partition 1"); - Future future3 = verifyPartitions("Partitioning Consumer-3", consumer1Url, consumer2Url, consumer3Url, + Future future3 = verifyPartitions("Partitioning Consumer-3", consumer3Url, "foo received from 2", "goo received from 2", "hoo received from partition 2"); @@ -54,28 +48,28 @@ public class PartitioningKafkaAcceptanceTests extends AbstractAcceptanceTests { verifyResults(future1, future2, future3); } - private Future verifyPartitions(String consumerMsg, String consumer1Route, String consumer2Route, String consumer3Route, + private Future verifyPartitions(String consumerMsg, String consumerRoute, String... entries) { ExecutorService executorService = Executors.newSingleThreadExecutor(); Future submit = executorService.submit(() -> { - boolean found = waitForLogEntry(consumerMsg, consumer1Route, entries); - if (found) { - partitions.add(consumer1Route); - } - if (!found) { - found = waitForLogEntry(consumerMsg, consumer2Route, entries); - if (found) { - partitions.add(consumer2Route); - } - } - if (!found) { - found = waitForLogEntry(consumerMsg, consumer3Route, entries); - if (found) { - partitions.add(consumer3Route); - } - } + boolean found = waitForLogEntry(consumerMsg, consumerRoute, entries); +// if (found) { +// partitions.add(consumerRoute); +// } +// if (!found) { +// found = waitForLogEntry(consumerMsg, consumer2Route, entries); +// if (found) { +// partitions.add(consumer2Route); +// } +// } +// if (!found) { +// found = waitForLogEntry(consumerMsg, consumer3Route, entries); +// if (found) { +// partitions.add(consumer3Route); +// } +// } if (!found) { fail("Could not find the test data in the logs"); @@ -95,6 +89,5 @@ public class PartitioningKafkaAcceptanceTests extends AbstractAcceptanceTests { throw e; } } - assertEquals("Not all app instances received data equally", 3, partitions.size()); } }