Commit 28474aa3 authored by Artem Bilan's avatar Artem Bilan Committed by Phillip Webb

Fix compatibility with Apache Kafka 0.10.1

Update KafkaProperties since  Apache Kafka `0.10.1` changed the type
for the `ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG` from the
`Long` to `Integer`.

Kafka includes the following conversion logic:

    case LONG:
        if (value instanceof Integer)
            return ((Integer) value).longValue();
        if (value instanceof Long)
            return (Long) value;
        else if (value instanceof String)
            return Long.parseLong(trimmed);

So we remain compatible with both `0.10.0` and `0.10.1`

Closes gh-7723
parent f21e7940
...@@ -43,6 +43,7 @@ import org.springframework.util.CollectionUtils; ...@@ -43,6 +43,7 @@ import org.springframework.util.CollectionUtils;
* *
* @author Gary Russell * @author Gary Russell
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Artem Bilan
* @since 1.5.0 * @since 1.5.0
*/ */
@ConfigurationProperties(prefix = "spring.kafka") @ConfigurationProperties(prefix = "spring.kafka")
...@@ -199,7 +200,7 @@ public class KafkaProperties { ...@@ -199,7 +200,7 @@ public class KafkaProperties {
* Frequency in milliseconds that the consumer offsets are auto-committed to Kafka * Frequency in milliseconds that the consumer offsets are auto-committed to Kafka
* if 'enable.auto.commit' true. * if 'enable.auto.commit' true.
*/ */
private Long autoCommitInterval; private Integer autoCommitInterval;
/** /**
* What to do when there is no initial offset in Kafka or if the current offset * What to do when there is no initial offset in Kafka or if the current offset
...@@ -264,11 +265,11 @@ public class KafkaProperties { ...@@ -264,11 +265,11 @@ public class KafkaProperties {
return this.ssl; return this.ssl;
} }
public Long getAutoCommitInterval() { public Integer getAutoCommitInterval() {
return this.autoCommitInterval; return this.autoCommitInterval;
} }
public void setAutoCommitInterval(Long autoCommitInterval) { public void setAutoCommitInterval(Integer autoCommitInterval) {
this.autoCommitInterval = autoCommitInterval; this.autoCommitInterval = autoCommitInterval;
} }
......
...@@ -100,7 +100,7 @@ public class KafkaAutoConfigurationTests { ...@@ -100,7 +100,7 @@ public class KafkaAutoConfigurationTests {
assertThat(configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) assertThat(configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
.isEqualTo(Boolean.FALSE); .isEqualTo(Boolean.FALSE);
assertThat(configs.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)) assertThat(configs.get(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG))
.isEqualTo(123L); .isEqualTo(123);
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG))
.isEqualTo("earliest"); .isEqualTo("earliest");
assertThat(configs.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)).isEqualTo(456); assertThat(configs.get(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)).isEqualTo(456);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment