#1.9 集成(Integeration)
##1.9.1 Actors的集成
当需要将流中元素以消息形式输出(Piping)到一个普通的Actor时,你使用Sink.actorRef
作为一个Sink
,同时也可以将消息通过ActorRef
输入到一个流,由Source.actorRef
来实现.
面对更复杂的用例时,可以使用ActorPublisher
和ActorSubscriber
这两个特质,它们实现了基于Actor
的Publiser
和Subscriber
的响应式流.
上面提及的流组件既可以被其他响应式流库使用,也可以当成Akka Streams
中的Source
或者Sink
.
注意:
ActorPublisher
和ActorSubscriber
不能被远程的actors所使用, 因为一旦响应式流协议中的信号在传输过程中丢失了(例如:请求命令 request), 那么该流就会进入死锁状态.
###Source.actorRef
发送消息至Source.actorRef
具象化的actor
时, 在下游有请求的情况下,消息会被输出到流,否则它们将被缓存直到接收到请求.
依据在Source
创建时使用的不同OverflowStratgy
, 当缓存(注:由创建时指定缓存大小)中不存在可用空间时, 可能会丢弃元素. 这种Source
类型不支持OverflowStratgy.backpressure
策略, 如果你需要一个有backpressure
能力的actor
, 可以考虑使用ActorPublisher
.
能够通过发送akka.actor.PoisonPill
或者 akka.actor.Status.Success
这两个特殊消息给actor
的引用来成功的结束这个的流, 而发送akka.actor.Status.Failure
则是失败的结束.
随着流创建出的actor
, 当流完成或者失败亦或是由下游取消时, 将会关闭(stop),你可以监视它从而得知何时发生了这些.
###Sink.actorRef
该类sink
输出流元素到指定的ActorRef
. 如果指定的actor
被终止了那么该流则被取消. 当该流成功的完成时, 初始化时指定的onCompleteMessage
将被输出到目标actor
内, 而当该流失败时则输出akka.actor.Status.Failure
.
注意: 目标
actor
不会发送back-pressure
信号, 举例: 如果该actor
处理消息的速度不如接受消息快, 那么mailbox
将越来越大. 所以建议对预期处理消息低速的actor
使用参数设置mailbox-push-timeout-time
为0
的bounded mailbox
或者 在该Stage
前再使用一个限制消息传输速率(Rate Limiting)的Stage
,
###ActorPublisher
在你的Actor
中继承或者混入akka.stream.actor.ActorPublisher
使其成为一个流的发送者, 能够跟踪订阅的全生命周期以及请求元素.
下面是一个例子. 它将到来的工作分发给订阅者:
object JobManager {
def props: Props = Props[JobManager]
final case class Job(payload: String)
case object JobAccepted
case object JobDenied
}
class JobManager extends ActorPublisher[JobManager.Job] {
import akka.stream.actor.ActorPublisherMessage._
import JobManager._
val MaxBufferSize = 100
var buf = Vector.empty[Job]
def receive = {
case job: Job if buf.size == MaxBufferSize => sender() ! JobDenied
case job: Job =>
sender() ! JobAccepted
if (buf.isEmpty && totalDemand > 0)
onNext(job)
else {
buf :+= job
deliverBuf()
}
case Request(_) =>
deliverBuf()
case Cancel =>
context.stop(self)
}
@tailrec final def deliverBuf(): Unit =
if (totalDemand > 0){
/*
* totalDemand 是一个 Long型
* 可能比buf.splitAt接受的值要大
*/
if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use foreach onNext
} else {
val (use, keep) = buf.splitAt(Int.MaxValue)
buf = keep
use foreach onNext
deliverBuf()
}
}
}
通过调用onNext
来输出元素到流. 允许传输元素的个数由流的订阅者的请求来决定. 这个值通过totleDemand
可以查询到. 只有当该流是isActive
状态且totalDemand > 0
时才能主动调用onNext
, 否则onNext
将抛出IllegalStateException
异常.
当流的订阅者请求更多元素时ActorPublisherMessage.Request
消息传递给该actor
, 你可以处理该消息事件. totalDemand
的值是自动更新的.
当流的订阅者取消了订阅, ActorPublisherMessage.Cancel
消息传递给该actor
. 此后的onNext
调用就会被不再有效.
通过调用onComplete
方法可以主动完成该流, 此后不再允许调用onNext
, onError
和onComplete
.
通过调用onError
方法以失败状态终止该流. 此后不再允许调用onNext
, onError
和onComplete
.
如果你考虑这个ActorPublisher
可能从不被订阅, 重载subscriptionTimeout
方法提供当超时后取消这个发布者的功能. 当超时时此actor
会收到一个ActorPublisherMessage.SubsriptionTimeoutExceeded
的消息然后必须清除状态且停止自己.
如果该actor
被终止则流视为完成, 除非流已经被失败终止,完成或者取消.
更多细节可以参看API文档
下面例子展示了它如何用作为Source
到Flow
的输入
val jobManagerSource = Source.actorPublisher[JobManager.Job](JobManager.props)
val ref = Flow[JobManager.Job]
.map(_.payload.toUpperCase)
.map { elem => println(elem); elem }
.to(Sink.ignore)
.runWith(jobManagerSource)
ref ! JobManager.Job("a")
ref ! JobManager.Job("b")
ref ! JobManager.Job("c")
使用Sink.asPublisher
创建的发布者可以支持多数量的订阅者(采用了fanout), 超出数量的订阅请求会以IllegalStateException
消息拒绝.
###ActorSubscriber
在你的Actor
中继承或者混入akka.stream.actor.ActorSubscriber
使其成为一个流的订阅者, 可以完全控制流的back-pressure
. 它可以从流里收到ActorSubscriberMessage.onNext
,ActorSubscriberMessage.onNext
和ActorSubscriberMessage.OnError
消息, 当然也可以和普通actor
一样接受非流内的消息.
以下是一个这样actor
的例子. 它把到来的任务分发给子actor
:
object WorkerPool {
case class Msg(id: Int, replyTo: ActorRef)
case class Work(id: Int)
case class Reply(id: Int)
case class Done(id: Int)
def props: Props = Props(new WorkerPool)
}
class WorkerPool extends ActorSubscriber {
import WorkerPool._
import ActorSubscriberMessage._
val MaxQueueSize = 10
var queue = Map.empty[Int, ActorRef]
var router = {
val routees = Vector.fill(3) {
ActorRefRoutee(context.actorOf(Props[Worker]))
}
Router(RoundRobinRoutingLogic(), routees)
}
override val requestStrategy = new MaxInFlightRequestStrategy(max = MaxQueueSize){
override def inFlightInternally: Int = queue.size
}
def receive = {
case OnNext(Msg(id, replyTo)) =>
queue += (id -> replyTo)
assert(queue.size <= MaxQueueSize, s"queued too many: ${queue.size}")
router.route(Work(id), self)
case Reply(id) =>
queue(id) ! Done(id)
queue -= id
}
}
class Worker extends Actor {
import WorkerPool._
def receive = {
case Work(id) =>
//...
sender() ! Reply(id)
}
}
继承的子类必须要定义RequestStratrgy
方法来控制流的back-pressure
. 对于每个到来的消息ActorSubscriber
都将自动调用RequestStratrgy.requestDemand
且把返回的需求输出到流.
- 内置的
WatermarkRequestStrategy
在actor
自己处理工作负载的情况下是一个好的选择. - 内置的
MaxInFlightRequestStrategy
在当消息会在内部缓存到队列或者代理给其他actor
处理的情况下非常有用. - 你也可实现自定义
RequestStrategy
或者使用ZeroRequestStrategy
手动一起调用request
亦或是其他的策略. 这种情况下你还必须在actor
启动或者就绪状态下调用request
, 否则它将无法从流接受到任何元素.
更多细节可以参看API文档
下面例子展示了它如何用作为Sink
到Flow
的输出
val N = 117
Source(1 to N).map(WorkerPool.Msg(_, replyTo))
.runWith(Sink.actorSubscriber(WorkerPool.props))
##1.9.2 外部服务集成
流内需要通过外部非流服务进行的转换和副作用操作, 可以通过mapAsync
或者mapAsyncUnordered
.
举个例子, 使用一个外部Email服务来发送email给选定的推特作者:
def send(email:Email): Future[Unit] = {
// ...
}
我们从包含了推特文的作者的流开始:
val authors: Source[Author, Unit] =
tweets
.filter(_.hashtags.contains(akka))
.map(_.author)
假设我们可以用以下的方法查询他们的Email地址:
def lookupEmail(handle: String): Future[Option[String]] =
然后可以通过使用mapAsync
来完成 调用lookupEmail
把包含作者的流转化成包含email的流 的过程:
val emailAddresses: Source[String, Unit] =
authors
.mapAsync(4)(author => addressSystem.lookupEmail(author.handle))
.collect { case Some(emailAddress) => emailAddress}
最后发送这些email:
val sendEmails: RunnableGraph[Unit] =
emailAddresses
.mapAsync(4)(address => {
emailServer.send(
Email(to = address, title = "Akka", body = "I like your tweet")
)
})
.to(Sink.ignore)
sendEmails.run()
mapAsync
是对每一个达到该步骤的元素应用一个调用外部服务的函数. 该函数返回一个Future
并且这个future的值会被输送到下游. 并发Future
的数量由传递给mapAsync
的一个参数决定. 这些Future
会以任何顺序执行完毕, 但是输送到下游的次序由上游输入的次序决定.
这意味着back-pressure
将如期作用. 举个例子, 如果emailServer.send
是一个瓶颈, 那么它将对请求推特文和随后的查询email的速率限流.
整个过程的最后一步是创建 在email管道中拉取推特文信息的需求: 我们附上一个Sink.ignore
来实现这个. 如果我们的邮件处理过程会返回可供未来转换的有趣数据, 那么我们当然不会只是无视它, 可以把结果流继续向后输送来处理或者存储.
注意到mapAsync
保留了流中元素原有的次序. 在下面的例子中次序并不那么重要所以我们使用更具性能的mapAsyncUnordered
:
val authors: Source[Author, Unit] =
tweets.filter(_.hashtags.contains(akka)).map(_.author)
val emailAddresses: Source[String, Unit] =
authors
.mapAsyncUnordered(4)(author => addressSystem.lookupEmail(author.handle))
.collect { case Some(emailAddress) => emailAddress }
val sendEmails: RunnableGraph[Unit] =
emailAddresses
.mapAsyncUnordered(4)(address => {
emailServer.send(
Email(to = address, title = "Akka", body = "I like your tweet"))
})
.to(Sink.ignore)
sendEmails.run()
上面例子中服务正好返回的是一个包含结果的Future
. 如果你的服务并不是这样,需要把调用包裹成一个Future
. 如果这个服务会阻塞, 你必须确认它在一个指定的执行上下文中(execution context)被运行, 这样可以避免系统中其他任务运行陷入资源饥饿(starvation)和困扰(disturbance)的状态.
val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
val sendTextMessages: RunnableGraph[Unit] =
phoneNumbers
.mapAsync(4)(phoneNo => {
Future {
smsServer.send(
TextMessage(to = phoneNo, body = "I like your tweet"))
}(blockingExecutionContext)
})
.to(Sink.ignore)
sendTextMessage.run()
blocking-dispatcher
的配置可能如下:
blocking-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 10
core-pool-size-max = 10
}
}
还可以使用map
操作来替代阻塞调用, 同样地得为该操作分配指定的分发器(dispatcher)
val send = Flow[String]
.map { phoneNo =>
smsServer.send(TextMessage(to = phoneNo, body = "I like your tweet"))
}
.withAttributes(ActorAttributes.dispatcher("blocking-dispatcher"))
val sendTextMessages: RunnableGraph[Unit] =
phoneNumbers.via(send).to(Sink.ignore)
sendTextMessages.run()
然而, 这并不和mapAsync
相同, 因为mapAsync
可以并发的执行多个调用,而map
只能同时处理一个.
如果服务以actor
的方式暴露, 或者把actor
作为一个外部服务前端的入口, 这种情况下可以使用ask
:
val akkaTweets: Source[Tweet, Unit] = tweets.filter(_.hashtags.contains(akka))
implicit val timeout = Timeout(3.seconds)
val saveTweets: RunnableGraph[Unit] =
akkaTweets
.mapAsync(4)(tweet => database ? Save(tweet))
.to(Sink.ignore)
注意到如果ask
在给定的超时内没有完成, 那么流会以失败结束. 如果这不是你所期望的结果, 你可以在ask
产生的Future
上调用recover
方法.
###次序和并行的说明
让我们通过另一个例子来更好的理解mapAsync
和mapAsyncUnordered
中的次序和并行的这两个特性.
多个mapAsync
和mapAsyncUnordered
的future
可能并发的运行. 并发的数量由下游的需求限制. 举个例子, 如果下游请求了5个元素那么最多只会有5个future
同时处理.
mapAsync
以接收到输入元素同样的顺序输出future
结果. 这意味着完成的结果只有当更早的结果完成并且输送到下游时才会被输出. 因此一个慢速的调用将会延迟所有后续的调用输出, 即便它们已经在这个调用前完成.
mapAsyncUnordered
当future
结果完成时立即输出, 很有可能以不同与来自上游输入的次序输出到下游. 因此一个缓慢的调用不会延误后续更快的调用结果的输出, 只要在下游有需求的情况下.
下面是一个虚构的服务, 通过它我们能展现上述的情况.
class SometimesSlowService(implicit ec: ExecutionContext) {
private val runningCount = new AtomicInteger
def convert(s: String): Future[String] = {
println(s"running: $s (${runningCount.incrementAndGet()})")
Future {
if(s.nonEmpty && s.head.isLower)
Thread.sleep(500)
else
Thread.sleep(20)
println(s"completed: $s (${runningCount.decrementAndGet()})")
s.toUpperCase
}
}
}
以小写字母开头的元素用来模拟需要花费更多时间的处理过程.
以下是我们如何将它结合mapAsync
使用:
implicit val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
val service = new SometimesSlowService
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system).withInputBuffer(initialSize = 4, maxSize = 4))
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
.map(elem => { println(s"before: $elem"); elem })
.mapAsync(4)(service.convert)
.runForeach(elem => println(s"after: $elem"))
输出可能如下:
before: a
before: B
before: C
before: D
running: a (1)
running: B (2)
before: e
running: C (3)
before: F
running: D (4)
before: g
before: H
completed: C (3)
completed: B (2)
completed: D (1)
completed: a (0)
after: A
after: B
running: e (1)
after: C
after: D
running: F (2)
before: i
before: J
running: g (3)
running: H (4)
completed: H (2)
completed: F (3)
completed: e (1)
completed: g (0)
after: E
after: F
running: i (1)
after: G
after: H
running: J (2)
completed: J (1)
completed: i (0)
after: I
after: J
注意到after
行和before
行的次序都是相同的, 即使元素以不同的次序处理完成. 例如 H
在g
之前完成, 但是仍然在其后输出.
括号内的数字表明同一时间内有多少个调用在处理中. 在这里下游的需求数以及并发数由ActorMaterializerSettings
的缓存大小4限定.
下面我们在同一个服务内替换成mapAsyncUnordered
方法:
implicit val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
val service = new SometimesSlowService
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system).withInputBuffer(initialSize = 4, maxSize = 4))
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
.map(elem => { println(s"before: $elem"); elem })
.mapAsyncUnordered(4)(service.convert)
.runForeach(elem => println(s"after: $elem"))
输出可能如下
before: a
before: B
before: C
before: D
running: a (1)
running: B (2)
before: e
running: C (3)
before: F
running: D (4)
before: g
before: H
completed: B (3)
completed: C (1)
completed: D (2)
after: B
after: D
running: e (2)
after: C
running: F (3)
before: i
before: J
completed: F (2)
after: F
running: g (3)
running: H (4)
completed: H (3)
after: H
completed: a (2)
after: A
running: i (3)
running: J (4)
completed: J (3)
after: J
completed: e (2)
after: E
completed: g (1)
after: G
completed: i (0)
after: I
注意到after
行内容次序和before
行的不同. 例如 H
赶超了比较慢的G
.
括号内的数字表明同一时间内有多少个调用在处理中.在这里下游的需求数以及并发数由ActorMaterializerSettings
的缓存大小4限定.
##1.9.3 响应式流整合
响应式流定义了一个拥有无阻塞back-pressure
的异步流处理标准. 这样使得遵循标准实现的流库有了可以一起使用的可能. Akka Streams
就是其中一个遵循标准的库.
以下是一个不完整的其他实现的列表:
- Reactor(1.1+)
- RxJava
- Ratpack
- Slick
在响应式流中最重要的两个接口是Publisher
和Subscriber
.
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
下面我们假定一个提供推特文的发布者
def tweets: Publisher[Tweet]
和另一个能够在数据库存储用户的库
def storage: Subscriber[Author]
使用Akka Streams中的Flow
来转换这个流并链接这些组件:
val authors = Flow[Tweet]
.filter(_.hashtags.contains(akka))
.map(_.author)
Source.fromPublisher(tweets).via(authors).to(Sink.fromSubscriber(storage)).run()
这个Publisher
用作Source
到flow的输入, 同时这个Subscriber
用作为一个输出的Sink
.
一个Flow
也可以转化成一个RunnableGraph[Processor[In, Out]]
, 当调用run()
时会被具象化一个Processor
. run()
本身可以被多次调用, 并且每次都生成一个新的Processor
实例.
val processor: Processor[Tweet, Author] = authors.toProcessor.run()
tweets.subscribe(processor)
processor.subscribe(storage)
一个发布者(publisher)可以通过subscribe
来链接一个订阅者(subscriber).
也可以用Publisher-Sink
把一个Source
暴露成一个Publisher
.
val authorPublisher: Publisher[Author] =
Source.fromPublisher(tweets).via(authors).runWith(Sink.asPublisher(fanout = false))
authorPublisher.subscribe(storage)
一个通过Sink.asPublisher(false)
创建的发布者, 仅支持单个订阅. 额外的订阅请求将被以IllegalStateException
异常拒绝.
需要扇出(fan-out)/广播(broadcasting)支持多个订阅者的情况, 可以通过以下方法创建发布者:
def storage: Subscriber[Author
def alert: Subscriber[Author]
val authorPublisher: Publisher[Author] =
Source.fromPublisher(tweets).via(authors)
.runWith(Sink.asPublisher(fanout = true))
authorPublisher.subscribe(storage)
authorPublisher.subscribe(alert)
在Stage中输入的缓存大小决定了当最慢的订阅者和最快的订阅者相差多大时需要减缓整个流的速率.
为了完整, 同样也可以通过Subscriber-Source
把一个Sink
暴露成一个Subscriber
:
val tweetSubscriber: Subscriber[Tweet] =
authors.to(Sink.fromSubscriber(storage)).runWith(Source.asSubscriber[Tweet])
tweets.subscribe(tweetSubscriber)
同时也提供了能将Processor
实例重新包装成Flow
的方法, 只需要传入一个创建Processor
实例的工程类函数即可:
// 一个 Processor工厂的例子
def createProcessor: Processor[Int, Int] = Flow[Int].toProcessor.run()
val flow: Flow[Int, Int, Unit] = Flow.fromProcessor(() => createProcessor)
注意到这个工厂方法需要满足生成Flow
的可重用性.