diff --git a/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java b/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java index dad127ed..10b096b0 100644 --- a/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java +++ b/spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java @@ -52,11 +52,13 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository { private static final String ID = "id"; private static final String LISTENER_ID = "listenerId"; private static final String PUBLICATION_DATE = "publicationDate"; - private static final Sort DEFAULT_SORT = Sort.by(PUBLICATION_DATE).ascending(); + static final String ARCHIVE_COLLECTION = "event_publication_archive"; + private final MongoTemplate mongoTemplate; private final CompletionMode completionMode; + private final String collection, archiveCollection; /** * Creates a new {@link MongoDbEventPublicationRepository} for the given {@link MongoTemplate}. @@ -71,6 +73,8 @@ public MongoDbEventPublicationRepository(MongoTemplate mongoTemplate, Completion this.mongoTemplate = mongoTemplate; this.completionMode = completionMode; + this.collection = "event_publication"; + this.archiveCollection = completionMode == CompletionMode.ARCHIVE ? ARCHIVE_COLLECTION : collection; } /* @@ -80,7 +84,7 @@ public MongoDbEventPublicationRepository(MongoTemplate mongoTemplate, Completion @Override public TargetEventPublication create(TargetEventPublication publication) { - mongoTemplate.save(domainToDocument(publication)); + mongoTemplate.save(domainToDocument(publication), collection); return publication; } @@ -93,16 +97,20 @@ public TargetEventPublication create(TargetEventPublication publication) { public void markCompleted(Object event, PublicationTargetIdentifier identifier, Instant completionDate) { var query = byEventAndListenerId(event, identifier); + var update = Update.update(COMPLETION_DATE, completionDate); if (completionMode == CompletionMode.DELETE) { - mongoTemplate.remove(query, MongoDbEventPublication.class); + mongoTemplate.remove(query, MongoDbEventPublication.class, collection); - } else { + } else if (completionMode == CompletionMode.ARCHIVE) { - var update = Update.update(COMPLETION_DATE, completionDate); + mongoTemplate.findAndModify(query, update, MongoDbEventPublication.class, collection); + var completedEvent = mongoTemplate.findAndRemove(query, MongoDbEventPublication.class, collection); + mongoTemplate.save(completedEvent, archiveCollection); + } else { - mongoTemplate.findAndModify(query, update, MongoDbEventPublication.class); + mongoTemplate.findAndModify(query, update, MongoDbEventPublication.class, collection); } } @@ -113,17 +121,21 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier, @Override public void markCompleted(UUID identifier, Instant completionDate) { - var criateria = query(where(ID).is(identifier)); + var criteria = query(where(ID).is(identifier)); + var update = Update.update(COMPLETION_DATE, completionDate); if (completionMode == CompletionMode.DELETE) { - mongoTemplate.remove(criateria, MongoDbEventPublication.class); + mongoTemplate.remove(criteria, MongoDbEventPublication.class, collection); - } else { + } else if (completionMode == CompletionMode.ARCHIVE) { - var update = Update.update(COMPLETION_DATE, completionDate); + mongoTemplate.findAndModify(criteria, update, MongoDbEventPublication.class, collection); + var completedEvent = mongoTemplate.findAndRemove(criteria, MongoDbEventPublication.class, collection); + mongoTemplate.save(completedEvent, archiveCollection); - mongoTemplate.findAndModify(criateria, update, MongoDbEventPublication.class); + } else { + mongoTemplate.findAndModify(criteria, update, MongoDbEventPublication.class, collection); } } @@ -168,7 +180,7 @@ public Optional findIncompletePublicationsByEventAndTarg */ @Override public List findCompletedPublications() { - return readMapped(defaultQuery(where(COMPLETION_DATE).ne(null))); + return readMapped(defaultQuery(where(COMPLETION_DATE).ne(null)), archiveCollection); } /* @@ -177,7 +189,9 @@ public List findCompletedPublications() { */ @Override public void deletePublications(List identifiers) { - mongoTemplate.remove(query(where(ID).in(identifiers)), MongoDbEventPublication.class); + + mongoTemplate.remove(query(where(ID).in(identifiers)), MongoDbEventPublication.class, collection); + mongoTemplate.remove(query(where(ID).in(identifiers)), MongoDbEventPublication.class, archiveCollection); } /* @@ -186,7 +200,7 @@ public void deletePublications(List identifiers) { */ @Override public void deleteCompletedPublications() { - mongoTemplate.remove(query(where(COMPLETION_DATE).ne(null)), MongoDbEventPublication.class); + mongoTemplate.remove(query(where(COMPLETION_DATE).ne(null)), MongoDbEventPublication.class, archiveCollection); } /* @@ -198,16 +212,22 @@ public void deleteCompletedPublicationsBefore(Instant instant) { Assert.notNull(instant, "Instant must not be null!"); - mongoTemplate.remove(query(where(COMPLETION_DATE).lt(instant)), MongoDbEventPublication.class); + mongoTemplate.remove(query(where(COMPLETION_DATE).lt(instant)), MongoDbEventPublication.class, archiveCollection); } private List readMapped(Query query) { + return readMapped(query, collection); + } + + private List readMapped(Query query, String collection) { return mongoTemplate.query(MongoDbEventPublication.class) + .inCollection(collection) .matching(query) .stream() .map(MongoDbEventPublicationRepository::documentToDomain) .toList(); + } private Query byEventAndListenerId(Object event, PublicationTargetIdentifier identifier) { diff --git a/spring-modulith-events/spring-modulith-events-mongodb/src/test/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepositoryTest.java b/spring-modulith-events/spring-modulith-events-mongodb/src/test/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepositoryTest.java index 28c87d22..4f61a0d2 100644 --- a/spring-modulith-events/spring-modulith-events-mongodb/src/test/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepositoryTest.java +++ b/spring-modulith-events/spring-modulith-events-mongodb/src/test/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepositoryTest.java @@ -46,154 +46,146 @@ * @author Dmitry Belyaev * @author Oliver Drotbohm */ -@DataMongoTest -@ContextConfiguration(classes = TestApplication.class) class MongoDbEventPublicationRepositoryTest { private static final PublicationTargetIdentifier TARGET_IDENTIFIER = PublicationTargetIdentifier.of("listener"); - @Autowired MongoTemplate mongoTemplate; - @Autowired Environment environment; + @DataMongoTest + @ContextConfiguration(classes = TestApplication.class) + static abstract class TestBase { - MongoDbEventPublicationRepository repository; - CompletionMode completionMode; + @Autowired MongoTemplate mongoTemplate; + @Autowired Environment environment; - @BeforeEach - void setUp() { - this.completionMode = CompletionMode.from(environment); - this.repository = new MongoDbEventPublicationRepository(mongoTemplate, completionMode); - } + MongoDbEventPublicationRepository repository; + CompletionMode completionMode; + String archiveCollection = MongoDbEventPublicationRepository.ARCHIVE_COLLECTION; - @AfterEach - void tearDown() { - mongoTemplate.remove(MongoDbEventPublication.class).all(); - } + @BeforeEach + void setUp() { + this.completionMode = CompletionMode.from(environment); + this.repository = new MongoDbEventPublicationRepository(mongoTemplate, completionMode); + } - @Test // GH-4 - void shouldPersistAndUpdateEventPublication() { + @AfterEach + void tearDown() { + mongoTemplate.remove(MongoDbEventPublication.class).all(); + mongoTemplate.remove(MongoDbEventPublication.class).inCollection(archiveCollection).all(); + } - var publication = createPublication(new TestEvent("abc")); + @Test // GH-4 + void shouldPersistAndUpdateEventPublication() { - var eventPublications = repository.findIncompletePublications(); + var publication = createPublication(new TestEvent("abc")); - assertThat(eventPublications).hasSize(1); - assertThat(eventPublications.get(0).getEvent()).isEqualTo(publication.getEvent()); - assertThat(eventPublications.get(0).getTargetIdentifier()).isEqualTo(publication.getTargetIdentifier()); + var eventPublications = repository.findIncompletePublications(); - assertThat(repository.findIncompletePublicationsByEventAndTargetIdentifier(new TestEvent("abc"), TARGET_IDENTIFIER)) - .isPresent(); + assertThat(eventPublications).hasSize(1); + assertThat(eventPublications.get(0).getEvent()).isEqualTo(publication.getEvent()); + assertThat(eventPublications.get(0).getTargetIdentifier()).isEqualTo(publication.getTargetIdentifier()); - // Complete publication - repository.markCompleted(publication, Instant.now()); + assertThat(repository.findIncompletePublicationsByEventAndTargetIdentifier(new TestEvent("abc"), TARGET_IDENTIFIER)) + .isPresent(); - assertThat(repository.findIncompletePublications()).isEmpty(); - } + // Complete publication + repository.markCompleted(publication, Instant.now()); - @Test // GH-4 - void shouldUpdateSingleEventPublication() { + assertThat(repository.findIncompletePublications()).isEmpty(); + } - var first = createPublication(new TestEvent("id1")); - var second = createPublication(new TestEvent("id2")); + @Test // GH-4 + void shouldUpdateSingleEventPublication() { - repository.markCompleted(second, Instant.now()); + var first = createPublication(new TestEvent("id1")); + var second = createPublication(new TestEvent("id2")); - assertThat(repository.findIncompletePublications()).hasSize(1) - .element(0) - .extracting(TargetEventPublication::getEvent).isEqualTo(first.getEvent()); - } + repository.markCompleted(second, Instant.now()); - @Test // GH-133 - void returnsOldestIncompletePublicationsFirst() { + assertThat(repository.findIncompletePublications()).hasSize(1) + .element(0) + .extracting(TargetEventPublication::getEvent).isEqualTo(first.getEvent()); + } - var now = LocalDateTime.now(); + @Test // GH-133 + void returnsOldestIncompletePublicationsFirst() { - savePublicationAt(now.withHour(3)); - savePublicationAt(now.withHour(0)); - savePublicationAt(now.withHour(1)); + var now = LocalDateTime.now(); - assertThat(repository.findIncompletePublications()) - .isSortedAccordingTo(Comparator.comparing(TargetEventPublication::getPublicationDate)); - } + savePublicationAt(now.withHour(3)); + savePublicationAt(now.withHour(0)); + savePublicationAt(now.withHour(1)); - @Test // GH-294 - void findsPublicationsOlderThanReference() throws Exception { + assertThat(repository.findIncompletePublications()) + .isSortedAccordingTo(Comparator.comparing(TargetEventPublication::getPublicationDate)); + } - var first = createPublication(new TestEvent("first")); + @Test // GH-294 + void findsPublicationsOlderThanReference() throws Exception { - Thread.sleep(100); + var first = createPublication(new TestEvent("first")); - var now = Instant.now(); - var second = createPublication(new TestEvent("second")); + Thread.sleep(100); - assertThat(repository.findIncompletePublications()) - .extracting(TargetEventPublication::getIdentifier) - .containsExactly(first.getIdentifier(), second.getIdentifier()); + var now = Instant.now(); + var second = createPublication(new TestEvent("second")); - assertThat(repository.findIncompletePublicationsPublishedBefore(now)) - .hasSize(1) - .element(0).extracting(TargetEventPublication::getIdentifier).isEqualTo(first.getIdentifier()); - } + assertThat(repository.findIncompletePublications()) + .extracting(TargetEventPublication::getIdentifier) + .containsExactly(first.getIdentifier(), second.getIdentifier()); - @Test // GH-451 - void findsCompletedPublications() { + assertThat(repository.findIncompletePublicationsPublishedBefore(now)) + .hasSize(1) + .element(0).extracting(TargetEventPublication::getIdentifier).isEqualTo(first.getIdentifier()); + } - var event = new TestEvent("first"); - var publication = createPublication(event); + @Test // GH-451 + void findsCompletedPublications() { - repository.markCompleted(publication, Instant.now()); + var event = new TestEvent("first"); + var publication = createPublication(event); - if (completionMode == CompletionMode.DELETE) { + repository.markCompleted(publication, Instant.now()); - assertThat(repository.findCompletedPublications()).isEmpty(); + if (completionMode == CompletionMode.DELETE) { - } else { + assertThat(repository.findCompletedPublications()).isEmpty(); - assertThat(repository.findCompletedPublications()) - .hasSize(1) - .element(0) - .extracting(TargetEventPublication::getEvent) - .isEqualTo(event); - } + } else { - } + assertThat(repository.findCompletedPublications()) + .hasSize(1) + .element(0) + .extracting(TargetEventPublication::getEvent) + .isEqualTo(event); + } - @Test // GH-258 - void marksPublicationAsCompletedById() { + } - var event = new TestEvent("first"); - var publication = createPublication(event); + @Test // GH-258 + void marksPublicationAsCompletedById() { - repository.markCompleted(publication.getIdentifier(), Instant.now()); + var event = new TestEvent("first"); + var publication = createPublication(event); - if (completionMode == CompletionMode.DELETE) { + repository.markCompleted(publication.getIdentifier(), Instant.now()); - assertThat(repository.findCompletedPublications()).isEmpty(); assertThat(repository.findIncompletePublications()).isEmpty(); - } else { - - assertThat(repository.findCompletedPublications()) - .extracting(TargetEventPublication::getIdentifier) - .containsExactly(publication.getIdentifier()); - } - } - - private TargetEventPublication createPublication(Object event) { - return createPublication(event, TARGET_IDENTIFIER); - } + if (completionMode == CompletionMode.DELETE) { - private TargetEventPublication createPublication(Object event, PublicationTargetIdentifier id) { - return repository.create(TargetEventPublication.of(event, id)); - } + assertThat(repository.findCompletedPublications()).isEmpty(); - private void savePublicationAt(LocalDateTime date) { + } else { - mongoTemplate.save( - new MongoDbEventPublication(UUID.randomUUID(), date.toInstant(ZoneOffset.UTC), "", "", null)); - } + assertThat(repository.findCompletedPublications()) + .extracting(TargetEventPublication::getIdentifier) + .containsExactly(publication.getIdentifier()); + } - @Nested - class FindByEventAndTargetIdentifier { + if (completionMode == CompletionMode.ARCHIVE) { + assertThat(mongoTemplate.findAll(MongoDbEventPublication.class, archiveCollection)).isNotEmpty(); + } + } @Test // GH-4 void shouldFindEventPublicationByEventAndTargetIdentifier() { @@ -203,7 +195,7 @@ void shouldFindEventPublicationByEventAndTargetIdentifier() { var firstEvent = first.getEvent(); - createPublication(firstEvent, PublicationTargetIdentifier.of("somethingDifferen")); + createPublication(firstEvent, PublicationTargetIdentifier.of("somethingDifferent")); var actual = repository.findIncompletePublicationsByEventAndTargetIdentifier(firstEvent, TARGET_IDENTIFIER); @@ -250,10 +242,6 @@ void shouldReturnTheOldestEventTest() throws InterruptedException { assertThat(it.getPublicationDate()) // .isCloseTo(publication.getPublicationDate(), within(1, ChronoUnit.MILLIS))); } - } - - @Nested - class DeleteCompletedPublications { @Test // GH-20 void shouldDeleteCompletedEvents() { @@ -304,11 +292,32 @@ void deletesPublicationsByIdentifier() { .matches(it -> it.getIdentifier().equals(second.getIdentifier())) .matches(it -> it.getEvent().equals(second.getEvent())); } + + private TargetEventPublication createPublication(Object event) { + return createPublication(event, TARGET_IDENTIFIER); + } + + private TargetEventPublication createPublication(Object event, PublicationTargetIdentifier id) { + return repository.create(TargetEventPublication.of(event, id)); + } + + private void savePublicationAt(LocalDateTime date) { + + mongoTemplate.save( + new MongoDbEventPublication(UUID.randomUUID(), date.toInstant(ZoneOffset.UTC), "", "", null)); + } } + @Nested + class WithUpdateCompletionTest extends TestBase {} + @Nested @TestPropertySource(properties = CompletionMode.PROPERTY + "=DELETE") - static class WithDeleteCompletionTest extends MongoDbEventPublicationRepositoryTest {} + class WithDeleteCompletionTest extends TestBase {} + + @Nested + @TestPropertySource(properties = CompletionMode.PROPERTY + "=ARCHIVE") + class WithArchiveCompletionTest extends TestBase {} private record TestEvent(String eventId) {} }