Use KafkaBinderConfigurationProperties directly

Fixes #548

- Remove redundant property copying
- Make `socketBufferSize` configurable (it wasn't provided as a configuration option)
This commit is contained in:
Marius Bogoevici
2016-05-25 17:49:17 -04:00
parent 1d0031d356
commit 51a451300a
8 changed files with 131 additions and 320 deletions

View File

@@ -1,58 +0,0 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import org.springframework.util.Assert;
/**
* A Kafka encoder / decoder used to serialize a single int, used as the kafka partition key.
*
* @author Eric Bottard
*/
public class IntegerEncoderDecoder implements Encoder<Integer>, Decoder<Integer> {
public IntegerEncoderDecoder() {
this(new VerifiableProperties());
}
public IntegerEncoderDecoder(VerifiableProperties properties) {
}
@Override
public Integer fromBytes(byte[] bytes) {
Assert.isTrue(bytes.length == 4);
return bytes[0] << 24 | (bytes[1] & 0xFF) << 16 | (bytes[2] & 0xFF) << 8 | (bytes[3] & 0xFF);
}
@Override
public byte[] toBytes(Integer message) {
int value = message.intValue();
return new byte[] {
(byte) (value >>> 24),
(byte) (value >>> 16),
(byte) (value >>> 8),
(byte) value
};
}
}

View File

@@ -30,6 +30,7 @@ import scala.collection.Seq;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.Partition;
@@ -42,16 +43,21 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
private final KafkaMessageChannelBinder binder;
public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder) {
private final KafkaBinderConfigurationProperties configurationProperties;
public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder,
KafkaBinderConfigurationProperties configurationProperties) {
this.binder = binder;
this.configurationProperties = configurationProperties;
}
@Override
public Health health() {
ZkClient zkClient = null;
try {
zkClient = new ZkClient(binder.getZkAddress(), binder.getZkSessionTimeout(),
binder.getZkConnectionTimeout(), ZKStringSerializer$.MODULE$);
zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(),
configurationProperties.getZkConnectionTimeout(), ZKStringSerializer$.MODULE$);
Set<String> brokersInClusterSet = new HashSet<>();
Seq<Broker> allBrokersInCluster = ZkUtils$.MODULE$.getAllBrokersInCluster(zkClient);
Collection<Broker> brokersInCluster = JavaConversions.asJavaCollection(allBrokersInCluster);

View File

@@ -64,6 +64,7 @@ import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.HeaderMode;
import org.springframework.cloud.stream.binder.MessageValues;
import org.springframework.cloud.stream.binder.PartitionHandler;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.http.MediaType;
import org.springframework.integration.channel.FixedSubscriberChannel;
import org.springframework.integration.endpoint.EventDrivenConsumer;
@@ -117,12 +118,10 @@ import org.springframework.util.StringUtils;
* @author Mark Fisher
* @author Soby Chacko
*/
public class KafkaMessageChannelBinder
extends
AbstractBinder<MessageChannel, ExtendedConsumerProperties<KafkaConsumerProperties>,
ExtendedProducerProperties<KafkaProducerProperties>>
public class KafkaMessageChannelBinder extends
AbstractBinder<MessageChannel, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>>
implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties>,
DisposableBean {
DisposableBean {
public static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer();
@@ -134,91 +133,41 @@ public class KafkaMessageChannelBinder
DAEMON_THREAD_FACTORY = threadFactory;
}
private boolean autoCreateTopics = true;
private final KafkaBinderConfigurationProperties configurationProperties;
private boolean autoAddPartitions;
private final String[] headersToMap;
private RetryOperations metadataRetryOperations;
private final Map<String, Collection<Partition>> topicsInUse = new HashMap<>();
private final ZookeeperConnect zookeeperConnect;
private final String brokers;
private String[] headersToMap;
private final String zkAddress;
// -------- Default values for properties -------
private int replicationFactor = 1;
private int requiredAcks = 1;
private int queueSize = 1024;
private int maxWait = 100;
private int fetchSize = 1024 * 1024;
private int minPartitionCount = 1;
private ConnectionFactory connectionFactory;
private int socketBufferSize = 2097152;
private int offsetUpdateTimeWindow = 10000;
private int offsetUpdateCount;
private int offsetUpdateShutdownTimeout = 2000;
private int zkSessionTimeout = 10000;
private int zkConnectionTimeout = 10000;
private ProducerListener producerListener;
private volatile Producer<byte[], byte[]> dlqProducer;
private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties();
public KafkaMessageChannelBinder(ZookeeperConnect zookeeperConnect, String brokers, String zkAddress,
String... headersToMap) {
this.zookeeperConnect = zookeeperConnect;
this.brokers = brokers;
this.zkAddress = zkAddress;
if (ObjectUtils.isEmpty(headersToMap)) {
public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties) {
this.configurationProperties = configurationProperties;
String[] configuredHeaders = configurationProperties.getHeaders();
if (ObjectUtils.isEmpty(configuredHeaders)) {
this.headersToMap = BinderHeaders.STANDARD_HEADERS;
}
else {
String[] combinedHeadersToMap = Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0,
BinderHeaders.STANDARD_HEADERS.length + headersToMap.length);
System.arraycopy(headersToMap, 0, combinedHeadersToMap, BinderHeaders.STANDARD_HEADERS.length,
headersToMap.length);
BinderHeaders.STANDARD_HEADERS.length + configuredHeaders.length);
System.arraycopy(configuredHeaders, 0, combinedHeadersToMap, BinderHeaders.STANDARD_HEADERS.length,
configuredHeaders.length);
this.headersToMap = combinedHeadersToMap;
}
}
String getZkAddress() {
return this.zkAddress;
}
public void setSocketBufferSize(int socketBufferSize) {
this.socketBufferSize = socketBufferSize;
}
public void setOffsetUpdateTimeWindow(int offsetUpdateTimeWindow) {
this.offsetUpdateTimeWindow = offsetUpdateTimeWindow;
}
public void setOffsetUpdateCount(int offsetUpdateCount) {
this.offsetUpdateCount = offsetUpdateCount;
}
public void setOffsetUpdateShutdownTimeout(int offsetUpdateShutdownTimeout) {
this.offsetUpdateShutdownTimeout = offsetUpdateShutdownTimeout;
return this.configurationProperties.getZkConnectionString();
}
public ConnectionFactory getConnectionFactory() {
@@ -243,9 +192,10 @@ public class KafkaMessageChannelBinder
@Override
public void onInit() throws Exception {
ZookeeperConfiguration configuration = new ZookeeperConfiguration(this.zookeeperConnect);
configuration.setBufferSize(socketBufferSize);
configuration.setMaxWait(maxWait);
ZookeeperConfiguration configuration = new ZookeeperConfiguration(
new ZookeeperConnect(configurationProperties.getZkConnectionString()));
configuration.setBufferSize(configurationProperties.getSocketBufferSize());
configuration.setMaxWait(configurationProperties.getMaxWait());
DefaultConnectionFactory defaultConnectionFactory = new DefaultConnectionFactory(configuration);
defaultConnectionFactory.afterPropertiesSet();
this.connectionFactory = defaultConnectionFactory;
@@ -292,62 +242,6 @@ public class KafkaMessageChannelBinder
}
}
public void setReplicationFactor(int replicationFactor) {
this.replicationFactor = replicationFactor;
}
public void setRequiredAcks(int requiredAcks) {
this.requiredAcks = requiredAcks;
}
public void setQueueSize(int queueSize) {
this.queueSize = queueSize;
}
public void setFetchSize(int fetchSize) {
this.fetchSize = fetchSize;
}
public void setMinPartitionCount(int minPartitionCount) {
this.minPartitionCount = minPartitionCount;
}
public void setMaxWait(int maxWait) {
this.maxWait = maxWait;
}
public int getZkSessionTimeout() {
return this.zkSessionTimeout;
}
public void setZkSessionTimeout(int zkSessionTimeout) {
this.zkSessionTimeout = zkSessionTimeout;
}
public int getZkConnectionTimeout() {
return this.zkConnectionTimeout;
}
public void setZkConnectionTimeout(int zkConnectionTimeout) {
this.zkConnectionTimeout = zkConnectionTimeout;
}
public boolean isAutoCreateTopics() {
return autoCreateTopics;
}
public void setAutoCreateTopics(boolean autoCreateTopics) {
this.autoCreateTopics = autoCreateTopics;
}
public boolean isAutoAddPartitions() {
return autoAddPartitions;
}
public void setAutoAddPartitions(boolean autoAddPartitions) {
this.autoAddPartitions = autoAddPartitions;
}
@Override
public KafkaConsumerProperties getExtendedConsumerProperties(String channelName) {
return extendedBindingProperties.getExtendedConsumerProperties(channelName);
@@ -403,9 +297,9 @@ public class KafkaMessageChannelBinder
if (properties.getPartitionCount() < partitions.size()) {
if (logger.isInfoEnabled()) {
logger.info("The `partitionCount` of the producer for topic " + name + " is " +
properties.getPartitionCount() + ", smaller than the actual partition count of " +
partitions.size() + " of the topic. The larger number will be used instead.");
logger.info("The `partitionCount` of the producer for topic " + name + " is "
+ properties.getPartitionCount() + ", smaller than the actual partition count of "
+ partitions.size() + " of the topic. The larger number will be used instead.");
}
}
@@ -417,11 +311,11 @@ public class KafkaMessageChannelBinder
producerMetadata.setCompressionType(properties.getExtension().getCompressionType());
producerMetadata.setBatchBytes(properties.getExtension().getBufferSize());
Properties additionalProps = new Properties();
additionalProps.put(ProducerConfig.ACKS_CONFIG, String.valueOf(requiredAcks));
additionalProps.put(ProducerConfig.ACKS_CONFIG, String.valueOf(configurationProperties.getRequiredAcks()));
additionalProps.put(ProducerConfig.LINGER_MS_CONFIG,
String.valueOf(properties.getExtension().getBatchTimeout()));
ProducerFactoryBean<byte[], byte[]> producerFB = new ProducerFactoryBean<>(producerMetadata, brokers,
additionalProps);
ProducerFactoryBean<byte[], byte[]> producerFB = new ProducerFactoryBean<>(producerMetadata,
configurationProperties.getKafkaConnectionString(), additionalProps);
try {
final ProducerConfiguration<byte[], byte[]> producerConfiguration = new ProducerConfiguration<>(
@@ -456,35 +350,39 @@ public class KafkaMessageChannelBinder
*/
private Collection<Partition> ensureTopicCreated(final String topicName, final int partitionCount) {
final ZkClient zkClient = new ZkClient(zkAddress, getZkSessionTimeout(), getZkConnectionTimeout(),
final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(),
configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(),
ZKStringSerializer$.MODULE$);
try {
final Properties topicConfig = new Properties();
TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient);
if (topicMetadata.errorCode() == ErrorMapping.NoError()) {
// only consider minPartitionCount for resizing if autoAddPartitions is true
int effectivePartitionCount = isAutoAddPartitions() ? Math.max(minPartitionCount, partitionCount)
: partitionCount;
// only consider minPartitionCount for resizing if autoAddPartitions is
// true
int effectivePartitionCount = configurationProperties.isAutoAddPartitions()
? Math.max(configurationProperties.getMinPartitionCount(), partitionCount) : partitionCount;
if (topicMetadata.partitionsMetadata().size() < effectivePartitionCount) {
if (isAutoAddPartitions()) {
if (configurationProperties.isAutoAddPartitions()) {
AdminUtils.addPartitions(zkClient, topicName, effectivePartitionCount, null, false,
new Properties());
}
else {
int topicSize = topicMetadata.partitionsMetadata().size();
throw new BinderException("The number of expected partitions was: " + partitionCount
+ ", but " + topicSize + (topicSize > 1 ? " have " : " has ") + "been found instead." +
"Consider either increasing the partition count of the topic or enabling `autoAddPartitions`");
throw new BinderException("The number of expected partitions was: " + partitionCount + ", but "
+ topicSize + (topicSize > 1 ? " have " : " has ") + "been found instead."
+ "Consider either increasing the partition count of the topic or enabling `autoAddPartitions`");
}
}
}
else if (topicMetadata.errorCode() == ErrorMapping.UnknownTopicOrPartitionCode()) {
if (isAutoCreateTopics()) {
if (configurationProperties.isAutoCreateTopics()) {
Seq<Object> brokerList = ZkUtils.getSortedBrokerList(zkClient);
// always consider minPartitionCount for topic creation
int effectivePartitionCount = Math.max(this.minPartitionCount, partitionCount);
int effectivePartitionCount = Math.max(configurationProperties.getMinPartitionCount(),
partitionCount);
final scala.collection.Map<Object, Seq<Object>> replicaAssignment = AdminUtils
.assignReplicasToBrokers(brokerList, effectivePartitionCount, replicationFactor, -1, -1);
.assignReplicasToBrokers(brokerList, effectivePartitionCount,
configurationProperties.getReplicationFactor(), -1, -1);
metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() {
@Override
public Object doWithRetry(RetryContext context) throws RuntimeException {
@@ -580,8 +478,8 @@ public class KafkaMessageChannelBinder
offsetManager.resetOffsets(listenedPartitions);
}
messageListenerContainer.setOffsetManager(offsetManager);
messageListenerContainer.setQueueSize(queueSize);
messageListenerContainer.setMaxFetch(fetchSize);
messageListenerContainer.setQueueSize(configurationProperties.getQueueSize());
messageListenerContainer.setMaxFetch(configurationProperties.getFetchSize());
int concurrency = Math.min(properties.getConcurrency(), listenedPartitions.size());
messageListenerContainer.setConcurrency(concurrency);
@@ -630,6 +528,7 @@ public class KafkaMessageChannelBinder
messageListenerContainer.setMessageListener(new AcknowledgingMessageListener() {
final AcknowledgingMessageListener originalMessageListener = (AcknowledgingMessageListener) messageListenerContainer
.getMessageListener();
@Override
public void onMessage(final KafkaMessage message, final Acknowledgment acknowledgment) {
retryTemplate.execute(new RetryCallback<Object, RuntimeException>() {
@@ -723,11 +622,11 @@ public class KafkaMessageChannelBinder
producerMetadata.setCompressionType(ProducerMetadata.CompressionType.none);
producerMetadata.setBatchBytes(16384);
Properties additionalProps = new Properties();
additionalProps.put(ProducerConfig.ACKS_CONFIG, String.valueOf(requiredAcks));
additionalProps.put(ProducerConfig.LINGER_MS_CONFIG,
String.valueOf(0));
additionalProps.put(ProducerConfig.ACKS_CONFIG,
String.valueOf(configurationProperties.getRequiredAcks()));
additionalProps.put(ProducerConfig.LINGER_MS_CONFIG, String.valueOf(0));
ProducerFactoryBean<byte[], byte[]> producerFactoryBean = new ProducerFactoryBean<>(
producerMetadata, brokers, additionalProps);
producerMetadata, configurationProperties.getKafkaConnectionString(), additionalProps);
dlqProducer = producerFactoryBean.getObject();
}
}
@@ -742,15 +641,16 @@ public class KafkaMessageChannelBinder
try {
KafkaNativeOffsetManager kafkaOffsetManager = new KafkaNativeOffsetManager(connectionFactory,
zookeeperConnect, Collections.<Partition, Long>emptyMap());
new ZookeeperConnect(configurationProperties.getZkConnectionString()),
Collections.<Partition, Long>emptyMap());
kafkaOffsetManager.setConsumerId(group);
kafkaOffsetManager.setReferenceTimestamp(referencePoint);
kafkaOffsetManager.afterPropertiesSet();
WindowingOffsetManager windowingOffsetManager = new WindowingOffsetManager(kafkaOffsetManager);
windowingOffsetManager.setTimespan(offsetUpdateTimeWindow);
windowingOffsetManager.setCount(offsetUpdateCount);
windowingOffsetManager.setShutdownTimeout(offsetUpdateShutdownTimeout);
windowingOffsetManager.setTimespan(configurationProperties.getOffsetUpdateTimeWindow());
windowingOffsetManager.setCount(configurationProperties.getOffsetUpdateCount());
windowingOffsetManager.setShutdownTimeout(configurationProperties.getOffsetUpdateShutdownTimeout());
windowingOffsetManager.afterPropertiesSet();
return windowingOffsetManager;
@@ -760,7 +660,6 @@ public class KafkaMessageChannelBinder
}
}
private String toDisplayString(String original, int maxCharacters) {
if (original.length() <= maxCharacters) {
return original;
@@ -880,8 +779,7 @@ public class KafkaMessageChannelBinder
}
public enum StartOffset {
earliest(OffsetRequest.EarliestTime()),
latest(OffsetRequest.LatestTime());
earliest(OffsetRequest.EarliestTime()), latest(OffsetRequest.LatestTime());
private final long referencePoint;

View File

@@ -31,7 +31,6 @@ import org.springframework.context.annotation.Import;
import org.springframework.integration.codec.Codec;
import org.springframework.integration.kafka.support.LoggingProducerListener;
import org.springframework.integration.kafka.support.ProducerListener;
import org.springframework.integration.kafka.support.ZookeeperConnect;
/**
* @author David Turanski
@@ -42,15 +41,15 @@ import org.springframework.integration.kafka.support.ZookeeperConnect;
*/
@Configuration
@ConditionalOnMissingBean(Binder.class)
@Import({KryoCodecAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class})
@EnableConfigurationProperties({KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class})
@Import({ KryoCodecAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class })
@EnableConfigurationProperties({ KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class })
public class KafkaBinderConfiguration {
@Autowired
private Codec codec;
@Autowired
private KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties;
private KafkaBinderConfigurationProperties configurationProperties;
@Autowired
private KafkaExtendedBindingProperties kafkaExtendedBindingProperties;
@@ -58,36 +57,10 @@ public class KafkaBinderConfiguration {
@Autowired
private ProducerListener producerListener;
@Bean
ZookeeperConnect zookeeperConnect() {
ZookeeperConnect zookeeperConnect = new ZookeeperConnect();
zookeeperConnect.setZkConnect(kafkaBinderConfigurationProperties.getZkConnectionString());
return zookeeperConnect;
}
@Bean
KafkaMessageChannelBinder kafkaMessageChannelBinder() {
String[] headers = kafkaBinderConfigurationProperties.getHeaders();
String kafkaConnectionString = kafkaBinderConfigurationProperties.getKafkaConnectionString();
String zkConnectionString = kafkaBinderConfigurationProperties.getZkConnectionString();
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
zookeeperConnect(), kafkaConnectionString, zkConnectionString, headers);
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(configurationProperties);
kafkaMessageChannelBinder.setCodec(codec);
kafkaMessageChannelBinder.setOffsetUpdateTimeWindow(kafkaBinderConfigurationProperties.getOffsetUpdateTimeWindow());
kafkaMessageChannelBinder.setOffsetUpdateCount(kafkaBinderConfigurationProperties.getOffsetUpdateCount());
kafkaMessageChannelBinder.setOffsetUpdateShutdownTimeout(kafkaBinderConfigurationProperties.getOffsetUpdateShutdownTimeout());
kafkaMessageChannelBinder.setZkSessionTimeout(kafkaBinderConfigurationProperties.getZkSessionTimeout());
kafkaMessageChannelBinder.setZkConnectionTimeout(kafkaBinderConfigurationProperties.getZkConnectionTimeout());
kafkaMessageChannelBinder.setFetchSize(kafkaBinderConfigurationProperties.getFetchSize());
kafkaMessageChannelBinder.setMinPartitionCount(kafkaBinderConfigurationProperties.getMinPartitionCount());
kafkaMessageChannelBinder.setQueueSize(kafkaBinderConfigurationProperties.getQueueSize());
kafkaMessageChannelBinder.setReplicationFactor(kafkaBinderConfigurationProperties.getReplicationFactor());
kafkaMessageChannelBinder.setRequiredAcks(kafkaBinderConfigurationProperties.getRequiredAcks());
kafkaMessageChannelBinder.setMaxWait(kafkaBinderConfigurationProperties.getMaxWait());
kafkaMessageChannelBinder.setAutoCreateTopics(kafkaBinderConfigurationProperties.isAutoCreateTopics());
kafkaMessageChannelBinder.setAutoAddPartitions(kafkaBinderConfigurationProperties.isAutoAddPartitions());
kafkaMessageChannelBinder.setProducerListener(producerListener);
kafkaMessageChannelBinder.setExtendedBindingProperties(kafkaExtendedBindingProperties);
return kafkaMessageChannelBinder;
@@ -101,6 +74,6 @@ public class KafkaBinderConfiguration {
@Bean
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder);
return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, configurationProperties);
}
}

View File

@@ -27,11 +27,11 @@ import org.springframework.util.StringUtils;
@ConfigurationProperties(prefix = "spring.cloud.stream.kafka.binder")
public class KafkaBinderConfigurationProperties {
private String[] zkNodes = new String[] {"localhost"};
private String[] zkNodes = new String[] { "localhost" };
private String defaultZkPort = "2181";
private String[] brokers = new String[] {"localhost"};
private String[] brokers = new String[] { "localhost" };
private String defaultBrokerPort = "9092";
@@ -49,6 +49,8 @@ public class KafkaBinderConfigurationProperties {
private boolean autoAddPartitions;
private int socketBufferSize = 2097152;
/**
* ZK session timeout in milliseconds.
*/
@@ -97,7 +99,7 @@ public class KafkaBinderConfigurationProperties {
return zkNodes;
}
public void setZkNodes(String[] zkNodes) {
public void setZkNodes(String... zkNodes) {
this.zkNodes = zkNodes;
}
@@ -109,7 +111,7 @@ public class KafkaBinderConfigurationProperties {
return brokers;
}
public void setBrokers(String[] brokers) {
public void setBrokers(String... brokers) {
this.brokers = brokers;
}
@@ -117,7 +119,7 @@ public class KafkaBinderConfigurationProperties {
this.defaultBrokerPort = defaultBrokerPort;
}
public void setHeaders(String[] headers) {
public void setHeaders(String... headers) {
this.headers = headers;
}
@@ -230,4 +232,12 @@ public class KafkaBinderConfigurationProperties {
public void setAutoAddPartitions(boolean autoAddPartitions) {
this.autoAddPartitions = autoAddPartitions;
}
public int getSocketBufferSize() {
return socketBufferSize;
}
public void setSocketBufferSize(int socketBufferSize) {
this.socketBufferSize = socketBufferSize;
}
}

View File

@@ -45,7 +45,6 @@ import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.core.TopicNotFoundException;
import org.springframework.integration.kafka.support.ProducerConfiguration;
import org.springframework.integration.kafka.support.ProducerMetadata;
import org.springframework.integration.kafka.support.ZookeeperConnect;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@@ -97,11 +96,19 @@ public class KafkaBinderTests extends
@Override
protected KafkaTestBinder getBinder() {
if (binder == null) {
binder = new KafkaTestBinder(kafkaTestSupport, new KafkaBinderConfigurationProperties());
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binder = new KafkaTestBinder(binderConfiguration);
}
return binder;
}
private KafkaBinderConfigurationProperties createConfigurationProperties() {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
binderConfiguration.setBrokers(kafkaTestSupport.getBrokerAddress());
binderConfiguration.setZkNodes(kafkaTestSupport.getZkConnectString());
return binderConfiguration;
}
@Override
protected ExtendedConsumerProperties<KafkaConsumerProperties> createConsumerProperties() {
return new ExtendedConsumerProperties<>(new KafkaConsumerProperties());
@@ -216,9 +223,9 @@ public class KafkaBinderTests extends
byte[] ratherBigPayload = new byte[2048];
Arrays.fill(ratherBigPayload, (byte) 65);
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binderConfiguration.setMinPartitionCount(10);
KafkaTestBinder binder = new KafkaTestBinder(kafkaTestSupport, binderConfiguration);
KafkaTestBinder binder = new KafkaTestBinder(binderConfiguration);
DirectChannel moduleOutputChannel = new DirectChannel();
QueueChannel moduleInputChannel = new QueueChannel();
@@ -247,10 +254,9 @@ public class KafkaBinderTests extends
byte[] ratherBigPayload = new byte[2048];
Arrays.fill(ratherBigPayload, (byte) 65);
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
binderConfiguration.setMinPartitionCount(5);
KafkaTestBinder binder = new KafkaTestBinder(kafkaTestSupport, binderConfiguration);
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binderConfiguration.setMinPartitionCount(6);
KafkaTestBinder binder = new KafkaTestBinder(binderConfiguration);
DirectChannel moduleOutputChannel = new DirectChannel();
QueueChannel moduleInputChannel = new QueueChannel();
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
@@ -269,7 +275,7 @@ public class KafkaBinderTests extends
assertArrayEquals(ratherBigPayload, (byte[]) inbound.getPayload());
Collection<Partition> partitions = binder.getCoreBinder().getConnectionFactory().getPartitions(
"foo" + uniqueBindingId + ".0");
assertThat(partitions, hasSize(5));
assertThat(partitions, hasSize(6));
producerBinding.unbind();
consumerBinding.unbind();
}
@@ -279,9 +285,9 @@ public class KafkaBinderTests extends
byte[] ratherBigPayload = new byte[2048];
Arrays.fill(ratherBigPayload, (byte) 65);
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties();
binderConfiguration.setMinPartitionCount(5);
KafkaTestBinder binder = new KafkaTestBinder(kafkaTestSupport, binderConfiguration);
KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
binderConfiguration.setMinPartitionCount(4);
KafkaTestBinder binder = new KafkaTestBinder(binderConfiguration);
DirectChannel moduleOutputChannel = new DirectChannel();
QueueChannel moduleInputChannel = new QueueChannel();
@@ -309,8 +315,7 @@ public class KafkaBinderTests extends
@Test
@SuppressWarnings("unchecked")
public void testDefaultConsumerStartsAtEarliest() throws Exception {
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(new ZookeeperConnect(kafkaTestSupport.getZkConnectString()),
kafkaTestSupport.getBrokerAddress(), kafkaTestSupport.getZkConnectString());
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(createConfigurationProperties());
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
@@ -406,8 +411,8 @@ public class KafkaBinderTests extends
@Test
@SuppressWarnings("unchecked")
public void testResume() throws Exception {
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(new ZookeeperConnect(kafkaTestSupport.getZkConnectString()),
kafkaTestSupport.getBrokerAddress(), kafkaTestSupport.getZkConnectString());
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
@@ -444,8 +449,7 @@ public class KafkaBinderTests extends
@Test
public void testSyncProducerMetadata() throws Exception {
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(new ZookeeperConnect(kafkaTestSupport.getZkConnectString()),
kafkaTestSupport.getBrokerAddress(), kafkaTestSupport.getZkConnectString());
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(createConfigurationProperties());
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
@@ -467,12 +471,10 @@ public class KafkaBinderTests extends
@Test
public void testAutoCreateTopicsDisabledFailsIfTopicMissing() throws Exception {
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(
new ZookeeperConnect(kafkaTestSupport.getZkConnectString()), kafkaTestSupport.getBrokerAddress(),
kafkaTestSupport.getZkConnectString());
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
configurationProperties.setAutoCreateTopics(false);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
binder.setAutoCreateTopics(false);
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
@@ -508,12 +510,10 @@ public class KafkaBinderTests extends
String testTopicName = "existing" + System.currentTimeMillis();
AdminUtils.createTopic(kafkaTestSupport.getZkClient(), testTopicName, 5, 1, new Properties());
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(
new ZookeeperConnect(kafkaTestSupport.getZkConnectString()), kafkaTestSupport.getBrokerAddress(),
kafkaTestSupport.getZkConnectString());
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
configurationProperties.setAutoCreateTopics(false);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
binder.setAutoCreateTopics(false);
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
@@ -528,13 +528,10 @@ public class KafkaBinderTests extends
String testTopicName = "existing" + System.currentTimeMillis();
AdminUtils.createTopic(kafkaTestSupport.getZkClient(), testTopicName, 1, 1, new Properties());
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(
new ZookeeperConnect(kafkaTestSupport.getZkConnectString()), kafkaTestSupport.getBrokerAddress(),
kafkaTestSupport.getZkConnectString());
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
configurationProperties.setAutoAddPartitions(false);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
binder.setAutoAddPartitions(false);
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
@@ -558,12 +555,10 @@ public class KafkaBinderTests extends
String testTopicName = "existing" + System.currentTimeMillis();
AdminUtils.createTopic(kafkaTestSupport.getZkClient(), testTopicName, 6, 1, new Properties());
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(
new ZookeeperConnect(kafkaTestSupport.getZkConnectString()), kafkaTestSupport.getBrokerAddress(),
kafkaTestSupport.getZkConnectString());
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
configurationProperties.setAutoAddPartitions(false);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
binder.setAutoAddPartitions(false);
RetryTemplate metatadataRetrievalRetryOperations = new RetryTemplate();
metatadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy());
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
@@ -594,11 +589,10 @@ public class KafkaBinderTests extends
@Test
public void testAutoCreateTopicsEnabledSucceeds() throws Exception {
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(
new ZookeeperConnect(kafkaTestSupport.getZkConnectString()), kafkaTestSupport.getBrokerAddress(),
kafkaTestSupport.getZkConnectString());
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
configurationProperties.setAutoCreateTopics(true);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
binder.setAutoCreateTopics(true);
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
@@ -619,13 +613,10 @@ public class KafkaBinderTests extends
public void testPartitionCountNotReduced() throws Exception {
String testTopicName = "existing" + System.currentTimeMillis();
AdminUtils.createTopic(kafkaTestSupport.getZkClient(), testTopicName, 6, 1, new Properties());
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(
new ZookeeperConnect(kafkaTestSupport.getZkConnectString()), kafkaTestSupport.getBrokerAddress(),
kafkaTestSupport.getZkConnectString());
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
configurationProperties.setAutoAddPartitions(true);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
binder.setAutoAddPartitions(true);
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();
@@ -647,15 +638,11 @@ public class KafkaBinderTests extends
public void testPartitionCountIncreasedIfAutoAddPartitionsSet() throws Exception {
String testTopicName = "existing" + System.currentTimeMillis();
AdminUtils.createTopic(kafkaTestSupport.getZkClient(), testTopicName, 1, 1, new Properties());
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(
new ZookeeperConnect(kafkaTestSupport.getZkConnectString()), kafkaTestSupport.getBrokerAddress(),
kafkaTestSupport.getZkConnectString());
binder.setMinPartitionCount(6);
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
configurationProperties.setMinPartitionCount(6);
configurationProperties.setAutoAddPartitions(true);
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties);
GenericApplicationContext context = new GenericApplicationContext();
binder.setAutoAddPartitions(true);
context.refresh();
binder.setApplicationContext(context);
binder.afterPropertiesSet();

View File

@@ -25,7 +25,6 @@ import org.springframework.cloud.stream.binder.AbstractTestBinder;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.test.junit.kafka.KafkaTestSupport;
import org.springframework.cloud.stream.test.junit.kafka.TestKafkaCluster;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.codec.Codec;
@@ -33,7 +32,6 @@ import org.springframework.integration.codec.kryo.KryoRegistrar;
import org.springframework.integration.codec.kryo.PojoCodec;
import org.springframework.integration.kafka.support.LoggingProducerListener;
import org.springframework.integration.kafka.support.ProducerListener;
import org.springframework.integration.kafka.support.ZookeeperConnect;
import org.springframework.integration.tuple.TupleKryoRegistrar;
/**
@@ -48,21 +46,15 @@ import org.springframework.integration.tuple.TupleKryoRegistrar;
public class KafkaTestBinder extends
AbstractTestBinder<KafkaMessageChannelBinder, ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>> {
public KafkaTestBinder(KafkaTestSupport kafkaTestSupport, KafkaBinderConfigurationProperties binderConfiguration) {
public KafkaTestBinder(KafkaBinderConfigurationProperties binderConfiguration) {
try {
ZookeeperConnect zookeeperConnect = new ZookeeperConnect();
zookeeperConnect.setZkConnect(kafkaTestSupport.getZkConnectString());
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(zookeeperConnect,
kafkaTestSupport.getBrokerAddress(), kafkaTestSupport.getZkConnectString());
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration);
binder.setCodec(getCodec());
ProducerListener producerListener = new LoggingProducerListener();
binder.setProducerListener(producerListener);
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
binder.setFetchSize(binderConfiguration.getFetchSize());
binder.setMaxWait(binderConfiguration.getMaxWait());
binder.setMinPartitionCount(binderConfiguration.getMinPartitionCount());
binder.afterPropertiesSet();
this.setBinder(binder);
}

View File

@@ -1121,7 +1121,10 @@ If set to `false`, the binder will rely on the partition size of the topic being
If the partition count of the target topic is smaller than the expected value, the binder will fail to start.
+
Default: `false`.
spring.cloud.stream.kafka.binder.socketBufferSize::
Size (in bytes) of the socket buffer to be used by the Kafka consumers.
+
Default: `2097152`.
==== Kafka Consumer Properties