From da88336b8324589228ccf9c81a62cee493b7d395 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Fri, 13 May 2016 11:24:11 -0400 Subject: [PATCH] GH-76: Resolve Template Ambiguity with String Keys Resolves #76 Polishing Introduce sendDefault() methods instead. More Polishing --- .../kafka/test/rule/KafkaEmbedded.java | 36 +++++++++ .../kafka/core/KafkaOperations.java | 6 +- .../kafka/core/KafkaTemplate.java | 6 +- .../kafka/core/KafkaTemplateTests.java | 76 +++++++++++++----- ...ncurrentMessageListenerContainerTests.java | 80 +++++++++---------- src/reference/asciidoc/kafka.adoc | 12 ++- src/reference/asciidoc/quick-tour.adoc | 8 +- src/reference/asciidoc/testing.adoc | 12 ++- 8 files changed, 156 insertions(+), 80 deletions(-) diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java index 5721f104..d65a22da 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/KafkaEmbedded.java @@ -75,6 +75,8 @@ import scala.collection.Set; @SuppressWarnings("serial") public class KafkaEmbedded extends ExternalResource implements KafkaRule { + public static final String SPRING_EMBEDDED_KAFKA_BROKERS = "spring.embedded.kafka.brokers"; + public static final long METADATA_PROPAGATION_TIMEOUT = 10000L; private final int count; @@ -157,10 +159,12 @@ public class KafkaEmbedded extends ExternalResource implements KafkaRule { for (String topic : this.topics) { AdminUtils.createTopic(zkUtils, topic, this.partitionsPerTopic, this.count, props); } + System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString()); } @Override protected void after() { + System.getProperties().remove(SPRING_EMBEDDED_KAFKA_BROKERS); for (KafkaServer kafkaServer : this.kafkaServers) { try { if (kafkaServer.brokerState().currentState() != (NotRunning.state())) { @@ -362,6 +366,11 @@ public class KafkaEmbedded extends ExternalResource implements KafkaRule { return true; } + /** + * Subscribe a consumer to all the embedded topics. + * @param consumer the consumer. + * @throws Exception an exception. + */ public void consumeFromAllEmbeddedTopics(Consumer consumer) throws Exception { final CountDownLatch consumerLatch = new CountDownLatch(1); consumer.subscribe(Arrays.asList(this.topics), new ConsumerRebalanceListener() { @@ -382,4 +391,31 @@ public class KafkaEmbedded extends ExternalResource implements KafkaRule { .isTrue(); } + /** + * Subscribe a consumer to one of the embedded topics. + * @param consumer the consumer. + * @param topic the topic. + * @throws Exception an exception. + */ + public void consumeFromAnEmbeddedTopic(Consumer consumer, String topic) throws Exception { + assertThat(this.topics).as("topic is not in embedded topic list").contains(topic); + final CountDownLatch consumerLatch = new CountDownLatch(1); + consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() { + + @Override + public void onPartitionsRevoked(Collection partitions) { + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + consumerLatch.countDown(); + } + + }); + consumer.poll(0); // force assignment + assertThat(consumerLatch.await(30, TimeUnit.SECONDS)) + .as("Failed to be assigned partitions from the embedded topics") + .isTrue(); + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java index 1efd1524..f82f0594 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java @@ -36,7 +36,7 @@ public interface KafkaOperations { * @param data The data. * @return a Future for the {@link SendResult}. */ - ListenableFuture> send(V data); + ListenableFuture> sendDefault(V data); /** * Send the data to the default topic with the provided key and no partition. @@ -44,7 +44,7 @@ public interface KafkaOperations { * @param data The data. * @return a Future for the {@link SendResult}. */ - ListenableFuture> send(K key, V data); + ListenableFuture> sendDefault(K key, V data); /** * Send the data to the default topic with the provided key and partition. @@ -53,7 +53,7 @@ public interface KafkaOperations { * @param data the data. * @return a Future for the {@link SendResult}. */ - ListenableFuture> send(int partition, K key, V data); + ListenableFuture> sendDefault(int partition, K key, V data); /** * Send the data to the provided topic with no key or partition. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java index 5a6796ce..0a7f9d26 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java @@ -125,17 +125,17 @@ private final boolean autoFlush; } @Override - public ListenableFuture> send(V data) { + public ListenableFuture> sendDefault(V data) { return send(this.defaultTopic, data); } @Override - public ListenableFuture> send(K key, V data) { + public ListenableFuture> sendDefault(K key, V data) { return send(this.defaultTopic, key, data); } @Override - public ListenableFuture> send(int partition, K key, V data) { + public ListenableFuture> sendDefault(int partition, K key, V data) { return send(this.defaultTopic, partition, key, data); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java index 3909ef2c..074966fe 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java @@ -29,6 +29,10 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.assertj.core.api.Assertions; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -50,10 +54,12 @@ import org.springframework.util.concurrent.ListenableFutureCallback; */ public class KafkaTemplateTests { - private static final String TEMPLATE_TOPIC = "templateTopic"; + private static final String INT_KEY_TOPIC = "intKeyTopic"; + + private static final String STRING_KEY_TOPIC = "stringKeyTopic"; @ClassRule - public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEMPLATE_TOPIC); + public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, INT_KEY_TOPIC, STRING_KEY_TOPIC); private static Consumer consumer; @@ -63,39 +69,43 @@ public class KafkaTemplateTests { DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory( consumerProps); consumer = cf.createConsumer(); - embeddedKafka.consumeFromAllEmbeddedTopics(consumer); + embeddedKafka.consumeFromAnEmbeddedTopic(consumer, INT_KEY_TOPIC); } + @AfterClass + public static void tearDown() { + consumer.close(); + } @Test public void testTemplate() throws Exception { Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); ProducerFactory pf = new DefaultKafkaProducerFactory(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf, true); - template.setDefaultTopic(TEMPLATE_TOPIC); - template.send("foo"); - assertThat(KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC)).has(value("foo")); - template.send(0, 2, "bar"); - ConsumerRecord received = KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC); + template.setDefaultTopic(INT_KEY_TOPIC); + template.sendDefault("foo"); + assertThat(KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC)).has(value("foo")); + template.sendDefault(0, 2, "bar"); + ConsumerRecord received = KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC); assertThat(received).has(key(2)); assertThat(received).has(partition(0)); assertThat(received).has(value("bar")); - template.send(TEMPLATE_TOPIC, 0, 2, "baz"); - received = KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC); + template.send(INT_KEY_TOPIC, 0, 2, "baz"); + received = KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC); assertThat(received).has(key(2)); assertThat(received).has(partition(0)); assertThat(received).has(value("baz")); - template.send(TEMPLATE_TOPIC, 0, "qux"); - received = KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC); + template.send(INT_KEY_TOPIC, 0, "qux"); + received = KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC); assertThat(received).has(key((Integer) null)); assertThat(received).has(partition(0)); assertThat(received).has(value("qux")); template.send(MessageBuilder.withPayload("fiz") - .setHeader(KafkaHeaders.TOPIC, TEMPLATE_TOPIC) + .setHeader(KafkaHeaders.TOPIC, INT_KEY_TOPIC) .setHeader(KafkaHeaders.PARTITION_ID, 0) .setHeader(KafkaHeaders.MESSAGE_KEY, 2) .build()); - received = KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC); + received = KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC); assertThat(received).has(key(2)); assertThat(received).has(partition(0)); assertThat(received).has(value("fiz")); @@ -103,12 +113,11 @@ public class KafkaTemplateTests { .setHeader(KafkaHeaders.PARTITION_ID, 0) .setHeader(KafkaHeaders.MESSAGE_KEY, 2) .build()); - received = KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC); + received = KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC); assertThat(received).has(key(2)); assertThat(received).has(partition(0)); assertThat(received).has(value("buz")); - - consumer.close(); + pf.createProducer().close(); } @Test @@ -116,7 +125,7 @@ public class KafkaTemplateTests { Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); ProducerFactory pf = new DefaultKafkaProducerFactory(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); - template.setDefaultTopic(TEMPLATE_TOPIC); + template.setDefaultTopic(INT_KEY_TOPIC); final CountDownLatch latch = new CountDownLatch(1); template.setProducerListener(new ProducerListenerAdapter() { @@ -132,9 +141,10 @@ public class KafkaTemplateTests { } }); - template.send("foo"); + template.sendDefault("foo"); template.flush(); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + pf.createProducer().close(); } @Test @@ -142,8 +152,8 @@ public class KafkaTemplateTests { Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); ProducerFactory pf = new DefaultKafkaProducerFactory(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf, true); - template.setDefaultTopic(TEMPLATE_TOPIC); - ListenableFuture> future = template.send("foo"); + template.setDefaultTopic(INT_KEY_TOPIC); + ListenableFuture> future = template.sendDefault("foo"); template.flush(); final CountDownLatch latch = new CountDownLatch(1); final AtomicReference> theResult = new AtomicReference<>(); @@ -160,7 +170,29 @@ public class KafkaTemplateTests { } }); - assertThat(KafkaTestUtils.getSingleRecord(consumer, TEMPLATE_TOPIC)).has(value("foo")); + assertThat(KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC)).has(value("foo")); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + pf.createProducer().close(); } + + @Test + public void testTemplateDisambiguation() throws Exception { + Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(senderProps); + pf.setKeySerializer(new StringSerializer()); + KafkaTemplate template = new KafkaTemplate<>(pf, true); + template.setDefaultTopic(STRING_KEY_TOPIC); + Map consumerProps = KafkaTestUtils.consumerProps("testTString", "false", embeddedKafka); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(consumerProps); + cf.setKeyDeserializer(new StringDeserializer()); + Consumer consumer = cf.createConsumer(); + embeddedKafka.consumeFromAnEmbeddedTopic(consumer, STRING_KEY_TOPIC); + template.sendDefault("foo", "bar"); + template.flush(); + ConsumerRecord record = KafkaTestUtils.getSingleRecord(consumer, STRING_KEY_TOPIC); + assertThat(record).has(Assertions.>allOf(key("foo"), value("bar"))); + consumer.close(); + pf.createProducer().close(); + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index 550abea7..76b13ab1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -109,10 +109,10 @@ public class ConcurrentMessageListenerContainerTests { ProducerFactory pf = new DefaultKafkaProducerFactory(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(topic1); - template.send(0, "foo"); - template.send(2, "bar"); - template.send(0, "baz"); - template.send(2, "qux"); + template.sendDefault(0, "foo"); + template.sendDefault(2, "bar"); + template.sendDefault(0, "baz"); + template.sendDefault(2, "qux"); template.flush(); assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); container.stop(); @@ -161,10 +161,10 @@ public class ConcurrentMessageListenerContainerTests { ProducerFactory pf = new DefaultKafkaProducerFactory(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(topic1); - template.send(0, "foo"); - template.send(2, "bar"); - template.send(0, "baz"); - template.send(2, "qux"); + template.sendDefault(0, "foo"); + template.sendDefault(2, "bar"); + template.sendDefault(0, "baz"); + template.sendDefault(2, "qux"); template.flush(); assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(rebalancePartitionsAssignedLatch.await(60, TimeUnit.SECONDS)).isTrue(); @@ -197,10 +197,10 @@ public class ConcurrentMessageListenerContainerTests { ProducerFactory pf = new DefaultKafkaProducerFactory(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(topic2); - template.send(0, "foo"); - template.send(2, "bar"); - template.send(0, "baz"); - template.send(2, "qux"); + template.sendDefault(0, "foo"); + template.sendDefault(2, "bar"); + template.sendDefault(0, "baz"); + template.sendDefault(2, "qux"); template.flush(); assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); container.stop(); @@ -273,10 +273,10 @@ public class ConcurrentMessageListenerContainerTests { ProducerFactory pf = new DefaultKafkaProducerFactory(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(topic3); - template.send(0, "foo"); - template.send(2, "bar"); - template.send(0, "baz"); - template.send(2, "qux"); + template.sendDefault(0, "foo"); + template.sendDefault(2, "bar"); + template.sendDefault(0, "baz"); + template.sendDefault(2, "qux"); template.flush(); assertThat(latch1.await(60, TimeUnit.SECONDS)).isTrue(); @@ -365,10 +365,10 @@ public class ConcurrentMessageListenerContainerTests { ProducerFactory pf = new DefaultKafkaProducerFactory(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(topic); - template.send(0, "foo"); - template.send(2, "bar"); - template.send(0, "baz"); - template.send(2, "qux"); + template.sendDefault(0, "foo"); + template.sendDefault(2, "bar"); + template.sendDefault(0, "baz"); + template.sendDefault(2, "qux"); template.flush(); assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); container.stop(); @@ -383,10 +383,10 @@ public class ConcurrentMessageListenerContainerTests { ProducerFactory pf = new DefaultKafkaProducerFactory(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(topic7); - template.send(0, "foo"); - template.send(2, "bar"); - template.send(0, "baz"); - template.send(2, "qux"); + template.sendDefault(0, "foo"); + template.sendDefault(2, "bar"); + template.sendDefault(0, "baz"); + template.sendDefault(2, "qux"); template.flush(); Map props = KafkaTestUtils.consumerProps("testManualExisting", "false", embeddedKafka); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -422,10 +422,10 @@ public class ConcurrentMessageListenerContainerTests { }); container.start(); ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); - template.send(0, "fooo"); - template.send(2, "barr"); - template.send(0, "bazz"); - template.send(2, "quxx"); + template.sendDefault(0, "fooo"); + template.sendDefault(2, "barr"); + template.sendDefault(0, "bazz"); + template.sendDefault(2, "quxx"); template.flush(); assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); assertThat(commits.await(60, TimeUnit.SECONDS)).isTrue(); @@ -441,10 +441,10 @@ public class ConcurrentMessageListenerContainerTests { ProducerFactory pf = new DefaultKafkaProducerFactory(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(topic8); - template.send(0, "foo"); - template.send(2, "bar"); - template.send(0, "baz"); - template.send(2, "qux"); + template.sendDefault(0, "foo"); + template.sendDefault(2, "bar"); + template.sendDefault(0, "baz"); + template.sendDefault(2, "qux"); template.flush(); Map props = KafkaTestUtils.consumerProps("testManualExistingSync", "false", embeddedKafka); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -467,10 +467,10 @@ public class ConcurrentMessageListenerContainerTests { container.setBeanName("testManualExisting"); container.start(); ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic()); - template.send(0, "fooo"); - template.send(2, "barr"); - template.send(0, "bazz"); - template.send(2, "quxx"); + template.sendDefault(0, "fooo"); + template.sendDefault(2, "barr"); + template.sendDefault(0, "bazz"); + template.sendDefault(2, "quxx"); template.flush(); assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); container.stop(); @@ -549,10 +549,10 @@ public class ConcurrentMessageListenerContainerTests { ProducerFactory pf = new DefaultKafkaProducerFactory(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(topic6); - template.send(0, "foo"); - template.send(2, "bar"); - template.send(0, "baz"); - template.send(2, "qux"); + template.sendDefault(0, "foo"); + template.sendDefault(2, "bar"); + template.sendDefault(0, "baz"); + template.sendDefault(2, "qux"); template.flush(); assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue(); container.stop(); diff --git a/src/reference/asciidoc/kafka.adoc b/src/reference/asciidoc/kafka.adoc index a4de6443..2c92f2a7 100644 --- a/src/reference/asciidoc/kafka.adoc +++ b/src/reference/asciidoc/kafka.adoc @@ -1,18 +1,20 @@ [[kafka]] === Using Spring for Apache Kafka -==== Sending Messages with the KafkaTemplate +==== Sending Messages + +===== KafkaTemplate The `KafkaTemplate` wraps a producer and provides convenience methods to send data to kafka topics. Both asynchronous and synchronous methods are provided, with the async methods returning a `Future`. [source, java] ---- -ListenableFuture> send(V data); +ListenableFuture> sendDefault(V data); -ListenableFuture> send(K key, V data); +ListenableFuture> sendDefault(K key, V data); -ListenableFuture> send(int partition, K key, V data); +ListenableFuture> sendDefault(int partition, K key, V data); ListenableFuture> send(String topic, V data); @@ -29,6 +31,8 @@ ListenableFuture> send(Message message); void flush(); ---- +The first 3 methods require that a default topic has been provided to the template. + To use the template, configure a producer factory and provide it in the template's constructor: [source, java] diff --git a/src/reference/asciidoc/quick-tour.adoc b/src/reference/asciidoc/quick-tour.adoc index 93152650..aa364184 100644 --- a/src/reference/asciidoc/quick-tour.adoc +++ b/src/reference/asciidoc/quick-tour.adoc @@ -59,10 +59,10 @@ public void testAutoCommit() throws Exception { Thread.sleep(1000); // wait a bit for the container to start KafkaTemplate template = createTemplate(); template.setDefaultTopic(topic1); - template.send(0, "foo"); - template.send(2, "bar"); - template.send(0, "baz"); - template.send(2, "qux"); + template.sendDefault(0, "foo"); + template.sendDefault(2, "bar"); + template.sendDefault(0, "baz"); + template.sendDefault(2, "qux"); template.flush(); assertTrue(latch.await(60, TimeUnit.SECONDS)); container.stop(); diff --git a/src/reference/asciidoc/testing.adoc b/src/reference/asciidoc/testing.adoc index fe5a8c82..66603219 100644 --- a/src/reference/asciidoc/testing.adoc +++ b/src/reference/asciidoc/testing.adoc @@ -89,11 +89,15 @@ Usage: [source, java] ---- ... -template.syncSend(0, 2, "bar"); +template.sendDefault(0, 2, "bar"); ConsumerRecord received = KafkaTestUtils.getSingleRecord(consumer, "topic"); ... ---- +When the embedded server is started by JUnit, it sets a system property `spring.embedded.kafka.brokers` to the address +of the broker(s). +A convenient constant `KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS` is provided for this property. + ==== Hamcrest Matchers The `o.s.kafka.test.hamcrest.KafkaMatchers` provides the following matchers: @@ -184,9 +188,9 @@ public class KafkaTemplateTests { new DefaultKafkaProducerFactory(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(TEMPLATE_TOPIC); - template.send("foo"); + template.sendDefault("foo"); assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo")); - template.send(0, 2, "bar"); + template.sendDefault(0, 2, "bar"); ConsumerRecord received = records.poll(10, TimeUnit.SECONDS); assertThat(received, hasKey(2)); assertThat(received, hasPartition(0)); @@ -207,7 +211,7 @@ The above uses the hamcrest matchers; with `AssertJ`, the final part looks like ---- ... assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo")); - template.send(0, 2, "bar"); + template.sendDefault(0, 2, "bar"); ConsumerRecord received = records.poll(10, TimeUnit.SECONDS); assertThat(received).has(key(2)); assertThat(received).has(partition(0));