Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Amqp: renaming of stream element classes #1524

Merged
merged 8 commits into from
Feb 26, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

package akka.stream.alpakka.amqp

import akka.annotation.{ApiMayChange, InternalApi}
import akka.annotation.InternalApi

import scala.collection.immutable
import scala.collection.JavaConverters._
import scala.collection.immutable

/**
* Internal API
Expand Down Expand Up @@ -53,7 +53,6 @@ final class NamedQueueSourceSettings private (
* Ack/Nack is required as default. Setting this to false will configure AMQP's `autoAck` so that the
* server considers messages acknowledged once delivered.
*/
@ApiMayChange //
def withAckRequired(ackRequired: Boolean): NamedQueueSourceSettings =
copy(ackRequired = ackRequired)

Expand Down Expand Up @@ -184,36 +183,36 @@ object AmqpReplyToSinkSettings {
AmqpReplyToSinkSettings(connectionProvider)
}

final class AmqpSinkSettings private (
final class AmqpWriteSettings private (
val connectionProvider: AmqpConnectionProvider,
val exchange: Option[String] = None,
val routingKey: Option[String] = None,
val declarations: immutable.Seq[Declaration] = Nil
) extends AmqpConnectorSettings {

def withExchange(exchange: String): AmqpSinkSettings =
def withExchange(exchange: String): AmqpWriteSettings =
copy(exchange = Some(exchange))

def withRoutingKey(routingKey: String): AmqpSinkSettings =
def withRoutingKey(routingKey: String): AmqpWriteSettings =
copy(routingKey = Some(routingKey))

def withDeclaration(declaration: Declaration): AmqpSinkSettings =
def withDeclaration(declaration: Declaration): AmqpWriteSettings =
copy(declarations = immutable.Seq(declaration))

def withDeclarations(declarations: immutable.Seq[Declaration]): AmqpSinkSettings =
def withDeclarations(declarations: immutable.Seq[Declaration]): AmqpWriteSettings =
copy(declarations = declarations)

/**
* Java API
*/
def withDeclarations(declarations: java.util.List[Declaration]): AmqpSinkSettings =
def withDeclarations(declarations: java.util.List[Declaration]): AmqpWriteSettings =
copy(declarations = declarations.asScala.toIndexedSeq)

private def copy(connectionProvider: AmqpConnectionProvider = connectionProvider,
exchange: Option[String] = exchange,
routingKey: Option[String] = routingKey,
declarations: immutable.Seq[Declaration] = declarations) =
new AmqpSinkSettings(connectionProvider, exchange, routingKey, declarations)
new AmqpWriteSettings(connectionProvider, exchange, routingKey, declarations)

override def toString: String =
"AmqpSinkSettings(" +
Expand All @@ -224,15 +223,15 @@ final class AmqpSinkSettings private (
")"
}

object AmqpSinkSettings {
def apply(connectionProvider: AmqpConnectionProvider): AmqpSinkSettings =
new AmqpSinkSettings(connectionProvider)
object AmqpWriteSettings {
def apply(connectionProvider: AmqpConnectionProvider): AmqpWriteSettings =
new AmqpWriteSettings(connectionProvider)

/**
* Java API
*/
def create(connectionProvider: AmqpConnectionProvider): AmqpSinkSettings =
AmqpSinkSettings(connectionProvider)
def create(connectionProvider: AmqpConnectionProvider): AmqpWriteSettings =
AmqpWriteSettings(connectionProvider)
}

sealed trait Declaration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,24 @@ package akka.stream.alpakka.amqp.impl

import akka.Done
import akka.annotation.InternalApi
import akka.stream.alpakka.amqp.{AmqpReplyToSinkSettings, OutgoingMessage}
import akka.stream.alpakka.amqp.{AmqpReplyToSinkSettings, WriteMessage}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler}
import akka.stream.{ActorAttributes, Attributes, Inlet, SinkShape}

import scala.concurrent.{Future, Promise}

/**
* Connects to an AMQP server upon materialization and sends incoming messages to the server.
* Connects to an AMQP server upon materialization and sends write messages to the server.
* Each materialized sink will create one connection to the broker. This stage sends messages to
* the queue named in the replyTo options of the message instead of from settings declared at construction.
*/
@InternalApi
private[amqp] final class AmqpReplyToSinkStage(settings: AmqpReplyToSinkSettings)
extends GraphStageWithMaterializedValue[SinkShape[OutgoingMessage], Future[Done]] { stage =>
extends GraphStageWithMaterializedValue[SinkShape[WriteMessage], Future[Done]] { stage =>

val in = Inlet[OutgoingMessage]("AmqpReplyToSink.in")
val in = Inlet[WriteMessage]("AmqpReplyToSink.in")

override def shape: SinkShape[OutgoingMessage] = SinkShape.of(in)
override def shape: SinkShape[WriteMessage] = SinkShape.of(in)

override protected def initialAttributes: Attributes =
super.initialAttributes and Attributes.name("AmqpReplyToSink") and ActorAttributes.IODispatcher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import akka.Done
import akka.annotation.InternalApi
import akka.stream._
import akka.stream.alpakka.amqp._
import akka.stream.alpakka.amqp.scaladsl.CommittableIncomingMessage
import akka.stream.alpakka.amqp.scaladsl.CommittableReadResult
import akka.stream.stage._
import akka.util.ByteString
import com.rabbitmq.client.AMQP.BasicProperties
Expand All @@ -21,20 +21,20 @@ import scala.concurrent.{Future, Promise}
import scala.util.Success

/**
* This stage materializes to a Future[String], which is the name of the private exclusive queue used for RPC communication
* This stage materializes to a `Future[String]`, which is the name of the private exclusive queue used for RPC communication
*
* @param responsesPerMessage The number of responses that should be expected for each message placed on the queue. This
* can be overridden per message by including `expectedReplies` in the the header of the [[OutgoingMessage]]
* can be overridden per message by including `expectedReplies` in the the header of the [[akka.stream.alpakka.amqp.WriteMessage]]
*/
@InternalApi
private[amqp] final class AmqpRpcFlowStage(settings: AmqpSinkSettings, bufferSize: Int, responsesPerMessage: Int = 1)
extends GraphStageWithMaterializedValue[FlowShape[OutgoingMessage, CommittableIncomingMessage], Future[String]] {
private[amqp] final class AmqpRpcFlowStage(settings: AmqpWriteSettings, bufferSize: Int, responsesPerMessage: Int = 1)
extends GraphStageWithMaterializedValue[FlowShape[WriteMessage, CommittableReadResult], Future[String]] {
stage =>

val in = Inlet[OutgoingMessage]("AmqpRpcFlow.in")
val out = Outlet[CommittableIncomingMessage]("AmqpRpcFlow.out")
val in = Inlet[WriteMessage]("AmqpRpcFlow.in")
val out = Outlet[CommittableReadResult]("AmqpRpcFlow.out")

override def shape: FlowShape[OutgoingMessage, CommittableIncomingMessage] = FlowShape.of(in, out)
override def shape: FlowShape[WriteMessage, CommittableReadResult] = FlowShape.of(in, out)

override protected def initialAttributes: Attributes =
super.initialAttributes and Attributes.name("AmqpRpcFlow") and ActorAttributes.IODispatcher
Expand All @@ -46,7 +46,7 @@ private[amqp] final class AmqpRpcFlowStage(settings: AmqpSinkSettings, bufferSiz
override val settings = stage.settings
private val exchange = settings.exchange.getOrElse("")
private val routingKey = settings.routingKey.getOrElse("")
private val queue = mutable.Queue[CommittableIncomingMessage]()
private val queue = mutable.Queue[CommittableReadResult]()
private var queueName: String = _
private var unackedMessages = 0
private var outstandingMessages = 0
Expand Down Expand Up @@ -91,8 +91,8 @@ private[amqp] final class AmqpRpcFlowStage(settings: AmqpSinkSettings, bufferSiz
properties: BasicProperties,
body: Array[Byte]): Unit =
consumerCallback.invoke(
new CommittableIncomingMessage {
override val message = IncomingMessage(ByteString(body), envelope, properties)
new CommittableReadResult {
override val message = ReadResult(ByteString(body), envelope, properties)

override def ack(multiple: Boolean): Future[Done] = {
val promise = Promise[Done]()
Expand Down Expand Up @@ -139,7 +139,7 @@ private[amqp] final class AmqpRpcFlowStage(settings: AmqpSinkSettings, bufferSiz
promise.success(queueName)
}

def handleDelivery(message: CommittableIncomingMessage): Unit =
def handleDelivery(message: CommittableReadResult): Unit =
if (isAvailable(out)) {
pushMessage(message)
} else if (queue.size + 1 > bufferSize) {
Expand All @@ -163,7 +163,7 @@ private[amqp] final class AmqpRpcFlowStage(settings: AmqpSinkSettings, bufferSiz
}
)

def pushMessage(message: CommittableIncomingMessage): Unit = {
def pushMessage(message: CommittableReadResult): Unit = {
push(out, message)
unackedMessages += 1
outstandingMessages -= 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,23 @@ package akka.stream.alpakka.amqp.impl

import akka.Done
import akka.annotation.InternalApi
import akka.stream.alpakka.amqp.{AmqpSinkSettings, OutgoingMessage}
import akka.stream.alpakka.amqp.{AmqpWriteSettings, WriteMessage}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler}
import akka.stream.{ActorAttributes, Attributes, Inlet, SinkShape}

import scala.concurrent.{Future, Promise}

/**
* Connects to an AMQP server upon materialization and sends incoming messages to the server.
* Connects to an AMQP server upon materialization and sends write messages to the server.
* Each materialized sink will create one connection to the broker.
*/
@InternalApi
private[amqp] final class AmqpSinkStage(settings: AmqpSinkSettings)
extends GraphStageWithMaterializedValue[SinkShape[OutgoingMessage], Future[Done]] { stage =>
private[amqp] final class AmqpSinkStage(settings: AmqpWriteSettings)
extends GraphStageWithMaterializedValue[SinkShape[WriteMessage], Future[Done]] { stage =>

val in = Inlet[OutgoingMessage]("AmqpSink.in")
val in = Inlet[WriteMessage]("AmqpSink.in")

override def shape: SinkShape[OutgoingMessage] = SinkShape.of(in)
override def shape: SinkShape[WriteMessage] = SinkShape.of(in)

override protected def initialAttributes: Attributes =
super.initialAttributes and Attributes.name("AmqpSink") and ActorAttributes.IODispatcher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ package akka.stream.alpakka.amqp.impl
import akka.Done
import akka.annotation.InternalApi
import akka.stream.alpakka.amqp._
import akka.stream.alpakka.amqp.impl.AmqpSourceStage.AutoAckedMessage
import akka.stream.alpakka.amqp.scaladsl.CommittableIncomingMessage
import akka.stream.alpakka.amqp.impl.AmqpSourceStage.AutoAckedReadResult
import akka.stream.alpakka.amqp.scaladsl.CommittableReadResult
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler, StageLogging}
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.util.ByteString
Expand All @@ -27,17 +27,16 @@ private final case class NackArguments(deliveryTag: Long, multiple: Boolean, req
*
* Connects to an AMQP server upon materialization and consumes messages from it emitting them
* into the stream. Each materialized source will create one connection to the broker.
* As soon as an `IncomingMessage` is sent downstream, an ack for it is sent to the broker.
*
* @param bufferSize The max number of elements to prefetch and buffer at any given time.
*/
@InternalApi
private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSize: Int)
extends GraphStage[SourceShape[CommittableIncomingMessage]] { stage =>
extends GraphStage[SourceShape[CommittableReadResult]] { stage =>

private val out = Outlet[CommittableIncomingMessage]("AmqpSource.out")
private val out = Outlet[CommittableReadResult]("AmqpSource.out")

override val shape: SourceShape[CommittableIncomingMessage] = SourceShape.of(out)
override val shape: SourceShape[CommittableReadResult] = SourceShape.of(out)

override protected def initialAttributes: Attributes = Attributes.name("AmqpSource")

Expand All @@ -46,7 +45,7 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi

override val settings: AmqpSourceSettings = stage.settings

private val queue = mutable.Queue[CommittableIncomingMessage]()
private val queue = mutable.Queue[CommittableReadResult]()
private var ackRequired = true
private var unackedMessages = 0

Expand Down Expand Up @@ -86,8 +85,8 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
body: Array[Byte]): Unit = {
val message = if (ackRequired) {

new CommittableIncomingMessage {
override val message = IncomingMessage(ByteString(body), envelope, properties)
new CommittableReadResult {
override val message = ReadResult(ByteString(body), envelope, properties)

override def ack(multiple: Boolean): Future[Done] = {
val promise = Promise[Done]()
Expand All @@ -101,7 +100,7 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
promise.future
}
}
} else new AutoAckedMessage(IncomingMessage(ByteString(body), envelope, properties))
} else new AutoAckedReadResult(ReadResult(ByteString(body), envelope, properties))
consumerCallback.invoke(message)
}

Expand Down Expand Up @@ -149,7 +148,7 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
}
}

def handleDelivery(message: CommittableIncomingMessage): Unit =
def handleDelivery(message: CommittableReadResult): Unit =
if (isAvailable(out)) {
pushMessage(message)
} else if (queue.size + 1 > bufferSize) {
Expand All @@ -175,7 +174,7 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
}
)

def pushMessage(message: CommittableIncomingMessage): Unit = {
def pushMessage(message: CommittableReadResult): Unit = {
push(out, message)
if (ackRequired) unackedMessages += 1
}
Expand All @@ -190,7 +189,7 @@ private[amqp] final class AmqpSourceStage(settings: AmqpSourceSettings, bufferSi
private[amqp] object AmqpSourceStage {
private val SuccessfullyDone = Future.successful(Done)

final class AutoAckedMessage(override val message: IncomingMessage) extends CommittableIncomingMessage {
final class AutoAckedReadResult(override val message: ReadResult) extends CommittableReadResult {
override def ack(multiple: Boolean): Future[Done] = SuccessfullyDone
override def nack(multiple: Boolean, requeue: Boolean): Future[Done] = SuccessfullyDone
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package akka.stream.alpakka.amqp.javadsl

import java.util.concurrent.CompletionStage

import akka.annotation.ApiMayChange
import akka.stream.alpakka.amqp._
import akka.stream.javadsl.Flow
import akka.util.ByteString
Expand All @@ -20,12 +19,11 @@ object AmqpRpcFlow {
* Create an [[https://www.rabbitmq.com/tutorials/tutorial-six-java.html RPC style flow]] for processing and communicating
* over a rabbitmq message bus. This will create a private queue, and add the reply-to header to messages sent out.
*
* This stage materializes to a CompletionStage<String>, which is the name of the private exclusive queue used for RPC communication.
* This stage materializes to a `CompletionStage<String>`, which is the name of the private exclusive queue used for RPC communication.
*
* @param repliesPerMessage The number of responses that should be expected for each message placed on the queue. This
* can be overridden per message by including `expectedReplies` in the the header of the [[OutgoingMessage]]
* @param repliesPerMessage The number of responses that should be expected for each message placed on the queue.
*/
def createSimple(settings: AmqpSinkSettings,
def createSimple(settings: AmqpWriteSettings,
repliesPerMessage: Int): Flow[ByteString, ByteString, CompletionStage[String]] =
akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow
.simple(settings, repliesPerMessage)
Expand All @@ -35,11 +33,10 @@ object AmqpRpcFlow {
/**
* Java API:
* Convenience for "at-most once delivery" semantics. Each message is acked to RabbitMQ
* before it is emitted downstream.
* before its read result is emitted downstream.
*/
@ApiMayChange // https://github.com/akka/alpakka/issues/1513
def atMostOnceFlow(settings: AmqpSinkSettings,
bufferSize: Int): Flow[OutgoingMessage, IncomingMessage, CompletionStage[String]] =
def atMostOnceFlow(settings: AmqpWriteSettings,
bufferSize: Int): Flow[WriteMessage, ReadResult, CompletionStage[String]] =
akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow
.atMostOnceFlow(settings, bufferSize)
.mapMaterializedValue(f => f.toJava)
Expand All @@ -48,12 +45,11 @@ object AmqpRpcFlow {
/**
* Java API:
* Convenience for "at-most once delivery" semantics. Each message is acked to RabbitMQ
* before it is emitted downstream.
* before its read result is emitted downstream.
*/
@ApiMayChange // https://github.com/akka/alpakka/issues/1513
def atMostOnceFlow(settings: AmqpSinkSettings,
def atMostOnceFlow(settings: AmqpWriteSettings,
bufferSize: Int,
repliesPerMessage: Int): Flow[OutgoingMessage, IncomingMessage, CompletionStage[String]] =
repliesPerMessage: Int): Flow[WriteMessage, ReadResult, CompletionStage[String]] =
akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow
.atMostOnceFlow(settings, bufferSize, repliesPerMessage)
.mapMaterializedValue(f => f.toJava)
Expand All @@ -70,16 +66,15 @@ object AmqpRpcFlow {
*
* Compared to auto-commit, this gives exact control over when a message is considered consumed.
*/
@ApiMayChange // https://github.com/akka/alpakka/issues/1513
def committableFlow(
settings: AmqpSinkSettings,
settings: AmqpWriteSettings,
bufferSize: Int,
repliesPerMessage: Int = 1
): Flow[OutgoingMessage, CommittableIncomingMessage, CompletionStage[String]] =
): Flow[WriteMessage, CommittableReadResult, CompletionStage[String]] =
akka.stream.alpakka.amqp.scaladsl.AmqpRpcFlow
.committableFlow(settings, bufferSize, repliesPerMessage)
.mapMaterializedValue(f => f.toJava)
.map(cm => cm.asJava)
.map(cm => new CommittableReadResult(cm))
.asJava

}
Loading