Optimize sequence handling in MongoDB job repository

This commit is contained in:
Mahmoud Ben Hassine
2024-11-21 02:18:34 +01:00
parent 6c330d6bd1
commit 50a41a3cb0
4 changed files with 33 additions and 56 deletions

View File

@@ -15,20 +15,20 @@
*/
package org.springframework.batch.core.repository.dao;
import com.mongodb.client.model.FindOneAndUpdateOptions;
import com.mongodb.client.model.ReturnDocument;
import org.bson.Document;
import org.springframework.dao.DataAccessException;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
import static org.springframework.data.mongodb.core.query.Criteria.where;
import static org.springframework.data.mongodb.core.query.Query.query;
// Based on https://www.mongodb.com/blog/post/generating-globally-unique-identifiers-for-use-with-mongodb
// Section: Use a single counter document to generate unique identifiers one at a time
/**
* @author Mahmoud Ben Hassine
* @author Christoph Strobl
* @since 5.2.0
*/
public class MongoSequenceIncrementer implements DataFieldMaxValueIncrementer {
@@ -44,13 +44,11 @@ public class MongoSequenceIncrementer implements DataFieldMaxValueIncrementer {
@Override
public long nextLongValue() throws DataAccessException {
// TODO optimize
MongoSequence sequence = mongoTemplate.findOne(new Query(), MongoSequence.class, sequenceName);
Query query = query(where("_id").is(sequence.getId()));
Update update = new Update().inc("count", 1);
// The following does not return the modified document
mongoTemplate.findAndModify(query, update, MongoSequence.class, sequenceName);
return mongoTemplate.findOne(new Query(), MongoSequence.class, sequenceName).getCount();
return mongoTemplate.execute("BATCH_SEQUENCES",
collection -> collection
.findOneAndUpdate(new Document("_id", sequenceName), new Document("$inc", new Document("count", 1)),
new FindOneAndUpdateOptions().returnDocument(ReturnDocument.AFTER))
.getLong("count"));
}
@Override
@@ -63,33 +61,4 @@ public class MongoSequenceIncrementer implements DataFieldMaxValueIncrementer {
throw new UnsupportedOperationException();
}
public static final class MongoSequence {
private String id;
private long count;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
@Override
public String toString() {
return "MongoSequence{" + "id='" + id + '\'' + ", count=" + count + '}';
}
}
}

View File

@@ -2,6 +2,4 @@
db.getCollection("BATCH_JOB_INSTANCE").drop();
db.getCollection("BATCH_JOB_EXECUTION").drop();
db.getCollection("BATCH_STEP_EXECUTION").drop();
db.getCollection("BATCH_JOB_INSTANCE_SEQ").drop();
db.getCollection("BATCH_JOB_EXECUTION_SEQ").drop();
db.getCollection("BATCH_STEP_EXECUTION_SEQ").drop();
db.getCollection("BATCH_SEQUENCES").drop();

View File

@@ -2,9 +2,17 @@
db.createCollection("BATCH_JOB_INSTANCE");
db.createCollection("BATCH_JOB_EXECUTION");
db.createCollection("BATCH_STEP_EXECUTION");
db.createCollection("BATCH_JOB_INSTANCE_SEQ");
db.createCollection("BATCH_JOB_EXECUTION_SEQ");
db.createCollection("BATCH_STEP_EXECUTION_SEQ");
db.getCollection("BATCH_JOB_INSTANCE_SEQ").insertOne({count : 0});
db.getCollection("BATCH_JOB_EXECUTION_SEQ").insertOne({count : 0});
db.getCollection("BATCH_STEP_EXECUTION_SEQ").insertOne({count : 0});
// SEQUENCES
db.createCollection("BATCH_SEQUENCES");
db.getCollection("BATCH_SEQUENCES").insertOne({_id: "BATCH_JOB_INSTANCE_SEQ", count: Long(0)});
db.getCollection("BATCH_SEQUENCES").insertOne({_id: "BATCH_JOB_EXECUTION_SEQ", count: Long(0)});
db.getCollection("BATCH_SEQUENCES").insertOne({_id: "BATCH_STEP_EXECUTION_SEQ", count: Long(0)});
// INDICES
db.getCollection("BATCH_JOB_INSTANCE").createIndex("job_name_idx", {"jobName": 1}, {});
db.getCollection("BATCH_JOB_INSTANCE").createIndex("job_name_key_idx", {"jobName": 1, "jobKey": 1}, {});
db.getCollection("BATCH_JOB_INSTANCE").createIndex("job_instance_idx", {"jobInstanceId": -1}, {});
db.getCollection("BATCH_JOB_EXECUTION").createIndex("job_instance_idx", {"jobInstanceId": 1}, {});
db.getCollection("BATCH_JOB_EXECUTION").createIndex("job_instance_idx", {"jobInstanceId": 1, "status": 1}, {});
db.getCollection("BATCH_STEP_EXECUTION").createIndex("step_execution_idx", {"stepExecutionId": 1}, {});

View File

@@ -16,6 +16,7 @@
package org.springframework.batch.core.repository.support;
import java.time.LocalDateTime;
import java.util.Map;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
@@ -75,12 +76,13 @@ public class MongoDBJobRepositoryIntegrationTests {
mongoTemplate.createCollection("BATCH_JOB_INSTANCE");
mongoTemplate.createCollection("BATCH_JOB_EXECUTION");
mongoTemplate.createCollection("BATCH_STEP_EXECUTION");
mongoTemplate.createCollection("BATCH_JOB_INSTANCE_SEQ");
mongoTemplate.createCollection("BATCH_JOB_EXECUTION_SEQ");
mongoTemplate.createCollection("BATCH_STEP_EXECUTION_SEQ");
mongoTemplate.getCollection("BATCH_JOB_INSTANCE_SEQ").insertOne(new Document("count", 0));
mongoTemplate.getCollection("BATCH_JOB_EXECUTION_SEQ").insertOne(new Document("count", 0));
mongoTemplate.getCollection("BATCH_STEP_EXECUTION_SEQ").insertOne(new Document("count", 0));
mongoTemplate.createCollection("BATCH_SEQUENCES");
mongoTemplate.getCollection("BATCH_SEQUENCES")
.insertOne(new Document(Map.of("_id", "BATCH_JOB_INSTANCE_SEQ", "count", 0L)));
mongoTemplate.getCollection("BATCH_SEQUENCES")
.insertOne(new Document(Map.of("_id", "BATCH_JOB_EXECUTION_SEQ", "count", 0L)));
mongoTemplate.getCollection("BATCH_SEQUENCES")
.insertOne(new Document(Map.of("_id", "BATCH_STEP_EXECUTION_SEQ", "count", 0L)));
}
@Test