GH-76: Resolve Template Ambiguity with String Keys

Resolves #76

Polishing

Introduce sendDefault() methods instead.

More Polishing
This commit is contained in:
Gary Russell
2016-05-13 11:24:11 -04:00
committed by Artem Bilan
parent 560ff6f088
commit da88336b83
8 changed files with 156 additions and 80 deletions

View File

@@ -75,6 +75,8 @@ import scala.collection.Set;
@SuppressWarnings("serial")
public class KafkaEmbedded extends ExternalResource implements KafkaRule {
public static final String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers";
public static final long METADATA_PROPAGATION_TIMEOUT = 10000L;
private final int count;
@@ -157,10 +159,12 @@ public class KafkaEmbedded extends ExternalResource implements KafkaRule {
for (String topic : this.topics) {
AdminUtils.createTopic(zkUtils, topic, this.partitionsPerTopic, this.count, props);
}
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());
}
@Override
protected void after() {
System.getProperties().remove(SPRING_EMBEDDED_KAFKA_BROKERS);
for (KafkaServer kafkaServer : this.kafkaServers) {
try {
if (kafkaServer.brokerState().currentState() != (NotRunning.state())) {
@@ -362,6 +366,11 @@ public class KafkaEmbedded extends ExternalResource implements KafkaRule {
return true;
}
/**
* Subscribe a consumer to all the embedded topics.
* @param consumer the consumer.
* @throws Exception an exception.
*/
public void consumeFromAllEmbeddedTopics(Consumer<?, ?> consumer) throws Exception {
final CountDownLatch consumerLatch = new CountDownLatch(1);
consumer.subscribe(Arrays.asList(this.topics), new ConsumerRebalanceListener() {
@@ -382,4 +391,31 @@ public class KafkaEmbedded extends ExternalResource implements KafkaRule {
.isTrue();
}
/**
* Subscribe a consumer to one of the embedded topics.
* @param consumer the consumer.
* @param topic the topic.
* @throws Exception an exception.
*/
public void consumeFromAnEmbeddedTopic(Consumer<?, ?> consumer, String topic) throws Exception {
assertThat(this.topics).as("topic is not in embedded topic list").contains(topic);
final CountDownLatch consumerLatch = new CountDownLatch(1);
consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
consumerLatch.countDown();
}
});
consumer.poll(0); // force assignment
assertThat(consumerLatch.await(30, TimeUnit.SECONDS))
.as("Failed to be assigned partitions from the embedded topics")
.isTrue();
}
}

View File

@@ -36,7 +36,7 @@ public interface KafkaOperations<K, V> {
* @param data The data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> send(V data);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
/**
* Send the data to the default topic with the provided key and no partition.
@@ -44,7 +44,7 @@ public interface KafkaOperations<K, V> {
* @param data The data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> send(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
/**
* Send the data to the default topic with the provided key and partition.
@@ -53,7 +53,7 @@ public interface KafkaOperations<K, V> {
* @param data the data.
* @return a Future for the {@link SendResult}.
*/
ListenableFuture<SendResult<K, V>> send(int partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(int partition, K key, V data);
/**
* Send the data to the provided topic with no key or partition.

View File

@@ -125,17 +125,17 @@ private final boolean autoFlush;
}
@Override
public ListenableFuture<SendResult<K, V>> send(V data) {
public ListenableFuture<SendResult<K, V>> sendDefault(V data) {
return send(this.defaultTopic, data);
}
@Override
public ListenableFuture<SendResult<K, V>> send(K key, V data) {
public ListenableFuture<SendResult<K, V>> sendDefault(K key, V data) {
return send(this.defaultTopic, key, data);
}
@Override
public ListenableFuture<SendResult<K, V>> send(int partition, K key, V data) {
public ListenableFuture<SendResult<K, V>> sendDefault(int partition, K key, V data) {
return send(this.defaultTopic, partition, key, data);
}

View File

@@ -29,6 +29,10 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -50,10 +54,12 @@ import org.springframework.util.concurrent.ListenableFutureCallback;
*/
public class KafkaTemplateTests {
private static final String TEMPLATE_TOPIC = "templateTopic";
private static final String INT_KEY_TOPIC = "intKeyTopic";
private static final String STRING_KEY_TOPIC = "stringKeyTopic";
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEMPLATE_TOPIC);
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, INT_KEY_TOPIC, STRING_KEY_TOPIC);
private static Consumer<Integer, String> consumer;
@@ -63,39 +69,43 @@ public class KafkaTemplateTests {
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(
consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, INT_KEY_TOPIC);
}
@AfterClass
public static void tearDown() {
consumer.close();
}
@Test
public void testTemplate() throws Exception {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic(TEMPLATE_TOPIC);
template.send("foo");
assertThat(KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC)).has(value("foo"));
template.send(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC);
template.setDefaultTopic(INT_KEY_TOPIC);
template.sendDefault("foo");
assertThat(KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
assertThat(received).has(key(2));
assertThat(received).has(partition(0));
assertThat(received).has(value("bar"));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC);
template.send(INT_KEY_TOPIC, 0, 2, "baz");
received = KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
assertThat(received).has(key(2));
assertThat(received).has(partition(0));
assertThat(received).has(value("baz"));
template.send(TEMPLATE_TOPIC, 0, "qux");
received = KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC);
template.send(INT_KEY_TOPIC, 0, "qux");
received = KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
assertThat(received).has(key((Integer) null));
assertThat(received).has(partition(0));
assertThat(received).has(value("qux"));
template.send(MessageBuilder.withPayload("fiz")
.setHeader(KafkaHeaders.TOPIC, TEMPLATE_TOPIC)
.setHeader(KafkaHeaders.TOPIC, INT_KEY_TOPIC)
.setHeader(KafkaHeaders.PARTITION_ID, 0)
.setHeader(KafkaHeaders.MESSAGE_KEY, 2)
.build());
received = KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC);
received = KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
assertThat(received).has(key(2));
assertThat(received).has(partition(0));
assertThat(received).has(value("fiz"));
@@ -103,12 +113,11 @@ public class KafkaTemplateTests {
.setHeader(KafkaHeaders.PARTITION_ID, 0)
.setHeader(KafkaHeaders.MESSAGE_KEY, 2)
.build());
received = KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC);
received = KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC);
assertThat(received).has(key(2));
assertThat(received).has(partition(0));
assertThat(received).has(value("buz"));
consumer.close();
pf.createProducer().close();
}
@Test
@@ -116,7 +125,7 @@ public class KafkaTemplateTests {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(TEMPLATE_TOPIC);
template.setDefaultTopic(INT_KEY_TOPIC);
final CountDownLatch latch = new CountDownLatch(1);
template.setProducerListener(new ProducerListenerAdapter<Integer, String>() {
@@ -132,9 +141,10 @@ public class KafkaTemplateTests {
}
});
template.send("foo");
template.sendDefault("foo");
template.flush();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
pf.createProducer().close();
}
@Test
@@ -142,8 +152,8 @@ public class KafkaTemplateTests {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic(TEMPLATE_TOPIC);
ListenableFuture<SendResult<Integer, String>> future = template.send("foo");
template.setDefaultTopic(INT_KEY_TOPIC);
ListenableFuture<SendResult<Integer, String>> future = template.sendDefault("foo");
template.flush();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<SendResult<Integer, String>> theResult = new AtomicReference<>();
@@ -160,7 +170,29 @@ public class KafkaTemplateTests {
}
});
assertThat(KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC)).has(value("foo"));
assertThat(KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC)).has(value("foo"));
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
pf.createProducer().close();
}
@Test
public void testTemplateDisambiguation() throws Exception {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<String, String>(senderProps);
pf.setKeySerializer(new StringSerializer());
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic(STRING_KEY_TOPIC);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testTString", "false", embeddedKafka);
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<String, String>(consumerProps);
cf.setKeyDeserializer(new StringDeserializer());
Consumer<String, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, STRING_KEY_TOPIC);
template.sendDefault("foo", "bar");
template.flush();
ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, STRING_KEY_TOPIC);
assertThat(record).has(Assertions.<ConsumerRecord<String, String>>allOf(key("foo"), value("bar")));
consumer.close();
pf.createProducer().close();
}
}

View File

@@ -109,10 +109,10 @@ public class ConcurrentMessageListenerContainerTests {
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic1);
template.send(0, "foo");
template.send(2, "bar");
template.send(0, "baz");
template.send(2, "qux");
template.sendDefault(0, "foo");
template.sendDefault(2, "bar");
template.sendDefault(0, "baz");
template.sendDefault(2, "qux");
template.flush();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
container.stop();
@@ -161,10 +161,10 @@ public class ConcurrentMessageListenerContainerTests {
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic1);
template.send(0, "foo");
template.send(2, "bar");
template.send(0, "baz");
template.send(2, "qux");
template.sendDefault(0, "foo");
template.sendDefault(2, "bar");
template.sendDefault(0, "baz");
template.sendDefault(2, "qux");
template.flush();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(rebalancePartitionsAssignedLatch.await(60, TimeUnit.SECONDS)).isTrue();
@@ -197,10 +197,10 @@ public class ConcurrentMessageListenerContainerTests {
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic2);
template.send(0, "foo");
template.send(2, "bar");
template.send(0, "baz");
template.send(2, "qux");
template.sendDefault(0, "foo");
template.sendDefault(2, "bar");
template.sendDefault(0, "baz");
template.sendDefault(2, "qux");
template.flush();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
container.stop();
@@ -273,10 +273,10 @@ public class ConcurrentMessageListenerContainerTests {
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic3);
template.send(0, "foo");
template.send(2, "bar");
template.send(0, "baz");
template.send(2, "qux");
template.sendDefault(0, "foo");
template.sendDefault(2, "bar");
template.sendDefault(0, "baz");
template.sendDefault(2, "qux");
template.flush();
assertThat(latch1.await(60, TimeUnit.SECONDS)).isTrue();
@@ -365,10 +365,10 @@ public class ConcurrentMessageListenerContainerTests {
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic);
template.send(0, "foo");
template.send(2, "bar");
template.send(0, "baz");
template.send(2, "qux");
template.sendDefault(0, "foo");
template.sendDefault(2, "bar");
template.sendDefault(0, "baz");
template.sendDefault(2, "qux");
template.flush();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
container.stop();
@@ -383,10 +383,10 @@ public class ConcurrentMessageListenerContainerTests {
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic7);
template.send(0, "foo");
template.send(2, "bar");
template.send(0, "baz");
template.send(2, "qux");
template.sendDefault(0, "foo");
template.sendDefault(2, "bar");
template.sendDefault(0, "baz");
template.sendDefault(2, "qux");
template.flush();
Map<String, Object> props = KafkaTestUtils.consumerProps("testManualExisting", "false", embeddedKafka);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -422,10 +422,10 @@ public class ConcurrentMessageListenerContainerTests {
});
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
template.send(0, "fooo");
template.send(2, "barr");
template.send(0, "bazz");
template.send(2, "quxx");
template.sendDefault(0, "fooo");
template.sendDefault(2, "barr");
template.sendDefault(0, "bazz");
template.sendDefault(2, "quxx");
template.flush();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
assertThat(commits.await(60, TimeUnit.SECONDS)).isTrue();
@@ -441,10 +441,10 @@ public class ConcurrentMessageListenerContainerTests {
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic8);
template.send(0, "foo");
template.send(2, "bar");
template.send(0, "baz");
template.send(2, "qux");
template.sendDefault(0, "foo");
template.sendDefault(2, "bar");
template.sendDefault(0, "baz");
template.sendDefault(2, "qux");
template.flush();
Map<String, Object> props = KafkaTestUtils.consumerProps("testManualExistingSync", "false", embeddedKafka);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -467,10 +467,10 @@ public class ConcurrentMessageListenerContainerTests {
container.setBeanName("testManualExisting");
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
template.send(0, "fooo");
template.send(2, "barr");
template.send(0, "bazz");
template.send(2, "quxx");
template.sendDefault(0, "fooo");
template.sendDefault(2, "barr");
template.sendDefault(0, "bazz");
template.sendDefault(2, "quxx");
template.flush();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
container.stop();
@@ -549,10 +549,10 @@ public class ConcurrentMessageListenerContainerTests {
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic6);
template.send(0, "foo");
template.send(2, "bar");
template.send(0, "baz");
template.send(2, "qux");
template.sendDefault(0, "foo");
template.sendDefault(2, "bar");
template.sendDefault(0, "baz");
template.sendDefault(2, "qux");
template.flush();
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
container.stop();

View File

@@ -1,18 +1,20 @@
[[kafka]]
=== Using Spring for Apache Kafka
==== Sending Messages with the KafkaTemplate
==== Sending Messages
===== KafkaTemplate
The `KafkaTemplate` wraps a producer and provides convenience methods to send data to kafka topics.
Both asynchronous and synchronous methods are provided, with the async methods returning a `Future`.
[source, java]
----
ListenableFuture<SendResult<K, V>> send(V data);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> send(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> send(int partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(int partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
@@ -29,6 +31,8 @@ ListenableFuture<SendResult<K, V>> send(Message<?> message);
void flush();
----
The first 3 methods require that a default topic has been provided to the template.
To use the template, configure a producer factory and provide it in the template's constructor:
[source, java]

View File

@@ -59,10 +59,10 @@ public void testAutoCommit() throws Exception {
Thread.sleep(1000); // wait a bit for the container to start
KafkaTemplate<Integer, String> template = createTemplate();
template.setDefaultTopic(topic1);
template.send(0, "foo");
template.send(2, "bar");
template.send(0, "baz");
template.send(2, "qux");
template.sendDefault(0, "foo");
template.sendDefault(2, "bar");
template.sendDefault(0, "baz");
template.sendDefault(2, "qux");
template.flush();
assertTrue(latch.await(60, TimeUnit.SECONDS));
container.stop();

View File

@@ -89,11 +89,15 @@ Usage:
[source, java]
----
...
template.syncSend(0, 2, "bar");
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...
----
When the embedded server is started by JUnit, it sets a system property `spring.embedded.kafka.brokers` to the address
of the broker(s).
A convenient constant `KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS` is provided for this property.
==== Hamcrest Matchers
The `o.s.kafka.test.hamcrest.KafkaMatchers` provides the following matchers:
@@ -184,9 +188,9 @@ public class KafkaTemplateTests {
new DefaultKafkaProducerFactory<Integer, String>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(TEMPLATE_TOPIC);
template.send("foo");
template.sendDefault("foo");
assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
template.send(0, 2, "bar");
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
@@ -207,7 +211,7 @@ The above uses the hamcrest matchers; with `AssertJ`, the final part looks like
----
...
assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.send(0, 2, "bar");
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
assertThat(received).has(key(2));
assertThat(received).has(partition(0));