GH-295: Unique client.id for Concurrent Container

Resolves: https://github.com/spring-projects/spring-kafka/issues/295

When JMX is enabled and a `client.id` property provided, we get an `InstanceAlreadyExistsException`
when using the `ConcurrentMessageListenerContainer` with `concurrency > 1`.

Add a mechanism to append the client index to the client.id.

Also fix private inner class ctors -> package to avoid synthetic ctor.

Fix deprecation warning and test

Conflicts:
	spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
	spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
	src/reference/asciidoc/kafka.adoc

* Fix compilation warnings on the cast to `ClientIdSuffixAware`
This commit is contained in:
Gary Russell
2017-04-25 12:19:44 -04:00
committed by Artem Bilan
parent 943e48c32f
commit cbd7788ed8
5 changed files with 164 additions and 6 deletions

View File

@@ -0,0 +1,45 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.core;
import org.apache.kafka.clients.consumer.Consumer;
/**
* If a {@code ConsumerFactory} implements this interface, it supports customizing the
* {@code client.id} by adding a suffix, if a {@code client.id} property is provided.
*
* @deprecated - in 2.0, this method will move to {@link ConsumerFactory}.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
* @since 1.0.6
*
*/
@Deprecated
public interface ClientIdSuffixAware<K, V> {
/**
* Create a consumer, appending the suffix to the {@code client.id} property,
* if present.
* @param clientIdSuffix the suffix.
* @return the consumer.
*/
Consumer<K, V> createConsumer(String clientIdSuffix);
}

View File

@@ -16,6 +16,7 @@
package org.springframework.kafka.core;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -36,7 +37,8 @@ import org.apache.kafka.common.serialization.Deserializer;
* @author Gary Russell
* @author Murali Reddy
*/
public class DefaultKafkaConsumerFactory<K, V> implements ConsumerFactory<K, V> {
@SuppressWarnings("deprecation")
public class DefaultKafkaConsumerFactory<K, V> implements ConsumerFactory<K, V>, ClientIdSuffixAware<K, V> {
private final Map<String, Object> configs;
@@ -64,13 +66,44 @@ public class DefaultKafkaConsumerFactory<K, V> implements ConsumerFactory<K, V>
this.valueDeserializer = valueDeserializer;
}
/**
* Return an unmodifiable reference to the configuration map for this factory.
* Useful for cloning to make a similar factory.
* @return the configs.
* @since 1.0.6
*/
public Map<String, Object> getConfigurationProperties() {
return Collections.unmodifiableMap(this.configs);
}
@Override
public Consumer<K, V> createConsumer() {
return createKafkaConsumer();
}
@Override
public Consumer<K, V> createConsumer(String clientIdSuffix) {
return createKafkaConsumer(clientIdSuffix);
}
protected KafkaConsumer<K, V> createKafkaConsumer() {
return new KafkaConsumer<K, V>(this.configs, this.keyDeserializer, this.valueDeserializer);
return createKafkaConsumer(this.configs);
}
protected KafkaConsumer<K, V> createKafkaConsumer(String clientIdSuffix) {
if (!this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG) || clientIdSuffix == null) {
return createKafkaConsumer();
}
else {
Map<String, Object> modifiedClientIdConfigs = new HashMap<>(this.configs);
modifiedClientIdConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG,
modifiedClientIdConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG) + clientIdSuffix);
return createKafkaConsumer(modifiedClientIdConfigs);
}
}
protected KafkaConsumer<K, V> createKafkaConsumer(Map<String, Object> configs) {
return new KafkaConsumer<K, V>(configs, this.keyDeserializer, this.valueDeserializer);
}
@Override

View File

@@ -123,6 +123,7 @@ public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageLis
if (getApplicationEventPublisher() != null) {
container.setApplicationEventPublisher(getApplicationEventPublisher());
}
container.setClientIdSuffix("-" + i);
container.start();
this.containers.add(container);
}

View File

@@ -87,6 +87,8 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
private AcknowledgingMessageListener<K, V> acknowledgingMessageListener;
private String clientIdSuffix;
/**
* Construct an instance with the supplied configuration properties.
* @param consumerFactory the consumer factory.
@@ -117,6 +119,16 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
}
}
/**
* Set a suffix to add to the {@code client.id} consumer property (if the consumer
* factory supports it).
* @param clientIdSuffix the suffix to add.
* @since 1.0.6
*/
public void setClientIdSuffix(String clientIdSuffix) {
this.clientIdSuffix = clientIdSuffix;
}
/**
* Return the {@link TopicPartition}s currently assigned to this container,
* either explicitly or by Kafka; may be null if not assigned yet.
@@ -214,7 +226,9 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
@Override
public String toString() {
return "KafkaMessageListenerContainer [id=" + getBeanName() + ", topicPartitions=" + getAssignedPartitions()
return "KafkaMessageListenerContainer [id=" + getBeanName()
+ (this.clientIdSuffix != null ? ", clientIndex=" + this.clientIdSuffix : "")
+ ", topicPartitions=" + getAssignedPartitions()
+ "]";
}
@@ -278,10 +292,17 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
*/
private boolean paused;
private ListenerConsumer(MessageListener<K, V> listener, AcknowledgingMessageListener<K, V> ackListener) {
@SuppressWarnings({"deprecation", "unchecked"})
ListenerConsumer(MessageListener<K, V> listener, AcknowledgingMessageListener<K, V> ackListener) {
Assert.state(!this.isAnyManualAck || !this.autoCommit,
"Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());
final Consumer<K, V> consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer();
final Consumer<K, V> consumer =
KafkaMessageListenerContainer.this.consumerFactory
instanceof org.springframework.kafka.core.ClientIdSuffixAware
? ((org.springframework.kafka.core.ClientIdSuffixAware<K, V>) KafkaMessageListenerContainer
.this.consumerFactory)
.createConsumer(KafkaMessageListenerContainer.this.clientIdSuffix)
: KafkaMessageListenerContainer.this.consumerFactory.createConsumer();
ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
@@ -758,6 +779,10 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
private volatile Thread executingThread;
ListenerInvoker() {
super();
}
@Override
public void run() {
Assert.isTrue(this.active, "This instance is not active anymore");
@@ -831,7 +856,7 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
private final boolean immediate;
private ConsumerAcknowledgment(ConsumerRecord<K, V> record, boolean immediate) {
ConsumerAcknowledgment(ConsumerRecord<K, V> record, boolean immediate) {
this.record = record;
this.immediate = immediate;
}
@@ -866,6 +891,10 @@ public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListener
private static final Log logger = LogFactory.getLog(LoggingCommitCallback.class);
LoggingCommitCallback() {
super();
}
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {

View File

@@ -0,0 +1,50 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.core;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.Test;
/**
* @author Gary Russell
* @since 1.0.6
*
*/
public class DefaultKafkaConsumerFactoryTests {
@Test
public void testClientId() {
Map<String, Object> configs = Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, "foo");
DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<String, String>(configs) {
@Override
protected KafkaConsumer<String, String> createKafkaConsumer(Map<String, Object> configs) {
assertThat(configs.get(ConsumerConfig.CLIENT_ID_CONFIG)).isEqualTo("foo-1");
return null;
}
};
factory.createConsumer("-1");
}
}