GH-134: Assert KafkaTemplate.flush() for NPE

Fixes GH-134 (https://github.com/spring-projects/spring-kafka/pull/137)
This commit is contained in:
Igor Stepanov
2016-07-05 23:36:23 +03:00
committed by Artem Bilan
parent 677d135932
commit 6a94ae2d79
2 changed files with 22 additions and 2 deletions

View File

@@ -31,6 +31,7 @@ import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
@@ -43,6 +44,7 @@ import org.springframework.util.concurrent.SettableListenableFuture;
*
* @author Marius Bogoevici
* @author Gary Russell
* @author Igor Stepanov
*/
public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {
@@ -50,9 +52,10 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {
private final ProducerFactory<K, V> producerFactory;
private final boolean autoFlush;
private MessageConverter messageConverter = new MessagingMessageConverter();
private final boolean autoFlush;
private volatile Producer<K, V> producer;
private volatile String defaultTopic;
@@ -172,6 +175,7 @@ private final boolean autoFlush;
@Override
public void flush() {
Assert.state(this.producer != null, "'producer' must not be null for flushing.");
this.producer.flush();
}

View File

@@ -17,6 +17,8 @@
package org.springframework.kafka.core;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.springframework.kafka.test.assertj.KafkaConditions.key;
import static org.springframework.kafka.test.assertj.KafkaConditions.partition;
import static org.springframework.kafka.test.assertj.KafkaConditions.value;
@@ -50,7 +52,7 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* @author Gary Russell
* @author Artem Bilan
*
* @author Igor Stepanov
*/
public class KafkaTemplateTests {
@@ -195,4 +197,18 @@ public class KafkaTemplateTests {
pf.createProducer().close();
}
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void flushWithoutSend() throws Exception {
KafkaTemplate template = new KafkaTemplate(mock(ProducerFactory.class));
try {
template.flush();
fail("IllegalStateException expected");
}
catch (Exception e) {
assertThat(e).isInstanceOf(IllegalStateException.class);
assertThat(e.getMessage()).isEqualTo("'producer' must not be null for flushing.");
}
}
}