diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/Completable.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/Completable.java new file mode 100644 index 00000000..b63375fd --- /dev/null +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/Completable.java @@ -0,0 +1,33 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.modulith.events; + +import java.time.Instant; + +/** + * Internal interface to be able to mark {@link EventPublication} instances as completed. + * + * @author Oliver Drotbohm + */ +interface Completable { + + /** + * Marks the instance as completed at the given {@link Instant}. + * + * @param instant must not be {@literal null}. + */ + void markCompleted(Instant instant); +} diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/CompletableEventPublication.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/CompletableEventPublication.java deleted file mode 100644 index bcc2d247..00000000 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/CompletableEventPublication.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2017-2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.modulith.events; - -import java.time.Clock; -import java.time.Instant; -import java.util.Optional; - -/** - * An event publication that can be completed. - * - * @author Oliver Drotbohm - */ -public interface CompletableEventPublication extends EventPublication { - - /** - * Returns the completion date of the publication. - * - * @return will never be {@literal null}. - */ - Optional getCompletionDate(); - - /** - * Returns whether the publication of the event has completed. - * - * @return will never be {@literal null}. - */ - default boolean isPublicationCompleted() { - return getCompletionDate().isPresent(); - } - - /** - * Marks the event publication as completed. - * - * @return will never be {@literal null}. - */ - CompletableEventPublication markCompleted(); - - /** - * Creates a {@link CompletableEventPublication} for the given event an listener identifier using a default - * {@link Instant}. Prefer using {@link #of(Object, PublicationTargetIdentifier, Instant)} with a dedicated - * {@link Instant} obtained from a {@link Clock}. - * - * @param event must not be {@literal null}. - * @param id must not be {@literal null}. - * @return will never be {@literal null}. - * @see #of(Object, PublicationTargetIdentifier, Instant) - */ - static CompletableEventPublication of(Object event, PublicationTargetIdentifier id) { - return new DefaultEventPublication(event, id, Instant.now()); - } - - /** - * Creates a {@link CompletableEventPublication} for the given event an listener identifier and publication date. - * - * @param event must not be {@literal null}. - * @param id must not be {@literal null}. - * @param publicationDate must not be {@literal null}. - * @return will never be {@literal null}. - */ - static CompletableEventPublication of(Object event, PublicationTargetIdentifier id, Instant publicationDate) { - return new DefaultEventPublication(event, id, publicationDate); - } -} diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/DefaultEventPublication.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/DefaultEventPublication.java index ccd11350..f907afd5 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/DefaultEventPublication.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/DefaultEventPublication.java @@ -18,16 +18,18 @@ package org.springframework.modulith.events; import java.time.Instant; import java.util.Objects; import java.util.Optional; +import java.util.UUID; import org.springframework.util.Assert; /** - * Default {@link CompletableEventPublication} implementation. + * Default {@link Completable} implementation. * * @author Oliver Drotbohm */ -class DefaultEventPublication implements CompletableEventPublication { +class DefaultEventPublication implements EventPublication { + private final UUID identifier; private final Object event; private final PublicationTargetIdentifier targetIdentifier; private final Instant publicationDate; @@ -47,12 +49,22 @@ class DefaultEventPublication implements CompletableEventPublication { Assert.notNull(targetIdentifier, "PublicationTargetIdentifier must not be null!"); Assert.notNull(publicationDate, "Publication date must not be null!"); + this.identifier = UUID.randomUUID(); this.event = event; this.targetIdentifier = targetIdentifier; this.publicationDate = publicationDate; this.completionDate = Optional.empty(); } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublication#getPublicationIdentifier() + */ + @Override + public UUID getIdentifier() { + return identifier; + } + /* * (non-Javadoc) * @see org.springframework.modulith.events.EventPublication#getEvent() @@ -89,13 +101,11 @@ class DefaultEventPublication implements CompletableEventPublication { /* * (non-Javadoc) - * @see org.springframework.modulith.events.CompletableEventPublication#markCompleted() + * @see org.springframework.modulith.events.CompletableEventPublication#markCompleted(java.time.Instant) */ @Override - public CompletableEventPublication markCompleted() { - - this.completionDate = Optional.of(Instant.now()); - return this; + public void markCompleted(Instant instant) { + this.completionDate = Optional.of(instant); } /* diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/DefaultEventPublicationRegistry.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/DefaultEventPublicationRegistry.java index dc4a11ee..2a9cd506 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/DefaultEventPublicationRegistry.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/DefaultEventPublicationRegistry.java @@ -16,6 +16,7 @@ package org.springframework.modulith.events; import java.time.Clock; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.stream.Stream; @@ -39,6 +40,7 @@ import org.springframework.util.Assert; public class DefaultEventPublicationRegistry implements DisposableBean, EventPublicationRegistry { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventPublicationRegistry.class); + private static final String REGISTER = "Registering publication of {} for {}."; private final EventPublicationRepository events; private final Clock clock; @@ -65,7 +67,8 @@ public class DefaultEventPublicationRegistry implements DisposableBean, EventPub @Override public Collection store(Object event, Stream listeners) { - return listeners.map(it -> map(event, it)) + return listeners.map(it -> EventPublication.of(event, it, clock.instant())) + .peek(it -> LOGGER.debug(REGISTER, it.getEvent().getClass().getName(), it.getTargetIdentifier().getValue())) .map(events::create) .toList(); } @@ -90,12 +93,24 @@ public class DefaultEventPublicationRegistry implements DisposableBean, EventPub Assert.notNull(event, "Domain event must not be null!"); Assert.notNull(targetIdentifier, "Listener identifier must not be null!"); - events.findIncompletePublicationsByEventAndTargetIdentifier(event, targetIdentifier) // - .map(DefaultEventPublicationRegistry::logCompleted) // - .map(e -> CompletableEventPublication.of(e.getEvent(), e.getTargetIdentifier())) - .ifPresent(it -> events.update(it.markCompleted())); + LOGGER.debug("Marking publication of event {} to listener {} completed.", // + event.getClass().getName(), targetIdentifier.getValue()); + + events.markCompleted(event, targetIdentifier, clock.instant()); } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublicationRegistry#deleteCompletedPublicationsOlderThan(java.time.Duration) + */ + @Override + public void deleteCompletedPublicationsOlderThan(Duration duration) { + + Assert.notNull(duration, "Duration must not be null!"); + + events.deleteCompletedPublicationsBefore(clock.instant().minus(duration)); + }; + /* * (non-Javadoc) * @see org.springframework.beans.factory.DisposableBean#destroy() @@ -121,23 +136,4 @@ public class DefaultEventPublicationRegistry implements DisposableBean, EventPub LOGGER.info("{} {} - {}", prefix, it.getEvent().getClass().getName(), it.getTargetIdentifier().getValue()); } } - - private EventPublication map(Object event, PublicationTargetIdentifier targetIdentifier) { - - var result = CompletableEventPublication.of(event, targetIdentifier, clock.instant()); - - LOGGER.debug("Registering publication of {} for {}.", // - result.getEvent().getClass().getName(), result.getTargetIdentifier().getValue()); - - return result; - } - - private static EventPublication logCompleted(EventPublication publication) { - - LOGGER.debug("Marking publication of event {} to listener {} completed.", // - publication.getEvent().getClass().getName(), publication.getTargetIdentifier().getValue()); - - return publication; - } - } diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublication.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublication.java index 56349911..94f6f9dc 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublication.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublication.java @@ -15,7 +15,10 @@ */ package org.springframework.modulith.events; +import java.time.Clock; import java.time.Instant; +import java.util.Optional; +import java.util.UUID; import org.springframework.context.ApplicationEvent; import org.springframework.context.PayloadApplicationEvent; @@ -28,7 +31,40 @@ import org.springframework.util.Assert; * @author Björn Kieling * @author Dmitry Belyaev */ -public interface EventPublication extends Comparable { +public interface EventPublication extends Comparable, Completable { + + /** + * Creates a {@link EventPublication} for the given event an listener identifier using a default {@link Instant}. + * Prefer using {@link #of(Object, PublicationTargetIdentifier, Instant)} with a dedicated {@link Instant} obtained + * from a {@link Clock}. + * + * @param event must not be {@literal null}. + * @param id must not be {@literal null}. + * @return will never be {@literal null}. + * @see #of(Object, PublicationTargetIdentifier, Instant) + */ + static EventPublication of(Object event, PublicationTargetIdentifier id) { + return new DefaultEventPublication(event, id, Instant.now()); + } + + /** + * Creates a {@link EventPublication} for the given event an listener identifier and publication date. + * + * @param event must not be {@literal null}. + * @param id must not be {@literal null}. + * @param publicationDate must not be {@literal null}. + * @return will never be {@literal null}. + */ + static EventPublication of(Object event, PublicationTargetIdentifier id, Instant publicationDate) { + return new DefaultEventPublication(event, id, publicationDate); + } + + /** + * Returns a unique identifier for this publication. + * + * @return will never be {@literal null}. + */ + UUID getIdentifier(); /** * Returns the event that is published. @@ -79,6 +115,22 @@ public interface EventPublication extends Comparable { return this.getTargetIdentifier().equals(identifier); } + /** + * Returns the completion date of the publication. + * + * @return will never be {@literal null}. + */ + Optional getCompletionDate(); + + /** + * Returns whether the publication of the event has completed. + * + * @return will never be {@literal null}. + */ + default boolean isPublicationCompleted() { + return getCompletionDate().isPresent(); + } + /* * (non-Javadoc) * @see java.lang.Comparable#compareTo(java.lang.Object) diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublicationRegistry.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublicationRegistry.java index d36ae63c..86493c26 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublicationRegistry.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublicationRegistry.java @@ -15,6 +15,7 @@ */ package org.springframework.modulith.events; +import java.time.Duration; import java.util.Collection; import java.util.stream.Stream; @@ -52,4 +53,11 @@ public interface EventPublicationRegistry { * @param targetIdentifier must not be {@literal null}. */ void markCompleted(Object event, PublicationTargetIdentifier targetIdentifier); + + /** + * Deletes all completed {@link EventPublication}s that have been completed before the given {@link Duration}. + * + * @param duration must not be {@literal null}. + */ + void deleteCompletedPublicationsOlderThan(Duration duration); } diff --git a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublicationRepository.java b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublicationRepository.java index 6988e516..3209f2f5 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublicationRepository.java +++ b/spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/EventPublicationRepository.java @@ -15,9 +15,12 @@ */ package org.springframework.modulith.events; +import java.time.Instant; import java.util.List; import java.util.Optional; +import org.springframework.util.Assert; + /** * Repository to store {@link EventPublication}s. * @@ -36,12 +39,30 @@ public interface EventPublicationRepository { EventPublication create(EventPublication publication); /** - * Update the data store to mark the backing log entry as completed. + * Marks the given {@link EventPublication} as completed. * * @param publication must not be {@literal null}. - * @return will never be {@literal null}. + * @param completionDate must not be {@literal null}. */ - EventPublication update(CompletableEventPublication publication); + default void markCompleted(EventPublication publication, Instant completionDate) { + + Assert.notNull(publication, "EventPublication must not be null!"); + Assert.notNull(completionDate, "Instant must not be null!"); + + publication.markCompleted(completionDate); + + markCompleted(publication.getEvent(), publication.getTargetIdentifier(), completionDate); + } + + /** + * Marks the publication for the given event and {@link PublicationTargetIdentifier} to be completed at the given + * {@link Instant}. + * + * @param event must not be {@literal null}. + * @param identifier must not be {@literal null}. + * @param completionDate must not be {@literal null}. + */ + void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate); /** * Returns all {@link EventPublication} that have not been completed yet. @@ -64,4 +85,11 @@ public interface EventPublicationRepository { * Deletes all publications that were already marked as completed. */ void deleteCompletedPublications(); + + /** + * Deletes all publication that were already marked as completed with a completion date before the given one. + * + * @param instant must not be {@literal null}. + */ + void deleteCompletedPublicationsBefore(Instant instant); } diff --git a/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/CompletableEventPublicationTest.java b/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/EventPublicationUnitTests.java similarity index 68% rename from spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/CompletableEventPublicationTest.java rename to spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/EventPublicationUnitTests.java index 115ed3f9..00346875 100644 --- a/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/CompletableEventPublicationTest.java +++ b/spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/EventPublicationUnitTests.java @@ -24,13 +24,13 @@ import org.junit.jupiter.api.Test; * @author Björn Kieling * @author Dmitry Belyaev */ -class CompletableEventPublicationUnitTests { +class EventPublicationUnitTests { @Test void rejectsNullEvent() { assertThatExceptionOfType(IllegalArgumentException.class)// - .isThrownBy(() -> CompletableEventPublication.of(null, PublicationTargetIdentifier.of("foo")))// + .isThrownBy(() -> EventPublication.of(null, PublicationTargetIdentifier.of("foo")))// .withMessageContaining("Event"); } @@ -38,27 +38,17 @@ class CompletableEventPublicationUnitTests { void rejectsNullTargetIdentifier() { assertThatExceptionOfType(IllegalArgumentException.class)// - .isThrownBy(() -> CompletableEventPublication.of(new Object(), null))// + .isThrownBy(() -> EventPublication.of(new Object(), null))// .withMessageContaining("TargetIdentifier"); } @Test void publicationIsIncompleteByDefault() { - CompletableEventPublication publication = CompletableEventPublication.of(new Object(), + EventPublication publication = EventPublication.of(new Object(), PublicationTargetIdentifier.of("foo")); assertThat(publication.isPublicationCompleted()).isFalse(); assertThat(publication.getCompletionDate()).isNotPresent(); } - - @Test - void completionCapturesDate() { - - CompletableEventPublication publication = CompletableEventPublication - .of(new Object(), PublicationTargetIdentifier.of("foo")).markCompleted(); - - assertThat(publication.isPublicationCompleted()).isTrue(); - assertThat(publication.getCompletionDate()).isPresent(); - } } diff --git a/spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepository.java b/spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepository.java index 45c2b73a..c4d39844 100644 --- a/spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepository.java +++ b/spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepository.java @@ -31,7 +31,6 @@ import org.springframework.jdbc.core.JdbcOperations; import org.springframework.jdbc.core.ResultSetExtractor; import org.springframework.jdbc.core.RowMapper; import org.springframework.lang.Nullable; -import org.springframework.modulith.events.CompletableEventPublication; import org.springframework.modulith.events.EventPublication; import org.springframework.modulith.events.EventPublicationRepository; import org.springframework.modulith.events.EventSerializer; @@ -68,6 +67,14 @@ class JdbcEventPublicationRepository implements EventPublicationRepository { WHERE ID = ? """; + private static final String SQL_STATEMENT_UPDATE_BY_EVENT_AND_LISTENER_ID = """ + UPDATE EVENT_PUBLICATION + SET COMPLETION_DATE = ? + WHERE + LISTENER_ID = ? + AND SERIALIZED_EVENT = ? + """; + private static final String SQL_STATEMENT_FIND_BY_EVENT_AND_LISTENER_ID = """ SELECT * FROM EVENT_PUBLICATION @@ -79,10 +86,17 @@ class JdbcEventPublicationRepository implements EventPublicationRepository { """; private static final String SQL_STATEMENT_DELETE_UNCOMPLETED = """ - DELETE - FROM EVENT_PUBLICATION - WHERE - COMPLETION_DATE IS NOT NULL + DELETE + FROM EVENT_PUBLICATION + WHERE + COMPLETION_DATE IS NOT NULL + """; + + private static final String SQL_STATEMENT_DELETE_UNCOMPLETED_BEFORE = """ + DELETE + FROM EVENT_PUBLICATION + WHERE + COMPLETION_DATE < ? """; private final JdbcOperations operations; @@ -121,7 +135,7 @@ class JdbcEventPublicationRepository implements EventPublicationRepository { operations.update( // SQL_STATEMENT_INSERT, // - uuidToDatabase(UUID.randomUUID()), // + uuidToDatabase(publication.getIdentifier()), // publication.getEvent().getClass().getName(), // publication.getTargetIdentifier().getValue(), // Timestamp.from(publication.getPublicationDate()), // @@ -130,23 +144,18 @@ class JdbcEventPublicationRepository implements EventPublicationRepository { return publication; } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublicationRepository#markCompleted(java.lang.Object, org.springframework.modulith.events.PublicationTargetIdentifier, java.time.Instant) + */ @Override @Transactional - public EventPublication update(CompletableEventPublication publication) { + public void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate) { - var serializedEvent = serializeEvent(publication.getEvent()); - var listenerId = publication.getTargetIdentifier().getValue(); - var potentialPublicationIdsToBeUpdated = operations.query( // - SQL_STATEMENT_FIND_BY_EVENT_AND_LISTENER_ID, // - (rs, rowNum) -> getUuidFromResultSet(rs), // - serializedEvent, // - listenerId); - - potentialPublicationIdsToBeUpdated.stream() - .findFirst() - .ifPresent(id -> update(id, publication)); - - return publication; + operations.update(SQL_STATEMENT_UPDATE_BY_EVENT_AND_LISTENER_ID, // + Timestamp.from(completionDate), // + identifier.getValue(), // + serializer.serialize(event)); } @Override @@ -165,10 +174,7 @@ class JdbcEventPublicationRepository implements EventPublicationRepository { @Transactional(readOnly = true) @SuppressWarnings("null") public List findIncompletePublications() { - - return operations.query( // - SQL_STATEMENT_FIND_UNCOMPLETED, // - this::resultSetToPublications); + return operations.query(SQL_STATEMENT_FIND_UNCOMPLETED, this::resultSetToPublications); } @Override @@ -176,14 +182,16 @@ class JdbcEventPublicationRepository implements EventPublicationRepository { operations.execute(SQL_STATEMENT_DELETE_UNCOMPLETED); } - private void update(UUID id, CompletableEventPublication publication) { + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublicationRepository#deleteCompletedPublicationsBefore(java.time.Instant) + */ + @Override + public void deleteCompletedPublicationsBefore(Instant instant) { - var timestamp = publication.getCompletionDate().map(Timestamp::from).orElse(null); + Assert.notNull(instant, "Instant must not be null!"); - operations.update( // - SQL_STATEMENT_UPDATE, // - timestamp, // - uuidToDatabase(id)); + operations.update(SQL_STATEMENT_DELETE_UNCOMPLETED_BEFORE, Timestamp.from(instant)); } @SuppressWarnings("null") @@ -269,7 +277,7 @@ class JdbcEventPublicationRepository implements EventPublicationRepository { } } - private static class JdbcEventPublication implements CompletableEventPublication { + private static class JdbcEventPublication implements EventPublication { private final UUID id; private final Instant publicationDate; @@ -308,6 +316,15 @@ class JdbcEventPublicationRepository implements EventPublicationRepository { this.completionDate = completionDate; } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublication#getPublicationIdentifier() + */ + @Override + public UUID getIdentifier() { + return id; + } + /* * (non-Javadoc) * @see org.springframework.modulith.events.EventPublication#getEvent() @@ -355,14 +372,11 @@ class JdbcEventPublicationRepository implements EventPublicationRepository { /* * (non-Javadoc) - * @see org.springframework.modulith.events.CompletableEventPublication#markCompleted() + * @see org.springframework.modulith.events.Completable#markCompleted(java.time.Instant) */ @Override - public CompletableEventPublication markCompleted() { - - this.completionDate = Instant.now(); - - return this; + public void markCompleted(Instant instant) { + this.completionDate = instant; } /* diff --git a/spring-modulith-events/spring-modulith-events-jdbc/src/test/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryIntegrationTests.java b/spring-modulith-events/spring-modulith-events-jdbc/src/test/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryIntegrationTests.java index 17999dc4..015bd2f6 100644 --- a/spring-modulith-events/spring-modulith-events-jdbc/src/test/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryIntegrationTests.java +++ b/spring-modulith-events/spring-modulith-events-jdbc/src/test/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryIntegrationTests.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.*; import lombok.Value; +import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; @@ -34,7 +35,6 @@ import org.springframework.boot.test.autoconfigure.jdbc.JdbcTest; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.context.annotation.Import; import org.springframework.jdbc.core.JdbcOperations; -import org.springframework.modulith.events.CompletableEventPublication; import org.springframework.modulith.events.EventPublication; import org.springframework.modulith.events.EventSerializer; import org.springframework.modulith.events.PublicationTargetIdentifier; @@ -79,10 +79,7 @@ class JdbcEventPublicationRepositoryIntegrationTests { when(serializer.serialize(testEvent)).thenReturn(serializedEvent); when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent); - var publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER); - - // Store publication - repository.create(publication); + var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); var eventPublications = repository.findIncompletePublications(); @@ -96,7 +93,7 @@ class JdbcEventPublicationRepositoryIntegrationTests { .isPresent(); // Complete publication - repository.update(publication.markCompleted()); + repository.markCompleted(publication, Instant.now()); assertThat(repository.findIncompletePublications()).isEmpty(); } @@ -117,13 +114,7 @@ class JdbcEventPublicationRepositoryIntegrationTests { } private void createPublicationAt(LocalDateTime publicationDate) { - - EventPublication publication = mock(EventPublication.class); - when(publication.getEvent()).thenReturn(""); - when(publication.getTargetIdentifier()).thenReturn(TARGET_IDENTIFIER); - when(publication.getPublicationDate()).thenReturn(publicationDate.toInstant(ZoneOffset.UTC)); - - repository.create(publication); + repository.create(EventPublication.of("", TARGET_IDENTIFIER, publicationDate.toInstant(ZoneOffset.UTC))); } @Test // GH-3 @@ -139,15 +130,11 @@ class JdbcEventPublicationRepositoryIntegrationTests { when(serializer.serialize(testEvent2)).thenReturn(serializedEvent2); when(serializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2); - var publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER); - var publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER); - - // Store publication - repository.create(publication1); - repository.create(publication2); + repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + var publication = repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); // Complete publication - repository.update(publication2.markCompleted()); + repository.markCompleted(publication, Instant.now()); assertThat(repository.findIncompletePublications()).hasSize(1) .element(0).extracting(EventPublication::getEvent).isEqualTo(testEvent1); @@ -174,10 +161,10 @@ class JdbcEventPublicationRepositoryIntegrationTests { when(serializer.serialize(testEvent)).thenReturn(serializedEvent); when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent); - var publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER); + var publication = EventPublication.of(testEvent, TARGET_IDENTIFIER); repository.create(publication); - repository.update(publication.markCompleted()); + repository.markCompleted(publication, Instant.now()); var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER); @@ -193,23 +180,19 @@ class JdbcEventPublicationRepositoryIntegrationTests { when(serializer.serialize(testEvent)).thenReturn(serializedEvent); when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent); - var publicationOld = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER); + var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); Thread.sleep(10); - var publicationNew = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER); - - repository.create(publicationNew); - repository.create(publicationOld); + repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER); assertThat(actual).hasValueSatisfying(it -> { assertThat(it.getPublicationDate()) // - .isCloseTo(publicationOld.getPublicationDate(), within(1, ChronoUnit.MILLIS)); + .isCloseTo(publication.getPublicationDate(), within(1, ChronoUnit.MILLIS)); }); } - @Test - // GH-3 + @Test // GH-3 void shouldSilentlyIgnoreNotSerializableEvents() { var testEvent = new TestEvent("id"); @@ -219,7 +202,7 @@ class JdbcEventPublicationRepositoryIntegrationTests { when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent); // Store publication - repository.create(CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER)); + repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); operations.update("UPDATE EVENT_PUBLICATION SET EVENT_TYPE='abc'"); @@ -240,19 +223,41 @@ class JdbcEventPublicationRepositoryIntegrationTests { when(serializer.serialize(testEvent2)).thenReturn(serializedEvent2); when(serializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2); - var publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER); - var publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER); - - repository.create(publication1); - repository.create(publication2); - - repository.update(publication1.markCompleted()); + var publication = repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + repository.markCompleted(publication, Instant.now()); repository.deleteCompletedPublications(); assertThat(operations.query("SELECT * FROM EVENT_PUBLICATION", (rs, __) -> rs.getString("SERIALIZED_EVENT"))) .hasSize(1).element(0).isEqualTo(serializedEvent2); } + + @Test // GH-251 + void shouldDeleteCompletedEventsBefore() { + + var testEvent1 = new TestEvent("abc"); + var serializedEvent1 = "{\"eventId\":\"abc\"}"; + var testEvent2 = new TestEvent("def"); + var serializedEvent2 = "{\"eventId\":\"def\"}"; + + when(serializer.serialize(testEvent1)).thenReturn(serializedEvent1); + when(serializer.deserialize(serializedEvent1, TestEvent.class)).thenReturn(testEvent1); + when(serializer.serialize(testEvent2)).thenReturn(serializedEvent2); + when(serializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2); + + repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + + var now = Instant.now(); + + repository.markCompleted(testEvent1, TARGET_IDENTIFIER, now.minusSeconds(30)); + repository.markCompleted(testEvent2, TARGET_IDENTIFIER, now); + repository.deleteCompletedPublicationsBefore(now.minusSeconds(15)); + + assertThat(operations.query("SELECT * FROM EVENT_PUBLICATION", (rs, __) -> rs.getString("SERIALIZED_EVENT"))) + .hasSize(1).element(0).isEqualTo(serializedEvent2); + } } @Nested diff --git a/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublication.java b/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublication.java index dc196b1c..02b9670d 100644 --- a/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublication.java +++ b/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublication.java @@ -51,14 +51,15 @@ class JpaEventPublication { * @param serializedEvent must not be {@literal null} or empty. * @param eventType must not be {@literal null}. */ - JpaEventPublication(Instant publicationDate, String listenerId, String serializedEvent, Class eventType) { + JpaEventPublication(UUID id, Instant publicationDate, String listenerId, String serializedEvent, Class eventType) { + Assert.notNull(id, "Identifier must not be null!"); Assert.notNull(publicationDate, "Publication date must not be null!"); Assert.notNull(listenerId, "Listener id must not be null or empty!"); Assert.notNull(serializedEvent, "Serialized event must not be null or empty!"); Assert.notNull(eventType, "Event type must not be null!"); - this.id = UUID.randomUUID(); + this.id = id; this.publicationDate = publicationDate; this.listenerId = listenerId; this.serializedEvent = serializedEvent; diff --git a/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepository.java b/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepository.java index ab6afc85..352a0733 100644 --- a/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepository.java +++ b/spring-modulith-events/spring-modulith-events-jpa/src/main/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepository.java @@ -21,8 +21,8 @@ import java.time.Instant; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.UUID; -import org.springframework.modulith.events.CompletableEventPublication; import org.springframework.modulith.events.EventPublication; import org.springframework.modulith.events.EventPublicationRepository; import org.springframework.modulith.events.EventSerializer; @@ -64,6 +64,13 @@ class JpaEventPublicationRepository implements EventPublicationRepository { p.completionDate is not null """; + private static final String DELETE_COMPLETED_BEFORE = """ + delete + from JpaEventPublication p + where + p.completionDate < ?1 + """; + private final EntityManager entityManager; private final EventSerializer serializer; @@ -98,19 +105,14 @@ class JpaEventPublicationRepository implements EventPublicationRepository { /* * (non-Javadoc) - * @see org.springframework.modulith.events.EventPublicationRepository#update(org.springframework.modulith.events.CompletableEventPublication) + * @see org.springframework.modulith.events.EventPublicationRepository#markCompleted(java.lang.Object, org.springframework.modulith.events.PublicationTargetIdentifier, java.time.Instant) */ @Override @Transactional - public EventPublication update(CompletableEventPublication publication) { + public void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate) { - var id = publication.getTargetIdentifier().getValue(); - var event = publication.getEvent(); - - findEntityBySerializedEventAndListenerIdAndCompletionDateNull(event, id) // - .ifPresent(entity -> entity.completionDate = publication.getCompletionDate().orElse(null)); - - return publication; + findEntityBySerializedEventAndListenerIdAndCompletionDateNull(event, identifier) + .map(it -> it.completionDate = completionDate); } /* @@ -136,7 +138,7 @@ class JpaEventPublicationRepository implements EventPublicationRepository { public Optional findIncompletePublicationsByEventAndTargetIdentifier( // Object event, PublicationTargetIdentifier targetIdentifier) { - return findEntityBySerializedEventAndListenerIdAndCompletionDateNull(event, targetIdentifier.getValue()) + return findEntityBySerializedEventAndListenerIdAndCompletionDateNull(event, targetIdentifier) .map(this::entityToDomain); } @@ -150,14 +152,29 @@ class JpaEventPublicationRepository implements EventPublicationRepository { entityManager.createQuery(DELETE_COMPLETED).executeUpdate(); } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublicationRepository#deleteCompletedPublicationsBefore(java.time.Instant) + */ + @Override + public void deleteCompletedPublicationsBefore(Instant instant) { + + Assert.notNull(instant, "Instant must not be null!"); + + var query = entityManager.createQuery(DELETE_COMPLETED_BEFORE); + + query.setParameter(1, instant); + query.executeUpdate(); + } + private Optional findEntityBySerializedEventAndListenerIdAndCompletionDateNull( // - Object event, String listenerId) { + Object event, PublicationTargetIdentifier listenerId) { var serializedEvent = serializeEvent(event); var query = entityManager.createQuery(BY_EVENT_AND_LISTENER_ID, JpaEventPublication.class) .setParameter(1, serializedEvent) - .setParameter(2, listenerId); + .setParameter(2, listenerId.getValue()); return query.getResultStream().findFirst(); } @@ -167,7 +184,8 @@ class JpaEventPublicationRepository implements EventPublicationRepository { } private JpaEventPublication domainToEntity(EventPublication domain) { - return new JpaEventPublication(domain.getPublicationDate(), domain.getTargetIdentifier().getValue(), + return new JpaEventPublication(domain.getIdentifier(), domain.getPublicationDate(), + domain.getTargetIdentifier().getValue(), serializeEvent(domain.getEvent()), domain.getEvent().getClass()); } @@ -175,7 +193,7 @@ class JpaEventPublicationRepository implements EventPublicationRepository { return new JpaEventPublicationAdapter(entity, serializer); } - private static class JpaEventPublicationAdapter implements CompletableEventPublication { + private static class JpaEventPublicationAdapter implements EventPublication { private final JpaEventPublication publication; private final EventSerializer serializer; @@ -196,6 +214,15 @@ class JpaEventPublicationRepository implements EventPublicationRepository { this.serializer = serializer; } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublication#getPublicationIdentifier() + */ + @Override + public UUID getIdentifier() { + return publication.id; + } + /* * (non-Javadoc) * @see org.springframework.modulith.events.EventPublication#getEvent() @@ -243,12 +270,11 @@ class JpaEventPublicationRepository implements EventPublicationRepository { /* * (non-Javadoc) - * @see org.springframework.modulith.events.CompletableEventPublication#markCompleted() + * @see org.springframework.modulith.events.Completable#markCompleted(java.time.Instant) */ @Override - public CompletableEventPublication markCompleted() { - publication.markCompleted(); - return this; + public void markCompleted(Instant instant) { + this.publication.completionDate = instant; } /* diff --git a/spring-modulith-events/spring-modulith-events-jpa/src/test/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepositoryIntegrationTests.java b/spring-modulith-events/spring-modulith-events-jpa/src/test/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepositoryIntegrationTests.java index ee75b08b..6c830b85 100644 --- a/spring-modulith-events/spring-modulith-events-jpa/src/test/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepositoryIntegrationTests.java +++ b/spring-modulith-events/spring-modulith-events-jpa/src/test/java/org/springframework/modulith/events/jpa/JpaEventPublicationRepositoryIntegrationTests.java @@ -23,13 +23,15 @@ import jakarta.persistence.EntityManagerFactory; import lombok.RequiredArgsConstructor; import lombok.Value; +import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Comparator; -import java.util.List; +import java.util.UUID; import javax.sql.DataSource; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.context.annotation.Bean; @@ -38,7 +40,6 @@ import org.springframework.context.annotation.Import; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; -import org.springframework.modulith.events.CompletableEventPublication; import org.springframework.modulith.events.EventPublication; import org.springframework.modulith.events.EventSerializer; import org.springframework.modulith.events.PublicationTargetIdentifier; @@ -64,7 +65,6 @@ import org.springframework.transaction.annotation.Transactional; class JpaEventPublicationRepositoryIntegrationTests { private static final PublicationTargetIdentifier TARGET_IDENTIFIER = PublicationTargetIdentifier.of("listener"); - private static final EventSerializer eventSerializer = mock(EventSerializer.class); @Configuration @@ -113,29 +113,31 @@ class JpaEventPublicationRepositoryIntegrationTests { private final JpaEventPublicationRepository repository; private final EntityManager em; + @AfterEach + public void flush() { + em.flush(); + } + @Test void persistsJpaEventPublication() { - TestEvent testEvent = new TestEvent("abc"); - String serializedEvent = "{\"eventId\":\"abc\"}"; + var testEvent = new TestEvent("abc"); + var serializedEvent = "{\"eventId\":\"abc\"}"; when(eventSerializer.serialize(testEvent)).thenReturn(serializedEvent); when(eventSerializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent); - CompletableEventPublication publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER); + var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); - // Store publication - repository.create(publication); + var eventPublications = repository.findIncompletePublications(); - List eventPublications = repository.findIncompletePublications(); assertThat(eventPublications).hasSize(1); assertThat(eventPublications.get(0).getEvent()).isEqualTo(publication.getEvent()); assertThat(eventPublications.get(0).getTargetIdentifier()).isEqualTo(publication.getTargetIdentifier()); assertThat(repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER)) .isPresent(); - // Complete publication - repository.update(publication.markCompleted()); + repository.markCompleted(publication, Instant.now()); assertThat(repository.findIncompletePublications()).isEmpty(); } @@ -160,11 +162,10 @@ class JpaEventPublicationRepositoryIntegrationTests { when(eventSerializer.serialize(testEvent)).thenReturn(serializedEvent); when(eventSerializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent); - CompletableEventPublication publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER); + var publication = EventPublication.of(testEvent, TARGET_IDENTIFIER); - // Store publication repository.create(publication); - repository.update(publication.markCompleted()); + repository.markCompleted(publication, Instant.now()); var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER); @@ -184,14 +185,9 @@ class JpaEventPublicationRepositoryIntegrationTests { when(eventSerializer.serialize(testEvent2)).thenReturn(serializedEvent2); when(eventSerializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2); - var publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER); - var publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER); - - repository.create(publication1); - repository.create(publication2); - - repository.update(publication1.markCompleted()); - + repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + repository.markCompleted(testEvent1, TARGET_IDENTIFIER, Instant.now()); repository.deleteCompletedPublications(); assertThat(em.createQuery("select p from JpaEventPublication p", JpaEventPublication.class).getResultList()) @@ -212,8 +208,35 @@ class JpaEventPublicationRepositoryIntegrationTests { .isSortedAccordingTo(Comparator.comparing(EventPublication::getPublicationDate)); } + @Test // GH-251 + void shouldDeleteCompletedEventsBefore() { + + var testEvent1 = new TestEvent("abc"); + var serializedEvent1 = "{\"eventId\":\"abc\"}"; + var testEvent2 = new TestEvent("def"); + var serializedEvent2 = "{\"eventId\":\"def\"}"; + + when(eventSerializer.serialize(testEvent1)).thenReturn(serializedEvent1); + when(eventSerializer.deserialize(serializedEvent1, TestEvent.class)).thenReturn(testEvent1); + when(eventSerializer.serialize(testEvent2)).thenReturn(serializedEvent2); + when(eventSerializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2); + + repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + + var now = Instant.now(); + + repository.markCompleted(testEvent1, TARGET_IDENTIFIER, now.minusSeconds(30)); + repository.markCompleted(testEvent2, TARGET_IDENTIFIER, now); + repository.deleteCompletedPublicationsBefore(now.minusSeconds(15)); + + assertThat(em.createQuery("select p from JpaEventPublication p", JpaEventPublication.class).getResultList()) + .hasSize(1) // + .element(0).extracting(it -> it.serializedEvent).isEqualTo(serializedEvent2); + } + private void savePublicationAt(LocalDateTime date) { - em.persist(new JpaEventPublication(date.toInstant(ZoneOffset.UTC), "", "", Object.class)); + em.persist(new JpaEventPublication(UUID.randomUUID(), date.toInstant(ZoneOffset.UTC), "", "", Object.class)); } @Value diff --git a/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublication.java b/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublication.java index 56d99383..948659c2 100644 --- a/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublication.java +++ b/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublication.java @@ -16,8 +16,8 @@ package org.springframework.modulith.events.mongodb; import java.time.Instant; +import java.util.UUID; -import org.bson.types.ObjectId; import org.springframework.data.annotation.PersistenceCreator; import org.springframework.data.mongodb.core.mapping.Document; import org.springframework.lang.Nullable; @@ -32,7 +32,7 @@ import org.springframework.util.Assert; @Document(collection = "org_springframework_modulith_events") class MongoDbEventPublication { - final ObjectId id; + final UUID id; final Instant publicationDate; final String listenerId; final Object event; @@ -50,7 +50,7 @@ class MongoDbEventPublication { * @param completionDate can be {@literal null}. */ @PersistenceCreator - MongoDbEventPublication(ObjectId id, Instant publicationDate, String listenerId, Object event, + MongoDbEventPublication(UUID id, Instant publicationDate, String listenerId, Object event, @Nullable Instant completionDate) { Assert.notNull(id, "Id must not be null!"); @@ -72,8 +72,8 @@ class MongoDbEventPublication { * @param listenerId must not be {@literal null}. * @param event must not be {@literal null}. */ - MongoDbEventPublication(Instant publicationDate, String listenerId, Object event) { - this(new ObjectId(), publicationDate, listenerId, event, null); + MongoDbEventPublication(UUID id, Instant publicationDate, String listenerId, Object event) { + this(id, publicationDate, listenerId, event, null); } /** diff --git a/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java b/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java index bdf90655..322f97fe 100644 --- a/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java +++ b/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java @@ -22,11 +22,13 @@ import java.time.Instant; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.UUID; import org.springframework.data.domain.Sort; import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Update; import org.springframework.data.util.TypeInformation; -import org.springframework.modulith.events.CompletableEventPublication; import org.springframework.modulith.events.EventPublication; import org.springframework.modulith.events.EventPublicationRepository; import org.springframework.modulith.events.PublicationTargetIdentifier; @@ -55,6 +57,10 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository { this.mongoTemplate = mongoTemplate; } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublicationRepository#create(org.springframework.modulith.events.EventPublication) + */ @Override public EventPublication create(EventPublication publication) { @@ -63,17 +69,17 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository { return publication; } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublicationRepository#markCompleted(java.lang.Object, org.springframework.modulith.events.PublicationTargetIdentifier, java.time.Instant) + */ @Override - public EventPublication update(CompletableEventPublication publication) { + public void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate) { - return findDocumentsByEventAndTargetIdentifierAndCompletionDateNull(publication.getEvent(), - publication.getTargetIdentifier()) // - .stream() // - .findFirst() // - .map(document -> document.markCompleted(publication.getCompletionDate().orElse(null))) // - .map(mongoTemplate::save) // - .map(this::documentToDomain) // - .orElse(publication); + var criteria = byEventAndListenerId(event, identifier); + var update = Update.update("completionDate", completionDate); + + mongoTemplate.updateFirst(query(criteria), update, MongoDbEventPublication.class); } @Override @@ -83,7 +89,7 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository { .with(Sort.by("publicationDate").ascending()); return mongoTemplate.find(query, MongoDbEventPublication.class).stream() // - . map(this::documentToDomain) // + .map(this::documentToDomain) // .toList(); } @@ -101,39 +107,59 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository { return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublicationRepository#deleteCompletedPublications() + */ @Override public void deleteCompletedPublications() { mongoTemplate.remove(query(where("completionDate").ne(null)), MongoDbEventPublication.class); } + /* + * (non-Javadoc) + * @see org.springframework.modulith.events.EventPublicationRepository#deleteCompletedPublicationsBefore(java.time.Instant) + */ + @Override + public void deleteCompletedPublicationsBefore(Instant instant) { + + Assert.notNull(instant, "Instant must not be null!"); + + mongoTemplate.remove(query(where("completionDate").lt(instant)), MongoDbEventPublication.class); + } + private List findDocumentsByEventAndTargetIdentifierAndCompletionDateNull( // Object event, PublicationTargetIdentifier targetIdentifier) { - // we need to enforce writing of the type information - var eventAsMongoType = mongoTemplate.getConverter().convertToMongoType(event, TypeInformation.OBJECT); - - var query = query( - where("event").is(eventAsMongoType) // - .and("listenerId").is(targetIdentifier.getValue()) // - .and("completionDate").isNull()) // - .with(Sort.by("publicationDate").ascending()); + var criteria = byEventAndListenerId(event, targetIdentifier); + var query = query(criteria).with(Sort.by("publicationDate").ascending()); return mongoTemplate.find(query, MongoDbEventPublication.class); } + private Criteria byEventAndListenerId(Object event, PublicationTargetIdentifier identifier) { + + var eventAsMongoType = mongoTemplate.getConverter().convertToMongoType(event, TypeInformation.OBJECT); + + return where("event").is(eventAsMongoType) // + .and("listenerId").is(identifier.getValue()) + .and("completionDate").isNull(); + } + private MongoDbEventPublication domainToDocument(EventPublication publication) { return new MongoDbEventPublication( // + publication.getIdentifier(), // publication.getPublicationDate(), // publication.getTargetIdentifier().getValue(), // publication.getEvent()); } - private CompletableEventPublication documentToDomain(MongoDbEventPublication document) { + private EventPublication documentToDomain(MongoDbEventPublication document) { return new MongoDbEventPublicationAdapter(document); } - private static class MongoDbEventPublicationAdapter implements CompletableEventPublication { + private static class MongoDbEventPublicationAdapter implements EventPublication { private final MongoDbEventPublication publication; @@ -141,6 +167,11 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository { this.publication = publication; } + @Override + public UUID getIdentifier() { + return publication.id; + } + @Override public Object getEvent() { return publication.event; @@ -167,9 +198,8 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository { } @Override - public CompletableEventPublication markCompleted() { - publication.completionDate = Instant.now(); - return this; + public void markCompleted(Instant instant) { + this.publication.completionDate = instant; } /* @@ -183,11 +213,11 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository { return true; } - if (!(obj instanceof MongoDbEventPublicationAdapter other)) { + if (!(obj instanceof MongoDbEventPublicationAdapter that)) { return false; } - return Objects.equals(publication, other.publication); + return Objects.equals(publication, that.publication); } /* diff --git a/spring-modulith-events/spring-modulith-events-mongodb/src/test/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepositoryTest.java b/spring-modulith-events/spring-modulith-events-mongodb/src/test/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepositoryTest.java index 05945337..6957ac98 100644 --- a/spring-modulith-events/spring-modulith-events-mongodb/src/test/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepositoryTest.java +++ b/spring-modulith-events/spring-modulith-events-mongodb/src/test/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepositoryTest.java @@ -19,12 +19,13 @@ import static org.assertj.core.api.Assertions.*; import lombok.Value; +import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; import java.util.Comparator; +import java.util.UUID; -import org.bson.types.ObjectId; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; @@ -32,7 +33,6 @@ import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest; import org.springframework.data.mongodb.core.MongoTemplate; -import org.springframework.modulith.events.CompletableEventPublication; import org.springframework.modulith.events.EventPublication; import org.springframework.modulith.events.PublicationTargetIdentifier; import org.springframework.modulith.testapp.TestApplication; @@ -66,11 +66,7 @@ class MongoDbEventPublicationRepositoryTest { void shouldPersistAndUpdateEventPublication() { var testEvent = new TestEvent("abc"); - - var publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER); - - // Store publication - repository.create(publication); + var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); var eventPublications = repository.findIncompletePublications(); @@ -82,7 +78,7 @@ class MongoDbEventPublicationRepositoryTest { .isPresent(); // Complete publication - repository.update(publication.markCompleted()); + repository.markCompleted(publication, Instant.now()); assertThat(repository.findIncompletePublications()).isEmpty(); } @@ -93,12 +89,10 @@ class MongoDbEventPublicationRepositoryTest { var testEvent1 = new TestEvent("id1"); var testEvent2 = new TestEvent("id2"); - var publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER); - var publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER); + repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + var publication = repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); - repository.create(publication1); - repository.create(publication2); - repository.update(publication2.markCompleted()); + repository.markCompleted(publication, Instant.now()); assertThat(repository.findIncompletePublications()).hasSize(1) .element(0).extracting(EventPublication::getEvent).isEqualTo(testEvent1); @@ -120,7 +114,7 @@ class MongoDbEventPublicationRepositoryTest { private void savePublicationAt(LocalDateTime date) { mongoTemplate.save( - new MongoDbEventPublication(new ObjectId(), date.toInstant(ZoneOffset.UTC), "", "", null)); + new MongoDbEventPublication(UUID.randomUUID(), date.toInstant(ZoneOffset.UTC), "", "", null)); } @Nested @@ -132,10 +126,10 @@ class MongoDbEventPublicationRepositoryTest { var testEvent1 = new TestEvent("abc"); var testEvent2 = new TestEvent("def"); - repository.create(CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER)); - repository.create(CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER)); - repository.create(CompletableEventPublication.of( - testEvent1, PublicationTargetIdentifier.of(TARGET_IDENTIFIER.getValue() + "!"))); + repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + repository + .create(EventPublication.of(testEvent1, PublicationTargetIdentifier.of(TARGET_IDENTIFIER.getValue() + "!"))); var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent1, TARGET_IDENTIFIER); @@ -159,11 +153,11 @@ class MongoDbEventPublicationRepositoryTest { TestEvent testEvent = new TestEvent("abc"); - CompletableEventPublication publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER); + EventPublication publication = EventPublication.of(testEvent, TARGET_IDENTIFIER); // Store publication repository.create(publication); - repository.update(publication.markCompleted()); + repository.markCompleted(publication, Instant.now()); var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER); @@ -175,16 +169,15 @@ class MongoDbEventPublicationRepositoryTest { var testEvent = new TestEvent("id"); - var publicationOld = repository - .create(CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER)); + var publication = repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); Thread.sleep(10); - repository.create(CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER)); + repository.create(EventPublication.of(testEvent, TARGET_IDENTIFIER)); var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER); assertThat(actual).hasValueSatisfying(it -> // assertThat(it.getPublicationDate()) // - .isCloseTo(publicationOld.getPublicationDate(), within(1, ChronoUnit.MILLIS))); + .isCloseTo(publication.getPublicationDate(), within(1, ChronoUnit.MILLIS))); } } @@ -197,20 +190,36 @@ class MongoDbEventPublicationRepositoryTest { var testEvent1 = new TestEvent("abc"); var testEvent2 = new TestEvent("def"); - var publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER); - var publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER); - - repository.create(publication1); - repository.create(publication2); - - repository.update(publication1.markCompleted()); + var publication = repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + repository.markCompleted(publication, Instant.now()); repository.deleteCompletedPublications(); assertThat(mongoTemplate.findAll(MongoDbEventPublication.class)) // .hasSize(1) // .element(0).extracting(it -> it.event).isEqualTo(testEvent2); } + + @Test // GH-251 + void shouldDeleteCompletedEventsBefore() { + + var testEvent1 = new TestEvent("abc"); + var testEvent2 = new TestEvent("def"); + + var publication1 = repository.create(EventPublication.of(testEvent1, TARGET_IDENTIFIER)); + var publication2 = repository.create(EventPublication.of(testEvent2, TARGET_IDENTIFIER)); + + var now = Instant.now(); + + repository.markCompleted(publication1, now.minusSeconds(30)); + repository.markCompleted(publication2, now); + repository.deleteCompletedPublicationsBefore(now.minusSeconds(15)); + + assertThat(mongoTemplate.findAll(MongoDbEventPublication.class)) // + .hasSize(1) // + .element(0).extracting(it -> it.event).isEqualTo(testEvent2); + } } @Value