Skip to content

Commit

Permalink
Revert "JAMES-2733 RabbitMQ should not dequeue deleted elements"
Browse files Browse the repository at this point in the history
This reverts commit fa3b6ec.
  • Loading branch information
Arsnael committed May 8, 2019
1 parent c49be34 commit 355a7b9
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import com.github.fge.lambdas.consumers.ThrowingConsumer;
import com.rabbitmq.client.Delivery;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;
Expand Down Expand Up @@ -81,20 +80,8 @@ public void done(boolean success) {
.filter(getResponse -> getResponse.getBody() != null);
}

Flux<? extends MailQueue.MailQueueItem> deQueue() {
return flux.flatMap(this::loadItem)
.flatMap(this::filterIfDeleted);
}

private Mono<RabbitMQMailQueueItem> filterIfDeleted(RabbitMQMailQueueItem item) {
return mailQueueView.isPresent(item.getMail())
.flatMap(isPresent -> {
if (isPresent) {
return Mono.just(item);
}
item.done(true);
return Mono.empty();
});
Flux<MailQueue.MailQueueItem> deQueue() {
return flux.flatMap(this::loadItem);
}

private Mono<RabbitMQMailQueueItem> loadItem(AcknowledgableDelivery response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,14 @@ public void concurrentEnqueueDequeueShouldNotFail() {

}

@Disabled("JAMES-2733 Deleted elements are still dequeued")
@Test
@Override
public void deletedElementsShouldNotBeDequeued() {

}


private void enqueueSomeMails(Function<Integer, String> namePattern, int emailCount) {
IntStream.rangeClosed(1, emailCount)
.forEach(Throwing.intConsumer(i -> enQueue(defaultMail()
Expand Down

0 comments on commit 355a7b9

Please sign in to comment.