diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/hamcrest/KafkaMatchers.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/hamcrest/KafkaMatchers.java index e0d7348c..25dd7ab9 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/hamcrest/KafkaMatchers.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/hamcrest/KafkaMatchers.java @@ -34,6 +34,7 @@ public final class KafkaMatchers { /** * @param key the key + * @param the type. * @return a Matcher that matches the key in a consumer record. */ public static Matcher> hasKey(K key) { @@ -42,6 +43,7 @@ public final class KafkaMatchers { /** * @param value the value. + * @param the type. * @return a Matcher that matches the value in a consumer record. */ public static Matcher> hasValue(V value) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafka.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafka.java index 033eac77..e90f57da 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafka.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafka.java @@ -106,7 +106,7 @@ import org.springframework.context.annotation.Import; * * Note that the created containers are not registered with the application context but * can be easily located for management purposes using the - * {@link org.springframework.kafka.listener.KafkaListenerEndpointRegistry + * {@link org.springframework.kafka.config.KafkaListenerEndpointRegistry * KafkaListenerEndpointRegistry}. * *

@@ -133,7 +133,7 @@ import org.springframework.context.annotation.Import; *

* When more control is desired, a {@code @Configuration} class may implement * {@link KafkaListenerConfigurer}. This allows access to the underlying - * {@link org.springframework.kafka.listener.KafkaListenerEndpointRegistrar + * {@link org.springframework.kafka.config.KafkaListenerEndpointRegistrar * KafkaListenerEndpointRegistrar} instance. The following example demonstrates how to * specify an explicit default {@code KafkaListenerContainerFactory} * @@ -162,7 +162,7 @@ import org.springframework.context.annotation.Import; * * * It is also possible to specify a custom - * {@link org.springframework.kafka.listener.KafkaListenerEndpointRegistry + * {@link org.springframework.kafka.config.KafkaListenerEndpointRegistry * KafkaListenerEndpointRegistry} in case you need more control on the way the containers * are created and managed. The example below also demonstrates how to customize the * {@code KafkaHandlerMethodFactory} to use with a custom @@ -243,8 +243,8 @@ import org.springframework.context.annotation.Import; * * @see KafkaListener * @see KafkaListenerAnnotationBeanPostProcessor - * @see org.springframework.kafka.listener.KafkaListenerEndpointRegistrar - * @see org.springframework.kafka.listener.KafkaListenerEndpointRegistry + * @see org.springframework.kafka.config.KafkaListenerEndpointRegistrar + * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaBootstrapConfiguration.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaBootstrapConfiguration.java index 89620314..8cbc3e29 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaBootstrapConfiguration.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaBootstrapConfiguration.java @@ -21,7 +21,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Role; import org.springframework.kafka.config.KafkaListenerConfigUtils; -import org.springframework.kafka.listener.KafkaListenerEndpointRegistry; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; /** * {@code @Configuration} class that registers a {@link KafkaListenerAnnotationBeanPostProcessor} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java index eb172974..7643d4ea 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java @@ -32,7 +32,7 @@ import org.springframework.messaging.handler.annotation.MessageMapping; * listener on the specified topics. * * The {@link #containerFactory()} - * identifies the {@link org.springframework.kafka.listener.KafkaListenerContainerFactory + * identifies the {@link org.springframework.kafka.config.KafkaListenerContainerFactory * KafkaListenerContainerFactory} to use to build the Kafka listener container. If not * set, a default container factory is assumed to be available with a bean * name of {@code kafkaListenerContainerFactory} unless an explicit default has been @@ -47,7 +47,7 @@ import org.springframework.messaging.handler.annotation.MessageMapping; *

    *
  • {@link org.apache.kafka.clients.consumer.ConsumerRecord} to * access to the raw Kafka message
  • - *
  • {@link org.springframework.kafka.listener.Acknowledgment} to manually ack
  • + *
  • {@link org.springframework.kafka.support.Acknowledgment} to manually ack
  • *
  • {@link org.springframework.messaging.handler.annotation.Payload @Payload}-annotated method * arguments including the support of validation
  • *
  • {@link org.springframework.messaging.handler.annotation.Header @Header}-annotated method @@ -64,13 +64,13 @@ import org.springframework.messaging.handler.annotation.MessageMapping; * *

    When defined at the method level, a listener container is created for each method. The * {@link MessageListener} is a {@link MessagingMessageListenerAdapter}, configured with a - * {@link org.springframework.kafka.listener.MethodKafkaListenerEndpoint}. + * {@link org.springframework.kafka.config.MethodKafkaListenerEndpoint}. * *

    When defined at the class level, a single message listener container is used to service * all methods annotated with {@code @KafkaHandler}. Method signatures of such annotated * methods must not cause any ambiguity such that a single method can be resolved for a * particular inbound message. The {@link MessagingMessageListenerAdapter} is configured with - * a {@link org.springframework.kafka.listener.MultiMethodKafkaListenerEndpoint}. + * a {@link org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint}. * * @author Gary Russell * @@ -89,12 +89,12 @@ public @interface KafkaListener { * The unique identifier of the container managing for this endpoint. *

    If none is specified an auto-generated one is provided. * @return the {@code id} for the container managing for this endpoint. - * @see org.springframework.kafka.listener.KafkaListenerEndpointRegistry#getListenerContainer(String) + * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String) */ String id() default ""; /** - * The bean name of the {@link org.springframework.kafka.listener.KafkaListenerContainerFactory} + * The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory} * to use to create the message listener container responsible to serve this endpoint. *

    If not specified, the default container factory is used, if any. * @return the container factory bean name. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 5fe2199c..04eb7acf 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -45,11 +45,11 @@ import org.springframework.context.expression.StandardBeanExpressionResolver; import org.springframework.core.Ordered; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.kafka.config.KafkaListenerConfigUtils; -import org.springframework.kafka.listener.KafkaListenerContainerFactory; -import org.springframework.kafka.listener.KafkaListenerEndpointRegistrar; -import org.springframework.kafka.listener.KafkaListenerEndpointRegistry; -import org.springframework.kafka.listener.MethodKafkaListenerEndpoint; -import org.springframework.kafka.listener.MultiMethodKafkaListenerEndpoint; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistrar; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.config.MethodKafkaListenerEndpoint; +import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory; import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; @@ -60,7 +60,7 @@ import org.springframework.util.StringUtils; /** * Bean post-processor that registers methods annotated with {@link KafkaListener} * to be invoked by a Kafka message listener container created under the covers - * by a {@link org.springframework.kafka.listener.KafkaListenerContainerFactory} + * by a {@link org.springframework.kafka.config.KafkaListenerContainerFactory} * according to the parameters of the annotation. * *

    Annotated methods can use flexible arguments as defined by {@link KafkaListener}. @@ -82,14 +82,14 @@ import org.springframework.util.StringUtils; * @see KafkaListenerConfigurer * @see KafkaListenerEndpointRegistrar * @see KafkaListenerEndpointRegistry - * @see org.springframework.kafka.listener.KafkaListenerEndpoint + * @see org.springframework.kafka.config.KafkaListenerEndpoint * @see MethodKafkaListenerEndpoint */ public class KafkaListenerAnnotationBeanPostProcessor implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton { /** - * The bean name of the default {@link org.springframework.kafka.listener.KafkaListenerContainerFactory}. + * The bean name of the default {@link org.springframework.kafka.config.KafkaListenerContainerFactory}. */ static final String DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory"; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerConfigurer.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerConfigurer.java index 4a383807..99d5f677 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerConfigurer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerConfigurer.java @@ -17,13 +17,13 @@ package org.springframework.kafka.annotation; -import org.springframework.kafka.listener.KafkaListenerEndpointRegistrar; +import org.springframework.kafka.config.KafkaListenerEndpointRegistrar; /** * Optional interface to be implemented by Spring managed bean willing * to customize how Kafka listener endpoints are configured. Typically * used to defined the default - * {@link org.springframework.kafka.listener.KafkaListenerContainerFactory + * {@link org.springframework.kafka.config.KafkaListenerContainerFactory * KafkaListenerContainerFactory} to use or for registering Kafka endpoints * in a programmatic fashion as opposed to the declarative * approach of using the @{@link KafkaListener} annotation. @@ -33,16 +33,16 @@ import org.springframework.kafka.listener.KafkaListenerEndpointRegistrar; * @author Stephane Nicoll * * @see EnableKafka - * @see org.springframework.kafka.listener.KafkaListenerEndpointRegistrar + * @see org.springframework.kafka.config.KafkaListenerEndpointRegistrar */ public interface KafkaListenerConfigurer { /** - * Callback allowing a {@link org.springframework.kafka.listener.KafkaListenerEndpointRegistry - * KafkaListenerEndpointRegistry} and specific {@link org.springframework.kafka.listener.KafkaListenerEndpoint + * Callback allowing a {@link org.springframework.kafka.config.KafkaListenerEndpointRegistry + * KafkaListenerEndpointRegistry} and specific {@link org.springframework.kafka.config.KafkaListenerEndpoint * KafkaListenerEndpoint} instances to be registered against the given * {@link KafkaListenerEndpointRegistrar}. The default - * {@link org.springframework.kafka.listener.KafkaListenerContainerFactory KafkaListenerContainerFactory} + * {@link org.springframework.kafka.config.KafkaListenerContainerFactory KafkaListenerContainerFactory} * can also be customized. * @param registrar the registrar to be configured */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java index ebe0ce2b..200ffc1e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java @@ -23,8 +23,6 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; import org.springframework.kafka.listener.ErrorHandler; -import org.springframework.kafka.listener.KafkaListenerContainerFactory; -import org.springframework.kafka.listener.KafkaListenerEndpoint; /** * Base {@link KafkaListenerContainerFactory} for Spring's base container implementation. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java similarity index 97% rename from spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaListenerEndpoint.java rename to spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java index d629912d..739f71dd 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.kafka.listener; +package org.springframework.kafka.config; import java.util.ArrayList; import java.util.Arrays; @@ -31,6 +31,8 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.config.BeanExpressionContext; import org.springframework.beans.factory.config.BeanExpressionResolver; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.kafka.listener.MessageListener; +import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.util.Assert; /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerContainerFactory.java similarity index 91% rename from spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerContainerFactory.java rename to spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerContainerFactory.java index 7dc6e92a..d97883fd 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerContainerFactory.java @@ -14,9 +14,9 @@ * limitations under the License. */ -package org.springframework.kafka.listener; - +package org.springframework.kafka.config; +import org.springframework.kafka.listener.MessageListenerContainer; /** * Factory of {@link MessageListenerContainer} based on a diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java similarity index 95% rename from spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerEndpoint.java rename to spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java index 5dc7ebbe..d34cb68c 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java @@ -14,13 +14,15 @@ * limitations under the License. */ -package org.springframework.kafka.listener; +package org.springframework.kafka.config; import java.util.Collection; import java.util.regex.Pattern; import org.apache.kafka.common.TopicPartition; +import org.springframework.kafka.listener.MessageListenerContainer; + /** * Model for a Kafka listener endpoint. Can be used against a * {@link org.springframework.kafka.annotation.KafkaListenerConfigurer diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerEndpointRegistrar.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java similarity index 99% rename from spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerEndpointRegistrar.java rename to spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java index c5186a26..31783adc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerEndpointRegistrar.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.kafka.listener; +package org.springframework.kafka.config; import java.util.ArrayList; import java.util.List; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerEndpointRegistry.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java similarity index 98% rename from spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerEndpointRegistry.java rename to spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java index 25808b3f..d79d3e44 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerEndpointRegistry.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.kafka.listener; +package org.springframework.kafka.config; import java.util.ArrayList; import java.util.Collection; @@ -36,6 +36,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.SmartLifecycle; +import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.util.Assert; import org.springframework.util.StringUtils; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/MethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java similarity index 97% rename from spring-kafka/src/main/java/org/springframework/kafka/listener/MethodKafkaListenerEndpoint.java rename to spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java index 45e47ca9..4bed83de 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/MethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java @@ -14,10 +14,11 @@ * limitations under the License. */ -package org.springframework.kafka.listener; +package org.springframework.kafka.config; import java.lang.reflect.Method; +import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.listener.adapter.HandlerAdapter; import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter; import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/MultiMethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java similarity index 97% rename from spring-kafka/src/main/java/org/springframework/kafka/listener/MultiMethodKafkaListenerEndpoint.java rename to spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java index ff8bc3b6..4d3e29e1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/MultiMethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.kafka.listener; +package org.springframework.kafka.config; import java.lang.reflect.Method; import java.util.ArrayList; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/SimpleKafkaListenerContainerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/config/SimpleKafkaListenerContainerFactory.java index 41554efc..07bced4a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/SimpleKafkaListenerContainerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/SimpleKafkaListenerContainerFactory.java @@ -21,9 +21,7 @@ import java.util.Collection; import org.apache.kafka.common.TopicPartition; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; -import org.springframework.kafka.listener.ConcurrentMessageListenerContainer.ContainerOffsetResetStrategy; -import org.springframework.kafka.listener.KafkaListenerContainerFactory; -import org.springframework.kafka.listener.KafkaListenerEndpoint; +import org.springframework.kafka.listener.KafkaMessageListenerContainer.ContainerOffsetResetStrategy; /** * A {@link KafkaListenerContainerFactory} implementation to build a regular @@ -63,7 +61,7 @@ public class SimpleKafkaListenerContainerFactory /** * @param resetStrategy the reset strategy - * @see ConcurrentMessageListenerContainer#setResetStrategy(ContainerOffsetResetStrategy) + * @see ConcurrentMessageListenerContainer#setResetStrategy */ public void setResetStrategy(ContainerOffsetResetStrategy resetStrategy) { this.resetStrategy = resetStrategy; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/SimpleKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/SimpleKafkaListenerEndpoint.java index 0ce2b679..a5e7e92f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/SimpleKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/SimpleKafkaListenerEndpoint.java @@ -17,8 +17,6 @@ package org.springframework.kafka.config; -import org.springframework.kafka.listener.AbstractKafkaListenerEndpoint; -import org.springframework.kafka.listener.KafkaListenerEndpoint; import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.listener.MessageListenerContainer; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingMessageListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingMessageListener.java index 0cae85f3..9d8d6154 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingMessageListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingMessageListener.java @@ -18,6 +18,8 @@ package org.springframework.kafka.listener; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.support.Acknowledgment; + /** * Listener for handling incoming Kafka messages, propagating an acknowledgment handle that recipients * can invoke when the message has been processed. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java index 3b014fbd..288a44df 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java @@ -26,6 +26,7 @@ import java.util.regex.Pattern; import org.apache.kafka.common.TopicPartition; import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.listener.KafkaMessageListenerContainer.ContainerOffsetResetStrategy; import org.springframework.util.Assert; /** @@ -59,14 +60,15 @@ public class ConcurrentMessageListenerContainer extends AbstractMessageLis /** * Construct an instance with the supplied configuration properties and specific * topics/partitions - when using this constructor, a - * {@link #setResetStrategy(ContainerOffsetResetStrategy)} can be used. + * {@link #setResetStrategy(KafkaMessageListenerContainer.ContainerOffsetResetStrategy) + * ContainerOffsetResetStrategy} can be used. * The topic partitions are distributed evenly across the delegate * {@link KafkaMessageListenerContainer}s. * @param consumerFactory the consumer factory. * @param topicPartitions the topics/partitions; duplicates are eliminated. */ public ConcurrentMessageListenerContainer(ConsumerFactory consumerFactory, - TopicPartition... topicPartitions) { + TopicPartition... topicPartitions) { Assert.notNull(consumerFactory, "A ConsumerFactory must be provided"); Assert.notEmpty(topicPartitions, "A list of partitions must be provided"); Assert.noNullElements(topicPartitions, "The list of partitions cannot contain null elements"); @@ -80,7 +82,8 @@ public class ConcurrentMessageListenerContainer extends AbstractMessageLis /** * Construct an instance with the supplied configuration properties and topics. * When using this constructor, a - * {@link #setResetStrategy(ContainerOffsetResetStrategy)} cannot be used. + * {@link #setResetStrategy(KafkaMessageListenerContainer.ContainerOffsetResetStrategy) + * ContainerOffSetResetStrategy} cannot be used. * @param consumerFactory the consumer factory. * @param topics the topics. */ @@ -96,7 +99,8 @@ public class ConcurrentMessageListenerContainer extends AbstractMessageLis /** * Construct an instance with the supplied configuration properties and topic * pattern. When using this constructor, a - * {@link #setResetStrategy(ContainerOffsetResetStrategy)} cannot be used. + * {@link #setResetStrategy(KafkaMessageListenerContainer.ContainerOffsetResetStrategy) + * ContainerOffSetResetStrategy} cannot be used. * @param consumerFactory the consumer factory. * @param topicPattern the topic pattern. */ @@ -117,7 +121,7 @@ public class ConcurrentMessageListenerContainer extends AbstractMessageLis *

  • RECENT: Set to a recent message based on {@link #setRecentOffset(long) recentOffset}
  • *
* - * @param resetStrategy the {@link ContainerOffsetResetStrategy} + * @param resetStrategy the {@link KafkaMessageListenerContainer.ContainerOffsetResetStrategy} */ public void setResetStrategy(ContainerOffsetResetStrategy resetStrategy) { this.resetStrategy = resetStrategy; @@ -125,7 +129,7 @@ public class ConcurrentMessageListenerContainer extends AbstractMessageLis /** * Set the number of records back from the latest when using - * {@link ContainerOffsetResetStrategy#RECENT}. + * {@link KafkaMessageListenerContainer.ContainerOffsetResetStrategy#RECENT}. * @param recentOffset the offset from the latest; default 1. */ public void setRecentOffset(long recentOffset) { @@ -233,8 +237,4 @@ public class ConcurrentMessageListenerContainer extends AbstractMessageLis } } - public enum ContainerOffsetResetStrategy { - LATEST, EARLIEST, NONE, RECENT - } - } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 86f8810c..d1c125b7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -42,7 +42,7 @@ import org.apache.kafka.common.errors.WakeupException; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.listener.ConcurrentMessageListenerContainer.ContainerOffsetResetStrategy; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.scheduling.SchedulingAwareRunnable; import org.springframework.util.Assert; @@ -133,9 +133,9 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener *
  • LATEST: Set to the last message; receive new messages only
  • *
  • RECENT: Set to a recent message based on {@link #setRecentOffset(long) recentOffset}
  • * - * @param resetStrategy the {@link ContainerOffsetResetStrategy} + * @param resetStrategy the {@link KafkaMessageListenerContainer.ContainerOffsetResetStrategy} */ - public void setResetStrategy(ContainerOffsetResetStrategy resetStrategy) { + public void setResetStrategy(KafkaMessageListenerContainer.ContainerOffsetResetStrategy resetStrategy) { this.resetStrategy = resetStrategy; } @@ -508,4 +508,8 @@ public class KafkaMessageListenerContainer extends AbstractMessageListener } + public enum ContainerOffsetResetStrategy { + LATEST, EARLIEST, NONE, RECENT + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractAdaptableMessageListener.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractAdaptableMessageListener.java index 29fabfad..d36f425f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractAdaptableMessageListener.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractAdaptableMessageListener.java @@ -43,12 +43,12 @@ public abstract class AbstractAdaptableMessageListener implements MessageL /** * Kafka {@link MessageListener} entry point. *

    - * Delegates the message to the target listener method, with appropriate conversion of the message argument. In case - * of an exception, the {@link #handleListenerException(Throwable)} method will be invoked. + * Delegates the message to the target listener method, with appropriate conversion of the message argument. + * In case of an exception, the {@link #handleListenerException(Throwable)} method will be invoked. *

    * @param record the incoming Kafka {@link ConsumerRecord}. * @see #handleListenerException - * @see #onMessage(ConsumerRecord, org.springframework.kafka.listener.Acknowledgment) + * @see AcknowledgingMessageListener#onMessage(ConsumerRecord, org.springframework.kafka.support.Acknowledgment) */ @Override public void onMessage(ConsumerRecord record) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 586ce49f..e1a96bd8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -18,8 +18,8 @@ package org.springframework.kafka.listener.adapter; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.kafka.listener.Acknowledgment; import org.springframework.kafka.listener.ListenerExecutionFailedException; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.messaging.Message; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/Acknowledgment.java b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java similarity index 96% rename from spring-kafka/src/main/java/org/springframework/kafka/listener/Acknowledgment.java rename to spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java index 565fcce1..4e36a93b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/Acknowledgment.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package org.springframework.kafka.listener; +package org.springframework.kafka.support; /** * Handle for acknowledging the processing of a diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java index ac650939..4231dfe1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessageConverter.java @@ -18,7 +18,7 @@ package org.springframework.kafka.support.converter; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.kafka.listener.Acknowledgment; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.messaging.Message; /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java index 7b09580f..61b10bb2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java @@ -20,7 +20,7 @@ import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.kafka.listener.Acknowledgment; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index dca74ae0..c9ca20a1 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -33,6 +33,8 @@ import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.KafkaListenerContainerFactory; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.config.SimpleKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; @@ -40,12 +42,10 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; -import org.springframework.kafka.listener.Acknowledgment; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; -import org.springframework.kafka.listener.KafkaListenerContainerFactory; -import org.springframework.kafka.listener.KafkaListenerEndpointRegistry; import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.test.rule.KafkaEmbedded; import org.springframework.messaging.handler.annotation.Header; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java index 492c9a3c..ba95afbf 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java @@ -50,7 +50,8 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; -import org.springframework.kafka.listener.ConcurrentMessageListenerContainer.ContainerOffsetResetStrategy; +import org.springframework.kafka.listener.KafkaMessageListenerContainer.ContainerOffsetResetStrategy; +import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.test.rule.KafkaEmbedded; import org.springframework.kafka.test.utils.KafkaTestUtils;