Skip to content

Commit

Permalink
GH-806 - MongoDB archive mode now uses aggregation to mark event publ…
Browse files Browse the repository at this point in the history
…ications completed.
  • Loading branch information
odrotbohm committed Oct 25, 2024
1 parent a7e4a79 commit dc11486
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.springframework.modulith.events.mongodb;

import static org.springframework.data.mongodb.core.aggregation.Aggregation.*;
import static org.springframework.data.mongodb.core.query.Criteria.*;
import static org.springframework.data.mongodb.core.query.Query.*;

Expand All @@ -24,8 +25,12 @@
import java.util.Optional;
import java.util.UUID;

import org.bson.Document;
import org.springframework.data.annotation.Id;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Fields;
import org.springframework.data.mongodb.core.aggregation.MergeOperation.WhenDocumentsMatch;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
Expand Down Expand Up @@ -96,7 +101,8 @@ public TargetEventPublication create(TargetEventPublication publication) {
@Override
public void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate) {

var query = byEventAndListenerId(event, identifier);
var criteria = byEventAndListenerId(event, identifier);
var query = defaultQuery(criteria);
var update = Update.update(COMPLETION_DATE, completionDate);

if (completionMode == CompletionMode.DELETE) {
Expand All @@ -105,9 +111,8 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier,

} else if (completionMode == CompletionMode.ARCHIVE) {

mongoTemplate.findAndModify(query, update, MongoDbEventPublication.class, collection);
var completedEvent = mongoTemplate.findAndRemove(query, MongoDbEventPublication.class, collection);
mongoTemplate.save(completedEvent, archiveCollection);
markCompleted(criteria, completionDate);

} else {

mongoTemplate.findAndModify(query, update, MongoDbEventPublication.class, collection);
Expand All @@ -121,21 +126,20 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier,
@Override
public void markCompleted(UUID identifier, Instant completionDate) {

var criteria = query(where(ID).is(identifier));
var criteria = where(ID).is(identifier).and(COMPLETION_DATE).isNull();
var query = query(criteria);
var update = Update.update(COMPLETION_DATE, completionDate);

if (completionMode == CompletionMode.DELETE) {

mongoTemplate.remove(criteria, MongoDbEventPublication.class, collection);
mongoTemplate.remove(query, MongoDbEventPublication.class, collection);

} else if (completionMode == CompletionMode.ARCHIVE) {

mongoTemplate.findAndModify(criteria, update, MongoDbEventPublication.class, collection);
var completedEvent = mongoTemplate.findAndRemove(criteria, MongoDbEventPublication.class, collection);
mongoTemplate.save(completedEvent, archiveCollection);
markCompleted(criteria, completionDate);

} else {
mongoTemplate.findAndModify(criteria, update, MongoDbEventPublication.class, collection);
mongoTemplate.findAndModify(query, update, MongoDbEventPublication.class, collection);
}
}

Expand Down Expand Up @@ -168,7 +172,7 @@ public List<TargetEventPublication> findIncompletePublicationsPublishedBefore(In
public Optional<TargetEventPublication> findIncompletePublicationsByEventAndTargetIdentifier(
Object event, PublicationTargetIdentifier targetIdentifier) {

var results = readMapped(byEventAndListenerId(event, targetIdentifier));
var results = readMapped(defaultQuery(byEventAndListenerId(event, targetIdentifier)));

// if there are several events with exactly the same payload we return the oldest one first
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
Expand Down Expand Up @@ -230,13 +234,13 @@ private List<TargetEventPublication> readMapped(Query query, String collection)

}

private Query byEventAndListenerId(Object event, PublicationTargetIdentifier identifier) {
private Criteria byEventAndListenerId(Object event, PublicationTargetIdentifier identifier) {

var eventAsMongoType = mongoTemplate.getConverter().convertToMongoType(event, TypeInformation.OBJECT);

return defaultQuery(where(EVENT).is(eventAsMongoType) //
return where(EVENT).is(eventAsMongoType) //
.and(LISTENER_ID).is(identifier.getValue())
.and(COMPLETION_DATE).isNull());
.and(COMPLETION_DATE).isNull();
}

private static MongoDbEventPublication domainToDocument(TargetEventPublication publication) {
Expand All @@ -256,6 +260,28 @@ private static Query defaultQuery(Criteria criteria) {
return query(criteria).with(DEFAULT_SORT);
}

private void markCompleted(Criteria lookup, Instant now) {

var aggregation = newAggregation(MongoDbEventPublication.class,

match(lookup),

addFields()
.addFieldWithValue(COMPLETION_DATE, now)
.build(),

merge()
.intoCollection(archiveCollection)
.on(ID)
.whenMatched(WhenDocumentsMatch.keepExistingDocument())
.build());

mongoTemplate
.aggregate(aggregation, collection, Document.class)
.forEach(it -> mongoTemplate.remove(query(where(Fields.UNDERSCORE_ID).is(it.get(Fields.UNDERSCORE_ID))),
collection));
}

private static class MongoDbEventPublicationAdapter implements TargetEventPublication {

private final MongoDbEventPublication publication;
Expand Down Expand Up @@ -326,4 +352,6 @@ public int hashCode() {
return Objects.hash(publication);
}
}

record IdOnly(@Id UUID id) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package example;

import example.order.OrderManagement;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
Expand All @@ -25,5 +28,10 @@
@SpringBootApplication
public class Application {

public static void main(String... args) {}
public static void main(String... args) {

var context = SpringApplication.run(Application.class, args);

context.getBean(OrderManagement.class).complete();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.bson.UuidRepresentation;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
Expand All @@ -48,6 +49,14 @@
@Testcontainers(disabledWithoutDocker = true)
class ApplicationIntegrationTests {

public static void main(String[] args) {

SpringApplication.from(Application::main)
.with(MongoDbInfrastructureConfiguration.class)
.run(args)
.getApplicationContext();
}

@TestConfiguration
static class MongoDbInfrastructureConfiguration {

Expand Down

0 comments on commit dc11486

Please sign in to comment.