GH-425: testManualCommitEx: set syncCommit=false
Resolves spring-projects/spring-kafka#425
To really perform `commitCallback` via `commitAsync()`
the `commitSync` must be set to `false` for the `ContainerProperties`
**Cherry-pick to 1.3 and master**
(cherry picked from commit 0cad02e)
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
* Copyright 2016-2017 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -42,7 +42,6 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
@@ -284,7 +283,6 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore // TODO https://github.com/spring-projects/spring-kafka/issues/62 using SYNC for avoidance
|
||||
public void testManualCommitExisting() throws Exception {
|
||||
this.logger.info("Start MANUAL_IMMEDIATE with Existing");
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
@@ -307,7 +305,7 @@ public class ConcurrentMessageListenerContainerTests {
|
||||
latch.countDown();
|
||||
});
|
||||
containerProps.setAckMode(AckMode.MANUAL_IMMEDIATE);
|
||||
|
||||
containerProps.setSyncCommits(false);
|
||||
final CountDownLatch commits = new CountDownLatch(8);
|
||||
final AtomicReference<Exception> exceptionRef = new AtomicReference<>();
|
||||
containerProps.setCommitCallback((offsets, exception) -> {
|
||||
|
||||
Reference in New Issue
Block a user