Skip to content

Commit

Permalink
[JAMES-3696] improves flaky tests from ManageableMailQueueContract
Browse files Browse the repository at this point in the history
The pulsar implementation uses a distributed and therefore asynchronous distribution of the removal filters.

This change ensures that the tests allow for such an implementation by waiting for the system under test to converge to the desired state.

It also ensures that the executor waits as little as possible to get into a consistent state, ensuring fast local execution while allowing a longer wait on the CI. If the default timeout (10s if I understand correctly) is not enough for the CI, it can be safely increased without slowing down local test execution like Thread.sleep() did.
  • Loading branch information
jeantil committed Sep 25, 2024
1 parent f2a56fc commit 1705d89
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ default void deletedDelayedMessagesShouldNotBeBrowseable() throws Exception {

getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, RECIPIENT2.asString());

awaitRemove();

assertThat(getManageableMailQueue().browse()).toIterable()
.extracting(mail -> mail.getMail().getName())
.containsExactly("name2");
Awaitility.await().untilAsserted(() ->
assertThat(getManageableMailQueue().browse()).toIterable()
.extracting(mail -> mail.getMail().getName())
.containsExactly("name2")
);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import jakarta.mail.MessagingException;
Expand All @@ -42,12 +43,10 @@

import org.apache.james.core.MailAddress;
import org.apache.james.core.builder.MimeMessageBuilder;
import org.apache.james.junit.categories.Unstable;
import org.apache.mailet.Attribute;
import org.apache.mailet.Mail;
import org.apache.mailet.base.MailAddressFixture;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -62,10 +61,6 @@

public interface ManageableMailQueueContract extends MailQueueContract {

default void awaitRemove() {

}

ManageableMailQueue getManageableMailQueue();

@Test
Expand Down Expand Up @@ -459,24 +454,27 @@ default void removeShouldNotFailWhenBrowsingIterating() throws Exception {

@Test
default void browseShouldNotFailWhenConcurrentRemoveWhenIterating() throws Exception {
enQueue(defaultMail()
.name("name1")
.build());
enQueue(defaultMail()
.name("name2")
.build());
enQueue(defaultMail()
.name("name3")
.build());
// We use a large number of emails so that the probability of the remove being propagated
// through the queue is high enough that the test is not flaky.
IntStream.range(0, 100).forEach(
Throwing.intConsumer(i ->
enQueue(defaultMail()
.name("name" + i)
.build())
).sneakyThrow()
);

ManageableMailQueue.MailQueueIterator items = getManageableMailQueue().browse();
items.next();
items.next();// we consume 1 here

getManageableMailQueue().remove(ManageableMailQueue.Type.Name, "name2");
getManageableMailQueue().remove(ManageableMailQueue.Type.Name, "name98"); // we remove 1 here

awaitRemove();

assertThatCode(() -> consumeIterator(items)).doesNotThrowAnyException();
assertThatCode(
// the remove may or may not be applied to an already started iterator
// as long as it doesn't make the iterator crash
() -> assertThat(consumeIterator(items)).isGreaterThanOrEqualTo(98).isLessThan(100)
).doesNotThrowAnyException();
}

@Test
Expand Down Expand Up @@ -551,13 +549,13 @@ default void removeByNameShouldRemoveSpecificEmail() throws Exception {

getManageableMailQueue().remove(ManageableMailQueue.Type.Name, "name2");

awaitRemove();

assertThat(getManageableMailQueue().browse())
.toIterable()
.extracting(ManageableMailQueue.MailQueueItemView::getMail)
.extracting(Mail::getName)
.containsExactly("name1");
Awaitility.await().untilAsserted(() ->
assertThat(getManageableMailQueue().browse())
.toIterable()
.extracting(ManageableMailQueue.MailQueueItemView::getMail)
.extracting(Mail::getName)
.containsExactly("name1")
);
}

@Test
Expand All @@ -573,13 +571,13 @@ default void removeBySenderShouldRemoveSpecificEmail() throws Exception {

getManageableMailQueue().remove(ManageableMailQueue.Type.Sender, OTHER_AT_LOCAL.asString());

awaitRemove();

assertThat(getManageableMailQueue().browse())
.toIterable()
.extracting(ManageableMailQueue.MailQueueItemView::getMail)
.extracting(Mail::getName)
.containsExactly("name2");
Awaitility.await().untilAsserted(() ->
assertThat(getManageableMailQueue().browse())
.toIterable()
.extracting(ManageableMailQueue.MailQueueItemView::getMail)
.extracting(Mail::getName)
.containsExactly("name2")
);
}

@Test
Expand All @@ -595,13 +593,13 @@ default void removeByRecipientShouldRemoveSpecificEmail() throws Exception {

getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, RECIPIENT2.asString());

awaitRemove();

assertThat(getManageableMailQueue().browse())
.toIterable()
.extracting(ManageableMailQueue.MailQueueItemView::getMail)
.extracting(Mail::getName)
.containsExactly("name1");
Awaitility.await().untilAsserted(() ->
assertThat(getManageableMailQueue().browse())
.toIterable()
.extracting(ManageableMailQueue.MailQueueItemView::getMail)
.extracting(Mail::getName)
.containsExactly("name1")
);
}

static Stream<Arguments> removeByRecipientShouldRemoveSpecificEmailWhenMultipleRecipients() throws AddressException {
Expand Down Expand Up @@ -630,13 +628,13 @@ default void removeByRecipientShouldRemoveSpecificEmailWhenMultipleRecipients(Li

getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, toRemove.asString());

awaitRemove();

assertThat(getManageableMailQueue().browse())
.toIterable()
.extracting(ManageableMailQueue.MailQueueItemView::getMail)
.extracting(Mail::getName)
.containsExactly("name2");
Awaitility.await().untilAsserted(() ->
assertThat(getManageableMailQueue().browse())
.toIterable()
.extracting(ManageableMailQueue.MailQueueItemView::getMail)
.extracting(Mail::getName)
.containsExactly("name2")
);
}

@Test
Expand Down Expand Up @@ -693,7 +691,13 @@ default void deletedElementsShouldNotBeDequeued() throws Exception {

getManageableMailQueue().remove(ManageableMailQueue.Type.Name, "name1");

awaitRemove();
Awaitility.await().untilAsserted(() ->
assertThat(getManageableMailQueue().browse())
.toIterable()
.extracting(ManageableMailQueue.MailQueueItemView::getMail)
.extracting(Mail::getName)
.doesNotContain("name1")
);

assertThat(Flux.from(getManageableMailQueue().deQueue()).blockFirst().getMail().getName())
.isEqualTo("name2");
Expand All @@ -709,7 +713,11 @@ default void removeShouldNotDeleteFutureEmails() throws MessagingException {

getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, MailAddressFixture.RECIPIENT1.asString());

awaitRemove();
Awaitility.await().untilAsserted(() ->
assertThat(getManageableMailQueue().browse())
.toIterable()
.isEmpty()
);

enQueue(defaultMail()
.name("name2")
Expand All @@ -729,7 +737,12 @@ default void removeShouldNotDeleteFutureEmailsFromBrowse() throws MessagingExcep

getManageableMailQueue().remove(ManageableMailQueue.Type.Recipient, MailAddressFixture.RECIPIENT1.asString());

awaitRemove();
Awaitility.await().untilAsserted(() ->
assertThat(getManageableMailQueue().browse())
.toIterable()
.isEmpty()
);


enQueue(defaultMail()
.name("name2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.james.blob.mail.MimeMessagePartsId;
import org.apache.james.blob.mail.MimeMessageStore;
import org.apache.james.blob.memory.MemoryBlobStoreDAO;
import org.apache.james.junit.categories.Unstable;
import org.apache.james.queue.api.DelayedMailQueueContract;
import org.apache.james.queue.api.DelayedManageableMailQueueContract;
import org.apache.james.queue.api.MailQueue;
Expand All @@ -61,7 +60,6 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

Expand All @@ -71,7 +69,6 @@
import reactor.core.publisher.Flux;
import scala.jdk.javaapi.OptionConverters;

@Tag(Unstable.TAG)
@ExtendWith(DockerPulsarExtension.class)
public class PulsarMailQueueTest implements MailQueueContract, MailQueueMetricContract, ManageableMailQueueContract, DelayedMailQueueContract, DelayedManageableMailQueueContract {

Expand Down Expand Up @@ -110,15 +107,6 @@ void tearDown() throws Exception {
pulsarClients.stop();
}

@Override
public void awaitRemove() {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Override
public MailQueue getMailQueue() {
return mailQueue;
Expand Down Expand Up @@ -272,13 +260,13 @@ void removeShouldRemoveMailFromStoreWhenFilteredOut() throws Exception {
//this won't delete the mail from the store until we try a dequeue
getManageableMailQueue().remove(ManageableMailQueue.Type.Name, "name2");

awaitRemove();

assertThat(getManageableMailQueue().browse())
.toIterable()
.extracting(ManageableMailQueue.MailQueueItemView::getMail)
.extracting(Mail::getName)
.containsExactly("name1", "name3");
Awaitility.await().untilAsserted(() ->
assertThat(getManageableMailQueue().browse())
.toIterable()
.extracting(ManageableMailQueue.MailQueueItemView::getMail)
.extracting(Mail::getName)
.containsExactly("name1", "name3")
);

Flux.from(getMailQueue().deQueue()).take(2).doOnNext(Throwing.consumer(x -> x.done(MailQueue.MailQueueItem.CompletionStatus.SUCCESS))).blockLast();
Awaitility.await().untilAsserted(this::assertThatStoreIsEmpty);
Expand Down

0 comments on commit 1705d89

Please sign in to comment.