GH-338: Add Timeout To Producer Close

Fixes #338

Conflicts:
	spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

(cherry picked from commit 53dd970)
This commit is contained in:
Gary Russell
2017-06-05 12:14:58 -04:00
committed by Artem Bilan
parent ecde553dd7
commit cf89cb3c50

View File

@@ -59,6 +59,8 @@ import org.springframework.context.Lifecycle;
*/
public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>, Lifecycle, DisposableBean {
private static final int DEFAULT_PHYSICAL_CLOSE_TIMEOUT = 30;
private static final Log logger = LogFactory.getLog(DefaultKafkaProducerFactory.class);
private final Map<String, Object> configs;
@@ -69,6 +71,8 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
private Serializer<V> valueSerializer;
private int physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT;
private volatile boolean running;
public DefaultKafkaProducerFactory(Map<String, Object> configs) {
@@ -90,12 +94,22 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
this.valueSerializer = valueSerializer;
}
/**
* The time to wait when physically closing the producer (when {@link #stop()} or {@link #destroy()} is invoked.
* Specified in seconds; default {@value #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}.
* @param physicalCloseTimeout the timeout in seconds.
* @since 1.0.7
*/
public void setPhysicalCloseTimeout(int physicalCloseTimeout) {
this.physicalCloseTimeout = physicalCloseTimeout;
}
@Override
public void destroy() throws Exception { //NOSONAR
CloseSafeProducer<K, V> producer = this.producer;
this.producer = null;
if (producer != null) {
producer.delegate.close();
producer.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
}
}