GH-9801: Kafka: generate ID & TIMESTAMP headers by default

Fixes: https://github.com/spring-projects/spring-integration/issues/9801

Spring Integration Apache Kafka inbound channel adapters have always produced
messages without `ID` & `TIMESTAMP` headers.
See `MessagingMessageConverter` used over there by default.
However, for consistency with the rest of Spring Integration channel adapters,
it would be better to have Kafka-specific behave same way.

* Configure default `MessagingMessageConverter` in the `KafkaMessageSource` & `KafkaMessageDrivenChannelAdapter`
for generating `ID` & `TIMESTAMP` headers.
This commit is contained in:
Artem Bilan
2025-01-30 16:53:09 -05:00
parent c44363a69d
commit 054ff57728
5 changed files with 37 additions and 19 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2023 the original author or authors.
* Copyright 2015-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -133,6 +133,9 @@ public class KafkaMessageDrivenChannelAdapter<K, V> extends MessageProducerSuppo
if (JacksonPresent.isJackson2Present()) {
MessagingMessageConverter messageConverter = new MessagingMessageConverter();
// For consistency with the rest of Spring Integration channel adapters
messageConverter.setGenerateMessageId(true);
messageConverter.setGenerateTimestamp(true);
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
messageConverter.setHeaderMapper(headerMapper);
@@ -217,8 +220,7 @@ public class KafkaMessageDrivenChannelAdapter<K, V> extends MessageProducerSuppo
* channel is configured). Only used if a
* {@link #setRetryTemplate(RetryTemplate)} is specified. Default is an
* {@link ErrorMessageSendingRecoverer} if an error channel has been provided. Set to
* null if you wish to throw the exception back to the container after retries are
* exhausted.
* null if you wish to throw the exception back to the container after retries are exhausted.
* @param recoveryCallback the recovery callback.
*/
public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2018-2024 the original author or authors.
* Copyright 2018-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -256,10 +256,15 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object>
Duration.ofMillis(Math.max(this.pollTimeout.toMillis() * 20, MIN_ASSIGN_TIMEOUT)); // NOSONAR - magic
this.commitTimeout = consumerProperties.getSyncCommitTimeout();
MessagingMessageConverter messagingMessageConverter = (MessagingMessageConverter) this.messageConverter;
// For consistency with the rest of Spring Integration channel adapters
messagingMessageConverter.setGenerateMessageId(true);
messagingMessageConverter.setGenerateTimestamp(true);
if (JacksonPresent.isJackson2Present()) {
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
((MessagingMessageConverter) this.messageConverter).setHeaderMapper(headerMapper);
messagingMessageConverter.setHeaderMapper(headerMapper);
}
}
@@ -843,7 +848,7 @@ public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object>
i.setRolledBack(true);
return i.getRecord().offset();
})
.collect(Collectors.toList());
.toList();
if (!rewound.isEmpty()) {
this.logger.warn(() -> "Rolled back " + KafkaUtils.format(record)
+ " later in-flight offsets "

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016-2024 the original author or authors.
* Copyright 2016-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -166,12 +166,14 @@ class MessageDrivenAdapterTests {
assertThat(received).isNotNull();
MessageHeaders headers = received.getHeaders();
assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1);
assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic1);
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
assertThat(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048607L);
assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");
assertThat(headers)
.containsEntry(KafkaHeaders.RECEIVED_KEY, 1)
.containsEntry(KafkaHeaders.RECEIVED_TOPIC, topic1)
.containsEntry(KafkaHeaders.RECEIVED_PARTITION, 0)
.containsEntry(KafkaHeaders.OFFSET, 0L)
.containsEntry(KafkaHeaders.RECEIVED_TIMESTAMP, 1487694048607L)
.containsEntry(KafkaHeaders.TIMESTAMP_TYPE, "CREATE_TIME")
.containsKeys(MessageHeaders.TIMESTAMP, MessageHeaders.ID);
assertThat(headers.get("testHeader")).isEqualTo("testValue");

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2018-2024 the original author or authors.
* Copyright 2018-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -69,6 +69,7 @@ import org.springframework.kafka.support.LogIfLevelEnabled.Level;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
@@ -297,6 +298,7 @@ class MessageSourceTests {
assertThat(received.getHeaders().get(KafkaHeaders.RAW_DATA)).isInstanceOf(ConsumerRecord.class);
assertThat(received.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA))
.isSameAs(received.getHeaders().get(KafkaHeaders.RAW_DATA));
assertThat(received.getHeaders()).containsKeys(MessageHeaders.TIMESTAMP, MessageHeaders.ID);
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(AcknowledgmentCallback.Status.ACCEPT);
received = source.receive();

View File

@@ -27,27 +27,34 @@ The `AbstractCorrelatingMessageHandler` does not throw an `IllegalArgumentExcept
Instead, such a collection is wrapped into a single reply message.
See xref:aggregator.adoc[Aggregator] for more information.
[[x6.4-correlation-changes]]
[[x6.5-correlation-changes]]
== The `discardIndividuallyOnExpiry` Option For Correlation Handlers
The aggregator and resequencer can now discard the whole expired group as a single message via setting `discardIndividuallyOnExpiry` to `false`.
See xref:aggregator.adoc#releasestrategy[ReleaseStrategy] for more information.
[[x6.4-message-store-with-locks]]
[[x6.5-message-store-with-locks]]
== The `LockRegistry` in the `MessageStore`
The `AbstractMessageGroupStore` now can be configured with a `LockRegistry` to perform series of persistent operation atomically.
See xref:message-store.adoc#use-lock-registry[Use LockRegistry] for more information.
[[x6.4-observation-changes]]
[[x6.5-observation-changes]]
== Micrometer Observation Changes
The `SourcePollingChannelAdapter` endpoint now starts a `CONSUMER` kind observation for the received message.
The `MessageReceiverContext` now distinguishes between `handler`, `message-source` and `message-producer` values for the `spring.integration.type` low cardinality tag.
See xref:metrics.adoc#micrometer-observation[Micrometer Observation] for more information.
[[x6.4-mqtt-changes]]
[[x6.5-mqtt-changes]]
== Optional Paho MQTT Dependencies
The `org.eclipse.paho:org.eclipse.paho.client.mqttv3` dependency for `spring-integration-mqtt` is now also optional as `org.eclipse.paho:org.eclipse.paho.mqttv5.client` always was.
See xref:mqtt.adoc[MQTT Support] for more information.
See xref:mqtt.adoc[MQTT Support] for more information.
[[x6.5-kafka-changes]]
== Apache Kafka support Changes
The `KafkaMessageSource` and `KafkaMessageDrivenChannelAdapter` now generate `MessageHeaders.ID` and `MessageHeaders.TIMESTAMP` headers by default as the rest of Spring Integration channel adapters.
The behavior can be restored to the previous with injection of the `MessagingMessageConverter` with default settings.
See xref:kafka.adoc[Apache Kafka Support] for more information.