GH-9478: Fix MessagingGatewaySupport.onInit() for calling super.onInit()

Fixes: #9478
Issue link: https://github.com/spring-projects/spring-integration/issues/9478

Any `MessagingGatewaySupport` implementation does not register itself into a `SmartLifecycleRoleController`
because they don't call  `super.onInit()` of the `AbstractEndpoint`

* Fix `MessagingGatewaySupport` for calling  `super.onInit()` from its `onInit()`
* Verify `SmartLifecycleRoleController` registration in the `KafkaInboundGatewayTests`
* Remove out of use XML `group-id` attribute from Kafka channel adapter
and move it to the `channel` XSD as the place where it is really used

(cherry picked from commit a8174d5bce)
This commit is contained in:
Artem Bilan
2024-09-17 12:16:31 -04:00
committed by Spring Builds
parent f77a59b0b0
commit 671c90655e
4 changed files with 17 additions and 9 deletions

View File

@@ -423,6 +423,7 @@ public abstract class MessagingGatewaySupport extends AbstractEndpoint
if (!this.replyTimeoutSet) {
this.messagingTemplate.setReceiveTimeout(endpointsDefaultTimeout);
}
super.onInit();
this.initialized = true;
}

View File

@@ -453,6 +453,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="group-id">
<xsd:annotation>
<xsd:documentation>
Set the 'group.id' KafkaConsumer property.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attributeGroup ref="kafkaTemplate"/>
</xsd:complexType>
@@ -764,13 +771,6 @@
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="group-id">
<xsd:annotation>
<xsd:documentation>
Set the 'group.id' KafkaConsumer property.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:attributeGroup>
<xsd:attributeGroup name="headerMapper">

View File

@@ -21,7 +21,8 @@
retry-template="retryTemplate"
recovery-callback="recoveryCallback"
bind-source-record="true"
on-partitions-assigned-seek-callback="onPartitionsAssignedSeekCallback"/>
on-partitions-assigned-seek-callback="onPartitionsAssignedSeekCallback"
role="testRole"/>
<bean id="template" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.kafka.core.KafkaTemplate" type="java.lang.Class"/>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.kafka.inbound.KafkaInboundGateway;
import org.springframework.integration.support.SmartLifecycleRoleController;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.test.annotation.DirtiesContext;
@@ -45,6 +46,9 @@ public class KafkaInboundGatewayTests {
@Autowired
private ApplicationContext context;
@Autowired
SmartLifecycleRoleController roleController;
@Test
public void testProps() {
assertThat(this.gateway1.isAutoStartup()).isFalse();
@@ -69,6 +73,8 @@ public class KafkaInboundGatewayTests {
assertThat(TestUtils.getPropertyValue(this.gateway1, "messagingTemplate.sendTimeout")).isEqualTo(5000L);
assertThat(TestUtils.getPropertyValue(this.gateway1, "messagingTemplate.receiveTimeout")).isEqualTo(43L);
assertThat(TestUtils.getPropertyValue(this.gateway1, "bindSourceRecord", Boolean.class)).isTrue();
assertThat(this.roleController.getRoles()).contains("testRole");
assertThat(this.roleController.getEndpointsRunningStatus("testRole")).containsEntry("gateway1", false);
}
}