From 53dd970c882166741f4445c4e71e17a8b56b3c55 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 5 Jun 2017 12:14:58 -0400 Subject: [PATCH] GH-338: Add Timeout To Producer Close Fixes #338 Conflicts: spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java --- .../kafka/core/DefaultKafkaProducerFactory.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java index b3c97523..f58c670c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java @@ -59,6 +59,8 @@ import org.springframework.context.Lifecycle; */ public class DefaultKafkaProducerFactory implements ProducerFactory, Lifecycle, DisposableBean { + private static final int DEFAULT_PHYSICAL_CLOSE_TIMEOUT = 30; + private static final Log logger = LogFactory.getLog(DefaultKafkaProducerFactory.class); private final Map configs; @@ -69,6 +71,8 @@ public class DefaultKafkaProducerFactory implements ProducerFactory, private Serializer valueSerializer; + private int physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT; + private volatile boolean running; public DefaultKafkaProducerFactory(Map configs) { @@ -90,12 +94,22 @@ public class DefaultKafkaProducerFactory implements ProducerFactory, this.valueSerializer = valueSerializer; } + /** + * The time to wait when physically closing the producer (when {@link #stop()} or {@link #destroy()} is invoked. + * Specified in seconds; default {@value #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}. + * @param physicalCloseTimeout the timeout in seconds. + * @since 1.0.7 + */ + public void setPhysicalCloseTimeout(int physicalCloseTimeout) { + this.physicalCloseTimeout = physicalCloseTimeout; + } + @Override public void destroy() throws Exception { //NOSONAR CloseSafeProducer producer = this.producer; this.producer = null; if (producer != null) { - producer.delegate.close(); + producer.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS); } }