GH-251 - Improve efficiency of event publication completion.

Changed the EventPublicationRepository interface to allow marking an event as completed without having to materialize it in the first place. This allows us to get rid of CompletableEventPublication. EventPublication not exposes its identifier to make sure the stores can actually store the same id.

Introduced EventPublicationRegistry.deleteCompletedPublicationsOlderThan(Duration) to purge completed event publications before a given point in time.
This commit is contained in:
Oliver Drotbohm
2023-08-07 20:40:06 +02:00
parent 22ec81bf0f
commit 79e465c10e
16 changed files with 455 additions and 307 deletions

View File

@@ -0,0 +1,33 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events;
import java.time.Instant;
/**
* Internal interface to be able to mark {@link EventPublication} instances as completed.
*
* @author Oliver Drotbohm
*/
interface Completable {
/**
* Marks the instance as completed at the given {@link Instant}.
*
* @param instant must not be {@literal null}.
*/
void markCompleted(Instant instant);
}

View File

@@ -1,77 +0,0 @@
/*
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events;
import java.time.Clock;
import java.time.Instant;
import java.util.Optional;
/**
* An event publication that can be completed.
*
* @author Oliver Drotbohm
*/
public interface CompletableEventPublication extends EventPublication {
/**
* Returns the completion date of the publication.
*
* @return will never be {@literal null}.
*/
Optional<Instant> getCompletionDate();
/**
* Returns whether the publication of the event has completed.
*
* @return will never be {@literal null}.
*/
default boolean isPublicationCompleted() {
return getCompletionDate().isPresent();
}
/**
* Marks the event publication as completed.
*
* @return will never be {@literal null}.
*/
CompletableEventPublication markCompleted();
/**
* Creates a {@link CompletableEventPublication} for the given event an listener identifier using a default
* {@link Instant}. Prefer using {@link #of(Object, PublicationTargetIdentifier, Instant)} with a dedicated
* {@link Instant} obtained from a {@link Clock}.
*
* @param event must not be {@literal null}.
* @param id must not be {@literal null}.
* @return will never be {@literal null}.
* @see #of(Object, PublicationTargetIdentifier, Instant)
*/
static CompletableEventPublication of(Object event, PublicationTargetIdentifier id) {
return new DefaultEventPublication(event, id, Instant.now());
}
/**
* Creates a {@link CompletableEventPublication} for the given event an listener identifier and publication date.
*
* @param event must not be {@literal null}.
* @param id must not be {@literal null}.
* @param publicationDate must not be {@literal null}.
* @return will never be {@literal null}.
*/
static CompletableEventPublication of(Object event, PublicationTargetIdentifier id, Instant publicationDate) {
return new DefaultEventPublication(event, id, publicationDate);
}
}

View File

@@ -18,16 +18,18 @@ package org.springframework.modulith.events;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import org.springframework.util.Assert;
/**
* Default {@link CompletableEventPublication} implementation.
* Default {@link Completable} implementation.
*
* @author Oliver Drotbohm
*/
class DefaultEventPublication implements CompletableEventPublication {
class DefaultEventPublication implements EventPublication {
private final UUID identifier;
private final Object event;
private final PublicationTargetIdentifier targetIdentifier;
private final Instant publicationDate;
@@ -47,12 +49,22 @@ class DefaultEventPublication implements CompletableEventPublication {
Assert.notNull(targetIdentifier, "PublicationTargetIdentifier must not be null!");
Assert.notNull(publicationDate, "Publication date must not be null!");
this.identifier = UUID.randomUUID();
this.event = event;
this.targetIdentifier = targetIdentifier;
this.publicationDate = publicationDate;
this.completionDate = Optional.empty();
}
/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventPublication#getPublicationIdentifier()
*/
@Override
public UUID getIdentifier() {
return identifier;
}
/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventPublication#getEvent()
@@ -89,13 +101,11 @@ class DefaultEventPublication implements CompletableEventPublication {
/*
* (non-Javadoc)
* @see org.springframework.modulith.events.CompletableEventPublication#markCompleted()
* @see org.springframework.modulith.events.CompletableEventPublication#markCompleted(java.time.Instant)
*/
@Override
public CompletableEventPublication markCompleted() {
this.completionDate = Optional.of(Instant.now());
return this;
public void markCompleted(Instant instant) {
this.completionDate = Optional.of(instant);
}
/*

View File

@@ -16,6 +16,7 @@
package org.springframework.modulith.events;
import java.time.Clock;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.stream.Stream;
@@ -39,6 +40,7 @@ import org.springframework.util.Assert;
public class DefaultEventPublicationRegistry implements DisposableBean, EventPublicationRegistry {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventPublicationRegistry.class);
private static final String REGISTER = "Registering publication of {} for {}.";
private final EventPublicationRepository events;
private final Clock clock;
@@ -65,7 +67,8 @@ public class DefaultEventPublicationRegistry implements DisposableBean, EventPub
@Override
public Collection<EventPublication> store(Object event, Stream<PublicationTargetIdentifier> listeners) {
return listeners.map(it -> map(event, it))
return listeners.map(it -> EventPublication.of(event, it, clock.instant()))
.peek(it -> LOGGER.debug(REGISTER, it.getEvent().getClass().getName(), it.getTargetIdentifier().getValue()))
.map(events::create)
.toList();
}
@@ -90,12 +93,24 @@ public class DefaultEventPublicationRegistry implements DisposableBean, EventPub
Assert.notNull(event, "Domain event must not be null!");
Assert.notNull(targetIdentifier, "Listener identifier must not be null!");
events.findIncompletePublicationsByEventAndTargetIdentifier(event, targetIdentifier) //
.map(DefaultEventPublicationRegistry::logCompleted) //
.map(e -> CompletableEventPublication.of(e.getEvent(), e.getTargetIdentifier()))
.ifPresent(it -> events.update(it.markCompleted()));
LOGGER.debug("Marking publication of event {} to listener {} completed.", //
event.getClass().getName(), targetIdentifier.getValue());
events.markCompleted(event, targetIdentifier, clock.instant());
}
/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventPublicationRegistry#deleteCompletedPublicationsOlderThan(java.time.Duration)
*/
@Override
public void deleteCompletedPublicationsOlderThan(Duration duration) {
Assert.notNull(duration, "Duration must not be null!");
events.deleteCompletedPublicationsBefore(clock.instant().minus(duration));
};
/*
* (non-Javadoc)
* @see org.springframework.beans.factory.DisposableBean#destroy()
@@ -121,23 +136,4 @@ public class DefaultEventPublicationRegistry implements DisposableBean, EventPub
LOGGER.info("{} {} - {}", prefix, it.getEvent().getClass().getName(), it.getTargetIdentifier().getValue());
}
}
private EventPublication map(Object event, PublicationTargetIdentifier targetIdentifier) {
var result = CompletableEventPublication.of(event, targetIdentifier, clock.instant());
LOGGER.debug("Registering publication of {} for {}.", //
result.getEvent().getClass().getName(), result.getTargetIdentifier().getValue());
return result;
}
private static EventPublication logCompleted(EventPublication publication) {
LOGGER.debug("Marking publication of event {} to listener {} completed.", //
publication.getEvent().getClass().getName(), publication.getTargetIdentifier().getValue());
return publication;
}
}

View File

@@ -15,7 +15,10 @@
*/
package org.springframework.modulith.events;
import java.time.Clock;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.PayloadApplicationEvent;
@@ -28,7 +31,40 @@ import org.springframework.util.Assert;
* @author Björn Kieling
* @author Dmitry Belyaev
*/
public interface EventPublication extends Comparable<EventPublication> {
public interface EventPublication extends Comparable<EventPublication>, Completable {
/**
* Creates a {@link EventPublication} for the given event an listener identifier using a default {@link Instant}.
* Prefer using {@link #of(Object, PublicationTargetIdentifier, Instant)} with a dedicated {@link Instant} obtained
* from a {@link Clock}.
*
* @param event must not be {@literal null}.
* @param id must not be {@literal null}.
* @return will never be {@literal null}.
* @see #of(Object, PublicationTargetIdentifier, Instant)
*/
static EventPublication of(Object event, PublicationTargetIdentifier id) {
return new DefaultEventPublication(event, id, Instant.now());
}
/**
* Creates a {@link EventPublication} for the given event an listener identifier and publication date.
*
* @param event must not be {@literal null}.
* @param id must not be {@literal null}.
* @param publicationDate must not be {@literal null}.
* @return will never be {@literal null}.
*/
static EventPublication of(Object event, PublicationTargetIdentifier id, Instant publicationDate) {
return new DefaultEventPublication(event, id, publicationDate);
}
/**
* Returns a unique identifier for this publication.
*
* @return will never be {@literal null}.
*/
UUID getIdentifier();
/**
* Returns the event that is published.
@@ -79,6 +115,22 @@ public interface EventPublication extends Comparable<EventPublication> {
return this.getTargetIdentifier().equals(identifier);
}
/**
* Returns the completion date of the publication.
*
* @return will never be {@literal null}.
*/
Optional<Instant> getCompletionDate();
/**
* Returns whether the publication of the event has completed.
*
* @return will never be {@literal null}.
*/
default boolean isPublicationCompleted() {
return getCompletionDate().isPresent();
}
/*
* (non-Javadoc)
* @see java.lang.Comparable#compareTo(java.lang.Object)

View File

@@ -15,6 +15,7 @@
*/
package org.springframework.modulith.events;
import java.time.Duration;
import java.util.Collection;
import java.util.stream.Stream;
@@ -52,4 +53,11 @@ public interface EventPublicationRegistry {
* @param targetIdentifier must not be {@literal null}.
*/
void markCompleted(Object event, PublicationTargetIdentifier targetIdentifier);
/**
* Deletes all completed {@link EventPublication}s that have been completed before the given {@link Duration}.
*
* @param duration must not be {@literal null}.
*/
void deleteCompletedPublicationsOlderThan(Duration duration);
}

View File

@@ -15,9 +15,12 @@
*/
package org.springframework.modulith.events;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import org.springframework.util.Assert;
/**
* Repository to store {@link EventPublication}s.
*
@@ -36,12 +39,30 @@ public interface EventPublicationRepository {
EventPublication create(EventPublication publication);
/**
* Update the data store to mark the backing log entry as completed.
* Marks the given {@link EventPublication} as completed.
*
* @param publication must not be {@literal null}.
* @return will never be {@literal null}.
* @param completionDate must not be {@literal null}.
*/
EventPublication update(CompletableEventPublication publication);
default void markCompleted(EventPublication publication, Instant completionDate) {
Assert.notNull(publication, "EventPublication must not be null!");
Assert.notNull(completionDate, "Instant must not be null!");
publication.markCompleted(completionDate);
markCompleted(publication.getEvent(), publication.getTargetIdentifier(), completionDate);
}
/**
* Marks the publication for the given event and {@link PublicationTargetIdentifier} to be completed at the given
* {@link Instant}.
*
* @param event must not be {@literal null}.
* @param identifier must not be {@literal null}.
* @param completionDate must not be {@literal null}.
*/
void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate);
/**
* Returns all {@link EventPublication} that have not been completed yet.
@@ -64,4 +85,11 @@ public interface EventPublicationRepository {
* Deletes all publications that were already marked as completed.
*/
void deleteCompletedPublications();
/**
* Deletes all publication that were already marked as completed with a completion date before the given one.
*
* @param instant must not be {@literal null}.
*/
void deleteCompletedPublicationsBefore(Instant instant);
}

View File

@@ -24,13 +24,13 @@ import org.junit.jupiter.api.Test;
* @author Björn Kieling
* @author Dmitry Belyaev
*/
class CompletableEventPublicationUnitTests {
class EventPublicationUnitTests {
@Test
void rejectsNullEvent() {
assertThatExceptionOfType(IllegalArgumentException.class)//
.isThrownBy(() -> CompletableEventPublication.of(null, PublicationTargetIdentifier.of("foo")))//
.isThrownBy(() -> EventPublication.of(null, PublicationTargetIdentifier.of("foo")))//
.withMessageContaining("Event");
}
@@ -38,27 +38,17 @@ class CompletableEventPublicationUnitTests {
void rejectsNullTargetIdentifier() {
assertThatExceptionOfType(IllegalArgumentException.class)//
.isThrownBy(() -> CompletableEventPublication.of(new Object(), null))//
.isThrownBy(() -> EventPublication.of(new Object(), null))//
.withMessageContaining("TargetIdentifier");
}
@Test
void publicationIsIncompleteByDefault() {
CompletableEventPublication publication = CompletableEventPublication.of(new Object(),
EventPublication publication = EventPublication.of(new Object(),
PublicationTargetIdentifier.of("foo"));
assertThat(publication.isPublicationCompleted()).isFalse();
assertThat(publication.getCompletionDate()).isNotPresent();
}
@Test
void completionCapturesDate() {
CompletableEventPublication publication = CompletableEventPublication
.of(new Object(), PublicationTargetIdentifier.of("foo")).markCompleted();
assertThat(publication.isPublicationCompleted()).isTrue();
assertThat(publication.getCompletionDate()).isPresent();
}
}

View File

@@ -31,7 +31,6 @@ import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.lang.Nullable;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
import org.springframework.modulith.events.EventPublicationRepository;
import org.springframework.modulith.events.EventSerializer;
@@ -68,6 +67,14 @@ class JdbcEventPublicationRepository implements EventPublicationRepository {
WHERE ID = ?
""";
private static final String SQL_STATEMENT_UPDATE_BY_EVENT_AND_LISTENER_ID = """
UPDATE EVENT_PUBLICATION
SET COMPLETION_DATE = ?
WHERE
LISTENER_ID = ?
AND SERIALIZED_EVENT = ?
""";
private static final String SQL_STATEMENT_FIND_BY_EVENT_AND_LISTENER_ID = """
SELECT *
FROM EVENT_PUBLICATION
@@ -79,10 +86,17 @@ class JdbcEventPublicationRepository implements EventPublicationRepository {
""";
private static final String SQL_STATEMENT_DELETE_UNCOMPLETED = """
DELETE
FROM EVENT_PUBLICATION
WHERE
COMPLETION_DATE IS NOT NULL
DELETE
FROM EVENT_PUBLICATION
WHERE
COMPLETION_DATE IS NOT NULL
""";
private static final String SQL_STATEMENT_DELETE_UNCOMPLETED_BEFORE = """
DELETE
FROM EVENT_PUBLICATION
WHERE
COMPLETION_DATE < ?
""";
private final JdbcOperations operations;
@@ -121,7 +135,7 @@ class JdbcEventPublicationRepository implements EventPublicationRepository {
operations.update( //
SQL_STATEMENT_INSERT, //
uuidToDatabase(UUID.randomUUID()), //
uuidToDatabase(publication.getIdentifier()), //
publication.getEvent().getClass().getName(), //
publication.getTargetIdentifier().getValue(), //
Timestamp.from(publication.getPublicationDate()), //
@@ -130,23 +144,18 @@ class JdbcEventPublicationRepository implements EventPublicationRepository {
return publication;
}
/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventPublicationRepository#markCompleted(java.lang.Object, org.springframework.modulith.events.PublicationTargetIdentifier, java.time.Instant)
*/
@Override
@Transactional
public EventPublication update(CompletableEventPublication publication) {
public void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate) {
var serializedEvent = serializeEvent(publication.getEvent());
var listenerId = publication.getTargetIdentifier().getValue();
var potentialPublicationIdsToBeUpdated = operations.query( //
SQL_STATEMENT_FIND_BY_EVENT_AND_LISTENER_ID, //
(rs, rowNum) -> getUuidFromResultSet(rs), //
serializedEvent, //
listenerId);
potentialPublicationIdsToBeUpdated.stream()
.findFirst()
.ifPresent(id -> update(id, publication));
return publication;
operations.update(SQL_STATEMENT_UPDATE_BY_EVENT_AND_LISTENER_ID, //
Timestamp.from(completionDate), //
identifier.getValue(), //
serializer.serialize(event));
}
@Override
@@ -165,10 +174,7 @@ class JdbcEventPublicationRepository implements EventPublicationRepository {
@Transactional(readOnly = true)
@SuppressWarnings("null")
public List<EventPublication> findIncompletePublications() {
return operations.query( //
SQL_STATEMENT_FIND_UNCOMPLETED, //
this::resultSetToPublications);
return operations.query(SQL_STATEMENT_FIND_UNCOMPLETED, this::resultSetToPublications);
}
@Override
@@ -176,14 +182,16 @@ class JdbcEventPublicationRepository implements EventPublicationRepository {
operations.execute(SQL_STATEMENT_DELETE_UNCOMPLETED);
}
private void update(UUID id, CompletableEventPublication publication) {
/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventPublicationRepository#deleteCompletedPublicationsBefore(java.time.Instant)
*/
@Override
public void deleteCompletedPublicationsBefore(Instant instant) {
var timestamp = publication.getCompletionDate().map(Timestamp::from).orElse(null);
Assert.notNull(instant, "Instant must not be null!");
operations.update( //
SQL_STATEMENT_UPDATE, //
timestamp, //
uuidToDatabase(id));
operations.update(SQL_STATEMENT_DELETE_UNCOMPLETED_BEFORE, Timestamp.from(instant));
}
@SuppressWarnings("null")
@@ -269,7 +277,7 @@ class JdbcEventPublicationRepository implements EventPublicationRepository {
}
}
private static class JdbcEventPublication implements CompletableEventPublication {
private static class JdbcEventPublication implements EventPublication {
private final UUID id;
private final Instant publicationDate;
@@ -308,6 +316,15 @@ class JdbcEventPublicationRepository implements EventPublicationRepository {
this.completionDate = completionDate;
}
/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventPublication#getPublicationIdentifier()
*/
@Override
public UUID getIdentifier() {
return id;
}
/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventPublication#getEvent()
@@ -355,14 +372,11 @@ class JdbcEventPublicationRepository implements EventPublicationRepository {
/*
* (non-Javadoc)
* @see org.springframework.modulith.events.CompletableEventPublication#markCompleted()
* @see org.springframework.modulith.events.Completable#markCompleted(java.time.Instant)
*/
@Override
public CompletableEventPublication markCompleted() {
this.completionDate = Instant.now();
return this;
public void markCompleted(Instant instant) {
this.completionDate = instant;
}
/*

View File

@@ -21,6 +21,7 @@ import static org.mockito.Mockito.*;
import lombok.Value;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
@@ -34,7 +35,6 @@ import org.springframework.boot.test.autoconfigure.jdbc.JdbcTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Import;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
import org.springframework.modulith.events.EventSerializer;
import org.springframework.modulith.events.PublicationTargetIdentifier;
@@ -79,10 +79,7 @@ class JdbcEventPublicationRepositoryIntegrationTests {
when(serializer.serialize(testEvent)).thenReturn(serializedEvent);
when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent);
var publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
// Store publication
repository.create(publication);
var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER));
var eventPublications = repository.findIncompletePublications();
@@ -96,7 +93,7 @@ class JdbcEventPublicationRepositoryIntegrationTests {
.isPresent();
// Complete publication
repository.update(publication.markCompleted());
repository.markCompleted(publication, Instant.now());
assertThat(repository.findIncompletePublications()).isEmpty();
}
@@ -117,13 +114,7 @@ class JdbcEventPublicationRepositoryIntegrationTests {
}
private void createPublicationAt(LocalDateTime publicationDate) {
EventPublication publication = mock(EventPublication.class);
when(publication.getEvent()).thenReturn("");
when(publication.getTargetIdentifier()).thenReturn(TARGET_IDENTIFIER);
when(publication.getPublicationDate()).thenReturn(publicationDate.toInstant(ZoneOffset.UTC));
repository.create(publication);
repository.create(EventPublication.of("", TARGET_IDENTIFIER, publicationDate.toInstant(ZoneOffset.UTC)));
}
@Test // GH-3
@@ -139,15 +130,11 @@ class JdbcEventPublicationRepositoryIntegrationTests {
when(serializer.serialize(testEvent2)).thenReturn(serializedEvent2);
when(serializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2);
var publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER);
var publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER);
// Store publication
repository.create(publication1);
repository.create(publication2);
repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER));
var publication = repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER));
// Complete publication
repository.update(publication2.markCompleted());
repository.markCompleted(publication, Instant.now());
assertThat(repository.findIncompletePublications()).hasSize(1)
.element(0).extracting(EventPublication::getEvent).isEqualTo(testEvent1);
@@ -174,10 +161,10 @@ class JdbcEventPublicationRepositoryIntegrationTests {
when(serializer.serialize(testEvent)).thenReturn(serializedEvent);
when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent);
var publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
var publication = EventPublication.of(testEvent, TARGET_IDENTIFIER);
repository.create(publication);
repository.update(publication.markCompleted());
repository.markCompleted(publication, Instant.now());
var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER);
@@ -193,23 +180,19 @@ class JdbcEventPublicationRepositoryIntegrationTests {
when(serializer.serialize(testEvent)).thenReturn(serializedEvent);
when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent);
var publicationOld = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER));
Thread.sleep(10);
var publicationNew = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
repository.create(publicationNew);
repository.create(publicationOld);
repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER));
var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER);
assertThat(actual).hasValueSatisfying(it -> {
assertThat(it.getPublicationDate()) //
.isCloseTo(publicationOld.getPublicationDate(), within(1, ChronoUnit.MILLIS));
.isCloseTo(publication.getPublicationDate(), within(1, ChronoUnit.MILLIS));
});
}
@Test
// GH-3
@Test // GH-3
void shouldSilentlyIgnoreNotSerializableEvents() {
var testEvent = new TestEvent("id");
@@ -219,7 +202,7 @@ class JdbcEventPublicationRepositoryIntegrationTests {
when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent);
// Store publication
repository.create(CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER));
repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER));
operations.update("UPDATE EVENT_PUBLICATION SET EVENT_TYPE='abc'");
@@ -240,19 +223,41 @@ class JdbcEventPublicationRepositoryIntegrationTests {
when(serializer.serialize(testEvent2)).thenReturn(serializedEvent2);
when(serializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2);
var publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER);
var publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER);
repository.create(publication1);
repository.create(publication2);
repository.update(publication1.markCompleted());
var publication = repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER));
repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER));
repository.markCompleted(publication, Instant.now());
repository.deleteCompletedPublications();
assertThat(operations.query("SELECT * FROM EVENT_PUBLICATION", (rs, __) -> rs.getString("SERIALIZED_EVENT")))
.hasSize(1).element(0).isEqualTo(serializedEvent2);
}
@Test // GH-251
void shouldDeleteCompletedEventsBefore() {
var testEvent1 = new TestEvent("abc");
var serializedEvent1 = "{\"eventId\":\"abc\"}";
var testEvent2 = new TestEvent("def");
var serializedEvent2 = "{\"eventId\":\"def\"}";
when(serializer.serialize(testEvent1)).thenReturn(serializedEvent1);
when(serializer.deserialize(serializedEvent1, TestEvent.class)).thenReturn(testEvent1);
when(serializer.serialize(testEvent2)).thenReturn(serializedEvent2);
when(serializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2);
repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER));
repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER));
var now = Instant.now();
repository.markCompleted(testEvent1, TARGET_IDENTIFIER, now.minusSeconds(30));
repository.markCompleted(testEvent2, TARGET_IDENTIFIER, now);
repository.deleteCompletedPublicationsBefore(now.minusSeconds(15));
assertThat(operations.query("SELECT * FROM EVENT_PUBLICATION", (rs, __) -> rs.getString("SERIALIZED_EVENT")))
.hasSize(1).element(0).isEqualTo(serializedEvent2);
}
}
@Nested

View File

@@ -51,14 +51,15 @@ class JpaEventPublication {
* @param serializedEvent must not be {@literal null} or empty.
* @param eventType must not be {@literal null}.
*/
JpaEventPublication(Instant publicationDate, String listenerId, String serializedEvent, Class<?> eventType) {
JpaEventPublication(UUID id, Instant publicationDate, String listenerId, String serializedEvent, Class<?> eventType) {
Assert.notNull(id, "Identifier must not be null!");
Assert.notNull(publicationDate, "Publication date must not be null!");
Assert.notNull(listenerId, "Listener id must not be null or empty!");
Assert.notNull(serializedEvent, "Serialized event must not be null or empty!");
Assert.notNull(eventType, "Event type must not be null!");
this.id = UUID.randomUUID();
this.id = id;
this.publicationDate = publicationDate;
this.listenerId = listenerId;
this.serializedEvent = serializedEvent;

View File

@@ -21,8 +21,8 @@ import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
import org.springframework.modulith.events.EventPublicationRepository;
import org.springframework.modulith.events.EventSerializer;
@@ -64,6 +64,13 @@ class JpaEventPublicationRepository implements EventPublicationRepository {
p.completionDate is not null
""";
private static final String DELETE_COMPLETED_BEFORE = """
delete
from JpaEventPublication p
where
p.completionDate < ?1
""";
private final EntityManager entityManager;
private final EventSerializer serializer;
@@ -98,19 +105,14 @@ class JpaEventPublicationRepository implements EventPublicationRepository {
/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventPublicationRepository#update(org.springframework.modulith.events.CompletableEventPublication)
* @see org.springframework.modulith.events.EventPublicationRepository#markCompleted(java.lang.Object, org.springframework.modulith.events.PublicationTargetIdentifier, java.time.Instant)
*/
@Override
@Transactional
public EventPublication update(CompletableEventPublication publication) {
public void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate) {
var id = publication.getTargetIdentifier().getValue();
var event = publication.getEvent();
findEntityBySerializedEventAndListenerIdAndCompletionDateNull(event, id) //
.ifPresent(entity -> entity.completionDate = publication.getCompletionDate().orElse(null));
return publication;
findEntityBySerializedEventAndListenerIdAndCompletionDateNull(event, identifier)
.map(it -> it.completionDate = completionDate);
}
/*
@@ -136,7 +138,7 @@ class JpaEventPublicationRepository implements EventPublicationRepository {
public Optional<EventPublication> findIncompletePublicationsByEventAndTargetIdentifier( //
Object event, PublicationTargetIdentifier targetIdentifier) {
return findEntityBySerializedEventAndListenerIdAndCompletionDateNull(event, targetIdentifier.getValue())
return findEntityBySerializedEventAndListenerIdAndCompletionDateNull(event, targetIdentifier)
.map(this::entityToDomain);
}
@@ -150,14 +152,29 @@ class JpaEventPublicationRepository implements EventPublicationRepository {
entityManager.createQuery(DELETE_COMPLETED).executeUpdate();
}
/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventPublicationRepository#deleteCompletedPublicationsBefore(java.time.Instant)
*/
@Override
public void deleteCompletedPublicationsBefore(Instant instant) {
Assert.notNull(instant, "Instant must not be null!");
var query = entityManager.createQuery(DELETE_COMPLETED_BEFORE);
query.setParameter(1, instant);
query.executeUpdate();
}
private Optional<JpaEventPublication> findEntityBySerializedEventAndListenerIdAndCompletionDateNull( //
Object event, String listenerId) {
Object event, PublicationTargetIdentifier listenerId) {
var serializedEvent = serializeEvent(event);
var query = entityManager.createQuery(BY_EVENT_AND_LISTENER_ID, JpaEventPublication.class)
.setParameter(1, serializedEvent)
.setParameter(2, listenerId);
.setParameter(2, listenerId.getValue());
return query.getResultStream().findFirst();
}
@@ -167,7 +184,8 @@ class JpaEventPublicationRepository implements EventPublicationRepository {
}
private JpaEventPublication domainToEntity(EventPublication domain) {
return new JpaEventPublication(domain.getPublicationDate(), domain.getTargetIdentifier().getValue(),
return new JpaEventPublication(domain.getIdentifier(), domain.getPublicationDate(),
domain.getTargetIdentifier().getValue(),
serializeEvent(domain.getEvent()), domain.getEvent().getClass());
}
@@ -175,7 +193,7 @@ class JpaEventPublicationRepository implements EventPublicationRepository {
return new JpaEventPublicationAdapter(entity, serializer);
}
private static class JpaEventPublicationAdapter implements CompletableEventPublication {
private static class JpaEventPublicationAdapter implements EventPublication {
private final JpaEventPublication publication;
private final EventSerializer serializer;
@@ -196,6 +214,15 @@ class JpaEventPublicationRepository implements EventPublicationRepository {
this.serializer = serializer;
}
/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventPublication#getPublicationIdentifier()
*/
@Override
public UUID getIdentifier() {
return publication.id;
}
/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventPublication#getEvent()
@@ -243,12 +270,11 @@ class JpaEventPublicationRepository implements EventPublicationRepository {
/*
* (non-Javadoc)
* @see org.springframework.modulith.events.CompletableEventPublication#markCompleted()
* @see org.springframework.modulith.events.Completable#markCompleted(java.time.Instant)
*/
@Override
public CompletableEventPublication markCompleted() {
publication.markCompleted();
return this;
public void markCompleted(Instant instant) {
this.publication.completionDate = instant;
}
/*

View File

@@ -23,13 +23,15 @@ import jakarta.persistence.EntityManagerFactory;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import javax.sql.DataSource;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.context.annotation.Bean;
@@ -38,7 +40,6 @@ import org.springframework.context.annotation.Import;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
import org.springframework.modulith.events.EventSerializer;
import org.springframework.modulith.events.PublicationTargetIdentifier;
@@ -64,7 +65,6 @@ import org.springframework.transaction.annotation.Transactional;
class JpaEventPublicationRepositoryIntegrationTests {
private static final PublicationTargetIdentifier TARGET_IDENTIFIER = PublicationTargetIdentifier.of("listener");
private static final EventSerializer eventSerializer = mock(EventSerializer.class);
@Configuration
@@ -113,29 +113,31 @@ class JpaEventPublicationRepositoryIntegrationTests {
private final JpaEventPublicationRepository repository;
private final EntityManager em;
@AfterEach
public void flush() {
em.flush();
}
@Test
void persistsJpaEventPublication() {
TestEvent testEvent = new TestEvent("abc");
String serializedEvent = "{\"eventId\":\"abc\"}";
var testEvent = new TestEvent("abc");
var serializedEvent = "{\"eventId\":\"abc\"}";
when(eventSerializer.serialize(testEvent)).thenReturn(serializedEvent);
when(eventSerializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent);
CompletableEventPublication publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER));
// Store publication
repository.create(publication);
var eventPublications = repository.findIncompletePublications();
List<EventPublication> eventPublications = repository.findIncompletePublications();
assertThat(eventPublications).hasSize(1);
assertThat(eventPublications.get(0).getEvent()).isEqualTo(publication.getEvent());
assertThat(eventPublications.get(0).getTargetIdentifier()).isEqualTo(publication.getTargetIdentifier());
assertThat(repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER))
.isPresent();
// Complete publication
repository.update(publication.markCompleted());
repository.markCompleted(publication, Instant.now());
assertThat(repository.findIncompletePublications()).isEmpty();
}
@@ -160,11 +162,10 @@ class JpaEventPublicationRepositoryIntegrationTests {
when(eventSerializer.serialize(testEvent)).thenReturn(serializedEvent);
when(eventSerializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent);
CompletableEventPublication publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
var publication = EventPublication.of(testEvent, TARGET_IDENTIFIER);
// Store publication
repository.create(publication);
repository.update(publication.markCompleted());
repository.markCompleted(publication, Instant.now());
var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER);
@@ -184,14 +185,9 @@ class JpaEventPublicationRepositoryIntegrationTests {
when(eventSerializer.serialize(testEvent2)).thenReturn(serializedEvent2);
when(eventSerializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2);
var publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER);
var publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER);
repository.create(publication1);
repository.create(publication2);
repository.update(publication1.markCompleted());
repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER));
repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER));
repository.markCompleted(testEvent1, TARGET_IDENTIFIER, Instant.now());
repository.deleteCompletedPublications();
assertThat(em.createQuery("select p from JpaEventPublication p", JpaEventPublication.class).getResultList())
@@ -212,8 +208,35 @@ class JpaEventPublicationRepositoryIntegrationTests {
.isSortedAccordingTo(Comparator.comparing(EventPublication::getPublicationDate));
}
@Test // GH-251
void shouldDeleteCompletedEventsBefore() {
var testEvent1 = new TestEvent("abc");
var serializedEvent1 = "{\"eventId\":\"abc\"}";
var testEvent2 = new TestEvent("def");
var serializedEvent2 = "{\"eventId\":\"def\"}";
when(eventSerializer.serialize(testEvent1)).thenReturn(serializedEvent1);
when(eventSerializer.deserialize(serializedEvent1, TestEvent.class)).thenReturn(testEvent1);
when(eventSerializer.serialize(testEvent2)).thenReturn(serializedEvent2);
when(eventSerializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2);
repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER));
repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER));
var now = Instant.now();
repository.markCompleted(testEvent1, TARGET_IDENTIFIER, now.minusSeconds(30));
repository.markCompleted(testEvent2, TARGET_IDENTIFIER, now);
repository.deleteCompletedPublicationsBefore(now.minusSeconds(15));
assertThat(em.createQuery("select p from JpaEventPublication p", JpaEventPublication.class).getResultList())
.hasSize(1) //
.element(0).extracting(it -> it.serializedEvent).isEqualTo(serializedEvent2);
}
private void savePublicationAt(LocalDateTime date) {
em.persist(new JpaEventPublication(date.toInstant(ZoneOffset.UTC), "", "", Object.class));
em.persist(new JpaEventPublication(UUID.randomUUID(), date.toInstant(ZoneOffset.UTC), "", "", Object.class));
}
@Value

View File

@@ -16,8 +16,8 @@
package org.springframework.modulith.events.mongodb;
import java.time.Instant;
import java.util.UUID;
import org.bson.types.ObjectId;
import org.springframework.data.annotation.PersistenceCreator;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.lang.Nullable;
@@ -32,7 +32,7 @@ import org.springframework.util.Assert;
@Document(collection = "org_springframework_modulith_events")
class MongoDbEventPublication {
final ObjectId id;
final UUID id;
final Instant publicationDate;
final String listenerId;
final Object event;
@@ -50,7 +50,7 @@ class MongoDbEventPublication {
* @param completionDate can be {@literal null}.
*/
@PersistenceCreator
MongoDbEventPublication(ObjectId id, Instant publicationDate, String listenerId, Object event,
MongoDbEventPublication(UUID id, Instant publicationDate, String listenerId, Object event,
@Nullable Instant completionDate) {
Assert.notNull(id, "Id must not be null!");
@@ -72,8 +72,8 @@ class MongoDbEventPublication {
* @param listenerId must not be {@literal null}.
* @param event must not be {@literal null}.
*/
MongoDbEventPublication(Instant publicationDate, String listenerId, Object event) {
this(new ObjectId(), publicationDate, listenerId, event, null);
MongoDbEventPublication(UUID id, Instant publicationDate, String listenerId, Object event) {
this(id, publicationDate, listenerId, event, null);
}
/**

View File

@@ -22,11 +22,13 @@ import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.util.TypeInformation;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
import org.springframework.modulith.events.EventPublicationRepository;
import org.springframework.modulith.events.PublicationTargetIdentifier;
@@ -55,6 +57,10 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository {
this.mongoTemplate = mongoTemplate;
}
/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventPublicationRepository#create(org.springframework.modulith.events.EventPublication)
*/
@Override
public EventPublication create(EventPublication publication) {
@@ -63,17 +69,17 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository {
return publication;
}
/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventPublicationRepository#markCompleted(java.lang.Object, org.springframework.modulith.events.PublicationTargetIdentifier, java.time.Instant)
*/
@Override
public EventPublication update(CompletableEventPublication publication) {
public void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate) {
return findDocumentsByEventAndTargetIdentifierAndCompletionDateNull(publication.getEvent(),
publication.getTargetIdentifier()) //
.stream() //
.findFirst() //
.map(document -> document.markCompleted(publication.getCompletionDate().orElse(null))) //
.map(mongoTemplate::save) //
.map(this::documentToDomain) //
.orElse(publication);
var criteria = byEventAndListenerId(event, identifier);
var update = Update.update("completionDate", completionDate);
mongoTemplate.updateFirst(query(criteria), update, MongoDbEventPublication.class);
}
@Override
@@ -83,7 +89,7 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository {
.with(Sort.by("publicationDate").ascending());
return mongoTemplate.find(query, MongoDbEventPublication.class).stream() //
.<EventPublication> map(this::documentToDomain) //
.map(this::documentToDomain) //
.toList();
}
@@ -101,39 +107,59 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository {
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
}
/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventPublicationRepository#deleteCompletedPublications()
*/
@Override
public void deleteCompletedPublications() {
mongoTemplate.remove(query(where("completionDate").ne(null)), MongoDbEventPublication.class);
}
/*
* (non-Javadoc)
* @see org.springframework.modulith.events.EventPublicationRepository#deleteCompletedPublicationsBefore(java.time.Instant)
*/
@Override
public void deleteCompletedPublicationsBefore(Instant instant) {
Assert.notNull(instant, "Instant must not be null!");
mongoTemplate.remove(query(where("completionDate").lt(instant)), MongoDbEventPublication.class);
}
private List<MongoDbEventPublication> findDocumentsByEventAndTargetIdentifierAndCompletionDateNull( //
Object event, PublicationTargetIdentifier targetIdentifier) {
// we need to enforce writing of the type information
var eventAsMongoType = mongoTemplate.getConverter().convertToMongoType(event, TypeInformation.OBJECT);
var query = query(
where("event").is(eventAsMongoType) //
.and("listenerId").is(targetIdentifier.getValue()) //
.and("completionDate").isNull()) //
.with(Sort.by("publicationDate").ascending());
var criteria = byEventAndListenerId(event, targetIdentifier);
var query = query(criteria).with(Sort.by("publicationDate").ascending());
return mongoTemplate.find(query, MongoDbEventPublication.class);
}
private Criteria byEventAndListenerId(Object event, PublicationTargetIdentifier identifier) {
var eventAsMongoType = mongoTemplate.getConverter().convertToMongoType(event, TypeInformation.OBJECT);
return where("event").is(eventAsMongoType) //
.and("listenerId").is(identifier.getValue())
.and("completionDate").isNull();
}
private MongoDbEventPublication domainToDocument(EventPublication publication) {
return new MongoDbEventPublication( //
publication.getIdentifier(), //
publication.getPublicationDate(), //
publication.getTargetIdentifier().getValue(), //
publication.getEvent());
}
private CompletableEventPublication documentToDomain(MongoDbEventPublication document) {
private EventPublication documentToDomain(MongoDbEventPublication document) {
return new MongoDbEventPublicationAdapter(document);
}
private static class MongoDbEventPublicationAdapter implements CompletableEventPublication {
private static class MongoDbEventPublicationAdapter implements EventPublication {
private final MongoDbEventPublication publication;
@@ -141,6 +167,11 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository {
this.publication = publication;
}
@Override
public UUID getIdentifier() {
return publication.id;
}
@Override
public Object getEvent() {
return publication.event;
@@ -167,9 +198,8 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository {
}
@Override
public CompletableEventPublication markCompleted() {
publication.completionDate = Instant.now();
return this;
public void markCompleted(Instant instant) {
this.publication.completionDate = instant;
}
/*
@@ -183,11 +213,11 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository {
return true;
}
if (!(obj instanceof MongoDbEventPublicationAdapter other)) {
if (!(obj instanceof MongoDbEventPublicationAdapter that)) {
return false;
}
return Objects.equals(publication, other.publication);
return Objects.equals(publication, that.publication);
}
/*

View File

@@ -19,12 +19,13 @@ import static org.assertj.core.api.Assertions.*;
import lombok.Value;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.Comparator;
import java.util.UUID;
import org.bson.types.ObjectId;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
@@ -32,7 +33,6 @@ import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
import org.springframework.modulith.events.PublicationTargetIdentifier;
import org.springframework.modulith.testapp.TestApplication;
@@ -66,11 +66,7 @@ class MongoDbEventPublicationRepositoryTest {
void shouldPersistAndUpdateEventPublication() {
var testEvent = new TestEvent("abc");
var publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
// Store publication
repository.create(publication);
var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER));
var eventPublications = repository.findIncompletePublications();
@@ -82,7 +78,7 @@ class MongoDbEventPublicationRepositoryTest {
.isPresent();
// Complete publication
repository.update(publication.markCompleted());
repository.markCompleted(publication, Instant.now());
assertThat(repository.findIncompletePublications()).isEmpty();
}
@@ -93,12 +89,10 @@ class MongoDbEventPublicationRepositoryTest {
var testEvent1 = new TestEvent("id1");
var testEvent2 = new TestEvent("id2");
var publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER);
var publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER);
repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER));
var publication = repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER));
repository.create(publication1);
repository.create(publication2);
repository.update(publication2.markCompleted());
repository.markCompleted(publication, Instant.now());
assertThat(repository.findIncompletePublications()).hasSize(1)
.element(0).extracting(EventPublication::getEvent).isEqualTo(testEvent1);
@@ -120,7 +114,7 @@ class MongoDbEventPublicationRepositoryTest {
private void savePublicationAt(LocalDateTime date) {
mongoTemplate.save(
new MongoDbEventPublication(new ObjectId(), date.toInstant(ZoneOffset.UTC), "", "", null));
new MongoDbEventPublication(UUID.randomUUID(), date.toInstant(ZoneOffset.UTC), "", "", null));
}
@Nested
@@ -132,10 +126,10 @@ class MongoDbEventPublicationRepositoryTest {
var testEvent1 = new TestEvent("abc");
var testEvent2 = new TestEvent("def");
repository.create(CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER));
repository.create(CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER));
repository.create(CompletableEventPublication.of(
testEvent1, PublicationTargetIdentifier.of(TARGET_IDENTIFIER.getValue() + "!")));
repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER));
repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER));
repository
.create(EventPublication.of(testEvent1, PublicationTargetIdentifier.of(TARGET_IDENTIFIER.getValue() + "!")));
var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent1, TARGET_IDENTIFIER);
@@ -159,11 +153,11 @@ class MongoDbEventPublicationRepositoryTest {
TestEvent testEvent = new TestEvent("abc");
CompletableEventPublication publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
EventPublication publication = EventPublication.of(testEvent, TARGET_IDENTIFIER);
// Store publication
repository.create(publication);
repository.update(publication.markCompleted());
repository.markCompleted(publication, Instant.now());
var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER);
@@ -175,16 +169,15 @@ class MongoDbEventPublicationRepositoryTest {
var testEvent = new TestEvent("id");
var publicationOld = repository
.create(CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER));
var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER));
Thread.sleep(10);
repository.create(CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER));
repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER));
var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER);
assertThat(actual).hasValueSatisfying(it -> //
assertThat(it.getPublicationDate()) //
.isCloseTo(publicationOld.getPublicationDate(), within(1, ChronoUnit.MILLIS)));
.isCloseTo(publication.getPublicationDate(), within(1, ChronoUnit.MILLIS)));
}
}
@@ -197,20 +190,36 @@ class MongoDbEventPublicationRepositoryTest {
var testEvent1 = new TestEvent("abc");
var testEvent2 = new TestEvent("def");
var publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER);
var publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER);
repository.create(publication1);
repository.create(publication2);
repository.update(publication1.markCompleted());
var publication = repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER));
repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER));
repository.markCompleted(publication, Instant.now());
repository.deleteCompletedPublications();
assertThat(mongoTemplate.findAll(MongoDbEventPublication.class)) //
.hasSize(1) //
.element(0).extracting(it -> it.event).isEqualTo(testEvent2);
}
@Test // GH-251
void shouldDeleteCompletedEventsBefore() {
var testEvent1 = new TestEvent("abc");
var testEvent2 = new TestEvent("def");
var publication1 = repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER));
var publication2 = repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER));
var now = Instant.now();
repository.markCompleted(publication1, now.minusSeconds(30));
repository.markCompleted(publication2, now);
repository.deleteCompletedPublicationsBefore(now.minusSeconds(15));
assertThat(mongoTemplate.findAll(MongoDbEventPublication.class)) //
.hasSize(1) //
.element(0).extracting(it -> it.event).isEqualTo(testEvent2);
}
}
@Value