GH-4 - Implement MongoDB-based event publication repository

Signed-off-by: Björn Kieling <bkieling@vmware.com>
This commit is contained in:
Dmitry Belyaev
2022-07-21 16:28:26 +02:00
parent 1e295039c4
commit 314a3823ad
8 changed files with 552 additions and 52 deletions

View File

@@ -17,6 +17,7 @@
<module>spring-modulith-events-core</module>
<module>spring-modulith-events-jpa</module>
<module>spring-modulith-events-jdbc</module>
<module>spring-modulith-events-mongodb</module>
<module>spring-modulith-events-jackson</module>
<module>spring-modulith-events-tests</module>
<module>spring-modulith-events-starter</module>

View File

@@ -94,82 +94,94 @@ class JdbcEventPublicationRepositoryIntegrationTests {
assertThat(repository.findIncompletePublications()).isEmpty();
}
@Test // GH-3
void shouldUpdateSingleEventPublication() {
@Nested
class Update {
var testEvent1 = new TestEvent("id1");
var testEvent2 = new TestEvent("id2");
var serializedEvent1 = "{\"eventId\":\"id1\"}";
var serializedEvent2 = "{\"eventId\":\"id2\"}";
@Test
// GH-3
void shouldUpdateSingleEventPublication() {
when(serializer.serialize(testEvent1)).thenReturn(serializedEvent1);
when(serializer.deserialize(serializedEvent1, TestEvent.class)).thenReturn(testEvent1);
when(serializer.serialize(testEvent2)).thenReturn(serializedEvent2);
when(serializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2);
var testEvent1 = new TestEvent("id1");
var testEvent2 = new TestEvent("id2");
var serializedEvent1 = "{\"eventId\":\"id1\"}";
var serializedEvent2 = "{\"eventId\":\"id2\"}";
var publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER);
var publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER);
when(serializer.serialize(testEvent1)).thenReturn(serializedEvent1);
when(serializer.deserialize(serializedEvent1, TestEvent.class)).thenReturn(testEvent1);
when(serializer.serialize(testEvent2)).thenReturn(serializedEvent2);
when(serializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2);
// Store publication
repository.create(publication1);
repository.create(publication2);
var publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER);
var publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER);
// Complete publication
repository.update(publication2.markCompleted());
// Store publication
repository.create(publication1);
repository.create(publication2);
assertThat(repository.findIncompletePublications()).hasSize(1)
.element(0).extracting(EventPublication::getEvent).isEqualTo(testEvent1);
// Complete publication
repository.update(publication2.markCompleted());
assertThat(repository.findIncompletePublications()).hasSize(1)
.element(0).extracting(EventPublication::getEvent).isEqualTo(testEvent1);
}
}
@Test // GH-3
void shouldTolerateEmptyResult() {
@Nested
class FindByEventAndTargetIdentifier {
var testEvent = new TestEvent("id");
var serializedEvent = "{\"eventId\":\"id\"}";
@Test
// GH-3
void shouldTolerateEmptyResult() {
when(serializer.serialize(testEvent)).thenReturn(serializedEvent);
var testEvent = new TestEvent("id");
var serializedEvent = "{\"eventId\":\"id\"}";
assertThat(repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER)).isEmpty();
}
when(serializer.serialize(testEvent)).thenReturn(serializedEvent);
@Test // GH-3
void shouldReturnTheOldestEvent() throws Exception {
assertThat(repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER)).isEmpty();
}
var testEvent = new TestEvent("id");
var serializedEvent = "{\"eventId\":\"id\"}";
@Test
// GH-3
void shouldReturnTheOldestEvent() throws Exception {
when(serializer.serialize(testEvent)).thenReturn(serializedEvent);
when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent);
var testEvent = new TestEvent("id");
var serializedEvent = "{\"eventId\":\"id\"}";
var publicationOld = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
Thread.sleep(10);
var publicationNew = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
when(serializer.serialize(testEvent)).thenReturn(serializedEvent);
when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent);
repository.create(publicationNew);
repository.create(publicationOld);
var publicationOld = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
Thread.sleep(10);
var publicationNew = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
var actual = repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER);
repository.create(publicationNew);
repository.create(publicationOld);
assertThat(actual).hasValueSatisfying(it -> {
assertThat(it.getPublicationDate()).isEqualTo(publicationOld.getPublicationDate());
});
}
var actual = repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER);
@Test // GH-3
void shouldSilentlyIgnoreNotSerializableEvents() {
assertThat(actual).hasValueSatisfying(it -> {
assertThat(it.getPublicationDate()).isEqualTo(publicationOld.getPublicationDate());
});
}
var testEvent = new TestEvent("id");
var serializedEvent = "{\"eventId\":\"id\"}";
@Test
// GH-3
void shouldSilentlyIgnoreNotSerializableEvents() {
when(serializer.serialize(testEvent)).thenReturn(serializedEvent);
when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent);
var testEvent = new TestEvent("id");
var serializedEvent = "{\"eventId\":\"id\"}";
// Store publication
repository.create(CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER));
when(serializer.serialize(testEvent)).thenReturn(serializedEvent);
when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent);
operations.update("UPDATE EVENT_PUBLICATION SET EVENT_TYPE='abc'");
// Store publication
repository.create(CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER));
assertThat(repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER)).isEmpty();
operations.update("UPDATE EVENT_PUBLICATION SET EVENT_TYPE='abc'");
assertThat(repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER)).isEmpty();
}
}
}

View File

@@ -0,0 +1,51 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.experimental</groupId>
<artifactId>spring-modulith-events</artifactId>
<version>0.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<name>Spring Modulith - Events - MongoDB-based repository</name>
<artifactId>spring-modulith-events-mongodb</artifactId>
<properties>
<module.name>org.springframework.modulith.events.mongodb</module.name>
</properties>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>spring-modulith-events-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>de.flapdoodle.embed</groupId>
<artifactId>de.flapdoodle.embed.mongo</artifactId>
<version>3.4.6</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,50 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.mongodb;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.PersistenceCreator;
import org.springframework.data.mongodb.core.mapping.Document;
import java.time.Instant;
/**
* A MongoDB Document to represent event publications.
*
* @author Dmitry Belyaev
* @author Björn Kieling
*/
@Document(collection = "org_springframework_modulith_events")
@Getter
@RequiredArgsConstructor
@AllArgsConstructor(access = AccessLevel.PACKAGE, onConstructor = @__(@PersistenceCreator))
class MongoDbEventPublication {
@Id
private String id;
private final Instant publicationDate;
private final String listenerId;
private final Object event;
@Setter(AccessLevel.PACKAGE)
private Instant completionDate;
}

View File

@@ -0,0 +1,152 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.mongodb;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.util.TypeInformation;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
import org.springframework.modulith.events.EventPublicationRepository;
import org.springframework.modulith.events.PublicationTargetIdentifier;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
/**
* Repository to store {@link EventPublication}s in a MongoDB.
*
* @author Björn Kieling
* @author Dmitry Belyaev
*/
public class MongoDbEventPublicationRepository implements EventPublicationRepository {
private final MongoTemplate mongoTemplate;
public MongoDbEventPublicationRepository(MongoTemplate mongoTemplate) {
this.mongoTemplate = mongoTemplate;
}
@Override
public EventPublication create(EventPublication publication) {
mongoTemplate.save(domainToDocument(publication));
return publication;
}
@Override
public EventPublication update(CompletableEventPublication publication) {
findDocumentsByEventAndTargetIdentifier(publication.getEvent(), publication.getTargetIdentifier())
.stream()
.findFirst()
.ifPresent(document -> {
document.setCompletionDate(publication.getCompletionDate().orElse(null));
mongoTemplate.save(document);
});
return publication;
}
@Override
public List<EventPublication> findIncompletePublications() {
var query = Query.query(Criteria.where("completionDate").isNull());
return mongoTemplate.find(query, MongoDbEventPublication.class).stream() //
.map(this::documentToDomain) //
.collect(Collectors.toList());
}
@Override
public Optional<EventPublication> findByEventAndTargetIdentifier(
Object event, PublicationTargetIdentifier targetIdentifier) {
var documents = findDocumentsByEventAndTargetIdentifier(event, targetIdentifier);
var results = documents
.stream() //
.map(this::documentToDomain) //
.toList();
// if there are several events with exactly the same payload we return the oldest one first
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
}
private List<MongoDbEventPublication> findDocumentsByEventAndTargetIdentifier(
Object event, PublicationTargetIdentifier targetIdentifier) {
// we need to enforce writing of the type information
var eventAsMongoType = mongoTemplate.getConverter().convertToMongoType(event, TypeInformation.of(Object.class));
var query = Query //
.query(Criteria //
.where("event").is(eventAsMongoType) //
.and("listenerId").is(targetIdentifier.getValue())) //
.with(Sort.by("publicationDate").ascending());
return mongoTemplate.find(query, MongoDbEventPublication.class);
}
private MongoDbEventPublication domainToDocument(EventPublication publication) {
return new MongoDbEventPublication( //
publication.getPublicationDate(), //
publication.getTargetIdentifier().getValue(), //
publication.getEvent());
}
private CompletableEventPublication documentToDomain(MongoDbEventPublication document) {
return MongoDbEventPublicationAdapter.of(document);
}
@EqualsAndHashCode
@RequiredArgsConstructor(staticName = "of")
private static class MongoDbEventPublicationAdapter implements CompletableEventPublication {
private final MongoDbEventPublication publication;
@Override
public Object getEvent() {
return publication.getEvent();
}
@Override
public PublicationTargetIdentifier getTargetIdentifier() {
return PublicationTargetIdentifier.of(publication.getListenerId());
}
@Override
public Instant getPublicationDate() {
return publication.getPublicationDate();
}
@Override
public Optional<Instant> getCompletionDate() {
return Optional.ofNullable(publication.getCompletionDate());
}
@Override
public boolean isPublicationCompleted() {
return publication.getCompletionDate() != null;
}
@Override
public CompletableEventPublication markCompleted() {
publication.setCompletionDate(Instant.now());
return this;
}
}
}

View File

@@ -0,0 +1,2 @@
@org.springframework.lang.NonNullApi
package org.springframework.modulith.events.mongodb;

View File

@@ -0,0 +1,206 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.mongodb;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.within;
import java.io.IOException;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Optional;
import lombok.Getter;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
import org.springframework.modulith.events.PublicationTargetIdentifier;
import org.springframework.modulith.testapp.TestApplication;
import org.springframework.test.context.ContextConfiguration;
import com.mongodb.client.MongoClients;
import de.flapdoodle.embed.mongo.MongodExecutable;
import de.flapdoodle.embed.mongo.MongodStarter;
import de.flapdoodle.embed.mongo.config.ImmutableMongodConfig;
import de.flapdoodle.embed.mongo.config.MongodConfig;
import de.flapdoodle.embed.mongo.config.Net;
import de.flapdoodle.embed.mongo.distribution.Version;
import de.flapdoodle.embed.process.runtime.Network;
import lombok.EqualsAndHashCode;
/**
* @author Björn Kieling
* @author Dmitry Belyaev
*/
@DataMongoTest
@ContextConfiguration(classes = TestApplication.class)
class MongoDbEventPublicationRepositoryTest {
private static final PublicationTargetIdentifier TARGET_IDENTIFIER = PublicationTargetIdentifier.of("listener");
private static final String CONNECTION_STRING = "mongodb://%s:%d";
private static String ip;
private static int port;
private static MongodExecutable mongodExecutable;
private MongoTemplate mongoTemplate;
private MongoDbEventPublicationRepository repository;
@BeforeAll
static void startupMongoDb() throws IOException {
// Refer to https://www.baeldung.com/spring-boot-embedded-mongodb
ip = "localhost";
port = Network.freeServerPort(Network.getLocalHost());
ImmutableMongodConfig mongodConfig = MongodConfig.builder().version(Version.Main.PRODUCTION)
.net(new Net(ip, port, Network.localhostIsIPv6())).build();
MongodStarter starter = MongodStarter.getDefaultInstance();
mongodExecutable = starter.prepare(mongodConfig);
mongodExecutable.start();
}
@AfterAll
static void shutdownMongoDb() {
mongodExecutable.stop();
}
@BeforeEach
void setUp() {
mongoTemplate = new MongoTemplate(MongoClients.create(String.format(CONNECTION_STRING, ip, port)), "test");
repository = new MongoDbEventPublicationRepository(mongoTemplate);
}
@AfterEach
void tearDown() {
mongoTemplate.remove(MongoDbEventPublication.class).all();
}
@Test
void shouldPersistAndUpdateEventPublication() {
TestEvent testEvent = new TestEvent("abc");
CompletableEventPublication publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
// Store publication
repository.create(publication);
List<EventPublication> eventPublications = repository.findIncompletePublications();
assertThat(eventPublications).hasSize(1);
assertThat(eventPublications.get(0).getEvent()).isEqualTo(publication.getEvent());
assertThat(eventPublications.get(0).getTargetIdentifier()).isEqualTo(publication.getTargetIdentifier());
assertThat(repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER)).isPresent();
// Complete publication
repository.update(publication.markCompleted());
assertThat(repository.findIncompletePublications()).isEmpty();
}
@Test
void shouldUpdateSingleEventPublication() {
TestEvent testEvent1 = new TestEvent("id1");
TestEvent testEvent2 = new TestEvent("id2");
CompletableEventPublication publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER);
CompletableEventPublication publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER);
repository.create(publication1);
repository.create(publication2);
repository.update(publication2.markCompleted());
List<EventPublication> withCompletionDateNull = repository.findIncompletePublications();
assertThat(withCompletionDateNull).hasSize(1);
assertThat(withCompletionDateNull.get(0).getEvent()).isEqualTo(testEvent1);
}
@Nested
class FindByEventAndTargetIdentifier {
@Test
void shouldFindEventPublicationByEventAndTargetIdentifier() {
TestEvent testEvent1 = new TestEvent("abc");
TestEvent testEvent2 = new TestEvent("def");
CompletableEventPublication publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER);
repository.create(publication2);
CompletableEventPublication publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER);
repository.create(publication1);
CompletableEventPublication publication3 = CompletableEventPublication.of(
testEvent1, PublicationTargetIdentifier.of(TARGET_IDENTIFIER.getValue() + "!"));
repository.create(publication3);
Optional<EventPublication> actual = repository.findByEventAndTargetIdentifier(testEvent1, TARGET_IDENTIFIER);
assertThat(actual).isPresent();
assertThat(actual.get().getEvent()).isEqualTo(testEvent1);
assertThat(actual.get().getTargetIdentifier()).isEqualTo(TARGET_IDENTIFIER);
}
@Test
void shouldTolerateEmptyResultTest() {
TestEvent testEvent = new TestEvent("id");
Optional<EventPublication> actual =
repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER);
assertThat(actual).isEmpty();
}
@Test
void shouldReturnTheOldestEventTest() throws InterruptedException {
TestEvent testEvent = new TestEvent("id");
CompletableEventPublication publicationOld = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
Thread.sleep(10);
CompletableEventPublication publicationNew = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
repository.create(publicationNew);
repository.create(publicationOld);
Optional<EventPublication> actual =
repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER);
assertThat(actual).isNotEmpty();
assertThat(actual.get().getPublicationDate())
.isCloseTo(publicationOld.getPublicationDate(), within(1, ChronoUnit.MILLIS));
}
}
@EqualsAndHashCode
@Getter
private static final class TestEvent {
private final String eventId;
private TestEvent(String eventId) {
this.eventId = eventId;
}
}
}

View File

@@ -0,0 +1,26 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.testapp;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Dmitry Belyaev
* @author Björn Kieling
*/
@SpringBootApplication
public class TestApplication {
}