diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderUnitTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderUnitTests.java index a5bbf5683..6c9e0a4c9 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderUnitTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderUnitTests.java @@ -305,15 +305,29 @@ public class KafkaBinderUnitTests { Binding messageChannelBinding = binder.bindConsumer(topic, group, channel, consumerProperties); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); - @SuppressWarnings("unchecked") - ArgumentCaptor> captor = ArgumentCaptor.forClass(Set.class); - if (earliest) { - verify(consumer).seekToBeginning(captor.capture()); + + if (!groupManage) { + @SuppressWarnings("unchecked") + ArgumentCaptor> captor = ArgumentCaptor.forClass(Set.class); + if (earliest) { + verify(consumer).seekToBeginning(captor.capture()); + } + else { + verify(consumer).seekToEnd(captor.capture()); + } + assertThat(captor.getValue()).containsExactlyInAnyOrderElementsOf(partitions); } else { - verify(consumer).seekToEnd(captor.capture()); + @SuppressWarnings("unchecked") + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + if (earliest) { + verify(consumer).seekToBeginning(captor.capture()); + } + else { + verify(consumer).seekToEnd(captor.capture()); + } + assertThat(captor.getValue()).containsExactlyInAnyOrderElementsOf(partitions); } - assertThat(captor.getValue()).containsExactlyInAnyOrderElementsOf(partitions); messageChannelBinding.unbind(); }