GH-110: Fix ElasticsearchConsumerConfiguration for auto-wire ambiguity

Fixes: https://github.com/spring-cloud/spring-functions-catalog/issues/110
This commit is contained in:
Artem Bilan
2024-12-19 11:43:39 -05:00
parent abae88d409
commit cc018cb994

View File

@@ -50,7 +50,6 @@ import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.gateway.AnnotationGatewayProxyFactoryBean;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
@@ -81,8 +80,7 @@ public class ElasticsearchConsumerConfiguration {
public static final String INDEX_NAME_HEADER = "INDEX_NAME";
@Bean
FactoryBean<MessageHandler> aggregator(MessageGroupStore messageGroupStore,
ElasticsearchConsumerProperties consumerProperties) {
FactoryBean<MessageHandler> elasticsearchAggregator(ElasticsearchConsumerProperties consumerProperties) {
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
aggregatorFactoryBean.setCorrelationStrategy((message) -> "");
@@ -91,6 +89,9 @@ public class ElasticsearchConsumerConfiguration {
aggregatorFactoryBean
.setGroupTimeoutExpression(new ValueExpression<>(consumerProperties.getGroupTimeout()));
}
SimpleMessageStore messageGroupStore = new SimpleMessageStore();
messageGroupStore.setTimeoutOnIdle(true);
messageGroupStore.setCopyOnGet(false);
aggregatorFactoryBean.setMessageStore(messageGroupStore);
// Currently, there is no way to customize the splitting behavior of an aggregator
@@ -119,16 +120,9 @@ public class ElasticsearchConsumerConfiguration {
}
@Bean
MessageGroupStore messageGroupStore() {
SimpleMessageStore messageGroupStore = new SimpleMessageStore();
messageGroupStore.setTimeoutOnIdle(true);
messageGroupStore.setCopyOnGet(false);
return messageGroupStore;
}
@Bean
IntegrationFlow elasticsearchConsumerFlow(@Qualifier("aggregator") MessageHandler aggregator,
ElasticsearchConsumerProperties properties, @Qualifier("indexingHandler") MessageHandler indexingHandler) {
IntegrationFlow elasticsearchConsumerFlow(@Qualifier("elasticsearchAggregator") MessageHandler aggregator,
ElasticsearchConsumerProperties properties,
@Qualifier("elasticsearchIndexingHandler") MessageHandler indexingHandler) {
return (flow) -> {
if (properties.getBatchSize() > 1) {
@@ -147,7 +141,7 @@ public class ElasticsearchConsumerConfiguration {
}
@Bean
public MessageHandler indexingHandler(ElasticsearchClient elasticsearchClient,
public MessageHandler elasticsearchIndexingHandler(ElasticsearchClient elasticsearchClient,
ElasticsearchConsumerProperties consumerProperties) {
return (message) -> {