Skip to content

Commit

Permalink
GH-806 - Polish Neo4j archiving mode.
Browse files Browse the repository at this point in the history
We now explicitly set the completion date property of newly created event publications to null. The copying data manipulation statements now properly copy all properties of the publication notes into the archive.
  • Loading branch information
odrotbohm committed Oct 25, 2024
1 parent 4132ad3 commit b189dbc
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.time.Instant;
import java.util.UUID;

import org.springframework.lang.Nullable;

/**
* The event publication entity definition.
*
Expand All @@ -32,15 +34,16 @@ class Neo4jEventPublication {
public final Object event;
public final String eventHash;

public Instant completionDate;
public @Nullable Instant completionDate;

public Neo4jEventPublication(UUID identifier, Instant publicationDate, String listenerId, Object event,
String eventHash) {
String eventHash, @Nullable Instant completionDate) {

this.identifier = identifier;
this.publicationDate = publicationDate;
this.listenerId = listenerId;
this.event = event;
this.eventHash = eventHash;
this.completionDate = completionDate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,20 @@
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.neo4j.cypherdsl.core.Cypher;
import org.neo4j.cypherdsl.core.Node;
import org.neo4j.cypherdsl.core.ResultStatement;
import org.neo4j.cypherdsl.core.Statement;
import org.neo4j.cypherdsl.core.StatementBuilder.OrderableOngoingReadingAndWithWithoutWhere;
import org.neo4j.cypherdsl.core.renderer.Configuration;
import org.neo4j.cypherdsl.core.renderer.Renderer;
import org.neo4j.driver.Values;
Expand Down Expand Up @@ -65,11 +68,14 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
private static final String PUBLICATION_DATE = "publicationDate";
private static final String COMPLETION_DATE = "completionDate";

private static final Collection<String> ALL_PROPERTIES = List.of(ID, EVENT_SERIALIZED, EVENT_HASH, EVENT_TYPE,
LISTENER_ID, PUBLICATION_DATE, COMPLETION_DATE);

private static final Node EVENT_PUBLICATION_NODE = node("Neo4jEventPublication")
.named("neo4jEventPublication");

private static final Node EVENT_PUBLICATION_COMPLETED_NODE = node("Neo4jEventPublicationCompleted")
.named("neo4jEventPublicationCompleted");
private static final Node EVENT_PUBLICATION_ARCHIVE_NODE = node("Neo4jEventPublicationArchive")
.named("neo4jEventPublicationArchive");

private static final Statement INCOMPLETE_BY_EVENT_AND_TARGET_IDENTIFIER_STATEMENT = match(EVENT_PUBLICATION_NODE)
.where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH)))
Expand Down Expand Up @@ -114,6 +120,7 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
.set(EVENT_PUBLICATION_NODE.property(EVENT_TYPE).to(parameter(EVENT_TYPE)))
.set(EVENT_PUBLICATION_NODE.property(LISTENER_ID).to(parameter(LISTENER_ID)))
.set(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE).to(parameter(PUBLICATION_DATE)))
.set(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).to(parameter(COMPLETION_DATE)))
.build();

private static final Statement COMPLETE_STATEMENT = match(EVENT_PUBLICATION_NODE)
Expand All @@ -123,29 +130,34 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
.set(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).to(parameter(COMPLETION_DATE)))
.build();

private static final Statement COMPLETE_IN_ARCHIVE_BY_ID_STATEMENT = match(EVENT_PUBLICATION_NODE)
private static final Statement COMPLETE_IN_ARCHIVE_BY_ID_STATEMENT = applyProperties(match(EVENT_PUBLICATION_NODE)
.where(EVENT_PUBLICATION_NODE.property(ID).eq(parameter(ID)))
.and(not(exists(match(EVENT_PUBLICATION_COMPLETED_NODE)
.where(EVENT_PUBLICATION_COMPLETED_NODE.property(ID).eq(parameter(ID)))
.returning(literalTrue()).build())))
.with(EVENT_PUBLICATION_NODE)
.create(EVENT_PUBLICATION_COMPLETED_NODE)
.set(EVENT_PUBLICATION_COMPLETED_NODE.property(ID).to(EVENT_PUBLICATION_NODE.property(ID)))
.set(EVENT_PUBLICATION_COMPLETED_NODE.property(COMPLETION_DATE).to(parameter(COMPLETION_DATE)))
.build();

private static final Statement COMPLETE_IN_ARCHIVE_BY_EVENT_AND_LISTENER_ID_STATEMENT = match(EVENT_PUBLICATION_NODE)
.where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH)))
.and(EVENT_PUBLICATION_NODE.property(LISTENER_ID).eq(parameter(LISTENER_ID)))
.and(not(exists(match(EVENT_PUBLICATION_COMPLETED_NODE)
.where(EVENT_PUBLICATION_COMPLETED_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH)))
.and(EVENT_PUBLICATION_COMPLETED_NODE.property(LISTENER_ID).eq(parameter(LISTENER_ID)))
.and(not(exists(match(EVENT_PUBLICATION_ARCHIVE_NODE)
.where(EVENT_PUBLICATION_ARCHIVE_NODE.property(ID).eq(parameter(ID)))
.returning(literalTrue()).build())))
.with(EVENT_PUBLICATION_NODE)
.create(EVENT_PUBLICATION_COMPLETED_NODE)
.set(EVENT_PUBLICATION_COMPLETED_NODE.property(ID).to(EVENT_PUBLICATION_NODE.property(ID)))
.set(EVENT_PUBLICATION_COMPLETED_NODE.property(COMPLETION_DATE).to(parameter(COMPLETION_DATE)))
.build();
.with(EVENT_PUBLICATION_NODE));

private static final Statement COMPLETE_IN_ARCHIVE_BY_EVENT_AND_LISTENER_ID_STATEMENT = applyProperties(
match(EVENT_PUBLICATION_NODE)
.where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH)))
.and(EVENT_PUBLICATION_NODE.property(LISTENER_ID).eq(parameter(LISTENER_ID)))
.and(not(exists(match(EVENT_PUBLICATION_ARCHIVE_NODE)
.where(EVENT_PUBLICATION_ARCHIVE_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH)))
.and(EVENT_PUBLICATION_ARCHIVE_NODE.property(LISTENER_ID).eq(parameter(LISTENER_ID)))
.returning(literalTrue()).build())))
.with(EVENT_PUBLICATION_NODE));

private static Statement applyProperties(OrderableOngoingReadingAndWithWithoutWhere source) {

var operations = ALL_PROPERTIES.stream()
.map(it -> EVENT_PUBLICATION_ARCHIVE_NODE.property(it).to(EVENT_PUBLICATION_NODE.property(it)))
.toList();

return source.create(EVENT_PUBLICATION_ARCHIVE_NODE)
.set(operations)
.set(EVENT_PUBLICATION_ARCHIVE_NODE.property(COMPLETION_DATE).to(parameter(COMPLETION_DATE)))
.build();
}

private static final Function<Node, Statement> COMPLETE_BY_ID_STATEMENT = node -> match(node)
.where(node.property(ID).eq(parameter(ID)))
Expand All @@ -168,6 +180,7 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
private final Renderer renderer;
private final EventSerializer eventSerializer;
private final CompletionMode completionMode;
private final Node completedNode;

private final Statement deleteCompletedStatement;
private final Statement deleteCompletedBeforeStatement;
Expand All @@ -187,12 +200,14 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
this.eventSerializer = eventSerializer;
this.completionMode = completionMode;

var archiveNode = completionMode == CompletionMode.ARCHIVE ? EVENT_PUBLICATION_COMPLETED_NODE : EVENT_PUBLICATION_NODE;
this.completedNode = completionMode == CompletionMode.ARCHIVE
? EVENT_PUBLICATION_ARCHIVE_NODE
: EVENT_PUBLICATION_NODE;

this.deleteCompletedStatement = DELETE_COMPLETED_STATEMENT.apply(archiveNode);
this.deleteCompletedBeforeStatement = DELETE_COMPLETED_BEFORE_STATEMENT.apply(archiveNode);
this.completedByIdStatement = COMPLETE_BY_ID_STATEMENT.apply(archiveNode);
this.allCompletedStatement = ALL_COMPLETED_STATEMENT.apply(archiveNode);
this.deleteCompletedStatement = DELETE_COMPLETED_STATEMENT.apply(completedNode);
this.deleteCompletedBeforeStatement = DELETE_COMPLETED_BEFORE_STATEMENT.apply(completedNode);
this.completedByIdStatement = COMPLETE_BY_ID_STATEMENT.apply(completedNode);
this.allCompletedStatement = ALL_COMPLETED_STATEMENT.apply(completedNode);
}

/*
Expand All @@ -219,7 +234,8 @@ public TargetEventPublication create(TargetEventPublication publication) {
EVENT_HASH, eventHash,
EVENT_TYPE, eventType,
LISTENER_ID, listenerId,
PUBLICATION_DATE, Values.value(publicationDate.atOffset(ZoneOffset.UTC))))
PUBLICATION_DATE, Values.value(publicationDate.atOffset(ZoneOffset.UTC)),
COMPLETION_DATE, Values.NULL))
.run();

return publication;
Expand Down Expand Up @@ -249,6 +265,7 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier,
.bind(identifier.getValue()).to(LISTENER_ID)
.bind(Values.value(completionDate.atOffset(ZoneOffset.UTC))).to(COMPLETION_DATE)
.run();

neo4jClient.query(renderer.render(DELETE_BY_EVENT_AND_LISTENER_ID))
.bind(eventHash).to(EVENT_HASH)
.bind(identifier.getValue()).to(LISTENER_ID)
Expand Down Expand Up @@ -279,9 +296,10 @@ public void markCompleted(UUID identifier, Instant completionDate) {
} else if (completionMode == CompletionMode.ARCHIVE) {

neo4jClient.query(renderer.render(COMPLETE_IN_ARCHIVE_BY_ID_STATEMENT))
.bind("").to(ID)
.bind(Values.value(identifier.toString())).to(ID)
.bind(Values.value(completionDate.atOffset(ZoneOffset.UTC))).to(COMPLETION_DATE)
.run();

deletePublications(List.of(identifier));

} else {
Expand All @@ -304,7 +322,7 @@ public List<TargetEventPublication> findIncompletePublications() {

return List.copyOf(neo4jClient.query(renderer.render(INCOMPLETE_STATEMENT))
.fetchAs(TargetEventPublication.class)
.mappedBy(this::mapRecordToPublication)
.mappedBy(incompleteMapping())
.all());
}

Expand All @@ -319,7 +337,7 @@ public List<TargetEventPublication> findIncompletePublicationsPublishedBefore(In
return List.copyOf(neo4jClient.query(renderer.render(INCOMPLETE_PUBLISHED_BEFORE_STATEMENT))
.bind(Values.value(instant.atOffset(ZoneOffset.UTC))).to(PUBLICATION_DATE)
.fetchAs(TargetEventPublication.class)
.mappedBy(this::mapRecordToPublication)
.mappedBy(incompleteMapping())
.all());
}

Expand All @@ -338,7 +356,7 @@ public Optional<TargetEventPublication> findIncompletePublicationsByEventAndTarg
return neo4jClient.query(renderer.render(INCOMPLETE_BY_EVENT_AND_TARGET_IDENTIFIER_STATEMENT))
.bindAll(Map.of(EVENT_HASH, eventHash, LISTENER_ID, listenerId))
.fetchAs(TargetEventPublication.class)
.mappedBy(this::mapRecordToPublication)
.mappedBy(incompleteMapping())
.one();
}

Expand All @@ -351,7 +369,7 @@ public List<TargetEventPublication> findCompletedPublications() {

return new ArrayList<>(neo4jClient.query(renderer.render(allCompletedStatement))
.fetchAs(TargetEventPublication.class)
.mappedBy(this::mapRecordToPublication)
.mappedBy(completeMapping())
.all());
}

Expand Down Expand Up @@ -391,21 +409,31 @@ public void deleteCompletedPublicationsBefore(Instant instant) {
.run();
}

private Neo4jEventPublicationAdapter mapRecordToPublication(TypeSystem typeSystem, org.neo4j.driver.Record record) {
private BiFunction<TypeSystem, org.neo4j.driver.Record, TargetEventPublication> incompleteMapping() {
return (typeSystem, driverRecord) -> mapRecordToPublication(typeSystem, driverRecord, EVENT_PUBLICATION_NODE);
}

private BiFunction<TypeSystem, org.neo4j.driver.Record, TargetEventPublication> completeMapping() {
return (typeSystem, driverRecord) -> mapRecordToPublication(typeSystem, driverRecord, completedNode);
}

private Neo4jEventPublicationAdapter mapRecordToPublication(TypeSystem typeSystem, org.neo4j.driver.Record record,
Node node) {

var publicationNode = record.get(EVENT_PUBLICATION_NODE.getRequiredSymbolicName().getValue()).asNode();
var publicationNode = record.get(node.getRequiredSymbolicName().getValue()).asNode();
var identifier = UUID.fromString(publicationNode.get(ID).asString());
var publicationDate = publicationNode.get(PUBLICATION_DATE).asZonedDateTime().toInstant();
var listenerId = publicationNode.get(LISTENER_ID).asString();
var eventSerialized = publicationNode.get(EVENT_SERIALIZED).asString();
var eventHash = publicationNode.get(EVENT_HASH).asString();
var eventType = publicationNode.get(EVENT_TYPE).asString();
var completionDate = publicationNode.get(COMPLETION_DATE);

try {

var event = eventSerializer.deserialize(eventSerialized, Class.forName(eventType));
var publication = new Neo4jEventPublication(identifier, publicationDate, listenerId, event,
eventHash);
eventHash, completionDate.isNull() ? null : completionDate.asZonedDateTime().toInstant());

return new Neo4jEventPublicationAdapter(publication);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@
import org.springframework.modulith.events.core.TargetEventPublication;
import org.springframework.modulith.events.support.CompletionMode;
import org.springframework.modulith.testapp.TestApplication;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.bean.override.mockito.MockitoBean;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.util.DigestUtils;
import org.testcontainers.containers.Neo4jContainer;
import org.testcontainers.junit.jupiter.Container;
Expand All @@ -53,7 +52,12 @@

/**
* @author Gerrit Meier
* @author Cora Iberkleid
* @author Oliver Drotbohm
*/
@Testcontainers(disabledWithoutDocker = true)
@SpringJUnitConfig(Neo4jEventPublicationRepositoryTest.Config.class)
@ImportAutoConfiguration(classes = Neo4jEventPublicationAutoConfiguration.class)
class Neo4jEventPublicationRepositoryTest {

static final PublicationTargetIdentifier TARGET_IDENTIFIER = PublicationTargetIdentifier.of("listener");
Expand All @@ -62,17 +66,12 @@ class Neo4jEventPublicationRepositoryTest {
static final Neo4jContainer<?> neo4jContainer = new Neo4jContainer<>(DockerImageName.parse("neo4j:5"))
.withRandomPassword();

@Import(TestApplication.class)
@ImportAutoConfiguration({ Neo4jEventPublicationAutoConfiguration.class })
@Testcontainers(disabledWithoutDocker = true)
@ContextConfiguration(classes = Config.class)
static abstract class TestBase {

@Autowired Neo4jEventPublicationRepository repository;
@Autowired Driver driver;
@Autowired Environment environment;

@MockitoBean EventSerializer eventSerializer;
@Autowired EventSerializer eventSerializer;

CompletionMode completionMode;

Expand Down Expand Up @@ -335,5 +334,10 @@ Driver driver() {
org.neo4j.cypherdsl.core.renderer.Configuration cypherDslConfiguration() {
return org.neo4j.cypherdsl.core.renderer.Configuration.newConfig().withDialect(Dialect.NEO4J_5).build();
}

@Bean
EventSerializer eventSerializer() {
return mock(EventSerializer.class);
}
}
}

0 comments on commit b189dbc

Please sign in to comment.