Skip to content

Commit

Permalink
Move take combinator to the very upstream
Browse files Browse the repository at this point in the history
So it cancells the source immediately when the
needed number of messages is reached.
  • Loading branch information
2m committed Feb 25, 2019
1 parent a3412dd commit a9faeda
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions amqp/src/test/java/docs/javadsl/AmqpDocsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,8 @@ public void republishMessageWithoutAutoAckIfNacked() throws Exception {

final CompletionStage<List<ReadResult>> nackedResults =
amqpSource
.mapAsync(1, this::businessLogic)
.take(input.size())
.mapAsync(1, this::businessLogic)
.mapAsync(
1,
cm ->
Expand All @@ -329,21 +329,21 @@ public void republishMessageWithoutAutoAckIfNacked() throws Exception {

final CompletionStage<List<CommittableReadResult>> result2 =
amqpSource
.mapAsync(1, cm -> cm.ack().thenApply(unused -> cm))
.take(input.size())
.mapAsync(1, cm -> cm.ack().thenApply(unused -> cm))
.runWith(Sink.seq(), materializer);

assertEquals(
input,
result2
.toCompletableFuture()
.get(3, TimeUnit.SECONDS)
.get(10, TimeUnit.SECONDS)
.stream()
.map(m -> m.message().bytes().utf8String())
.collect(Collectors.toList()));

// See https://github.com/akka/akka/issues/26410
// extra wait before assertAllStagesStopped kicks in
Thread.sleep(3 * 1000);
Thread.sleep(6 * 1000);
}
}

0 comments on commit a9faeda

Please sign in to comment.