GH-338: Add Timeout To Producer Close
Fixes #338 Conflicts: spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java
This commit is contained in:
committed by
Artem Bilan
parent
a80a05bcb0
commit
53dd970c88
@@ -59,6 +59,8 @@ import org.springframework.context.Lifecycle;
|
|||||||
*/
|
*/
|
||||||
public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>, Lifecycle, DisposableBean {
|
public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>, Lifecycle, DisposableBean {
|
||||||
|
|
||||||
|
private static final int DEFAULT_PHYSICAL_CLOSE_TIMEOUT = 30;
|
||||||
|
|
||||||
private static final Log logger = LogFactory.getLog(DefaultKafkaProducerFactory.class);
|
private static final Log logger = LogFactory.getLog(DefaultKafkaProducerFactory.class);
|
||||||
|
|
||||||
private final Map<String, Object> configs;
|
private final Map<String, Object> configs;
|
||||||
@@ -69,6 +71,8 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
|
|||||||
|
|
||||||
private Serializer<V> valueSerializer;
|
private Serializer<V> valueSerializer;
|
||||||
|
|
||||||
|
private int physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT;
|
||||||
|
|
||||||
private volatile boolean running;
|
private volatile boolean running;
|
||||||
|
|
||||||
public DefaultKafkaProducerFactory(Map<String, Object> configs) {
|
public DefaultKafkaProducerFactory(Map<String, Object> configs) {
|
||||||
@@ -90,12 +94,22 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
|
|||||||
this.valueSerializer = valueSerializer;
|
this.valueSerializer = valueSerializer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The time to wait when physically closing the producer (when {@link #stop()} or {@link #destroy()} is invoked.
|
||||||
|
* Specified in seconds; default {@value #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}.
|
||||||
|
* @param physicalCloseTimeout the timeout in seconds.
|
||||||
|
* @since 1.0.7
|
||||||
|
*/
|
||||||
|
public void setPhysicalCloseTimeout(int physicalCloseTimeout) {
|
||||||
|
this.physicalCloseTimeout = physicalCloseTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void destroy() throws Exception { //NOSONAR
|
public void destroy() throws Exception { //NOSONAR
|
||||||
CloseSafeProducer<K, V> producer = this.producer;
|
CloseSafeProducer<K, V> producer = this.producer;
|
||||||
this.producer = null;
|
this.producer = null;
|
||||||
if (producer != null) {
|
if (producer != null) {
|
||||||
producer.delegate.close();
|
producer.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user