Commit a7e57e09 authored by wonwoo's avatar wonwoo Committed by Brian Clozel

Configure codec buffer size in ES Reactive Rest client

This commit adds a new configuration property
`"spring.data.elasticsearch.client.reactive.max-in-memory-size"`
which configures the maximum amount of memory buffered by the
`WebClient` used by the Reactive ElasticSearch client.

See gh-20205
parent 2815e6ee
...@@ -29,6 +29,8 @@ import org.springframework.data.elasticsearch.client.ClientConfiguration; ...@@ -29,6 +29,8 @@ import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient;
import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients; import org.springframework.data.elasticsearch.client.reactive.ReactiveRestClients;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.util.unit.DataSize;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
/** /**
...@@ -52,6 +54,7 @@ public class ReactiveRestClientAutoConfiguration { ...@@ -52,6 +54,7 @@ public class ReactiveRestClientAutoConfiguration {
builder.usingSsl(); builder.usingSsl();
} }
configureTimeouts(builder, properties); configureTimeouts(builder, properties);
configureWebClient(builder, properties);
return builder.build(); return builder.build();
} }
...@@ -67,6 +70,19 @@ public class ReactiveRestClientAutoConfiguration { ...@@ -67,6 +70,19 @@ public class ReactiveRestClientAutoConfiguration {
}); });
} }
private void configureWebClient(ClientConfiguration.TerminalClientConfigurationBuilder builder,
ReactiveRestClientProperties properties) {
PropertyMapper map = PropertyMapper.get();
builder.withWebClientConfigurer((webClient) -> {
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
.codecs((configurer) -> map.from(properties.getMaxInMemorySize()).whenNonNull()
.asInt(DataSize::toBytes)
.to((maxInMemorySize) -> configurer.defaultCodecs().maxInMemorySize(maxInMemorySize)))
.build();
return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
});
}
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
public ReactiveElasticsearchClient reactiveElasticsearchClient(ClientConfiguration clientConfiguration) { public ReactiveElasticsearchClient reactiveElasticsearchClient(ClientConfiguration clientConfiguration) {
......
...@@ -22,6 +22,7 @@ import java.util.Collections; ...@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.util.unit.DataSize;
/** /**
* Configuration properties for Elasticsearch Reactive REST clients. * Configuration properties for Elasticsearch Reactive REST clients.
...@@ -62,6 +63,12 @@ public class ReactiveRestClientProperties { ...@@ -62,6 +63,12 @@ public class ReactiveRestClientProperties {
*/ */
private Duration socketTimeout; private Duration socketTimeout;
/**
* Limit on the number of bytes that can be buffered whenever the input stream needs
* to be aggregated.
*/
private DataSize maxInMemorySize;
public List<String> getEndpoints() { public List<String> getEndpoints() {
return this.endpoints; return this.endpoints;
} }
...@@ -110,4 +117,12 @@ public class ReactiveRestClientProperties { ...@@ -110,4 +117,12 @@ public class ReactiveRestClientProperties {
this.socketTimeout = socketTimeout; this.socketTimeout = socketTimeout;
} }
public DataSize getMaxInMemorySize() {
return this.maxInMemorySize;
}
public void setMaxInMemorySize(DataSize maxInMemorySize) {
this.maxInMemorySize = maxInMemorySize;
}
} }
...@@ -77,6 +77,7 @@ public class ReactiveRestClientAutoConfigurationTests { ...@@ -77,6 +77,7 @@ public class ReactiveRestClientAutoConfigurationTests {
this.contextRunner.withPropertyValues( this.contextRunner.withPropertyValues(
"spring.data.elasticsearch.client.reactive.endpoints=" + elasticsearch.getContainerIpAddress() + ":" "spring.data.elasticsearch.client.reactive.endpoints=" + elasticsearch.getContainerIpAddress() + ":"
+ elasticsearch.getFirstMappedPort(), + elasticsearch.getFirstMappedPort(),
"spring.data.elasticsearch.client.reactive.max-in-memory-size=-1",
"spring.data.elasticsearch.client.reactive.connection-timeout=120s", "spring.data.elasticsearch.client.reactive.connection-timeout=120s",
"spring.data.elasticsearch.client.reactive.socket-timeout=120s").run((context) -> { "spring.data.elasticsearch.client.reactive.socket-timeout=120s").run((context) -> {
ReactiveElasticsearchClient client = context.getBean(ReactiveElasticsearchClient.class); ReactiveElasticsearchClient client = context.getBean(ReactiveElasticsearchClient.class);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment