From b8c5777ff7edcb9b9a1c67f8ba84374e565a96cd Mon Sep 17 00:00:00 2001 From: Remy Pecqueur <1932432+Lordshinjo@users.noreply.github.com> Date: Wed, 5 Aug 2020 13:00:37 +0900 Subject: [PATCH 1/3] Clean up job <-> dag confusion in Cron scheduler --- .gitignore | 5 +- .../com/criteo/cuttle/cron/CronApp.scala | 20 ++-- .../criteo/cuttle/cron/CronExpression.scala | 9 +- .../com/criteo/cuttle/cron/CronModel.scala | 94 ++++++++----------- .../com/criteo/cuttle/cron/CronProject.scala | 15 ++- .../criteo/cuttle/cron/CronScheduler.scala | 53 +++++------ .../com/criteo/cuttle/cron/Database.scala | 2 +- .../scala/com/criteo/cuttle/cron/UI.scala | 23 ++--- .../com/criteo/cuttle/cron/package.scala | 15 ++- .../cuttle/timeseries/CuttleProject.scala | 9 +- 10 files changed, 122 insertions(+), 123 deletions(-) diff --git a/.gitignore b/.gitignore index ea1008171..8d0647553 100644 --- a/.gitignore +++ b/.gitignore @@ -5,5 +5,8 @@ node_modules dev.config.json .DS_Store -# price.log gererated by HelloCronScheduling +# files generated by example workflows price.log +letter.log +number1.log +number2.log diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala index d195d95e0..d5cdd0c49 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronApp.scala @@ -3,19 +3,18 @@ package com.criteo.cuttle.cron import java.time.Instant import java.util.concurrent.TimeUnit -import scala.util.{Success, Try} - import cats.data.EitherT import cats.effect.IO +import com.criteo.cuttle.Auth._ +import com.criteo.cuttle.Metrics.{Gauge, Prometheus} +import com.criteo.cuttle._ +import com.criteo.cuttle.utils.getJVMUptime import io.circe._ import io.circe.syntax._ import lol.http._ import lol.json._ -import com.criteo.cuttle.Auth._ -import com.criteo.cuttle.Metrics.{Gauge, Prometheus} -import com.criteo.cuttle._ -import com.criteo.cuttle.utils.getJVMUptime +import scala.util.{Success, Try} private[cron] case class CronApp(project: CronProject, executor: Executor[CronScheduling])( implicit val transactor: XA @@ -71,16 +70,13 @@ private[cron] case class CronApp(project: CronProject, executor: Executor[CronSc Ok(bodyFromStream) } - case GET at "/api/dashboard" => - Ok(scheduler.getStats(allJobIds)) - - case GET at "/api/jobs/paused" => - Ok(scheduler.getPausedJobs.asJson) + case GET at "/api/dags/paused" => + Ok(scheduler.getPausedDags.asJson) // we only show 20 recent executions by default but it could be modified via query parameter case GET at url"/api/cron/executions?dag=$dag&start=$start&end=$end&limit=$limit" => val jsonOrError: EitherT[IO, Throwable, Json] = for { - dag <- EitherT.fromOption[IO](workload.dags.find(_.id == dag), throw new Exception(s"Unknown job dag $dag")) + dag <- EitherT.fromOption[IO](workload.dags.find(_.id == dag), throw new Exception(s"Unknown job DAG $dag")) jobIds <- EitherT.rightT[IO, Throwable](dag.cronPipeline.vertices.map(_.id)) startDate <- EitherT.rightT[IO, Throwable](Try(Instant.parse(start)).getOrElse(minStartDateForExecutions)) endDate <- EitherT.rightT[IO, Throwable](Try(Instant.parse(end)).getOrElse(maxStartDateForExecutions)) diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronExpression.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronExpression.scala index 6c0fa3a5a..e3783fcb8 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronExpression.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronExpression.scala @@ -1,14 +1,13 @@ package com.criteo.cuttle.cron -import cron4s.Cron -import cron4s.lib.javatime._ -import java.time.{Duration, Instant, ZoneId, ZoneOffset} import java.time.temporal.ChronoUnit +import java.time.{Duration, Instant, ZoneId, ZoneOffset} -import io.circe.{Encoder, Json} +import cron4s.Cron +import cron4s.lib.javatime._ import io.circe.syntax._ +import io.circe.{Encoder, Json} -import java.time.ZoneOffset import scala.concurrent.duration._ /** diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronModel.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronModel.scala index 80c20316a..ca6393078 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronModel.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronModel.scala @@ -1,17 +1,17 @@ package com.criteo.cuttle.cron import java.time.Instant -import cats.effect.concurrent.Deferred -import cats.effect.IO -import scala.concurrent.duration._ -import scala.concurrent.stm.{Ref, _} +import cats.effect.IO +import cats.effect.concurrent.Deferred +import com.criteo.cuttle.Auth.User +import com.criteo.cuttle.{Job, Logger, Scheduling, SchedulingContext, Tag, Workload} import io.circe._ -import io.circe.syntax._ import io.circe.java8.time._ -import com.criteo.cuttle.Auth.User -import com.criteo.cuttle.{ExecutionStatus, Job, Logger, PausedJob, Scheduling, SchedulingContext, Tag, Workload} +import io.circe.syntax._ +import scala.concurrent.duration._ +import scala.concurrent.stm.{atomic, Ref} import scala.reflect.ClassTag private[cron] case class ScheduledAt(instant: Instant, delay: FiniteDuration) @@ -22,25 +22,25 @@ private[cron] case class ScheduledAt(instant: Instant, delay: FiniteDuration) private[cron] case class CronState(logger: Logger) { private val executions = Ref(Map.empty[CronDag, Either[Instant, Set[CronExecution]]]) - private val paused = Ref(Map.empty[CronDag, PausedJob]) + private val paused = Ref(Map.empty[CronDag, PausedDag]) private val runNowHandlers: Ref[Map[CronDag, Deferred[IO, (ScheduledAt, User)]]] = Ref(Map.empty[CronDag, Deferred[IO, (ScheduledAt, User)]])( implicitly[ClassTag[Map[CronDag, Deferred[IO, (ScheduledAt, User)]]]] ) private val jobDagState = Ref(Map.empty[CronDag, Map[CronJob, CronJobState]]) - private[cron] def init(availableJobDags: Set[CronDag], pausedJobs: Seq[PausedJob]) = { + private[cron] def init(availableJobDags: Set[CronDag], pausedDags: Seq[PausedDag]) = { logger.debug("Cron Scheduler States initialization") val available = availableJobDags.map(dag => dag.id -> dag).toMap - val initialPausedJobs = pausedJobs.collect { - case pausedJob if available.contains(pausedJob.id) => - logger.debug(s"Job ${pausedJob.id} is paused") - available(pausedJob.id) -> pausedJob + val initialPausedDags = pausedDags.collect { + case pausedDag if available.contains(pausedDag.id) => + logger.debug(s"DAG ${pausedDag.id} is paused") + available(pausedDag.id) -> pausedDag }.toMap atomic { implicit txn => - paused() = initialPausedJobs + paused() = initialPausedDags } } @@ -73,7 +73,7 @@ private[cron] case class CronState(logger: Logger) { jobDagState() = jobDagState() + (dag -> Map.empty) } - private[cron] def getPausedJobs(): Set[PausedJob] = paused.single.get.values.toSet + private[cron] def getPausedDags(): Set[PausedDag] = paused.single.get.values.toSet private[cron] def isPaused(dag: CronDag): Boolean = paused.single.get.contains(dag) private[cron] def addNextEventToState(dag: CronDag, instant: Instant): Unit = atomic { implicit txn => @@ -100,7 +100,7 @@ private[cron] case class CronState(logger: Logger) { executions() = executions() - dag } - private[cron] def pauseDags(dags: Set[CronDag])(implicit user: User): Set[PausedJob] = { + private[cron] def pauseDags(dags: Set[CronDag])(implicit user: User): Set[PausedDag] = { val pauseDate = Instant.now() atomic { implicit txn => val dagsToPause = dags @@ -108,10 +108,10 @@ private[cron] case class CronState(logger: Logger) { .toSeq dagsToPause.foreach(removeDagFromState) - val justPausedJobs = dagsToPause.map(job => PausedJob(job.id, user, pauseDate)) - paused() = paused() ++ dagsToPause.zip(justPausedJobs) + val justPausedDags = dagsToPause.map(dag => PausedDag(dag.id, user, pauseDate)) + paused() = paused() ++ dagsToPause.zip(justPausedDags) - justPausedJobs.toSet + justPausedDags.toSet } } @@ -129,57 +129,45 @@ private[cron] case class CronState(logger: Logger) { runNowHandlers() = runNowHandlers() - dag } - private[cron] def getRunNowHandlers(jobIds: Set[String]) = atomic { implicit txn => - runNowHandlers().filter(cronJob => jobIds.contains(cronJob._1.id)) + private[cron] def getRunNowHandlers(dagIds: Set[String]) = atomic { implicit txn => + runNowHandlers().filter(cronJob => dagIds.contains(cronJob._1.id)) } - private[cron] def snapshotAsJson(jobIds: Set[String]) = atomic { implicit txn => - val activeJobsSnapshot = executions().collect { - case (cronDag: CronDag, state) if jobIds.contains(cronDag.id) => - cronDag.asJson - .deepMerge( - Json.obj( - state.fold( - "nextInstant" -> _.asJson, - ( - (a: Set[CronExecution]) => - ("currentExecutions" -> Json - .arr(a.map(_.toExecutionLog(ExecutionStatus.ExecutionRunning).asJson).toArray: _*)) - ) - ) - ) - ) - .deepMerge( - Json.obj( - "status" -> "active".asJson - ) - ) - } - val pausedJobsSnapshot = paused().collect { - case (cronJob, pausedJob) if jobIds.contains(cronJob.id) => pausedJob.asJson + private[cron] def snapshotAsJson(dagIds: Set[String]) = atomic { implicit txn => + val activeDagsSnapshot = executions().collect { + case (cronDag: CronDag, Left(nextInstant)) if dagIds.contains(cronDag.id) => + Json.obj( + "id" -> cronDag.id.asJson, + "status" -> "waiting".asJson, + "nextInstant" -> nextInstant.asJson + ) + case (cronDag: CronDag, Right(_)) if dagIds.contains(cronDag.id) => + Json.obj( + "id" -> cronDag.id.asJson, + "status" -> "running".asJson + ) } - val acc = (activeJobsSnapshot ++ pausedJobsSnapshot).toSeq Json.arr( - acc: _* + activeDagsSnapshot.toSeq: _* ) } private[cron] def snapshot(dagIds: Set[String]) = atomic { implicit txn => val activeJobsSnapshot = executions().filterKeys(cronDag => dagIds.contains(cronDag.id)) - val pausedJobsSnapshot = paused().filterKeys(cronDag => dagIds.contains(cronDag.id)) + val pausedDagsSnapshot = paused().filterKeys(cronDag => dagIds.contains(cronDag.id)) - activeJobsSnapshot -> pausedJobsSnapshot + activeJobsSnapshot -> pausedDagsSnapshot } - override def toString(): String = { + override def toString: String = { val builder = new StringBuilder() val state = executions.single.get builder.append("\n======State======\n") state.foreach { - case (job, jobState) => + case (dag, dagState) => val messages = Seq( - job.id, - jobState.fold(_ toString, _.map(_.id).mkString(",")) + dag.id, + dagState.fold(_.toString, _.map(_.id).mkString(",")) ) builder.append(messages mkString " :: ") builder.append("\n") diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala index 0baa7358b..ce2e6dd6f 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala @@ -1,15 +1,14 @@ package com.criteo.cuttle.cron -import scala.concurrent.duration._ - -import lol.http._ -import io.circe.{Encoder, Json} +import com.criteo.cuttle.Auth.Authenticator +import com.criteo.cuttle.ThreadPools.Implicits.serverThreadPool +import com.criteo.cuttle.ThreadPools._ +import com.criteo.cuttle._ import io.circe.syntax._ +import io.circe.{Encoder, Json} +import lol.http._ -import com.criteo.cuttle._ -import com.criteo.cuttle.ThreadPools._ -import com.criteo.cuttle.ThreadPools.Implicits.serverThreadPool -import com.criteo.cuttle.Auth.Authenticator +import scala.concurrent.duration._ /** * A cuttle project is a workflow to execute with the appropriate scheduler. diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/CronScheduler.scala b/cron/src/main/scala/com/criteo/cuttle/cron/CronScheduler.scala index e6f0fe990..10d268d12 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/CronScheduler.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/CronScheduler.scala @@ -3,18 +3,17 @@ package com.criteo.cuttle.cron import java.time.Instant import java.util.concurrent.TimeUnit -import scala.concurrent.stm.Txn.ExternalDecider -import scala.concurrent.stm._ import cats.effect.IO import cats.effect.concurrent.Deferred import cats.implicits._ -import doobie.implicits._ -import io.circe.Json import com.criteo.cuttle._ -import com.criteo.cuttle.utils._ import com.criteo.cuttle.cron.Implicits._ +import com.criteo.cuttle.utils._ +import doobie.implicits._ +import io.circe.Json import scala.concurrent.duration.FiniteDuration +import scala.concurrent.stm.{atomic, InTxnEnd, Txn} /** A [[CronScheduler]] executes the set of Jobs at the time instants defined by Cron expressions. * Each [[com.criteo.cuttle.Job Job]] has it's own expression and executed separately from others. @@ -50,20 +49,20 @@ case class CronScheduler(logger: Logger) extends Scheduler[CronScheduling] { * @param txn current STM transaction context */ private def setExernalDecider[A](dbConnection: doobie.ConnectionIO[A])(implicit transactor: XA, txn: InTxnEnd): Unit = - Txn.setExternalDecider(new ExternalDecider { + Txn.setExternalDecider(new Txn.ExternalDecider { def shouldCommit(implicit txn: InTxnEnd): Boolean = { dbConnection.transact(transactor).unsafeRunSync true } }) - private[cron] def getPausedJobs = state.getPausedJobs() + private[cron] def getPausedDags = state.getPausedDags() private[cron] def snapshot(dagIds: Set[String]) = state.snapshot(dagIds) private[cron] def pauseDags(dags: Set[CronDag], executor: Executor[CronScheduling])(implicit transactor: XA, user: Auth.User): Unit = { - logger.debug(s"Pausing job dags $dags") + logger.debug(s"Pausing job DAGs $dags") val cancelableExecutions = atomic { implicit tx => val dagsToPause = state.pauseDags(dags) @@ -94,7 +93,7 @@ case class CronScheduler(logger: Logger) extends Scheduler[CronScheduling] { private[cron] def resumeDags(dags: Set[CronDag], executor: Executor[CronScheduling])(implicit transactor: XA, user: Auth.User): Unit = { - logger.debug(s"Resuming job dags $dags") + logger.debug(s"Resuming job DAGs $dags") val dagIdsToExecute = dags.map(_.id) val resumeQuery = dagIdsToExecute.map(queries.resumeJob).reduceLeft(_ *> _) @@ -103,19 +102,19 @@ case class CronScheduler(logger: Logger) extends Scheduler[CronScheduling] { state.resumeDags(dags) } val programs = dags.map { dag => - logger.debug(s"Activating job dags $dag") + logger.debug(s"Activating job DAGs $dag") run(dag, executor) } - logger.info(s"Relaunching job dags $dags") + logger.info(s"Relaunching job DAGs $dags") unsafeRunAsync(programs) } - private[cron] def runJobsNow(jobsToRun: Set[CronDag], executor: Executor[CronScheduling])(implicit transactor: XA, + private[cron] def runJobsNow(dagsToRun: Set[CronDag], executor: Executor[CronScheduling])(implicit transactor: XA, user: Auth.User): Unit = { - logger.info(s"Request by ${user.userId} to run on demand jobs ${jobsToRun.map(_.id).mkString}") - val runNowHandlers = state.getRunNowHandlers(jobsToRun.map(_.id)) - if (runNowHandlers.size == 0) { + logger.info(s"Request by ${user.userId} to run on demand DAGs ${dagsToRun.map(_.id).mkString}") + val runNowHandlers = state.getRunNowHandlers(dagsToRun.map(_.id)) + if (runNowHandlers.isEmpty) { logger.info("No job in waiting state.") } for ((job, d) <- runNowHandlers) { @@ -132,10 +131,10 @@ case class CronScheduler(logger: Logger) extends Scheduler[CronScheduling] { } else { state .getNextJobsInDag(dag) - .map { node: CronJob => + .map { job: CronJob => for { - runResult <- runAndRetry(dag, node, scheduledAt, runNowUser) - _ <- IO(state.cronJobFinished(dag, node, success = runResult.isRight)) + runResult <- runAndRetry(dag, job, scheduledAt, runNowUser) + _ <- IO(state.cronJobFinished(dag, job, success = runResult.isRight)) result <- runNextPart(dag, scheduledAt, runNowUser) } yield result } @@ -177,7 +176,7 @@ case class CronScheduler(logger: Logger) extends Scheduler[CronScheduling] { } } - // don't run anything when job is paused + // don't run anything when dag is paused if (state.isPaused(dag)) { IO.pure(Completed) } else { @@ -186,11 +185,11 @@ case class CronScheduler(logger: Logger) extends Scheduler[CronScheduling] { completed <- maybeScheduledAt match { // we couldn't get next event from cron4s and it didn't fail so it means we've finished for this job case None => - logger.info(s"Job dag ${dag.id} has finished. We will not submit executions anymore") + logger.info(s"Job DAG ${dag.id} has finished. We will not submit executions anymore") IO.pure(Completed) // we got next event: update state with it, wait for it and run it or retry until max retry case Some(scheduledAt) => - logger.debug(s"Run job dag ${dag.id} at ${scheduledAt.instant} with a delay of ${scheduledAt.delay}") + logger.debug(s"Run job DAG ${dag.id} at ${scheduledAt.instant} with a delay of ${scheduledAt.delay}") for { _ <- IO(state.addNextEventToState(dag, scheduledAt.instant)) _ <- logState @@ -212,7 +211,7 @@ case class CronScheduler(logger: Logger) extends Scheduler[CronScheduling] { runIO.recover { case e => - val message = s"Fatal error Cron loop for job dag ${dag.id} will be stopped" + val message = s"Fatal error Cron loop for job DAG ${dag.id} will be stopped" logger.error(message) logger.error(e.getMessage) throw new Exception(message) @@ -225,20 +224,20 @@ case class CronScheduler(logger: Logger) extends Scheduler[CronScheduling] { programs.toList.parSequence.unsafeRunAsyncAndForget() } - override def getStats(jobIds: Set[String]): Json = state.snapshotAsJson(jobIds) + override def getStats(dagIds: Set[String]): Json = state.snapshotAsJson(dagIds) override def start(workload: Workload[CronScheduling], executor: Executor[CronScheduling], xa: XA, logger: Logger = logger): Unit = { - logger.info("Getting paused jobs") - val pausedJob = queries.getPausedJobs.transact(xa).unsafeRunSync() + logger.info("Getting paused DAGs") + val pausedDag = queries.getPausedJobs.transact(xa).unsafeRunSync() val programs = workload match { case CronWorkload(dags) => logger.info("Init Cron State") - state.init(dags, pausedJob) - logger.info(s"Building IOs for Cron Workload with ${dags.size} job dag(s)") + state.init(dags, pausedDag) + logger.info(s"Building IOs for Cron Workload with ${dags.size} job DAG(s)") dags.map(dag => run(dag, executor)) } diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/Database.scala b/cron/src/main/scala/com/criteo/cuttle/cron/Database.scala index 94f6a4318..22c5dee44 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/Database.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/Database.scala @@ -2,8 +2,8 @@ package com.criteo.cuttle.cron import java.time.Instant -import doobie.util.fragment.Fragment import doobie.implicits._ +import doobie.util.fragment.Fragment private[cron] object Database { def sqlGetContextsBetween(start: Instant, end: Instant, job: CronJob): Fragment = diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/UI.scala b/cron/src/main/scala/com/criteo/cuttle/cron/UI.scala index 14f5062bb..27657cb07 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/UI.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/UI.scala @@ -1,18 +1,19 @@ package com.criteo.cuttle.cron -import java.time.{Instant, ZoneId} import java.time.format.DateTimeFormatter +import java.time.{Instant, ZoneId} -import scala.util.Try import cats.Foldable import cats.data.EitherT import cats.effect.IO import cats.implicits._ -import lol.http._ +import com.criteo.cuttle.{ExecutionLog, ExecutionStatus, Executor, XA} +import io.circe.Json import lol.html._ +import lol.http._ import lol.json._ -import io.circe.Json -import com.criteo.cuttle.{ExecutionLog, ExecutionStatus, Executor, PausedJob, XA} + +import scala.util.Try private[cron] case class UI(project: CronProject, executor: Executor[CronScheduling])(implicit val transactor: XA) { private val scheduler = project.scheduler @@ -98,15 +99,15 @@ private[cron] case class UI(project: CronProject, executor: Executor[CronSchedul } } - private implicit val pausedToHtml = ToHtml { pausedDags: Map[CronDag, PausedJob] => + private implicit val pausedToHtml = ToHtml { pausedDags: Map[CronDag, PausedDag] => foldHtml(pausedDags.toList.sortBy(_._1.id)) { - case (cronDag, pausedJob) => + case (cronDag, pausedDag) => tmpl"""
Running @e.job.id, retry: @e.context.retry
") + executions => foldHtml(executions.toList)(e => tmpl"Running @e.job.id
") ) } @@ -226,7 +226,7 @@ private[cron] case class UI(project: CronProject, executor: Executor[CronSchedul jobIds <- EitherT.rightT[IO, Throwable](dag.cronPipeline.vertices.map(_.id)) limit <- EitherT.rightT[IO, Throwable](Try(limit.toInt).getOrElse(Int.MaxValue)) executionList <- EitherT.right[Throwable]( - buildExecutionsList(executor, jobIds, minStartDateForExecutions, maxStartDateForExecutions, limit) + buildExecutionsList(executor, jobIds, None, None, limit) ) } yield (dag, executionList) diff --git a/cron/src/main/scala/com/criteo/cuttle/cron/package.scala b/cron/src/main/scala/com/criteo/cuttle/cron/package.scala index 2c53e9bea..055bba3c9 100644 --- a/cron/src/main/scala/com/criteo/cuttle/cron/package.scala +++ b/cron/src/main/scala/com/criteo/cuttle/cron/package.scala @@ -51,24 +51,25 @@ package object cron { } - // Fair assumptions about start and end date within which we operate by default if user doesn't specify his interval. - // We choose these dates over Instant.MIN and Instant.MAX because MySQL works within this range. - private[cron] val minStartDateForExecutions = Instant.parse("1000-01-01T00:00:00Z") - private[cron] val maxStartDateForExecutions = Instant.parse("9999-12-31T23:59:59Z") - - // This function was implemented because executor.archivedExecutions returns duplicates when passing the same table - // into the context query. private[cron] def buildExecutionsList(executor: Executor[CronScheduling], - jobPartIds: Set[String], - startDate: Instant, - endDate: Instant, + jobIds: Set[String], + startDate: Option[Instant], + endDate: Option[Instant], limit: Int): IO[Map[Instant, Seq[ExecutionLog]]] = for { - archived <- executor.rawArchivedExecutions(jobPartIds, "", asc = false, 0, limit) + archived <- executor.archivedExecutions( + queryContexts = Database.sqlGetContextsBetween(startDate, endDate), + jobs = jobIds, + sort = "", + asc = false, + offset = 0, + limit = limit + ) running <- IO(executor.runningExecutions.collect { case (e, status) - if jobPartIds.contains(e.job.id) && e.context.instant.isAfter(startDate) && e.context.instant - .isBefore(endDate) => + if jobIds.contains(e.job.id) + && startDate.forall(e.context.instant.isAfter) + && endDate.forall(e.context.instant.isBefore) => e.toExecutionLog(status) }) } yield (running ++ archived).groupBy(