From 1705d89822ac5f7f8abbca187bcf87cbae9b008a Mon Sep 17 00:00:00 2001 From: Jean Helou Date: Fri, 20 Sep 2024 09:04:19 +0200 Subject: [PATCH] [JAMES-3696] improves flaky tests from ManageableMailQueueContract The pulsar implementation uses a distributed and therefore asynchronous distribution of the removal filters. This change ensures that the tests allow for such an implementation by waiting for the system under test to converge to the desired state. It also ensures that the executor waits as little as possible to get into a consistent state, ensuring fast local execution while allowing a longer wait on the CI. If the default timeout (10s if I understand correctly) is not enough for the CI, it can be safely increased without slowing down local test execution like Thread.sleep() did. --- .../DelayedManageableMailQueueContract.java | 10 +- .../api/ManageableMailQueueContract.java | 113 ++++++++++-------- .../queue/pulsar/PulsarMailQueueTest.java | 26 ++-- 3 files changed, 75 insertions(+), 74 deletions(-) diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedManageableMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedManageableMailQueueContract.java index 442c42af220..782b2078997 100644 --- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedManageableMailQueueContract.java +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/DelayedManageableMailQueueContract.java @@ -123,11 +123,11 @@ default void deletedDelayedMessagesShouldNotBeBrowseable() throws Exception { getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, RECIPIENT2.asString()); - awaitRemove(); - - assertThat(getManageableMailQueue().browse()).toIterable() - .extracting(mail -> mail.getMail().getName()) - .containsExactly("name2"); + Awaitility.await().untilAsserted(() -> + assertThat(getManageableMailQueue().browse()).toIterable() + .extracting(mail -> mail.getMail().getName()) + .containsExactly("name2") + ); } @Test diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java index cae74f4d775..c174fe6bc56 100644 --- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java @@ -34,6 +34,7 @@ import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import jakarta.mail.MessagingException; @@ -42,12 +43,10 @@ import org.apache.james.core.MailAddress; import org.apache.james.core.builder.MimeMessageBuilder; -import org.apache.james.junit.categories.Unstable; import org.apache.mailet.Attribute; import org.apache.mailet.Mail; import org.apache.mailet.base.MailAddressFixture; import org.awaitility.Awaitility; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; @@ -62,10 +61,6 @@ public interface ManageableMailQueueContract extends MailQueueContract { - default void awaitRemove() { - - } - ManageableMailQueue getManageableMailQueue(); @Test @@ -459,24 +454,27 @@ default void removeShouldNotFailWhenBrowsingIterating() throws Exception { @Test default void browseShouldNotFailWhenConcurrentRemoveWhenIterating() throws Exception { - enQueue(defaultMail() - .name("name1") - .build()); - enQueue(defaultMail() - .name("name2") - .build()); - enQueue(defaultMail() - .name("name3") - .build()); + // We use a large number of emails so that the probability of the remove being propagated + // through the queue is high enough that the test is not flaky. + IntStream.range(0, 100).forEach( + Throwing.intConsumer(i -> + enQueue(defaultMail() + .name("name" + i) + .build()) + ).sneakyThrow() + ); ManageableMailQueue.MailQueueIterator items = getManageableMailQueue().browse(); - items.next(); + items.next();// we consume 1 here - getManageableMailQueue().remove(ManageableMailQueue.Type.Name, "name2"); + getManageableMailQueue().remove(ManageableMailQueue.Type.Name, "name98"); // we remove 1 here - awaitRemove(); - assertThatCode(() -> consumeIterator(items)).doesNotThrowAnyException(); + assertThatCode( + // the remove may or may not be applied to an already started iterator + // as long as it doesn't make the iterator crash + () -> assertThat(consumeIterator(items)).isGreaterThanOrEqualTo(98).isLessThan(100) + ).doesNotThrowAnyException(); } @Test @@ -551,13 +549,13 @@ default void removeByNameShouldRemoveSpecificEmail() throws Exception { getManageableMailQueue().remove(ManageableMailQueue.Type.Name, "name2"); - awaitRemove(); - - assertThat(getManageableMailQueue().browse()) - .toIterable() - .extracting(ManageableMailQueue.MailQueueItemView::getMail) - .extracting(Mail::getName) - .containsExactly("name1"); + Awaitility.await().untilAsserted(() -> + assertThat(getManageableMailQueue().browse()) + .toIterable() + .extracting(ManageableMailQueue.MailQueueItemView::getMail) + .extracting(Mail::getName) + .containsExactly("name1") + ); } @Test @@ -573,13 +571,13 @@ default void removeBySenderShouldRemoveSpecificEmail() throws Exception { getManageableMailQueue().remove(ManageableMailQueue.Type.Sender, OTHER_AT_LOCAL.asString()); - awaitRemove(); - - assertThat(getManageableMailQueue().browse()) - .toIterable() - .extracting(ManageableMailQueue.MailQueueItemView::getMail) - .extracting(Mail::getName) - .containsExactly("name2"); + Awaitility.await().untilAsserted(() -> + assertThat(getManageableMailQueue().browse()) + .toIterable() + .extracting(ManageableMailQueue.MailQueueItemView::getMail) + .extracting(Mail::getName) + .containsExactly("name2") + ); } @Test @@ -595,13 +593,13 @@ default void removeByRecipientShouldRemoveSpecificEmail() throws Exception { getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, RECIPIENT2.asString()); - awaitRemove(); - - assertThat(getManageableMailQueue().browse()) - .toIterable() - .extracting(ManageableMailQueue.MailQueueItemView::getMail) - .extracting(Mail::getName) - .containsExactly("name1"); + Awaitility.await().untilAsserted(() -> + assertThat(getManageableMailQueue().browse()) + .toIterable() + .extracting(ManageableMailQueue.MailQueueItemView::getMail) + .extracting(Mail::getName) + .containsExactly("name1") + ); } static Stream removeByRecipientShouldRemoveSpecificEmailWhenMultipleRecipients() throws AddressException { @@ -630,13 +628,13 @@ default void removeByRecipientShouldRemoveSpecificEmailWhenMultipleRecipients(Li getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, toRemove.asString()); - awaitRemove(); - - assertThat(getManageableMailQueue().browse()) - .toIterable() - .extracting(ManageableMailQueue.MailQueueItemView::getMail) - .extracting(Mail::getName) - .containsExactly("name2"); + Awaitility.await().untilAsserted(() -> + assertThat(getManageableMailQueue().browse()) + .toIterable() + .extracting(ManageableMailQueue.MailQueueItemView::getMail) + .extracting(Mail::getName) + .containsExactly("name2") + ); } @Test @@ -693,7 +691,13 @@ default void deletedElementsShouldNotBeDequeued() throws Exception { getManageableMailQueue().remove(ManageableMailQueue.Type.Name, "name1"); - awaitRemove(); + Awaitility.await().untilAsserted(() -> + assertThat(getManageableMailQueue().browse()) + .toIterable() + .extracting(ManageableMailQueue.MailQueueItemView::getMail) + .extracting(Mail::getName) + .doesNotContain("name1") + ); assertThat(Flux.from(getManageableMailQueue().deQueue()).blockFirst().getMail().getName()) .isEqualTo("name2"); @@ -709,7 +713,11 @@ default void removeShouldNotDeleteFutureEmails() throws MessagingException { getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, MailAddressFixture.RECIPIENT1.asString()); - awaitRemove(); + Awaitility.await().untilAsserted(() -> + assertThat(getManageableMailQueue().browse()) + .toIterable() + .isEmpty() + ); enQueue(defaultMail() .name("name2") @@ -729,7 +737,12 @@ default void removeShouldNotDeleteFutureEmailsFromBrowse() throws MessagingExcep getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, MailAddressFixture.RECIPIENT1.asString()); - awaitRemove(); + Awaitility.await().untilAsserted(() -> + assertThat(getManageableMailQueue().browse()) + .toIterable() + .isEmpty() + ); + enQueue(defaultMail() .name("name2") diff --git a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java index 135210a630a..c11d0bad5f4 100644 --- a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java +++ b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java @@ -41,7 +41,6 @@ import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.blob.mail.MimeMessageStore; import org.apache.james.blob.memory.MemoryBlobStoreDAO; -import org.apache.james.junit.categories.Unstable; import org.apache.james.queue.api.DelayedMailQueueContract; import org.apache.james.queue.api.DelayedManageableMailQueueContract; import org.apache.james.queue.api.MailQueue; @@ -61,7 +60,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -71,7 +69,6 @@ import reactor.core.publisher.Flux; import scala.jdk.javaapi.OptionConverters; -@Tag(Unstable.TAG) @ExtendWith(DockerPulsarExtension.class) public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricContract, ManageableMailQueueContract, DelayedMailQueueContract, DelayedManageableMailQueueContract { @@ -110,15 +107,6 @@ void tearDown() throws Exception { pulsarClients.stop(); } - @Override - public void awaitRemove() { - try { - Thread.sleep(50); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - @Override public MailQueue getMailQueue() { return mailQueue; @@ -272,13 +260,13 @@ void removeShouldRemoveMailFromStoreWhenFilteredOut() throws Exception { //this won't delete the mail from the store until we try a dequeue getManageableMailQueue().remove(ManageableMailQueue.Type.Name, "name2"); - awaitRemove(); - - assertThat(getManageableMailQueue().browse()) - .toIterable() - .extracting(ManageableMailQueue.MailQueueItemView::getMail) - .extracting(Mail::getName) - .containsExactly("name1", "name3"); + Awaitility.await().untilAsserted(() -> + assertThat(getManageableMailQueue().browse()) + .toIterable() + .extracting(ManageableMailQueue.MailQueueItemView::getMail) + .extracting(Mail::getName) + .containsExactly("name1", "name3") + ); Flux.from(getMailQueue().deQueue()).take(2).doOnNext(Throwing.consumer(x -> x.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS))).blockLast(); Awaitility.await().untilAsserted(this::assertThatStoreIsEmpty);