GH-3 - Polishing.

A couple of renames in the exposed APIs. MapBasedEventPublicationRepository now uses a a TreeMap again instead of a List (closer to what the original EventPublicationRegistry did).

Cleanups in the JDBC event registry module. Polish pom.xml to minimize dependencies. Removed custom DatabaseType in favor of Boot's DatabaseDriver. Tweaked the auto-configuration of the JDBC module to not even expose a DatabaseSchemaInitializer in the first place to avoid having to wire the boolean flag into the latter.

Cleanups in test cases to avoid code duplication. Move to @JdbcTest to avoid having to depend on Spring Data JDBC.

Enable nullability checks, formatting, Javadoc, author tags, missing license headers.
This commit is contained in:
Oliver Drotbohm
2022-07-22 22:19:47 +02:00
parent 6a4260471e
commit 3899d9d59c
33 changed files with 744 additions and 934 deletions

View File

@@ -1,3 +1,4 @@
lombok.nonNull.exceptionType = IllegalArgumentException
lombok.log.fieldName = LOG
lombok.addLombokGeneratedAnnotation = true
lombok.accessors.chain=true

View File

@@ -24,6 +24,7 @@ import java.util.Optional;
* @author Oliver Drotbohm
*/
public interface CompletableEventPublication extends EventPublication {
/**
* Returns the completion date of the publication.
*

View File

@@ -15,6 +15,10 @@
*/
package org.springframework.modulith.events;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.stream.Stream;
@@ -24,15 +28,13 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* A registry to capture event publications to {@link ApplicationListener}s. Allows to register those publications, mark
* them as completed and lookup incomplete publications.
*
* @author Oliver Drotbohm, Björn Kieling, Dmitry Belyaev
* @author Oliver Drotbohm
* @author Björn Kieling
* @author Dmitry Belyaev
*/
@Slf4j
@RequiredArgsConstructor
@@ -40,33 +42,19 @@ public class DefaultEventPublicationRegistry implements DisposableBean, EventPub
private final @NonNull EventPublicationRepository events;
/**
* Stores {@link EventPublication}s for the given event and {@link ApplicationListener}s.
*
* @param event must not be {@literal null}.
* @param listeners must not be {@literal null}.
*/
@Override
public void store(Object event, Stream<PublicationTargetIdentifier> listeners) {
listeners.map(it -> map(event, it))
.forEach(events::create);
}
/**
* Returns all {@link EventPublication}s that have not been completed yet.
*
* @return will never be {@literal null}.
*/
@Override
public Iterable<EventPublication> findIncompletePublications() {
return events.findByCompletionDateIsNull();
return events.findIncompletePublications();
}
/**
* Marks the publication for the given event and {@link PublicationTargetIdentifier} as completed.
*
* @param event must not be {@literal null}.
* @param targetIdentifier must not be {@literal null}.
*/
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void markCompleted(Object event, PublicationTargetIdentifier targetIdentifier) {
@@ -74,15 +62,15 @@ public class DefaultEventPublicationRegistry implements DisposableBean, EventPub
Assert.notNull(targetIdentifier, "Listener identifier must not be null!");
events.findByEventAndTargetIdentifier(event, targetIdentifier) //
.map(DefaultEventPublicationRegistry::LOGCompleted) //
.map(DefaultEventPublicationRegistry::logCompleted) //
.map(e -> CompletableEventPublication.of(e.getEvent(), e.getTargetIdentifier()))
.ifPresent(it -> events.updateCompletionDate(it.markCompleted()));
.ifPresent(it -> events.update(it.markCompleted()));
}
@Override
public void destroy() {
List<EventPublication> publications = events.findByCompletionDateIsNull();
List<EventPublication> publications = events.findIncompletePublications();
if (publications.isEmpty()) {
@@ -111,7 +99,7 @@ public class DefaultEventPublicationRegistry implements DisposableBean, EventPub
return result;
}
private static EventPublication LOGCompleted(EventPublication publication) {
private static EventPublication logCompleted(EventPublication publication) {
LOG.debug("Marking publication of event {} to listener {} completed.", //
publication.getEvent().getClass().getName(), publication.getTargetIdentifier().getValue());

View File

@@ -24,9 +24,12 @@ import org.springframework.util.Assert;
/**
* An event publication.
*
* @author Oliver Drotbohm, Björn Kieling, Dmitry Belyaev
* @author Oliver Drotbohm
* @author Björn Kieling
* @author Dmitry Belyaev
*/
public interface EventPublication extends Comparable<EventPublication> {
/**
* Returns the event that is published.
*

View File

@@ -23,7 +23,9 @@ import org.springframework.context.ApplicationListener;
* A registry to capture event publications to {@link ApplicationListener}s. Allows to register those publications, mark
* them as completed and lookup incomplete publications.
*
* @author Oliver Drotbohm, Björn Kieling, Dmitry Belyaev
* @author Oliver Drotbohm
* @author Björn Kieling
* @author Dmitry Belyaev
*/
public interface EventPublicationRegistry {

View File

@@ -1,3 +1,18 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events;
import java.util.List;
@@ -6,25 +21,41 @@ import java.util.Optional;
/**
* Repository to store {@link EventPublication}s.
*
* @author Björn Kieling, Dmitry Belyaev
* @author Björn Kieling
* @author Dmitry Belyaev
* @author Oliver Drotbohm
*/
public interface EventPublicationRepository {
EventPublication create(EventPublication publication);
/**
* Persists the given {@link EventPublication}.
*
* @param publication must not be {@literal null}.
* @return will never be {@literal null}.
*/
EventPublication create(EventPublication publication);
EventPublication updateCompletionDate(CompletableEventPublication publication);
/**
* Update the data store to mark the backing log entry as completed.
*
* @param publication must not be {@literal null}.
* @return will never be {@literal null}.
*/
EventPublication update(CompletableEventPublication publication);
/**
* Returns all {@link EventPublication} that have not been completed yet.
*/
List<EventPublication> findByCompletionDateIsNull();
/**
* Returns all {@link EventPublication} that have not been completed yet.
*
* @return will never be {@literal null}.
*/
List<EventPublication> findIncompletePublications();
/**
* Return the {@link EventPublication} for the given serialized event and listener identifier.
*
* @param event must not be {@literal null}.
* @param targetIdentifier must not be {@literal null}.
* @return
*/
Optional<EventPublication> findByEventAndTargetIdentifier(Object event, PublicationTargetIdentifier targetIdentifier);
/**
* Return the {@link EventPublication} for the given serialized event and listener identifier.
*
* @param event must not be {@literal null}.
* @param targetIdentifier must not be {@literal null}.
* @return will never be {@literal null}.
*/
Optional<EventPublication> findByEventAndTargetIdentifier(Object event, PublicationTargetIdentifier targetIdentifier);
}

View File

@@ -24,7 +24,7 @@ public interface EventSerializer {
* Serializes the given event into a storable format.
*
* @param event must not be {@literal null}.
* @return
* @return will never be {@literal null}.
*/
Object serialize(Object event);
@@ -33,7 +33,7 @@ public interface EventSerializer {
*
* @param serialized must not be {@literal null}.
* @param type must not be {@literal null}.
* @return
* @return will never be {@literal null}.
*/
<T> T deserialize(Object serialized, Class<T> type);
}

View File

@@ -27,7 +27,7 @@ import lombok.Value;
@RequiredArgsConstructor(staticName = "of")
public class PublicationTargetIdentifier {
private String value;
String value;
/*
* (non-Javadoc)

View File

@@ -23,33 +23,34 @@ import org.springframework.modulith.events.DefaultEventPublicationRegistry;
import org.springframework.modulith.events.EventPublicationRegistry;
import org.springframework.modulith.events.EventPublicationRepository;
import org.springframework.modulith.events.support.CompletionRegisteringBeanPostProcessor;
import org.springframework.modulith.events.support.MapBackedEventPublicationRepository;
import org.springframework.modulith.events.support.MapEventPublicationRepository;
import org.springframework.modulith.events.support.PersistentApplicationEventMulticaster;
/**
* @author Oliver Drotbohm, Björn Kieling, Dmitry Belyaev
* @author Oliver Drotbohm
* @author Björn Kieling
* @author Dmitry Belyaev
*/
@Configuration(proxyBeanMethods = false)
class EventPublicationConfiguration {
@Bean
EventPublicationRegistry eventPublicationRegistry(
ObjectProvider<EventPublicationRepository> repositoryProvider) {
@Bean
EventPublicationRegistry eventPublicationRegistry(
ObjectProvider<EventPublicationRepository> repositoryProvider) {
return new DefaultEventPublicationRegistry(
repositoryProvider.getIfAvailable(MapBackedEventPublicationRepository::new)
);
}
return new DefaultEventPublicationRegistry(
repositoryProvider.getIfAvailable(MapEventPublicationRepository::new));
}
@Bean
PersistentApplicationEventMulticaster applicationEventMulticaster(
EventPublicationRegistry eventPublicationRegistry) {
@Bean
PersistentApplicationEventMulticaster applicationEventMulticaster(
EventPublicationRegistry eventPublicationRegistry) {
return new PersistentApplicationEventMulticaster(() -> eventPublicationRegistry);
}
return new PersistentApplicationEventMulticaster(() -> eventPublicationRegistry);
}
@Bean
static CompletionRegisteringBeanPostProcessor bpp(ObjectFactory<EventPublicationRegistry> store) {
return new CompletionRegisteringBeanPostProcessor(store::getObject);
}
@Bean
static CompletionRegisteringBeanPostProcessor bpp(ObjectFactory<EventPublicationRegistry> store) {
return new CompletionRegisteringBeanPostProcessor(store::getObject);
}
}

View File

@@ -1,65 +0,0 @@
/*
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.support;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
import org.springframework.modulith.events.EventPublicationRepository;
import org.springframework.modulith.events.PublicationTargetIdentifier;
/**
* Map based {@link EventPublicationRepository}, for testing purposes only.
*
* @author Björn Kieling, Dmitry Belyaev
*/
public class MapBackedEventPublicationRepository implements EventPublicationRepository {
private final List<CompletableEventPublication> events = new ArrayList<>();
@Override
public EventPublication create(EventPublication publication) {
events.add(CompletableEventPublication.of(publication.getEvent(), publication.getTargetIdentifier()));
return publication;
}
@Override
public EventPublication updateCompletionDate(CompletableEventPublication publication) {
findByEventAndTargetIdentifier(publication.getEvent(), publication.getTargetIdentifier())
.ifPresent(eventPublication -> ((CompletableEventPublication) eventPublication).markCompleted());
return publication;
}
@Override
public List<EventPublication> findByCompletionDateIsNull() {
return events.stream()
.filter(publication -> !publication.isPublicationCompleted())
.collect(Collectors.toList());
}
@Override
public Optional<EventPublication> findByEventAndTargetIdentifier(Object event, PublicationTargetIdentifier targetIdentifier) {
return events.stream()
.filter(publication ->
publication.equals(publication.getEvent()) && publication.getTargetIdentifier().equals(targetIdentifier))
.map(EventPublication.class::cast)
.findAny();
}
}

View File

@@ -0,0 +1,89 @@
/*
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.support;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
import org.springframework.modulith.events.EventPublicationRepository;
import org.springframework.modulith.events.PublicationTargetIdentifier;
/**
* Map based {@link EventPublicationRepository}, for testing purposes only.
*
* @author Oliver Drotbohm
* @author Björn Kieling
* @author Dmitry Belyaev
*/
public class MapEventPublicationRepository implements EventPublicationRepository {
private final Map<Key, CompletableEventPublication> events = new TreeMap<>();
@Override
public EventPublication create(EventPublication publication) {
return events.computeIfAbsent(Key.of(publication),
it -> CompletableEventPublication.of(it.getEvent(), it.getIdentifier()));
}
@Override
public EventPublication update(CompletableEventPublication publication) {
var result = events.computeIfPresent(Key.of(publication), (__, it) -> it.markCompleted());
if (result == null) {
throw new IllegalArgumentException("Couldn't find publication %s!".formatted(publication));
}
return result;
}
@Override
public List<EventPublication> findIncompletePublications() {
return events.values().stream() //
.filter(it -> !it.isPublicationCompleted())
.map(EventPublication.class::cast)
.toList();
}
@Override
public Optional<EventPublication> findByEventAndTargetIdentifier(Object event,
PublicationTargetIdentifier targetIdentifier) {
return Optional.ofNullable(events.get(new Key(event, targetIdentifier)));
}
@Value
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
private static class Key {
Object event;
PublicationTargetIdentifier identifier;
static Key of(EventPublication publication) {
return new Key(publication.getEvent(), publication.getTargetIdentifier());
}
}
}

View File

@@ -123,8 +123,8 @@ public class PersistentApplicationEventMulticaster extends AbstractApplicationEv
});
}
private ApplicationListener<ApplicationEvent> executeListenerWithCompletion(
EventPublication publication, TransactionalApplicationListener<ApplicationEvent> listener) {
private ApplicationListener<ApplicationEvent> executeListenerWithCompletion(EventPublication publication,
TransactionalApplicationListener<ApplicationEvent> listener) {
listener.processEvent(publication.getApplicationEvent());

View File

@@ -15,15 +15,16 @@
*/
package org.springframework.modulith.events;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.*;
import org.junit.jupiter.api.Test;
/**
* @author Oliver Drotbohm, Björn Kieling, Dmitry Belyaev
* @author Oliver Drotbohm
* @author Björn Kieling
* @author Dmitry Belyaev
*/
class CompletableEventPublicationTest {
class CompletableEventPublicationUnitTests {
@Test
void rejectsNullEvent() {

View File

@@ -34,7 +34,7 @@ import org.springframework.transaction.event.TransactionalEventListener;
*
* @author Oliver Drotbohm
*/
class CompletionRegisteringBeanPostProcessorUnitTest {
class CompletionRegisteringBeanPostProcessorUnitTests {
EventPublicationRegistry registry = mock(EventPublicationRegistry.class);
BeanPostProcessor processor = new CompletionRegisteringBeanPostProcessor(() -> registry);

View File

@@ -12,7 +12,6 @@
<artifactId>spring-modulith-events-jdbc</artifactId>
<properties>
<java.version>17</java.version>
<module.name>org.springframework.modulith.events.jdbc</module.name>
</properties>
@@ -23,71 +22,43 @@
<artifactId>spring-modulith-events-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-jdbc</artifactId>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>
<!--
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
<optional>true</optional>
</dependency>
-->
<!-- Testing -->
<!-- <dependency>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
<scope>test</scope>
</dependency>-->
<!--
<dependency>
<groupId>jakarta.transaction</groupId>
<artifactId>jakarta.transaction-api</artifactId>
<scope>test</scope>
</dependency>
-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>spring-milestone</id>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
</project>

View File

@@ -15,62 +15,65 @@
*/
package org.springframework.modulith.events.jdbc;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.jdbc.DatabaseDriver;
import org.springframework.context.ResourceLoaderAware;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.support.MetaDataAccessException;
import org.springframework.util.FileCopyUtils;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import org.springframework.util.StreamUtils;
/**
* Initializes the DB schema used to store events
*
* @author Dmitry Belyaev, Björn Kieling
* @author Dmitry Belyaev
* @author Björn Kieling
* @author Oliver Drotbohm
*/
public class DatabaseSchemaInitializer implements ResourceLoaderAware, InitializingBean {
class DatabaseSchemaInitializer implements ResourceLoaderAware, InitializingBean {
private final JdbcTemplate jdbcTemplate;
private final JdbcTemplate jdbcTemplate;
private ResourceLoader resourceLoader;
private ResourceLoader resourceLoader;
@Value("${spring.modulith.events.schema-initialization.enabled:false}")
private boolean initEnabled;
/**
* Creates a new {@link DatabaseSchemaInitializer} for the given {@link JdbcTemplate} and ini
*
* @param jdbcTemplate
* @param initEnabled
*/
public DatabaseSchemaInitializer(JdbcTemplate jdbcTemplate) {
public DatabaseSchemaInitializer(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
this.jdbcTemplate = jdbcTemplate;
}
@Override
public void setResourceLoader(ResourceLoader resourceLoader) {
this.resourceLoader = resourceLoader;
}
@Override
public void setResourceLoader(ResourceLoader resourceLoader) {
this.resourceLoader = resourceLoader;
}
@Override
public void afterPropertiesSet() throws MetaDataAccessException {
if (!initEnabled) {
return;
}
@Override
public void afterPropertiesSet() throws MetaDataAccessException {
DatabaseType databaseType = DatabaseType.fromMetaData(jdbcTemplate.getDataSource());
String databaseName = databaseType.name().toLowerCase();
var schemaDdlResource = resourceLoader.getResource("/schema-" + databaseName + ".sql");
var schemaDdl = asString(schemaDdlResource);
jdbcTemplate.execute(schemaDdl);
}
var fromDataSource = DatabaseDriver.fromDataSource(jdbcTemplate.getDataSource());
var databaseName = fromDataSource.name().toLowerCase();
var schemaDdlResource = resourceLoader.getResource("/schema-" + databaseName + ".sql");
var schemaDdl = asString(schemaDdlResource);
private String asString(Resource resource) {
try (Reader reader = new InputStreamReader(resource.getInputStream(), StandardCharsets.UTF_8)) {
return FileCopyUtils.copyToString(reader);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
jdbcTemplate.execute(schemaDdl);
}
private String asString(Resource resource) {
try {
return StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}

View File

@@ -1,112 +0,0 @@
/*
* Copyright 2006-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.jdbc;
import org.springframework.jdbc.support.JdbcUtils;
import org.springframework.jdbc.support.MetaDataAccessException;
import org.springframework.util.StringUtils;
import javax.sql.DataSource;
import java.sql.DatabaseMetaData;
import java.util.HashMap;
import java.util.Map;
/**
* Enum representing a database type, such as DB2 or oracle. The type also contains a
* product name, which is expected to be the same as the product name provided by the
* database driver's metadata.
*
* @author Lucas Ward
*/
public enum DatabaseType {
DERBY("Apache Derby"), DB2("DB2"), DB2VSE("DB2VSE"), DB2ZOS("DB2ZOS"), DB2AS400("DB2AS400"),
HSQL("HSQL Database Engine"), SQLSERVER("Microsoft SQL Server"), MYSQL("MySQL"), ORACLE("Oracle"),
POSTGRES("PostgreSQL"), SYBASE("Sybase"), H2("H2"), SQLITE("SQLite"), HANA("HDB");
private static final Map<String, DatabaseType> nameMap;
static {
nameMap = new HashMap<>();
for (DatabaseType type : values()) {
nameMap.put(type.getProductName(), type);
}
}
// A description is necessary due to the nature of database descriptions
// in metadata.
private final String productName;
private DatabaseType(String productName) {
this.productName = productName;
}
public String getProductName() {
return productName;
}
/**
* Static method to obtain a DatabaseType from the provided product name.
* @param productName {@link String} containing the product name.
* @return the {@link DatabaseType} for given product name.
* @throws IllegalArgumentException if none is found.
*/
public static DatabaseType fromProductName(String productName) {
if (productName.equals("MariaDB"))
productName = "MySQL";
if (!nameMap.containsKey(productName)) {
throw new IllegalArgumentException("DatabaseType not found for product name: [" + productName + "]");
}
else {
return nameMap.get(productName);
}
}
/**
* Convenience method that pulls a database product name from the DataSource's
* metadata.
* @param dataSource {@link DataSource} to the database to be used.
* @return {@link DatabaseType} for the {@link DataSource} specified.
* @throws MetaDataAccessException thrown if error occured during Metadata lookup.
*/
public static DatabaseType fromMetaData(DataSource dataSource) throws MetaDataAccessException {
String databaseProductName = JdbcUtils.extractDatabaseMetaData(dataSource,
DatabaseMetaData::getDatabaseProductName);
if (StringUtils.hasText(databaseProductName) && databaseProductName.startsWith("DB2")) {
String databaseProductVersion = JdbcUtils.extractDatabaseMetaData(dataSource,
DatabaseMetaData::getDatabaseProductVersion);
if (databaseProductVersion.startsWith("ARI")) {
databaseProductName = "DB2VSE";
}
else if (databaseProductVersion.startsWith("DSN")) {
databaseProductName = "DB2ZOS";
}
else if (databaseProductName.contains("AS")
&& (databaseProductVersion.startsWith("QSQ") || databaseProductVersion
.substring(databaseProductVersion.indexOf('V')).matches("V\\dR\\d[mM]\\d"))) {
databaseProductName = "DB2AS400";
}
else {
databaseProductName = JdbcUtils.commonDatabaseName(databaseProductName);
}
}
else {
databaseProductName = JdbcUtils.commonDatabaseName(databaseProductName);
}
return fromProductName(databaseProductName);
}
}

View File

@@ -15,6 +15,7 @@
*/
package org.springframework.modulith.events.jdbc;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
@@ -22,22 +23,21 @@ import org.springframework.modulith.events.EventSerializer;
import org.springframework.modulith.events.config.EventPublicationConfigurationExtension;
/**
* @author Dmitry Belyaev, Björn Kieling
* @author Dmitry Belyaev
* @author Björn Kieling
* @author Oliver Drotbohm
*/
@Configuration(proxyBeanMethods = false)
class JdbcEventPublicationAutoConfiguration implements EventPublicationConfigurationExtension {
@Bean
public JdbcEventPublicationRepository jpaEventPublicationRepository(
JdbcTemplate jdbcTemplate, EventSerializer serializer) {
// TODO Why do we want to instantiate the serializer here and what
// happens if no serializer is available or is not compatible to
// JDBC serialization?
return new JdbcEventPublicationRepository(jdbcTemplate, serializer);
}
@Bean
JdbcEventPublicationRepository jpaEventPublicationRepository(JdbcTemplate jdbcTemplate, EventSerializer serializer) {
return new JdbcEventPublicationRepository(jdbcTemplate, serializer);
}
@Bean
public DatabaseSchemaInitializer databaseSchemaInitializer(JdbcTemplate jdbcTemplate) {
return new DatabaseSchemaInitializer(jdbcTemplate);
}
@Bean
@ConditionalOnProperty(name = "spring.modulith.events.schema-initialization.enabled", havingValue = "true")
DatabaseSchemaInitializer databaseSchemaInitializer(JdbcTemplate jdbcTemplate) {
return new DatabaseSchemaInitializer(jdbcTemplate);
}
}

View File

@@ -15,16 +15,23 @@
*/
package org.springframework.modulith.events.jdbc;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.lang.Nullable;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
import org.springframework.modulith.events.EventPublicationRepository;
@@ -32,19 +39,16 @@ import org.springframework.modulith.events.EventSerializer;
import org.springframework.modulith.events.PublicationTargetIdentifier;
import org.springframework.transaction.annotation.Transactional;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* Repository to store {@link EventPublication}s.
* JDBC-based repository to store {@link EventPublication}s.
*
* @author Dmitry Belyaev, Björn Kieling
* @author Dmitry Belyaev
* @author Björn Kieling
* @author Oliver Drotbohm
*/
@Slf4j
@RequiredArgsConstructor
public class JdbcEventPublicationRepository implements EventPublicationRepository {
class JdbcEventPublicationRepository implements EventPublicationRepository {
private static final String SQL_STATEMENT_INSERT = """
INSERT INTO EVENT_PUBLICATION (ID, EVENT_TYPE, LISTENER_ID, PUBLICATION_DATE, SERIALIZED_EVENT)
@@ -66,89 +70,120 @@ public class JdbcEventPublicationRepository implements EventPublicationRepositor
ORDER BY PUBLICATION_DATE
""";
private final JdbcTemplate jdbcTemplate;
private final JdbcOperations operations;
private final EventSerializer serializer;
@Override
@Transactional
public EventPublication create(EventPublication publication) {
String serializedEvent = serializeEvent(publication.getEvent());
jdbcTemplate.update(SQL_STATEMENT_INSERT, //
operations.update(SQL_STATEMENT_INSERT, //
UUID.randomUUID(), //
publication.getEvent().getClass().getName(), //
publication.getTargetIdentifier().getValue(), //
publication.getPublicationDate(), //
serializedEvent);
serializeEvent(publication.getEvent()));
return publication;
}
@Override
@Transactional
public EventPublication updateCompletionDate(CompletableEventPublication publication) {
String serializedEvent = serializeEvent(publication.getEvent());
List<UUID> results = jdbcTemplate.query(SQL_STATEMENT_FIND_BY_EVENT_AND_LISTENER_ID,
public EventPublication update(CompletableEventPublication publication) {
var serializedEvent = serializeEvent(publication.getEvent());
var results = operations.query(SQL_STATEMENT_FIND_BY_EVENT_AND_LISTENER_ID,
(rs, rowNum) -> rs.getObject("ID", UUID.class), serializedEvent, publication.getTargetIdentifier().getValue());
if (!results.isEmpty()) {
jdbcTemplate.update(SQL_STATEMENT_UPDATE, publication.getCompletionDate().orElse(null), results.get(0));
operations.update(SQL_STATEMENT_UPDATE, publication.getCompletionDate().orElse(null), results.get(0));
}
return publication;
}
@Override
@Transactional(readOnly = true)
public List<EventPublication> findByCompletionDateIsNull() {
return jdbcTemplate.query(SQL_STATEMENT_FIND_UNCOMPLETED, this::mapResultSetToEventPublications);
}
@Override
@Transactional(readOnly = true)
public Optional<EventPublication> findByEventAndTargetIdentifier(Object event,
PublicationTargetIdentifier targetIdentifier) {
String serializedEvent = serializeEvent(event);
List<EventPublication> results = jdbcTemplate.query(SQL_STATEMENT_FIND_BY_EVENT_AND_LISTENER_ID,
this::mapResultSetToEventPublications, serializedEvent, targetIdentifier.getValue());
if (results.isEmpty()) {
return Optional.empty();
} else {
// if there are several events with exactly the same payload we return the oldest one first
return Optional.of(results.get(0));
}
var results = operations.query(SQL_STATEMENT_FIND_BY_EVENT_AND_LISTENER_ID, this::resultSetToPublications,
serializeEvent(event), targetIdentifier.getValue());
return Optional.ofNullable((results == null) || results.isEmpty() ? null : results.get(0));
}
@Override
@Transactional(readOnly = true)
public List<EventPublication> findIncompletePublications() {
return operations.query(SQL_STATEMENT_FIND_UNCOMPLETED, this::resultSetToPublications);
}
private String serializeEvent(Object event) {
return serializer.serialize(event).toString();
}
private List<EventPublication> mapResultSetToEventPublications(ResultSet rs) throws SQLException {
var result = new ArrayList<EventPublication>();
while (rs.next()) {
entityToDomain(rs).ifPresent(result::add);
/**
* Effectively a {@link ResultSetExtractor} to drop {@link EventPublication}s that cannot be deserialized.
*
* @param resultSet must not be {@literal null}.
* @return will never be {@literal null}.
* @throws SQLException
*/
private List<EventPublication> resultSetToPublications(ResultSet resultSet) throws SQLException {
List<EventPublication> result = new ArrayList<>();
while (resultSet.next()) {
EventPublication publication = resultSetToPublication(resultSet);
if (publication != null) {
result.add(publication);
}
}
return result;
}
private Optional<EventPublication> entityToDomain(ResultSet rs) throws SQLException {
/**
* Effectively a {@link RowMapper} to turn a single row into an {@link EventPublication}.
*
* @param rs must not be {@literal null}.
* @return can be {@literal null}.
* @throws SQLException
*/
@Nullable
private EventPublication resultSetToPublication(ResultSet rs) throws SQLException {
var id = rs.getObject("ID", UUID.class);
var eventClassName = rs.getString("EVENT_TYPE");
Class<?> eventClass;
try {
eventClass = Class.forName(eventClassName);
} catch (ClassNotFoundException e) {
LOG.warn("Event '{}' of unknown type '{}' found", id, eventClassName);
return Optional.empty();
var eventClass = loadClass(id, rs.getString("EVENT_TYPE"));
if (eventClass == null) {
return null;
}
return Optional.of(JdbcEventPublication.builder()
.completionDate(Optional.ofNullable(rs.getTimestamp("COMPLETION_DATE")).map(Timestamp::toInstant).orElse(null))
var completionDate = rs.getTimestamp("COMPLETION_DATE");
return JdbcEventPublication.builder()
.completionDate(completionDate == null ? null : completionDate.toInstant())
.eventType(eventClass) //
.listenerId(rs.getString("LISTENER_ID")) //
.publicationDate(rs.getTimestamp("PUBLICATION_DATE").toInstant()) //
.serializedEvent(rs.getString("SERIALIZED_EVENT")) //
.serializer(serializer) //
.build());
.build();
}
@Nullable
private Class<?> loadClass(UUID id, String className) {
try {
return Class.forName(className);
} catch (ClassNotFoundException e) {
LOG.warn("Event '{}' of unknown type '{}' found", id, className);
return null;
}
}
@EqualsAndHashCode
@@ -156,7 +191,7 @@ public class JdbcEventPublicationRepository implements EventPublicationRepositor
private static class JdbcEventPublication implements CompletableEventPublication {
private final UUID id;
private final Instant publicationDate;
private final @Nullable Instant publicationDate;
private final String listenerId;
private final String serializedEvent;
private final Class<?> eventType;

View File

@@ -0,0 +1,2 @@
@org.springframework.lang.NonNullApi
package org.springframework.modulith.events.jdbc;

View File

@@ -0,0 +1,121 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.jdbc;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import java.util.Optional;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.test.autoconfigure.jdbc.JdbcTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.modulith.events.EventSerializer;
import org.springframework.modulith.testapp.TestApplication;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
/**
* @author Dmitry Belyaev
* @author Björn Kieling
* @author Oliver Drotbohm
*/
class DatabaseSchemaInitializerIntegrationTests {
private static final String COUNT_PUBLICATIONS = "SELECT COUNT(*) FROM EVENT_PUBLICATION";
@JdbcTest
@ImportAutoConfiguration(JdbcEventPublicationAutoConfiguration.class)
@ContextConfiguration(classes = TestApplication.class)
static class TestBase {
@MockBean EventSerializer serializer;
}
@Nested
@JdbcTest(properties = "spring.modulith.events.schema-initialization.enabled=true")
static class WithInitEnabled extends TestBase {
@Autowired JdbcOperations operations;
@Autowired Optional<DatabaseSchemaInitializer> initializer;
@Test // GH-3
void doesNotRegisterAnInitializerBean() {
assertThat(initializer).isPresent();
}
@Test // GH-3
void shouldCreateDatabaseSchemaOnStartUp() {
assertThat(operations.queryForObject(COUNT_PUBLICATIONS, Long.class)).isEqualTo(0);
}
}
@Nested
@JdbcTest(properties = "spring.modulith.events.schema-initialization.enabled=false")
static class WithInitDisabled extends TestBase {
@SpyBean JdbcOperations operations;
@Autowired Optional<DatabaseSchemaInitializer> initializer;
@Test // GH-3
void doesNotRegisterAnInitializerBean() {
assertThat(initializer).isEmpty();
}
@Test // GH-3
void shouldNotCreateDatabaseSchemaOnStartUp() {
verify(operations, never()).execute(anyString());
}
}
@Nested
class InitializationDisabledByDefault extends TestBase {
@SpyBean JdbcOperations operations;
@Autowired Optional<DatabaseSchemaInitializer> initializer;
@Test // GH-3
void doesNotRegisterAnInitializerBean() {
assertThat(initializer).isEmpty();
}
@Test // GH-3
void shouldNotCreateDatabaseSchemaOnStartUp() {
verify(operations, never()).execute(anyString());
}
}
@Nested
@ActiveProfiles("hsqldb")
class HSQLDB extends WithInitEnabled {
}
@Nested
@ActiveProfiles("h2")
class H2 extends WithInitEnabled {}
@Nested
@Disabled
@ActiveProfiles("postgres")
class Postgres extends WithInitEnabled {}
}

View File

@@ -15,6 +15,8 @@
*/
package org.springframework.modulith.events.jdbc;
import static org.assertj.core.api.Assertions.*;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@@ -24,24 +26,21 @@ import org.springframework.modulith.events.EventPublicationRegistry;
import org.springframework.modulith.events.EventSerializer;
import org.springframework.modulith.testapp.TestApplication;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Dmitry Belyaev, Björn Kieling
* @author Dmitry Belyaev
* @author Björn Kieling
* @author Oliver Drotbohm
*/
@SpringBootTest(
classes = TestApplication.class,
properties = "spring.modulith.events.schema-initialization.enabled=true"
)
public class JdbcEventPublicationAutoConfigurationIntegrationTests {
properties = "spring.modulith.events.schema-initialization.enabled=true")
class JdbcEventPublicationAutoConfigurationIntegrationTests {
@Autowired
private ApplicationContext context;
@Autowired ApplicationContext context;
@MockBean
private EventSerializer serializer;
@MockBean EventSerializer serializer;
@Test
@Test // GH-3
void bootstrapsApplicationComponents() {
assertThat(context.getBean(EventPublicationRegistry.class)).isNotNull();

View File

@@ -0,0 +1,193 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.events.jdbc;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import lombok.Value;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.jdbc.JdbcTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Import;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
import org.springframework.modulith.events.EventSerializer;
import org.springframework.modulith.events.PublicationTargetIdentifier;
import org.springframework.modulith.testapp.TestApplication;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
/**
* Integration tests for {@link JdbcEventPublicationRepository}.
*
* @author Dmitry Belyaev
* @author Björn Kieling
* @author Oliver Drotbohm
*/
class JdbcEventPublicationRepositoryIntegrationTests {
static final PublicationTargetIdentifier TARGET_IDENTIFIER = PublicationTargetIdentifier.of("listener");
@JdbcTest
@Import(TestApplication.class)
@ContextConfiguration(classes = JdbcEventPublicationAutoConfiguration.class)
abstract class TestBase {
@Autowired JdbcOperations operations;
@Autowired JdbcEventPublicationRepository repository;
@MockBean EventSerializer serializer;
@BeforeEach
void cleanUp() {
operations.execute("TRUNCATE TABLE EVENT_PUBLICATION");
}
@Test // GH-3
void shouldPersistAndUpdateEventPublication() {
var testEvent = new TestEvent("id");
var serializedEvent = "{\"eventId\":\"id\"}";
when(serializer.serialize(testEvent)).thenReturn(serializedEvent);
when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent);
var publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
// Store publication
repository.create(publication);
var eventPublications = repository.findIncompletePublications();
assertThat(eventPublications).hasSize(1);
assertThat(eventPublications).element(0).satisfies(it -> {
assertThat(it.getEvent()).isEqualTo(publication.getEvent());
assertThat(it.getTargetIdentifier()).isEqualTo(publication.getTargetIdentifier());
});
assertThat(repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER))
.isPresent();
// Complete publication
repository.update(publication.markCompleted());
assertThat(repository.findIncompletePublications()).isEmpty();
}
@Test // GH-3
void shouldUpdateSingleEventPublication() {
var testEvent1 = new TestEvent("id1");
var testEvent2 = new TestEvent("id2");
var serializedEvent1 = "{\"eventId\":\"id1\"}";
var serializedEvent2 = "{\"eventId\":\"id2\"}";
when(serializer.serialize(testEvent1)).thenReturn(serializedEvent1);
when(serializer.deserialize(serializedEvent1, TestEvent.class)).thenReturn(testEvent1);
when(serializer.serialize(testEvent2)).thenReturn(serializedEvent2);
when(serializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2);
var publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER);
var publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER);
// Store publication
repository.create(publication1);
repository.create(publication2);
// Complete publication
repository.update(publication2.markCompleted());
assertThat(repository.findIncompletePublications()).hasSize(1)
.element(0).extracting(EventPublication::getEvent).isEqualTo(testEvent1);
}
@Test // GH-3
void shouldTolerateEmptyResult() {
var testEvent = new TestEvent("id");
var serializedEvent = "{\"eventId\":\"id\"}";
when(serializer.serialize(testEvent)).thenReturn(serializedEvent);
assertThat(repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER)).isEmpty();
}
@Test // GH-3
void shouldReturnTheOldestEvent() throws Exception {
var testEvent = new TestEvent("id");
var serializedEvent = "{\"eventId\":\"id\"}";
when(serializer.serialize(testEvent)).thenReturn(serializedEvent);
when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent);
var publicationOld = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
Thread.sleep(10);
var publicationNew = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
repository.create(publicationNew);
repository.create(publicationOld);
var actual = repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER);
assertThat(actual).hasValueSatisfying(it -> {
assertThat(it.getPublicationDate()).isEqualTo(publicationOld.getPublicationDate());
});
}
@Test // GH-3
void shouldSilentlyIgnoreNotSerializableEvents() {
var testEvent = new TestEvent("id");
var serializedEvent = "{\"eventId\":\"id\"}";
when(serializer.serialize(testEvent)).thenReturn(serializedEvent);
when(serializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent);
// Store publication
repository.create(CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER));
operations.update("UPDATE EVENT_PUBLICATION SET EVENT_TYPE='abc'");
assertThat(repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER)).isEmpty();
}
}
@Nested
@ActiveProfiles("hsqldb")
class HSQL extends TestBase {}
@Nested
@ActiveProfiles("h2")
class H2 extends TestBase {}
@Nested
@Disabled
@ActiveProfiles("postgres")
class Postgres extends TestBase {}
@Value
private static final class TestEvent {
String eventId;
}
}

View File

@@ -1,143 +0,0 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.testapp;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.data.jdbc.DataJdbcTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.context.annotation.Import;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.modulith.events.jdbc.DatabaseSchemaInitializer;
import org.springframework.test.context.ActiveProfiles;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyString;
/**
* @author Dmitry Belyaev, Björn Kieling
*/
public class DatabaseSchemaInitializerTest {
@Nested
@DataJdbcTest(properties = {
"spring.modulith.events.schema-initialization.enabled=true"
})
@Import(DatabaseSchemaInitializer.class)
class InitializationEnabled {
@Autowired
private JdbcTemplate jdbcTemplate;
@Test
public void shouldCreateDatabaseSchemaOnStartUp() {
Long count = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM EVENT_PUBLICATION", Long.class);
assertThat(count).isEqualTo(0);
}
}
@Nested
@DataJdbcTest(properties = {
"spring.modulith.events.schema-initialization.enabled=false"
})
@Import(DatabaseSchemaInitializer.class)
class InitializationDisabled {
@SpyBean
private JdbcTemplate jdbcTemplate;
@Test
public void shouldNotCreateDatabaseSchemaOnStartUp() {
Mockito.verify(jdbcTemplate, Mockito.never()).execute(anyString());
}
}
@Nested
@DataJdbcTest
@Import(DatabaseSchemaInitializer.class)
class InitializationDisabledByDefault {
@SpyBean
private JdbcTemplate jdbcTemplate;
@Test
public void shouldNotCreateDatabaseSchemaOnStartUp() {
Mockito.verify(jdbcTemplate, Mockito.never()).execute(anyString());
}
}
@Nested
@DataJdbcTest(properties = {
"spring.modulith.events.schema-initialization.enabled=true"
})
@ActiveProfiles("hsql")
@Import(DatabaseSchemaInitializer.class)
class InitializationUseHsql {
@Autowired
private JdbcTemplate jdbcTemplate;
@Test
public void shouldCreateDatabaseSchemaOnStartUp() {
Long count = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM EVENT_PUBLICATION", Long.class);
assertThat(count).isEqualTo(0);
}
}
@Nested
@DataJdbcTest(properties = {
"spring.modulith.events.schema-initialization.enabled=true"
})
@ActiveProfiles("h2")
@Import(DatabaseSchemaInitializer.class)
class InitializationUseH2 {
@Autowired
private JdbcTemplate jdbcTemplate;
@Test
public void shouldCreateDatabaseSchemaOnStartUp() {
Long count = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM EVENT_PUBLICATION", Long.class);
assertThat(count).isEqualTo(0);
}
}
@Nested
@Disabled
@DataJdbcTest(properties = {
"spring.modulith.events.schema-initialization.enabled=true"
})
@ActiveProfiles("postgres")
@Import(DatabaseSchemaInitializer.class)
class InitializationUsePostgres {
@Autowired
private JdbcTemplate jdbcTemplate;
@Test
public void shouldCreateDatabaseSchemaOnStartUp() {
Long count = jdbcTemplate.queryForObject("SELECT COUNT(*) FROM EVENT_PUBLICATION", Long.class);
assertThat(count).isEqualTo(0);
}
}
}

View File

@@ -1,319 +0,0 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.modulith.testapp;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.data.jdbc.DataJdbcTest;
import org.springframework.context.annotation.Import;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
import org.springframework.modulith.events.EventSerializer;
import org.springframework.modulith.events.PublicationTargetIdentifier;
import org.springframework.modulith.events.jdbc.DatabaseSchemaInitializer;
import org.springframework.modulith.events.jdbc.JdbcEventPublicationRepository;
import org.springframework.test.context.ActiveProfiles;
/**
* @author Dmitry Belyaev, Björn Kieling
*/
class JdbcEventPublicationRepositoryIntegrationTests {
private static final PublicationTargetIdentifier TARGET_IDENTIFIER = PublicationTargetIdentifier.of("listener");
private JdbcEventPublicationRepository repository;
private final EventSerializer eventSerializer = mock(EventSerializer.class);
private abstract class TestBase {
@Autowired
protected JdbcTemplate jdbcTemplate;
@BeforeEach
public void setUp() {
repository = new JdbcEventPublicationRepository(jdbcTemplate, eventSerializer);
}
}
@Nested
@DataJdbcTest
@ActiveProfiles("hsql")
@Import(DatabaseSchemaInitializer.class)
class HSQL extends TestBase {
@Nested
class CreateAndUpdate {
@Test
void shouldPersistAndUpdateEventPublication() {
shouldPersistAndUpdateEventPublicationTest();
}
@Test
void shouldUpdateSingleEventPublication() {
shouldUpdateSingleEventPublicationTest();
}
}
@Nested
class FindByCompletionDateIsNull {
@Test
void shouldSilentlyIgnoreNotSerializableEvents() {
shouldSilentlyIgnoreNotSerializableEventsTest(jdbcTemplate);
}
}
@Nested
class FindBySerializedEventAndListenerId {
@Test
void shouldTolerateEmptyResult() {
shouldTolerateEmptyResultTest();
}
@Test
void shouldReturnTheOldestEvent() throws InterruptedException {
shouldReturnTheOldestEventTest();
}
@Test
void shouldSilentlyIgnoreNotSerializableEvents() {
shouldSilentlyIgnoreNotSerializableEventsTest(jdbcTemplate);
}
}
}
@Nested
@DataJdbcTest
@ActiveProfiles("h2")
@Import(DatabaseSchemaInitializer.class)
class H2 extends TestBase {
@Nested
class CreateAndUpdate {
@Test
void shouldPersistAndUpdateEventPublication() {
shouldPersistAndUpdateEventPublicationTest();
}
@Test
void shouldUpdateSingleEventPublication() {
shouldUpdateSingleEventPublicationTest();
}
}
@Nested
class FindByCompletionDateIsNull {
@Test
void shouldSilentlyIgnoreNotSerializableEvents() {
shouldSilentlyIgnoreNotSerializableEventsTest(jdbcTemplate);
}
}
@Nested
class FindBySerializedEventAndListenerId {
@Test
void shouldTolerateEmptyResult() {
shouldTolerateEmptyResultTest();
}
@Test
void shouldReturnTheOldestEvent() throws InterruptedException {
shouldReturnTheOldestEventTest();
}
@Test
void shouldSilentlyIgnoreNotSerializableEvents() {
shouldSilentlyIgnoreNotSerializableEventsTest(jdbcTemplate);
}
}
}
@Nested
@Disabled
@DataJdbcTest
@ActiveProfiles("postgres")
@Import(DatabaseSchemaInitializer.class)
class Postgres extends TestBase {
@Nested
class CreateAndUpdate {
@Test
void shouldPersistAndUpdateEventPublication() {
shouldPersistAndUpdateEventPublicationTest();
}
@Test
void shouldUpdateSingleEventPublication() {
shouldUpdateSingleEventPublicationTest();
}
}
@Nested
class FindByCompletionDateIsNull {
@Test
void shouldSilentlyIgnoreNotSerializableEvents() {
shouldSilentlyIgnoreNotSerializableEventsTest(jdbcTemplate);
}
}
@Nested
class FindBySerializedEventAndListenerId {
@Test
void shouldTolerateEmptyResult() {
shouldTolerateEmptyResultTest();
}
@Test
void shouldReturnTheOldestEvent() throws InterruptedException {
shouldReturnTheOldestEventTest();
}
@Test
void shouldSilentlyIgnoreNotSerializableEvents() {
shouldSilentlyIgnoreNotSerializableEventsTest(jdbcTemplate);
}
}
}
private void shouldPersistAndUpdateEventPublicationTest() {
TestEvent testEvent = new TestEvent("id");
String serializedEvent = "{\"eventId\":\"id\"}";
when(eventSerializer.serialize(testEvent)).thenReturn(serializedEvent);
when(eventSerializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent);
CompletableEventPublication publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
// Store publication
repository.create(publication);
List<EventPublication> eventPublications = repository.findByCompletionDateIsNull();
assertThat(eventPublications).hasSize(1);
assertThat(eventPublications.get(0).getEvent()).isEqualTo(publication.getEvent());
assertThat(eventPublications.get(0).getTargetIdentifier()).isEqualTo(publication.getTargetIdentifier());
assertThat(repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER))
.isPresent();
// Complete publication
repository.updateCompletionDate(publication.markCompleted());
assertThat(repository.findByCompletionDateIsNull()).isEmpty();
}
private void shouldUpdateSingleEventPublicationTest() {
TestEvent testEvent1 = new TestEvent("id1");
TestEvent testEvent2 = new TestEvent("id2");
String serializedEvent1 = "{\"eventId\":\"id1\"}";
String serializedEvent2 = "{\"eventId\":\"id2\"}";
when(eventSerializer.serialize(testEvent1)).thenReturn(serializedEvent1);
when(eventSerializer.deserialize(serializedEvent1, TestEvent.class)).thenReturn(testEvent1);
when(eventSerializer.serialize(testEvent2)).thenReturn(serializedEvent2);
when(eventSerializer.deserialize(serializedEvent2, TestEvent.class)).thenReturn(testEvent2);
CompletableEventPublication publication1 = CompletableEventPublication.of(testEvent1, TARGET_IDENTIFIER);
CompletableEventPublication publication2 = CompletableEventPublication.of(testEvent2, TARGET_IDENTIFIER);
// Store publication
repository.create(publication1);
repository.create(publication2);
// Complete publication
repository.updateCompletionDate(publication2.markCompleted());
List<EventPublication> withCompletionDateNull = repository.findByCompletionDateIsNull();
assertThat(withCompletionDateNull).hasSize(1);
assertThat(withCompletionDateNull.get(0).getEvent()).isEqualTo(testEvent1);
}
private void shouldTolerateEmptyResultTest() {
TestEvent testEvent = new TestEvent("id");
String serializedEvent = "{\"eventId\":\"id\"}";
when(eventSerializer.serialize(testEvent)).thenReturn(serializedEvent);
Optional<EventPublication> actual =
repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER);
assertThat(actual).isEmpty();
}
private void shouldReturnTheOldestEventTest() throws InterruptedException {
TestEvent testEvent = new TestEvent("id");
String serializedEvent = "{\"eventId\":\"id\"}";
when(eventSerializer.serialize(testEvent)).thenReturn(serializedEvent);
when(eventSerializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent);
CompletableEventPublication publicationOld = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
Thread.sleep(10);
CompletableEventPublication publicationNew = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
repository.create(publicationNew);
repository.create(publicationOld);
Optional<EventPublication> actual =
repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER);
assertThat(actual).isNotEmpty();
assertThat(actual.get().getPublicationDate()).isEqualTo(publicationOld.getPublicationDate());
}
private void shouldSilentlyIgnoreNotSerializableEventsTest(JdbcTemplate jdbcTemplate) {
TestEvent testEvent = new TestEvent("id");
String serializedEvent = "{\"eventId\":\"id\"}";
when(eventSerializer.serialize(testEvent)).thenReturn(serializedEvent);
when(eventSerializer.deserialize(serializedEvent, TestEvent.class)).thenReturn(testEvent);
CompletableEventPublication publication = CompletableEventPublication.of(testEvent, TARGET_IDENTIFIER);
// Store publication
repository.create(publication);
jdbcTemplate.update("UPDATE EVENT_PUBLICATION SET EVENT_TYPE='abc'");
Optional<EventPublication> actual =
repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER);
assertThat(actual).isEmpty();
}
private static final class TestEvent {
private final String eventId;
private TestEvent(String eventId) {
this.eventId = eventId;
}
}
}

View File

@@ -1,5 +1,4 @@
spring.datasource.driverClassName=org.hsqldb.jdbc.JDBCDriver
spring.datasource.url=jdbc:hsqldb:mem:testdb;DB_CLOSE_DELAY=-1
spring.test.database.replace=NONE
spring.modulith.events.schema-initialization.enabled=true

View File

@@ -15,26 +15,29 @@
*/
package org.springframework.modulith.events.jpa;
import java.time.Instant;
import java.util.UUID;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import java.time.Instant;
import java.util.UUID;
/**
* @author Oliver Drotbohm, Dmitry Belyaev, Björn Kieling
* JPA entity to represent event publications.
*
* @author Oliver Drotbohm
* @author Dmitry Belyaev
* @author Björn Kieling
*/
@Data
@Entity
@NoArgsConstructor(force = true)
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
class JpaEventPublication {
private final @Id @Column(length = 16) UUID id;
@@ -46,10 +49,10 @@ class JpaEventPublication {
private Instant completionDate;
@Builder
static JpaEventPublication of(UUID id, Instant publicationDate, String listenerId, Object serializedEvent,
Class<?> eventType, Instant completionDate) {
return new JpaEventPublication(id, publicationDate, listenerId, serializedEvent.toString(), eventType,
completionDate);
static JpaEventPublication of(Instant publicationDate, String listenerId, Object serializedEvent,
Class<?> eventType) {
return new JpaEventPublication(UUID.randomUUID(), publicationDate, listenerId, serializedEvent.toString(),
eventType);
}
JpaEventPublication markCompleted() {

View File

@@ -16,25 +16,24 @@
package org.springframework.modulith.events.jpa;
import jakarta.persistence.EntityManager;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.modulith.events.EventSerializer;
import org.springframework.modulith.events.config.EventPublicationConfigurationExtension;
import lombok.RequiredArgsConstructor;
/**
* @author Oliver Drotbohm, Dmitry Belyaev, Björn Kieling
* @author Oliver Drotbohm
* @author Dmitry Belyaev
* @author Björn Kieling
*/
@Configuration(proxyBeanMethods = false)
@RequiredArgsConstructor
class JpaEventPublicationConfiguration implements EventPublicationConfigurationExtension {
@Bean
public JpaEventPublicationRepository jpaEventPublicationRepository(EntityManager em, EventSerializer serializer) {
// TODO Why do we want to instantiate the serializer here and what
// happens if no serializer is available?
JpaEventPublicationRepository jpaEventPublicationRepository(EntityManager em, EventSerializer serializer) {
return new JpaEventPublicationRepository(em, serializer);
}
}

View File

@@ -15,14 +15,13 @@
*/
package org.springframework.modulith.events.jpa;
import jakarta.persistence.EntityManager;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import jakarta.persistence.EntityManager;
import jakarta.persistence.TypedQuery;
import org.springframework.modulith.events.CompletableEventPublication;
import org.springframework.modulith.events.EventPublication;
@@ -31,17 +30,19 @@ import org.springframework.modulith.events.EventSerializer;
import org.springframework.modulith.events.PublicationTargetIdentifier;
import org.springframework.transaction.annotation.Transactional;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
/**
* Repository to store {@link EventPublication}s.
*
* @author Oliver Drotbohm, Dmitry Belyaev, Björn Kieling
* @author Oliver Drotbohm
* @author Dmitry Belyaev
* @author Björn Kieling
*/
@RequiredArgsConstructor
public class JpaEventPublicationRepository implements EventPublicationRepository {
private static String BY_EVENT_AND_LISTENER_ID = "select p from JpaEventPublication p where p.serializedEvent = ?1 and p.listenerId = ?2";
private static String INCOMPLETE = "select p from JpaEventPublication p where p.completionDate is null";
private final EntityManager entityManager;
private final EventSerializer serializer;
@@ -50,29 +51,31 @@ public class JpaEventPublicationRepository implements EventPublicationRepository
public EventPublication create(EventPublication publication) {
entityManager.persist(domainToEntity(publication));
return publication;
}
@Override
@Transactional
public EventPublication updateCompletionDate(CompletableEventPublication publication) {
public EventPublication update(CompletableEventPublication publication) {
var id = publication.getTargetIdentifier().getValue();
var event = publication.getEvent();
findEntityBySerializedEventAndListenerId(event, id) //
.setCompletionDate(publication.getCompletionDate().orElse(null));
findEntityBySerializedEventAndListenerId(publication.getEvent(),
publication.getTargetIdentifier().getValue()).ifPresent(entity -> {
entity.setCompletionDate(publication.getCompletionDate().orElse(null));
entityManager.flush();
});
return publication;
}
@Override
@Transactional(readOnly = true)
public List<EventPublication> findByCompletionDateIsNull() {
public List<EventPublication> findIncompletePublications() {
String query = "select p from JpaEventPublication p where p.completionDate is null";
return entityManager.createQuery(query, JpaEventPublication.class).getResultList().stream()
.map(this::entityToDomain).collect(Collectors.toList());
return entityManager.createQuery(INCOMPLETE, JpaEventPublication.class)
.getResultStream()
.map(this::entityToDomain)
.toList();
}
@Override
@@ -80,17 +83,19 @@ public class JpaEventPublicationRepository implements EventPublicationRepository
public Optional<EventPublication> findByEventAndTargetIdentifier(Object event,
PublicationTargetIdentifier targetIdentifier) {
Optional<JpaEventPublication> result = findEntityBySerializedEventAndListenerId(event, targetIdentifier.getValue());
return result.map(this::entityToDomain);
return Optional.ofNullable(findEntityBySerializedEventAndListenerId(event, targetIdentifier.getValue()))
.map(this::entityToDomain);
}
private Optional<JpaEventPublication> findEntityBySerializedEventAndListenerId(Object event, String listenerId) {
String query = "select p from JpaEventPublication p where p.serializedEvent = ?1 and p.listenerId = ?2";
String serializedEvent = serializeEvent(event);
TypedQuery<JpaEventPublication> typedQuery = entityManager.createQuery(query, JpaEventPublication.class)
.setParameter(1, serializedEvent).setParameter(2, listenerId);
JpaEventPublication resultEntity = typedQuery.getSingleResult();
return Optional.ofNullable(resultEntity);
private JpaEventPublication findEntityBySerializedEventAndListenerId(Object event, String listenerId) {
var serializedEvent = serializeEvent(event);
var query = entityManager.createQuery(BY_EVENT_AND_LISTENER_ID, JpaEventPublication.class)
.setParameter(1, serializedEvent)
.setParameter(2, listenerId);
return query.getSingleResult();
}
private String serializeEvent(Object event) {
@@ -98,13 +103,16 @@ public class JpaEventPublicationRepository implements EventPublicationRepository
}
private JpaEventPublication domainToEntity(EventPublication domain) {
String serializedEvent = serializeEvent(domain.getEvent());
return JpaEventPublication.builder().id(UUID.randomUUID()).publicationDate(domain.getPublicationDate())
.listenerId(domain.getTargetIdentifier().getValue()).serializedEvent(serializedEvent)
.eventType(domain.getEvent().getClass()).build();
return JpaEventPublication.builder() //
.publicationDate(domain.getPublicationDate()) //
.listenerId(domain.getTargetIdentifier().getValue()) //
.serializedEvent(serializeEvent(domain.getEvent())) //
.eventType(domain.getEvent().getClass()) //
.build();
}
private CompletableEventPublication entityToDomain(JpaEventPublication entity) {
private EventPublication entityToDomain(JpaEventPublication entity) {
return JpaEventPublicationAdapter.of(entity, serializer);
}

View File

@@ -24,13 +24,15 @@ import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.ApplicationContext;
import org.springframework.modulith.events.EventSerializer;
import org.springframework.modulith.events.EventPublicationRegistry;
import org.springframework.modulith.events.EventSerializer;
import org.springframework.test.context.TestConstructor;
import org.springframework.test.context.TestConstructor.AutowireMode;
/**
* @author Oliver Drotbohm, Dmitry Belyaev, Björn Kieling
* @author Oliver Drotbohm
* @author Dmitry Belyaev
* @author Björn Kieling
*/
@SpringBootTest(classes = ExampleApplication.class)
@TestConstructor(autowireMode = AutowireMode.ALL)
@@ -39,8 +41,7 @@ class JpaEventPublicationConfigurationIntegrationTests {
private final ApplicationContext context;
@MockBean
private EventSerializer serializer;
@MockBean EventSerializer serializer;
@Test
void bootstrapsApplicationComponents() {

View File

@@ -15,14 +15,15 @@
*/
package org.springframework.modulith.events.jpa;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.List;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
import jakarta.persistence.EntityManager;
import jakarta.persistence.EntityManagerFactory;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import java.util.List;
import javax.sql.DataSource;
@@ -48,10 +49,10 @@ import org.springframework.test.context.TestConstructor.AutowireMode;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.transaction.annotation.Transactional;
import lombok.RequiredArgsConstructor;
/**
* @author Oliver Drotbohm, Dmitry Belyaev, Björn Kieling
* @author Oliver Drotbohm
* @author Dmitry Belyaev
* @author Björn Kieling
*/
@ExtendWith(SpringExtension.class)
@TestConstructor(autowireMode = AutowireMode.ALL)
@@ -122,23 +123,20 @@ class JpaEventPublicationRepositoryIntegrationTests {
// Store publication
repository.create(publication);
List<EventPublication> eventPublications = repository.findByCompletionDateIsNull();
List<EventPublication> eventPublications = repository.findIncompletePublications();
assertThat(eventPublications).hasSize(1);
assertThat(eventPublications.get(0).getEvent()).isEqualTo(publication.getEvent());
assertThat(eventPublications.get(0).getTargetIdentifier()).isEqualTo(publication.getTargetIdentifier());
assertThat(repository.findByEventAndTargetIdentifier(testEvent, TARGET_IDENTIFIER)).isPresent();
// Complete publication
repository.updateCompletionDate(publication.markCompleted());
repository.update(publication.markCompleted());
assertThat(repository.findByCompletionDateIsNull()).isEmpty();
assertThat(repository.findIncompletePublications()).isEmpty();
}
@Value
private static final class TestEvent {
private final String eventId;
private TestEvent(String eventId) {
this.eventId = eventId;
}
String eventId;
}
}

View File

@@ -36,7 +36,6 @@
<dependency>
<groupId>org.hibernate.orm</groupId>
<artifactId>hibernate-core</artifactId>
<version>6.1.1.Final</version>
</dependency>
<dependency>
@@ -49,6 +48,7 @@
<artifactId>hsqldb</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>