From 79e465c10ee1027eb51494291cddfbd16eef4b3d Mon Sep 17 00:00:00 2001 From: Oliver Drotbohm Date: Mon, 7 Aug 2023 20:40:06 +0200 Subject: [PATCH] GH-251 - Improve efficiency of event publication completion. Changed the EventPublicationRepository interface to allow marking an event as completed without having to materialize it in the first place. This allows us to get rid of CompletableEventPublication. EventPublication not exposes its identifier to make sure the stores can actually store the same id. Introduced EventPublicationRegistry.deleteCompletedPublicationsOlderThan(Duration) to purge completed event publications before a given point in time. --- .../modulith/events/Completable.java | 33 +++++++ .../events/CompletableEventPublication.java | 77 ---------------- .../events/DefaultEventPublication.java | 24 +++-- .../DefaultEventPublicationRegistry.java | 44 +++++----- .../modulith/events/EventPublication.java | 54 +++++++++++- .../events/EventPublicationRegistry.java | 8 ++ .../events/EventPublicationRepository.java | 34 ++++++- ...st.java => EventPublicationUnitTests.java} | 18 +--- .../jdbc/JdbcEventPublicationRepository.java | 88 +++++++++++-------- ...PublicationRepositoryIntegrationTests.java | 81 +++++++++-------- .../events/jpa/JpaEventPublication.java | 5 +- .../jpa/JpaEventPublicationRepository.java | 64 ++++++++++---- ...PublicationRepositoryIntegrationTests.java | 69 ++++++++++----- .../mongodb/MongoDbEventPublication.java | 10 +-- .../MongoDbEventPublicationRepository.java | 82 +++++++++++------ ...MongoDbEventPublicationRepositoryTest.java | 71 ++++++++------- 16 files changed, 455 insertions(+), 307 deletions(-) create mode 100644 spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/Completable.java delete mode 100644 spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/CompletableEventPublication.java rename spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/{CompletableEventPublicationTest.java => EventPublicationUnitTests.java} (68%) diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/Completable.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/Completable.java new file mode 100644 index 00000000..b63375fd --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/Completable.java @@ -0,0 +1,33 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.modulith.events; + +import java.time.Instant; + +/** + * Internal interface to be able to mark {@link EventPublication} instances as completed. + * + * @author Oliver Drotbohm + */ +interface Completable { + + /** + * Marks the instance as completed at the given {@link Instant}. + * + * @param instant must not be {@literal null}. + */ + void markCompleted(Instant instant); +} diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/CompletableEventPublication.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/CompletableEventPublication.java deleted file mode 100644 index bcc2d247..00000000 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/CompletableEventPublication.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2017-2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.modulith.events; - -import java.time.Clock; -import java.time.Instant; -import java.util.Optional; - -/** - * An event publication that can be completed. - * - * @author Oliver Drotbohm - */ -public interface CompletableEventPublication extends EventPublication { - - /** - * Returns the completion date of the publication. - * - * @return will never be {@literal null}. - */ - Optional getCompletionDate(); - - /** - * Returns whether the publication of the event has completed. - * - * @return will never be {@literal null}. - */ - default boolean isPublicationCompleted() { - return getCompletionDate().isPresent(); - } - - /** - * Marks the event publication as completed. - * - * @return will never be {@literal null}. - */ - CompletableEventPublication markCompleted(); - - /** - * Creates a {@link CompletableEventPublication} for the given event an listener identifier using a default - * {@link Instant}. Prefer using {@link #of(Object, PublicationTargetIdentifier, Instant)} with a dedicated - * {@link Instant} obtained from a {@link Clock}. - * - * @param event must not be {@literal null}. - * @param id must not be {@literal null}. - * @return will never be {@literal null}. - * @see #of(Object, PublicationTargetIdentifier, Instant) - */ - static CompletableEventPublication of(Object event, PublicationTargetIdentifier id) { - return new DefaultEventPublication(event, id, Instant.now()); - } - - /** - * Creates a {@link CompletableEventPublication} for the given event an listener identifier and publication date. - * - * @param event must not be {@literal null}. - * @param id must not be {@literal null}. - * @param publicationDate must not be {@literal null}. - * @return will never be {@literal null}. - */ - static CompletableEventPublication of(Object event, PublicationTargetIdentifier id, Instant publicationDate) { - return new DefaultEventPublication(event, id, publicationDate); - } -} diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/DefaultEventPublication.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/DefaultEventPublication.java index ccd11350..f907afd5 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/DefaultEventPublication.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/DefaultEventPublication.java @@ -18,16 +18,18 @@ package org.springframework.modulith.events; import java.time.Instant; import java.util.Objects; import java.util.Optional; +import java.util.UUID; import org.springframework.util.Assert; /** - * Default {@link CompletableEventPublication} implementation. + * Default {@link Completable} implementation. * * @author Oliver Drotbohm */ -class DefaultEventPublication implements CompletableEventPublication { +class DefaultEventPublication implements EventPublication { + private final UUID identifier; private final Object event; private final PublicationTargetIdentifier targetIdentifier; private final Instant publicationDate; @@ -47,12 +49,22 @@ class DefaultEventPublication implements CompletableEventPublication { Assert.notNull(targetIdentifier, "PublicationTargetIdentifier must not be null!"); Assert.notNull(publicationDate, "Publication date must not be null!"); + this.identifier = UUID.randomUUID(); this.event = event; this.targetIdentifier = targetIdentifier; this.publicationDate = publicationDate; this.completionDate = Optional.empty(); } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublication#getPublicationIdentifier() + */ + @Override + public UUID getIdentifier() { + return identifier; + } + /* * (non-Javadoc) * @see org.springframework.modulith.events.EventPublication#getEvent() @@ -89,13 +101,11 @@ class DefaultEventPublication implements CompletableEventPublication { /* * (non-Javadoc) - * @see org.springframework.modulith.events.CompletableEventPublication#markCompleted() + * @see org.springframework.modulith.events.CompletableEventPublication#markCompleted(java.time.Instant) */ @Override - public CompletableEventPublication markCompleted() { - - this.completionDate = Optional.of(Instant.now()); - return this; + public void markCompleted(Instant instant) { + this.completionDate = Optional.of(instant); } /* diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/DefaultEventPublicationRegistry.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/DefaultEventPublicationRegistry.java index dc4a11ee..2a9cd506 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/DefaultEventPublicationRegistry.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/DefaultEventPublicationRegistry.java @@ -16,6 +16,7 @@ package org.springframework.modulith.events; import java.time.Clock; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.stream.Stream; @@ -39,6 +40,7 @@ import org.springframework.util.Assert; public class DefaultEventPublicationRegistry implements DisposableBean, EventPublicationRegistry { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventPublicationRegistry.class); + private static final String REGISTER = "Registering publication of {} for {}."; private final EventPublicationRepository events; private final Clock clock; @@ -65,7 +67,8 @@ public class DefaultEventPublicationRegistry implements DisposableBean, EventPub @Override public Collection store(Object event, Stream listeners) { - return listeners.map(it -> map(event, it)) + return listeners.map(it -> EventPublication.of(event, it, clock.instant())) + .peek(it -> LOGGER.debug(REGISTER, it.getEvent().getClass().getName(), it.getTargetIdentifier().getValue())) .map(events::create) .toList(); } @@ -90,12 +93,24 @@ public class DefaultEventPublicationRegistry implements DisposableBean, EventPub Assert.notNull(event, "Domain event must not be null!"); Assert.notNull(targetIdentifier, "Listener identifier must not be null!"); - events.findIncompletePublicationsByEventAndTargetIdentifier(event, targetIdentifier) // - .map(DefaultEventPublicationRegistry::logCompleted) // - .map(e -> CompletableEventPublication.of(e.getEvent(), e.getTargetIdentifier())) - .ifPresent(it -> events.update(it.markCompleted())); + LOGGER.debug("Marking publication of event {} to listener {} completed.", // + event.getClass().getName(), targetIdentifier.getValue()); + + events.markCompleted(event, targetIdentifier, clock.instant()); } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublicationRegistry#deleteCompletedPublicationsOlderThan(java.time.Duration) + */ + @Override + public void deleteCompletedPublicationsOlderThan(Duration duration) { + + Assert.notNull(duration, "Duration must not be null!"); + + events.deleteCompletedPublicationsBefore(clock.instant().minus(duration)); + }; + /* * (non-Javadoc) * @see org.springframework.beans.factory.DisposableBean#destroy() @@ -121,23 +136,4 @@ public class DefaultEventPublicationRegistry implements DisposableBean, EventPub LOGGER.info("{} {} - {}", prefix, it.getEvent().getClass().getName(), it.getTargetIdentifier().getValue()); } } - - private EventPublication map(Object event, PublicationTargetIdentifier targetIdentifier) { - - var result = CompletableEventPublication.of(event, targetIdentifier, clock.instant()); - - LOGGER.debug("Registering publication of {} for {}.", // - result.getEvent().getClass().getName(), result.getTargetIdentifier().getValue()); - - return result; - } - - private static EventPublication logCompleted(EventPublication publication) { - - LOGGER.debug("Marking publication of event {} to listener {} completed.", // - publication.getEvent().getClass().getName(), publication.getTargetIdentifier().getValue()); - - return publication; - } - } diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublication.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublication.java index 56349911..94f6f9dc 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublication.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublication.java @@ -15,7 +15,10 @@ */ package org.springframework.modulith.events; +import java.time.Clock; import java.time.Instant; +import java.util.Optional; +import java.util.UUID; import org.springframework.context.ApplicationEvent; import org.springframework.context.PayloadApplicationEvent; @@ -28,7 +31,40 @@ import org.springframework.util.Assert; * @author Björn Kieling * @author Dmitry Belyaev */ -public interface EventPublication extends Comparable { +public interface EventPublication extends Comparable, Completable { + + /** + * Creates a {@link EventPublication} for the given event an listener identifier using a default {@link Instant}. + * Prefer using {@link #of(Object, PublicationTargetIdentifier, Instant)} with a dedicated {@link Instant} obtained + * from a {@link Clock}. + * + * @param event must not be {@literal null}. + * @param id must not be {@literal null}. + * @return will never be {@literal null}. + * @see #of(Object, PublicationTargetIdentifier, Instant) + */ + static EventPublication of(Object event, PublicationTargetIdentifier id) { + return new DefaultEventPublication(event, id, Instant.now()); + } + + /** + * Creates a {@link EventPublication} for the given event an listener identifier and publication date. + * + * @param event must not be {@literal null}. + * @param id must not be {@literal null}. + * @param publicationDate must not be {@literal null}. + * @return will never be {@literal null}. + */ + static EventPublication of(Object event, PublicationTargetIdentifier id, Instant publicationDate) { + return new DefaultEventPublication(event, id, publicationDate); + } + + /** + * Returns a unique identifier for this publication. + * + * @return will never be {@literal null}. + */ + UUID getIdentifier(); /** * Returns the event that is published. @@ -79,6 +115,22 @@ public interface EventPublication extends Comparable { return this.getTargetIdentifier().equals(identifier); } + /** + * Returns the completion date of the publication. + * + * @return will never be {@literal null}. + */ + Optional getCompletionDate(); + + /** + * Returns whether the publication of the event has completed. + * + * @return will never be {@literal null}. + */ + default boolean isPublicationCompleted() { + return getCompletionDate().isPresent(); + } + /* * (non-Javadoc) * @see java.lang.Comparable#compareTo(java.lang.Object) diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublicationRegistry.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublicationRegistry.java index d36ae63c..86493c26 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublicationRegistry.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublicationRegistry.java @@ -15,6 +15,7 @@ */ package org.springframework.modulith.events; +import java.time.Duration; import java.util.Collection; import java.util.stream.Stream; @@ -52,4 +53,11 @@ public interface EventPublicationRegistry { * @param targetIdentifier must not be {@literal null}. */ void markCompleted(Object event, PublicationTargetIdentifier targetIdentifier); + + /** + * Deletes all completed {@link EventPublication}s that have been completed before the given {@link Duration}. + * + * @param duration must not be {@literal null}. + */ + void deleteCompletedPublicationsOlderThan(Duration duration); } diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublicationRepository.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublicationRepository.java index 6988e516..3209f2f5 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublicationRepository.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublicationRepository.java @@ -15,9 +15,12 @@ */ package org.springframework.modulith.events; +import java.time.Instant; import java.util.List; import java.util.Optional; +import org.springframework.util.Assert; + /** * Repository to store {@link EventPublication}s. * @@ -36,12 +39,30 @@ public interface EventPublicationRepository { EventPublication create(EventPublication publication); /** - * Update the data store to mark the backing log entry as completed. + * Marks the given {@link EventPublication} as completed. * * @param publication must not be {@literal null}. - * @return will never be {@literal null}. + * @param completionDate must not be {@literal null}. */ - EventPublication update(CompletableEventPublication publication); + default void markCompleted(EventPublication publication, Instant completionDate) { + + Assert.notNull(publication, "EventPublication must not be null!"); + Assert.notNull(completionDate, "Instant must not be null!"); + + publication.markCompleted(completionDate); + + markCompleted(publication.getEvent(), publication.getTargetIdentifier(), completionDate); + } + + /** + * Marks the publication for the given event and {@link PublicationTargetIdentifier} to be completed at the given + * {@link Instant}. + * + * @param event must not be {@literal null}. + * @param identifier must not be {@literal null}. + * @param completionDate must not be {@literal null}. + */ + void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate); /** * Returns all {@link EventPublication} that have not been completed yet. @@ -64,4 +85,11 @@ public interface EventPublicationRepository { * Deletes all publications that were already marked as completed. */ void deleteCompletedPublications(); + + /** + * Deletes all publication that were already marked as completed with a completion date before the given one. + * + * @param instant must not be {@literal null}. + */ + void deleteCompletedPublicationsBefore(Instant instant); } diff --git a/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/CompletableEventPublicationTest.java b/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/EventPublicationUnitTests.java similarity index 68% rename from spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/CompletableEventPublicationTest.java rename to spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/EventPublicationUnitTests.java index 115ed3f9..00346875 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/CompletableEventPublicationTest.java +++ b/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/EventPublicationUnitTests.java @@ -24,13 +24,13 @@ import org.junit.jupiter.api.Test; * @author Björn Kieling * @author Dmitry Belyaev */ -class CompletableEventPublicationUnitTests { +class EventPublicationUnitTests { @Test void rejectsNullEvent() { assertThatExceptionOfType(IllegalArgumentException.class)// - .isThrownBy(() -> CompletableEventPublication.of(null, PublicationTargetIdentifier.of("foo")))// + .isThrownBy(() -> EventPublication.of(null, PublicationTargetIdentifier.of("foo")))// .withMessageContaining("Event"); } @@ -38,27 +38,17 @@ class CompletableEventPublicationUnitTests { void rejectsNullTargetIdentifier() { assertThatExceptionOfType(IllegalArgumentException.class)// - .isThrownBy(() -> CompletableEventPublication.of(new Object(), null))// + .isThrownBy(() -> EventPublication.of(new Object(), null))// .withMessageContaining("TargetIdentifier"); } @Test void publicationIsIncompleteByDefault() { - CompletableEventPublication publication = CompletableEventPublication.of(new Object(), + EventPublication publication = EventPublication.of(new Object(), PublicationTargetIdentifier.of("foo")); assertThat(publication.isPublicationCompleted()).isFalse(); assertThat(publication.getCompletionDate()).isNotPresent(); } - - @Test - void completionCapturesDate() { - - CompletableEventPublication publication = CompletableEventPublication - .of(new Object(), PublicationTargetIdentifier.of("foo")).markCompleted(); - - assertThat(publication.isPublicationCompleted()).isTrue(); - assertThat(publication.getCompletionDate()).isPresent(); - } } diff --git a/spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepository.java b/spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepository.java index 45c2b73a..c4d39844 100644 --- a/spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepository.java +++ b/spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepository.java @@ -31,7 +31,6 @@ import org.springframework.jdbc.core.JdbcOperations; import org.springframework.jdbc.core.ResultSetExtractor; import org.springframework.jdbc.core.RowMapper; import org.springframework.lang.Nullable; -import org.springframework.modulith.events.CompletableEventPublication; import org.springframework.modulith.events.EventPublication; import org.springframework.modulith.events.EventPublicationRepository; import org.springframework.modulith.events.EventSerializer; @@ -68,6 +67,14 @@ class JdbcEventPublicationRepository implements EventPublicationRepository { WHERE ID = ? """; + private static final String SQL_STATEMENT_UPDATE_BY_EVENT_AND_LISTENER_ID = """ + UPDATE EVENT_PUBLICATION + SET COMPLETION_DATE = ? + WHERE + LISTENER_ID = ? + AND SERIALIZED_EVENT = ? + """; + private static final String SQL_STATEMENT_FIND_BY_EVENT_AND_LISTENER_ID = """ SELECT * FROM EVENT_PUBLICATION @@ -79,10 +86,17 @@ class JdbcEventPublicationRepository implements EventPublicationRepository { """; private static final String SQL_STATEMENT_DELETE_UNCOMPLETED = """ - DELETE - FROM EVENT_PUBLICATION - WHERE - COMPLETION_DATE IS NOT NULL + DELETE + FROM EVENT_PUBLICATION + WHERE + COMPLETION_DATE IS NOT NULL + """; + + private static final String SQL_STATEMENT_DELETE_UNCOMPLETED_BEFORE = """ + DELETE + FROM EVENT_PUBLICATION + WHERE + COMPLETION_DATE < ? """; private final JdbcOperations operations; @@ -121,7 +135,7 @@ class JdbcEventPublicationRepository implements EventPublicationRepository { operations.update( // SQL_STATEMENT_INSERT, // - uuidToDatabase(UUID.randomUUID()), // + uuidToDatabase(publication.getIdentifier()), // publication.getEvent().getClass().getName(), // publication.getTargetIdentifier().getValue(), // Timestamp.from(publication.getPublicationDate()), // @@ -130,23 +144,18 @@ class JdbcEventPublicationRepository implements EventPublicationRepository { return publication; } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublicationRepository#markCompleted(java.lang.Object, org.springframework.modulith.events.PublicationTargetIdentifier, java.time.Instant) + */ @Override @Transactional - public EventPublication update(CompletableEventPublication publication) { + public void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate) { - var serializedEvent = serializeEvent(publication.getEvent()); - var listenerId = publication.getTargetIdentifier().getValue(); - var potentialPublicationIdsToBeUpdated = operations.query( // - SQL_STATEMENT_FIND_BY_EVENT_AND_LISTENER_ID, // - (rs, rowNum) -> getUuidFromResultSet(rs), // - serializedEvent, // - listenerId); - - potentialPublicationIdsToBeUpdated.stream() - .findFirst() - .ifPresent(id -> update(id, publication)); - - return publication; + operations.update(SQL_STATEMENT_UPDATE_BY_EVENT_AND_LISTENER_ID, // + Timestamp.from(completionDate), // + identifier.getValue(), // + serializer.serialize(event)); } @Override @@ -165,10 +174,7 @@ class JdbcEventPublicationRepository implements EventPublicationRepository { @Transactional(readOnly = true) @SuppressWarnings("null") public List findIncompletePublications() { - - return operations.query( // - SQL_STATEMENT_FIND_UNCOMPLETED, // - this::resultSetToPublications); + return operations.query(SQL_STATEMENT_FIND_UNCOMPLETED, this::resultSetToPublications); } @Override @@ -176,14 +182,16 @@ class JdbcEventPublicationRepository implements EventPublicationRepository { operations.execute(SQL_STATEMENT_DELETE_UNCOMPLETED); } - private void update(UUID id, CompletableEventPublication publication) { + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublicationRepository#deleteCompletedPublicationsBefore(java.time.Instant) + */ + @Override + public void deleteCompletedPublicationsBefore(Instant instant) { - var timestamp = publication.getCompletionDate().map(Timestamp::from).orElse(null); + Assert.notNull(instant, "Instant must not be null!"); - operations.update( // - SQL_STATEMENT_UPDATE, // - timestamp, // - uuidToDatabase(id)); + operations.update(SQL_STATEMENT_DELETE_UNCOMPLETED_BEFORE, Timestamp.from(instant)); } @SuppressWarnings("null") @@ -269,7 +277,7 @@ class JdbcEventPublicationRepository implements EventPublicationRepository { } } - private static class JdbcEventPublication implements CompletableEventPublication { + private static class JdbcEventPublication implements EventPublication { private final UUID id; private final Instant publicationDate; @@ -308,6 +316,15 @@ class JdbcEventPublicationRepository implements EventPublicationRepository { this.completionDate = completionDate; } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublication#getPublicationIdentifier() + */ + @Override + public UUID getIdentifier() { + return id; + } + /* * (non-Javadoc) * @see org.springframework.modulith.events.EventPublication#getEvent() @@ -355,14 +372,11 @@ class JdbcEventPublicationRepository implements EventPublicationRepository { /* * (non-Javadoc) - * @see org.springframework.modulith.events.CompletableEventPublication#markCompleted() + * @see org.springframework.modulith.events.Completable#markCompleted(java.time.Instant) */ @Override - public CompletableEventPublication markCompleted() { - - this.completionDate = Instant.now(); - - return this; + public void markCompleted(Instant instant) { + this.completionDate = instant; } /* diff --git a/spring-modulith-events/spring-modulith-events-jdbc/src/test/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryIntegrationTests.java b/spring-modulith-events/spring-modulith-events-jdbc/src/test/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryIntegrationTests.java index 17999dc4..015bd2f6 100644 --- a/spring-modulith-events/spring-modulith-events-jdbc/src/test/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryIntegrationTests.java +++ b/spring-modulith-events/spring-modulith-events-jdbc/src/test/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryIntegrationTests.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.*; import lombok.Value; +import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; @@ -34,7 +35,6 @@ import org.springframework.boot.test.autoconfigure.jdbc.JdbcTest; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.context.annotation.Import; import org.springframework.jdbc.core.JdbcOperations; -import org.springframework.modulith.events.CompletableEventPublication; import org.springframework.modulith.events.EventPublication; import org.springframework.modulith.events.EventSerializer; import org.springframework.modulith.events.PublicationTargetIdentifier; @@ -79,10 +79,7 @@ class JdbcEventPublicationRepositoryIntegrationTests { when(serializer.serialize(testEvent)).thenReturn(serializedEvent); when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent); - var publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER); - - // Store publication - repository.create(publication); + var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); var eventPublications = repository.findIncompletePublications(); @@ -96,7 +93,7 @@ class JdbcEventPublicationRepositoryIntegrationTests { .isPresent(); // Complete publication - repository.update(publication.markCompleted()); + repository.markCompleted(publication, Instant.now()); assertThat(repository.findIncompletePublications()).isEmpty(); } @@ -117,13 +114,7 @@ class JdbcEventPublicationRepositoryIntegrationTests { } private void createPublicationAt(LocalDateTime publicationDate) { - - EventPublication publication = mock(EventPublication.class); - when(publication.getEvent()).thenReturn(""); - when(publication.getTargetIdentifier()).thenReturn(TARGET_IDENTIFIER); - when(publication.getPublicationDate()).thenReturn(publicationDate.toInstant(ZoneOffset.UTC)); - - repository.create(publication); + repository.create(EventPublication.of("", TARGET_IDENTIFIER, publicationDate.toInstant(ZoneOffset.UTC))); } @Test // GH-3 @@ -139,15 +130,11 @@ class JdbcEventPublicationRepositoryIntegrationTests { when(serializer.serialize(testEvent2)).thenReturn(serializedEvent2); when(serializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2); - var publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER); - var publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER); - - // Store publication - repository.create(publication1); - repository.create(publication2); + repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + var publication = repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); // Complete publication - repository.update(publication2.markCompleted()); + repository.markCompleted(publication, Instant.now()); assertThat(repository.findIncompletePublications()).hasSize(1) .element(0).extracting(EventPublication::getEvent).isEqualTo(testEvent1); @@ -174,10 +161,10 @@ class JdbcEventPublicationRepositoryIntegrationTests { when(serializer.serialize(testEvent)).thenReturn(serializedEvent); when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent); - var publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER); + var publication = EventPublication.of(testEvent, TARGET_IDENTIFIER); repository.create(publication); - repository.update(publication.markCompleted()); + repository.markCompleted(publication, Instant.now()); var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER); @@ -193,23 +180,19 @@ class JdbcEventPublicationRepositoryIntegrationTests { when(serializer.serialize(testEvent)).thenReturn(serializedEvent); when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent); - var publicationOld = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER); + var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); Thread.sleep(10); - var publicationNew = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER); - - repository.create(publicationNew); - repository.create(publicationOld); + repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER); assertThat(actual).hasValueSatisfying(it -> { assertThat(it.getPublicationDate()) // - .isCloseTo(publicationOld.getPublicationDate(), within(1, ChronoUnit.MILLIS)); + .isCloseTo(publication.getPublicationDate(), within(1, ChronoUnit.MILLIS)); }); } - @Test - // GH-3 + @Test // GH-3 void shouldSilentlyIgnoreNotSerializableEvents() { var testEvent = new TestEvent("id"); @@ -219,7 +202,7 @@ class JdbcEventPublicationRepositoryIntegrationTests { when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent); // Store publication - repository.create(CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER)); + repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); operations.update("UPDATE EVENT_PUBLICATION SET EVENT_TYPE='abc'"); @@ -240,19 +223,41 @@ class JdbcEventPublicationRepositoryIntegrationTests { when(serializer.serialize(testEvent2)).thenReturn(serializedEvent2); when(serializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2); - var publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER); - var publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER); - - repository.create(publication1); - repository.create(publication2); - - repository.update(publication1.markCompleted()); + var publication = repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + repository.markCompleted(publication, Instant.now()); repository.deleteCompletedPublications(); assertThat(operations.query("SELECT * FROM EVENT_PUBLICATION", (rs, __) -> rs.getString("SERIALIZED_EVENT"))) .hasSize(1).element(0).isEqualTo(serializedEvent2); } + + @Test // GH-251 + void shouldDeleteCompletedEventsBefore() { + + var testEvent1 = new TestEvent("abc"); + var serializedEvent1 = "{\"eventId\":\"abc\"}"; + var testEvent2 = new TestEvent("def"); + var serializedEvent2 = "{\"eventId\":\"def\"}"; + + when(serializer.serialize(testEvent1)).thenReturn(serializedEvent1); + when(serializer.deserialize(serializedEvent1, TestEvent.class)).thenReturn(testEvent1); + when(serializer.serialize(testEvent2)).thenReturn(serializedEvent2); + when(serializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2); + + repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + + var now = Instant.now(); + + repository.markCompleted(testEvent1, TARGET_IDENTIFIER, now.minusSeconds(30)); + repository.markCompleted(testEvent2, TARGET_IDENTIFIER, now); + repository.deleteCompletedPublicationsBefore(now.minusSeconds(15)); + + assertThat(operations.query("SELECT * FROM EVENT_PUBLICATION", (rs, __) -> rs.getString("SERIALIZED_EVENT"))) + .hasSize(1).element(0).isEqualTo(serializedEvent2); + } } @Nested diff --git a/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublication.java b/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublication.java index dc196b1c..02b9670d 100644 --- a/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublication.java +++ b/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublication.java @@ -51,14 +51,15 @@ class JpaEventPublication { * @param serializedEvent must not be {@literal null} or empty. * @param eventType must not be {@literal null}. */ - JpaEventPublication(Instant publicationDate, String listenerId, String serializedEvent, Class eventType) { + JpaEventPublication(UUID id, Instant publicationDate, String listenerId, String serializedEvent, Class eventType) { + Assert.notNull(id, "Identifier must not be null!"); Assert.notNull(publicationDate, "Publication date must not be null!"); Assert.notNull(listenerId, "Listener id must not be null or empty!"); Assert.notNull(serializedEvent, "Serialized event must not be null or empty!"); Assert.notNull(eventType, "Event type must not be null!"); - this.id = UUID.randomUUID(); + this.id = id; this.publicationDate = publicationDate; this.listenerId = listenerId; this.serializedEvent = serializedEvent; diff --git a/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepository.java b/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepository.java index ab6afc85..352a0733 100644 --- a/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepository.java +++ b/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepository.java @@ -21,8 +21,8 @@ import java.time.Instant; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.UUID; -import org.springframework.modulith.events.CompletableEventPublication; import org.springframework.modulith.events.EventPublication; import org.springframework.modulith.events.EventPublicationRepository; import org.springframework.modulith.events.EventSerializer; @@ -64,6 +64,13 @@ class JpaEventPublicationRepository implements EventPublicationRepository { p.completionDate is not null """; + private static final String DELETE_COMPLETED_BEFORE = """ + delete + from JpaEventPublication p + where + p.completionDate < ?1 + """; + private final EntityManager entityManager; private final EventSerializer serializer; @@ -98,19 +105,14 @@ class JpaEventPublicationRepository implements EventPublicationRepository { /* * (non-Javadoc) - * @see org.springframework.modulith.events.EventPublicationRepository#update(org.springframework.modulith.events.CompletableEventPublication) + * @see org.springframework.modulith.events.EventPublicationRepository#markCompleted(java.lang.Object, org.springframework.modulith.events.PublicationTargetIdentifier, java.time.Instant) */ @Override @Transactional - public EventPublication update(CompletableEventPublication publication) { + public void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate) { - var id = publication.getTargetIdentifier().getValue(); - var event = publication.getEvent(); - - findEntityBySerializedEventAndListenerIdAndCompletionDateNull(event, id) // - .ifPresent(entity -> entity.completionDate = publication.getCompletionDate().orElse(null)); - - return publication; + findEntityBySerializedEventAndListenerIdAndCompletionDateNull(event, identifier) + .map(it -> it.completionDate = completionDate); } /* @@ -136,7 +138,7 @@ class JpaEventPublicationRepository implements EventPublicationRepository { public Optional findIncompletePublicationsByEventAndTargetIdentifier( // Object event, PublicationTargetIdentifier targetIdentifier) { - return findEntityBySerializedEventAndListenerIdAndCompletionDateNull(event, targetIdentifier.getValue()) + return findEntityBySerializedEventAndListenerIdAndCompletionDateNull(event, targetIdentifier) .map(this::entityToDomain); } @@ -150,14 +152,29 @@ class JpaEventPublicationRepository implements EventPublicationRepository { entityManager.createQuery(DELETE_COMPLETED).executeUpdate(); } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublicationRepository#deleteCompletedPublicationsBefore(java.time.Instant) + */ + @Override + public void deleteCompletedPublicationsBefore(Instant instant) { + + Assert.notNull(instant, "Instant must not be null!"); + + var query = entityManager.createQuery(DELETE_COMPLETED_BEFORE); + + query.setParameter(1, instant); + query.executeUpdate(); + } + private Optional findEntityBySerializedEventAndListenerIdAndCompletionDateNull( // - Object event, String listenerId) { + Object event, PublicationTargetIdentifier listenerId) { var serializedEvent = serializeEvent(event); var query = entityManager.createQuery(BY_EVENT_AND_LISTENER_ID, JpaEventPublication.class) .setParameter(1, serializedEvent) - .setParameter(2, listenerId); + .setParameter(2, listenerId.getValue()); return query.getResultStream().findFirst(); } @@ -167,7 +184,8 @@ class JpaEventPublicationRepository implements EventPublicationRepository { } private JpaEventPublication domainToEntity(EventPublication domain) { - return new JpaEventPublication(domain.getPublicationDate(), domain.getTargetIdentifier().getValue(), + return new JpaEventPublication(domain.getIdentifier(), domain.getPublicationDate(), + domain.getTargetIdentifier().getValue(), serializeEvent(domain.getEvent()), domain.getEvent().getClass()); } @@ -175,7 +193,7 @@ class JpaEventPublicationRepository implements EventPublicationRepository { return new JpaEventPublicationAdapter(entity, serializer); } - private static class JpaEventPublicationAdapter implements CompletableEventPublication { + private static class JpaEventPublicationAdapter implements EventPublication { private final JpaEventPublication publication; private final EventSerializer serializer; @@ -196,6 +214,15 @@ class JpaEventPublicationRepository implements EventPublicationRepository { this.serializer = serializer; } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublication#getPublicationIdentifier() + */ + @Override + public UUID getIdentifier() { + return publication.id; + } + /* * (non-Javadoc) * @see org.springframework.modulith.events.EventPublication#getEvent() @@ -243,12 +270,11 @@ class JpaEventPublicationRepository implements EventPublicationRepository { /* * (non-Javadoc) - * @see org.springframework.modulith.events.CompletableEventPublication#markCompleted() + * @see org.springframework.modulith.events.Completable#markCompleted(java.time.Instant) */ @Override - public CompletableEventPublication markCompleted() { - publication.markCompleted(); - return this; + public void markCompleted(Instant instant) { + this.publication.completionDate = instant; } /* diff --git a/spring-modulith-events/spring-modulith-events-jpa/src/test/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepositoryIntegrationTests.java b/spring-modulith-events/spring-modulith-events-jpa/src/test/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepositoryIntegrationTests.java index ee75b08b..6c830b85 100644 --- a/spring-modulith-events/spring-modulith-events-jpa/src/test/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepositoryIntegrationTests.java +++ b/spring-modulith-events/spring-modulith-events-jpa/src/test/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepositoryIntegrationTests.java @@ -23,13 +23,15 @@ import jakarta.persistence.EntityManagerFactory; import lombok.RequiredArgsConstructor; import lombok.Value; +import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Comparator; -import java.util.List; +import java.util.UUID; import javax.sql.DataSource; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.context.annotation.Bean; @@ -38,7 +40,6 @@ import org.springframework.context.annotation.Import; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; -import org.springframework.modulith.events.CompletableEventPublication; import org.springframework.modulith.events.EventPublication; import org.springframework.modulith.events.EventSerializer; import org.springframework.modulith.events.PublicationTargetIdentifier; @@ -64,7 +65,6 @@ import org.springframework.transaction.annotation.Transactional; class JpaEventPublicationRepositoryIntegrationTests { private static final PublicationTargetIdentifier TARGET_IDENTIFIER = PublicationTargetIdentifier.of("listener"); - private static final EventSerializer eventSerializer = mock(EventSerializer.class); @Configuration @@ -113,29 +113,31 @@ class JpaEventPublicationRepositoryIntegrationTests { private final JpaEventPublicationRepository repository; private final EntityManager em; + @AfterEach + public void flush() { + em.flush(); + } + @Test void persistsJpaEventPublication() { - TestEvent testEvent = new TestEvent("abc"); - String serializedEvent = "{\"eventId\":\"abc\"}"; + var testEvent = new TestEvent("abc"); + var serializedEvent = "{\"eventId\":\"abc\"}"; when(eventSerializer.serialize(testEvent)).thenReturn(serializedEvent); when(eventSerializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent); - CompletableEventPublication publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER); + var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); - // Store publication - repository.create(publication); + var eventPublications = repository.findIncompletePublications(); - List eventPublications = repository.findIncompletePublications(); assertThat(eventPublications).hasSize(1); assertThat(eventPublications.get(0).getEvent()).isEqualTo(publication.getEvent()); assertThat(eventPublications.get(0).getTargetIdentifier()).isEqualTo(publication.getTargetIdentifier()); assertThat(repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER)) .isPresent(); - // Complete publication - repository.update(publication.markCompleted()); + repository.markCompleted(publication, Instant.now()); assertThat(repository.findIncompletePublications()).isEmpty(); } @@ -160,11 +162,10 @@ class JpaEventPublicationRepositoryIntegrationTests { when(eventSerializer.serialize(testEvent)).thenReturn(serializedEvent); when(eventSerializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent); - CompletableEventPublication publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER); + var publication = EventPublication.of(testEvent, TARGET_IDENTIFIER); - // Store publication repository.create(publication); - repository.update(publication.markCompleted()); + repository.markCompleted(publication, Instant.now()); var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER); @@ -184,14 +185,9 @@ class JpaEventPublicationRepositoryIntegrationTests { when(eventSerializer.serialize(testEvent2)).thenReturn(serializedEvent2); when(eventSerializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2); - var publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER); - var publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER); - - repository.create(publication1); - repository.create(publication2); - - repository.update(publication1.markCompleted()); - + repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + repository.markCompleted(testEvent1, TARGET_IDENTIFIER, Instant.now()); repository.deleteCompletedPublications(); assertThat(em.createQuery("select p from JpaEventPublication p", JpaEventPublication.class).getResultList()) @@ -212,8 +208,35 @@ class JpaEventPublicationRepositoryIntegrationTests { .isSortedAccordingTo(Comparator.comparing(EventPublication::getPublicationDate)); } + @Test // GH-251 + void shouldDeleteCompletedEventsBefore() { + + var testEvent1 = new TestEvent("abc"); + var serializedEvent1 = "{\"eventId\":\"abc\"}"; + var testEvent2 = new TestEvent("def"); + var serializedEvent2 = "{\"eventId\":\"def\"}"; + + when(eventSerializer.serialize(testEvent1)).thenReturn(serializedEvent1); + when(eventSerializer.deserialize(serializedEvent1, TestEvent.class)).thenReturn(testEvent1); + when(eventSerializer.serialize(testEvent2)).thenReturn(serializedEvent2); + when(eventSerializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2); + + repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + + var now = Instant.now(); + + repository.markCompleted(testEvent1, TARGET_IDENTIFIER, now.minusSeconds(30)); + repository.markCompleted(testEvent2, TARGET_IDENTIFIER, now); + repository.deleteCompletedPublicationsBefore(now.minusSeconds(15)); + + assertThat(em.createQuery("select p from JpaEventPublication p", JpaEventPublication.class).getResultList()) + .hasSize(1) // + .element(0).extracting(it -> it.serializedEvent).isEqualTo(serializedEvent2); + } + private void savePublicationAt(LocalDateTime date) { - em.persist(new JpaEventPublication(date.toInstant(ZoneOffset.UTC), "", "", Object.class)); + em.persist(new JpaEventPublication(UUID.randomUUID(), date.toInstant(ZoneOffset.UTC), "", "", Object.class)); } @Value diff --git a/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublication.java b/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublication.java index 56d99383..948659c2 100644 --- a/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublication.java +++ b/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublication.java @@ -16,8 +16,8 @@ package org.springframework.modulith.events.mongodb; import java.time.Instant; +import java.util.UUID; -import org.bson.types.ObjectId; import org.springframework.data.annotation.PersistenceCreator; import org.springframework.data.mongodb.core.mapping.Document; import org.springframework.lang.Nullable; @@ -32,7 +32,7 @@ import org.springframework.util.Assert; @Document(collection = "org_springframework_modulith_events") class MongoDbEventPublication { - final ObjectId id; + final UUID id; final Instant publicationDate; final String listenerId; final Object event; @@ -50,7 +50,7 @@ class MongoDbEventPublication { * @param completionDate can be {@literal null}. */ @PersistenceCreator - MongoDbEventPublication(ObjectId id, Instant publicationDate, String listenerId, Object event, + MongoDbEventPublication(UUID id, Instant publicationDate, String listenerId, Object event, @Nullable Instant completionDate) { Assert.notNull(id, "Id must not be null!"); @@ -72,8 +72,8 @@ class MongoDbEventPublication { * @param listenerId must not be {@literal null}. * @param event must not be {@literal null}. */ - MongoDbEventPublication(Instant publicationDate, String listenerId, Object event) { - this(new ObjectId(), publicationDate, listenerId, event, null); + MongoDbEventPublication(UUID id, Instant publicationDate, String listenerId, Object event) { + this(id, publicationDate, listenerId, event, null); } /** diff --git a/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java b/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java index bdf90655..322f97fe 100644 --- a/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java +++ b/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java @@ -22,11 +22,13 @@ import java.time.Instant; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.UUID; import org.springframework.data.domain.Sort; import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Update; import org.springframework.data.util.TypeInformation; -import org.springframework.modulith.events.CompletableEventPublication; import org.springframework.modulith.events.EventPublication; import org.springframework.modulith.events.EventPublicationRepository; import org.springframework.modulith.events.PublicationTargetIdentifier; @@ -55,6 +57,10 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository { this.mongoTemplate = mongoTemplate; } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublicationRepository#create(org.springframework.modulith.events.EventPublication) + */ @Override public EventPublication create(EventPublication publication) { @@ -63,17 +69,17 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository { return publication; } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublicationRepository#markCompleted(java.lang.Object, org.springframework.modulith.events.PublicationTargetIdentifier, java.time.Instant) + */ @Override - public EventPublication update(CompletableEventPublication publication) { + public void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate) { - return findDocumentsByEventAndTargetIdentifierAndCompletionDateNull(publication.getEvent(), - publication.getTargetIdentifier()) // - .stream() // - .findFirst() // - .map(document -> document.markCompleted(publication.getCompletionDate().orElse(null))) // - .map(mongoTemplate::save) // - .map(this::documentToDomain) // - .orElse(publication); + var criteria = byEventAndListenerId(event, identifier); + var update = Update.update("completionDate", completionDate); + + mongoTemplate.updateFirst(query(criteria), update, MongoDbEventPublication.class); } @Override @@ -83,7 +89,7 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository { .with(Sort.by("publicationDate").ascending()); return mongoTemplate.find(query, MongoDbEventPublication.class).stream() // - . map(this::documentToDomain) // + .map(this::documentToDomain) // .toList(); } @@ -101,39 +107,59 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository { return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublicationRepository#deleteCompletedPublications() + */ @Override public void deleteCompletedPublications() { mongoTemplate.remove(query(where("completionDate").ne(null)), MongoDbEventPublication.class); } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublicationRepository#deleteCompletedPublicationsBefore(java.time.Instant) + */ + @Override + public void deleteCompletedPublicationsBefore(Instant instant) { + + Assert.notNull(instant, "Instant must not be null!"); + + mongoTemplate.remove(query(where("completionDate").lt(instant)), MongoDbEventPublication.class); + } + private List findDocumentsByEventAndTargetIdentifierAndCompletionDateNull( // Object event, PublicationTargetIdentifier targetIdentifier) { - // we need to enforce writing of the type information - var eventAsMongoType = mongoTemplate.getConverter().convertToMongoType(event, TypeInformation.OBJECT); - - var query = query( - where("event").is(eventAsMongoType) // - .and("listenerId").is(targetIdentifier.getValue()) // - .and("completionDate").isNull()) // - .with(Sort.by("publicationDate").ascending()); + var criteria = byEventAndListenerId(event, targetIdentifier); + var query = query(criteria).with(Sort.by("publicationDate").ascending()); return mongoTemplate.find(query, MongoDbEventPublication.class); } + private Criteria byEventAndListenerId(Object event, PublicationTargetIdentifier identifier) { + + var eventAsMongoType = mongoTemplate.getConverter().convertToMongoType(event, TypeInformation.OBJECT); + + return where("event").is(eventAsMongoType) // + .and("listenerId").is(identifier.getValue()) + .and("completionDate").isNull(); + } + private MongoDbEventPublication domainToDocument(EventPublication publication) { return new MongoDbEventPublication( // + publication.getIdentifier(), // publication.getPublicationDate(), // publication.getTargetIdentifier().getValue(), // publication.getEvent()); } - private CompletableEventPublication documentToDomain(MongoDbEventPublication document) { + private EventPublication documentToDomain(MongoDbEventPublication document) { return new MongoDbEventPublicationAdapter(document); } - private static class MongoDbEventPublicationAdapter implements CompletableEventPublication { + private static class MongoDbEventPublicationAdapter implements EventPublication { private final MongoDbEventPublication publication; @@ -141,6 +167,11 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository { this.publication = publication; } + @Override + public UUID getIdentifier() { + return publication.id; + } + @Override public Object getEvent() { return publication.event; @@ -167,9 +198,8 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository { } @Override - public CompletableEventPublication markCompleted() { - publication.completionDate = Instant.now(); - return this; + public void markCompleted(Instant instant) { + this.publication.completionDate = instant; } /* @@ -183,11 +213,11 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository { return true; } - if (!(obj instanceof MongoDbEventPublicationAdapter other)) { + if (!(obj instanceof MongoDbEventPublicationAdapter that)) { return false; } - return Objects.equals(publication, other.publication); + return Objects.equals(publication, that.publication); } /* diff --git a/spring-modulith-events/spring-modulith-events-mongodb/src/test/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepositoryTest.java b/spring-modulith-events/spring-modulith-events-mongodb/src/test/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepositoryTest.java index 05945337..6957ac98 100644 --- a/spring-modulith-events/spring-modulith-events-mongodb/src/test/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepositoryTest.java +++ b/spring-modulith-events/spring-modulith-events-mongodb/src/test/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepositoryTest.java @@ -19,12 +19,13 @@ import static org.assertj.core.api.Assertions.*; import lombok.Value; +import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; import java.util.Comparator; +import java.util.UUID; -import org.bson.types.ObjectId; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; @@ -32,7 +33,6 @@ import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest; import org.springframework.data.mongodb.core.MongoTemplate; -import org.springframework.modulith.events.CompletableEventPublication; import org.springframework.modulith.events.EventPublication; import org.springframework.modulith.events.PublicationTargetIdentifier; import org.springframework.modulith.testapp.TestApplication; @@ -66,11 +66,7 @@ class MongoDbEventPublicationRepositoryTest { void shouldPersistAndUpdateEventPublication() { var testEvent = new TestEvent("abc"); - - var publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER); - - // Store publication - repository.create(publication); + var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); var eventPublications = repository.findIncompletePublications(); @@ -82,7 +78,7 @@ class MongoDbEventPublicationRepositoryTest { .isPresent(); // Complete publication - repository.update(publication.markCompleted()); + repository.markCompleted(publication, Instant.now()); assertThat(repository.findIncompletePublications()).isEmpty(); } @@ -93,12 +89,10 @@ class MongoDbEventPublicationRepositoryTest { var testEvent1 = new TestEvent("id1"); var testEvent2 = new TestEvent("id2"); - var publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER); - var publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER); + repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + var publication = repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); - repository.create(publication1); - repository.create(publication2); - repository.update(publication2.markCompleted()); + repository.markCompleted(publication, Instant.now()); assertThat(repository.findIncompletePublications()).hasSize(1) .element(0).extracting(EventPublication::getEvent).isEqualTo(testEvent1); @@ -120,7 +114,7 @@ class MongoDbEventPublicationRepositoryTest { private void savePublicationAt(LocalDateTime date) { mongoTemplate.save( - new MongoDbEventPublication(new ObjectId(), date.toInstant(ZoneOffset.UTC), "", "", null)); + new MongoDbEventPublication(UUID.randomUUID(), date.toInstant(ZoneOffset.UTC), "", "", null)); } @Nested @@ -132,10 +126,10 @@ class MongoDbEventPublicationRepositoryTest { var testEvent1 = new TestEvent("abc"); var testEvent2 = new TestEvent("def"); - repository.create(CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER)); - repository.create(CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER)); - repository.create(CompletableEventPublication.of( - testEvent1, PublicationTargetIdentifier.of(TARGET_IDENTIFIER.getValue() + "!"))); + repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + repository + .create(EventPublication.of(testEvent1, PublicationTargetIdentifier.of(TARGET_IDENTIFIER.getValue() + "!"))); var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent1, TARGET_IDENTIFIER); @@ -159,11 +153,11 @@ class MongoDbEventPublicationRepositoryTest { TestEvent testEvent = new TestEvent("abc"); - CompletableEventPublication publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER); + EventPublication publication = EventPublication.of(testEvent, TARGET_IDENTIFIER); // Store publication repository.create(publication); - repository.update(publication.markCompleted()); + repository.markCompleted(publication, Instant.now()); var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER); @@ -175,16 +169,15 @@ class MongoDbEventPublicationRepositoryTest { var testEvent = new TestEvent("id"); - var publicationOld = repository - .create(CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER)); + var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); Thread.sleep(10); - repository.create(CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER)); + repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER); assertThat(actual).hasValueSatisfying(it -> // assertThat(it.getPublicationDate()) // - .isCloseTo(publicationOld.getPublicationDate(), within(1, ChronoUnit.MILLIS))); + .isCloseTo(publication.getPublicationDate(), within(1, ChronoUnit.MILLIS))); } } @@ -197,20 +190,36 @@ class MongoDbEventPublicationRepositoryTest { var testEvent1 = new TestEvent("abc"); var testEvent2 = new TestEvent("def"); - var publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER); - var publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER); - - repository.create(publication1); - repository.create(publication2); - - repository.update(publication1.markCompleted()); + var publication = repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + repository.markCompleted(publication, Instant.now()); repository.deleteCompletedPublications(); assertThat(mongoTemplate.findAll(MongoDbEventPublication.class)) // .hasSize(1) // .element(0).extracting(it -> it.event).isEqualTo(testEvent2); } + + @Test // GH-251 + void shouldDeleteCompletedEventsBefore() { + + var testEvent1 = new TestEvent("abc"); + var testEvent2 = new TestEvent("def"); + + var publication1 = repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + var publication2 = repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + + var now = Instant.now(); + + repository.markCompleted(publication1, now.minusSeconds(30)); + repository.markCompleted(publication2, now); + repository.deleteCompletedPublicationsBefore(now.minusSeconds(15)); + + assertThat(mongoTemplate.findAll(MongoDbEventPublication.class)) // + .hasSize(1) // + .element(0).extracting(it -> it.event).isEqualTo(testEvent2); + } } @Value