GH-4: Fix Tangles

Resolves #4
Resolves #5

Move ContainerOffsetResetStrategy

Move Ack

Move Endpoints to config

Move Registrar Etc to config

* Fix JavaDocs warnings
This commit is contained in:
Gary Russell
2016-03-08 14:32:10 -05:00
committed by Artem Bilan
parent 47f1de132c
commit 3efd7d4f2d
26 changed files with 75 additions and 66 deletions

View File

@@ -34,6 +34,7 @@ public final class KafkaMatchers {
/**
* @param key the key
* @param <K> the type.
* @return a Matcher that matches the key in a consumer record.
*/
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) {
@@ -42,6 +43,7 @@ public final class KafkaMatchers {
/**
* @param value the value.
* @param <V> the type.
* @return a Matcher that matches the value in a consumer record.
*/
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) {

View File

@@ -106,7 +106,7 @@ import org.springframework.context.annotation.Import;
*
* Note that the created containers are not registered with the application context but
* can be easily located for management purposes using the
* {@link org.springframework.kafka.listener.KafkaListenerEndpointRegistry
* {@link org.springframework.kafka.config.KafkaListenerEndpointRegistry
* KafkaListenerEndpointRegistry}.
*
* <p>
@@ -133,7 +133,7 @@ import org.springframework.context.annotation.Import;
* <p>
* When more control is desired, a {@code @Configuration} class may implement
* {@link KafkaListenerConfigurer}. This allows access to the underlying
* {@link org.springframework.kafka.listener.KafkaListenerEndpointRegistrar
* {@link org.springframework.kafka.config.KafkaListenerEndpointRegistrar
* KafkaListenerEndpointRegistrar} instance. The following example demonstrates how to
* specify an explicit default {@code KafkaListenerContainerFactory}
*
@@ -162,7 +162,7 @@ import org.springframework.context.annotation.Import;
* </pre>
*
* It is also possible to specify a custom
* {@link org.springframework.kafka.listener.KafkaListenerEndpointRegistry
* {@link org.springframework.kafka.config.KafkaListenerEndpointRegistry
* KafkaListenerEndpointRegistry} in case you need more control on the way the containers
* are created and managed. The example below also demonstrates how to customize the
* {@code KafkaHandlerMethodFactory} to use with a custom
@@ -243,8 +243,8 @@ import org.springframework.context.annotation.Import;
*
* @see KafkaListener
* @see KafkaListenerAnnotationBeanPostProcessor
* @see org.springframework.kafka.listener.KafkaListenerEndpointRegistrar
* @see org.springframework.kafka.listener.KafkaListenerEndpointRegistry
* @see org.springframework.kafka.config.KafkaListenerEndpointRegistrar
* @see org.springframework.kafka.config.KafkaListenerEndpointRegistry
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)

View File

@@ -21,7 +21,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Role;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.listener.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
/**
* {@code @Configuration} class that registers a {@link KafkaListenerAnnotationBeanPostProcessor}

View File

@@ -32,7 +32,7 @@ import org.springframework.messaging.handler.annotation.MessageMapping;
* listener on the specified topics.
*
* The {@link #containerFactory()}
* identifies the {@link org.springframework.kafka.listener.KafkaListenerContainerFactory
* identifies the {@link org.springframework.kafka.config.KafkaListenerContainerFactory
* KafkaListenerContainerFactory} to use to build the Kafka listener container. If not
* set, a <em>default</em> container factory is assumed to be available with a bean
* name of {@code kafkaListenerContainerFactory} unless an explicit default has been
@@ -47,7 +47,7 @@ import org.springframework.messaging.handler.annotation.MessageMapping;
* <ul>
* <li>{@link org.apache.kafka.clients.consumer.ConsumerRecord} to
* access to the raw Kafka message</li>
* <li>{@link org.springframework.kafka.listener.Acknowledgment} to manually ack</li>
* <li>{@link org.springframework.kafka.support.Acknowledgment} to manually ack</li>
* <li>{@link org.springframework.messaging.handler.annotation.Payload @Payload}-annotated method
* arguments including the support of validation</li>
* <li>{@link org.springframework.messaging.handler.annotation.Header @Header}-annotated method
@@ -64,13 +64,13 @@ import org.springframework.messaging.handler.annotation.MessageMapping;
*
* <p>When defined at the method level, a listener container is created for each method. The
* {@link MessageListener} is a {@link MessagingMessageListenerAdapter}, configured with a
* {@link org.springframework.kafka.listener.MethodKafkaListenerEndpoint}.
* {@link org.springframework.kafka.config.MethodKafkaListenerEndpoint}.
*
* <p>When defined at the class level, a single message listener container is used to service
* all methods annotated with {@code @KafkaHandler}. Method signatures of such annotated
* methods must not cause any ambiguity such that a single method can be resolved for a
* particular inbound message. The {@link MessagingMessageListenerAdapter} is configured with
* a {@link org.springframework.kafka.listener.MultiMethodKafkaListenerEndpoint}.
* a {@link org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint}.
*
* @author Gary Russell
*
@@ -89,12 +89,12 @@ public @interface KafkaListener {
* The unique identifier of the container managing for this endpoint.
* <p>If none is specified an auto-generated one is provided.
* @return the {@code id} for the container managing for this endpoint.
* @see org.springframework.kafka.listener.KafkaListenerEndpointRegistry#getListenerContainer(String)
* @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
*/
String id() default "";
/**
* The bean name of the {@link org.springframework.kafka.listener.KafkaListenerContainerFactory}
* The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
* to use to create the message listener container responsible to serve this endpoint.
* <p>If not specified, the default container factory is used, if any.
* @return the container factory bean name.

View File

@@ -45,11 +45,11 @@ import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.listener.KafkaListenerContainerFactory;
import org.springframework.kafka.listener.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.listener.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
@@ -60,7 +60,7 @@ import org.springframework.util.StringUtils;
/**
* Bean post-processor that registers methods annotated with {@link KafkaListener}
* to be invoked by a Kafka message listener container created under the covers
* by a {@link org.springframework.kafka.listener.KafkaListenerContainerFactory}
* by a {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
* according to the parameters of the annotation.
*
* <p>Annotated methods can use flexible arguments as defined by {@link KafkaListener}.
@@ -82,14 +82,14 @@ import org.springframework.util.StringUtils;
* @see KafkaListenerConfigurer
* @see KafkaListenerEndpointRegistrar
* @see KafkaListenerEndpointRegistry
* @see org.springframework.kafka.listener.KafkaListenerEndpoint
* @see org.springframework.kafka.config.KafkaListenerEndpoint
* @see MethodKafkaListenerEndpoint
*/
public class KafkaListenerAnnotationBeanPostProcessor<K, V>
implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
/**
* The bean name of the default {@link org.springframework.kafka.listener.KafkaListenerContainerFactory}.
* The bean name of the default {@link org.springframework.kafka.config.KafkaListenerContainerFactory}.
*/
static final String DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory";

View File

@@ -17,13 +17,13 @@
package org.springframework.kafka.annotation;
import org.springframework.kafka.listener.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
/**
* Optional interface to be implemented by Spring managed bean willing
* to customize how Kafka listener endpoints are configured. Typically
* used to defined the default
* {@link org.springframework.kafka.listener.KafkaListenerContainerFactory
* {@link org.springframework.kafka.config.KafkaListenerContainerFactory
* KafkaListenerContainerFactory} to use or for registering Kafka endpoints
* in a <em>programmatic</em> fashion as opposed to the <em>declarative</em>
* approach of using the @{@link KafkaListener} annotation.
@@ -33,16 +33,16 @@ import org.springframework.kafka.listener.KafkaListenerEndpointRegistrar;
* @author Stephane Nicoll
*
* @see EnableKafka
* @see org.springframework.kafka.listener.KafkaListenerEndpointRegistrar
* @see org.springframework.kafka.config.KafkaListenerEndpointRegistrar
*/
public interface KafkaListenerConfigurer {
/**
* Callback allowing a {@link org.springframework.kafka.listener.KafkaListenerEndpointRegistry
* KafkaListenerEndpointRegistry} and specific {@link org.springframework.kafka.listener.KafkaListenerEndpoint
* Callback allowing a {@link org.springframework.kafka.config.KafkaListenerEndpointRegistry
* KafkaListenerEndpointRegistry} and specific {@link org.springframework.kafka.config.KafkaListenerEndpoint
* KafkaListenerEndpoint} instances to be registered against the given
* {@link KafkaListenerEndpointRegistrar}. The default
* {@link org.springframework.kafka.listener.KafkaListenerContainerFactory KafkaListenerContainerFactory}
* {@link org.springframework.kafka.config.KafkaListenerContainerFactory KafkaListenerContainerFactory}
* can also be customized.
* @param registrar the registrar to be configured
*/

View File

@@ -23,8 +23,6 @@ import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.KafkaListenerContainerFactory;
import org.springframework.kafka.listener.KafkaListenerEndpoint;
/**
* Base {@link KafkaListenerContainerFactory} for Spring's base container implementation.

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.kafka.listener;
package org.springframework.kafka.config;
import java.util.ArrayList;
import java.util.Arrays;
@@ -31,6 +31,8 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanExpressionContext;
import org.springframework.beans.factory.config.BeanExpressionResolver;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.util.Assert;
/**

View File

@@ -14,9 +14,9 @@
* limitations under the License.
*/
package org.springframework.kafka.listener;
package org.springframework.kafka.config;
import org.springframework.kafka.listener.MessageListenerContainer;
/**
* Factory of {@link MessageListenerContainer} based on a

View File

@@ -14,13 +14,15 @@
* limitations under the License.
*/
package org.springframework.kafka.listener;
package org.springframework.kafka.config;
import java.util.Collection;
import java.util.regex.Pattern;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.MessageListenerContainer;
/**
* Model for a Kafka listener endpoint. Can be used against a
* {@link org.springframework.kafka.annotation.KafkaListenerConfigurer

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.kafka.listener;
package org.springframework.kafka.config;
import java.util.ArrayList;
import java.util.List;

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.kafka.listener;
package org.springframework.kafka.config;
import java.util.ArrayList;
import java.util.Collection;
@@ -36,6 +36,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

View File

@@ -14,10 +14,11 @@
* limitations under the License.
*/
package org.springframework.kafka.listener;
package org.springframework.kafka.config;
import java.lang.reflect.Method;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.adapter.HandlerAdapter;
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.kafka.listener;
package org.springframework.kafka.config;
import java.lang.reflect.Method;
import java.util.ArrayList;

View File

@@ -21,9 +21,7 @@ import java.util.Collection;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer.ContainerOffsetResetStrategy;
import org.springframework.kafka.listener.KafkaListenerContainerFactory;
import org.springframework.kafka.listener.KafkaListenerEndpoint;
import org.springframework.kafka.listener.KafkaMessageListenerContainer.ContainerOffsetResetStrategy;
/**
* A {@link KafkaListenerContainerFactory} implementation to build a regular
@@ -63,7 +61,7 @@ public class SimpleKafkaListenerContainerFactory<K, V>
/**
* @param resetStrategy the reset strategy
* @see ConcurrentMessageListenerContainer#setResetStrategy(ContainerOffsetResetStrategy)
* @see ConcurrentMessageListenerContainer#setResetStrategy
*/
public void setResetStrategy(ContainerOffsetResetStrategy resetStrategy) {
this.resetStrategy = resetStrategy;

View File

@@ -17,8 +17,6 @@
package org.springframework.kafka.config;
import org.springframework.kafka.listener.AbstractKafkaListenerEndpoint;
import org.springframework.kafka.listener.KafkaListenerEndpoint;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;

View File

@@ -18,6 +18,8 @@ package org.springframework.kafka.listener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.support.Acknowledgment;
/**
* Listener for handling incoming Kafka messages, propagating an acknowledgment handle that recipients
* can invoke when the message has been processed.

View File

@@ -26,6 +26,7 @@ import java.util.regex.Pattern;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.KafkaMessageListenerContainer.ContainerOffsetResetStrategy;
import org.springframework.util.Assert;
/**
@@ -59,14 +60,15 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis
/**
* Construct an instance with the supplied configuration properties and specific
* topics/partitions - when using this constructor, a
* {@link #setResetStrategy(ContainerOffsetResetStrategy)} can be used.
* {@link #setResetStrategy(KafkaMessageListenerContainer.ContainerOffsetResetStrategy)
* ContainerOffsetResetStrategy} can be used.
* The topic partitions are distributed evenly across the delegate
* {@link KafkaMessageListenerContainer}s.
* @param consumerFactory the consumer factory.
* @param topicPartitions the topics/partitions; duplicates are eliminated.
*/
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
TopicPartition... topicPartitions) {
TopicPartition... topicPartitions) {
Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
Assert.notEmpty(topicPartitions, "A list of partitions must be provided");
Assert.noNullElements(topicPartitions, "The list of partitions cannot contain null elements");
@@ -80,7 +82,8 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis
/**
* Construct an instance with the supplied configuration properties and topics.
* When using this constructor, a
* {@link #setResetStrategy(ContainerOffsetResetStrategy)} cannot be used.
* {@link #setResetStrategy(KafkaMessageListenerContainer.ContainerOffsetResetStrategy)
* ContainerOffSetResetStrategy} cannot be used.
* @param consumerFactory the consumer factory.
* @param topics the topics.
*/
@@ -96,7 +99,8 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis
/**
* Construct an instance with the supplied configuration properties and topic
* pattern. When using this constructor, a
* {@link #setResetStrategy(ContainerOffsetResetStrategy)} cannot be used.
* {@link #setResetStrategy(KafkaMessageListenerContainer.ContainerOffsetResetStrategy)
* ContainerOffSetResetStrategy} cannot be used.
* @param consumerFactory the consumer factory.
* @param topicPattern the topic pattern.
*/
@@ -117,7 +121,7 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis
* <li>RECENT: Set to a recent message based on {@link #setRecentOffset(long) recentOffset}</li>
* </ul>
*
* @param resetStrategy the {@link ContainerOffsetResetStrategy}
* @param resetStrategy the {@link KafkaMessageListenerContainer.ContainerOffsetResetStrategy}
*/
public void setResetStrategy(ContainerOffsetResetStrategy resetStrategy) {
this.resetStrategy = resetStrategy;
@@ -125,7 +129,7 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis
/**
* Set the number of records back from the latest when using
* {@link ContainerOffsetResetStrategy#RECENT}.
* {@link KafkaMessageListenerContainer.ContainerOffsetResetStrategy#RECENT}.
* @param recentOffset the offset from the latest; default 1.
*/
public void setRecentOffset(long recentOffset) {
@@ -233,8 +237,4 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis
}
}
public enum ContainerOffsetResetStrategy {
LATEST, EARLIEST, NONE, RECENT
}
}

View File

@@ -42,7 +42,7 @@ import org.apache.kafka.common.errors.WakeupException;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer.ContainerOffsetResetStrategy;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.util.Assert;
@@ -133,9 +133,9 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
* <li>LATEST: Set to the last message; receive new messages only</li>
* <li>RECENT: Set to a recent message based on {@link #setRecentOffset(long) recentOffset}</li>
* </ul>
* @param resetStrategy the {@link ContainerOffsetResetStrategy}
* @param resetStrategy the {@link KafkaMessageListenerContainer.ContainerOffsetResetStrategy}
*/
public void setResetStrategy(ContainerOffsetResetStrategy resetStrategy) {
public void setResetStrategy(KafkaMessageListenerContainer.ContainerOffsetResetStrategy resetStrategy) {
this.resetStrategy = resetStrategy;
}
@@ -508,4 +508,8 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
}
public enum ContainerOffsetResetStrategy {
LATEST, EARLIEST, NONE, RECENT
}
}

View File

@@ -43,12 +43,12 @@ public abstract class AbstractAdaptableMessageListener<K, V> implements MessageL
/**
* Kafka {@link MessageListener} entry point.
* <p>
* Delegates the message to the target listener method, with appropriate conversion of the message argument. In case
* of an exception, the {@link #handleListenerException(Throwable)} method will be invoked.
* Delegates the message to the target listener method, with appropriate conversion of the message argument.
* In case of an exception, the {@link #handleListenerException(Throwable)} method will be invoked.
* <p>
* @param record the incoming Kafka {@link ConsumerRecord}.
* @see #handleListenerException
* @see #onMessage(ConsumerRecord, org.springframework.kafka.listener.Acknowledgment)
* @see AcknowledgingMessageListener#onMessage(ConsumerRecord, org.springframework.kafka.support.Acknowledgment)
*/
@Override
public void onMessage(ConsumerRecord<K, V> record) {

View File

@@ -18,8 +18,8 @@ package org.springframework.kafka.listener.adapter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.Acknowledgment;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.messaging.Message;

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.kafka.listener;
package org.springframework.kafka.support;
/**
* Handle for acknowledging the processing of a

View File

@@ -18,7 +18,7 @@ package org.springframework.kafka.support.converter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.Acknowledgment;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message;
/**

View File

@@ -20,7 +20,7 @@ import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.Acknowledgment;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

View File

@@ -33,6 +33,8 @@ import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.SimpleKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@@ -40,12 +42,10 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.Acknowledgment;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.KafkaListenerContainerFactory;
import org.springframework.kafka.listener.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.messaging.handler.annotation.Header;

View File

@@ -50,7 +50,8 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer.ContainerOffsetResetStrategy;
import org.springframework.kafka.listener.KafkaMessageListenerContainer.ContainerOffsetResetStrategy;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;