GH-9455: Introduce IntegrationKeepAlive

Fixes: #9455
Issue link: https://github.com/spring-projects/spring-integration/issues/9455

* Add an `IntegrationKeepAlive` infrastructure bean to initiate a long-lived non-daemon thread
to keep application alive when it cannot be kept like that for various reason, but has to.
* Expose `spring.integration.keepAlive` global property to disable an `IntegrationKeepAlive` auto-startup
* Test and document the feature
This commit is contained in:
Artem Bilan
2024-09-20 16:53:35 -04:00
parent 430094f4b2
commit 2c2ed25d65
12 changed files with 374 additions and 1 deletions

View File

@@ -44,6 +44,7 @@ import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.config.xml.IntegrationNamespaceUtils;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.IntegrationProperties;
import org.springframework.integration.endpoint.management.IntegrationKeepAlive;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.handler.support.IntegrationMessageHandlerMethodFactory;
import org.springframework.integration.json.JsonPathUtils;
@@ -129,6 +130,7 @@ public class DefaultConfiguringBeanFactoryPostProcessor implements BeanDefinitio
registerListMessageHandlerMethodFactory();
registerIntegrationConfigurationReport();
registerControlBusCommandRegistry();
registerKeepAlive();
}
@Override
@@ -460,4 +462,15 @@ public class DefaultConfiguringBeanFactoryPostProcessor implements BeanDefinitio
IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME);
}
private void registerKeepAlive() {
if (!this.beanFactory.containsBean(IntegrationContextUtils.INTEGRATION_KEEP_ALIVE_BEAN_NAME)) {
BeanDefinitionBuilder builder =
BeanDefinitionBuilder.genericBeanDefinition(IntegrationKeepAlive.class)
.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
this.registry.registerBeanDefinition(IntegrationContextUtils.INTEGRATION_KEEP_ALIVE_BEAN_NAME,
builder.getBeanDefinition());
}
}
}

View File

@@ -100,8 +100,18 @@ public abstract class IntegrationContextUtils {
public static final String LIST_MESSAGE_HANDLER_FACTORY_BEAN_NAME = "integrationListMessageHandlerMethodFactory";
/**
* The bean name for the {@code org.springframework.integration.support.management.ControlBusCommandRegistry}.
* @since 6.4
*/
public static final String CONTROL_BUS_COMMAND_REGISTRY_BEAN_NAME = "controlBusCommandRegistry";
/**
* The bean name for the {@code org.springframework.integration.endpoint.management.IntegrationKeepAlive}.
* @since 6.4
*/
public static final String INTEGRATION_KEEP_ALIVE_BEAN_NAME = "integrationKeepAlive";
/**
* The default timeout for blocking operations like send and receive messages.
* @since 6.1

View File

@@ -25,7 +25,7 @@ import org.springframework.util.StringUtils;
/**
* Utility class to encapsulate infrastructure Integration properties constants and their default values.
* The default values can be overridden by the {@code META-INF/spring.integration.properties} with this entries
* The default values can be overridden by the {@code META-INF/spring.integration.properties} with these entries
* (includes their default values):
* <ul>
* <li> {@code spring.integration.channels.autoCreate=true}
@@ -38,6 +38,7 @@ import org.springframework.util.StringUtils;
* <li> {@code spring.integration.channels.error.requireSubscribers=true}
* <li> {@code spring.integration.channels.error.ignoreFailures=true}
* <li> {@code spring.integration.endpoints.defaultTimeout=30000}
* <li> {@code spring.integration.keepAlive=true}
* </ul>
*
* @author Artem Bilan
@@ -117,6 +118,12 @@ public final class IntegrationProperties {
*/
public static final String ENDPOINTS_DEFAULT_TIMEOUT = INTEGRATION_PROPERTIES_PREFIX + "endpoints.defaultTimeout";
/**
* Set to {@code false} to fully disable Keep-Alive thread.
* @since 6.4
*/
public static final String KEEP_ALIVE = INTEGRATION_PROPERTIES_PREFIX + "keepAlive";
private static final Properties DEFAULTS;
private boolean channelsAutoCreate = true;
@@ -139,6 +146,8 @@ public final class IntegrationProperties {
private long endpointsDefaultTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT;
private boolean keepAlive = true;
private volatile Properties properties;
static {
@@ -312,11 +321,30 @@ public final class IntegrationProperties {
/**
* Configure a value for {@link #ENDPOINTS_DEFAULT_TIMEOUT} option.
* @param endpointsDefaultTimeout the value for {@link #ENDPOINTS_DEFAULT_TIMEOUT} option.
* @since 6.2
*/
public void setEndpointsDefaultTimeout(long endpointsDefaultTimeout) {
this.endpointsDefaultTimeout = endpointsDefaultTimeout;
}
/**
* Return the value of {@link #KEEP_ALIVE} option.
* @return the value of {@link #KEEP_ALIVE} option.
* @since 6.4
*/
public boolean isKeepAlive() {
return this.keepAlive;
}
/**
* Configure a value for {@link #KEEP_ALIVE} option.
* Defaults {@code true} - set to {@code false} disable keep-alive thread.
* @param keepAlive {@code false} to disable keep-alive thread.
*/
public void setKeepAlive(boolean keepAlive) {
this.keepAlive = keepAlive;
}
/**
* Represent the current instance as a {@link Properties}.
* @return the {@link Properties} representation.

View File

@@ -0,0 +1,150 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.endpoint.management;
import java.time.Instant;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.SmartLifecycle;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.IntegrationProperties;
import org.springframework.integration.endpoint.AbstractPollingEndpoint;
import org.springframework.scheduling.TaskScheduler;
/**
* The component to keep an application alive when there are no non-daemon threads.
* Some application might just not rely on the loops in specific threads for their logic.
* Or target protocol to integrate with communicates via daemon threads.
* <p>
* A bean for this class is registered automatically by Spring Integration infrastructure.
* It is started by application context for a blocked keep-alive dedicated thread
* only if there is no {@link AbstractPollingEndpoint} beans in the application context
* or {@link TaskScheduler} is configured for daemon (or virtual) threads.
* <p>
* Can be stopped (or started respectively) manually after injection into some target service if found redundant.
* <p>
* The {@link IntegrationProperties#KEEP_ALIVE} integration global
* property can be set to {@code false} to disable this component regardless of the application logic.
*
* @author Artem Bilan
*
* @since 6.4
*/
public class IntegrationKeepAlive implements SmartLifecycle, SmartInitializingSingleton, BeanFactoryAware {
private static final Log LOG = LogFactory.getLog(IntegrationKeepAlive.class);
private final AtomicBoolean running = new AtomicBoolean();
private BeanFactory beanFactory;
private boolean autoStartup;
private volatile Thread keepAliveThread;
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}
@Override
public void afterSingletonsInstantiated() {
IntegrationProperties integrationProperties = IntegrationContextUtils.getIntegrationProperties(this.beanFactory);
this.autoStartup =
integrationProperties.isKeepAlive()
&& (isTaskSchedulerDaemon() || !isAbstractPollingEndpointPresent());
}
private boolean isTaskSchedulerDaemon() {
TaskScheduler taskScheduler = IntegrationContextUtils.getTaskScheduler(this.beanFactory);
AtomicBoolean isDaemon = new AtomicBoolean();
CountDownLatch checkDaemonThreadLatch = new CountDownLatch(1);
taskScheduler.schedule(() -> {
isDaemon.set(Thread.currentThread().isDaemon());
checkDaemonThreadLatch.countDown();
}, Instant.now());
boolean logWarning = false;
try {
if (!checkDaemonThreadLatch.await(10, TimeUnit.SECONDS)) {
logWarning = true;
}
}
catch (InterruptedException ex) {
logWarning = true;
}
if (logWarning) {
LOG.warn("The 'IntegrationKeepAlive' cannot check a 'TaskScheduler' daemon threads status. " +
"Falling back to 'keep-alive'");
}
return isDaemon.get();
}
private boolean isAbstractPollingEndpointPresent() {
return this.beanFactory.getBeanProvider(AbstractPollingEndpoint.class)
.stream()
.findAny()
.isPresent();
}
@Override
public boolean isAutoStartup() {
return this.autoStartup;
}
@Override
public void start() {
if (this.running.compareAndSet(false, true)) {
this.keepAliveThread =
new Thread(() -> {
while (true) {
try {
Thread.sleep(Long.MAX_VALUE);
}
catch (InterruptedException ex) {
break;
}
}
});
this.keepAliveThread.setDaemon(false);
this.keepAliveThread.setName("spring-integration-keep-alive");
this.keepAliveThread.start();
}
}
@Override
public void stop() {
if (this.running.compareAndSet(true, false)) {
this.keepAliveThread.interrupt();
}
}
@Override
public boolean isRunning() {
return this.running.get();
}
}

View File

@@ -1,4 +1,6 @@
/**
* Provides classes related to endpoint management.
*/
@org.springframework.lang.NonNullApi
@org.springframework.lang.NonNullFields
package org.springframework.integration.endpoint.management;

View File

@@ -9,3 +9,4 @@ spring.integration.messagingTemplate.throwExceptionOnLateReply=false
spring.integration.readOnly.headers=
spring.integration.endpoints.noAutoStartup=
spring.integration.endpoints.defaultTimeout=30000
spring.integration.keepAlive=true

View File

@@ -21,6 +21,7 @@ import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.management.IntegrationKeepAlive;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.test.annotation.DirtiesContext;
@@ -52,6 +53,9 @@ public class IntegrationContextTests {
@Autowired
private ThreadPoolTaskScheduler taskScheduler;
@Autowired
private IntegrationKeepAlive integrationKeepAlive;
@Test
public void testIntegrationContextComponents() {
assertThat(this.integrationProperties.isMessagingTemplateThrowExceptionOnLateReply()).isTrue();
@@ -62,6 +66,7 @@ public class IntegrationContextTests {
assertThat(this.serviceActivator.isRunning()).isFalse();
assertThat(this.serviceActivatorExplicit.isAutoStartup()).isTrue();
assertThat(this.serviceActivatorExplicit.isRunning()).isTrue();
assertThat(this.integrationKeepAlive.isRunning()).isTrue();
}
}

View File

@@ -0,0 +1,135 @@
/*
* Copyright 2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.endpoint;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.IntegrationProperties;
import org.springframework.integration.endpoint.management.IntegrationKeepAlive;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;
/**
* @author Artem Bilan
*
* @since 6.4
*/
@SpringJUnitConfig
@DirtiesContext
public class IntegrationKeepAliveTests {
@Test
void keepAliveIsActive(@Autowired IntegrationKeepAlive integrationKeepAlive) {
assertThat(integrationKeepAlive.isRunning()).isTrue();
Thread keepAliveThread = TestUtils.getPropertyValue(integrationKeepAlive, "keepAliveThread", Thread.class);
assertThat(keepAliveThread.isAlive()).isTrue();
integrationKeepAlive.stop();
await().untilAsserted(() -> assertThat(keepAliveThread.isAlive()).isFalse());
integrationKeepAlive.start();
}
@Configuration
@EnableIntegration
public static class TestConfiguration {
}
@Nested
@ContextConfiguration(classes = WithPollingEndpoint.WithPollingEndpointConfig.class)
class WithPollingEndpoint {
@Test
void keepAliveNotActive(@Autowired IntegrationKeepAlive integrationKeepAlive) {
assertThat(integrationKeepAlive.isRunning()).isFalse();
}
@Configuration
static class WithPollingEndpointConfig {
@Bean
AbstractPollingEndpoint mockPollingEndpoint() {
return mock();
}
}
}
@Nested
@ContextConfiguration(classes = WithDaemonTaskScheduler.WithDaemonTaskSchedulerConfig.class)
class WithDaemonTaskScheduler {
@Test
void keepAliveActive(@Autowired IntegrationKeepAlive integrationKeepAlive) {
assertThat(integrationKeepAlive.isRunning()).isTrue();
}
@Configuration
static class WithDaemonTaskSchedulerConfig {
@Bean
AbstractPollingEndpoint mockPollingEndpoint() {
return mock();
}
@Bean
String daemonSetter(ThreadPoolTaskScheduler taskScheduler) {
taskScheduler.setDaemon(true);
return null;
}
}
}
@Nested
@ContextConfiguration(classes = WithGlobalProperty.WithGlobalPropertyConfig.class)
class WithGlobalProperty {
@Test
void keepAliveNotActive(@Autowired IntegrationKeepAlive integrationKeepAlive) {
assertThat(integrationKeepAlive.isRunning()).isFalse();
}
@Configuration
static class WithGlobalPropertyConfig {
@Bean(IntegrationContextUtils.INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME)
static IntegrationProperties integrationProperties() {
IntegrationProperties integrationProperties = new IntegrationProperties();
integrationProperties.setKeepAlive(false);
return integrationProperties;
}
}
}
}

View File

@@ -99,6 +99,7 @@
** xref:shutdown.adoc[]
** xref:graph.adoc[]
** xref:integration-graph-controller.adoc[]
** xref:keep-alive.adoc[]
* xref:reactive-streams.adoc[]
* xref:native-aot.adoc[]
* xref:endpoint-summary.adoc[]

View File

@@ -19,6 +19,7 @@ spring.integration.endpoints.noAutoStartup= <7>
spring.integration.channels.error.requireSubscribers=true <8>
spring.integration.channels.error.ignoreFailures=true <9>
spring.integration.endpoints.defaultTimeout=30000 <10>
spring.integration.keepAlive=true <11>
----
<1> When true, `input-channel` instances are automatically declared as `DirectChannel` instances when not explicitly found in the application context.
@@ -57,6 +58,11 @@ Since version 5.5.
Default value is 30 seconds to avoid indefinite blocking.
Can be configured to a negative value to restore infinite blocking behavior in endpoints.
Since version 6.2.
<11> Whether to start the `IntegrationKeepAlive`.
Default is `true`, however depends on the beans in the application context.
See xref:keep-alive.adoc[Keep Alive] for more information.
Since version 6.4.
====
These properties can be overridden by adding a `/META-INF/spring.integration.properties` file to the classpath or an `IntegrationContextUtils.INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME` bean for the `org.springframework.integration.context.IntegrationProperties` instance.
@@ -77,5 +83,6 @@ spring.integration.channels.maxBroadcastSubscribers=0x7fffffff
spring.integration.readOnly.headers=
spring.integration.messagingTemplate.throwExceptionOnLateReply=true
spring.integration.endpoints.defaultTimeout=30000
spring.integration.keepAlive=false
----

View File

@@ -0,0 +1,17 @@
[[keep-alive]]
= Integration Keep Alive
Starting with version 6.4, Spring Integration provides an `IntegrationKeepAlive` infrastructure bean.
It manages an `spring-integration-keep-alive` non-daemon forever thread which keeps an application running.
In some use-cases, e.g. `WebSocketInboundChannelAdapter` based on the `StandardWebSocketClient` does not use non-daemon thread for session, therefore an application may exit prematurely.
Or an application logic may have only service activators or outbound channel adapters which rely on some other interaction, but not loops from executors like the one from Web server.
Or all the threads in the application are virtual.
The `IntegrationKeepAlive` is started automatically only if `TaskScheduler` is configured for non-daemon threads and there is no `AbstractPollingEndpoint` beans in the application context.
In case of the `TaskScheduler` bean configured for daemon or virtual threads, the `IntegrationKeepAlive` is started regardless of the presence for `AbstractPollingEndpoint` beans.
This component can be disabled by the `spring.integration.keepAlive` global property.
See xref:configuration/global-properties.adoc[Global Properties] for more information.
The `IntegrationKeepAlive` can be injected in some service and stopped manually if there is no need to keep an application alive or such a status is managed somewhere else.
See also Spring Boot https://docs.spring.io/spring-boot/reference/features/spring-application.html#features.spring-application.virtual-threads[Virtual Threads] documentation for a `spring.main.keep-alive` property.

View File

@@ -27,6 +27,10 @@ The SpEL evaluation infrastructure now supports configuration for `IndexAccessor
Also, an out-of-the-box `JsonIndexAccessor` is provided.
See xref:spel.adoc[SpEL Support] for more information.
The `IntegrationKeepAlive` component has been introduced.
See xref:keep-alive.adoc[Integration Keep Alive] for more information.
[[x6.4-general]]
=== General Changes