Added support for sync and async bulk-indexing.

New properties for controlling the batch size and the timeout for a message group have been added, consistent with the jdbc-consumer implementation.
Added support for per message index override via INDEX_NAME header.
The overriding behavior of message headers is now documented in the properties Javadoc.
Improved debug log messages.

Copyright and authors update.

README
This commit is contained in:
Andrea Montemaggio
2021-02-17 14:45:29 -06:00
committed by Soby Chacko
parent 62575ffc75
commit 63dd5bc043
4 changed files with 367 additions and 47 deletions

View File

@@ -21,9 +21,11 @@ Properties grouped by prefix:
=== elasticsearch.consumer
$$async$$:: $$Indicates whether the indexing operation is async or not. By default indexing is done synchronously.$$ *($$Boolean$$, default: `$$false$$`)*
$$id$$:: $$The id of the document index.$$ *($$Expression$$, default: `$$<none>$$`)*
$$index$$:: $$Name of the index.$$ *($$String$$, default: `$$<none>$$`)*
$$routing$$:: $$Indicates the shard to route to. If not provided, this resolves to the ID used on the document.$$ *($$String$$, default: `$$<none>$$`)*
$$batch-size$$:: $$Number of items to index for each request. It defaults to 1. For values greater than 1 bulk indexing API will be used.$$ *($$Integer$$, default: `$$1$$`)*
$$group-timeout$$:: $$Timeout in milliseconds after which message group is flushed when bulk indexing is active. It defaults to -1, meaning no automatic flush of idle message groups occurs.$$ *($$Long$$, default: `$$-1$$`)*
$$id$$:: $$The id of the document to index. If set, the INDEX_ID header value overrides this property on a per message basis.$$ *($$Expression$$, default: `$$<none>$$`)*
$$index$$:: $$Name of the index. If set, the INDEX_NAME header value overrides this property on a per message basis.$$ *($$String$$, default: `$$<none>$$`)*
$$routing$$:: $$Indicates the shard to route to. If not provided, Elasticsearch will default to a hash of the document id.$$ *($$String$$, default: `$$<none>$$`)*
$$timeout-seconds$$:: $$Timeout for the shard to be available. If not set, it defaults to 1 minute set by the Elasticsearch client.$$ *($$Long$$, default: `$$0$$`)*
=== spring.elasticsearch.rest

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2020-2020 the original author or authors.
* Copyright 2020-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,12 +17,20 @@
package org.springframework.cloud.fn.consumer.elasticsearch;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
@@ -31,12 +39,30 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor;
import org.springframework.integration.aggregator.MessageCountReleaseStrategy;
import org.springframework.integration.config.AggregatorFactoryBean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* @author Soby Chacko
* @author Andrea Montemaggio
*/
@Configuration
@EnableConfigurationProperties(ElasticsearchConsumerProperties.class)
public class ElasticsearchConsumerConfiguration {
@@ -46,69 +72,211 @@ public class ElasticsearchConsumerConfiguration {
*/
public static final String INDEX_ID_HEADER = "INDEX_ID";
/**
* Message header for the Index name.
*/
public static final String INDEX_NAME_HEADER = "INDEX_NAME";
private static final Log logger = LogFactory.getLog(ElasticsearchConsumerConfiguration.class);
@Bean
public Consumer<Message<?>> elasticsearchConsumer(RestHighLevelClient restHighLevelClient,
ElasticsearchConsumerProperties consumerProperties) {
FactoryBean<MessageHandler> aggregator(MessageGroupStore messageGroupStore, ElasticsearchConsumerProperties consumerProperties) {
    // Builds the aggregator that collects incoming messages into batches for bulk indexing.
    AggregatorFactoryBean factory = new AggregatorFactoryBean();
    factory.setMessageStore(messageGroupStore);

    // Every message gets the same correlation key, so all of them land in one group
    // that is released once the configured batch size is reached.
    factory.setCorrelationStrategy(message -> "");
    factory.setReleaseStrategy(new MessageCountReleaseStrategy(consumerProperties.getBatchSize()));

    // A negative group timeout leaves the timeout expression unset.
    long groupTimeout = consumerProperties.getGroupTimeout();
    if (groupTimeout >= 0) {
        factory.setGroupTimeoutExpression(new ValueExpression<>(groupTimeout));
    }

    factory.setExpireGroupsUponCompletion(true);
    factory.setSendPartialResultOnExpiry(true);

    // The aggregator offers no hook to customize how a Collection<Message<?>> returned by the
    // MessageGroupProcessor is split. Wrapping each Message<?> in a MessageWrapper hides it from
    // the aggregator, which keeps the individual message headers intact and lets the whole batch
    // be released as a single payload to the downstream indexing handler.
    factory.setProcessorBean(new AbstractAggregatingMessageGroupProcessor() {
        @Override
        protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
            Collection<Message<?>> groupMessages = group.getMessages();
            Assert.notEmpty(groupMessages, this.getClass().getSimpleName() + " cannot process empty message groups");
            List<Object> wrapped = new ArrayList<>(groupMessages.size());
            groupMessages.forEach(item -> wrapped.add(new MessageWrapper(item)));
            return wrapped;
        }
    });
    return factory;
}
/**
 * In-memory message group store backing the batching aggregator.
 * @return a {@link SimpleMessageStore} with timeout-on-idle enabled and copy-on-get disabled
 */
@Bean
MessageGroupStore messageGroupStore() {
    SimpleMessageStore store = new SimpleMessageStore();
    store.setCopyOnGet(false);
    store.setTimeoutOnIdle(true);
    return store;
}
/**
 * Wires the {@code elasticsearchConsumer} gateway to the indexing handler.
 * The aggregator is only placed in front of the indexing handler when the
 * configured batch size is greater than 1.
 */
@Bean
IntegrationFlow elasticsearchConsumerFlow(@Qualifier("aggregator") MessageHandler aggregator, ElasticsearchConsumerProperties properties,
        @Qualifier("indexingHandler") MessageHandler indexingHandler) {
    IntegrationFlowBuilder flow =
            IntegrationFlows.from(Consumer.class, gateway -> gateway.beanName("elasticsearchConsumer"));
    boolean batching = properties.getBatchSize() > 1;
    if (batching) {
        flow.handle(aggregator);
    }
    flow.handle(indexingHandler);
    return flow.get();
}
@Bean
public MessageHandler indexingHandler(RestHighLevelClient restHighLevelClient,
ElasticsearchConsumerProperties consumerProperties) {
return message -> {
if (message.getPayload() instanceof Iterable) {
BulkRequest bulkRequest = new BulkRequest();
StreamSupport.stream(((Iterable<?>) message.getPayload()).spliterator(), false)
.filter(MessageWrapper.class::isInstance)
.map(itemPayload -> ((MessageWrapper) itemPayload).getMessage())
.map(m -> buildIndexRequest(m, consumerProperties))
.forEach(bulkRequest::add);
IndexRequest request = new IndexRequest(consumerProperties.getIndex());
String id = "";
if (message.getHeaders().containsKey(INDEX_ID_HEADER)) {
id = (String) message.getHeaders().get(INDEX_ID_HEADER);
index(restHighLevelClient, bulkRequest, consumerProperties.isAsync());
}
else if (consumerProperties.getId() != null) {
id = consumerProperties.getId().getValue(message, String.class);
else {
IndexRequest request = buildIndexRequest(message, consumerProperties);
index(restHighLevelClient, request, consumerProperties.isAsync());
}
request.id(id);
if (message.getPayload() instanceof String) {
request.source((String) message.getPayload(), XContentType.JSON);
}
else if (message.getPayload() instanceof Map) {
request.source((Map<String, ?>) message.getPayload(), XContentType.JSON);
}
else if (message.getPayload() instanceof XContentBuilder) {
request.source((XContentBuilder) message.getPayload());
}
if (!StringUtils.isEmpty(consumerProperties.getRouting())) {
request.routing(consumerProperties.getRouting());
}
if (consumerProperties.getTimeoutSeconds() > 0) {
request.timeout(TimeValue.timeValueSeconds(consumerProperties.getTimeoutSeconds()));
}
index(restHighLevelClient, request, consumerProperties.isAsync());
};
}
private void index(RestHighLevelClient restHighLevelClient,
IndexRequest request, boolean isAsync) {
/**
 * Builds the {@link IndexRequest} for a single message.
 * <p>
 * The index name comes from the {@code INDEX_NAME} header when present, otherwise from the
 * configured index property. The document id comes from the {@code INDEX_ID} header when
 * present, otherwise from the configured id expression evaluated against the message,
 * otherwise it is the empty string.
 * @param message the message whose payload becomes the document source
 * @param consumerProperties the consumer configuration (index, id, routing, timeout)
 * @return the populated index request
 * @throws IllegalArgumentException if the payload is not a {@code String}, a {@code Map}
 * or an {@code XContentBuilder}
 */
private IndexRequest buildIndexRequest(Message<?> message, ElasticsearchConsumerProperties consumerProperties) {
    IndexRequest request = new IndexRequest();

    // A per-message header takes precedence over the configured index name.
    String index = consumerProperties.getIndex();
    if (message.getHeaders().containsKey(INDEX_NAME_HEADER)) {
        index = (String) message.getHeaders().get(INDEX_NAME_HEADER);
    }
    request.index(index);

    // A per-message header takes precedence over the configured id expression.
    String id = "";
    if (message.getHeaders().containsKey(INDEX_ID_HEADER)) {
        id = (String) message.getHeaders().get(INDEX_ID_HEADER);
    }
    else if (consumerProperties.getId() != null) {
        id = consumerProperties.getId().getValue(message, String.class);
    }
    request.id(id);

    Object payload = message.getPayload();
    if (payload instanceof String) {
        request.source((String) payload, XContentType.JSON);
    }
    else if (payload instanceof Map) {
        // Keys are expected to be field names; the element types cannot be checked at runtime.
        @SuppressWarnings("unchecked")
        Map<String, ?> source = (Map<String, ?>) payload;
        request.source(source, XContentType.JSON);
    }
    else if (payload instanceof XContentBuilder) {
        request.source((XContentBuilder) payload);
    }
    else {
        // Fail here with a clear message instead of sending a request that has no source.
        throw new IllegalArgumentException("Unsupported payload type for indexing: "
                + payload.getClass().getName() + "; expected String, Map or XContentBuilder");
    }

    if (StringUtils.hasLength(consumerProperties.getRouting())) {
        request.routing(consumerProperties.getRouting());
    }
    if (consumerProperties.getTimeoutSeconds() > 0) {
        request.timeout(TimeValue.timeValueSeconds(consumerProperties.getTimeoutSeconds()));
    }
    return request;
}
private void index(RestHighLevelClient restHighLevelClient, BulkRequest request, boolean isAsync) {
if (isAsync) {
restHighLevelClient.indexAsync(request, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
if (logger.isDebugEnabled()) {
logger.debug("Document with ID: " + indexResponse.getId() + " has been indexed");
}
public void onResponse(BulkResponse bulkResponse) {
handleBulkResponse(bulkResponse);
}
@Override
public void onFailure(Exception e) {
throw new IllegalStateException("Error occurred while indexing the document", e);
throw new IllegalStateException("Error occurred while performing bulk index operation: " + e.getMessage(), e);
}
});
}
else {
try {
restHighLevelClient.index(request, RequestOptions.DEFAULT);
BulkResponse bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
handleBulkResponse(bulkResponse);
}
catch (IOException e) {
throw new IllegalStateException("Error occurred while indexing the document", e);
throw new IllegalStateException("Error occurred while performing bulk index operation: " + e.getMessage(), e);
}
}
}
/**
 * Indexes a single document, either blocking on the call or handing it to the client's
 * asynchronous API.
 * @param restHighLevelClient the client used to send the request
 * @param request the index request to execute
 * @param isAsync {@code true} to use {@code indexAsync}, {@code false} to use the blocking {@code index}
 */
private void index(RestHighLevelClient restHighLevelClient, IndexRequest request, boolean isAsync) {
    if (!isAsync) {
        try {
            handleResponse(restHighLevelClient.index(request, RequestOptions.DEFAULT));
        }
        catch (IOException e) {
            throw new IllegalStateException("Error occurred while indexing document: " + e.getMessage(), e);
        }
        return;
    }

    ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
        @Override
        public void onResponse(IndexResponse indexResponse) {
            handleResponse(indexResponse);
        }

        @Override
        public void onFailure(Exception e) {
            // NOTE(review): this is thrown from inside the listener callback; confirm which
            // thread invokes it and whether the failure is surfaced to the sender.
            throw new IllegalStateException("Error occurred while indexing document: " + e.getMessage(), e);
        }
    };
    restHighLevelClient.indexAsync(request, RequestOptions.DEFAULT, listener);
}
/**
 * Logs the outcome of each item of a bulk response and fails if any item failed.
 * Failed items are logged at error level; successful items are logged at debug level.
 * @param response the bulk response to inspect
 * @throws IllegalStateException if the response reports at least one failed item
 */
private void handleBulkResponse(BulkResponse response) {
    final boolean debugEnabled = logger.isDebugEnabled();
    final boolean hasFailures = response.hasFailures();

    // Items are only walked when there is something to log.
    if (debugEnabled || hasFailures) {
        for (BulkItemResponse itemResponse : response) {
            if (itemResponse.isFailed()) {
                logger.error(String.format("Index operation [i=%d, id=%s, index=%s] failed: %s",
                        itemResponse.getItemId(), itemResponse.getId(), itemResponse.getIndex(), itemResponse.getFailureMessage())
                );
            }
            else if (debugEnabled) {
                // Skip formatting the success message when it would be discarded anyway.
                DocWriteResponse r = itemResponse.getResponse();
                logger.debug(String.format("Index operation [i=%d, id=%s, index=%s] succeeded: document [id=%s, version=%d] was written on shard %s.",
                        itemResponse.getItemId(), itemResponse.getId(), itemResponse.getIndex(), r.getId(), r.getVersion(), r.getShardId())
                );
            }
        }
    }

    if (hasFailures) {
        throw new IllegalStateException("Bulk indexing operation completed with failures: " + response.buildFailureMessage());
    }
}
/**
 * Logs the outcome of a single-document index operation at debug level.
 * @param response the response returned by the index call
 */
private void handleResponse(IndexResponse response) {
    // Only build the message when it will actually be logged.
    if (logger.isDebugEnabled()) {
        logger.debug(String.format("Index operation [index=%s] succeeded: document [id=%s, version=%d] was written on shard %s.",
                response.getIndex(), response.getId(), response.getVersion(), response.getShardId())
        );
    }
}
/**
 * Holder for a single {@link Message}, used so that a batch released by the aggregator
 * carries the original messages (payload and headers) as opaque payload items.
 */
static class MessageWrapper {

    private final Message<?> message;

    MessageWrapper(Message<?> message) {
        this.message = message;
    }

    /**
     * @return the wrapped message
     */
    public Message<?> getMessage() {
        return this.message;
    }

}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2020-2020 the original author or authors.
* Copyright 2020-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,22 +19,28 @@ package org.springframework.cloud.fn.consumer.elasticsearch;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.expression.Expression;
/**
* @author Soby Chacko
* @author Andrea Montemaggio
*/
@ConfigurationProperties("elasticsearch.consumer")
public class ElasticsearchConsumerProperties {
/**
* The id of the document index.
* The id of the document to index.
* If set, the INDEX_ID header value overrides this property on a per message basis.
*/
Expression id;
/**
* Name of the index.
* If set, the INDEX_NAME header value overrides this property on a per message basis.
*/
String index;
/**
* Indicates the shard to route to.
* If not provided, this resolves to the ID used on the document.
* If not provided, Elasticsearch will default to a hash of the document id.
*/
String routing;
@@ -50,6 +56,18 @@ public class ElasticsearchConsumerProperties {
*/
boolean async;
/**
* Number of items to index for each request. It defaults to 1.
* For values greater than 1 bulk indexing API will be used.
*/
int batchSize = 1;
/**
* Timeout in milliseconds after which message group is flushed when bulk indexing is active.
* It defaults to -1, meaning no automatic flush of idle message groups occurs.
*/
long groupTimeout = -1L;
/**
 * @return the expression used to resolve the document id
 */
public Expression getId() {
    return this.id;
}
@@ -89,4 +107,20 @@ public class ElasticsearchConsumerProperties {
/**
 * @param async the new value of the {@code async} property
 */
public void setAsync(boolean async) {
this.async = async;
}
/**
 * @return the number of items to index for each request
 */
public int getBatchSize() {
    return this.batchSize;
}
/**
 * @param batchSize the number of items to index for each request
 */
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
/**
 * @return the message group timeout in milliseconds
 */
public long getGroupTimeout() {
    return this.groupTimeout;
}
/**
 * @param groupTimeout the message group timeout in milliseconds
 */
public void setGroupTimeout(long groupTimeout) {
this.groupTimeout = groupTimeout;
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2020-2020 the original author or authors.
* Copyright 2020-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@ package org.springframework.cloud.fn.consumer.elasticsearch;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import org.awaitility.Awaitility;
@@ -43,7 +44,13 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
/**
* @author Soby Chacko
* @author Andrea Montemaggio
*/
@Tag("integration")
@Testcontainers(disabledWithoutDocker = true)
public class ElasticsearchConsumerApplicationTests {
@@ -182,6 +189,115 @@ public class ElasticsearchConsumerApplicationTests {
});
}
/**
 * Sends a full batch of messages, each carrying its id in the INDEX_ID header, and verifies
 * that every document can afterwards be fetched by that id.
 */
@Test
public void testBulkIndexingWithIdFromHeader() {
    this.contextRunner
            .withPropertyValues("elasticsearch.consumer.index=foo_" + UUID.randomUUID(), "elasticsearch.consumer.batch-size=10",
                    "spring.elasticsearch.rest.uris=http://" + elasticsearch.getHttpHostAddress())
            .run(context -> {
                Consumer<Message<?>> elasticsearchConsumer = context.getBean("elasticsearchConsumer", Consumer.class);
                ElasticsearchConsumerProperties properties = context.getBean(ElasticsearchConsumerProperties.class);
                RestHighLevelClient restHighLevelClient = context.getBean(RestHighLevelClient.class);

                for (int i = 0; i < properties.getBatchSize(); i++) {
                    // Before each message is sent, a lookup must still fail with index-not-found.
                    final GetRequest getRequest = new GetRequest(properties.getIndex()).id(Integer.toString(i));
                    assertThatExceptionOfType(ElasticsearchStatusException.class)
                            .isThrownBy(() -> restHighLevelClient.get(getRequest, RequestOptions.DEFAULT))
                            // The %d placeholder needs the loop index to name the offending message.
                            .withFailMessage("Expected index not found exception for message %d", i)
                            .withMessageContaining("index_not_found_exception");

                    final Message<String> message = MessageBuilder
                            .withPayload("{\"seq\":" + i + ",\"age\":10,\"dateOfBirth\":1471466076564,"
                                    + "\"fullName\":\"John Doe\"}")
                            .setHeader(ElasticsearchConsumerConfiguration.INDEX_ID_HEADER, Integer.toString(i))
                            .build();
                    elasticsearchConsumer.accept(message);
                }

                for (int i = 0; i < properties.getBatchSize(); i++) {
                    GetRequest getRequest = new GetRequest(properties.getIndex()).id(Integer.toString(i));
                    GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
                    assertThat(response.isExists())
                            .withFailMessage("Document with id=%d cannot be found.", i)
                            .isTrue();
                    assertThat(response.getSource().get("seq")).isEqualTo(i);
                }
            });
}
/**
 * Sends a full batch in which the first message targets an invalid index name and verifies
 * that sending the last message of the batch raises an {@link IllegalStateException}
 * reporting bulk failures.
 */
@Test
public void testBulkIndexingItemFailure() {
    this.contextRunner
            .withPropertyValues("elasticsearch.consumer.index=foo_" + UUID.randomUUID(), "elasticsearch.consumer.batch-size=10",
                    "spring.elasticsearch.rest.uris=http://" + elasticsearch.getHttpHostAddress())
            .run(context -> {
                Consumer<Message<?>> elasticsearchConsumer = context.getBean("elasticsearchConsumer", Consumer.class);
                ElasticsearchConsumerProperties properties = context.getBean(ElasticsearchConsumerProperties.class);
                RestHighLevelClient restHighLevelClient = context.getBean(RestHighLevelClient.class);

                for (int i = 0; i < properties.getBatchSize(); i++) {
                    final GetRequest getRequest = new GetRequest(properties.getIndex()).id(Integer.toString(i));
                    assertThatExceptionOfType(ElasticsearchStatusException.class)
                            .isThrownBy(() -> restHighLevelClient.get(getRequest, RequestOptions.DEFAULT))
                            // The %d placeholder needs the loop index to name the offending message.
                            .withFailMessage("Expected index not found exception for message %d", i)
                            .withMessageContaining("index_not_found_exception");

                    MessageBuilder<String> builder = MessageBuilder
                            .withPayload("{\"seq\":" + i + ",\"age\":10,\"dateOfBirth\":1471466076564,"
                                    + "\"fullName\":\"John Doe\"}")
                            .setHeader(ElasticsearchConsumerConfiguration.INDEX_ID_HEADER, Integer.toString(i));
                    if (i == 0) {
                        // set an invalid index name to make the first request fail
                        builder.setHeader(ElasticsearchConsumerConfiguration.INDEX_NAME_HEADER, "_" + properties.getIndex());
                    }
                    final Message<String> message = builder.build();

                    if (i < properties.getBatchSize() - 1) {
                        elasticsearchConsumer.accept(message);
                    }
                    else {
                        // last invocation
                        assertThatIllegalStateException()
                                .isThrownBy(() -> elasticsearchConsumer.accept(message))
                                .withMessageContaining("Bulk indexing operation completed with failures");
                    }
                }
            });
}
/**
 * Verifies that the INDEX_NAME header overrides the configured index: the document must be
 * retrievable from the index named in the header.
 */
@Test
public void testIndexFromMessageHeader() {
    this.contextRunner
            .withPropertyValues("elasticsearch.consumer.index=foo",
                    "spring.elasticsearch.rest.uris=http://" + elasticsearch.getHttpHostAddress())
            .run(context -> {
                Consumer<Message<?>> consumer = context.getBean("elasticsearchConsumer", Consumer.class);
                ElasticsearchConsumerProperties consumerProperties = context.getBean(ElasticsearchConsumerProperties.class);

                final String documentId = "2";
                final String headerIndex = consumerProperties.getIndex() + "-2";
                final String json = "{\"age\":10,\"dateOfBirth\":1471466076564,"
                        + "\"fullName\":\"John Doe\"}";

                Message<String> outbound = MessageBuilder.withPayload(json)
                        .setHeader(ElasticsearchConsumerConfiguration.INDEX_ID_HEADER, documentId)
                        .setHeader(ElasticsearchConsumerConfiguration.INDEX_NAME_HEADER, headerIndex)
                        .build();
                consumer.accept(outbound);

                RestHighLevelClient client = context.getBean(RestHighLevelClient.class);
                GetResponse found = client.get(new GetRequest(headerIndex).id(documentId), RequestOptions.DEFAULT);

                assertThat(found.isExists()).isTrue();
                assertThat(found.getSourceAsString()).isEqualTo(json);
                assertThat(found.getId()).isEqualTo(documentId);
            });
}
/**
 * Empty Spring Boot application class for these tests.
 * NOTE(review): presumably registered as the user configuration of the context runner;
 * the runner setup is not visible here, so confirm.
 */
@SpringBootApplication
static class ElasticsearchConsumerTestApplication {
}