Skip to content

Commit

Permalink
GH-806 - Add archive support for MongoDB.
Browse files Browse the repository at this point in the history
Co-authored-by: Oliver Drotbohm <[email protected]>
  • Loading branch information
ciberkleid and odrotbohm committed Oct 25, 2024
1 parent 2cb0db7 commit a7e4a79
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository {
private static final String ID = "id";
private static final String LISTENER_ID = "listenerId";
private static final String PUBLICATION_DATE = "publicationDate";

private static final Sort DEFAULT_SORT = Sort.by(PUBLICATION_DATE).ascending();

static final String ARCHIVE_COLLECTION = "event_publication_archive";

private final MongoTemplate mongoTemplate;
private final CompletionMode completionMode;
private final String collection, archiveCollection;

/**
* Creates a new {@link MongoDbEventPublicationRepository} for the given {@link MongoTemplate}.
Expand All @@ -71,6 +73,8 @@ public MongoDbEventPublicationRepository(MongoTemplate mongoTemplate, Completion

this.mongoTemplate = mongoTemplate;
this.completionMode = completionMode;
this.collection = "event_publication";
this.archiveCollection = completionMode == CompletionMode.ARCHIVE ? ARCHIVE_COLLECTION : collection;
}

/*
Expand All @@ -80,7 +84,7 @@ public MongoDbEventPublicationRepository(MongoTemplate mongoTemplate, Completion
@Override
public TargetEventPublication create(TargetEventPublication publication) {

mongoTemplate.save(domainToDocument(publication));
mongoTemplate.save(domainToDocument(publication), collection);

return publication;
}
Expand All @@ -93,16 +97,20 @@ public TargetEventPublication create(TargetEventPublication publication) {
public void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate) {

var query = byEventAndListenerId(event, identifier);
var update = Update.update(COMPLETION_DATE, completionDate);

if (completionMode == CompletionMode.DELETE) {

mongoTemplate.remove(query, MongoDbEventPublication.class);
mongoTemplate.remove(query, MongoDbEventPublication.class, collection);

} else {
} else if (completionMode == CompletionMode.ARCHIVE) {

var update = Update.update(COMPLETION_DATE, completionDate);
mongoTemplate.findAndModify(query, update, MongoDbEventPublication.class, collection);
var completedEvent = mongoTemplate.findAndRemove(query, MongoDbEventPublication.class, collection);
mongoTemplate.save(completedEvent, archiveCollection);
} else {

mongoTemplate.findAndModify(query, update, MongoDbEventPublication.class);
mongoTemplate.findAndModify(query, update, MongoDbEventPublication.class, collection);
}
}

Expand All @@ -113,17 +121,21 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier,
@Override
public void markCompleted(UUID identifier, Instant completionDate) {

var criateria = query(where(ID).is(identifier));
var criteria = query(where(ID).is(identifier));
var update = Update.update(COMPLETION_DATE, completionDate);

if (completionMode == CompletionMode.DELETE) {

mongoTemplate.remove(criateria, MongoDbEventPublication.class);
mongoTemplate.remove(criteria, MongoDbEventPublication.class, collection);

} else {
} else if (completionMode == CompletionMode.ARCHIVE) {

var update = Update.update(COMPLETION_DATE, completionDate);
mongoTemplate.findAndModify(criteria, update, MongoDbEventPublication.class, collection);
var completedEvent = mongoTemplate.findAndRemove(criteria, MongoDbEventPublication.class, collection);
mongoTemplate.save(completedEvent, archiveCollection);

mongoTemplate.findAndModify(criateria, update, MongoDbEventPublication.class);
} else {
mongoTemplate.findAndModify(criteria, update, MongoDbEventPublication.class, collection);
}
}

Expand Down Expand Up @@ -168,7 +180,7 @@ public Optional<TargetEventPublication> findIncompletePublicationsByEventAndTarg
*/
@Override
public List<TargetEventPublication> findCompletedPublications() {
return readMapped(defaultQuery(where(COMPLETION_DATE).ne(null)));
return readMapped(defaultQuery(where(COMPLETION_DATE).ne(null)), archiveCollection);
}

/*
Expand All @@ -177,7 +189,9 @@ public List<TargetEventPublication> findCompletedPublications() {
*/
@Override
public void deletePublications(List<UUID> identifiers) {
mongoTemplate.remove(query(where(ID).in(identifiers)), MongoDbEventPublication.class);

mongoTemplate.remove(query(where(ID).in(identifiers)), MongoDbEventPublication.class, collection);
mongoTemplate.remove(query(where(ID).in(identifiers)), MongoDbEventPublication.class, archiveCollection);
}

/*
Expand All @@ -186,7 +200,7 @@ public void deletePublications(List<UUID> identifiers) {
*/
@Override
public void deleteCompletedPublications() {
mongoTemplate.remove(query(where(COMPLETION_DATE).ne(null)), MongoDbEventPublication.class);
mongoTemplate.remove(query(where(COMPLETION_DATE).ne(null)), MongoDbEventPublication.class, archiveCollection);
}

/*
Expand All @@ -198,16 +212,22 @@ public void deleteCompletedPublicationsBefore(Instant instant) {

Assert.notNull(instant, "Instant must not be null!");

mongoTemplate.remove(query(where(COMPLETION_DATE).lt(instant)), MongoDbEventPublication.class);
mongoTemplate.remove(query(where(COMPLETION_DATE).lt(instant)), MongoDbEventPublication.class, archiveCollection);
}

private List<TargetEventPublication> readMapped(Query query) {
return readMapped(query, collection);
}

private List<TargetEventPublication> readMapped(Query query, String collection) {

return mongoTemplate.query(MongoDbEventPublication.class)
.inCollection(collection)
.matching(query)
.stream()
.map(MongoDbEventPublicationRepository::documentToDomain)
.toList();

}

private Query byEventAndListenerId(Object event, PublicationTargetIdentifier identifier) {
Expand Down
Loading

0 comments on commit a7e4a79

Please sign in to comment.