GH-3 - Refactor events registration to prepare easy addition of new repository types.

- Introduce Repository interface as a customization point
- Get rid of Registry interface by having one generic implementation
- Combine EventPublication and all its subclasses in one domain object
- Make Repository instead of Registry be dependent on EventSerializer

Signed-off-by: Dmitry Belyaev <dbelyaev@vmware.com>
This commit is contained in:
Björn Kieling
2022-07-19 16:27:15 +02:00
committed by Bjoern Kieling
parent 5f83f7ac13
commit 913ec7bb4e
25 changed files with 467 additions and 453 deletions

5
.git-authors Normal file
View File

@@ -0,0 +1,5 @@
authors:
db: Dmitry Belyaev; dbelyaev
bk: Björn Kieling; bkieling
email:
domain: vmware.com

View File

@@ -23,26 +23,6 @@
<dependencies> <dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.junit.jupiter</groupId> <groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId> <artifactId>junit-jupiter-engine</artifactId>
@@ -61,24 +41,6 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<scope>test</scope>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<scope>runtime</scope>
</dependency>
<dependency> <dependency>
<groupId>ch.qos.logback</groupId> <groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId> <artifactId>logback-classic</artifactId>

View File

@@ -16,5 +16,34 @@
<module.name>org.springframework.modulith.events.core</module.name> <module.name>org.springframework.modulith.events.core</module.name>
</properties> </properties>
<dependencies>
</project> <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
</project>

View File

@@ -1,5 +1,5 @@
/* /*
* Copyright 2017-2019 the original author or authors. * Copyright 2017-2022 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@@ -24,7 +24,6 @@ import java.util.Optional;
* @author Oliver Drotbohm * @author Oliver Drotbohm
*/ */
public interface CompletableEventPublication extends EventPublication { public interface CompletableEventPublication extends EventPublication {
/** /**
* Returns the completion date of the publication. * Returns the completion date of the publication.
* *

View File

@@ -1,11 +1,11 @@
/* /*
* Copyright 2017 the original author or authors. * Copyright 2017-2022 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * https://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,7 +27,7 @@ import java.util.Optional;
/** /**
* Default {@link CompletableEventPublication} implementation. * Default {@link CompletableEventPublication} implementation.
* *
* @author Oliver Gierke * @author Oliver Drotbohm
*/ */
@Getter @Getter
@RequiredArgsConstructor(staticName = "of") @RequiredArgsConstructor(staticName = "of")
@@ -41,10 +41,6 @@ class DefaultEventPublication implements CompletableEventPublication {
private Optional<Instant> completionDate = Optional.empty(); private Optional<Instant> completionDate = Optional.empty();
/*
* (non-Javadoc)
* @see de.olivergierke.events.CompletableEventPublication#markCompleted()
*/
@Override @Override
public CompletableEventPublication markCompleted() { public CompletableEventPublication markCompleted() {

View File

@@ -0,0 +1,121 @@
/*
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events;
import java.util.List;
import java.util.stream.Stream;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationListener;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* A registry to capture event publications to {@link ApplicationListener}s. Allows to register those publications, mark
* them as completed and lookup incomplete publications.
*
* @author Oliver Drotbohm, Björn Kieling, Dmitry Belyaev
*/
@Slf4j
@RequiredArgsConstructor
public class DefaultEventPublicationRegistry implements DisposableBean, EventPublicationRegistry {
private final @NonNull EventPublicationRepository events;
/**
* Stores {@link EventPublication}s for the given event and {@link ApplicationListener}s.
*
* @param event must not be {@literal null}.
* @param listeners must not be {@literal null}.
*/
public void store(Object event, Stream<PublicationTargetIdentifier> listeners) {
listeners.map(it -> map(event, it))
.forEach(events::create);
}
/**
* Returns all {@link EventPublication}s that have not been completed yet.
*
* @return will never be {@literal null}.
*/
public Iterable<EventPublication> findIncompletePublications() {
return events.findByCompletionDateIsNull();
}
/**
* Marks the publication for the given event and {@link PublicationTargetIdentifier} as completed.
*
* @param event must not be {@literal null}.
* @param targetIdentifier must not be {@literal null}.
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void markCompleted(Object event, PublicationTargetIdentifier targetIdentifier) {
Assert.notNull(event, "Domain event must not be null!");
Assert.notNull(targetIdentifier, "Listener identifier must not be null!");
events.findByEventAndTargetIdentifier(event, targetIdentifier) //
.map(DefaultEventPublicationRegistry::LOGCompleted) //
.map(e -> CompletableEventPublication.of(e.getEvent(), e.getTargetIdentifier()))
.ifPresent(it -> events.updateCompletionDate(it.markCompleted()));
}
@Override
public void destroy() {
List<EventPublication> publications = events.findByCompletionDateIsNull();
if (publications.isEmpty()) {
LOG.info("No publications outstanding!");
return;
}
LOG.info("Shutting down with the following publications left unfinished:");
for (int i = 0; i < publications.size(); i++) {
String prefix = (i + 1) == publications.size() ? "└─" : "├─";
EventPublication it = publications.get(i);
LOG.info("{} - {} - {}", prefix, it.getEvent().getClass().getName(), it.getTargetIdentifier().getValue());
}
}
private EventPublication map(Object event, PublicationTargetIdentifier targetIdentifier) {
EventPublication result = CompletableEventPublication.of(event, targetIdentifier);
LOG.debug("Registering publication of {} for {}.", //
result.getEvent().getClass().getName(), result.getTargetIdentifier().getValue());
return result;
}
private static EventPublication LOGCompleted(EventPublication publication) {
LOG.debug("Marking publication of event {} to listener {} completed.", //
publication.getEvent().getClass().getName(), publication.getTargetIdentifier().getValue());
return publication;
}
}

View File

@@ -24,11 +24,9 @@ import org.springframework.util.Assert;
/** /**
* An event publication. * An event publication.
* *
* @author Oliver Drotbohm * @author Oliver Drotbohm, Björn Kieling, Dmitry Belyaev
* @see CompletableEventPublication#of(Object, PublicationTargetIdentifier)
*/ */
public interface EventPublication extends Comparable<EventPublication> { public interface EventPublication extends Comparable<EventPublication> {
/** /**
* Returns the event that is published. * Returns the event that is published.
* *
@@ -78,10 +76,6 @@ public interface EventPublication extends Comparable<EventPublication> {
return this.getTargetIdentifier().equals(identifier); return this.getTargetIdentifier().equals(identifier);
} }
/*
* (non-Javadoc)
* @see java.lang.Comparable#compareTo(java.lang.Object)
*/
@Override @Override
public default int compareTo(EventPublication that) { public default int compareTo(EventPublication that) {
return this.getPublicationDate().compareTo(that.getPublicationDate()); return this.getPublicationDate().compareTo(that.getPublicationDate());

View File

@@ -18,13 +18,12 @@ package org.springframework.modulith.events;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.springframework.util.Assert;
/** /**
* A registry to capture event publications to {@link ApplicationListener}s. Allows to register those publications, mark * A registry to capture event publications to {@link ApplicationListener}s. Allows to register those publications, mark
* them as completed and lookup incomplete publications. * them as completed and lookup incomplete publications.
* *
* @author Oliver Drotbohm * @author Oliver Drotbohm, Björn Kieling, Dmitry Belyaev
*/ */
public interface EventPublicationRegistry { public interface EventPublicationRegistry {
@@ -36,30 +35,18 @@ public interface EventPublicationRegistry {
*/ */
void store(Object event, Stream<PublicationTargetIdentifier> listeners); void store(Object event, Stream<PublicationTargetIdentifier> listeners);
/**
* Marks the publication for the given event and {@link PublicationTargetIdentifier} as completed.
*
* @param event must not be {@literal null}.
* @param listener must not be {@literal null}.
*/
void markCompleted(Object event, PublicationTargetIdentifier listener);
/**
* Marks the given {@link EventPublication} as completed.
*
* @param publication must not be {@literal null}.
*/
default void markCompleted(EventPublication publication) {
Assert.notNull(publication, "Publication must not be null!");
markCompleted(publication.getEvent(), publication.getTargetIdentifier());
}
/** /**
* Returns all {@link EventPublication}s that have not been completed yet. * Returns all {@link EventPublication}s that have not been completed yet.
* *
* @return will never be {@literal null}. * @return will never be {@literal null}.
*/ */
Iterable<EventPublication> findIncompletePublications(); Iterable<EventPublication> findIncompletePublications();
/**
* Marks the publication for the given event and {@link PublicationTargetIdentifier} as completed.
*
* @param event must not be {@literal null}.
* @param targetIdentifier must not be {@literal null}.
*/
void markCompleted(Object event, PublicationTargetIdentifier targetIdentifier);
} }

View File

@@ -0,0 +1,30 @@
package org.springframework.modulith.events;
import java.util.List;
import java.util.Optional;
/**
* Repository to store {@link EventPublication}s.
*
* @author Björn Kieling, Dmitry Belyaev
*/
public interface EventPublicationRepository {
EventPublication create(EventPublication publication);
EventPublication updateCompletionDate(CompletableEventPublication publication);
/**
* Returns all {@link EventPublication} that have not been completed yet.
*/
List<EventPublication> findByCompletionDateIsNull();
/**
* Return the {@link EventPublication} for the given serialized event and listener identifier.
*
* @param event must not be {@literal null}.
* @param targetIdentifier must not be {@literal null}.
* @return
*/
Optional<EventPublication> findByEventAndTargetIdentifier(Object event, PublicationTargetIdentifier targetIdentifier);
}

View File

@@ -35,5 +35,5 @@ public interface EventSerializer {
* @param type must not be {@literal null}. * @param type must not be {@literal null}.
* @return * @return
*/ */
Object deserialize(Object serialized, Class<?> type); <T> T deserialize(Object serialized, Class<T> type);
} }

View File

@@ -27,7 +27,7 @@ import lombok.Value;
@RequiredArgsConstructor(staticName = "of") @RequiredArgsConstructor(staticName = "of")
public class PublicationTargetIdentifier { public class PublicationTargetIdentifier {
String value; private String value;
/* /*
* (non-Javadoc) * (non-Javadoc)

View File

@@ -19,26 +19,37 @@ import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.modulith.events.DefaultEventPublicationRegistry;
import org.springframework.modulith.events.EventPublicationRegistry; import org.springframework.modulith.events.EventPublicationRegistry;
import org.springframework.modulith.events.EventPublicationRepository;
import org.springframework.modulith.events.support.CompletionRegisteringBeanPostProcessor; import org.springframework.modulith.events.support.CompletionRegisteringBeanPostProcessor;
import org.springframework.modulith.events.support.MapEventPublicationRegistry; import org.springframework.modulith.events.support.MapBackedEventPublicationRepository;
import org.springframework.modulith.events.support.PersistentApplicationEventMulticaster; import org.springframework.modulith.events.support.PersistentApplicationEventMulticaster;
/** /**
* @author Oliver Drotbohm * @author Oliver Drotbohm, Björn Kieling, Dmitry Belyaev
*/ */
@Configuration(proxyBeanMethods = false) @Configuration(proxyBeanMethods = false)
class EventPublicationConfiguration { class EventPublicationConfiguration {
@Bean @Bean
PersistentApplicationEventMulticaster applicationEventMulticaster(ObjectProvider<EventPublicationRegistry> registry) { EventPublicationRegistry eventPublicationRegistry(
ObjectProvider<EventPublicationRepository> repositoryProvider) {
return new PersistentApplicationEventMulticaster( return new DefaultEventPublicationRegistry(
() -> registry.getIfAvailable(() -> new MapEventPublicationRegistry())); repositoryProvider.getIfAvailable(MapBackedEventPublicationRepository::new)
} );
}
@Bean @Bean
static CompletionRegisteringBeanPostProcessor bpp(ObjectFactory<EventPublicationRegistry> store) { PersistentApplicationEventMulticaster applicationEventMulticaster(
return new CompletionRegisteringBeanPostProcessor(() -> store.getObject()); EventPublicationRegistry eventPublicationRegistry) {
}
return new PersistentApplicationEventMulticaster(() -> eventPublicationRegistry);
}
@Bean
static CompletionRegisteringBeanPostProcessor bpp(ObjectFactory<EventPublicationRegistry> store) {
return new CompletionRegisteringBeanPostProcessor(store::getObject);
}
} }

View File

@@ -0,0 +1,65 @@
/*
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.support;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
import org.springframework.modulith.events.EventPublicationRepository;
import org.springframework.modulith.events.PublicationTargetIdentifier;
/**
* Map based {@link EventPublicationRepository}, for testing purposes only.
*
* @author Björn Kieling, Dmitry Belyaev
*/
public class MapBackedEventPublicationRepository implements EventPublicationRepository {
private final List<CompletableEventPublication> events = new ArrayList<>();
@Override
public EventPublication create(EventPublication publication) {
events.add(CompletableEventPublication.of(publication.getEvent(), publication.getTargetIdentifier()));
return publication;
}
@Override
public EventPublication updateCompletionDate(CompletableEventPublication publication) {
findByEventAndTargetIdentifier(publication.getEvent(), publication.getTargetIdentifier())
.ifPresent(eventPublication -> ((CompletableEventPublication) eventPublication).markCompleted());
return publication;
}
@Override
public List<EventPublication> findByCompletionDateIsNull() {
return events.stream()
.filter(publication -> !publication.isPublicationCompleted())
.collect(Collectors.toList());
}
@Override
public Optional<EventPublication> findByEventAndTargetIdentifier(Object event, PublicationTargetIdentifier targetIdentifier) {
return events.stream()
.filter(publication ->
publication.equals(publication.getEvent()) && publication.getTargetIdentifier().equals(targetIdentifier))
.map(EventPublication.class::cast)
.findAny();
}
}

View File

@@ -1,79 +0,0 @@
/*
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.support;
import lombok.Value;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
import org.springframework.modulith.events.EventPublicationRegistry;
import org.springframework.modulith.events.PublicationTargetIdentifier;
/**
* Map based {@link EventPublicationRegistry}, for testing purposes only.
*
* @author Oliver Drotbohm
*/
public class MapEventPublicationRegistry implements EventPublicationRegistry {
private final Map<Key, CompletableEventPublication> events = new HashMap<>();
/*
* (non-Javadoc)
* @see org.springframework.events.EventPublicationRegistry#findIncompletePublications()
*/
@Override
public Iterable<EventPublication> findIncompletePublications() {
return events.entrySet().stream()//
.filter(it -> !it.getValue().isPublicationCompleted())//
.map(it -> it.getValue())//
.collect(Collectors.toList());
}
/*
* (non-Javadoc)
* @see org.springframework.events.EventPublicationRegistry#store(java.lang.Object, java.util.Collection)
*/
@Override
public void store(Object event, Stream<PublicationTargetIdentifier> identifiers) {
identifiers.forEach(id -> {
events.computeIfAbsent(Key.of(event, id), it -> CompletableEventPublication.of(event, id));
});
}
/*
* (non-Javadoc)
* @see org.springframework.events.EventPublicationRegistry#markCompleted(java.lang.Object, org.springframework.events.PublicationTargetIdentifier)
*/
@Override
public void markCompleted(Object event, PublicationTargetIdentifier id) {
events.computeIfPresent(Key.of(event, id), (__, value) -> value.markCompleted());
}
@Value(staticConstructor = "of")
private static class Key {
Object event;
PublicationTargetIdentifier identifier;
}
}

View File

@@ -123,8 +123,8 @@ public class PersistentApplicationEventMulticaster extends AbstractApplicationEv
}); });
} }
private ApplicationListener<ApplicationEvent> executeListenerWithCompletion(EventPublication publication, private ApplicationListener<ApplicationEvent> executeListenerWithCompletion(
TransactionalApplicationListener<ApplicationEvent> listener) { EventPublication publication, TransactionalApplicationListener<ApplicationEvent> listener) {
listener.processEvent(publication.getApplicationEvent()); listener.processEvent(publication.getApplicationEvent());

View File

@@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* https://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,14 +15,15 @@
*/ */
package org.springframework.modulith.events; package org.springframework.modulith.events;
import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
/** /**
* @author Oliver Drotbohm * @author Oliver Drotbohm, Björn Kieling, Dmitry Belyaev
*/ */
class CompletableEventPublicationUnitTest { class CompletableEventPublicationTest {
@Test @Test
void rejectsNullEvent() { void rejectsNullEvent() {

View File

@@ -52,7 +52,7 @@ class JacksonEventSerializer implements EventSerializer {
* @see de.oliverDrotbohm.events.EventSerializer#deserialize(java.lang.Object, java.lang.Class) * @see de.oliverDrotbohm.events.EventSerializer#deserialize(java.lang.Object, java.lang.Class)
*/ */
@Override @Override
public Object deserialize(Object serialized, Class<?> type) { public <T> T deserialize(Object serialized, Class<T> type) {
try { try {
return mapper.get().readerFor(type).readValue(serialized.toString()); return mapper.get().readerFor(type).readValue(serialized.toString());

View File

@@ -15,25 +15,26 @@
*/ */
package org.springframework.modulith.events.jpa; package org.springframework.modulith.events.jpa;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import lombok.AccessLevel;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import java.time.Instant; import java.time.Instant;
import java.util.UUID; import java.util.UUID;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/** /**
* @author Oliver Drotbohm * @author Oliver Drotbohm, Dmitry Belyaev, Björn Kieling
*/ */
@Data @Data
@Entity @Entity
@NoArgsConstructor(force = true) @NoArgsConstructor(force = true)
@RequiredArgsConstructor(access = AccessLevel.PRIVATE) @AllArgsConstructor(access = AccessLevel.PRIVATE)
class JpaEventPublication { class JpaEventPublication {
private final @Id @Column(length = 16) UUID id; private final @Id @Column(length = 16) UUID id;
@@ -45,10 +46,10 @@ class JpaEventPublication {
private Instant completionDate; private Instant completionDate;
@Builder @Builder
static JpaEventPublication of(Instant publicationDate, String listenerId, Object serializedEvent, static JpaEventPublication of(UUID id, Instant publicationDate, String listenerId, Object serializedEvent,
Class<?> eventType) { Class<?> eventType, Instant completionDate) {
return new JpaEventPublication(UUID.randomUUID(), publicationDate, listenerId, serializedEvent.toString(), return new JpaEventPublication(id, publicationDate, listenerId, serializedEvent.toString(), eventType,
eventType); completionDate);
} }
JpaEventPublication markCompleted() { JpaEventPublication markCompleted() {

View File

@@ -16,28 +16,25 @@
package org.springframework.modulith.events.jpa; package org.springframework.modulith.events.jpa;
import jakarta.persistence.EntityManager; import jakarta.persistence.EntityManager;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.modulith.events.EventSerializer; import org.springframework.modulith.events.EventSerializer;
import org.springframework.modulith.events.config.EventPublicationConfigurationExtension; import org.springframework.modulith.events.config.EventPublicationConfigurationExtension;
import lombok.RequiredArgsConstructor;
/** /**
* @author Oliver Drotbohm * @author Oliver Drotbohm, Dmitry Belyaev, Björn Kieling
*/ */
@Configuration(proxyBeanMethods = false) @Configuration(proxyBeanMethods = false)
@RequiredArgsConstructor @RequiredArgsConstructor
class JpaEventPublicationConfiguration implements EventPublicationConfigurationExtension { class JpaEventPublicationConfiguration implements EventPublicationConfigurationExtension {
@Bean @Bean
public JpaEventPublicationRegistry jpaEventPublicationRegistry(JpaEventPublicationRepository repository, public JpaEventPublicationRepository jpaEventPublicationRepository(EntityManager em, EventSerializer serializer) {
EventSerializer serializer) { // TODO Why do we want to instantiate the serializer here and what
return new JpaEventPublicationRegistry(repository, serializer); // happens if no serializer is available?
} return new JpaEventPublicationRepository(em, serializer);
@Bean
public JpaEventPublicationRepository jpaEventPublicationRepository(EntityManager em) {
return new JpaEventPublicationRepository(em);
} }
} }

View File

@@ -1,175 +0,0 @@
/*
* Copyright 2017-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.jpa;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
import org.springframework.modulith.events.EventPublicationRegistry;
import org.springframework.modulith.events.EventSerializer;
import org.springframework.modulith.events.PublicationTargetIdentifier;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
/**
* JPA based {@link EventPublicationRegistry}.
*
* @author Oliver Gierke
*/
@Slf4j
@RequiredArgsConstructor
class JpaEventPublicationRegistry implements EventPublicationRegistry, DisposableBean {
private final @NonNull JpaEventPublicationRepository events;
private final @NonNull EventSerializer serializer;
/*
* (non-Javadoc)
* @see org.springframework.events.EventPublicationRegistry#store(java.lang.Object, java.util.Collection)
*/
@Override
public void store(Object event, Stream<PublicationTargetIdentifier> listeners) {
listeners.map(it -> CompletableEventPublication.of(event, it)) //
.map(this::map) //
.forEach(it -> events.create(it));
}
/*
* (non-Javadoc)
* @see org.springframework.events.EventPublicationRegistry#findIncompletePublications()
*/
@Override
public Iterable<EventPublication> findIncompletePublications() {
List<EventPublication> result = events.findByCompletionDateIsNull().stream() //
.map(it -> JpaEventPublicationAdapter.of(it, serializer)) //
.collect(Collectors.toList());
return result;
}
/*
* (non-Javadoc)
* @see org.springframework.events.EventPublicationRegistry#markCompleted(java.lang.Object, org.springframework.events.ListenerId)
*/
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void markCompleted(Object event, PublicationTargetIdentifier listener) {
Assert.notNull(event, "Domain event must not be null!");
Assert.notNull(listener, "Listener identifier must not be null!");
events.findBySerializedEventAndListenerId(serializer.serialize(event), listener.toString()) //
.map(JpaEventPublicationRegistry::LOGCompleted) //
.ifPresent(it -> events.update(it.markCompleted()));
}
/*
* (non-Javadoc)
* @see org.springframework.beans.factory.DisposableBean#destroy()
*/
@Override
public void destroy() throws Exception {
List<JpaEventPublication> publications = events.findByCompletionDateIsNull();
if (publications.isEmpty()) {
LOG.info("No publications outstanding!");
return;
}
LOG.info("Shutting down with the following publications left unfinished:");
for (int i = 0; i < publications.size(); i++) {
String prefix = (i + 1) == publications.size() ? "└─" : "├─";
JpaEventPublication it = publications.get(i);
LOG.info("{} {} - {} - {}", prefix, it.getId(), it.getEventType().getName(), it.getListenerId());
}
}
private JpaEventPublication map(EventPublication publication) {
JpaEventPublication result = JpaEventPublication.builder() //
.eventType(publication.getEvent().getClass()) //
.publicationDate(publication.getPublicationDate()) //
.listenerId(publication.getTargetIdentifier().toString()) //
.serializedEvent(serializer.serialize(publication.getEvent()).toString()) //
.build();
LOG.debug("Registering publication of {} with id {} for {}.", //
result.getEventType(), result.getId(), result.getListenerId());
return result;
}
private static JpaEventPublication LOGCompleted(JpaEventPublication publication) {
LOG.debug("Marking publication of event {} with id {} to listener {} completed.", //
publication.getEventType(), publication.getId(), publication.getListenerId());
return publication;
}
@EqualsAndHashCode
@RequiredArgsConstructor(staticName = "of")
static class JpaEventPublicationAdapter implements EventPublication {
private final JpaEventPublication publication;
private final EventSerializer serializer;
/*
* (non-Javadoc)
* @see org.springframework.events.EventPublication#getEvent()
*/
@Override
public Object getEvent() {
return serializer.deserialize(publication.getSerializedEvent(), publication.getEventType());
}
/*
* (non-Javadoc)
* @see org.springframework.events.EventPublication#getListenerId()
*/
@Override
public PublicationTargetIdentifier getTargetIdentifier() {
return PublicationTargetIdentifier.of(publication.getListenerId());
}
/*
* (non-Javadoc)
* @see org.springframework.events.EventPublication#getPublicationDate()
*/
@Override
public Instant getPublicationDate() {
return publication.getPublicationDate();
}
}
}

View File

@@ -15,72 +15,135 @@
*/ */
package org.springframework.modulith.events.jpa; package org.springframework.modulith.events.jpa;
import jakarta.persistence.EntityManager; import java.time.Instant;
import jakarta.persistence.TypedQuery;
import lombok.RequiredArgsConstructor;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import jakarta.persistence.EntityManager;
import jakarta.persistence.TypedQuery;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
import org.springframework.modulith.events.EventPublicationRepository;
import org.springframework.modulith.events.EventSerializer;
import org.springframework.modulith.events.PublicationTargetIdentifier;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
/** /**
* Repository to store {@link JpaEventPublication}s. * Repository to store {@link EventPublication}s.
* *
* @author Oliver Drotbohm * @author Oliver Drotbohm, Dmitry Belyaev, Björn Kieling
*/ */
@RequiredArgsConstructor @RequiredArgsConstructor
public class JpaEventPublicationRepository { public class JpaEventPublicationRepository implements EventPublicationRepository {
private final EntityManager entityManager; private final EntityManager entityManager;
private final EventSerializer serializer;
@Override
@Transactional @Transactional
JpaEventPublication create(JpaEventPublication publication) { public EventPublication create(EventPublication publication) {
entityManager.persist(publication); entityManager.persist(domainToEntity(publication));
return publication; return publication;
} }
@Override
@Transactional @Transactional
JpaEventPublication update(JpaEventPublication publication) { public EventPublication updateCompletionDate(CompletableEventPublication publication) {
entityManager.merge(publication);
entityManager.flush();
findEntityBySerializedEventAndListenerId(publication.getEvent(),
publication.getTargetIdentifier().getValue()).ifPresent(entity -> {
entity.setCompletionDate(publication.getCompletionDate().orElse(null));
entityManager.flush();
});
return publication; return publication;
} }
/** @Override
* Returns all {@link JpaEventPublication} that have not been completed yet.
*/
@Transactional(readOnly = true) @Transactional(readOnly = true)
List<JpaEventPublication> findByCompletionDateIsNull() { public List<EventPublication> findByCompletionDateIsNull() {
String query = "select p from JpaEventPublication p where p.completionDate is null"; String query = "select p from JpaEventPublication p where p.completionDate is null";
return entityManager.createQuery(query, JpaEventPublication.class).getResultList(); return entityManager.createQuery(query, JpaEventPublication.class).getResultList().stream()
.map(this::entityToDomain).collect(Collectors.toList());
} }
/** @Override
* Return the {@link JpaEventPublication} for the given serialized event and listener identifier.
*
* @param event must not be {@literal null}.
* @param listenerId must not be {@literal null}.
* @return
*/
@Transactional(readOnly = true) @Transactional(readOnly = true)
Optional<JpaEventPublication> findBySerializedEventAndListenerId(Object event, String listenerId) { public Optional<EventPublication> findByEventAndTargetIdentifier(Object event,
PublicationTargetIdentifier targetIdentifier) {
Optional<JpaEventPublication> result = findEntityBySerializedEventAndListenerId(event, targetIdentifier.getValue());
return result.map(this::entityToDomain);
}
private Optional<JpaEventPublication> findEntityBySerializedEventAndListenerId(Object event, String listenerId) {
String query = "select p from JpaEventPublication p where p.serializedEvent = ?1 and p.listenerId = ?2"; String query = "select p from JpaEventPublication p where p.serializedEvent = ?1 and p.listenerId = ?2";
String serializedEvent = serializeEvent(event);
TypedQuery<JpaEventPublication> typedQuery = entityManager.createQuery(query, JpaEventPublication.class) TypedQuery<JpaEventPublication> typedQuery = entityManager.createQuery(query, JpaEventPublication.class)
.setParameter(1, event) .setParameter(1, serializedEvent).setParameter(2, listenerId);
.setParameter(2, listenerId); JpaEventPublication resultEntity = typedQuery.getSingleResult();
return Optional.ofNullable(resultEntity);
}
try { private String serializeEvent(Object event) {
return Optional.of(typedQuery.getSingleResult()); return serializer.serialize(event).toString();
} catch (Exception o_O) { }
return Optional.empty();
private JpaEventPublication domainToEntity(EventPublication domain) {
String serializedEvent = serializeEvent(domain.getEvent());
return JpaEventPublication.builder().id(UUID.randomUUID()).publicationDate(domain.getPublicationDate())
.listenerId(domain.getTargetIdentifier().getValue()).serializedEvent(serializedEvent)
.eventType(domain.getEvent().getClass()).build();
}
private CompletableEventPublication entityToDomain(JpaEventPublication entity) {
return JpaEventPublicationAdapter.of(entity, serializer);
}
@EqualsAndHashCode
@RequiredArgsConstructor(staticName = "of")
private static class JpaEventPublicationAdapter implements CompletableEventPublication {
private final JpaEventPublication publication;
private final EventSerializer serializer;
@Override
public Object getEvent() {
return serializer.deserialize(publication.getSerializedEvent(), publication.getEventType());
}
@Override
public PublicationTargetIdentifier getTargetIdentifier() {
return PublicationTargetIdentifier.of(publication.getListenerId());
}
@Override
public Instant getPublicationDate() {
return publication.getPublicationDate();
}
@Override
public Optional<Instant> getCompletionDate() {
return Optional.ofNullable(publication.getCompletionDate());
}
@Override
public boolean isPublicationCompleted() {
return publication.getCompletionDate() != null;
}
@Override
public CompletableEventPublication markCompleted() {
publication.markCompleted();
return this;
} }
} }
} }

View File

@@ -16,60 +16,36 @@
package org.springframework.modulith.events.jpa; package org.springframework.modulith.events.jpa;
import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import jakarta.persistence.EntityManager; import example.ExampleApplication;
import jakarta.persistence.TypedQuery;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.modulith.events.EventSerializer; import org.springframework.modulith.events.EventSerializer;
import org.springframework.modulith.events.EventPublicationRegistry;
import org.springframework.test.context.TestConstructor; import org.springframework.test.context.TestConstructor;
import org.springframework.test.context.TestConstructor.AutowireMode; import org.springframework.test.context.TestConstructor.AutowireMode;
import org.springframework.test.context.junit.jupiter.SpringExtension;
/** /**
* @author Oliver Drotbohm * @author Oliver Drotbohm, Dmitry Belyaev, Björn Kieling
*/ */
@ExtendWith(SpringExtension.class) @SpringBootTest(classes = ExampleApplication.class)
@TestConstructor(autowireMode = AutowireMode.ALL) @TestConstructor(autowireMode = AutowireMode.ALL)
@RequiredArgsConstructor @RequiredArgsConstructor
class JpaEventPublicationConfigurationIntegrationTests { class JpaEventPublicationConfigurationIntegrationTests {
private final ApplicationContext context; private final ApplicationContext context;
@Configuration @MockBean
@Import(JpaEventPublicationConfiguration.class) private EventSerializer serializer;
static class TestConfig {
@Bean
EventSerializer eventSerializer() {
return mock(EventSerializer.class);
}
@Bean
EntityManager entityManager() {
EntityManager em = mock(EntityManager.class);
// Mock API for query executed at bootstrap time
TypedQuery<?> query = mock(TypedQuery.class);
doReturn(query).when(em).createQuery(any(String.class), any());
return em;
}
}
@Test @Test
void bootstrapsApplicationComponents() { void bootstrapsApplicationComponents() {
assertThat(context.getBean(JpaEventPublicationRegistry.class)).isNotNull(); assertThat(context.getBean(EventPublicationRegistry.class)).isNotNull();
assertThat(context.getBean(JpaEventPublicationRepository.class)).isNotNull(); assertThat(context.getBean(JpaEventPublicationRepository.class)).isNotNull();
} }
} }

View File

@@ -15,14 +15,14 @@
*/ */
package org.springframework.modulith.events.jpa; package org.springframework.modulith.events.jpa;
import static org.assertj.core.api.Assertions.*; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.List;
import jakarta.persistence.EntityManager; import jakarta.persistence.EntityManager;
import jakarta.persistence.EntityManagerFactory; import jakarta.persistence.EntityManagerFactory;
import lombok.RequiredArgsConstructor;
import java.time.Instant;
import javax.sql.DataSource; import javax.sql.DataSource;
@@ -34,7 +34,10 @@ import org.springframework.context.annotation.Import;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
import org.springframework.modulith.events.EventSerializer; import org.springframework.modulith.events.EventSerializer;
import org.springframework.modulith.events.PublicationTargetIdentifier;
import org.springframework.orm.jpa.JpaTransactionManager; import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.SharedEntityManagerCreator; import org.springframework.orm.jpa.SharedEntityManagerCreator;
@@ -45,8 +48,10 @@ import org.springframework.test.context.TestConstructor.AutowireMode;
import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import lombok.RequiredArgsConstructor;
/** /**
* @author Oliver Drotbohm * @author Oliver Drotbohm, Dmitry Belyaev, Björn Kieling
*/ */
@ExtendWith(SpringExtension.class) @ExtendWith(SpringExtension.class)
@TestConstructor(autowireMode = AutowireMode.ALL) @TestConstructor(autowireMode = AutowireMode.ALL)
@@ -54,13 +59,17 @@ import org.springframework.transaction.annotation.Transactional;
@RequiredArgsConstructor @RequiredArgsConstructor
class JpaEventPublicationRepositoryIntegrationTests { class JpaEventPublicationRepositoryIntegrationTests {
private static final PublicationTargetIdentifier TARGET_IDENTIFIER = PublicationTargetIdentifier.of("listener");
private static final EventSerializer eventSerializer = mock(EventSerializer.class);
@Configuration @Configuration
@Import(JpaEventPublicationConfiguration.class) @Import(JpaEventPublicationConfiguration.class)
static class TestConfig { static class TestConfig {
@Bean @Bean
EventSerializer eventSerializer() { EventSerializer eventSerializer() {
return mock(EventSerializer.class); return eventSerializer;
} }
// Database // Database
@@ -102,17 +111,34 @@ class JpaEventPublicationRepositoryIntegrationTests {
@Test @Test
void persistsJpaEventPublication() { void persistsJpaEventPublication() {
JpaEventPublication publication = JpaEventPublication.of(Instant.now(), "listener", "", Object.class); TestEvent testEvent = new TestEvent("abc");
String serializedEvent = "{\"eventId\":\"abc\"}";
when(eventSerializer.serialize(testEvent)).thenReturn(serializedEvent);
when(eventSerializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent);
CompletableEventPublication publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
// Store publication // Store publication
repository.create(publication); repository.create(publication);
assertThat(repository.findByCompletionDateIsNull()).containsExactly(publication); List<EventPublication> eventPublications = repository.findByCompletionDateIsNull();
assertThat(repository.findBySerializedEventAndListenerId("", "listener")).isPresent(); assertThat(eventPublications).hasSize(1);
assertThat(eventPublications.get(0).getEvent()).isEqualTo(publication.getEvent());
assertThat(eventPublications.get(0).getTargetIdentifier()).isEqualTo(publication.getTargetIdentifier());
assertThat(repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER)).isPresent();
// Complete publication // Complete publication
repository.update(publication.markCompleted()); repository.updateCompletionDate(publication.markCompleted());
assertThat(repository.findByCompletionDateIsNull()).isEmpty(); assertThat(repository.findByCompletionDateIsNull()).isEmpty();
} }
private static final class TestEvent {
private final String eventId;
private TestEvent(String eventId) {
this.eventId = eventId;
}
}
} }

View File

@@ -44,6 +44,11 @@
<artifactId>spring-orm</artifactId> <artifactId>spring-orm</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>