From b1a4600376b0f7f40626d4f3f2aae3dbfed5d749 Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Fri, 22 Feb 2019 10:18:39 +0100 Subject: [PATCH 1/8] Larger snippets with more types --- .../test/java/docs/javadsl/AmqpDocsTest.java | 55 +++++++------ .../scala/docs/scaladsl/AmqpDocsSpec.scala | 82 ++++++++++--------- docs/src/main/paradox/amqp.md | 42 ++-------- 3 files changed, 82 insertions(+), 97 deletions(-) diff --git a/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java b/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java index e14d112ec6..9329aed119 100644 --- a/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java +++ b/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java @@ -83,7 +83,12 @@ public void publishAndConsume() throws Exception { AmqpSinkSettings.create(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration)); + + final List input = Arrays.asList("one", "two", "three", "four", "five"); + CompletionStage writing = + Source.from(input).map(ByteString::fromString).runWith(amqpSink, materializer); // #create-sink + writing.toCompletableFuture().get(3, TimeUnit.SECONDS); // #create-source final Integer bufferSize = 10; @@ -92,17 +97,10 @@ public void publishAndConsume() throws Exception { NamedQueueSourceSettings.create(connectionProvider, queueName) .withDeclaration(queueDeclaration), bufferSize); - // #create-source - // #run-sink - final List input = Arrays.asList("one", "two", "three", "four", "five"); - Source.from(input).map(ByteString::fromString).runWith(amqpSink, materializer); - // #run-sink - - // #run-source final CompletionStage> result = amqpSource.take(input.size()).runWith(Sink.seq(), materializer); - // #run-source + // #create-source assertEquals( input, @@ -120,15 +118,6 @@ public void publishAndConsumeRpc() throws Exception { final String queueName = "amqp-conn-it-test-rpc-queue-" + System.currentTimeMillis(); final QueueDeclaration queueDeclaration = QueueDeclaration.create(queueName); - // #create-rpc-flow - final Flow> ampqRpcFlow = - AmqpRpcFlow.createSimple( - AmqpSinkSettings.create(connectionProvider) - .withRoutingKey(queueName) - .withDeclaration(queueDeclaration), - 1); - // #create-rpc-flow - final Integer bufferSize = 10; final Source amqpSource = AmqpSource.atMostOnceSource( @@ -137,14 +126,22 @@ public void publishAndConsumeRpc() throws Exception { bufferSize); final List input = Arrays.asList("one", "two", "three", "four", "five"); - // #run-rpc-flow + + // #create-rpc-flow + final Flow> ampqRpcFlow = + AmqpRpcFlow.createSimple( + AmqpSinkSettings.create(connectionProvider) + .withRoutingKey(queueName) + .withDeclaration(queueDeclaration), + 1); + Pair, TestSubscriber.Probe> result = Source.from(input) .map(ByteString::fromString) .viaMat(ampqRpcFlow, Keep.right()) .toMat(TestSink.probe(system), Keep.both()) .run(materializer); - // #run-rpc-flow + // #create-rpc-flow result.first().toCompletableFuture().get(3, TimeUnit.SECONDS); Sink> amqpSink = @@ -245,6 +242,11 @@ public void publishFanoutAndConsume() throws Exception { repeatingFlow.shutdown(); } + private CompletionStage businessLogic( + CommittableIncomingMessage msg) { + return CompletableFuture.completedFuture(msg); + } + @Test public void publishAndConsumeWithoutAutoAck() throws Exception { final String queueName = "amqp-conn-it-test-no-auto-ack-" + System.currentTimeMillis(); @@ -256,6 +258,9 @@ public void publishAndConsumeWithoutAutoAck() throws Exception { .withRoutingKey(queueName) .withDeclaration(queueDeclaration)); + final List input = Arrays.asList("one", "two", "three", "four", "five"); + Source.from(input).map(ByteString::fromString).runWith(amqpSink, materializer); + // #create-source-withoutautoack final Integer bufferSize = 10; final Source amqpSource = @@ -263,18 +268,14 @@ public void publishAndConsumeWithoutAutoAck() throws Exception { NamedQueueSourceSettings.create(connectionProvider, queueName) .withDeclaration(queueDeclaration), bufferSize); - // #create-source-withoutautoack - final List input = Arrays.asList("one", "two", "three", "four", "five"); - Source.from(input).map(ByteString::fromString).runWith(amqpSink, materializer); - - // #run-source-withoutautoack final CompletionStage> result = amqpSource + .mapAsync(1, this::businessLogic) .mapAsync(1, cm -> cm.ack(false).thenApply(unused -> cm.message())) .take(input.size()) .runWith(Sink.seq(), materializer); - // #run-source-withoutautoack + // #create-source-withoutautoack assertEquals( input, @@ -314,8 +315,10 @@ public void republishMessageWithoutAutoAckIfNacked() throws Exception { // #run-source-withoutautoack-and-nack final CompletionStage> result1 = amqpSource + .mapAsync(1, this::businessLogic) .take(input.size()) - .mapAsync(1, cm -> cm.nack(false, true).thenApply(unused -> cm)) + .mapAsync( + 1, cm -> cm.nack(/* multiple */ false, /* requeue */ true).thenApply(unused -> cm)) .runWith(Sink.seq(), materializer); // #run-source-withoutautoack-and-nack diff --git a/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala b/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala index 7d907049a7..2af6b9500f 100644 --- a/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala +++ b/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala @@ -4,17 +4,18 @@ package docs.scaladsl -import akka.Done +import akka.{Done, NotUsed} import akka.stream.KillSwitches import akka.stream.alpakka.amqp._ -import akka.stream.alpakka.amqp.scaladsl.{AmqpRpcFlow, AmqpSink, AmqpSource} +import akka.stream.alpakka.amqp.scaladsl.{AmqpRpcFlow, AmqpSink, AmqpSource, CommittableIncomingMessage} import akka.stream.scaladsl.{Keep, Sink, Source} +import akka.stream.testkit.TestSubscriber import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped import akka.stream.testkit.scaladsl.TestSink import akka.util.ByteString import scala.concurrent.duration._ -import scala.concurrent.{Await, Promise} +import scala.concurrent.{Await, Future, Promise} import scala.collection.immutable /** @@ -41,29 +42,34 @@ class AmqpDocsSpec extends AmqpSpec { //#queue-declaration //#create-sink - val amqpSink = AmqpSink.simple( - AmqpSinkSettings(connectionProvider) - .withRoutingKey(queueName) - .withDeclaration(queueDeclaration) - ) + val amqpSink: Sink[ByteString, Future[Done]] = + AmqpSink.simple( + AmqpSinkSettings(connectionProvider) + .withRoutingKey(queueName) + .withDeclaration(queueDeclaration) + ) + + val input = Vector("one", "two", "three", "four", "five") + val writing: Future[Done] = + Source(input) + .map(s => ByteString(s)) + .runWith(amqpSink) //#create-sink + writing.futureValue shouldEqual Done //#create-source - val amqpSource = AmqpSource.atMostOnceSource( - NamedQueueSourceSettings(connectionProvider, queueName).withDeclaration(queueDeclaration), - bufferSize = 10 - ) + val amqpSource: Source[IncomingMessage, NotUsed] = + AmqpSource.atMostOnceSource( + NamedQueueSourceSettings(connectionProvider, queueName).withDeclaration(queueDeclaration), + bufferSize = 10 + ) + + val result: Future[immutable.Seq[IncomingMessage]] = + amqpSource + .take(input.size) + .runWith(Sink.seq) //#create-source - //#run-sink - val input = Vector("one", "two", "three", "four", "five") - Source(input).map(s => ByteString(s)).runWith(amqpSink).futureValue shouldEqual Done - //#run-sink - - //#run-source - val result = amqpSource.take(input.size).runWith(Sink.seq) - //#run-source - result.futureValue.map(_.bytes.utf8String) shouldEqual input } @@ -72,25 +78,24 @@ class AmqpDocsSpec extends AmqpSpec { val queueName = "amqp-conn-it-spec-rpc-queue-" + System.currentTimeMillis() val queueDeclaration = QueueDeclaration(queueName) - //#create-rpc-flow - val amqpRpcFlow = AmqpRpcFlow.simple( - AmqpSinkSettings(connectionProvider).withRoutingKey(queueName).withDeclaration(queueDeclaration) - ) - //#create-rpc-flow - val amqpSource = AmqpSource.atMostOnceSource( NamedQueueSourceSettings(connectionProvider, queueName), bufferSize = 1 ) val input = Vector("one", "two", "three", "four", "five") - //#run-rpc-flow - val (rpcQueueF, probe) = Source(input) + + //#create-rpc-flow + val amqpRpcFlow = AmqpRpcFlow.simple( + AmqpSinkSettings(connectionProvider).withRoutingKey(queueName).withDeclaration(queueDeclaration) + ) + + val (rpcQueueF: Future[String], probe: TestSubscriber.Probe[ByteString]) = Source(input) .map(s => ByteString(s)) .viaMat(amqpRpcFlow)(Keep.right) .toMat(TestSink.probe)(Keep.both) .run - //#run-rpc-flow + //#create-rpc-flow rpcQueueF.futureValue val amqpSink = AmqpSink.replyTo( @@ -180,22 +185,23 @@ class AmqpDocsSpec extends AmqpSpec { .withDeclaration(queueDeclaration) ) + val input = Vector("one", "two", "three", "four", "five") + Source(input).map(s => ByteString(s)).runWith(amqpSink).futureValue shouldEqual Done + + val businessLogic: CommittableIncomingMessage => Future[CommittableIncomingMessage] = Future.successful(_) + //#create-source-withoutautoack val amqpSource = AmqpSource.committableSource( NamedQueueSourceSettings(connectionProvider, queueName).withDeclaration(queueDeclaration), bufferSize = 10 ) - //#create-source-withoutautoack - val input = Vector("one", "two", "three", "four", "five") - Source(input).map(s => ByteString(s)).runWith(amqpSink).futureValue shouldEqual Done - - //#run-source-withoutautoack - val result = amqpSource + val result: Future[immutable.Seq[CommittableIncomingMessage]] = amqpSource + .mapAsync(1)(businessLogic) .mapAsync(1)(cm => cm.ack().map(_ => cm)) .take(input.size) .runWith(Sink.seq) - //#run-source-withoutautoack + //#create-source-withoutautoack result.futureValue.map(_.message.bytes.utf8String) shouldEqual input } @@ -222,7 +228,7 @@ class AmqpDocsSpec extends AmqpSpec { //#run-source-withoutautoack-and-nack val result1 = amqpSource .take(input.size) - .mapAsync(1)(cm => cm.nack().map(_ => cm)) + .mapAsync(1)(cm => cm.nack(multiple = false, requeue = true).map(_ => cm)) .runWith(Sink.seq) //#run-source-withoutautoack-and-nack diff --git a/docs/src/main/paradox/amqp.md b/docs/src/main/paradox/amqp.md index 86c88cc945..a67d7e415e 100644 --- a/docs/src/main/paradox/amqp.md +++ b/docs/src/main/paradox/amqp.md @@ -50,21 +50,16 @@ Here we used @scaladoc[QueueDeclaration](akka.stream.alpakka.amqp.QueueDeclarati Create a sink, that accepts and forwards @scaladoc[ByteString](akka.util.ByteString)s to the AMQP server. -Scala -: @@snip [snip](/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala) { #create-sink } - -Java -: @@snip [snip](/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java) { #create-sink } - @scaladoc[AmqpSink](akka.stream.alpakka.amqp.AmqpSink$) is a collection of factory methods that facilitates creation of sinks. Here we created a *simple* sink, which means that we are able to pass `ByteString`s to the sink instead of wrapping data into @scaladoc[OutgoingMessage](akka.stream.alpakka.amqp.OutgoingMessage)s. Last step is to @extref[materialize](akka-docs:scala/stream/stream-flows-and-basics) and run the sink we have created. Scala -: @@snip [snip](/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala) { #run-sink } +: @@snip [snip](/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala) { #create-sink } Java -: @@snip [snip](/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java) { #run-sink } +: @@snip [snip](/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java) { #create-sink } + ## Receiving messages @@ -76,21 +71,16 @@ The Alpakka AMQP API is likely to change a bit in future releases, as discussed Create a source using the same queue declaration as before. -Scala -: @@snip [snip](/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala) { #create-source } - -Java -: @@snip [snip](/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java) { #create-source } - The `bufferSize` parameter controls the maximum number of messages to prefetch from the AMQP server. Run the source and take the same amount of messages as we previously sent to it. + Scala -: @@snip [snip](/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala) { #run-source } +: @@snip [snip](/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala) { #create-source } Java -: @@snip [snip](/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java) { #run-source } +: @@snip [snip](/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java) { #create-source } This is how you send and receive message from AMQP server using this connector. @@ -135,32 +125,18 @@ Java : @@snip [snip](/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java) { #create-rpc-flow } -Scala -: @@snip [snip](/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala) { #run-rpc-flow } - -Java -: @@snip [snip](/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java) { #run-rpc-flow } - - ## Acknowledging messages downstream -Create a committable sink which returns - -Scala -: @@snip [snip](/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala) { #create-source-withoutautoack } - -Java -: @@snip [snip](/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java) { #create-source-withoutautoack } - Committable sources return @scaladoc[CommittableIncomingMessage](akka.stream.alpakka.amqp.CommittableIncomingMessage) which wraps the @scaladoc[IncomingMessage](akka.stream.alpakka.amqp.IncomingMessage) and exposes the methods ack and nack. Use ack to acknowledge the message back to RabbitMQ. Ack takes an optional boolean parameter `multiple` indicating whether you are acknowledging the individual message or all the messages up to it. + Scala -: @@snip [snip](/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala) { #run-source-withoutautoack } +: @@snip [snip](/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala) { #create-source-withoutautoack } Java -: @@snip [snip](/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java) { #run-source-withoutautoack } +: @@snip [snip](/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java) { #create-source-withoutautoack } Use nack to reject a message. Apart from the `multiple` argument, nack takes another optional boolean parameter indicating whether the item should be requeued or not. From c7b8cad6498620c4c664cf837deff951f48126c0 Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Fri, 22 Feb 2019 11:06:52 +0100 Subject: [PATCH 2/8] Rename IncomingMessage to ReadResult, OutgoingMessage to WriteMessage, ... --- .../amqp/impl/AmqpReplyToSinkStage.scala | 10 +-- .../alpakka/amqp/impl/AmqpRpcFlowStage.scala | 24 +++--- .../alpakka/amqp/impl/AmqpSinkStage.scala | 10 +-- .../alpakka/amqp/impl/AmqpSourceStage.scala | 25 +++---- .../alpakka/amqp/javadsl/AmqpRpcFlow.scala | 21 ++---- .../alpakka/amqp/javadsl/AmqpSink.scala | 19 ++--- .../alpakka/amqp/javadsl/AmqpSource.scala | 11 +-- .../javadsl/CommittableIncomingMessage.scala | 18 ----- .../amqp/javadsl/CommittableReadResult.scala | 24 ++++++ .../stream/alpakka/amqp/javadsl/package.scala | 29 -------- .../akka/stream/alpakka/amqp/model.scala | 74 ++++++++++++------- .../alpakka/amqp/scaladsl/AmqpRpcFlow.scala | 8 +- .../alpakka/amqp/scaladsl/AmqpSink.scala | 6 +- .../alpakka/amqp/scaladsl/AmqpSource.scala | 6 +- ...sage.scala => CommittableReadResult.scala} | 6 +- .../amqp/javadsl/AmqpConnectorsTest.java | 34 ++++----- .../stream/alpakka/amqp/javadsl/package.scala | 25 ------- .../test/java/docs/javadsl/AmqpDocsTest.java | 30 ++++---- .../amqp/scaladsl/AmqpConnectorsSpec.scala | 16 ++-- .../scala/docs/scaladsl/AmqpDocsSpec.scala | 12 +-- docs/src/main/paradox/amqp.md | 23 ++---- 21 files changed, 186 insertions(+), 245 deletions(-) delete mode 100644 amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/CommittableIncomingMessage.scala create mode 100644 amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/CommittableReadResult.scala delete mode 100644 amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/package.scala rename amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/{CommittableIncomingMessage.scala => CommittableReadResult.scala} (77%) delete mode 100644 amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/package.scala diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpReplyToSinkStage.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpReplyToSinkStage.scala index 8fe96ce1e8..2777863bc5 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpReplyToSinkStage.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpReplyToSinkStage.scala @@ -6,24 +6,24 @@ package akka.stream.alpakka.amqp.impl import akka.Done import akka.annotation.InternalApi -import akka.stream.alpakka.amqp.{AmqpReplyToSinkSettings, OutgoingMessage} +import akka.stream.alpakka.amqp.{AmqpReplyToSinkSettings, WriteMessage} import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler} import akka.stream.{ActorAttributes, Attributes, Inlet, SinkShape} import scala.concurrent.{Future, Promise} /** - * Connects to an AMQP server upon materialization and sends incoming messages to the server. + * Connects to an AMQP server upon materialization and sends write messages to the server. * Each materialized sink will create one connection to the broker. This stage sends messages to * the queue named in the replyTo options of the message instead of from settings declared at construction. */ @InternalApi private[amqp] final class AmqpReplyToSinkStage(settings: AmqpReplyToSinkSettings) - extends GraphStageWithMaterializedValue[SinkShape[OutgoingMessage], Future[Done]] { stage => + extends GraphStageWithMaterializedValue[SinkShape[WriteMessage], Future[Done]] { stage => - val in = Inlet[OutgoingMessage]("AmqpReplyToSink.in") + val in = Inlet[WriteMessage]("AmqpReplyToSink.in") - override def shape: SinkShape[OutgoingMessage] = SinkShape.of(in) + override def shape: SinkShape[WriteMessage] = SinkShape.of(in) override protected def initialAttributes: Attributes = super.initialAttributes and Attributes.name("AmqpReplyToSink") and ActorAttributes.IODispatcher diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpRpcFlowStage.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpRpcFlowStage.scala index 1d1e8ce119..90e895ad53 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpRpcFlowStage.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpRpcFlowStage.scala @@ -10,7 +10,7 @@ import akka.Done import akka.annotation.InternalApi import akka.stream._ import akka.stream.alpakka.amqp._ -import akka.stream.alpakka.amqp.scaladsl.CommittableIncomingMessage +import akka.stream.alpakka.amqp.scaladsl.CommittableReadResult import akka.stream.stage._ import akka.util.ByteString import com.rabbitmq.client.AMQP.BasicProperties @@ -21,20 +21,20 @@ import scala.concurrent.{Future, Promise} import scala.util.Success /** - * This stage materializes to a Future[String], which is the name of the private exclusive queue used for RPC communication + * This stage materializes to a `Future[String]`, which is the name of the private exclusive queue used for RPC communication * * @param responsesPerMessage The number of responses that should be expected for each message placed on the queue. This - * can be overridden per message by including `expectedReplies` in the the header of the [[OutgoingMessage]] + * can be overridden per message by including `expectedReplies` in the the header of the [[akka.stream.alpakka.amqp.WriteMessage]] */ @InternalApi private[amqp] final class AmqpRpcFlowStage(settings: AmqpSinkSettings, bufferSize: Int, responsesPerMessage: Int = 1) - extends GraphStageWithMaterializedValue[FlowShape[OutgoingMessage, CommittableIncomingMessage], Future[String]] { + extends GraphStageWithMaterializedValue[FlowShape[WriteMessage, CommittableReadResult], Future[String]] { stage => - val in = Inlet[OutgoingMessage]("AmqpRpcFlow.in") - val out = Outlet[CommittableIncomingMessage]("AmqpRpcFlow.out") + val in = Inlet[WriteMessage]("AmqpRpcFlow.in") + val out = Outlet[CommittableReadResult]("AmqpRpcFlow.out") - override def shape: FlowShape[OutgoingMessage, CommittableIncomingMessage] = FlowShape.of(in, out) + override def shape: FlowShape[WriteMessage, CommittableReadResult] = FlowShape.of(in, out) override protected def initialAttributes: Attributes = super.initialAttributes and Attributes.name("AmqpRpcFlow") and ActorAttributes.IODispatcher @@ -46,7 +46,7 @@ private[amqp] final class AmqpRpcFlowStage(settings: AmqpSinkSettings, bufferSiz override val settings = stage.settings private val exchange = settings.exchange.getOrElse("") private val routingKey = settings.routingKey.getOrElse("") - private val queue = mutable.Queue[CommittableIncomingMessage]() + private val queue = mutable.Queue[CommittableReadResult]() private var queueName: String = _ private var unackedMessages = 0 private var outstandingMessages = 0 @@ -91,8 +91,8 @@ private[amqp] final class AmqpRpcFlowStage(settings: AmqpSinkSettings, bufferSiz properties: BasicProperties, body: Array[Byte]): Unit = consumerCallback.invoke( - new CommittableIncomingMessage { - override val message = IncomingMessage(ByteString(body), envelope, properties) + new CommittableReadResult { + override val message = ReadResult(ByteString(body), envelope, properties) override def ack(multiple: Boolean): Future[Done] = { val promise = Promise[Done]() @@ -139,7 +139,7 @@ private[amqp] final class AmqpRpcFlowStage(settings: AmqpSinkSettings, bufferSiz promise.success(queueName) } - def handleDelivery(message: CommittableIncomingMessage): Unit = + def handleDelivery(message: CommittableReadResult): Unit = if (isAvailable(out)) { pushMessage(message) } else if (queue.size + 1 > bufferSize) { @@ -163,7 +163,7 @@ private[amqp] final class AmqpRpcFlowStage(settings: AmqpSinkSettings, bufferSiz } ) - def pushMessage(message: CommittableIncomingMessage): Unit = { + def pushMessage(message: CommittableReadResult): Unit = { push(out, message) unackedMessages += 1 outstandingMessages -= 1 diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpSinkStage.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpSinkStage.scala index f58116061b..f1e6e14101 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpSinkStage.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpSinkStage.scala @@ -6,23 +6,23 @@ package akka.stream.alpakka.amqp.impl import akka.Done import akka.annotation.InternalApi -import akka.stream.alpakka.amqp.{AmqpSinkSettings, OutgoingMessage} +import akka.stream.alpakka.amqp.{AmqpSinkSettings, WriteMessage} import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler} import akka.stream.{ActorAttributes, Attributes, Inlet, SinkShape} import scala.concurrent.{Future, Promise} /** - * Connects to an AMQP server upon materialization and sends incoming messages to the server. + * Connects to an AMQP server upon materialization and sends write messages to the server. * Each materialized sink will create one connection to the broker. */ @InternalApi private[amqp] final class AmqpSinkStage(settings: AmqpSinkSettings) - extends GraphStageWithMaterializedValue[SinkShape[OutgoingMessage], Future[Done]] { stage => + extends GraphStageWithMaterializedValue[SinkShape[WriteMessage], Future[Done]] { stage => - val in = Inlet[OutgoingMessage]("AmqpSink.in") + val in = Inlet[WriteMessage]("AmqpSink.in") - override def shape: SinkShape[OutgoingMessage] = SinkShape.of(in) + override def shape: SinkShape[WriteMessage] = SinkShape.of(in) override protected def initialAttributes: Attributes = super.initialAttributes and Attributes.name("AmqpSink") and ActorAttributes.IODispatcher diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpSourceStage.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpSourceStage.scala index 309a5c7700..1761e5ad49 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpSourceStage.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpSourceStage.scala @@ -7,8 +7,8 @@ package akka.stream.alpakka.amqp.impl import akka.Done import akka.annotation.InternalApi import akka.stream.alpakka.amqp._ -import akka.stream.alpakka.amqp.impl.AmqpSourceStage.AutoAckedMessage -import akka.stream.alpakka.amqp.scaladsl.CommittableIncomingMessage +import akka.stream.alpakka.amqp.impl.AmqpSourceStage.AutoAckedReadResult +import akka.stream.alpakka.amqp.scaladsl.CommittableReadResult import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler, StageLogging} import akka.stream.{Attributes, Outlet, SourceShape} import akka.util.ByteString @@ -27,17 +27,16 @@ private final case class NackArguments(deliveryTag: Long, multiple: Boolean, req * * Connects to an AMQP server upon materialization and consumes messages from it emitting them * into the stream. Each materialized source will create one connection to the broker. - * As soon as an `IncomingMessage` is sent downstream, an ack for it is sent to the broker. * * @param bufferSize The max number of elements to prefetch and buffer at any given time. */ @InternalApi private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSize: Int) - extends GraphStage[SourceShape[CommittableIncomingMessage]] { stage => + extends GraphStage[SourceShape[CommittableReadResult]] { stage => - private val out = Outlet[CommittableIncomingMessage]("AmqpSource.out") + private val out = Outlet[CommittableReadResult]("AmqpSource.out") - override val shape: SourceShape[CommittableIncomingMessage] = SourceShape.of(out) + override val shape: SourceShape[CommittableReadResult] = SourceShape.of(out) override protected def initialAttributes: Attributes = Attributes.name("AmqpSource") @@ -46,7 +45,7 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi override val settings: AmqpSourceSettings = stage.settings - private val queue = mutable.Queue[CommittableIncomingMessage]() + private val queue = mutable.Queue[CommittableReadResult]() private var ackRequired = true private var unackedMessages = 0 @@ -86,8 +85,8 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi body: Array[Byte]): Unit = { val message = if (ackRequired) { - new CommittableIncomingMessage { - override val message = IncomingMessage(ByteString(body), envelope, properties) + new CommittableReadResult { + override val message = ReadResult(ByteString(body), envelope, properties) override def ack(multiple: Boolean): Future[Done] = { val promise = Promise[Done]() @@ -101,7 +100,7 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi promise.future } } - } else new AutoAckedMessage(IncomingMessage(ByteString(body), envelope, properties)) + } else new AutoAckedReadResult(ReadResult(ByteString(body), envelope, properties)) consumerCallback.invoke(message) } @@ -149,7 +148,7 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi } } - def handleDelivery(message: CommittableIncomingMessage): Unit = + def handleDelivery(message: CommittableReadResult): Unit = if (isAvailable(out)) { pushMessage(message) } else if (queue.size + 1 > bufferSize) { @@ -175,7 +174,7 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi } ) - def pushMessage(message: CommittableIncomingMessage): Unit = { + def pushMessage(message: CommittableReadResult): Unit = { push(out, message) if (ackRequired) unackedMessages += 1 } @@ -190,7 +189,7 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi private[amqp] object AmqpSourceStage { private val SuccessfullyDone = Future.successful(Done) - final class AutoAckedMessage(override val message: IncomingMessage) extends CommittableIncomingMessage { + final class AutoAckedReadResult(override val message: ReadResult) extends CommittableReadResult { override def ack(multiple: Boolean): Future[Done] = SuccessfullyDone override def nack(multiple: Boolean, requeue: Boolean): Future[Done] = SuccessfullyDone } diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpRpcFlow.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpRpcFlow.scala index 529c4f0393..15060e9dec 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpRpcFlow.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpRpcFlow.scala @@ -6,7 +6,6 @@ package akka.stream.alpakka.amqp.javadsl import java.util.concurrent.CompletionStage -import akka.annotation.ApiMayChange import akka.stream.alpakka.amqp._ import akka.stream.javadsl.Flow import akka.util.ByteString @@ -20,10 +19,9 @@ object AmqpRpcFlow { * Create an [[https://www.rabbitmq.com/tutorials/tutorial-six-java.html RPC style flow]] for processing and communicating * over a rabbitmq message bus. This will create a private queue, and add the reply-to header to messages sent out. * - * This stage materializes to a CompletionStage, which is the name of the private exclusive queue used for RPC communication. + * This stage materializes to a `CompletionStage`, which is the name of the private exclusive queue used for RPC communication. * - * @param repliesPerMessage The number of responses that should be expected for each message placed on the queue. This - * can be overridden per message by including `expectedReplies` in the the header of the [[OutgoingMessage]] + * @param repliesPerMessage The number of responses that should be expected for each message placed on the queue. */ def createSimple(settings: AmqpSinkSettings, repliesPerMessage: Int): Flow[ByteString, ByteString, CompletionStage[String]] = @@ -35,11 +33,10 @@ object AmqpRpcFlow { /** * Java API: * Convenience for "at-most once delivery" semantics. Each message is acked to RabbitMQ - * before it is emitted downstream. + * before its read result is emitted downstream. */ - @ApiMayChange // https://github.com/akka/alpakka/issues/1513 def atMostOnceFlow(settings: AmqpSinkSettings, - bufferSize: Int): Flow[OutgoingMessage, IncomingMessage, CompletionStage[String]] = + bufferSize: Int): Flow[WriteMessage, ReadResult, CompletionStage[String]] = akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow .atMostOnceFlow(settings, bufferSize) .mapMaterializedValue(f => f.toJava) @@ -48,12 +45,11 @@ object AmqpRpcFlow { /** * Java API: * Convenience for "at-most once delivery" semantics. Each message is acked to RabbitMQ - * before it is emitted downstream. + * before its read result is emitted downstream. */ - @ApiMayChange // https://github.com/akka/alpakka/issues/1513 def atMostOnceFlow(settings: AmqpSinkSettings, bufferSize: Int, - repliesPerMessage: Int): Flow[OutgoingMessage, IncomingMessage, CompletionStage[String]] = + repliesPerMessage: Int): Flow[WriteMessage, ReadResult, CompletionStage[String]] = akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow .atMostOnceFlow(settings, bufferSize, repliesPerMessage) .mapMaterializedValue(f => f.toJava) @@ -70,16 +66,15 @@ object AmqpRpcFlow { * * Compared to auto-commit, this gives exact control over when a message is considered consumed. */ - @ApiMayChange // https://github.com/akka/alpakka/issues/1513 def committableFlow( settings: AmqpSinkSettings, bufferSize: Int, repliesPerMessage: Int = 1 - ): Flow[OutgoingMessage, CommittableIncomingMessage, CompletionStage[String]] = + ): Flow[WriteMessage, CommittableReadResult, CompletionStage[String]] = akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow .committableFlow(settings, bufferSize, repliesPerMessage) .mapMaterializedValue(f => f.toJava) - .map(cm => cm.asJava) + .map(cm => new CommittableReadResult(cm)) .asJava } diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSink.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSink.scala index e42858b638..0a7aac9012 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSink.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSink.scala @@ -8,20 +8,18 @@ import java.util.concurrent.CompletionStage import scala.compat.java8.FutureConverters._ import akka.Done -import akka.annotation.ApiMayChange import akka.stream.alpakka.amqp._ import akka.util.ByteString object AmqpSink { /** - * Java API: Creates an [[AmqpSink]] that accepts [[OutgoingMessage]] elements. + * Java API: Creates an [[AmqpSink]] that accepts [[akka.stream.alpakka.amqp.WriteMessage WriteMessage]] elements. * * This stage materializes to a CompletionStage, which can be used to know when the Sink completes, either normally * or because of an amqp failure */ - @ApiMayChange // https://github.com/akka/alpakka/issues/1513 - def create(settings: AmqpSinkSettings): akka.stream.javadsl.Sink[OutgoingMessage, CompletionStage[Done]] = + def create(settings: AmqpSinkSettings): akka.stream.javadsl.Sink[WriteMessage, CompletionStage[Done]] = akka.stream.alpakka.amqp.scaladsl.AmqpSink(settings).mapMaterializedValue(f => f.toJava).asJava /** @@ -31,20 +29,19 @@ object AmqpSink { * Each materialized sink will create one connection to the broker. This stage sends messages to * the queue named in the replyTo options of the message instead of from settings declared at construction. * - * This stage materializes to a CompletionStage, which can be used to know when the Sink completes, either normally - * or because of an amqp failure + * This stage materializes to a `CompletionStage`, which can be used to know when the Sink completes, either normally + * or because of an amqp failure. */ - @ApiMayChange // https://github.com/akka/alpakka/issues/1513 def createReplyTo( settings: AmqpReplyToSinkSettings - ): akka.stream.javadsl.Sink[OutgoingMessage, CompletionStage[Done]] = + ): akka.stream.javadsl.Sink[WriteMessage, CompletionStage[Done]] = akka.stream.alpakka.amqp.scaladsl.AmqpSink.replyTo(settings).mapMaterializedValue(f => f.toJava).asJava /** - * Java API: Creates an [[AmqpSink]] that accepts ByteString elements. + * Java API: Creates an [[AmqpSink]] that accepts `ByteString` elements. * - * This stage materializes to a CompletionStage, which can be used to know when the Sink completes, either normally - * or because of an amqp failure + * This stage materializes to a `CompletionStage`, which can be used to know when the Sink completes, either normally + * or because of an amqp failure. */ def createSimple(settings: AmqpSinkSettings): akka.stream.javadsl.Sink[ByteString, CompletionStage[Done]] = akka.stream.alpakka.amqp.scaladsl.AmqpSink.simple(settings).mapMaterializedValue(f => f.toJava).asJava diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSource.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSource.scala index 294b1eda61..87cbd373e5 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSource.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSource.scala @@ -5,8 +5,7 @@ package akka.stream.alpakka.amqp.javadsl import akka.NotUsed -import akka.annotation.ApiMayChange -import akka.stream.alpakka.amqp.{AmqpSourceSettings, IncomingMessage} +import akka.stream.alpakka.amqp.{AmqpSourceSettings, ReadResult} import akka.stream.javadsl.Source object AmqpSource { @@ -15,8 +14,7 @@ object AmqpSource { * Java API: Convenience for "at-most once delivery" semantics. Each message is acked to RabbitMQ * before it is emitted downstream. */ - @ApiMayChange // https://github.com/akka/alpakka/issues/1513 - def atMostOnceSource(settings: AmqpSourceSettings, bufferSize: Int): Source[IncomingMessage, NotUsed] = + def atMostOnceSource(settings: AmqpSourceSettings, bufferSize: Int): Source[ReadResult, NotUsed] = akka.stream.alpakka.amqp.scaladsl.AmqpSource .atMostOnceSource(settings, bufferSize) .asJava @@ -32,11 +30,10 @@ object AmqpSource { * * Compared to auto-commit, this gives exact control over when a message is considered consumed. */ - @ApiMayChange // https://github.com/akka/alpakka/issues/1513 - def committableSource(settings: AmqpSourceSettings, bufferSize: Int): Source[CommittableIncomingMessage, NotUsed] = + def committableSource(settings: AmqpSourceSettings, bufferSize: Int): Source[CommittableReadResult, NotUsed] = akka.stream.alpakka.amqp.scaladsl.AmqpSource .committableSource(settings, bufferSize) - .map(cm => cm.asJava) + .map(cm => new CommittableReadResult(cm)) .asJava } diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/CommittableIncomingMessage.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/CommittableIncomingMessage.scala deleted file mode 100644 index 89a1bdea2a..0000000000 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/CommittableIncomingMessage.scala +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.stream.alpakka.amqp.javadsl - -import java.util.concurrent.CompletionStage - -import akka.Done -import akka.annotation.ApiMayChange -import akka.stream.alpakka.amqp.IncomingMessage - -@ApiMayChange // https://github.com/akka/alpakka/issues/1513 -trait CommittableIncomingMessage { - val message: IncomingMessage - def ack(multiple: Boolean = false): CompletionStage[Done] - def nack(multiple: Boolean = false, requeue: Boolean = true): CompletionStage[Done] -} diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/CommittableReadResult.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/CommittableReadResult.scala new file mode 100644 index 0000000000..2c82a86c01 --- /dev/null +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/CommittableReadResult.scala @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2016-2019 Lightbend Inc. + */ + +package akka.stream.alpakka.amqp.javadsl + +import java.util.concurrent.CompletionStage + +import akka.Done +import akka.stream.alpakka.amqp.ReadResult +import akka.stream.alpakka.amqp.scaladsl + +import scala.compat.java8.FutureConverters._ + +final class CommittableReadResult(cm: scaladsl.CommittableReadResult) { + val message: ReadResult = cm.message + + def ack(): CompletionStage[Done] = ack(false) + def ack(multiple: Boolean): CompletionStage[Done] = cm.ack(multiple).toJava + + def nack(): CompletionStage[Done] = nack(false, true) + def nack(multiple: Boolean, requeue: Boolean): CompletionStage[Done] = + cm.nack(multiple, requeue).toJava +} diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/package.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/package.scala deleted file mode 100644 index fa3af26388..0000000000 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/package.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.stream.alpakka.amqp - -import java.util.concurrent.CompletionStage - -import akka.Done -import akka.stream.alpakka.amqp.scaladsl.{CommittableIncomingMessage => ScalaCommittableIncomingMessage} - -import scala.compat.java8.FutureConverters - -/** - * This implicit classes allow to convert the Committable and CommittableMessage between scaladsl and javadsl. - */ -package object javadsl { - - import FutureConverters._ - - private[javadsl] implicit class RichCommittableIncomingMessage(cm: ScalaCommittableIncomingMessage) { - def asJava: CommittableIncomingMessage = new CommittableIncomingMessage { - override val message: IncomingMessage = cm.message - override def ack(multiple: Boolean = false): CompletionStage[Done] = cm.ack(multiple).toJava - override def nack(multiple: Boolean = false, requeue: Boolean = true): CompletionStage[Done] = - cm.nack(multiple, requeue).toJava - } - } -} diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/model.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/model.scala index 89a7ebdad1..7db5d0266c 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/model.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/model.scala @@ -4,61 +4,81 @@ package akka.stream.alpakka.amqp -import akka.annotation.ApiMayChange import akka.util.ByteString import com.rabbitmq.client.AMQP.BasicProperties import com.rabbitmq.client.Envelope -@ApiMayChange // https://github.com/akka/alpakka/issues/1513 -final class IncomingMessage private ( +final class ReadResult private ( val bytes: ByteString, val envelope: Envelope, val properties: BasicProperties ) { override def toString: String = - s"IncomingMessage(bytes=$bytes, envelope=$envelope, properties=$properties)" + s"ReadResult(bytes=$bytes, envelope=$envelope, properties=$properties)" } -@ApiMayChange // https://github.com/akka/alpakka/issues/1513 -object IncomingMessage { - def apply(bytes: ByteString, envelope: Envelope, properties: BasicProperties): IncomingMessage = - new IncomingMessage(bytes, envelope, properties) +object ReadResult { + def apply(bytes: ByteString, envelope: Envelope, properties: BasicProperties): ReadResult = + new ReadResult(bytes, envelope, properties) /** * Java API */ - def create(bytes: ByteString, envelope: Envelope, properties: BasicProperties): IncomingMessage = - IncomingMessage(bytes, envelope, properties) + def create(bytes: ByteString, envelope: Envelope, properties: BasicProperties): ReadResult = + ReadResult(bytes, envelope, properties) } -@ApiMayChange // https://github.com/akka/alpakka/issues/1513 -final class OutgoingMessage private (val bytes: ByteString, - val immediate: Boolean, - val mandatory: Boolean, - val properties: Option[BasicProperties] = None, - val routingKey: Option[String] = None) { +final class WriteMessage private (val bytes: ByteString, + val immediate: Boolean, + val mandatory: Boolean, + val properties: Option[BasicProperties] = None, + val routingKey: Option[String] = None) { - def withProperties(properties: BasicProperties): OutgoingMessage = + def withImmediate(value: Boolean): WriteMessage = + if (value == immediate) this + else copy(immediate = value) + + def withMandatory(value: Boolean): WriteMessage = + if (value == mandatory) this + else copy(mandatory = value) + + def withProperties(properties: BasicProperties): WriteMessage = copy(properties = Some(properties)) - def withRoutingKey(routingKey: String): OutgoingMessage = + def withRoutingKey(routingKey: String): WriteMessage = copy(routingKey = Some(routingKey)) - private def copy(properties: Option[BasicProperties] = properties, routingKey: Option[String] = routingKey) = - new OutgoingMessage(bytes, immediate, mandatory, properties, routingKey) + private def copy(immediate: Boolean = immediate, + mandatory: Boolean = mandatory, + properties: Option[BasicProperties] = properties, + routingKey: Option[String] = routingKey) = + new WriteMessage(bytes, immediate, mandatory, properties, routingKey) override def toString: String = - s"OutgoingMessage(bytes=$bytes, immediate=$immediate, mandatory=$mandatory, properties=$properties, routingKey=$routingKey)" + "WriteMessage(" + + s"bytes=$bytes, " + + s"immediate=$immediate, " + + s"mandatory=$mandatory, " + + s"properties=$properties, " + + s"routingKey=$routingKey" + + ")" } -@ApiMayChange // https://github.com/akka/alpakka/issues/1513 -object OutgoingMessage { - def apply(bytes: ByteString, immediate: Boolean, mandatory: Boolean): OutgoingMessage = - new OutgoingMessage(bytes, immediate, mandatory) +object WriteMessage { + def apply(bytes: ByteString): WriteMessage = + new WriteMessage(bytes, immediate = false, mandatory = false) + + def apply(bytes: ByteString, immediate: Boolean, mandatory: Boolean): WriteMessage = + new WriteMessage(bytes, immediate, mandatory) + + /** + * Java API + */ + def create(bytes: ByteString): WriteMessage = WriteMessage(bytes) /** * Java API */ - def create(bytes: ByteString, immediate: Boolean, mandatory: Boolean): OutgoingMessage = - OutgoingMessage(bytes, immediate, mandatory) + def create(bytes: ByteString, immediate: Boolean, mandatory: Boolean): WriteMessage = + WriteMessage(bytes, immediate, mandatory) } diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala index 43071335dc..25096e076c 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala @@ -23,11 +23,11 @@ object AmqpRpcFlow { * This stage materializes to a Future[String], which is the name of the private exclusive queue used for RPC communication. * * @param repliesPerMessage The number of responses that should be expected for each message placed on the queue. This - * can be overridden per message by including `expectedReplies` in the the header of the [[OutgoingMessage]] + * can be overridden per message by including `expectedReplies` in the the header of the [[WriteMessage]] */ def simple(settings: AmqpSinkSettings, repliesPerMessage: Int = 1): Flow[ByteString, ByteString, Future[String]] = Flow[ByteString] - .map(bytes => OutgoingMessage(bytes, false, false)) + .map(bytes => WriteMessage(bytes)) .viaMat(atMostOnceFlow(settings, 1, repliesPerMessage))(Keep.right) .map(_.bytes) @@ -39,7 +39,7 @@ object AmqpRpcFlow { @ApiMayChange // https://github.com/akka/alpakka/issues/1513 def atMostOnceFlow(settings: AmqpSinkSettings, bufferSize: Int, - repliesPerMessage: Int = 1): Flow[OutgoingMessage, IncomingMessage, Future[String]] = + repliesPerMessage: Int = 1): Flow[WriteMessage, ReadResult, Future[String]] = committableFlow(settings, bufferSize, repliesPerMessage) .mapAsync(1)(cm => cm.ack().map(_ => cm.message)) @@ -57,7 +57,7 @@ object AmqpRpcFlow { @ApiMayChange // https://github.com/akka/alpakka/issues/1513 def committableFlow(settings: AmqpSinkSettings, bufferSize: Int, - repliesPerMessage: Int = 1): Flow[OutgoingMessage, CommittableIncomingMessage, Future[String]] = + repliesPerMessage: Int = 1): Flow[WriteMessage, CommittableReadResult, Future[String]] = Flow.fromGraph(new impl.AmqpRpcFlowStage(settings, bufferSize, repliesPerMessage)) } diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala index 4a6bf91775..2a7e75d1d5 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala @@ -21,7 +21,7 @@ object AmqpSink { * or because of an amqp failure */ def simple(settings: AmqpSinkSettings): Sink[ByteString, Future[Done]] = - apply(settings).contramap[ByteString](bytes => OutgoingMessage(bytes, false, false)) + apply(settings).contramap[ByteString](bytes => WriteMessage(bytes)) /** * Scala API: @@ -34,7 +34,7 @@ object AmqpSink { * or because of an amqp failure */ @ApiMayChange // https://github.com/akka/alpakka/issues/1513 - def replyTo(settings: AmqpReplyToSinkSettings): Sink[OutgoingMessage, Future[Done]] = + def replyTo(settings: AmqpReplyToSinkSettings): Sink[WriteMessage, Future[Done]] = Sink.fromGraph(new impl.AmqpReplyToSinkStage(settings)) /** @@ -48,7 +48,7 @@ object AmqpSink { * or because of an amqp failure */ @ApiMayChange // https://github.com/akka/alpakka/issues/1513 - def apply(settings: AmqpSinkSettings): Sink[OutgoingMessage, Future[Done]] = + def apply(settings: AmqpSinkSettings): Sink[WriteMessage, Future[Done]] = Sink.fromGraph(new impl.AmqpSinkStage(settings)) } diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSource.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSource.scala index 1b9e5c2c44..07012c1e37 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSource.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSource.scala @@ -8,7 +8,7 @@ import akka.NotUsed import akka.annotation.ApiMayChange import akka.dispatch.ExecutionContexts import akka.stream.alpakka.amqp.impl -import akka.stream.alpakka.amqp.{AmqpSourceSettings, IncomingMessage} +import akka.stream.alpakka.amqp.{AmqpSourceSettings, ReadResult} import akka.stream.scaladsl.Source object AmqpSource { @@ -19,7 +19,7 @@ object AmqpSource { * before it is emitted downstream. */ @ApiMayChange // https://github.com/akka/alpakka/issues/1513 - def atMostOnceSource(settings: AmqpSourceSettings, bufferSize: Int): Source[IncomingMessage, NotUsed] = + def atMostOnceSource(settings: AmqpSourceSettings, bufferSize: Int): Source[ReadResult, NotUsed] = committableSource(settings, bufferSize) .mapAsync(1)(cm => cm.ack().map(_ => cm.message)) @@ -35,7 +35,7 @@ object AmqpSource { * Compared to auto-commit, this gives exact control over when a message is considered consumed. */ @ApiMayChange // https://github.com/akka/alpakka/issues/1513 - def committableSource(settings: AmqpSourceSettings, bufferSize: Int): Source[CommittableIncomingMessage, NotUsed] = + def committableSource(settings: AmqpSourceSettings, bufferSize: Int): Source[CommittableReadResult, NotUsed] = Source.fromGraph(new impl.AmqpSourceStage(settings, bufferSize)) } diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/CommittableIncomingMessage.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/CommittableReadResult.scala similarity index 77% rename from amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/CommittableIncomingMessage.scala rename to amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/CommittableReadResult.scala index c771368ae2..6191e35127 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/CommittableIncomingMessage.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/CommittableReadResult.scala @@ -6,13 +6,13 @@ package akka.stream.alpakka.amqp.scaladsl import akka.Done import akka.annotation.ApiMayChange -import akka.stream.alpakka.amqp.IncomingMessage +import akka.stream.alpakka.amqp.ReadResult import scala.concurrent.Future @ApiMayChange // https://github.com/akka/alpakka/issues/1513 -trait CommittableIncomingMessage { - val message: IncomingMessage +trait CommittableReadResult { + val message: ReadResult def ack(multiple: Boolean = false): Future[Done] def nack(multiple: Boolean = false, requeue: Boolean = true): Future[Done] } diff --git a/amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/AmqpConnectorsTest.java b/amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/AmqpConnectorsTest.java index cef3426711..274108fc52 100644 --- a/amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/AmqpConnectorsTest.java +++ b/amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/AmqpConnectorsTest.java @@ -130,28 +130,28 @@ public void publishAndConsumeRpcWithoutAutoAck() throws Exception { final List input = Arrays.asList("one", "two", "three", "four", "five"); - final Flow> ampqRpcFlow = + final Flow> ampqRpcFlow = AmqpRpcFlow.committableFlow( AmqpSinkSettings.create(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration), 10, 1); - Pair, TestSubscriber.Probe> result = + Pair, TestSubscriber.Probe> result = Source.from(input) .map(ByteString::fromString) - .map(bytes -> OutgoingMessage.create(bytes, false, false)) + .map(bytes -> WriteMessage.create(bytes)) .viaMat(ampqRpcFlow, Keep.right()) - .mapAsync(1, cm -> cm.ack(false).thenApply(unused -> cm.message())) + .mapAsync(1, cm -> cm.ack().thenApply(unused -> cm.message())) .toMat(TestSink.probe(system), Keep.both()) .run(materializer); result.first().toCompletableFuture().get(5, TimeUnit.SECONDS); - Sink> amqpSink = + Sink> amqpSink = AmqpSink.createReplyTo(AmqpReplyToSinkSettings.create(connectionProvider)); - final Source amqpSource = + final Source amqpSource = AmqpSource.atMostOnceSource( NamedQueueSourceSettings.create(connectionProvider, queueName) .withDeclaration(queueDeclaration), @@ -160,12 +160,11 @@ public void publishAndConsumeRpcWithoutAutoAck() throws Exception { UniqueKillSwitch sourceToSink = amqpSource .viaMat(KillSwitches.single(), Keep.right()) - .map( - b -> OutgoingMessage.create(b.bytes(), false, false).withProperties(b.properties())) + .map(b -> WriteMessage.create(b.bytes()).withProperties(b.properties())) .to(amqpSink) .run(materializer); - List probeResult = + List probeResult = JavaConverters.seqAsJavaListConverter( result.second().toStrict(Duration.create(3, TimeUnit.SECONDS))) .asJava(); @@ -186,7 +185,7 @@ public void keepConnectionOpenIfDownstreamClosesAndThereArePendingAcks() throws .withDeclaration(queueDeclaration)); final Integer bufferSize = 10; - final Source amqpSource = + final Source amqpSource = AmqpSource.committableSource( NamedQueueSourceSettings.create(connectionProvider, queueName) .withDeclaration(queueDeclaration), @@ -199,10 +198,10 @@ public void keepConnectionOpenIfDownstreamClosesAndThereArePendingAcks() throws .toCompletableFuture() .get(3, TimeUnit.SECONDS); - final CompletionStage> result = + final CompletionStage> result = amqpSource.take(input.size()).runWith(Sink.seq(), materializer); - List committableMessages = + List committableMessages = result.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(input.size(), committableMessages.size()); @@ -226,7 +225,7 @@ public void setRoutingKeyPerMessageAndConsumeThemInTheSameJVM() throws Exception final BindingDeclaration bindingDeclaration = BindingDeclaration.create(queueName, exchangeName).withRoutingKey("key.*"); - final Sink> amqpSink = + final Sink> amqpSink = AmqpSink.create( AmqpSinkSettings.create(connectionProvider) .withExchange(exchangeName) @@ -234,7 +233,7 @@ public void setRoutingKeyPerMessageAndConsumeThemInTheSameJVM() throws Exception Arrays.asList(exchangeDeclaration, queueDeclaration, bindingDeclaration))); final Integer bufferSize = 10; - final Source amqpSource = + final Source amqpSource = AmqpSource.atMostOnceSource( NamedQueueSourceSettings.create(connectionProvider, queueName) .withDeclarations( @@ -245,13 +244,10 @@ public void setRoutingKeyPerMessageAndConsumeThemInTheSameJVM() throws Exception final List routingKeys = input.stream().map(s -> "key." + s).collect(Collectors.toList()); Source.from(input) - .map( - s -> - OutgoingMessage.create(ByteString.fromString(s), false, false) - .withRoutingKey("key." + s)) + .map(s -> WriteMessage.create(ByteString.fromString(s)).withRoutingKey("key." + s)) .runWith(amqpSink, materializer); - final List result = + final List result = amqpSource .take(input.size()) .runWith(Sink.seq(), materializer) diff --git a/amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/package.scala b/amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/package.scala deleted file mode 100644 index a273a3c02b..0000000000 --- a/amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/package.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.stream.alpakka.amqp - -import java.util.concurrent.CompletionStage - -import akka.Done -import akka.stream.alpakka.amqp.scaladsl.{CommittableIncomingMessage => ScalaCommittableIncomingMessage} - -import scala.compat.java8.FutureConverters - -package object javadsl { - import FutureConverters._ - - private[javadsl] implicit class RichCommittableIncomingMessage(cm: ScalaCommittableIncomingMessage) { - def asJava: CommittableIncomingMessage = new CommittableIncomingMessage { - override val message: IncomingMessage = cm.message - override def ack(multiple: Boolean = false): CompletionStage[Done] = cm.ack(multiple).toJava - override def nack(multiple: Boolean = false, requeue: Boolean = true): CompletionStage[Done] = - cm.nack(multiple, requeue).toJava - } - } -} diff --git a/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java b/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java index 9329aed119..cbbb2ef737 100644 --- a/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java +++ b/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java @@ -16,7 +16,7 @@ import akka.stream.alpakka.amqp.javadsl.AmqpRpcFlow; import akka.stream.alpakka.amqp.javadsl.AmqpSink; import akka.stream.alpakka.amqp.javadsl.AmqpSource; -import akka.stream.alpakka.amqp.javadsl.CommittableIncomingMessage; +import akka.stream.alpakka.amqp.javadsl.CommittableReadResult; import akka.stream.javadsl.Flow; import akka.stream.javadsl.Keep; import akka.stream.javadsl.Sink; @@ -92,13 +92,13 @@ public void publishAndConsume() throws Exception { // #create-source final Integer bufferSize = 10; - final Source amqpSource = + final Source amqpSource = AmqpSource.atMostOnceSource( NamedQueueSourceSettings.create(connectionProvider, queueName) .withDeclaration(queueDeclaration), bufferSize); - final CompletionStage> result = + final CompletionStage> result = amqpSource.take(input.size()).runWith(Sink.seq(), materializer); // #create-source @@ -119,7 +119,7 @@ public void publishAndConsumeRpc() throws Exception { final QueueDeclaration queueDeclaration = QueueDeclaration.create(queueName); final Integer bufferSize = 10; - final Source amqpSource = + final Source amqpSource = AmqpSource.atMostOnceSource( NamedQueueSourceSettings.create(connectionProvider, queueName) .withDeclaration(queueDeclaration), @@ -144,7 +144,7 @@ public void publishAndConsumeRpc() throws Exception { // #create-rpc-flow result.first().toCompletableFuture().get(3, TimeUnit.SECONDS); - Sink> amqpSink = + Sink> amqpSink = AmqpSink.createReplyTo(AmqpReplyToSinkSettings.create(connectionProvider)); UniqueKillSwitch killSwitch = @@ -152,8 +152,7 @@ public void publishAndConsumeRpc() throws Exception { .viaMat(KillSwitches.single(), Keep.right()) .map( b -> - OutgoingMessage.create( - b.bytes().concat(ByteString.fromString("a")), false, false) + WriteMessage.create(b.bytes().concat(ByteString.fromString("a"))) .withProperties(b.properties())) .to(amqpSink) .run(materializer); @@ -242,8 +241,7 @@ public void publishFanoutAndConsume() throws Exception { repeatingFlow.shutdown(); } - private CompletionStage businessLogic( - CommittableIncomingMessage msg) { + private CompletionStage businessLogic(CommittableReadResult msg) { return CompletableFuture.completedFuture(msg); } @@ -263,16 +261,16 @@ public void publishAndConsumeWithoutAutoAck() throws Exception { // #create-source-withoutautoack final Integer bufferSize = 10; - final Source amqpSource = + final Source amqpSource = AmqpSource.committableSource( NamedQueueSourceSettings.create(connectionProvider, queueName) .withDeclaration(queueDeclaration), bufferSize); - final CompletionStage> result = + final CompletionStage> result = amqpSource .mapAsync(1, this::businessLogic) - .mapAsync(1, cm -> cm.ack(false).thenApply(unused -> cm.message())) + .mapAsync(1, cm -> cm.ack().thenApply(unused -> cm.message())) .take(input.size()) .runWith(Sink.seq(), materializer); // #create-source-withoutautoack @@ -306,14 +304,14 @@ public void republishMessageWithoutAutoAckIfNacked() throws Exception { .get(5, TimeUnit.SECONDS); final Integer bufferSize = 10; - final Source amqpSource = + final Source amqpSource = AmqpSource.committableSource( NamedQueueSourceSettings.create(connectionProvider, queueName) .withDeclaration(queueDeclaration), bufferSize); // #run-source-withoutautoack-and-nack - final CompletionStage> result1 = + final CompletionStage> result1 = amqpSource .mapAsync(1, this::businessLogic) .take(input.size()) @@ -324,9 +322,9 @@ public void republishMessageWithoutAutoAckIfNacked() throws Exception { result1.toCompletableFuture().get(3, TimeUnit.SECONDS); - final CompletionStage> result2 = + final CompletionStage> result2 = amqpSource - .mapAsync(1, cm -> cm.ack(false).thenApply(unused -> cm)) + .mapAsync(1, cm -> cm.ack().thenApply(unused -> cm)) .take(input.size()) .runWith(Sink.seq(), materializer); diff --git a/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala b/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala index e175e8242c..08169167b1 100644 --- a/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala +++ b/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala @@ -97,8 +97,8 @@ class AmqpConnectorsSpec extends AmqpSpec { .viaMat(KillSwitches.single)(Keep.right) .mapConcat { b => List( - OutgoingMessage(b.bytes.concat(ByteString("a")), false, false).withProperties(b.properties), - OutgoingMessage(b.bytes.concat(ByteString("aa")), false, false).withProperties(b.properties) + WriteMessage(b.bytes.concat(ByteString("a"))).withProperties(b.properties), + WriteMessage(b.bytes.concat(ByteString("aa"))).withProperties(b.properties) ) } .to(amqpSink) @@ -125,7 +125,7 @@ class AmqpConnectorsSpec extends AmqpSpec { "handle missing reply-to header correctly" in assertAllStagesStopped { - val outgoingMessage = OutgoingMessage(ByteString.empty, false, false) + val outgoingMessage = WriteMessage(ByteString.empty) Source .single(outgoingMessage) @@ -163,7 +163,7 @@ class AmqpConnectorsSpec extends AmqpSpec { val mergedSources = Source.fromGraph(GraphDSL.create() { implicit b => import GraphDSL.Implicits._ val count = 3 - val merge = b.add(Merge[IncomingMessage](count)) + val merge = b.add(Merge[ReadResult](count)) for (n <- 0 until count) { val source = b.add( AmqpSource.atMostOnceSource( @@ -198,7 +198,7 @@ class AmqpConnectorsSpec extends AmqpSpec { ) val publisher = TestPublisher.probe[ByteString]() - val subscriber = TestSubscriber.probe[IncomingMessage]() + val subscriber = TestSubscriber.probe[ReadResult]() amqpSink.addAttributes(Attributes.inputBuffer(1, 1)).runWith(Source.fromPublisher(publisher)) amqpSource.addAttributes(Attributes.inputBuffer(1, 1)).runWith(Sink.fromSubscriber(subscriber)) @@ -315,7 +315,7 @@ class AmqpConnectorsSpec extends AmqpSpec { val (rpcQueueF, probe) = Source(input) .map(s => ByteString(s)) - .map(bytes => OutgoingMessage(bytes, false, false)) + .map(bytes => WriteMessage(bytes)) .viaMat(amqpRpcFlow)(Keep.right) .mapAsync(1)(cm => cm.ack().map(_ => cm.message)) .toMat(TestSink.probe)(Keep.both) @@ -332,7 +332,7 @@ class AmqpConnectorsSpec extends AmqpSpec { ) val sourceToSink = amqpSource .viaMat(KillSwitches.single)(Keep.right) - .map(b => OutgoingMessage(b.bytes, false, false).withProperties(b.properties)) + .map(b => WriteMessage(b.bytes).withProperties(b.properties)) .to(amqpSink) .run() @@ -364,7 +364,7 @@ class AmqpConnectorsSpec extends AmqpSpec { val input = Vector("one", "two", "three", "four", "five") val routingKeys = input.map(s => getRoutingKey(s)) Source(input) - .map(s => OutgoingMessage(ByteString(s), false, false).withRoutingKey(getRoutingKey(s))) + .map(s => WriteMessage(ByteString(s)).withRoutingKey(getRoutingKey(s))) .runWith(amqpSink) .futureValue shouldEqual Done diff --git a/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala b/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala index 2af6b9500f..60e1c46e25 100644 --- a/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala +++ b/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala @@ -7,7 +7,7 @@ package docs.scaladsl import akka.{Done, NotUsed} import akka.stream.KillSwitches import akka.stream.alpakka.amqp._ -import akka.stream.alpakka.amqp.scaladsl.{AmqpRpcFlow, AmqpSink, AmqpSource, CommittableIncomingMessage} +import akka.stream.alpakka.amqp.scaladsl.{AmqpRpcFlow, AmqpSink, AmqpSource, CommittableReadResult} import akka.stream.scaladsl.{Keep, Sink, Source} import akka.stream.testkit.TestSubscriber import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped @@ -58,13 +58,13 @@ class AmqpDocsSpec extends AmqpSpec { writing.futureValue shouldEqual Done //#create-source - val amqpSource: Source[IncomingMessage, NotUsed] = + val amqpSource: Source[ReadResult, NotUsed] = AmqpSource.atMostOnceSource( NamedQueueSourceSettings(connectionProvider, queueName).withDeclaration(queueDeclaration), bufferSize = 10 ) - val result: Future[immutable.Seq[IncomingMessage]] = + val result: Future[immutable.Seq[ReadResult]] = amqpSource .take(input.size) .runWith(Sink.seq) @@ -104,7 +104,7 @@ class AmqpDocsSpec extends AmqpSpec { val sourceToSink = amqpSource .viaMat(KillSwitches.single)(Keep.right) - .map(b => OutgoingMessage(b.bytes.concat(ByteString("a")), false, false).withProperties(b.properties)) + .map(b => WriteMessage(b.bytes.concat(ByteString("a"))).withProperties(b.properties)) .to(amqpSink) .run() @@ -188,7 +188,7 @@ class AmqpDocsSpec extends AmqpSpec { val input = Vector("one", "two", "three", "four", "five") Source(input).map(s => ByteString(s)).runWith(amqpSink).futureValue shouldEqual Done - val businessLogic: CommittableIncomingMessage => Future[CommittableIncomingMessage] = Future.successful(_) + val businessLogic: CommittableReadResult => Future[CommittableReadResult] = Future.successful(_) //#create-source-withoutautoack val amqpSource = AmqpSource.committableSource( @@ -196,7 +196,7 @@ class AmqpDocsSpec extends AmqpSpec { bufferSize = 10 ) - val result: Future[immutable.Seq[CommittableIncomingMessage]] = amqpSource + val result: Future[immutable.Seq[CommittableReadResult]] = amqpSource .mapAsync(1)(businessLogic) .mapAsync(1)(cm => cm.ack().map(_ => cm)) .take(input.size) diff --git a/docs/src/main/paradox/amqp.md b/docs/src/main/paradox/amqp.md index a67d7e415e..899acd3fa7 100644 --- a/docs/src/main/paradox/amqp.md +++ b/docs/src/main/paradox/amqp.md @@ -32,12 +32,6 @@ There are several types of @scaladoc[AmqpConnectionProvider](akka.stream.alpakka ## Sending messages -@@@warning - -The Alpakka AMQP API is likely to change a bit in future releases, as discussed in @github[#1513](#1513). - -@@@ - First define a queue name and the declaration of the queue that the messages will be sent to. Scala @@ -50,7 +44,7 @@ Here we used @scaladoc[QueueDeclaration](akka.stream.alpakka.amqp.QueueDeclarati Create a sink, that accepts and forwards @scaladoc[ByteString](akka.util.ByteString)s to the AMQP server. -@scaladoc[AmqpSink](akka.stream.alpakka.amqp.AmqpSink$) is a collection of factory methods that facilitates creation of sinks. Here we created a *simple* sink, which means that we are able to pass `ByteString`s to the sink instead of wrapping data into @scaladoc[OutgoingMessage](akka.stream.alpakka.amqp.OutgoingMessage)s. +@scaladoc[AmqpSink](akka.stream.alpakka.amqp.AmqpSink$) is a collection of factory methods that facilitates creation of sinks. Here we created a *simple* sink, which means that we are able to pass `ByteString`s to the sink instead of wrapping data into @scaladoc[WriteMessage](akka.stream.alpakka.amqp.WriteMessage)s. Last step is to @extref[materialize](akka-docs:scala/stream/stream-flows-and-basics) and run the sink we have created. @@ -63,12 +57,6 @@ Java ## Receiving messages -@@@warning - -The Alpakka AMQP API is likely to change a bit in future releases, as discussed in @github[#1513](#1513). - -@@@ - Create a source using the same queue declaration as before. The `bufferSize` parameter controls the maximum number of messages to prefetch from the AMQP server. @@ -86,7 +74,7 @@ This is how you send and receive message from AMQP server using this connector. ## Using Pub/Sub -Instead of sending messages directly to queues, it is possible to send messages to an exchange and then provide instructions to AMQP server what to do with incoming messages to the exchange. We are going to use the *fanout* type of the exchange, which enables message broadcasting to multiple consumers. We are going to do that by using an exchange declaration for the sink and all of the sources. +Instead of sending messages directly to queues, it is possible to send messages to an exchange and then provide instructions to the AMQP server what to do with incoming messages. We are going to use the *fanout* type of the exchange, which enables message broadcasting to multiple consumers. We are going to do that by using an exchange declaration for the sink and all of the sources. Scala : @@snip [snip](/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala) { #exchange-declaration } @@ -127,10 +115,9 @@ Java ## Acknowledging messages downstream -Committable sources return @scaladoc[CommittableIncomingMessage](akka.stream.alpakka.amqp.CommittableIncomingMessage) which wraps the @scaladoc[IncomingMessage](akka.stream.alpakka.amqp.IncomingMessage) and exposes the methods ack and nack. - -Use ack to acknowledge the message back to RabbitMQ. Ack takes an optional boolean parameter `multiple` indicating whether you are acknowledging the individual message or all the messages up to it. +Committable sources return @scala[@scaladoc[CommittableReadResult](akka.stream.alpakka.amqp.scaladsl.CommittableReadResult)]@java[@scaladoc[CommittableReadResult](akka.stream.alpakka.amqp.javadsl.CommittableReadResult)] which wraps the @scaladoc[ReadResult](akka.stream.alpakka.amqp.ReadResult) and exposes the methods `ack` and `nack`. +Use ack to acknowledge the message back to RabbitMQ. `ack` takes an optional boolean parameter `multiple` indicating whether you are acknowledging the individual message or all the messages up to it. Scala : @@snip [snip](/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala) { #create-source-withoutautoack } @@ -138,7 +125,7 @@ Scala Java : @@snip [snip](/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java) { #create-source-withoutautoack } -Use nack to reject a message. Apart from the `multiple` argument, nack takes another optional boolean parameter indicating whether the item should be requeued or not. +Use `nack` to reject a message. Apart from the `multiple` argument, `nack` takes another optional boolean parameter indicating whether the item should be requeued or not. Scala : @@snip [snip](/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala) { #run-source-withoutautoack-and-nack } From 9aea8d0b484d42f50940c83066f1b8df9ab8eca9 Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Fri, 22 Feb 2019 11:12:03 +0100 Subject: [PATCH 3/8] Rename AmqpSinkSettings to AmqpWriteSettings --- .../alpakka/amqp/AmqpConnectorSettings.scala | 24 +++++++++---------- .../alpakka/amqp/impl/AmqpRpcFlowStage.scala | 2 +- .../alpakka/amqp/impl/AmqpSinkStage.scala | 4 ++-- .../alpakka/amqp/javadsl/AmqpRpcFlow.scala | 8 +++---- .../alpakka/amqp/javadsl/AmqpSink.scala | 4 ++-- .../alpakka/amqp/scaladsl/AmqpRpcFlow.scala | 6 ++--- .../alpakka/amqp/scaladsl/AmqpSink.scala | 4 ++-- .../amqp/javadsl/AmqpConnectorsTest.java | 10 ++++---- .../test/java/docs/javadsl/AmqpDocsTest.java | 10 ++++---- .../amqp/scaladsl/AmqpConnectorsSpec.scala | 22 ++++++++--------- ...raphStageLogicConnectionShutdownSpec.scala | 4 ++-- .../scala/docs/scaladsl/AmqpDocsSpec.scala | 10 ++++---- 12 files changed, 54 insertions(+), 54 deletions(-) diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpConnectorSettings.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpConnectorSettings.scala index bc4b06a595..b27639976d 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpConnectorSettings.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpConnectorSettings.scala @@ -184,36 +184,36 @@ object AmqpReplyToSinkSettings { AmqpReplyToSinkSettings(connectionProvider) } -final class AmqpSinkSettings private ( +final class AmqpWriteSettings private ( val connectionProvider: AmqpConnectionProvider, val exchange: Option[String] = None, val routingKey: Option[String] = None, val declarations: immutable.Seq[Declaration] = Nil ) extends AmqpConnectorSettings { - def withExchange(exchange: String): AmqpSinkSettings = + def withExchange(exchange: String): AmqpWriteSettings = copy(exchange = Some(exchange)) - def withRoutingKey(routingKey: String): AmqpSinkSettings = + def withRoutingKey(routingKey: String): AmqpWriteSettings = copy(routingKey = Some(routingKey)) - def withDeclaration(declaration: Declaration): AmqpSinkSettings = + def withDeclaration(declaration: Declaration): AmqpWriteSettings = copy(declarations = immutable.Seq(declaration)) - def withDeclarations(declarations: immutable.Seq[Declaration]): AmqpSinkSettings = + def withDeclarations(declarations: immutable.Seq[Declaration]): AmqpWriteSettings = copy(declarations = declarations) /** * Java API */ - def withDeclarations(declarations: java.util.List[Declaration]): AmqpSinkSettings = + def withDeclarations(declarations: java.util.List[Declaration]): AmqpWriteSettings = copy(declarations = declarations.asScala.toIndexedSeq) private def copy(connectionProvider: AmqpConnectionProvider = connectionProvider, exchange: Option[String] = exchange, routingKey: Option[String] = routingKey, declarations: immutable.Seq[Declaration] = declarations) = - new AmqpSinkSettings(connectionProvider, exchange, routingKey, declarations) + new AmqpWriteSettings(connectionProvider, exchange, routingKey, declarations) override def toString: String = "AmqpSinkSettings(" + @@ -224,15 +224,15 @@ final class AmqpSinkSettings private ( ")" } -object AmqpSinkSettings { - def apply(connectionProvider: AmqpConnectionProvider): AmqpSinkSettings = - new AmqpSinkSettings(connectionProvider) +object AmqpWriteSettings { + def apply(connectionProvider: AmqpConnectionProvider): AmqpWriteSettings = + new AmqpWriteSettings(connectionProvider) /** * Java API */ - def create(connectionProvider: AmqpConnectionProvider): AmqpSinkSettings = - AmqpSinkSettings(connectionProvider) + def create(connectionProvider: AmqpConnectionProvider): AmqpWriteSettings = + AmqpWriteSettings(connectionProvider) } sealed trait Declaration diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpRpcFlowStage.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpRpcFlowStage.scala index 90e895ad53..070bc51f5f 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpRpcFlowStage.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpRpcFlowStage.scala @@ -27,7 +27,7 @@ import scala.util.Success * can be overridden per message by including `expectedReplies` in the the header of the [[akka.stream.alpakka.amqp.WriteMessage]] */ @InternalApi -private[amqp] final class AmqpRpcFlowStage(settings: AmqpSinkSettings, bufferSize: Int, responsesPerMessage: Int = 1) +private[amqp] final class AmqpRpcFlowStage(settings: AmqpWriteSettings, bufferSize: Int, responsesPerMessage: Int = 1) extends GraphStageWithMaterializedValue[FlowShape[WriteMessage, CommittableReadResult], Future[String]] { stage => diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpSinkStage.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpSinkStage.scala index f1e6e14101..27ef976c2f 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpSinkStage.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/impl/AmqpSinkStage.scala @@ -6,7 +6,7 @@ package akka.stream.alpakka.amqp.impl import akka.Done import akka.annotation.InternalApi -import akka.stream.alpakka.amqp.{AmqpSinkSettings, WriteMessage} +import akka.stream.alpakka.amqp.{AmqpWriteSettings, WriteMessage} import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler} import akka.stream.{ActorAttributes, Attributes, Inlet, SinkShape} @@ -17,7 +17,7 @@ import scala.concurrent.{Future, Promise} * Each materialized sink will create one connection to the broker. */ @InternalApi -private[amqp] final class AmqpSinkStage(settings: AmqpSinkSettings) +private[amqp] final class AmqpSinkStage(settings: AmqpWriteSettings) extends GraphStageWithMaterializedValue[SinkShape[WriteMessage], Future[Done]] { stage => val in = Inlet[WriteMessage]("AmqpSink.in") diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpRpcFlow.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpRpcFlow.scala index 15060e9dec..62662c11c2 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpRpcFlow.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpRpcFlow.scala @@ -23,7 +23,7 @@ object AmqpRpcFlow { * * @param repliesPerMessage The number of responses that should be expected for each message placed on the queue. */ - def createSimple(settings: AmqpSinkSettings, + def createSimple(settings: AmqpWriteSettings, repliesPerMessage: Int): Flow[ByteString, ByteString, CompletionStage[String]] = akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow .simple(settings, repliesPerMessage) @@ -35,7 +35,7 @@ object AmqpRpcFlow { * Convenience for "at-most once delivery" semantics. Each message is acked to RabbitMQ * before its read result is emitted downstream. */ - def atMostOnceFlow(settings: AmqpSinkSettings, + def atMostOnceFlow(settings: AmqpWriteSettings, bufferSize: Int): Flow[WriteMessage, ReadResult, CompletionStage[String]] = akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow .atMostOnceFlow(settings, bufferSize) @@ -47,7 +47,7 @@ object AmqpRpcFlow { * Convenience for "at-most once delivery" semantics. Each message is acked to RabbitMQ * before its read result is emitted downstream. */ - def atMostOnceFlow(settings: AmqpSinkSettings, + def atMostOnceFlow(settings: AmqpWriteSettings, bufferSize: Int, repliesPerMessage: Int): Flow[WriteMessage, ReadResult, CompletionStage[String]] = akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow @@ -67,7 +67,7 @@ object AmqpRpcFlow { * Compared to auto-commit, this gives exact control over when a message is considered consumed. */ def committableFlow( - settings: AmqpSinkSettings, + settings: AmqpWriteSettings, bufferSize: Int, repliesPerMessage: Int = 1 ): Flow[WriteMessage, CommittableReadResult, CompletionStage[String]] = diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSink.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSink.scala index 0a7aac9012..7c052392a9 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSink.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/javadsl/AmqpSink.scala @@ -19,7 +19,7 @@ object AmqpSink { * This stage materializes to a CompletionStage, which can be used to know when the Sink completes, either normally * or because of an amqp failure */ - def create(settings: AmqpSinkSettings): akka.stream.javadsl.Sink[WriteMessage, CompletionStage[Done]] = + def create(settings: AmqpWriteSettings): akka.stream.javadsl.Sink[WriteMessage, CompletionStage[Done]] = akka.stream.alpakka.amqp.scaladsl.AmqpSink(settings).mapMaterializedValue(f => f.toJava).asJava /** @@ -43,7 +43,7 @@ object AmqpSink { * This stage materializes to a `CompletionStage`, which can be used to know when the Sink completes, either normally * or because of an amqp failure. */ - def createSimple(settings: AmqpSinkSettings): akka.stream.javadsl.Sink[ByteString, CompletionStage[Done]] = + def createSimple(settings: AmqpWriteSettings): akka.stream.javadsl.Sink[ByteString, CompletionStage[Done]] = akka.stream.alpakka.amqp.scaladsl.AmqpSink.simple(settings).mapMaterializedValue(f => f.toJava).asJava } diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala index 25096e076c..5666b2b59c 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala @@ -25,7 +25,7 @@ object AmqpRpcFlow { * @param repliesPerMessage The number of responses that should be expected for each message placed on the queue. This * can be overridden per message by including `expectedReplies` in the the header of the [[WriteMessage]] */ - def simple(settings: AmqpSinkSettings, repliesPerMessage: Int = 1): Flow[ByteString, ByteString, Future[String]] = + def simple(settings: AmqpWriteSettings, repliesPerMessage: Int = 1): Flow[ByteString, ByteString, Future[String]] = Flow[ByteString] .map(bytes => WriteMessage(bytes)) .viaMat(atMostOnceFlow(settings, 1, repliesPerMessage))(Keep.right) @@ -37,7 +37,7 @@ object AmqpRpcFlow { * before it is emitted downstream. */ @ApiMayChange // https://github.com/akka/alpakka/issues/1513 - def atMostOnceFlow(settings: AmqpSinkSettings, + def atMostOnceFlow(settings: AmqpWriteSettings, bufferSize: Int, repliesPerMessage: Int = 1): Flow[WriteMessage, ReadResult, Future[String]] = committableFlow(settings, bufferSize, repliesPerMessage) @@ -55,7 +55,7 @@ object AmqpRpcFlow { * Compared to auto-commit, this gives exact control over when a message is considered consumed. */ @ApiMayChange // https://github.com/akka/alpakka/issues/1513 - def committableFlow(settings: AmqpSinkSettings, + def committableFlow(settings: AmqpWriteSettings, bufferSize: Int, repliesPerMessage: Int = 1): Flow[WriteMessage, CommittableReadResult, Future[String]] = Flow.fromGraph(new impl.AmqpRpcFlowStage(settings, bufferSize, repliesPerMessage)) diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala index 2a7e75d1d5..56e4a3475b 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala @@ -20,7 +20,7 @@ object AmqpSink { * This stage materializes to a `Future[Done]`, which can be used to know when the Sink completes, either normally * or because of an amqp failure */ - def simple(settings: AmqpSinkSettings): Sink[ByteString, Future[Done]] = + def simple(settings: AmqpWriteSettings): Sink[ByteString, Future[Done]] = apply(settings).contramap[ByteString](bytes => WriteMessage(bytes)) /** @@ -48,7 +48,7 @@ object AmqpSink { * or because of an amqp failure */ @ApiMayChange // https://github.com/akka/alpakka/issues/1513 - def apply(settings: AmqpSinkSettings): Sink[WriteMessage, Future[Done]] = + def apply(settings: AmqpWriteSettings): Sink[WriteMessage, Future[Done]] = Sink.fromGraph(new impl.AmqpSinkStage(settings)) } diff --git a/amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/AmqpConnectorsTest.java b/amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/AmqpConnectorsTest.java index 274108fc52..d1e2b2fb52 100644 --- a/amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/AmqpConnectorsTest.java +++ b/amqp/src/test/java/akka/stream/alpakka/amqp/javadsl/AmqpConnectorsTest.java @@ -77,7 +77,7 @@ public void throwIfCanNotConnect() throws Throwable { final Sink> amqpSink = AmqpSink.createSimple( - AmqpSinkSettings.create(connectionProvider) + AmqpWriteSettings.create(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration)); @@ -105,7 +105,7 @@ public void throwWithWrongCredentials() throws Throwable { final Sink> amqpSink = AmqpSink.createSimple( - AmqpSinkSettings.create(connectionProvider) + AmqpWriteSettings.create(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration)); @@ -132,7 +132,7 @@ public void publishAndConsumeRpcWithoutAutoAck() throws Exception { final Flow> ampqRpcFlow = AmqpRpcFlow.committableFlow( - AmqpSinkSettings.create(connectionProvider) + AmqpWriteSettings.create(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration), 10, @@ -180,7 +180,7 @@ public void keepConnectionOpenIfDownstreamClosesAndThereArePendingAcks() throws final Sink> amqpSink = AmqpSink.createSimple( - AmqpSinkSettings.create(connectionProvider) + AmqpWriteSettings.create(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration)); @@ -227,7 +227,7 @@ public void setRoutingKeyPerMessageAndConsumeThemInTheSameJVM() throws Exception final Sink> amqpSink = AmqpSink.create( - AmqpSinkSettings.create(connectionProvider) + AmqpWriteSettings.create(connectionProvider) .withExchange(exchangeName) .withDeclarations( Arrays.asList(exchangeDeclaration, queueDeclaration, bindingDeclaration))); diff --git a/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java b/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java index cbbb2ef737..312ac7ca1b 100644 --- a/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java +++ b/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java @@ -80,7 +80,7 @@ public void publishAndConsume() throws Exception { // #create-sink final Sink> amqpSink = AmqpSink.createSimple( - AmqpSinkSettings.create(connectionProvider) + AmqpWriteSettings.create(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration)); @@ -130,7 +130,7 @@ public void publishAndConsumeRpc() throws Exception { // #create-rpc-flow final Flow> ampqRpcFlow = AmqpRpcFlow.createSimple( - AmqpSinkSettings.create(connectionProvider) + AmqpWriteSettings.create(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration), 1); @@ -182,7 +182,7 @@ public void publishFanoutAndConsume() throws Exception { // #create-exchange-sink final Sink> amqpSink = AmqpSink.createSimple( - AmqpSinkSettings.create(connectionProvider) + AmqpWriteSettings.create(connectionProvider) .withExchange(exchangeName) .withDeclaration(exchangeDeclaration)); // #create-exchange-sink @@ -252,7 +252,7 @@ public void publishAndConsumeWithoutAutoAck() throws Exception { final Sink> amqpSink = AmqpSink.createSimple( - AmqpSinkSettings.create(connectionProvider) + AmqpWriteSettings.create(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration)); @@ -292,7 +292,7 @@ public void republishMessageWithoutAutoAckIfNacked() throws Exception { final Sink> amqpSink = AmqpSink.createSimple( - AmqpSinkSettings.create(connectionProvider) + AmqpWriteSettings.create(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration)); diff --git a/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala b/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala index 08169167b1..7e88b794da 100644 --- a/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala +++ b/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpConnectorsSpec.scala @@ -38,7 +38,7 @@ class AmqpConnectorsSpec extends AmqpSpec { val queueDeclaration = QueueDeclaration(queueName) val amqpSink = AmqpSink.simple( - AmqpSinkSettings(connectionProvider) + AmqpWriteSettings(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration) ) @@ -58,7 +58,7 @@ class AmqpConnectorsSpec extends AmqpSpec { val queueDeclaration = QueueDeclaration(queueName) val amqpSink = AmqpSink.simple( - AmqpSinkSettings(connectionProvider) + AmqpWriteSettings(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration) ) @@ -73,7 +73,7 @@ class AmqpConnectorsSpec extends AmqpSpec { val queueDeclaration = QueueDeclaration(queueName) val amqpRpcFlow = AmqpRpcFlow.simple( - AmqpSinkSettings(connectionProvider) + AmqpWriteSettings(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration), 2 @@ -116,7 +116,7 @@ class AmqpConnectorsSpec extends AmqpSpec { Source .empty[ByteString] - .via(AmqpRpcFlow.simple(AmqpSinkSettings(connectionProvider))) + .via(AmqpRpcFlow.simple(AmqpWriteSettings(connectionProvider))) .runWith(TestSink.probe) .ensureSubscription() .expectComplete() @@ -152,7 +152,7 @@ class AmqpConnectorsSpec extends AmqpSpec { val queueName = "amqp-conn-it-spec-work-queues-" + System.currentTimeMillis() val queueDeclaration = QueueDeclaration(queueName) val amqpSink = AmqpSink.simple( - AmqpSinkSettings(connectionProvider) + AmqpWriteSettings(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration) ) @@ -192,7 +192,7 @@ class AmqpConnectorsSpec extends AmqpSpec { ) val amqpSink = AmqpSink.simple( - AmqpSinkSettings(connectionProvider) + AmqpWriteSettings(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration) ) @@ -242,7 +242,7 @@ class AmqpConnectorsSpec extends AmqpSpec { val queueDeclaration = QueueDeclaration(queueName) val amqpSink = AmqpSink.simple( - AmqpSinkSettings(connectionSettings) + AmqpWriteSettings(connectionSettings) .withRoutingKey(queueName) .withDeclaration(queueDeclaration) ) @@ -269,7 +269,7 @@ class AmqpConnectorsSpec extends AmqpSpec { val queueDeclaration = QueueDeclaration(queueName) val amqpSink = AmqpSink.simple( - AmqpSinkSettings(connectionProvider) + AmqpWriteSettings(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration) ) @@ -307,7 +307,7 @@ class AmqpConnectorsSpec extends AmqpSpec { val input = Vector("one", "two", "three", "four", "five") val amqpRpcFlow = AmqpRpcFlow.committableFlow( - AmqpSinkSettings(connectionProvider) + AmqpWriteSettings(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration), bufferSize = 10 @@ -350,7 +350,7 @@ class AmqpConnectorsSpec extends AmqpSpec { val bindingDeclaration = BindingDeclaration(queueName, exchangeName).withRoutingKey(getRoutingKey("*")) val amqpSink = AmqpSink( - AmqpSinkSettings(connectionProvider) + AmqpWriteSettings(connectionProvider) .withExchange(exchangeName) .withDeclarations(immutable.Seq(exchangeDeclaration, queueDeclaration, bindingDeclaration)) ) @@ -385,7 +385,7 @@ class AmqpConnectorsSpec extends AmqpSpec { val queueDeclaration = QueueDeclaration(queueName) val amqpSink = AmqpSink.simple( - AmqpSinkSettings(connectionProvider) + AmqpWriteSettings(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration) ) diff --git a/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala b/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala index 5784828dc6..6aebccdedf 100644 --- a/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala +++ b/amqp/src/test/scala/akka/stream/alpakka/amqp/scaladsl/AmqpGraphStageLogicConnectionShutdownSpec.scala @@ -14,7 +14,7 @@ import akka.stream.alpakka.amqp.{ AmqpCachedConnectionProvider, AmqpConnectionFactoryConnectionProvider, AmqpProxyConnection, - AmqpSinkSettings, + AmqpWriteSettings, QueueDeclaration } import akka.stream.scaladsl.Source @@ -78,7 +78,7 @@ class AmqpGraphStageLogicConnectionShutdownSpec val queueDeclaration = QueueDeclaration(queueName) val amqpSink = AmqpSink.simple( - AmqpSinkSettings(reusableConnectionProvider) + AmqpWriteSettings(reusableConnectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration) ) diff --git a/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala b/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala index 60e1c46e25..dc4bebed8c 100644 --- a/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala +++ b/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala @@ -44,7 +44,7 @@ class AmqpDocsSpec extends AmqpSpec { //#create-sink val amqpSink: Sink[ByteString, Future[Done]] = AmqpSink.simple( - AmqpSinkSettings(connectionProvider) + AmqpWriteSettings(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration) ) @@ -87,7 +87,7 @@ class AmqpDocsSpec extends AmqpSpec { //#create-rpc-flow val amqpRpcFlow = AmqpRpcFlow.simple( - AmqpSinkSettings(connectionProvider).withRoutingKey(queueName).withDeclaration(queueDeclaration) + AmqpWriteSettings(connectionProvider).withRoutingKey(queueName).withDeclaration(queueDeclaration) ) val (rpcQueueF: Future[String], probe: TestSubscriber.Probe[ByteString]) = Source(input) @@ -124,7 +124,7 @@ class AmqpDocsSpec extends AmqpSpec { //#create-exchange-sink val amqpSink = AmqpSink.simple( - AmqpSinkSettings(connectionProvider) + AmqpWriteSettings(connectionProvider) .withExchange(exchangeName) .withDeclaration(exchangeDeclaration) ) @@ -180,7 +180,7 @@ class AmqpDocsSpec extends AmqpSpec { val queueDeclaration = QueueDeclaration(queueName) val amqpSink = AmqpSink.simple( - AmqpSinkSettings(connectionProvider) + AmqpWriteSettings(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration) ) @@ -212,7 +212,7 @@ class AmqpDocsSpec extends AmqpSpec { val queueDeclaration = QueueDeclaration(queueName) val amqpSink = AmqpSink.simple( - AmqpSinkSettings(connectionProvider) + AmqpWriteSettings(connectionProvider) .withRoutingKey(queueName) .withDeclaration(queueDeclaration) ) From 5d8ac3621077a8954cc322aaf7656d1a4cc3e2a0 Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Fri, 22 Feb 2019 11:38:49 +0100 Subject: [PATCH 4/8] Enable autoack in at-most-once examples; review ack/nack example --- .../test/java/docs/javadsl/AmqpDocsTest.java | 19 +++++++---- .../scala/docs/scaladsl/AmqpDocsSpec.scala | 29 ++++++++++------- docs/src/main/paradox/amqp.md | 32 ++----------------- 3 files changed, 32 insertions(+), 48 deletions(-) diff --git a/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java b/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java index 312ac7ca1b..b89f891b4c 100644 --- a/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java +++ b/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java @@ -95,7 +95,8 @@ public void publishAndConsume() throws Exception { final Source amqpSource = AmqpSource.atMostOnceSource( NamedQueueSourceSettings.create(connectionProvider, queueName) - .withDeclaration(queueDeclaration), + .withDeclaration(queueDeclaration) + .withAckRequired(false), bufferSize); final CompletionStage> result = @@ -270,7 +271,7 @@ public void publishAndConsumeWithoutAutoAck() throws Exception { final CompletionStage> result = amqpSource .mapAsync(1, this::businessLogic) - .mapAsync(1, cm -> cm.ack().thenApply(unused -> cm.message())) + .mapAsync(1, cm -> cm.ack(/* multiple */ false).thenApply(unused -> cm.message())) .take(input.size()) .runWith(Sink.seq(), materializer); // #create-source-withoutautoack @@ -310,17 +311,21 @@ public void republishMessageWithoutAutoAckIfNacked() throws Exception { .withDeclaration(queueDeclaration), bufferSize); - // #run-source-withoutautoack-and-nack - final CompletionStage> result1 = + // #create-source-withoutautoack + + final CompletionStage> nackedResults = amqpSource .mapAsync(1, this::businessLogic) .take(input.size()) .mapAsync( - 1, cm -> cm.nack(/* multiple */ false, /* requeue */ true).thenApply(unused -> cm)) + 1, + cm -> + cm.nack(/* multiple */ false, /* requeue */ true) + .thenApply(unused -> cm.message())) .runWith(Sink.seq(), materializer); - // #run-source-withoutautoack-and-nack + // #create-source-withoutautoack - result1.toCompletableFuture().get(3, TimeUnit.SECONDS); + nackedResults.toCompletableFuture().get(3, TimeUnit.SECONDS); final CompletionStage> result2 = amqpSource diff --git a/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala b/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala index dc4bebed8c..752ec05da8 100644 --- a/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala +++ b/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala @@ -25,6 +25,8 @@ class AmqpDocsSpec extends AmqpSpec { override implicit val patienceConfig = PatienceConfig(10.seconds) + val businessLogic: CommittableReadResult => Future[CommittableReadResult] = Future.successful(_) + "The AMQP Connectors" should { val connectionProvider = AmqpLocalConnectionProvider @@ -60,7 +62,9 @@ class AmqpDocsSpec extends AmqpSpec { //#create-source val amqpSource: Source[ReadResult, NotUsed] = AmqpSource.atMostOnceSource( - NamedQueueSourceSettings(connectionProvider, queueName).withDeclaration(queueDeclaration), + NamedQueueSourceSettings(connectionProvider, queueName) + .withDeclaration(queueDeclaration) + .withAckRequired(false), bufferSize = 10 ) @@ -188,22 +192,21 @@ class AmqpDocsSpec extends AmqpSpec { val input = Vector("one", "two", "three", "four", "five") Source(input).map(s => ByteString(s)).runWith(amqpSink).futureValue shouldEqual Done - val businessLogic: CommittableReadResult => Future[CommittableReadResult] = Future.successful(_) - //#create-source-withoutautoack val amqpSource = AmqpSource.committableSource( - NamedQueueSourceSettings(connectionProvider, queueName).withDeclaration(queueDeclaration), + NamedQueueSourceSettings(connectionProvider, queueName) + .withDeclaration(queueDeclaration), bufferSize = 10 ) - val result: Future[immutable.Seq[CommittableReadResult]] = amqpSource + val result: Future[immutable.Seq[ReadResult]] = amqpSource .mapAsync(1)(businessLogic) - .mapAsync(1)(cm => cm.ack().map(_ => cm)) + .mapAsync(1)(cm => cm.ack().map(_ => cm.message)) .take(input.size) .runWith(Sink.seq) //#create-source-withoutautoack - result.futureValue.map(_.message.bytes.utf8String) shouldEqual input + result.futureValue.map(_.bytes.utf8String) shouldEqual input } "republish message without autoAck if nack is sent" in assertAllStagesStopped { @@ -225,14 +228,16 @@ class AmqpDocsSpec extends AmqpSpec { bufferSize = 10 ) - //#run-source-withoutautoack-and-nack - val result1 = amqpSource + //#create-source-withoutautoack + + val nackedResults: Future[immutable.Seq[ReadResult]] = amqpSource + .mapAsync(1)(businessLogic) .take(input.size) - .mapAsync(1)(cm => cm.nack(multiple = false, requeue = true).map(_ => cm)) + .mapAsync(1)(cm => cm.nack(multiple = false, requeue = true).map(_ => cm.message)) .runWith(Sink.seq) - //#run-source-withoutautoack-and-nack + //#create-source-withoutautoack - Await.ready(result1, 3.seconds) + Await.ready(nackedResults, 3.seconds) val result2 = amqpSource .mapAsync(1)(cm => cm.ack().map(_ => cm)) diff --git a/docs/src/main/paradox/amqp.md b/docs/src/main/paradox/amqp.md index 899acd3fa7..5dcc0545a2 100644 --- a/docs/src/main/paradox/amqp.md +++ b/docs/src/main/paradox/amqp.md @@ -117,38 +117,12 @@ Java Committable sources return @scala[@scaladoc[CommittableReadResult](akka.stream.alpakka.amqp.scaladsl.CommittableReadResult)]@java[@scaladoc[CommittableReadResult](akka.stream.alpakka.amqp.javadsl.CommittableReadResult)] which wraps the @scaladoc[ReadResult](akka.stream.alpakka.amqp.ReadResult) and exposes the methods `ack` and `nack`. -Use ack to acknowledge the message back to RabbitMQ. `ack` takes an optional boolean parameter `multiple` indicating whether you are acknowledging the individual message or all the messages up to it. - -Scala -: @@snip [snip](/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala) { #create-source-withoutautoack } - -Java -: @@snip [snip](/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java) { #create-source-withoutautoack } +Use `ack` to acknowledge the message back to RabbitMQ. `ack` takes an optional boolean parameter `multiple` indicating whether you are acknowledging the individual message or all the messages up to it. Use `nack` to reject a message. Apart from the `multiple` argument, `nack` takes another optional boolean parameter indicating whether the item should be requeued or not. Scala -: @@snip [snip](/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala) { #run-source-withoutautoack-and-nack } - -Java -: @@snip [snip](/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java) { #run-source-withoutautoack-and-nack } - -## Running the example code - -The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt. - -> Test code requires AMQP server running in the background. You can start one quickly using docker: -> -> `docker-compose up amqp` - -Scala -: ``` - sbt - > amqp/testOnly *.AmqpConnectorsSpec - ``` +: @@snip [snip](/amqp/src/test/scala/docs/scaladsl/AmqpDocsSpec.scala) { #create-source-withoutautoack } Java -: ``` - sbt - > amqp/testOnly *.AmqpConnectorsTest - ``` +: @@snip [snip](/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java) { #create-source-withoutautoack } From d6243fca1fea336822b8dc2b5d4f13a742fbd9e4 Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Fri, 22 Feb 2019 11:49:36 +0100 Subject: [PATCH 5/8] Remove API may change annotations --- .../akka/stream/alpakka/amqp/AmqpConnectorSettings.scala | 5 ++--- .../akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala | 2 -- .../scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala | 3 --- .../scala/akka/stream/alpakka/amqp/scaladsl/AmqpSource.scala | 3 --- .../stream/alpakka/amqp/scaladsl/CommittableReadResult.scala | 2 -- 5 files changed, 2 insertions(+), 13 deletions(-) diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpConnectorSettings.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpConnectorSettings.scala index b27639976d..c57b3bf619 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpConnectorSettings.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpConnectorSettings.scala @@ -4,10 +4,10 @@ package akka.stream.alpakka.amqp -import akka.annotation.{ApiMayChange, InternalApi} +import akka.annotation.InternalApi -import scala.collection.immutable import scala.collection.JavaConverters._ +import scala.collection.immutable /** * Internal API @@ -53,7 +53,6 @@ final class NamedQueueSourceSettings private ( * Ack/Nack is required as default. Setting this to false will configure AMQP's `autoAck` so that the * server considers messages acknowledged once delivered. */ - @ApiMayChange // def withAckRequired(ackRequired: Boolean): NamedQueueSourceSettings = copy(ackRequired = ackRequired) diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala index 5666b2b59c..7a9d1bac8d 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala @@ -36,7 +36,6 @@ object AmqpRpcFlow { * Convenience for "at-most once delivery" semantics. Each message is acked to RabbitMQ * before it is emitted downstream. */ - @ApiMayChange // https://github.com/akka/alpakka/issues/1513 def atMostOnceFlow(settings: AmqpWriteSettings, bufferSize: Int, repliesPerMessage: Int = 1): Flow[WriteMessage, ReadResult, Future[String]] = @@ -54,7 +53,6 @@ object AmqpRpcFlow { * * Compared to auto-commit, this gives exact control over when a message is considered consumed. */ - @ApiMayChange // https://github.com/akka/alpakka/issues/1513 def committableFlow(settings: AmqpWriteSettings, bufferSize: Int, repliesPerMessage: Int = 1): Flow[WriteMessage, CommittableReadResult, Future[String]] = diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala index 56e4a3475b..97df09cf41 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSink.scala @@ -5,7 +5,6 @@ package akka.stream.alpakka.amqp.scaladsl import akka.Done -import akka.annotation.ApiMayChange import akka.stream.alpakka.amqp._ import akka.stream.scaladsl.Sink import akka.util.ByteString @@ -33,7 +32,6 @@ object AmqpSink { * This stage materializes to a `Future[Done]`, which can be used to know when the Sink completes, either normally * or because of an amqp failure */ - @ApiMayChange // https://github.com/akka/alpakka/issues/1513 def replyTo(settings: AmqpReplyToSinkSettings): Sink[WriteMessage, Future[Done]] = Sink.fromGraph(new impl.AmqpReplyToSinkStage(settings)) @@ -47,7 +45,6 @@ object AmqpSink { * This stage materializes to a Future[Done], which can be used to know when the Sink completes, either normally * or because of an amqp failure */ - @ApiMayChange // https://github.com/akka/alpakka/issues/1513 def apply(settings: AmqpWriteSettings): Sink[WriteMessage, Future[Done]] = Sink.fromGraph(new impl.AmqpSinkStage(settings)) diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSource.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSource.scala index 07012c1e37..078d9c42bb 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSource.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpSource.scala @@ -5,7 +5,6 @@ package akka.stream.alpakka.amqp.scaladsl import akka.NotUsed -import akka.annotation.ApiMayChange import akka.dispatch.ExecutionContexts import akka.stream.alpakka.amqp.impl import akka.stream.alpakka.amqp.{AmqpSourceSettings, ReadResult} @@ -18,7 +17,6 @@ object AmqpSource { * Scala API: Convenience for "at-most once delivery" semantics. Each message is acked to RabbitMQ * before it is emitted downstream. */ - @ApiMayChange // https://github.com/akka/alpakka/issues/1513 def atMostOnceSource(settings: AmqpSourceSettings, bufferSize: Int): Source[ReadResult, NotUsed] = committableSource(settings, bufferSize) .mapAsync(1)(cm => cm.ack().map(_ => cm.message)) @@ -34,7 +32,6 @@ object AmqpSource { * * Compared to auto-commit, this gives exact control over when a message is considered consumed. */ - @ApiMayChange // https://github.com/akka/alpakka/issues/1513 def committableSource(settings: AmqpSourceSettings, bufferSize: Int): Source[CommittableReadResult, NotUsed] = Source.fromGraph(new impl.AmqpSourceStage(settings, bufferSize)) diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/CommittableReadResult.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/CommittableReadResult.scala index 6191e35127..6634ca4b53 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/CommittableReadResult.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/CommittableReadResult.scala @@ -5,12 +5,10 @@ package akka.stream.alpakka.amqp.scaladsl import akka.Done -import akka.annotation.ApiMayChange import akka.stream.alpakka.amqp.ReadResult import scala.concurrent.Future -@ApiMayChange // https://github.com/akka/alpakka/issues/1513 trait CommittableReadResult { val message: ReadResult def ack(multiple: Boolean = false): Future[Done] From aab2af9c38e870d9ad2776345039b2e0c1506a67 Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Fri, 22 Feb 2019 13:34:22 +0100 Subject: [PATCH 6/8] Remove unused import, make execution context use explicit --- .../akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala index 7a9d1bac8d..63740c46bf 100644 --- a/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala +++ b/amqp/src/main/scala/akka/stream/alpakka/amqp/scaladsl/AmqpRpcFlow.scala @@ -4,7 +4,6 @@ package akka.stream.alpakka.amqp.scaladsl -import akka.annotation.ApiMayChange import akka.dispatch.ExecutionContexts import akka.stream.alpakka.amqp._ import akka.stream.scaladsl.{Flow, Keep} @@ -13,7 +12,6 @@ import akka.util.ByteString import scala.concurrent.Future object AmqpRpcFlow { - private implicit val executionContext = ExecutionContexts.sameThreadExecutionContext /** * Scala API: @@ -40,7 +38,9 @@ object AmqpRpcFlow { bufferSize: Int, repliesPerMessage: Int = 1): Flow[WriteMessage, ReadResult, Future[String]] = committableFlow(settings, bufferSize, repliesPerMessage) - .mapAsync(1)(cm => cm.ack().map(_ => cm.message)) + .mapAsync(1) { cm => + cm.ack().map(_ => cm.message)(ExecutionContexts.sameThreadExecutionContext) + } /** * Scala API: From a3412dd6e8fca887c0c953be8a394c41dd107402 Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Fri, 22 Feb 2019 15:33:00 +0100 Subject: [PATCH 7/8] Wait a bit so that stages can stop correctly --- amqp/src/test/java/docs/javadsl/AmqpDocsTest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java b/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java index b89f891b4c..92a18431b8 100644 --- a/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java +++ b/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java @@ -341,5 +341,9 @@ public void republishMessageWithoutAutoAckIfNacked() throws Exception { .stream() .map(m -> m.message().bytes().utf8String()) .collect(Collectors.toList())); + + // See https://github.com/akka/akka/issues/26410 + // extra wait before assertAllStagesStopped kicks in + Thread.sleep(3 * 1000); } } From a9faedad51b4d89f9d0095274c0cd659a9362c3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martynas=20Mickevi=C4=8Dius?= Date: Mon, 25 Feb 2019 15:53:29 +0200 Subject: [PATCH 8/8] Move take combinator to the very upstream So it cancells the source immediately when the needed number of messages is reached. --- amqp/src/test/java/docs/javadsl/AmqpDocsTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java b/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java index 92a18431b8..0a5403dbbf 100644 --- a/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java +++ b/amqp/src/test/java/docs/javadsl/AmqpDocsTest.java @@ -315,8 +315,8 @@ public void republishMessageWithoutAutoAckIfNacked() throws Exception { final CompletionStage> nackedResults = amqpSource - .mapAsync(1, this::businessLogic) .take(input.size()) + .mapAsync(1, this::businessLogic) .mapAsync( 1, cm -> @@ -329,21 +329,21 @@ public void republishMessageWithoutAutoAckIfNacked() throws Exception { final CompletionStage> result2 = amqpSource - .mapAsync(1, cm -> cm.ack().thenApply(unused -> cm)) .take(input.size()) + .mapAsync(1, cm -> cm.ack().thenApply(unused -> cm)) .runWith(Sink.seq(), materializer); assertEquals( input, result2 .toCompletableFuture() - .get(3, TimeUnit.SECONDS) + .get(10, TimeUnit.SECONDS) .stream() .map(m -> m.message().bytes().utf8String()) .collect(Collectors.toList())); // See https://github.com/akka/akka/issues/26410 // extra wait before assertAllStagesStopped kicks in - Thread.sleep(3 * 1000); + Thread.sleep(6 * 1000); } }