GH-134: Assert KafkaTemplate.flush() for NPE
Fixes GH-134 (https://github.com/spring-projects/spring-kafka/pull/137)
This commit is contained in:
committed by
Artem Bilan
parent
677d135932
commit
6a94ae2d79
@@ -31,6 +31,7 @@ import org.springframework.kafka.support.SendResult;
|
||||
import org.springframework.kafka.support.converter.MessageConverter;
|
||||
import org.springframework.kafka.support.converter.MessagingMessageConverter;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.util.concurrent.SettableListenableFuture;
|
||||
|
||||
@@ -43,6 +44,7 @@ import org.springframework.util.concurrent.SettableListenableFuture;
|
||||
*
|
||||
* @author Marius Bogoevici
|
||||
* @author Gary Russell
|
||||
* @author Igor Stepanov
|
||||
*/
|
||||
public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {
|
||||
|
||||
@@ -50,9 +52,10 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {
|
||||
|
||||
private final ProducerFactory<K, V> producerFactory;
|
||||
|
||||
private final boolean autoFlush;
|
||||
|
||||
private MessageConverter messageConverter = new MessagingMessageConverter();
|
||||
|
||||
private final boolean autoFlush;
|
||||
private volatile Producer<K, V> producer;
|
||||
|
||||
private volatile String defaultTopic;
|
||||
@@ -172,6 +175,7 @@ private final boolean autoFlush;
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
Assert.state(this.producer != null, "'producer' must not be null for flushing.");
|
||||
this.producer.flush();
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,8 @@
|
||||
package org.springframework.kafka.core;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.springframework.kafka.test.assertj.KafkaConditions.key;
|
||||
import static org.springframework.kafka.test.assertj.KafkaConditions.partition;
|
||||
import static org.springframework.kafka.test.assertj.KafkaConditions.value;
|
||||
@@ -50,7 +52,7 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
|
||||
/**
|
||||
* @author Gary Russell
|
||||
* @author Artem Bilan
|
||||
*
|
||||
* @author Igor Stepanov
|
||||
*/
|
||||
public class KafkaTemplateTests {
|
||||
|
||||
@@ -195,4 +197,18 @@ public class KafkaTemplateTests {
|
||||
pf.createProducer().close();
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
public void flushWithoutSend() throws Exception {
|
||||
KafkaTemplate template = new KafkaTemplate(mock(ProducerFactory.class));
|
||||
try {
|
||||
template.flush();
|
||||
fail("IllegalStateException expected");
|
||||
}
|
||||
catch (Exception e) {
|
||||
assertThat(e).isInstanceOf(IllegalStateException.class);
|
||||
assertThat(e.getMessage()).isEqualTo("'producer' must not be null for flushing.");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user