diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java index 0324b4fab2..539d3becd5 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2023 the original author or authors. + * Copyright 2015-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -133,6 +133,9 @@ public class KafkaMessageDrivenChannelAdapter extends MessageProducerSuppo if (JacksonPresent.isJackson2Present()) { MessagingMessageConverter messageConverter = new MessagingMessageConverter(); + // For consistency with the rest of Spring Integration channel adapters + messageConverter.setGenerateMessageId(true); + messageConverter.setGenerateTimestamp(true); DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper(); headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0])); messageConverter.setHeaderMapper(headerMapper); @@ -217,8 +220,7 @@ public class KafkaMessageDrivenChannelAdapter extends MessageProducerSuppo * channel is configured). Only used if a * {@link #setRetryTemplate(RetryTemplate)} is specified. Default is an * {@link ErrorMessageSendingRecoverer} if an error channel has been provided. Set to - * null if you wish to throw the exception back to the container after retries are - * exhausted. + * null if you wish to throw the exception back to the container after retries are exhausted. * @param recoveryCallback the recovery callback. */ public void setRecoveryCallback(RecoveryCallback recoveryCallback) { diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java index cf91379973..607994cbc9 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2024 the original author or authors. + * Copyright 2018-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -256,10 +256,15 @@ public class KafkaMessageSource extends AbstractMessageSource Duration.ofMillis(Math.max(this.pollTimeout.toMillis() * 20, MIN_ASSIGN_TIMEOUT)); // NOSONAR - magic this.commitTimeout = consumerProperties.getSyncCommitTimeout(); + MessagingMessageConverter messagingMessageConverter = (MessagingMessageConverter) this.messageConverter; + // For consistency with the rest of Spring Integration channel adapters + messagingMessageConverter.setGenerateMessageId(true); + messagingMessageConverter.setGenerateTimestamp(true); + if (JacksonPresent.isJackson2Present()) { DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper(); headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0])); - ((MessagingMessageConverter) this.messageConverter).setHeaderMapper(headerMapper); + messagingMessageConverter.setHeaderMapper(headerMapper); } } @@ -843,7 +848,7 @@ public class KafkaMessageSource extends AbstractMessageSource i.setRolledBack(true); return i.getRecord().offset(); }) - .collect(Collectors.toList()); + .toList(); if (!rewound.isEmpty()) { this.logger.warn(() -> "Rolled back " + KafkaUtils.format(record) + " later in-flight offsets " diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java index 28cda08605..a1be77c5e4 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2024 the original author or authors. + * Copyright 2016-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -166,12 +166,14 @@ class MessageDrivenAdapterTests { assertThat(received).isNotNull(); MessageHeaders headers = received.getHeaders(); - assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1); - assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic1); - assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0); - assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L); - assertThat(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048607L); - assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME"); + assertThat(headers) + .containsEntry(KafkaHeaders.RECEIVED_KEY, 1) + .containsEntry(KafkaHeaders.RECEIVED_TOPIC, topic1) + .containsEntry(KafkaHeaders.RECEIVED_PARTITION, 0) + .containsEntry(KafkaHeaders.OFFSET, 0L) + .containsEntry(KafkaHeaders.RECEIVED_TIMESTAMP, 1487694048607L) + .containsEntry(KafkaHeaders.TIMESTAMP_TYPE, "CREATE_TIME") + .containsKeys(MessageHeaders.TIMESTAMP, MessageHeaders.ID); assertThat(headers.get("testHeader")).isEqualTo("testValue"); diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceTests.java index 49101a462a..cfe99822c3 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2024 the original author or authors. + * Copyright 2018-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -69,6 +69,7 @@ import org.springframework.kafka.support.LogIfLevelEnabled.Level; import org.springframework.kafka.support.TopicPartitionOffset; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; @@ -297,6 +298,7 @@ class MessageSourceTests { assertThat(received.getHeaders().get(KafkaHeaders.RAW_DATA)).isInstanceOf(ConsumerRecord.class); assertThat(received.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA)) .isSameAs(received.getHeaders().get(KafkaHeaders.RAW_DATA)); + assertThat(received.getHeaders()).containsKeys(MessageHeaders.TIMESTAMP, MessageHeaders.ID); StaticMessageHeaderAccessor.getAcknowledgmentCallback(received) .acknowledge(AcknowledgmentCallback.Status.ACCEPT); received = source.receive(); diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index 893a270c43..2e22e85917 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -27,27 +27,34 @@ The `AbstractCorrelatingMessageHandler` does not throw an `IllegalArgumentExcept Instead, such a collection is wrapped into a single reply message. See xref:aggregator.adoc[Aggregator] for more information. -[[x6.4-correlation-changes]] +[[x6.5-correlation-changes]] == The `discardIndividuallyOnExpiry` Option For Correlation Handlers The aggregator and resequencer can now discard the whole expired group as a single message via setting `discardIndividuallyOnExpiry` to `false`. See xref:aggregator.adoc#releasestrategy[ReleaseStrategy] for more information. -[[x6.4-message-store-with-locks]] +[[x6.5-message-store-with-locks]] == The `LockRegistry` in the `MessageStore` The `AbstractMessageGroupStore` now can be configured with a `LockRegistry` to perform series of persistent operation atomically. See xref:message-store.adoc#use-lock-registry[Use LockRegistry] for more information. -[[x6.4-observation-changes]] +[[x6.5-observation-changes]] == Micrometer Observation Changes The `SourcePollingChannelAdapter` endpoint now starts a `CONSUMER` kind observation for the received message. The `MessageReceiverContext` now distinguishes between `handler`, `message-source` and `message-producer` values for the `spring.integration.type` low cardinality tag. See xref:metrics.adoc#micrometer-observation[Micrometer Observation] for more information. -[[x6.4-mqtt-changes]] +[[x6.5-mqtt-changes]] == Optional Paho MQTT Dependencies The `org.eclipse.paho:org.eclipse.paho.client.mqttv3` dependency for `spring-integration-mqtt` is now also optional as `org.eclipse.paho:org.eclipse.paho.mqttv5.client` always was. -See xref:mqtt.adoc[MQTT Support] for more information. \ No newline at end of file +See xref:mqtt.adoc[MQTT Support] for more information. + +[[x6.5-kafka-changes]] +== Apache Kafka support Changes + +The `KafkaMessageSource` and `KafkaMessageDrivenChannelAdapter` now generate `MessageHeaders.ID` and `MessageHeaders.TIMESTAMP` headers by default as the rest of Spring Integration channel adapters. +The behavior can be restored to the previous with injection of the `MessagingMessageConverter` with default settings. +See xref:kafka.adoc[Apache Kafka Support] for more information. \ No newline at end of file