diff --git a/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/IntegerEncoderDecoder.java b/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/IntegerEncoderDecoder.java deleted file mode 100644 index 381741581..000000000 --- a/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/IntegerEncoderDecoder.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2014 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.kafka; - -import kafka.serializer.Decoder; -import kafka.serializer.Encoder; -import kafka.utils.VerifiableProperties; - -import org.springframework.util.Assert; - - -/** - * A Kafka encoder / decoder used to serialize a single int, used as the kafka partition key. - * - * @author Eric Bottard - */ -public class IntegerEncoderDecoder implements Encoder, Decoder { - - - public IntegerEncoderDecoder() { - this(new VerifiableProperties()); - } - - public IntegerEncoderDecoder(VerifiableProperties properties) { - } - - @Override - public Integer fromBytes(byte[] bytes) { - Assert.isTrue(bytes.length == 4); - return bytes[0] << 24 | (bytes[1] & 0xFF) << 16 | (bytes[2] & 0xFF) << 8 | (bytes[3] & 0xFF); - } - - @Override - public byte[] toBytes(Integer message) { - int value = message.intValue(); - return new byte[] { - (byte) (value >>> 24), - (byte) (value >>> 16), - (byte) (value >>> 8), - (byte) value - }; - } - -} diff --git a/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealthIndicator.java b/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealthIndicator.java index 99576815f..c523f0854 100644 --- a/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealthIndicator.java +++ b/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderHealthIndicator.java @@ -30,6 +30,7 @@ import scala.collection.Seq; import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.HealthIndicator; +import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties; import org.springframework.integration.kafka.core.BrokerAddress; import org.springframework.integration.kafka.core.Partition; @@ -42,16 +43,21 @@ public class KafkaBinderHealthIndicator implements HealthIndicator { private final KafkaMessageChannelBinder binder; - public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder) { + private final KafkaBinderConfigurationProperties configurationProperties; + + public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder, + KafkaBinderConfigurationProperties configurationProperties) { this.binder = binder; + this.configurationProperties = configurationProperties; } @Override public Health health() { ZkClient zkClient = null; try { - zkClient = new ZkClient(binder.getZkAddress(), binder.getZkSessionTimeout(), - binder.getZkConnectionTimeout(), ZKStringSerializer$.MODULE$); + zkClient = new ZkClient(configurationProperties.getZkConnectionString(), + configurationProperties.getZkSessionTimeout(), + configurationProperties.getZkConnectionTimeout(), ZKStringSerializer$.MODULE$); Set brokersInClusterSet = new HashSet<>(); Seq allBrokersInCluster = ZkUtils$.MODULE$.getAllBrokersInCluster(zkClient); Collection brokersInCluster = JavaConversions.asJavaCollection(allBrokersInCluster); diff --git a/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java b/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java index 393dc983a..c76aacdca 100644 --- a/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java +++ b/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java @@ -64,6 +64,7 @@ import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; import org.springframework.cloud.stream.binder.HeaderMode; import org.springframework.cloud.stream.binder.MessageValues; import org.springframework.cloud.stream.binder.PartitionHandler; +import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties; import org.springframework.http.MediaType; import org.springframework.integration.channel.FixedSubscriberChannel; import org.springframework.integration.endpoint.EventDrivenConsumer; @@ -117,12 +118,10 @@ import org.springframework.util.StringUtils; * @author Mark Fisher * @author Soby Chacko */ -public class KafkaMessageChannelBinder - extends - AbstractBinder, - ExtendedProducerProperties> +public class KafkaMessageChannelBinder extends + AbstractBinder, ExtendedProducerProperties> implements ExtendedPropertiesBinder, - DisposableBean { + DisposableBean { public static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer(); @@ -134,91 +133,41 @@ public class KafkaMessageChannelBinder DAEMON_THREAD_FACTORY = threadFactory; } - private boolean autoCreateTopics = true; + private final KafkaBinderConfigurationProperties configurationProperties; - private boolean autoAddPartitions; + private final String[] headersToMap; private RetryOperations metadataRetryOperations; private final Map> topicsInUse = new HashMap<>(); - private final ZookeeperConnect zookeeperConnect; - - private final String brokers; - - private String[] headersToMap; - - private final String zkAddress; - // -------- Default values for properties ------- - private int replicationFactor = 1; - - private int requiredAcks = 1; - - private int queueSize = 1024; - - private int maxWait = 100; - - private int fetchSize = 1024 * 1024; - - private int minPartitionCount = 1; - private ConnectionFactory connectionFactory; - private int socketBufferSize = 2097152; - - private int offsetUpdateTimeWindow = 10000; - - private int offsetUpdateCount; - - private int offsetUpdateShutdownTimeout = 2000; - - private int zkSessionTimeout = 10000; - - private int zkConnectionTimeout = 10000; - private ProducerListener producerListener; private volatile Producer dlqProducer; private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties(); - public KafkaMessageChannelBinder(ZookeeperConnect zookeeperConnect, String brokers, String zkAddress, - String... headersToMap) { - this.zookeeperConnect = zookeeperConnect; - this.brokers = brokers; - this.zkAddress = zkAddress; - if (ObjectUtils.isEmpty(headersToMap)) { + public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties) { + this.configurationProperties = configurationProperties; + String[] configuredHeaders = configurationProperties.getHeaders(); + if (ObjectUtils.isEmpty(configuredHeaders)) { this.headersToMap = BinderHeaders.STANDARD_HEADERS; } else { String[] combinedHeadersToMap = Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0, - BinderHeaders.STANDARD_HEADERS.length + headersToMap.length); - System.arraycopy(headersToMap, 0, combinedHeadersToMap, BinderHeaders.STANDARD_HEADERS.length, - headersToMap.length); + BinderHeaders.STANDARD_HEADERS.length + configuredHeaders.length); + System.arraycopy(configuredHeaders, 0, combinedHeadersToMap, BinderHeaders.STANDARD_HEADERS.length, + configuredHeaders.length); this.headersToMap = combinedHeadersToMap; } } String getZkAddress() { - return this.zkAddress; - } - - public void setSocketBufferSize(int socketBufferSize) { - this.socketBufferSize = socketBufferSize; - } - - public void setOffsetUpdateTimeWindow(int offsetUpdateTimeWindow) { - this.offsetUpdateTimeWindow = offsetUpdateTimeWindow; - } - - public void setOffsetUpdateCount(int offsetUpdateCount) { - this.offsetUpdateCount = offsetUpdateCount; - } - - public void setOffsetUpdateShutdownTimeout(int offsetUpdateShutdownTimeout) { - this.offsetUpdateShutdownTimeout = offsetUpdateShutdownTimeout; + return this.configurationProperties.getZkConnectionString(); } public ConnectionFactory getConnectionFactory() { @@ -243,9 +192,10 @@ public class KafkaMessageChannelBinder @Override public void onInit() throws Exception { - ZookeeperConfiguration configuration = new ZookeeperConfiguration(this.zookeeperConnect); - configuration.setBufferSize(socketBufferSize); - configuration.setMaxWait(maxWait); + ZookeeperConfiguration configuration = new ZookeeperConfiguration( + new ZookeeperConnect(configurationProperties.getZkConnectionString())); + configuration.setBufferSize(configurationProperties.getSocketBufferSize()); + configuration.setMaxWait(configurationProperties.getMaxWait()); DefaultConnectionFactory defaultConnectionFactory = new DefaultConnectionFactory(configuration); defaultConnectionFactory.afterPropertiesSet(); this.connectionFactory = defaultConnectionFactory; @@ -292,62 +242,6 @@ public class KafkaMessageChannelBinder } } - public void setReplicationFactor(int replicationFactor) { - this.replicationFactor = replicationFactor; - } - - public void setRequiredAcks(int requiredAcks) { - this.requiredAcks = requiredAcks; - } - - public void setQueueSize(int queueSize) { - this.queueSize = queueSize; - } - - public void setFetchSize(int fetchSize) { - this.fetchSize = fetchSize; - } - - public void setMinPartitionCount(int minPartitionCount) { - this.minPartitionCount = minPartitionCount; - } - - public void setMaxWait(int maxWait) { - this.maxWait = maxWait; - } - - public int getZkSessionTimeout() { - return this.zkSessionTimeout; - } - - public void setZkSessionTimeout(int zkSessionTimeout) { - this.zkSessionTimeout = zkSessionTimeout; - } - - public int getZkConnectionTimeout() { - return this.zkConnectionTimeout; - } - - public void setZkConnectionTimeout(int zkConnectionTimeout) { - this.zkConnectionTimeout = zkConnectionTimeout; - } - - public boolean isAutoCreateTopics() { - return autoCreateTopics; - } - - public void setAutoCreateTopics(boolean autoCreateTopics) { - this.autoCreateTopics = autoCreateTopics; - } - - public boolean isAutoAddPartitions() { - return autoAddPartitions; - } - - public void setAutoAddPartitions(boolean autoAddPartitions) { - this.autoAddPartitions = autoAddPartitions; - } - @Override public KafkaConsumerProperties getExtendedConsumerProperties(String channelName) { return extendedBindingProperties.getExtendedConsumerProperties(channelName); @@ -403,9 +297,9 @@ public class KafkaMessageChannelBinder if (properties.getPartitionCount() < partitions.size()) { if (logger.isInfoEnabled()) { - logger.info("The `partitionCount` of the producer for topic " + name + " is " + - properties.getPartitionCount() + ", smaller than the actual partition count of " + - partitions.size() + " of the topic. The larger number will be used instead."); + logger.info("The `partitionCount` of the producer for topic " + name + " is " + + properties.getPartitionCount() + ", smaller than the actual partition count of " + + partitions.size() + " of the topic. The larger number will be used instead."); } } @@ -417,11 +311,11 @@ public class KafkaMessageChannelBinder producerMetadata.setCompressionType(properties.getExtension().getCompressionType()); producerMetadata.setBatchBytes(properties.getExtension().getBufferSize()); Properties additionalProps = new Properties(); - additionalProps.put(ProducerConfig.ACKS_CONFIG, String.valueOf(requiredAcks)); + additionalProps.put(ProducerConfig.ACKS_CONFIG, String.valueOf(configurationProperties.getRequiredAcks())); additionalProps.put(ProducerConfig.LINGER_MS_CONFIG, String.valueOf(properties.getExtension().getBatchTimeout())); - ProducerFactoryBean producerFB = new ProducerFactoryBean<>(producerMetadata, brokers, - additionalProps); + ProducerFactoryBean producerFB = new ProducerFactoryBean<>(producerMetadata, + configurationProperties.getKafkaConnectionString(), additionalProps); try { final ProducerConfiguration producerConfiguration = new ProducerConfiguration<>( @@ -456,35 +350,39 @@ public class KafkaMessageChannelBinder */ private Collection ensureTopicCreated(final String topicName, final int partitionCount) { - final ZkClient zkClient = new ZkClient(zkAddress, getZkSessionTimeout(), getZkConnectionTimeout(), + final ZkClient zkClient = new ZkClient(configurationProperties.getZkConnectionString(), + configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(), ZKStringSerializer$.MODULE$); try { final Properties topicConfig = new Properties(); TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient); if (topicMetadata.errorCode() == ErrorMapping.NoError()) { - // only consider minPartitionCount for resizing if autoAddPartitions is true - int effectivePartitionCount = isAutoAddPartitions() ? Math.max(minPartitionCount, partitionCount) - : partitionCount; + // only consider minPartitionCount for resizing if autoAddPartitions is + // true + int effectivePartitionCount = configurationProperties.isAutoAddPartitions() + ? Math.max(configurationProperties.getMinPartitionCount(), partitionCount) : partitionCount; if (topicMetadata.partitionsMetadata().size() < effectivePartitionCount) { - if (isAutoAddPartitions()) { + if (configurationProperties.isAutoAddPartitions()) { AdminUtils.addPartitions(zkClient, topicName, effectivePartitionCount, null, false, new Properties()); } else { int topicSize = topicMetadata.partitionsMetadata().size(); - throw new BinderException("The number of expected partitions was: " + partitionCount - + ", but " + topicSize + (topicSize > 1 ? " have " : " has ") + "been found instead." + - "Consider either increasing the partition count of the topic or enabling `autoAddPartitions`"); + throw new BinderException("The number of expected partitions was: " + partitionCount + ", but " + + topicSize + (topicSize > 1 ? " have " : " has ") + "been found instead." + + "Consider either increasing the partition count of the topic or enabling `autoAddPartitions`"); } } } else if (topicMetadata.errorCode() == ErrorMapping.UnknownTopicOrPartitionCode()) { - if (isAutoCreateTopics()) { + if (configurationProperties.isAutoCreateTopics()) { Seq brokerList = ZkUtils.getSortedBrokerList(zkClient); // always consider minPartitionCount for topic creation - int effectivePartitionCount = Math.max(this.minPartitionCount, partitionCount); + int effectivePartitionCount = Math.max(configurationProperties.getMinPartitionCount(), + partitionCount); final scala.collection.Map> replicaAssignment = AdminUtils - .assignReplicasToBrokers(brokerList, effectivePartitionCount, replicationFactor, -1, -1); + .assignReplicasToBrokers(brokerList, effectivePartitionCount, + configurationProperties.getReplicationFactor(), -1, -1); metadataRetryOperations.execute(new RetryCallback() { @Override public Object doWithRetry(RetryContext context) throws RuntimeException { @@ -580,8 +478,8 @@ public class KafkaMessageChannelBinder offsetManager.resetOffsets(listenedPartitions); } messageListenerContainer.setOffsetManager(offsetManager); - messageListenerContainer.setQueueSize(queueSize); - messageListenerContainer.setMaxFetch(fetchSize); + messageListenerContainer.setQueueSize(configurationProperties.getQueueSize()); + messageListenerContainer.setMaxFetch(configurationProperties.getFetchSize()); int concurrency = Math.min(properties.getConcurrency(), listenedPartitions.size()); messageListenerContainer.setConcurrency(concurrency); @@ -630,6 +528,7 @@ public class KafkaMessageChannelBinder messageListenerContainer.setMessageListener(new AcknowledgingMessageListener() { final AcknowledgingMessageListener originalMessageListener = (AcknowledgingMessageListener) messageListenerContainer .getMessageListener(); + @Override public void onMessage(final KafkaMessage message, final Acknowledgment acknowledgment) { retryTemplate.execute(new RetryCallback() { @@ -723,11 +622,11 @@ public class KafkaMessageChannelBinder producerMetadata.setCompressionType(ProducerMetadata.CompressionType.none); producerMetadata.setBatchBytes(16384); Properties additionalProps = new Properties(); - additionalProps.put(ProducerConfig.ACKS_CONFIG, String.valueOf(requiredAcks)); - additionalProps.put(ProducerConfig.LINGER_MS_CONFIG, - String.valueOf(0)); + additionalProps.put(ProducerConfig.ACKS_CONFIG, + String.valueOf(configurationProperties.getRequiredAcks())); + additionalProps.put(ProducerConfig.LINGER_MS_CONFIG, String.valueOf(0)); ProducerFactoryBean producerFactoryBean = new ProducerFactoryBean<>( - producerMetadata, brokers, additionalProps); + producerMetadata, configurationProperties.getKafkaConnectionString(), additionalProps); dlqProducer = producerFactoryBean.getObject(); } } @@ -742,15 +641,16 @@ public class KafkaMessageChannelBinder try { KafkaNativeOffsetManager kafkaOffsetManager = new KafkaNativeOffsetManager(connectionFactory, - zookeeperConnect, Collections.emptyMap()); + new ZookeeperConnect(configurationProperties.getZkConnectionString()), + Collections.emptyMap()); kafkaOffsetManager.setConsumerId(group); kafkaOffsetManager.setReferenceTimestamp(referencePoint); kafkaOffsetManager.afterPropertiesSet(); WindowingOffsetManager windowingOffsetManager = new WindowingOffsetManager(kafkaOffsetManager); - windowingOffsetManager.setTimespan(offsetUpdateTimeWindow); - windowingOffsetManager.setCount(offsetUpdateCount); - windowingOffsetManager.setShutdownTimeout(offsetUpdateShutdownTimeout); + windowingOffsetManager.setTimespan(configurationProperties.getOffsetUpdateTimeWindow()); + windowingOffsetManager.setCount(configurationProperties.getOffsetUpdateCount()); + windowingOffsetManager.setShutdownTimeout(configurationProperties.getOffsetUpdateShutdownTimeout()); windowingOffsetManager.afterPropertiesSet(); return windowingOffsetManager; @@ -760,7 +660,6 @@ public class KafkaMessageChannelBinder } } - private String toDisplayString(String original, int maxCharacters) { if (original.length() <= maxCharacters) { return original; @@ -880,8 +779,7 @@ public class KafkaMessageChannelBinder } public enum StartOffset { - earliest(OffsetRequest.EarliestTime()), - latest(OffsetRequest.LatestTime()); + earliest(OffsetRequest.EarliestTime()), latest(OffsetRequest.LatestTime()); private final long referencePoint; diff --git a/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java b/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java index 10def801e..623bbf35d 100644 --- a/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java +++ b/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java @@ -31,7 +31,6 @@ import org.springframework.context.annotation.Import; import org.springframework.integration.codec.Codec; import org.springframework.integration.kafka.support.LoggingProducerListener; import org.springframework.integration.kafka.support.ProducerListener; -import org.springframework.integration.kafka.support.ZookeeperConnect; /** * @author David Turanski @@ -42,15 +41,15 @@ import org.springframework.integration.kafka.support.ZookeeperConnect; */ @Configuration @ConditionalOnMissingBean(Binder.class) -@Import({KryoCodecAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class}) -@EnableConfigurationProperties({KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class}) +@Import({ KryoCodecAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class }) +@EnableConfigurationProperties({ KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class }) public class KafkaBinderConfiguration { @Autowired private Codec codec; @Autowired - private KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties; + private KafkaBinderConfigurationProperties configurationProperties; @Autowired private KafkaExtendedBindingProperties kafkaExtendedBindingProperties; @@ -58,36 +57,10 @@ public class KafkaBinderConfiguration { @Autowired private ProducerListener producerListener; - @Bean - ZookeeperConnect zookeeperConnect() { - ZookeeperConnect zookeeperConnect = new ZookeeperConnect(); - zookeeperConnect.setZkConnect(kafkaBinderConfigurationProperties.getZkConnectionString()); - return zookeeperConnect; - } - @Bean KafkaMessageChannelBinder kafkaMessageChannelBinder() { - String[] headers = kafkaBinderConfigurationProperties.getHeaders(); - String kafkaConnectionString = kafkaBinderConfigurationProperties.getKafkaConnectionString(); - String zkConnectionString = kafkaBinderConfigurationProperties.getZkConnectionString(); - KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder( - zookeeperConnect(), kafkaConnectionString, zkConnectionString, headers); + KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(configurationProperties); kafkaMessageChannelBinder.setCodec(codec); - kafkaMessageChannelBinder.setOffsetUpdateTimeWindow(kafkaBinderConfigurationProperties.getOffsetUpdateTimeWindow()); - kafkaMessageChannelBinder.setOffsetUpdateCount(kafkaBinderConfigurationProperties.getOffsetUpdateCount()); - kafkaMessageChannelBinder.setOffsetUpdateShutdownTimeout(kafkaBinderConfigurationProperties.getOffsetUpdateShutdownTimeout()); - - kafkaMessageChannelBinder.setZkSessionTimeout(kafkaBinderConfigurationProperties.getZkSessionTimeout()); - kafkaMessageChannelBinder.setZkConnectionTimeout(kafkaBinderConfigurationProperties.getZkConnectionTimeout()); - - kafkaMessageChannelBinder.setFetchSize(kafkaBinderConfigurationProperties.getFetchSize()); - kafkaMessageChannelBinder.setMinPartitionCount(kafkaBinderConfigurationProperties.getMinPartitionCount()); - kafkaMessageChannelBinder.setQueueSize(kafkaBinderConfigurationProperties.getQueueSize()); - kafkaMessageChannelBinder.setReplicationFactor(kafkaBinderConfigurationProperties.getReplicationFactor()); - kafkaMessageChannelBinder.setRequiredAcks(kafkaBinderConfigurationProperties.getRequiredAcks()); - kafkaMessageChannelBinder.setMaxWait(kafkaBinderConfigurationProperties.getMaxWait()); - kafkaMessageChannelBinder.setAutoCreateTopics(kafkaBinderConfigurationProperties.isAutoCreateTopics()); - kafkaMessageChannelBinder.setAutoAddPartitions(kafkaBinderConfigurationProperties.isAutoAddPartitions()); kafkaMessageChannelBinder.setProducerListener(producerListener); kafkaMessageChannelBinder.setExtendedBindingProperties(kafkaExtendedBindingProperties); return kafkaMessageChannelBinder; @@ -101,6 +74,6 @@ public class KafkaBinderConfiguration { @Bean KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder) { - return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder); + return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, configurationProperties); } } diff --git a/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfigurationProperties.java b/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfigurationProperties.java index a6b7fb330..98ee963f2 100644 --- a/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfigurationProperties.java +++ b/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfigurationProperties.java @@ -27,11 +27,11 @@ import org.springframework.util.StringUtils; @ConfigurationProperties(prefix = "spring.cloud.stream.kafka.binder") public class KafkaBinderConfigurationProperties { - private String[] zkNodes = new String[] {"localhost"}; + private String[] zkNodes = new String[] { "localhost" }; private String defaultZkPort = "2181"; - private String[] brokers = new String[] {"localhost"}; + private String[] brokers = new String[] { "localhost" }; private String defaultBrokerPort = "9092"; @@ -49,6 +49,8 @@ public class KafkaBinderConfigurationProperties { private boolean autoAddPartitions; + private int socketBufferSize = 2097152; + /** * ZK session timeout in milliseconds. */ @@ -97,7 +99,7 @@ public class KafkaBinderConfigurationProperties { return zkNodes; } - public void setZkNodes(String[] zkNodes) { + public void setZkNodes(String... zkNodes) { this.zkNodes = zkNodes; } @@ -109,7 +111,7 @@ public class KafkaBinderConfigurationProperties { return brokers; } - public void setBrokers(String[] brokers) { + public void setBrokers(String... brokers) { this.brokers = brokers; } @@ -117,7 +119,7 @@ public class KafkaBinderConfigurationProperties { this.defaultBrokerPort = defaultBrokerPort; } - public void setHeaders(String[] headers) { + public void setHeaders(String... headers) { this.headers = headers; } @@ -230,4 +232,12 @@ public class KafkaBinderConfigurationProperties { public void setAutoAddPartitions(boolean autoAddPartitions) { this.autoAddPartitions = autoAddPartitions; } + + public int getSocketBufferSize() { + return socketBufferSize; + } + + public void setSocketBufferSize(int socketBufferSize) { + this.socketBufferSize = socketBufferSize; + } } diff --git a/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java b/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java index 99af3b549..d7d84519d 100644 --- a/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java +++ b/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java @@ -45,7 +45,6 @@ import org.springframework.integration.kafka.core.Partition; import org.springframework.integration.kafka.core.TopicNotFoundException; import org.springframework.integration.kafka.support.ProducerConfiguration; import org.springframework.integration.kafka.support.ProducerMetadata; -import org.springframework.integration.kafka.support.ZookeeperConnect; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @@ -97,11 +96,19 @@ public class KafkaBinderTests extends @Override protected KafkaTestBinder getBinder() { if (binder == null) { - binder = new KafkaTestBinder(kafkaTestSupport, new KafkaBinderConfigurationProperties()); + KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties(); + binder = new KafkaTestBinder(binderConfiguration); } return binder; } + private KafkaBinderConfigurationProperties createConfigurationProperties() { + KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties(); + binderConfiguration.setBrokers(kafkaTestSupport.getBrokerAddress()); + binderConfiguration.setZkNodes(kafkaTestSupport.getZkConnectString()); + return binderConfiguration; + } + @Override protected ExtendedConsumerProperties createConsumerProperties() { return new ExtendedConsumerProperties<>(new KafkaConsumerProperties()); @@ -216,9 +223,9 @@ public class KafkaBinderTests extends byte[] ratherBigPayload = new byte[2048]; Arrays.fill(ratherBigPayload, (byte) 65); - KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties(); + KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties(); binderConfiguration.setMinPartitionCount(10); - KafkaTestBinder binder = new KafkaTestBinder(kafkaTestSupport, binderConfiguration); + KafkaTestBinder binder = new KafkaTestBinder(binderConfiguration); DirectChannel moduleOutputChannel = new DirectChannel(); QueueChannel moduleInputChannel = new QueueChannel(); @@ -247,10 +254,9 @@ public class KafkaBinderTests extends byte[] ratherBigPayload = new byte[2048]; Arrays.fill(ratherBigPayload, (byte) 65); - KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties(); - binderConfiguration.setMinPartitionCount(5); - KafkaTestBinder binder = new KafkaTestBinder(kafkaTestSupport, binderConfiguration); - + KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties(); + binderConfiguration.setMinPartitionCount(6); + KafkaTestBinder binder = new KafkaTestBinder(binderConfiguration); DirectChannel moduleOutputChannel = new DirectChannel(); QueueChannel moduleInputChannel = new QueueChannel(); ExtendedProducerProperties producerProperties = createProducerProperties(); @@ -269,7 +275,7 @@ public class KafkaBinderTests extends assertArrayEquals(ratherBigPayload, (byte[]) inbound.getPayload()); Collection partitions = binder.getCoreBinder().getConnectionFactory().getPartitions( "foo" + uniqueBindingId + ".0"); - assertThat(partitions, hasSize(5)); + assertThat(partitions, hasSize(6)); producerBinding.unbind(); consumerBinding.unbind(); } @@ -279,9 +285,9 @@ public class KafkaBinderTests extends byte[] ratherBigPayload = new byte[2048]; Arrays.fill(ratherBigPayload, (byte) 65); - KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties(); - binderConfiguration.setMinPartitionCount(5); - KafkaTestBinder binder = new KafkaTestBinder(kafkaTestSupport, binderConfiguration); + KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties(); + binderConfiguration.setMinPartitionCount(4); + KafkaTestBinder binder = new KafkaTestBinder(binderConfiguration); DirectChannel moduleOutputChannel = new DirectChannel(); QueueChannel moduleInputChannel = new QueueChannel(); @@ -309,8 +315,7 @@ public class KafkaBinderTests extends @Test @SuppressWarnings("unchecked") public void testDefaultConsumerStartsAtEarliest() throws Exception { - KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(new ZookeeperConnect(kafkaTestSupport.getZkConnectString()), - kafkaTestSupport.getBrokerAddress(), kafkaTestSupport.getZkConnectString()); + KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(createConfigurationProperties()); GenericApplicationContext context = new GenericApplicationContext(); context.refresh(); binder.setApplicationContext(context); @@ -406,8 +411,8 @@ public class KafkaBinderTests extends @Test @SuppressWarnings("unchecked") public void testResume() throws Exception { - KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(new ZookeeperConnect(kafkaTestSupport.getZkConnectString()), - kafkaTestSupport.getBrokerAddress(), kafkaTestSupport.getZkConnectString()); + KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties(); + KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties); GenericApplicationContext context = new GenericApplicationContext(); context.refresh(); binder.setApplicationContext(context); @@ -444,8 +449,7 @@ public class KafkaBinderTests extends @Test public void testSyncProducerMetadata() throws Exception { - KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(new ZookeeperConnect(kafkaTestSupport.getZkConnectString()), - kafkaTestSupport.getBrokerAddress(), kafkaTestSupport.getZkConnectString()); + KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(createConfigurationProperties()); GenericApplicationContext context = new GenericApplicationContext(); context.refresh(); binder.setApplicationContext(context); @@ -467,12 +471,10 @@ public class KafkaBinderTests extends @Test public void testAutoCreateTopicsDisabledFailsIfTopicMissing() throws Exception { - - KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder( - new ZookeeperConnect(kafkaTestSupport.getZkConnectString()), kafkaTestSupport.getBrokerAddress(), - kafkaTestSupport.getZkConnectString()); + KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties(); + configurationProperties.setAutoCreateTopics(false); + KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties); GenericApplicationContext context = new GenericApplicationContext(); - binder.setAutoCreateTopics(false); context.refresh(); binder.setApplicationContext(context); binder.afterPropertiesSet(); @@ -508,12 +510,10 @@ public class KafkaBinderTests extends String testTopicName = "existing" + System.currentTimeMillis(); AdminUtils.createTopic(kafkaTestSupport.getZkClient(), testTopicName, 5, 1, new Properties()); - - KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder( - new ZookeeperConnect(kafkaTestSupport.getZkConnectString()), kafkaTestSupport.getBrokerAddress(), - kafkaTestSupport.getZkConnectString()); + KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties(); + configurationProperties.setAutoCreateTopics(false); + KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties); GenericApplicationContext context = new GenericApplicationContext(); - binder.setAutoCreateTopics(false); context.refresh(); binder.setApplicationContext(context); binder.afterPropertiesSet(); @@ -528,13 +528,10 @@ public class KafkaBinderTests extends String testTopicName = "existing" + System.currentTimeMillis(); AdminUtils.createTopic(kafkaTestSupport.getZkClient(), testTopicName, 1, 1, new Properties()); - - KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder( - new ZookeeperConnect(kafkaTestSupport.getZkConnectString()), kafkaTestSupport.getBrokerAddress(), - kafkaTestSupport.getZkConnectString()); + KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties(); + configurationProperties.setAutoAddPartitions(false); + KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties); GenericApplicationContext context = new GenericApplicationContext(); - binder.setAutoAddPartitions(false); - context.refresh(); binder.setApplicationContext(context); binder.afterPropertiesSet(); @@ -558,12 +555,10 @@ public class KafkaBinderTests extends String testTopicName = "existing" + System.currentTimeMillis(); AdminUtils.createTopic(kafkaTestSupport.getZkClient(), testTopicName, 6, 1, new Properties()); - - KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder( - new ZookeeperConnect(kafkaTestSupport.getZkConnectString()), kafkaTestSupport.getBrokerAddress(), - kafkaTestSupport.getZkConnectString()); + KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties(); + configurationProperties.setAutoAddPartitions(false); + KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties); GenericApplicationContext context = new GenericApplicationContext(); - binder.setAutoAddPartitions(false); RetryTemplate metatadataRetrievalRetryOperations = new RetryTemplate(); metatadataRetrievalRetryOperations.setRetryPolicy(new SimpleRetryPolicy()); FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); @@ -594,11 +589,10 @@ public class KafkaBinderTests extends @Test public void testAutoCreateTopicsEnabledSucceeds() throws Exception { - KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder( - new ZookeeperConnect(kafkaTestSupport.getZkConnectString()), kafkaTestSupport.getBrokerAddress(), - kafkaTestSupport.getZkConnectString()); + KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties(); + configurationProperties.setAutoCreateTopics(true); + KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties); GenericApplicationContext context = new GenericApplicationContext(); - binder.setAutoCreateTopics(true); context.refresh(); binder.setApplicationContext(context); binder.afterPropertiesSet(); @@ -619,13 +613,10 @@ public class KafkaBinderTests extends public void testPartitionCountNotReduced() throws Exception { String testTopicName = "existing" + System.currentTimeMillis(); AdminUtils.createTopic(kafkaTestSupport.getZkClient(), testTopicName, 6, 1, new Properties()); - - KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder( - new ZookeeperConnect(kafkaTestSupport.getZkConnectString()), kafkaTestSupport.getBrokerAddress(), - kafkaTestSupport.getZkConnectString()); - + KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties(); + configurationProperties.setAutoAddPartitions(true); + KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties); GenericApplicationContext context = new GenericApplicationContext(); - binder.setAutoAddPartitions(true); context.refresh(); binder.setApplicationContext(context); binder.afterPropertiesSet(); @@ -647,15 +638,11 @@ public class KafkaBinderTests extends public void testPartitionCountIncreasedIfAutoAddPartitionsSet() throws Exception { String testTopicName = "existing" + System.currentTimeMillis(); AdminUtils.createTopic(kafkaTestSupport.getZkClient(), testTopicName, 1, 1, new Properties()); - - KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder( - new ZookeeperConnect(kafkaTestSupport.getZkConnectString()), kafkaTestSupport.getBrokerAddress(), - kafkaTestSupport.getZkConnectString()); - - binder.setMinPartitionCount(6); - + KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties(); + configurationProperties.setMinPartitionCount(6); + configurationProperties.setAutoAddPartitions(true); + KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties); GenericApplicationContext context = new GenericApplicationContext(); - binder.setAutoAddPartitions(true); context.refresh(); binder.setApplicationContext(context); binder.afterPropertiesSet(); diff --git a/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTestBinder.java b/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTestBinder.java index 2eaebab9b..b58fd7fc2 100644 --- a/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTestBinder.java +++ b/spring-cloud-stream-binders/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTestBinder.java @@ -25,7 +25,6 @@ import org.springframework.cloud.stream.binder.AbstractTestBinder; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties; -import org.springframework.cloud.stream.test.junit.kafka.KafkaTestSupport; import org.springframework.cloud.stream.test.junit.kafka.TestKafkaCluster; import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.codec.Codec; @@ -33,7 +32,6 @@ import org.springframework.integration.codec.kryo.KryoRegistrar; import org.springframework.integration.codec.kryo.PojoCodec; import org.springframework.integration.kafka.support.LoggingProducerListener; import org.springframework.integration.kafka.support.ProducerListener; -import org.springframework.integration.kafka.support.ZookeeperConnect; import org.springframework.integration.tuple.TupleKryoRegistrar; /** @@ -48,21 +46,15 @@ import org.springframework.integration.tuple.TupleKryoRegistrar; public class KafkaTestBinder extends AbstractTestBinder, ExtendedProducerProperties> { - public KafkaTestBinder(KafkaTestSupport kafkaTestSupport, KafkaBinderConfigurationProperties binderConfiguration) { + public KafkaTestBinder(KafkaBinderConfigurationProperties binderConfiguration) { try { - ZookeeperConnect zookeeperConnect = new ZookeeperConnect(); - zookeeperConnect.setZkConnect(kafkaTestSupport.getZkConnectString()); - KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(zookeeperConnect, - kafkaTestSupport.getBrokerAddress(), kafkaTestSupport.getZkConnectString()); + KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration); binder.setCodec(getCodec()); ProducerListener producerListener = new LoggingProducerListener(); binder.setProducerListener(producerListener); GenericApplicationContext context = new GenericApplicationContext(); context.refresh(); binder.setApplicationContext(context); - binder.setFetchSize(binderConfiguration.getFetchSize()); - binder.setMaxWait(binderConfiguration.getMaxWait()); - binder.setMinPartitionCount(binderConfiguration.getMinPartitionCount()); binder.afterPropertiesSet(); this.setBinder(binder); } diff --git a/spring-cloud-stream-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc b/spring-cloud-stream-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc index d8b747132..2710eb087 100644 --- a/spring-cloud-stream-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc +++ b/spring-cloud-stream-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc @@ -1121,7 +1121,10 @@ If set to `false`, the binder will rely on the partition size of the topic being If the partition count of the target topic is smaller than the expected value, the binder will fail to start. + Default: `false`. - +spring.cloud.stream.kafka.binder.socketBufferSize:: + Size (in bytes) of the socket buffer to be used by the Kafka consumers. ++ +Default: `2097152`. ==== Kafka Consumer Properties