Skip to content

Commit

Permalink
GH-806 - Add archive support for JDBC.
Browse files Browse the repository at this point in the history
Co-authored-by: Oliver Drotbohm <[email protected]>
  • Loading branch information
ciberkleid and odrotbohm committed Oct 24, 2024
1 parent 8562cee commit dab1031
Show file tree
Hide file tree
Showing 17 changed files with 271 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.springframework.core.io.ResourceLoader;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
import org.springframework.util.Assert;

/**
* Initializes the DB schema used to store events
Expand All @@ -37,23 +36,15 @@ class DatabaseSchemaInitializer implements InitializingBean {

private final DataSource dataSource;
private final ResourceLoader resourceLoader;
private final DatabaseType databaseType;
private final JdbcOperations jdbcOperations;
private final JdbcConfigurationProperties properties;
private final JdbcRepositorySettings settings;

DatabaseSchemaInitializer(DataSource dataSource, ResourceLoader resourceLoader, DatabaseType databaseType,
JdbcOperations jdbcOperations, JdbcConfigurationProperties properties) {

Assert.isTrue(properties.getSchemaInitialization().isEnabled(),
"Schema initialization disabled! Initializer should not have been registered!");
DatabaseSchemaInitializer(DataSource dataSource, ResourceLoader resourceLoader, JdbcOperations jdbcOperations, JdbcRepositorySettings settings) {

this.dataSource = dataSource;
this.resourceLoader = resourceLoader;
this.databaseType = databaseType;
this.jdbcOperations = jdbcOperations;
this.properties = properties;

properties.verify(databaseType);
this.settings = settings;
}

/*
Expand All @@ -66,7 +57,8 @@ public void afterPropertiesSet() throws Exception {
try (Connection connection = dataSource.getConnection()) {

var initialSchema = connection.getSchema();
var schemaName = properties.getSchema();
var schemaName = settings.getSchema();
var databaseType = settings.getDatabaseType();
var useSchema = schemaName != null && !schemaName.isEmpty();

if (useSchema) { // A schema name has been specified.
Expand All @@ -80,7 +72,10 @@ public void afterPropertiesSet() throws Exception {
}

var locator = new DatabaseSchemaLocator(resourceLoader);
new ResourceDatabasePopulator(locator.getSchemaResource(databaseType)).execute(dataSource);

var populator = new ResourceDatabasePopulator();
locator.getSchemaResource(settings).forEach(populator::addScript);
populator.execute(dataSource);

// Return to the initial schema.
if (initialSchema != null && useSchema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import org.springframework.core.io.ResourceLoader;
import org.springframework.util.Assert;

import java.util.Collection;
import java.util.List;

/**
* Simple wrapper around a {@link ResourceLoader} to load database specific schema files from the classpath.
*
Expand All @@ -41,16 +44,22 @@ public class DatabaseSchemaLocator {
}

/**
* Loads the {@link Resource} containing the schema for the given {@link DatabaseType} from the classpath.
* Loads the {@link Resource} containing the schema for the given {@link JdbcRepositorySettings} from the classpath.
*
* @param databaseType must not be {@literal null}.
* @param settings must not be {@literal null}.
* @return will never be {@literal null}.
*/
Resource getSchemaResource(DatabaseType databaseType) {
Collection<Resource> getSchemaResource(JdbcRepositorySettings settings) {

Assert.notNull(databaseType, "DatabaseType must not be null!");
Assert.notNull(settings, "JdbcRepositorySettings must not be null!");

var databaseType = settings.getDatabaseType();
var schemaResourceFilename = databaseType.getSchemaResourceFilename();
return resourceLoader.getResource(ResourceLoader.CLASSPATH_URL_PREFIX + schemaResourceFilename);
var schemaResource = resourceLoader.getResource(ResourceLoader.CLASSPATH_URL_PREFIX + schemaResourceFilename);

return !settings.isArchiveCompletion()
? List.of(schemaResource)
: List.of(schemaResource, resourceLoader.getResource(ResourceLoader.CLASSPATH_URL_PREFIX + databaseType.getArchiveSchemaResourceFilename()));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* @author Björn Kieling
* @author Oliver Drotbohm
* @author Raed Ben Hamouda
* @author Cora Iberkleid
*/
enum DatabaseType {

Expand Down Expand Up @@ -106,7 +107,7 @@ boolean isSchemaSupported() {
}
};

static final String SCHEMA_NOT_SUPPORTED = "Setting the schema name not supported on MySQL!";
static final String SCHEMA_NOT_SUPPORTED = "Setting the schema name is not supported!";

static DatabaseType from(String productName) {

Expand Down Expand Up @@ -138,6 +139,8 @@ String getSchemaResourceFilename() {
return "/schema-" + value + ".sql";
}

String getArchiveSchemaResourceFilename() { return "/schema-" + value + "-archive.sql"; }

boolean isSchemaSupported() {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import javax.sql.DataSource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
Expand All @@ -44,14 +45,16 @@
@EnableConfigurationProperties(JdbcConfigurationProperties.class)
class JdbcEventPublicationAutoConfiguration implements EventPublicationConfigurationExtension {

@Autowired Environment environment;

@Bean
DatabaseType databaseType(DataSource dataSource) {
return DatabaseType.from(fromDataSource(dataSource));
}

@Bean
JdbcRepositorySettings jdbcEventPublicationRepositorySettings(DatabaseType databaseType,
JdbcConfigurationProperties properties, Environment environment) {
JdbcConfigurationProperties properties) {

return new JdbcRepositorySettings(databaseType, CompletionMode.from(environment), properties.getSchema());
}
Expand All @@ -65,9 +68,9 @@ JdbcEventPublicationRepository jdbcEventPublicationRepository(JdbcTemplate jdbcT
@Bean
@ConditionalOnProperty(name = "spring.modulith.events.jdbc.schema-initialization.enabled", havingValue = "true")
DatabaseSchemaInitializer databaseSchemaInitializer(DataSource dataSource, ResourceLoader resourceLoader,
DatabaseType databaseType, JdbcTemplate jdbcTemplate, JdbcConfigurationProperties properties) {
DatabaseType databaseType, JdbcTemplate jdbcTemplate, JdbcRepositorySettings settings) {

return new DatabaseSchemaInitializer(dataSource, resourceLoader, databaseType, jdbcTemplate, properties);
return new DatabaseSchemaInitializer(dataSource, resourceLoader, jdbcTemplate, settings);
}

private static String fromDataSource(DataSource dataSource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
* @author Björn Kieling
* @author Oliver Drotbohm
* @author Raed Ben Hamouda
* @author Cora Iberkleid
*/
class JdbcEventPublicationRepository implements EventPublicationRepository, BeanClassLoaderAware {

Expand Down Expand Up @@ -130,20 +131,39 @@ class JdbcEventPublicationRepository implements EventPublicationRepository, Bean
ID = ?
""";

private static final String SQL_STATEMENT_DELETE_UNCOMPLETED = """
private static final String SQL_STATEMENT_DELETE_COMPLETED = """
DELETE
FROM %s
WHERE
COMPLETION_DATE IS NOT NULL
""";

private static final String SQL_STATEMENT_DELETE_UNCOMPLETED_BEFORE = """
private static final String SQL_STATEMENT_DELETE_COMPLETED_BEFORE = """
DELETE
FROM %s
WHERE
COMPLETION_DATE < ?
""";

private static final String SQL_STATEMENT_COPY_TO_ARCHIVE_BY_ID = """
-- Only copy if no entry in target table
INSERT INTO %s (ID, LISTENER_ID, EVENT_TYPE, SERIALIZED_EVENT, PUBLICATION_DATE, COMPLETION_DATE)
SELECT ID, LISTENER_ID, EVENT_TYPE, SERIALIZED_EVENT, PUBLICATION_DATE, ?
FROM %s
WHERE ID = ?
AND NOT EXISTS (SELECT 1 FROM %s WHERE ID = EVENT_PUBLICATION.ID)
""";

private static final String SQL_STATEMENT_COPY_TO_ARCHIVE_BY_EVENT_AND_LISTENER_ID = """
-- Only copy if no entry in target table
INSERT INTO %s (ID, LISTENER_ID, EVENT_TYPE, SERIALIZED_EVENT, PUBLICATION_DATE, COMPLETION_DATE)
SELECT ID, LISTENER_ID, EVENT_TYPE, SERIALIZED_EVENT, PUBLICATION_DATE, ?
FROM %s
WHERE LISTENER_ID = ?
AND SERIALIZED_EVENT = ?
AND NOT EXISTS (SELECT 1 FROM %s WHERE ID = EVENT_PUBLICATION.ID)
""";

private static final int DELETE_BATCH_SIZE = 100;

private final JdbcOperations operations;
Expand All @@ -162,8 +182,10 @@ class JdbcEventPublicationRepository implements EventPublicationRepository, Bean
sqlStatementDelete,
sqlStatementDeleteByEventAndListenerId,
sqlStatementDeleteById,
sqlStatementDeleteUncompleted,
sqlStatementDeleteUncompletedBefore;
sqlStatementDeleteCompleted,
sqlStatementDeleteCompletedBefore,
sqlStatementCopyToArchive,
sqlStatementCopyToArchiveByEventAndListenerId;

/**
* Creates a new {@link JdbcEventPublicationRepository} for the given {@link JdbcOperations}, {@link EventSerializer},
Expand All @@ -186,9 +208,10 @@ public JdbcEventPublicationRepository(JdbcOperations operations, EventSerializer

var schema = settings.getSchema();
var table = ObjectUtils.isEmpty(schema) ? "EVENT_PUBLICATION" : schema + ".EVENT_PUBLICATION";
var completedTable = settings.isArchiveCompletion() ? table + "_ARCHIVE" : table;

this.sqlStatementInsert = SQL_STATEMENT_INSERT.formatted(table);
this.sqlStatementFindCompleted = SQL_STATEMENT_FIND_COMPLETED.formatted(table);
this.sqlStatementFindCompleted = SQL_STATEMENT_FIND_COMPLETED.formatted(completedTable);
this.sqlStatementFindUncompleted = SQL_STATEMENT_FIND_UNCOMPLETED.formatted(table);
this.sqlStatementFindUncompletedBefore = SQL_STATEMENT_FIND_UNCOMPLETED_BEFORE.formatted(table);
this.sqlStatementUpdateByEventAndListenerId = SQL_STATEMENT_UPDATE_BY_EVENT_AND_LISTENER_ID.formatted(table);
Expand All @@ -197,8 +220,10 @@ public JdbcEventPublicationRepository(JdbcOperations operations, EventSerializer
this.sqlStatementDelete = SQL_STATEMENT_DELETE.formatted(table);
this.sqlStatementDeleteByEventAndListenerId = SQL_STATEMENT_DELETE_BY_EVENT_AND_LISTENER_ID.formatted(table);
this.sqlStatementDeleteById = SQL_STATEMENT_DELETE_BY_ID.formatted(table);
this.sqlStatementDeleteUncompleted = SQL_STATEMENT_DELETE_UNCOMPLETED.formatted(table);
this.sqlStatementDeleteUncompletedBefore = SQL_STATEMENT_DELETE_UNCOMPLETED_BEFORE.formatted(table);
this.sqlStatementDeleteCompleted = SQL_STATEMENT_DELETE_COMPLETED.formatted(completedTable);
this.sqlStatementDeleteCompletedBefore = SQL_STATEMENT_DELETE_COMPLETED_BEFORE.formatted(completedTable);
this.sqlStatementCopyToArchive = SQL_STATEMENT_COPY_TO_ARCHIVE_BY_ID.formatted(completedTable, table, completedTable);
this.sqlStatementCopyToArchiveByEventAndListenerId = SQL_STATEMENT_COPY_TO_ARCHIVE_BY_EVENT_AND_LISTENER_ID.formatted(completedTable, table, completedTable);
}

/*
Expand Down Expand Up @@ -246,6 +271,14 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier,

operations.update(sqlStatementDeleteByEventAndListenerId, targetIdentifier, serializedEvent);

} else if (settings.isArchiveCompletion()) {

operations.update(sqlStatementCopyToArchiveByEventAndListenerId, //
Timestamp.from(completionDate), //
targetIdentifier, //
serializedEvent);
operations.update(sqlStatementDeleteByEventAndListenerId, targetIdentifier, serializedEvent);

} else {

operations.update(sqlStatementUpdateByEventAndListenerId, //
Expand All @@ -263,10 +296,18 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier,
@Transactional
public void markCompleted(UUID identifier, Instant completionDate) {

var databaseId = uuidToDatabase(identifier);
var timestamp = Timestamp.from(completionDate);

if (settings.isDeleteCompletion()) {
operations.update(sqlStatementDeleteById, uuidToDatabase(identifier));
operations.update(sqlStatementDeleteById, databaseId);

} else if (settings.isArchiveCompletion()) {
operations.update(sqlStatementCopyToArchive, timestamp, databaseId);
operations.update(sqlStatementDeleteById, databaseId);

} else {
operations.update(sqlStatementUpdateById, Timestamp.from(completionDate), uuidToDatabase(identifier));
operations.update(sqlStatementUpdateById, timestamp, databaseId);
}
}

Expand Down Expand Up @@ -338,7 +379,7 @@ public void deletePublications(List<UUID> identifiers) {
*/
@Override
public void deleteCompletedPublications() {
operations.execute(sqlStatementDeleteUncompleted);
operations.execute(sqlStatementDeleteCompleted);
}

/*
Expand All @@ -350,7 +391,7 @@ public void deleteCompletedPublicationsBefore(Instant instant) {

Assert.notNull(instant, "Instant must not be null!");

operations.update(sqlStatementDeleteUncompletedBefore, Timestamp.from(instant));
operations.update(sqlStatementDeleteCompletedBefore, Timestamp.from(instant));
}

private String serializeEvent(Object event) {
Expand Down Expand Up @@ -457,9 +498,7 @@ private static class JdbcEventPublication implements TargetEventPublication {
* @param id must not be {@literal null}.
* @param publicationDate must not be {@literal null}.
* @param listenerId must not be {@literal null} or empty.
* @param serializedEvent must not be {@literal null} or empty.
* @param eventType must not be {@literal null}.
* @param serializer must not be {@literal null}.
* @param event must not be {@literal null}..
* @param completionDate can be {@literal null}.
*/
public JdbcEventPublication(UUID id, Instant publicationDate, String listenerId, Supplier<Object> event,
Expand All @@ -468,6 +507,7 @@ public JdbcEventPublication(UUID id, Instant publicationDate, String listenerId,
Assert.notNull(id, "Id must not be null!");
Assert.notNull(publicationDate, "Publication date must not be null!");
Assert.hasText(listenerId, "Listener id must not be null or empty!");
Assert.notNull(event, "Event must not be null!");

this.id = id;
this.publicationDate = publicationDate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public class JdbcRepositorySettings {
this.databaseType = databaseType;
this.schema = schema;
this.completionMode = completionMode;

if (schema != null && !databaseType.isSchemaSupported()) {
throw new IllegalStateException(DatabaseType.SCHEMA_NOT_SUPPORTED);
}
}

/**
Expand Down Expand Up @@ -74,4 +78,14 @@ public String getSchema() {
public boolean isDeleteCompletion() {
return completionMode == CompletionMode.DELETE;
}

/**
* Returns whether we use the archiving completion mode.
*/
public boolean isArchiveCompletion() { return completionMode == CompletionMode.ARCHIVE; }

/**
* Returns whether we use the updating completion mode.
*/
public boolean isUpdateCompletion() { return completionMode == CompletionMode.UPDATE; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE IF NOT EXISTS EVENT_PUBLICATION_ARCHIVE
(
ID UUID NOT NULL,
COMPLETION_DATE TIMESTAMP(9) WITH TIME ZONE,
EVENT_TYPE VARCHAR(512) NOT NULL,
LISTENER_ID VARCHAR(512) NOT NULL,
PUBLICATION_DATE TIMESTAMP(9) WITH TIME ZONE NOT NULL,
SERIALIZED_EVENT VARCHAR(4000) NOT NULL,
PRIMARY KEY (ID)
);
CREATE INDEX IF NOT EXISTS EVENT_PUBLICATION_ARCHIVE_BY_LISTENER_ID_AND_SERIALIZED_EVENT_IDX ON EVENT_PUBLICATION_ARCHIVE (LISTENER_ID, SERIALIZED_EVENT);
CREATE INDEX IF NOT EXISTS EVENT_PUBLICATION_ARCHIVE_BY_COMPLETION_DATE_IDX ON EVENT_PUBLICATION_ARCHIVE (COMPLETION_DATE);
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
CREATE TABLE IF NOT EXISTS EVENT_PUBLICATION_ARCHIVE
(
ID UUID NOT NULL,
COMPLETION_DATE TIMESTAMP(9),
EVENT_TYPE VARCHAR(512) NOT NULL,
LISTENER_ID VARCHAR(512) NOT NULL,
PUBLICATION_DATE TIMESTAMP(9) NOT NULL,
SERIALIZED_EVENT VARCHAR(4000) NOT NULL,
PRIMARY KEY (ID)
);
CREATE INDEX IF NOT EXISTS EVENT_PUBLICATION_ARCHIVE_BY_LISTENER_ID_AND_SERIALIZED_EVENT_IDX ON EVENT_PUBLICATION_ARCHIVE (LISTENER_ID, SERIALIZED_EVENT);
CREATE INDEX IF NOT EXISTS EVENT_PUBLICATION_ARCHIVE_BY_COMPLETION_DATE_IDX ON EVENT_PUBLICATION_ARCHIVE (COMPLETION_DATE);
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE TABLE IF NOT EXISTS EVENT_PUBLICATION_ARCHIVE
(
ID VARCHAR(36) NOT NULL,
LISTENER_ID VARCHAR(512) NOT NULL,
EVENT_TYPE VARCHAR(512) NOT NULL,
SERIALIZED_EVENT VARCHAR(4000) NOT NULL,
PUBLICATION_DATE TIMESTAMP(6) NOT NULL,
COMPLETION_DATE TIMESTAMP(6) DEFAULT NULL NULL,
PRIMARY KEY (ID),
INDEX EVENT_PUBLICATION_ARCHIVE_BY_COMPLETION_DATE_IDX (COMPLETION_DATE)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
CREATE TABLE IF NOT EXISTS EVENT_PUBLICATION_ARCHIVE
(
ID VARCHAR(36) NOT NULL,
LISTENER_ID VARCHAR(512) NOT NULL,
EVENT_TYPE VARCHAR(512) NOT NULL,
SERIALIZED_EVENT VARCHAR(4000) NOT NULL,
PUBLICATION_DATE TIMESTAMP(6) NOT NULL,
COMPLETION_DATE TIMESTAMP(6) DEFAULT NULL NULL,
PRIMARY KEY (ID),
INDEX EVENT_PUBLICATION_ARCHIVE_BY_COMPLETION_DATE_IDX (COMPLETION_DATE)
);
Loading

0 comments on commit dab1031

Please sign in to comment.