Commit e2df9bce authored by Artem Bilan's avatar Artem Bilan Committed by Stephane Nicoll

Expose Spring Integration global properties

Spring Integration comes with some global properties which can be
configured via `META-INF/spring.integration.properties`. The framework
then provides an `integrationGlobalProperties` bean as an
`org.springframework.integration.context.IntegrationProperties`
instance.

This commit allows users to configure these using regular
`application.properties`. If a `META-INF/spring.integration.properties`
file exists, the values are used as fallback.

See gh-25377
parent deaff1a5
......@@ -80,6 +80,23 @@ import org.springframework.util.StringUtils;
TaskSchedulingAutoConfiguration.class })
public class IntegrationAutoConfiguration {
@Bean(name = IntegrationContextUtils.INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME)
@ConditionalOnMissingBean(name = IntegrationContextUtils.INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME)
public static org.springframework.integration.context.IntegrationProperties integrationGlobalProperties(
IntegrationProperties properties) {
org.springframework.integration.context.IntegrationProperties integrationProperties = new org.springframework.integration.context.IntegrationProperties();
integrationProperties.setChannelsAutoCreate(properties.getChannels().isAutoCreate());
integrationProperties.setChannelsMaxUnicastSubscribers(properties.getChannels().getMaxUnicastSubscribers());
integrationProperties.setChannelsMaxBroadcastSubscribers(properties.getChannels().getMaxBroadcastSubscribers());
integrationProperties.setErrorChannelRequireSubscribers(properties.getChannels().isErrorRequireSubscribers());
integrationProperties.setErrorChannelIgnoreFailures(properties.getChannels().isErrorIgnoreFailures());
integrationProperties
.setMessagingTemplateThrowExceptionOnLateReply(properties.getEndpoints().isThrowExceptionOnLateReply());
integrationProperties.setReadOnlyHeaders(properties.getEndpoints().getReadOnlyHeaders());
integrationProperties.setNoAutoStartupEndpoints(properties.getEndpoints().getNoAutoStartup());
return integrationProperties;
}
/**
* Basic Spring Integration configuration.
*/
......
/*
* Copyright 2012-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.integration;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.env.EnvironmentPostProcessor;
import org.springframework.boot.env.OriginTrackedMapPropertySource;
import org.springframework.boot.env.PropertiesPropertySourceLoader;
import org.springframework.boot.origin.Origin;
import org.springframework.boot.origin.OriginLookup;
import org.springframework.core.Ordered;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.PropertySource;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.integration.context.IntegrationProperties;
/**
* The {@link EnvironmentPostProcessor} for Spring Integration.
*
* @author Artem Bilan
* @since 2.5
*/
public class IntegrationEnvironmentPostProcessor implements EnvironmentPostProcessor, Ordered {
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
@Override
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
registerIntegrationPropertiesFileSource(environment);
}
private static void registerIntegrationPropertiesFileSource(ConfigurableEnvironment environment) {
Resource integrationPropertiesResource = new ClassPathResource("META-INF/spring.integration.properties");
PropertiesPropertySourceLoader loader = new PropertiesPropertySourceLoader();
try {
OriginTrackedMapPropertySource propertyFileSource = (OriginTrackedMapPropertySource) loader
.load("integration-properties-file", integrationPropertiesResource).get(0);
environment.getPropertySources().addLast(new IntegrationPropertySource(propertyFileSource));
}
catch (FileNotFoundException ex) {
// Ignore when no META-INF/spring.integration.properties file in classpath
}
catch (IOException ex) {
throw new IllegalStateException(
"Failed to load integration properties from " + integrationPropertiesResource, ex);
}
}
private static final class IntegrationPropertySource extends PropertySource<Map<String, Object>>
implements OriginLookup<String> {
private static final String PREFIX = "spring.integration.";
private static final Map<String, String> KEYS_MAPPING = new HashMap<>();
static {
KEYS_MAPPING.put(PREFIX + "channels.auto-create", IntegrationProperties.CHANNELS_AUTOCREATE);
KEYS_MAPPING.put(PREFIX + "channels.max-unicast-subscribers",
IntegrationProperties.CHANNELS_MAX_UNICAST_SUBSCRIBERS);
KEYS_MAPPING.put(PREFIX + "channels.max-broadcast-subscribers",
IntegrationProperties.CHANNELS_MAX_BROADCAST_SUBSCRIBERS);
KEYS_MAPPING.put(PREFIX + "channels.error-require-subscribers",
IntegrationProperties.ERROR_CHANNEL_REQUIRE_SUBSCRIBERS);
KEYS_MAPPING.put(PREFIX + "channels.error-ignore-failures",
IntegrationProperties.ERROR_CHANNEL_IGNORE_FAILURES);
KEYS_MAPPING.put(PREFIX + "endpoints.throw-exception-on-late-reply",
IntegrationProperties.THROW_EXCEPTION_ON_LATE_REPLY);
KEYS_MAPPING.put(PREFIX + "endpoints.read-only-headers", IntegrationProperties.READ_ONLY_HEADERS);
KEYS_MAPPING.put(PREFIX + "endpoints.no-auto-startup", IntegrationProperties.ENDPOINTS_NO_AUTO_STARTUP);
}
private final OriginTrackedMapPropertySource origin;
IntegrationPropertySource(OriginTrackedMapPropertySource origin) {
super("original-integration-properties", origin.getSource());
this.origin = origin;
}
@Override
public Object getProperty(String name) {
return this.origin.getProperty(KEYS_MAPPING.get(name));
}
@Override
public Origin getOrigin(String key) {
return this.origin.getOrigin(KEYS_MAPPING.get(name));
}
}
}
......@@ -32,10 +32,22 @@ import org.springframework.boot.jdbc.DataSourceInitializationMode;
@ConfigurationProperties(prefix = "spring.integration")
public class IntegrationProperties {
private final Channels channels = new Channels();
private final Endpoints endpoints = new Endpoints();
private final Jdbc jdbc = new Jdbc();
private final RSocket rsocket = new RSocket();
public Channels getChannels() {
return this.channels;
}
public Endpoints getEndpoints() {
return this.endpoints;
}
public Jdbc getJdbc() {
return this.jdbc;
}
......@@ -44,6 +56,118 @@ public class IntegrationProperties {
return this.rsocket;
}
public static class Channels {
/**
* Whether to create input channels when no respective beans.
*/
private boolean autoCreate = true;
/**
* Default number of max subscribers on unicasting channels.
*/
private int maxUnicastSubscribers = Integer.MAX_VALUE;
/**
* Default number of max subscribers on broadcasting channels.
*/
private int maxBroadcastSubscribers = Integer.MAX_VALUE;
/**
* Require subscribers flag for global 'errorChannel'.
*/
private boolean errorRequireSubscribers = true;
/**
* Ignore failures flag for global 'errorChannel'.
*/
private boolean errorIgnoreFailures = true;
public void setAutoCreate(boolean autoCreate) {
this.autoCreate = autoCreate;
}
public boolean isAutoCreate() {
return this.autoCreate;
}
public void setMaxUnicastSubscribers(int maxUnicastSubscribers) {
this.maxUnicastSubscribers = maxUnicastSubscribers;
}
public int getMaxUnicastSubscribers() {
return this.maxUnicastSubscribers;
}
public void setMaxBroadcastSubscribers(int maxBroadcastSubscribers) {
this.maxBroadcastSubscribers = maxBroadcastSubscribers;
}
public int getMaxBroadcastSubscribers() {
return this.maxBroadcastSubscribers;
}
public void setErrorRequireSubscribers(boolean errorRequireSubscribers) {
this.errorRequireSubscribers = errorRequireSubscribers;
}
public boolean isErrorRequireSubscribers() {
return this.errorRequireSubscribers;
}
public void setErrorIgnoreFailures(boolean errorIgnoreFailures) {
this.errorIgnoreFailures = errorIgnoreFailures;
}
public boolean isErrorIgnoreFailures() {
return this.errorIgnoreFailures;
}
}
public static class Endpoints {
/**
* Whether throw an exception on late reply for gateways.
*/
private boolean throwExceptionOnLateReply = false;
/**
* Ignored headers during message building.
*/
private String[] readOnlyHeaders = {};
/**
* Spring Integration endpoints do not start automatically.
*/
private String[] noAutoStartup = {};
public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) {
this.throwExceptionOnLateReply = throwExceptionOnLateReply;
}
public boolean isThrowExceptionOnLateReply() {
return this.throwExceptionOnLateReply;
}
public void setReadOnlyHeaders(String[] readOnlyHeaders) {
this.readOnlyHeaders = readOnlyHeaders;
}
public String[] getReadOnlyHeaders() {
return this.readOnlyHeaders;
}
public void setNoAutoStartup(String[] noAutoStartup) {
this.noAutoStartup = noAutoStartup;
}
public String[] getNoAutoStartup() {
return this.noAutoStartup;
}
}
public static class Jdbc {
private static final String DEFAULT_SCHEMA_LOCATION = "classpath:org/springframework/"
......@@ -139,7 +263,7 @@ public class IntegrationProperties {
/**
* Whether to handle message mapping for RSocket via Spring Integration.
*/
boolean messageMappingEnabled;
private boolean messageMappingEnabled;
public boolean isMessageMappingEnabled() {
return this.messageMappingEnabled;
......
......@@ -7,6 +7,10 @@ org.springframework.boot.autoconfigure.logging.ConditionEvaluationReportLoggingL
org.springframework.context.ApplicationListener=\
org.springframework.boot.autoconfigure.BackgroundPreinitializer
# Environment Post Processors
org.springframework.boot.env.EnvironmentPostProcessor=\
org.springframework.boot.autoconfigure.integration.IntegrationEnvironmentPostProcessor
# Auto Configuration Import Listeners
org.springframework.boot.autoconfigure.AutoConfigurationImportListener=\
org.springframework.boot.autoconfigure.condition.ConditionEvaluationReportAutoConfigurationImportListener
......
......@@ -16,10 +16,15 @@
package org.springframework.boot.autoconfigure.integration;
import java.io.File;
import java.util.Arrays;
import java.util.List;
import javax.management.MBeanServer;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
......@@ -37,16 +42,25 @@ import org.springframework.boot.autoconfigure.rsocket.RSocketStrategiesAutoConfi
import org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfiguration;
import org.springframework.boot.jdbc.DataSourceInitializationMode;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.config.IntegrationManagementConfigurer;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.endpoint.MessageProcessorMessageSource;
import org.springframework.integration.gateway.RequestReplyExchanger;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.handler.MessageProcessor;
import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.IntegrationRSocketEndpoint;
......@@ -250,6 +264,115 @@ class IntegrationAutoConfigurationTests {
});
}
@Test
void integrationGlobalPropertiesAutoConfigured() {
new ApplicationContextRunner(() -> {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.setResourceLoader(
new FilteringResourceLoader(new DefaultResourceLoader(), "META-INF/spring.integration.properties"));
return context;
}).withConfiguration(AutoConfigurations.of(JmxAutoConfiguration.class, IntegrationAutoConfiguration.class))
.withPropertyValues("spring.integration.channels.auto-create=false",
"spring.integration.channels.max-unicast-subscribers=2",
"spring.integration.channels.max-broadcast-subscribers=3",
"spring.integration.channels.error-require-subscribers=false",
"spring.integration.channels.error-ignore-failures=false",
"spring.integration.endpoints.throw-exception-on-late-reply=true",
"spring.integration.endpoints.read-only-headers=ignoredHeader",
"spring.integration.endpoints.no-auto-startup=notStartedEndpoint,_org.springframework.integration.errorLogger")
.withBean("testDirectChannel", DirectChannel.class)
.withInitializer((applicationContext) -> new IntegrationEnvironmentPostProcessor()
.postProcessEnvironment(applicationContext.getEnvironment(), null))
.run((context) -> {
assertThat(context)
.getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, PublishSubscribeChannel.class)
.hasFieldOrPropertyWithValue("requireSubscribers", false)
.hasFieldOrPropertyWithValue("ignoreFailures", false)
.hasFieldOrPropertyWithValue("maxSubscribers", 3);
assertThat(context).getBean("testDirectChannel", DirectChannel.class)
.hasFieldOrPropertyWithValue("maxSubscribers", 2);
LoggingHandler loggingHandler = context.getBean(LoggingHandler.class);
assertThat(loggingHandler)
.hasFieldOrPropertyWithValue("messageBuilderFactory.readOnlyHeaders",
new String[] { "ignoredHeader" })
.extracting("integrationProperties", InstanceOfAssertFactories.MAP)
.containsEntry(
org.springframework.integration.context.IntegrationProperties.THROW_EXCEPTION_ON_LATE_REPLY,
"true")
.containsEntry(
org.springframework.integration.context.IntegrationProperties.ENDPOINTS_NO_AUTO_STARTUP,
"notStartedEndpoint,_org.springframework.integration.errorLogger");
assertThat(context)
.getBean(IntegrationContextUtils.ERROR_LOGGER_BEAN_NAME, EventDrivenConsumer.class)
.hasFieldOrPropertyWithValue("autoStartup", false);
});
}
@Test
void integrationGlobalPropertiesUserBeanOverridesAutoConfiguration() {
this.contextRunner.withPropertyValues("spring.integration.channels.auto-create=false",
"spring.integration.channels.max-unicast-subscribers=2",
"spring.integration.channels.max-broadcast-subscribers=3",
"spring.integration.channels.error-require-subscribers=false",
"spring.integration.channels.error-ignore-failures=false",
"spring.integration.endpoints.throw-exception-on-late-reply=true",
"spring.integration.endpoints.read-only-headers=ignoredHeader",
"spring.integration.endpoints.no-auto-startup=notStartedEndpoint,_org.springframework.integration.errorLogger")
.withBean(IntegrationContextUtils.INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME,
org.springframework.integration.context.IntegrationProperties.class, () -> {
org.springframework.integration.context.IntegrationProperties properties = new org.springframework.integration.context.IntegrationProperties();
properties.setChannelsMaxUnicastSubscribers(5);
return properties;
})
.withInitializer((applicationContext) -> new IntegrationEnvironmentPostProcessor()
.postProcessEnvironment(applicationContext.getEnvironment(), null))
.run((context) -> assertThat(context).getBean(LoggingHandler.class)
.extracting("integrationProperties", InstanceOfAssertFactories.MAP)
.containsEntry(
org.springframework.integration.context.IntegrationProperties.CHANNELS_AUTOCREATE,
"true")
.containsEntry(
org.springframework.integration.context.IntegrationProperties.ERROR_CHANNEL_REQUIRE_SUBSCRIBERS,
"true")
.containsEntry(
org.springframework.integration.context.IntegrationProperties.ERROR_CHANNEL_IGNORE_FAILURES,
"true")
.containsEntry(
org.springframework.integration.context.IntegrationProperties.THROW_EXCEPTION_ON_LATE_REPLY,
"false")
.containsEntry(
org.springframework.integration.context.IntegrationProperties.CHANNELS_MAX_UNICAST_SUBSCRIBERS,
"5")
.containsEntry(
org.springframework.integration.context.IntegrationProperties.CHANNELS_MAX_BROADCAST_SUBSCRIBERS,
"2147483647")
.containsEntry(
org.springframework.integration.context.IntegrationProperties.ENDPOINTS_NO_AUTO_STARTUP,
"")
.containsEntry(org.springframework.integration.context.IntegrationProperties.READ_ONLY_HEADERS,
""));
}
@Test
void integrationGlobalPropertiesFromSpringIntegrationPropertiesFile() {
// See META-INF/spring.integration.properties
this.contextRunner
.withPropertyValues("spring.integration.channels.auto-create=false",
"spring.integration.endpoints.read-only-headers=ignoredHeader")
.withInitializer((applicationContext) -> new IntegrationEnvironmentPostProcessor()
.postProcessEnvironment(applicationContext.getEnvironment(), null))
.run((context) -> assertThat(context).getBean(LoggingHandler.class)
.extracting("integrationProperties", InstanceOfAssertFactories.MAP)
.containsEntry(
org.springframework.integration.context.IntegrationProperties.CHANNELS_AUTOCREATE,
"false")
.containsEntry(org.springframework.integration.context.IntegrationProperties.READ_ONLY_HEADERS,
"ignoredHeader")
.containsEntry(
org.springframework.integration.context.IntegrationProperties.ENDPOINTS_NO_AUTO_STARTUP,
"testService*"));
}
@Configuration(proxyBeanMethods = false)
static class CustomMBeanExporter {
......@@ -304,4 +427,32 @@ class IntegrationAutoConfigurationTests {
}
private static final class FilteringResourceLoader implements ResourceLoader {
private final ResourceLoader delegate;
private final List<String> resourcesToFilter;
FilteringResourceLoader(ResourceLoader delegate, String... resourcesToFilter) {
this.delegate = delegate;
this.resourcesToFilter = Arrays.asList(resourcesToFilter);
}
@Override
public Resource getResource(String location) {
if (!this.resourcesToFilter.contains(location)) {
return this.delegate.getResource(location);
}
else {
return new FileSystemResource(mock(File.class));
}
}
@Override
public ClassLoader getClassLoader() {
return this.delegate.getClassLoader();
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment