From 5d08fc258eba520d61333f9f0c93df211c8438d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Ferreira?= Date: Wed, 17 Jan 2024 15:02:17 +0000 Subject: [PATCH] fix(kinesis): use stage materializer with IODispatcher instead of injected EC (#3047) --- .../impl/KinesisSchedulerSourceStage.scala | 23 ++++++++++++++++--- .../scaladsl/KinesisSchedulerSource.scala | 9 +------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/kinesis/src/main/scala/akka/stream/alpakka/kinesis/impl/KinesisSchedulerSourceStage.scala b/kinesis/src/main/scala/akka/stream/alpakka/kinesis/impl/KinesisSchedulerSourceStage.scala index 398d1379ca..f576975bc1 100644 --- a/kinesis/src/main/scala/akka/stream/alpakka/kinesis/impl/KinesisSchedulerSourceStage.scala +++ b/kinesis/src/main/scala/akka/stream/alpakka/kinesis/impl/KinesisSchedulerSourceStage.scala @@ -40,8 +40,7 @@ private[kinesis] object KinesisSchedulerSourceStage { private[kinesis] class KinesisSchedulerSourceStage( settings: KinesisSchedulerSourceSettings, schedulerBuilder: ShardRecordProcessorFactory => Scheduler -)(implicit ec: ExecutionContext) - extends GraphStageWithMaterializedValue[SourceShape[CommittableRecord], Future[Scheduler]] { +) extends GraphStageWithMaterializedValue[SourceShape[CommittableRecord], Future[Scheduler]] { private val out = Outlet[CommittableRecord]("Records") override def shape: SourceShape[CommittableRecord] = new SourceShape[CommittableRecord](out) @@ -69,6 +68,7 @@ private[kinesis] class KinesisSchedulerSourceStage( private[this] var schedulerOpt: Option[Scheduler] = None override def preStart(): Unit = { + implicit val ec: ExecutionContext = executionContext(attributes) val scheduler = schedulerBuilder(new ShardRecordProcessorFactory { override def shardRecordProcessor(): ShardRecordProcessor = new ShardProcessor(newRecordCallback) @@ -103,6 +103,23 @@ private[kinesis] class KinesisSchedulerSourceStage( failStage(SchedulerUnexpectedShutdown(e)) } override def postStop(): Unit = - schedulerOpt.foreach(scheduler => Future(if (!scheduler.shutdownComplete()) scheduler.shutdown())) + schedulerOpt.foreach( + scheduler => if (!scheduler.shutdownComplete()) scheduler.shutdown() + ) + + protected def executionContext(attributes: Attributes): ExecutionContext = { + val dispatcherId = (attributes.get[ActorAttributes.Dispatcher](ActorAttributes.IODispatcher) match { + case ActorAttributes.Dispatcher("") => + ActorAttributes.IODispatcher + case d => d + }) match { + case d @ ActorAttributes.IODispatcher => + // this one is not a dispatcher id, but is a config path pointing to the dispatcher id + materializer.system.settings.config.getString(d.dispatcher) + case d => d.dispatcher + } + + materializer.system.dispatchers.lookup(dispatcherId) + } } } diff --git a/kinesis/src/main/scala/akka/stream/alpakka/kinesis/scaladsl/KinesisSchedulerSource.scala b/kinesis/src/main/scala/akka/stream/alpakka/kinesis/scaladsl/KinesisSchedulerSource.scala index 482bd1f4ca..a5a1710981 100644 --- a/kinesis/src/main/scala/akka/stream/alpakka/kinesis/scaladsl/KinesisSchedulerSource.scala +++ b/kinesis/src/main/scala/akka/stream/alpakka/kinesis/scaladsl/KinesisSchedulerSource.scala @@ -5,7 +5,6 @@ package akka.stream.alpakka.kinesis.scaladsl import akka.NotUsed -import akka.dispatch.ExecutionContexts import akka.stream._ import akka.stream.alpakka.kinesis.impl.KinesisSchedulerSourceStage import akka.stream.alpakka.kinesis.{ @@ -36,13 +35,7 @@ object KinesisSchedulerSource { schedulerBuilder: ShardRecordProcessorFactory => Scheduler, settings: KinesisSchedulerSourceSettings ): Source[CommittableRecord, Future[Scheduler]] = - Source - .fromMaterializer { (mat, _) => - import mat.executionContext - Source - .fromGraph(new KinesisSchedulerSourceStage(settings, schedulerBuilder)) - } - .mapMaterializedValue(_.flatMap(identity)(ExecutionContexts.parasitic)) + Source.fromGraph(new KinesisSchedulerSourceStage(settings, schedulerBuilder)) def sharded( schedulerBuilder: ShardRecordProcessorFactory => Scheduler,