diff --git a/build.sbt b/build.sbt index 14526e0ff..16301d6cf 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ val devMode = settingKey[Boolean]("Some build optimization are applied in devMode.") val writeClasspath = taskKey[File]("Write the project classpath to a file.") -val VERSION = "0.3.1" +val VERSION = "0.3.2" lazy val commonSettings = Seq( organization := "com.criteo.cuttle", @@ -179,9 +179,9 @@ lazy val cuttle = "com.criteo.lolhttp" %% "lolhttp", "com.criteo.lolhttp" %% "loljson", "com.criteo.lolhttp" %% "lolhtml" - ).map(_ % "0.9.0"), + ).map(_ % "0.9.2"), libraryDependencies ++= Seq("core", "generic", "parser") - .map(module => "io.circe" %% s"circe-${module}" % "0.9.0-M3"), + .map(module => "io.circe" %% s"circe-${module}" % "0.9.1"), libraryDependencies ++= Seq( "de.sciss" %% "fingertree" % "1.5.2", "org.scala-stm" %% "scala-stm" % "0.8", @@ -194,7 +194,7 @@ lazy val cuttle = libraryDependencies ++= Seq( "org.tpolecat" %% "doobie-core", "org.tpolecat" %% "doobie-hikari" - ).map(_ % "0.5.0-M11"), + ).map(_ % "0.5.0"), libraryDependencies ++= Seq( "mysql" % "mysql-connector-java" % "6.0.6" ), @@ -226,8 +226,7 @@ lazy val cuttle = if (operatingSystem.indexOf("win") >= 0) { val yarnJsPath = ("where yarn.js" !!).trim() assert(s"""node "$yarnJsPath" install""" ! logger == 0, "yarn failed") - } - else { + } else { assert("yarn install" ! logger == 0, "yarn failed") } logger.out("Running webpack...") diff --git a/core/src/main/scala/com/criteo/cuttle/Database.scala b/core/src/main/scala/com/criteo/cuttle/Database.scala index 4b6f17f8a..f440d63ee 100644 --- a/core/src/main/scala/com/criteo/cuttle/Database.scala +++ b/core/src/main/scala/com/criteo/cuttle/Database.scala @@ -121,7 +121,7 @@ private[cuttle] object Database { (for { locks <- sql""" SELECT locked_by, locked_at FROM locks WHERE TIMESTAMPDIFF(MINUTE, locked_at, NOW()) < 5; - """.query[(String, Instant)].list + """.query[(String, Instant)].to[List] _ <- if (locks.isEmpty) { sql""" DELETE FROM locks; @@ -260,7 +260,7 @@ private[cuttle] trait Queries { fr"job", NonEmptyList.fromListUnsafe(jobs.toList)) ++ orderBy ++ sql""" LIMIT $limit OFFSET $offset""") .query[(String, String, Instant, Instant, Json, ExecutionStatus, Int)] - .list + .to[List] .map(_.map { case (id, job, startTime, endTime, context, status, waitingSeconds) => ExecutionLog(id, job, Some(startTime), Some(endTime), context, status, waitingSeconds = waitingSeconds) @@ -317,7 +317,7 @@ private[cuttle] trait Queries { where job=$jobId and end_time > DATE_SUB(CURDATE(), INTERVAL 30 DAY) order by start_time asc, end_time asc """ .query[(Instant, Instant, Int, Int, ExecutionStatus)] - .list + .to[List] .map(_.map { case (startTime, endTime, durationSeconds, waitingSeconds, status) => new ExecutionStat(startTime, endTime, durationSeconds, waitingSeconds, status) diff --git a/core/src/main/scala/com/criteo/cuttle/Executor.scala b/core/src/main/scala/com/criteo/cuttle/Executor.scala index 8cdb387a6..19a6b55e4 100644 --- a/core/src/main/scala/com/criteo/cuttle/Executor.scala +++ b/core/src/main/scala/com/criteo/cuttle/Executor.scala @@ -9,7 +9,7 @@ import scala.concurrent.duration._ import scala.concurrent.stm.Txn.ExternalDecider import scala.concurrent.stm._ import scala.concurrent.{Future, Promise} -import scala.reflect.{ClassTag, classTag} +import scala.reflect.{classTag, ClassTag} import scala.util.{Failure, Success, Try} import cats.Eq @@ -138,18 +138,19 @@ class CancellationListener private[cuttle] (execution: Execution[_], private[cut * @param executionContext The scoped `scala.concurrent.ExecutionContext` for this execution. */ case class Execution[S <: Scheduling]( - id: String, - job: Job[S], - context: S#Context, - streams: ExecutionStreams, - platforms: Seq[ExecutionPlatform], - projectName: String - )(implicit val executionContext: SideEffectExecutionContext) { + id: String, + job: Job[S], + context: S#Context, + streams: ExecutionStreams, + platforms: Seq[ExecutionPlatform], + projectName: String +)(implicit val executionContext: SideEffectExecutionContext) { private var waitingSeconds = 0 private[cuttle] var startTime: Option[Instant] = None private val cancelListeners = TSet.empty[CancellationListener] private val cancelled = Ref(false) + /** * An execution with forcedSuccess set to true will have its side effect return a successful Future instance even if the * user code raised an exception or returned a failed Future instance. @@ -208,14 +209,14 @@ case class Execution[S <: Scheduling]( hasBeenCancelled } - def forceSuccess()(implicit user: User): Unit = { + def forceSuccess()(implicit user: User): Unit = if (!atomic { implicit txn => - forcedSuccess.getAndTransform(_ => true) - }) { - streams.debug(s"""Possible execution failures will be ignored and final execution status will be marked as success. + forcedSuccess.getAndTransform(_ => true) + }) { + streams.debug( + s"""Possible execution failures will be ignored and final execution status will be marked as success. |Change initiated by user ${user.userId} at ${Instant.now().toString}.""".stripMargin) } - } private[cuttle] def toExecutionLog(status: ExecutionStatus, failing: Option[FailingJob] = None) = ExecutionLog( @@ -349,10 +350,11 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla // signals whether the instance is shutting down private val isShuttingDown: Ref[Boolean] = Ref(false) private val timer = new Timer("com.criteo.cuttle.Executor.timer") - private val executionsCounters: Ref[Counter[Long]] = Ref(Counter[Long]( - "cuttle_executions_total", - help = "The number of finished executions that we have in concrete states by job and by tag" - )) + private val executionsCounters: Ref[Counter[Long]] = Ref( + Counter[Long]( + "cuttle_executions_total", + help = "The number of finished executions that we have in concrete states by job and by tag" + )) // executions that failed recently and are now running private def retryingExecutions(filteredJobs: Set[String]): Seq[(Execution[S], FailingJob, ExecutionStatus)] = @@ -388,7 +390,8 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla .filter({ case (_, s) => s == ExecutionStatus.ExecutionWaiting }) .foreach({ case (e, _) => e.updateWaitingTime(intervalSeconds) }) }) - .run + .compile + .drain .unsafeRunAsync(_ => ()) } @@ -710,15 +713,15 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla private[cuttle] def updateFinishedExecutionCounters(execution: Execution[S], status: String): Unit = atomic { implicit txn => - val tagsLabel = if (execution.job.tags.nonEmpty) - Set("tags" -> execution.job.tags.map(_.name).mkString(",")) - else - Set.empty + val tagsLabel = + if (execution.job.tags.nonEmpty) + Set("tags" -> execution.job.tags.map(_.name).mkString(",")) + else + Set.empty executionsCounters() = executionsCounters().inc( Set("type" -> status, "job_id" -> execution.job.id) ++ tagsLabel ) } - private def run0(all: Seq[(Job[S], S#Context)]): Seq[(Execution[S], Future[Completed])] = { sealed trait NewExecution case object ToRunNow extends NewExecution @@ -896,7 +899,6 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla private[cuttle] def healthCheck(): Try[Boolean] = Try(queries.healthCheck.transact(xa).unsafeRunSync) - private case class ExecutionInfo(jobId: String, tags: Set[String], status: ExecutionStatus) /** @@ -922,8 +924,9 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla ) val (running: Seq[ExecutionInfo], waiting: Seq[ExecutionInfo]) = runningExecutions - .map { case (exec, status) => - ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), status) + .map { + case (exec, status) => + ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), status) } .partition { execution => execution.status == ExecutionStatus.ExecutionRunning @@ -956,21 +959,39 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla ) } - private def getMetricsByTag( - running: Seq[ExecutionInfo], - waiting: Seq[ExecutionInfo], - paused: Seq[ExecutionInfo], - failing: Seq[ExecutionInfo]): Metrics.Metric = { - ( // Explode by tag - running.flatMap { info => info.tags } - .groupBy(identity).mapValues("running" -> _.size).toList ++ - waiting.flatMap { info => info.tags } - .groupBy(identity).mapValues("waiting" -> _.size).toList ++ - paused.flatMap { info => info.tags } - .groupBy(identity).mapValues("paused" -> _.size).toList ++ - failing.flatMap { info => info.tags } - .groupBy(identity).mapValues("failing" -> _.size).toList - ).foldLeft( + private def getMetricsByTag(running: Seq[ExecutionInfo], + waiting: Seq[ExecutionInfo], + paused: Seq[ExecutionInfo], + failing: Seq[ExecutionInfo]): Metrics.Metric = + (// Explode by tag + running + .flatMap { info => + info.tags + } + .groupBy(identity) + .mapValues("running" -> _.size) + .toList ++ + waiting + .flatMap { info => + info.tags + } + .groupBy(identity) + .mapValues("waiting" -> _.size) + .toList ++ + paused + .flatMap { info => + info.tags + } + .groupBy(identity) + .mapValues("paused" -> _.size) + .toList ++ + failing + .flatMap { info => + info.tags + } + .groupBy(identity) + .mapValues("failing" -> _.size) + .toList).foldLeft( Gauge("cuttle_scheduler_stat_count_by_tag", "The number of executions that we have in concrete states by tag") ) { case (gauge, (tag, (status, count))) => @@ -978,23 +999,20 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla case (gauge, _) => gauge } - } - private def getMetricsByJob( - running: Seq[ExecutionInfo], - waiting: Seq[ExecutionInfo], - paused: Seq[ExecutionInfo], - failing: Seq[ExecutionInfo]): Metrics.Metric = { + private def getMetricsByJob(running: Seq[ExecutionInfo], + waiting: Seq[ExecutionInfo], + paused: Seq[ExecutionInfo], + failing: Seq[ExecutionInfo]): Metrics.Metric = ( running.groupBy(_.jobId).mapValues("running" -> _.size).toList ++ - waiting.groupBy(_.jobId).mapValues("waiting" -> _.size).toList ++ - paused.groupBy(_.jobId).mapValues("paused" -> _.size).toList ++ - failing.groupBy(_.jobId).mapValues("failing" -> _.size).toList + waiting.groupBy(_.jobId).mapValues("waiting" -> _.size).toList ++ + paused.groupBy(_.jobId).mapValues("paused" -> _.size).toList ++ + failing.groupBy(_.jobId).mapValues("failing" -> _.size).toList ).foldLeft( Gauge("cuttle_scheduler_stat_count_by_job", "The number of executions that we have in concrete states by job") ) { case (gauge, (jobId, (status, count))) => gauge.labeled(Set("job" -> jobId, "type" -> status), count) } - } } diff --git a/core/src/main/scala/com/criteo/cuttle/Metrics.scala b/core/src/main/scala/com/criteo/cuttle/Metrics.scala index a74fad099..e32907904 100644 --- a/core/src/main/scala/com/criteo/cuttle/Metrics.scala +++ b/core/src/main/scala/com/criteo/cuttle/Metrics.scala @@ -1,6 +1,5 @@ package com.criteo.cuttle -import scala.concurrent.stm.{TMap, atomic} import scala.math.Numeric /** Expose cuttle metrics via the [[https://prometheus.io prometheus]] protocol. */ @@ -50,12 +49,13 @@ object Metrics { * @param name The metric name. * @param help Metric description if provided. * @param labels2Value map of (label name, label value) pairs to counter values - */ + */ case class Counter[T]( - name: String, - help: String = "", - labels2Value: Map[Set[(String, String)], AnyVal] = Map.empty - )(implicit number: Numeric[T]) extends Metric { + name: String, + help: String = "", + labels2Value: Map[Set[(String, String)], AnyVal] = Map.empty + )(implicit number: Numeric[T]) + extends Metric { override val metricType: MetricType = counter diff --git a/core/src/main/scala/com/criteo/cuttle/platforms/RateLimiter.scala b/core/src/main/scala/com/criteo/cuttle/platforms/RateLimiter.scala index e525f34ab..58dce0962 100644 --- a/core/src/main/scala/com/criteo/cuttle/platforms/RateLimiter.scala +++ b/core/src/main/scala/com/criteo/cuttle/platforms/RateLimiter.scala @@ -44,7 +44,8 @@ class RateLimiter(tokens: Int, refillRateInMs: Int) extends WaitingExecutionQueu } fs2.Stream(runNext()) }) - .run + .compile + .drain .unsafeRunAsync(_ => ()) def canRunNextCondition(implicit txn: InTxn) = _tokens() >= 1 diff --git a/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala b/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala index 81d4c8a52..a6e3c1414 100644 --- a/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala +++ b/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala @@ -109,8 +109,8 @@ class ExecutorSpec extends FunSuite with TestScheduling { } private def buildJob(jobId: String, tags: Set[Tag] = Set.empty): Job[TestScheduling] = - Job(jobId, TestScheduling(), jobId, tags = tags) { - implicit execution => Future { Completed }(execution.executionContext) + Job(jobId, TestScheduling(), jobId, tags = tags) { implicit execution => + Future { Completed }(execution.executionContext) } private def buildExecutionForJob(job: Job[TestScheduling]): Execution[TestScheduling] = @@ -132,5 +132,5 @@ class ExecutorSpec extends FunSuite with TestScheduling { private val fooBarJob: Job[TestScheduling] = buildJob("foo_bar_job", Set(fooTag, barTag)) - private val untaggedJob :Job[TestScheduling] = buildJob("untagged_job") + private val untaggedJob: Job[TestScheduling] = buildJob("untagged_job") } diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Database.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Database.scala index df94c4d44..a8bfc0bef 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Database.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Database.scala @@ -24,18 +24,23 @@ private[timeseries] object Database { val contextIdMigration: ConnectionIO[Unit] = { implicit val jobs: Set[TimeSeriesJob] = Set.empty - val chunkSize = 1024*10 + val chunkSize = 1024 * 10 val stream = sql"SELECT id, json FROM timeseries_contexts" - .query[(String, Json)].processWithChunkSize(chunkSize) + .query[(String, Json)] + .streamWithChunkSize(chunkSize) val insert = Update[(String, String)]("INSERT into tmp (id, new_id) VALUES (? , ?)") for { _ <- sql"CREATE TEMPORARY TABLE tmp (id VARCHAR(1000), new_id VARCHAR(1000))".update.run - _ <- - stream.chunkLimit(chunkSize).evalMap { oldContexts => - insert.updateMany(oldContexts.map { case (id, json) => - (id, json.as[TimeSeriesContext].right.get.toId) + _ <- stream + .chunkLimit(chunkSize) + .evalMap { oldContexts => + insert.updateMany(oldContexts.map { + case (id, json) => + (id, json.as[TimeSeriesContext].right.get.toId) }) - }.run + } + .compile + .drain _ <- sql"CREATE INDEX tmp_id ON tmp (id)".update.run _ <- sql"""UPDATE timeseries_contexts ctx JOIN tmp ON ctx.id = tmp.id SET ctx.id = tmp.new_id""".update.run @@ -209,7 +214,7 @@ private[timeseries] object Database { ORDER BY c.id DESC """ .query[(String, String, Instant, Instant, Json, ExecutionStatus, Int)] - .list + .to[List] .map(_.map { case (id, job, startTime, endTime, context, status, waitingSeconds) => ExecutionLog(id, job, Some(startTime), Some(endTime), context, status, waitingSeconds = waitingSeconds) diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala index 4b1c81a00..a7ca34811 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala @@ -417,7 +417,7 @@ private[timeseries] trait TimeSeriesApp { self: TimeSeriesScheduler => case GET at url"/api/timeseries/backfills" => Database .queryBackfills() - .list + .to[List] .map(_.map { case (id, name, description, jobs, priority, start, end, created_at, status, created_by) => Json.obj( diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala index e863e36dc..b2e018863 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala @@ -438,7 +438,7 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] wit atomic { implicit txn => val incompleteBackfills = Database .queryBackfills(Some(sql"""status = 'RUNNING'""")) - .list + .to[List] .map(_.map { case (id, name, description, jobsIdsString, priority, start, end, _, status, createdBy) => val jobsIds = jobsIdsString.split(",")