From 17488e1b186aeee68bd5b0027f8bb2c0fbb2e4bb Mon Sep 17 00:00:00 2001 From: eryshev Date: Tue, 13 Feb 2018 17:51:35 +0100 Subject: [PATCH 1/3] introducing jobs id validation on workflow starting (#228) --- .../timeseries/TimeSeriesScheduler.scala | 16 ++++++++++++---- .../cuttle/timeseries/TimeSeriesSpec.scala | 18 ++++++++++++++++++ 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala index 94d537911..e863e36dc 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala @@ -103,7 +103,8 @@ object TimeSeriesCalendar { * @param firstDay The first day of the week for these weeks. */ case class Weekly(tz: ZoneId, firstDay: DayOfWeek) extends TimeSeriesCalendar { - private def truncateToWeek(t: ZonedDateTime) = t.`with`(TemporalAdjusters.previousOrSame(firstDay)).truncatedTo(DAYS) + private def truncateToWeek(t: ZonedDateTime) = + t.`with`(TemporalAdjusters.previousOrSame(firstDay)).truncatedTo(DAYS) def truncate(t: Instant) = truncateToWeek(t.atZone(tz)).toInstant def next(t: Instant) = truncateToWeek(t.atZone(tz)).plus(1, WEEKS).toInstant } @@ -720,9 +721,10 @@ private[timeseries] object TimeSeriesUtils { val UTC: ZoneId = ZoneId.of("UTC") /** - * Validation of cycle absence in workflow DAG and an absence the (execution, dependency) tuple that execution has - * a start date after an execution's start date. - * It's implemented based on Kahn's algorithm. 
+ * Validation of: + * - absence of cycles in the workflow DAG, checked with Kahn's algorithm + * - absence of any (child, parent) pair in which the child's start date is before its parent's start date + * - absence of jobs with the same id * @param workflow workflow to be validated * @return either a validation errors list or an unit */ @@ -755,6 +757,12 @@ private[timeseries] object TimeSeriesUtils { if (edges.nonEmpty) errors += "Workflow has at least one cycle" + workflow.vertices.groupBy(_.id).collect { + case (id: String, jobs) if jobs.size > 1 => id + } foreach (id => { + errors += s"Id $id is used by more than 1 job" + }) + if (errors.nonEmpty) Left(errors.toList) else Right(()) } diff --git a/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSpec.scala b/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSpec.scala index 99c4bfea2..fee886e3b 100644 --- a/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSpec.scala +++ b/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSpec.scala @@ -81,4 +81,22 @@ class TimeSeriesSpec extends FunSuite with TestScheduling { assert(validationRes.isLeft, "workflow passed start date validation") assert(validationRes.left.get === List("Workflow has at least one cycle"), "errors messages are bad") } + + test("it shouldn't validate a workflow that contains jobs with same ids") { + val id = "badJob" + val badJob = Job(id, hourly(date"2117-03-25T02:00:00Z"))(completed) + val badJobClone = Job(id, hourly(date"2117-03-24T02:00:00Z"))(completed) + val workflowParentChild = badJob dependsOn badJobClone + val workflowSiblings = badJob and badJobClone + + val validationParentChild = TimeSeriesUtils.validate(workflowParentChild) + assert(validationParentChild.isLeft, "it means that workflow passed duplicate id validation") + assert(validationParentChild.left.get === List(s"Id badJob is used by more than 1 job"), + "it means that errors messages are bad") + + val validationSiblings = 
TimeSeriesUtils.validate(workflowSiblings) + assert(validationSiblings.isLeft, "it means that workflow passed duplicate id validation") + assert(validationSiblings.left.get === List(s"Id badJob is used by more than 1 job"), + "it means that errors messages are bad") + } } From 9dbd4f06d59d4df73b3ed9a88f35cdfd26e6c0de Mon Sep 17 00:00:00 2001 From: Masuzu Date: Wed, 14 Feb 2018 16:06:03 +0100 Subject: [PATCH 2/3] Refactor metrics aggregation and add aggregated metrics by job (#221) * Refactor metrics aggregation and add aggregated metrics by job for running, waiting, failing and paused executions * Compute and report metrics on finished executions aggregated by tag and job type --- build.sbt | 3 +- .../scala/com/criteo/cuttle/Executor.scala | 144 ++++++++++++++---- .../scala/com/criteo/cuttle/Metrics.scala | 29 +++- .../com/criteo/cuttle/ExecutorSpec.scala | 136 +++++++++++++++++ 4 files changed, 277 insertions(+), 35 deletions(-) create mode 100644 core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala diff --git a/build.sbt b/build.sbt index 32619230b..14526e0ff 100644 --- a/build.sbt +++ b/build.sbt @@ -199,7 +199,8 @@ lazy val cuttle = "mysql" % "mysql-connector-java" % "6.0.6" ), libraryDependencies ++= Seq( - "org.scalatest" %% "scalatest" % "3.0.1" + "org.scalatest" %% "scalatest" % "3.0.1", + "org.mockito" % "mockito-all" % "1.10.19" ).map(_ % "test"), // Webpack resourceGenerators in Compile += Def.task { diff --git a/core/src/main/scala/com/criteo/cuttle/Executor.scala b/core/src/main/scala/com/criteo/cuttle/Executor.scala index a4d3afc0c..8cdb387a6 100644 --- a/core/src/main/scala/com/criteo/cuttle/Executor.scala +++ b/core/src/main/scala/com/criteo/cuttle/Executor.scala @@ -349,6 +349,10 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla // signals whether the instance is shutting down private val isShuttingDown: Ref[Boolean] = Ref(false) private val timer = new Timer("com.criteo.cuttle.Executor.timer") + private val 
executionsCounters: Ref[Counter[Long]] = Ref(Counter[Long]( + "cuttle_executions_total", + help = "The number of finished executions that we have in concrete states by job and by tag" + )) // executions that failed recently and are now running private def retryingExecutions(filteredJobs: Set[String]): Seq[(Execution[S], FailingJob, ExecutionStatus)] = @@ -658,6 +662,7 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla atomic { implicit txn => runningState -= execution recentFailures -= (execution.job -> execution.context) + updateFinishedExecutionCounters(execution, "success") } case Failure(e) => val stacktrace = new StringWriter() @@ -666,6 +671,7 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla execution.streams.error(stacktrace.toString) atomic { implicit tx => + updateFinishedExecutionCounters(execution, "failure") // retain jobs in recent failures if last failure happened in [now - retryStrategy.retryWindow, now] recentFailures.retain { case (_, (retryExecution, failingJob)) => @@ -702,6 +708,16 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla } }) + private[cuttle] def updateFinishedExecutionCounters(execution: Execution[S], status: String): Unit = + atomic { implicit txn => + val tagsLabel = if (execution.job.tags.nonEmpty) + Set("tags" -> execution.job.tags.map(_.name).mkString(",")) + else + Set.empty + executionsCounters() = executionsCounters().inc( + Set("type" -> status, "job_id" -> execution.job.id) ++ tagsLabel + ) + } private def run0(all: Seq[(Job[S], S#Context)]): Seq[(Execution[S], Future[Completed])] = { sealed trait NewExecution @@ -880,41 +896,105 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla private[cuttle] def healthCheck(): Try[Boolean] = Try(queries.healthCheck.transact(xa).unsafeRunSync) - override def getMetrics(jobs: Set[String], workflow: Workflow[S]): Seq[Metric] = { - val ((running, waiting), 
paused, failing) = getStateAtomic(jobs) + + private case class ExecutionInfo(jobId: String, tags: Set[String], status: ExecutionStatus) + + /** + * @param jobs list of job IDs + * @param getStateAtomic Atomically get executor stats. Given a list of job ids, returns how many + * jobs are in each concrete state, as ((running, waiting), paused, failing) + * @param runningExecutions executions which are either running or waiting for a free thread to start + * @param pausedExecutions executions which are currently paused + */ + private[cuttle] def getMetrics(jobs: Set[String])( + getStateAtomic: Set[String] => ((Int, Int), Int, Int), + runningExecutions: Seq[(Execution[S], ExecutionStatus)], + pausedExecutions: Seq[Execution[S]], + failingExecutions: Seq[Execution[S]] + ): Seq[Metric] = { + val ((runningCount, waitingCount), pausedCount, failingCount) = getStateAtomic(jobs) val statMetrics = Seq( Gauge("cuttle_scheduler_stat_count", "The number of jobs that we have in concrete states") - .labeled("type" -> "running", running) - .labeled("type" -> "waiting", waiting) - .labeled("type" -> "paused", paused) - .labeled("type" -> "failing", failing)) + .labeled("type" -> "running", runningCount) + .labeled("type" -> "waiting", waitingCount) + .labeled("type" -> "paused", pausedCount) + .labeled("type" -> "failing", failingCount) + ) + + val (running: Seq[ExecutionInfo], waiting: Seq[ExecutionInfo]) = runningExecutions + .map { case (exec, status) => + ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), status) + } + .partition { execution => + execution.status == ExecutionStatus.ExecutionRunning + } + + val paused: Seq[ExecutionInfo] = pausedExecutions.map { exec => + ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), ExecutionStatus.ExecutionPaused) + } + + val failing: Seq[ExecutionInfo] = failingExecutions.map { exec => + ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), ExecutionStatus.ExecutionThrottled) + } + + statMetrics ++ + Seq(getMetricsByTag(running, waiting, paused, failing)) ++ + 
Seq(getMetricsByJob(running, waiting, paused, failing)) ++ + Seq(executionsCounters.single()) + } + + /** + * @param jobs the list of jobs ids + */ + override def getMetrics(jobs: Set[String], workflow: Workflow[S]): Seq[Metric] = atomic { implicit txn => - val (running, waiting) = runningExecutions - .flatMap { - case (exec, status) => - exec.job.tags.map(_.name -> status) - } - .partition(_._2 == ExecutionStatus.ExecutionRunning) - statMetrics ++ Seq( - ( - running.groupBy(_._1).mapValues("running" -> _.size).toList ++ - waiting.groupBy(_._1).mapValues("waiting" -> _.size).toList ++ - pausedState.values - .flatMap(_.keys.flatMap(_.job.tags.map(_.name))) - .groupBy(identity) - .mapValues("paused" -> _.size) - .toList ++ - allFailingExecutions - .flatMap(_.job.tags.map(_.name)) - .groupBy(identity) - .mapValues("failing" -> _.size) - .toList - ).foldLeft( - Gauge("cuttle_scheduler_stat_count_by_tag", "The number of jobs that we have in concrete states by tag") - ) { - case (gauge, (tag, (status, count))) => - gauge.labeled(Set("tag" -> tag, "type" -> status), count) - }) + getMetrics(jobs)( + getStateAtomic, + runningExecutions, + pausedState.values.flatMap(executionMap => executionMap.keys).toSeq, + allFailingExecutions + ) + } + + private def getMetricsByTag( + running: Seq[ExecutionInfo], + waiting: Seq[ExecutionInfo], + paused: Seq[ExecutionInfo], + failing: Seq[ExecutionInfo]): Metrics.Metric = { + ( // Explode by tag + running.flatMap { info => info.tags } + .groupBy(identity).mapValues("running" -> _.size).toList ++ + waiting.flatMap { info => info.tags } + .groupBy(identity).mapValues("waiting" -> _.size).toList ++ + paused.flatMap { info => info.tags } + .groupBy(identity).mapValues("paused" -> _.size).toList ++ + failing.flatMap { info => info.tags } + .groupBy(identity).mapValues("failing" -> _.size).toList + ).foldLeft( + Gauge("cuttle_scheduler_stat_count_by_tag", "The number of executions that we have in concrete states by tag") + ) { + case (gauge, 
(tag, (status, count))) => + gauge.labeled(Set("tag" -> tag, "type" -> status), count) + case (gauge, _) => + gauge + } + } + + private def getMetricsByJob( + running: Seq[ExecutionInfo], + waiting: Seq[ExecutionInfo], + paused: Seq[ExecutionInfo], + failing: Seq[ExecutionInfo]): Metrics.Metric = { + ( + running.groupBy(_.jobId).mapValues("running" -> _.size).toList ++ + waiting.groupBy(_.jobId).mapValues("waiting" -> _.size).toList ++ + paused.groupBy(_.jobId).mapValues("paused" -> _.size).toList ++ + failing.groupBy(_.jobId).mapValues("failing" -> _.size).toList + ).foldLeft( + Gauge("cuttle_scheduler_stat_count_by_job", "The number of executions that we have in concrete states by job") + ) { + case (gauge, (jobId, (status, count))) => + gauge.labeled(Set("job" -> jobId, "type" -> status), count) } } } diff --git a/core/src/main/scala/com/criteo/cuttle/Metrics.scala b/core/src/main/scala/com/criteo/cuttle/Metrics.scala index 1907368d9..a74fad099 100644 --- a/core/src/main/scala/com/criteo/cuttle/Metrics.scala +++ b/core/src/main/scala/com/criteo/cuttle/Metrics.scala @@ -1,11 +1,15 @@ package com.criteo.cuttle +import scala.concurrent.stm.{TMap, atomic} +import scala.math.Numeric + /** Expose cuttle metrics via the [[https://prometheus.io prometheus]] protocol. */ object Metrics { object MetricType extends Enumeration { type MetricType = Value val gauge = Value + val counter = Value } import MetricType._ @@ -20,11 +24,11 @@ object Metrics { def isDefined: Boolean = labels2Value.nonEmpty } - /** A __Gauge__ metric epresents a single numerical value that can arbitrarily go up and down. + /** A __Gauge__ metric represents a single numerical value that can arbitrarily go up and down. * * @param name The metric name. * @param help Metric description if provided. 
- * @param labels2Value The current value.s + * @param labels2Value map of (label name, label value) pairs to the current gauge values */ case class Gauge(name: String, help: String = "", labels2Value: Map[Set[(String, String)], AnyVal] = Map.empty) extends Metric { @@ -40,6 +44,27 @@ object Metrics { def set(value: AnyVal): Gauge = copy(labels2Value = labels2Value + (Set.empty[(String, String)] -> value)) } + /** + * A __Counter__ contains a value that can only be incremented. Increments are not threadsafe. + * + * @param name The metric name. + * @param help Metric description if provided. + * @param labels2Value map of (label name, label value) pairs to counter values + */ + case class Counter[T]( + name: String, + help: String = "", + labels2Value: Map[Set[(String, String)], AnyVal] = Map.empty + )(implicit number: Numeric[T]) extends Metric { + + override val metricType: MetricType = counter + + def inc(label: Set[(String, String)]): Counter[T] = { + val currentCount = labels2Value.getOrElse(label, number.zero).asInstanceOf[T] + copy(labels2Value = labels2Value + (label -> number.plus(currentCount, number.one).asInstanceOf[AnyVal])) + } + } + /** Components able to provide metrics. 
*/ trait MetricProvider[S <: Scheduling] { def getMetrics(jobs: Set[String], workflow: Workflow[S]): Seq[Metric] diff --git a/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala b/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala new file mode 100644 index 000000000..81d4c8a52 --- /dev/null +++ b/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala @@ -0,0 +1,136 @@ +package com.criteo.cuttle + +import java.sql.{Connection, ResultSet} + +import scala.concurrent.Future + +import cats.effect.IO +import com.mysql.cj.jdbc.PreparedStatement +import doobie.util.transactor.Transactor +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.FunSuite + +import com.criteo.cuttle.ExecutionContexts.Implicits.sideEffectExecutionContext +import com.criteo.cuttle.ExecutionContexts._ +import com.criteo.cuttle.Metrics.Prometheus + +class ExecutorSpec extends FunSuite with TestScheduling { + test("Executor should return metrics aggregated by job and tag") { + val connection: Connection = { + val mockConnection = mock(classOf[Connection]) + val statement = mock(classOf[PreparedStatement]) + val resultSet = mock(classOf[ResultSet]) + when(mockConnection.prepareStatement(any(classOf[String]))).thenReturn(statement) + when(statement.executeQuery()).thenReturn(resultSet) + mockConnection + } + + val testExecutor = new Executor[TestScheduling]( + Seq.empty, + xa = Transactor.fromConnection[IO](connection).copy(strategy0 = doobie.util.transactor.Strategy.void), + logger, + "test_project" + )(RetryStrategy.ExponentialBackoffRetryStrategy) + + testExecutor.updateFinishedExecutionCounters(buildExecutionForJob(fooJob), "success") + testExecutor.updateFinishedExecutionCounters(buildExecutionForJob(fooJob), "success") + testExecutor.updateFinishedExecutionCounters(buildExecutionForJob(untaggedJob), "success") + testExecutor.updateFinishedExecutionCounters(buildExecutionForJob(fooBarJob), "success") + 
testExecutor.updateFinishedExecutionCounters(buildExecutionForJob(untaggedJob), "failure") + testExecutor.updateFinishedExecutionCounters(buildExecutionForJob(fooBarJob), "failure") + + val metrics = Prometheus.serialize( + testExecutor.getMetrics(Set("jobA"))( + getStateAtomic = _ => { + ((5, 1), 3, 2) + }, + runningExecutions = Seq( + buildExecutionForJob(fooJob) -> ExecutionStatus.ExecutionRunning, + buildExecutionForJob(fooJob) -> ExecutionStatus.ExecutionRunning, + buildExecutionForJob(fooJob) -> ExecutionStatus.ExecutionWaiting, + buildExecutionForJob(fooBarJob) -> ExecutionStatus.ExecutionRunning, + buildExecutionForJob(untaggedJob) -> ExecutionStatus.ExecutionRunning, + buildExecutionForJob(untaggedJob) -> ExecutionStatus.ExecutionWaiting + ), + pausedExecutions = Seq( + buildExecutionForJob(fooJob), + buildExecutionForJob(fooBarJob), + buildExecutionForJob(untaggedJob) + ), + failingExecutions = Seq( + buildExecutionForJob(fooBarJob), + buildExecutionForJob(fooBarJob), + buildExecutionForJob(untaggedJob) + ) + ) + ) + + println(metrics) + + val expectedMetrics = + """# HELP cuttle_scheduler_stat_count The number of jobs that we have in concrete states + |# TYPE cuttle_scheduler_stat_count gauge + |cuttle_scheduler_stat_count {type="running"} 5 + |cuttle_scheduler_stat_count {type="waiting"} 1 + |cuttle_scheduler_stat_count {type="paused"} 3 + |cuttle_scheduler_stat_count {type="failing"} 2 + |# HELP cuttle_scheduler_stat_count_by_tag The number of executions that we have in concrete states by tag + |# TYPE cuttle_scheduler_stat_count_by_tag gauge + |cuttle_scheduler_stat_count_by_tag {tag="bar", type="paused"} 1 + |cuttle_scheduler_stat_count_by_tag {tag="foo", type="waiting"} 1 + |cuttle_scheduler_stat_count_by_tag {tag="foo", type="running"} 3 + |cuttle_scheduler_stat_count_by_tag {tag="foo", type="paused"} 2 + |cuttle_scheduler_stat_count_by_tag {tag="bar", type="failing"} 2 + |cuttle_scheduler_stat_count_by_tag {tag="foo", type="failing"} 2 + 
|cuttle_scheduler_stat_count_by_tag {tag="bar", type="running"} 1 + |# HELP cuttle_scheduler_stat_count_by_job The number of executions that we have in concrete states by job + |# TYPE cuttle_scheduler_stat_count_by_job gauge + |cuttle_scheduler_stat_count_by_job {job="untagged_job", type="waiting"} 1 + |cuttle_scheduler_stat_count_by_job {job="foo_bar_job", type="running"} 1 + |cuttle_scheduler_stat_count_by_job {job="untagged_job", type="running"} 1 + |cuttle_scheduler_stat_count_by_job {job="foo_job", type="waiting"} 1 + |cuttle_scheduler_stat_count_by_job {job="foo_job", type="paused"} 1 + |cuttle_scheduler_stat_count_by_job {job="untagged_job", type="failing"} 1 + |cuttle_scheduler_stat_count_by_job {job="foo_job", type="running"} 2 + |cuttle_scheduler_stat_count_by_job {job="foo_bar_job", type="paused"} 1 + |cuttle_scheduler_stat_count_by_job {job="foo_bar_job", type="failing"} 2 + |cuttle_scheduler_stat_count_by_job {job="untagged_job", type="paused"} 1 + |# HELP cuttle_executions_total The number of finished executions that we have in concrete states by job and by tag + |# TYPE cuttle_executions_total counter + |cuttle_executions_total {type="success", job_id="foo_bar_job", tags="foo,bar"} 1 + |cuttle_executions_total {type="success", job_id="foo_job", tags="foo"} 2 + |cuttle_executions_total {type="failure", job_id="foo_bar_job", tags="foo,bar"} 1 + |cuttle_executions_total {type="success", job_id="untagged_job"} 1 + |cuttle_executions_total {type="failure", job_id="untagged_job"} 1 + |""".stripMargin + + assert(metrics == expectedMetrics) + } + + private def buildJob(jobId: String, tags: Set[Tag] = Set.empty): Job[TestScheduling] = + Job(jobId, TestScheduling(), jobId, tags = tags) { + implicit execution => Future { Completed }(execution.executionContext) + } + + private def buildExecutionForJob(job: Job[TestScheduling]): Execution[TestScheduling] = + Execution[TestScheduling]( + id = java.util.UUID.randomUUID.toString, + job = job, + context = 
TestContext(), + streams = new ExecutionStreams { + override private[cuttle] def writeln(str: CharSequence): Unit = ??? + }, + platforms = Seq.empty, + "foo-project" + ) + + private val fooTag = Tag("foo") + private val barTag = Tag("bar") + + private val fooJob: Job[TestScheduling] = buildJob("foo_job", Set(fooTag)) + + private val fooBarJob: Job[TestScheduling] = buildJob("foo_bar_job", Set(fooTag, barTag)) + + private val untaggedJob :Job[TestScheduling] = buildJob("untagged_job") +} From 1ae99214a053040f1692905349ac42590ef45ccc Mon Sep 17 00:00:00 2001 From: dufrannea Date: Thu, 15 Feb 2018 11:01:15 +0100 Subject: [PATCH 3/3] Bumps lolhttp to 9.0.2 (#229) * Bumps lolhttp to 9.0.2 * Removes warnings --- build.sbt | 11 +- .../scala/com/criteo/cuttle/Database.scala | 6 +- .../scala/com/criteo/cuttle/Executor.scala | 120 ++++++++++-------- .../scala/com/criteo/cuttle/Metrics.scala | 12 +- .../criteo/cuttle/platforms/RateLimiter.scala | 3 +- .../com/criteo/cuttle/ExecutorSpec.scala | 6 +- .../criteo/cuttle/timeseries/Database.scala | 21 +-- .../cuttle/timeseries/TimeSeriesApp.scala | 2 +- .../timeseries/TimeSeriesScheduler.scala | 2 +- 9 files changed, 103 insertions(+), 80 deletions(-) diff --git a/build.sbt b/build.sbt index 14526e0ff..16301d6cf 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ val devMode = settingKey[Boolean]("Some build optimization are applied in devMode.") val writeClasspath = taskKey[File]("Write the project classpath to a file.") -val VERSION = "0.3.1" +val VERSION = "0.3.2" lazy val commonSettings = Seq( organization := "com.criteo.cuttle", @@ -179,9 +179,9 @@ lazy val cuttle = "com.criteo.lolhttp" %% "lolhttp", "com.criteo.lolhttp" %% "loljson", "com.criteo.lolhttp" %% "lolhtml" - ).map(_ % "0.9.0"), + ).map(_ % "0.9.2"), libraryDependencies ++= Seq("core", "generic", "parser") - .map(module => "io.circe" %% s"circe-${module}" % "0.9.0-M3"), + .map(module => "io.circe" %% s"circe-${module}" % "0.9.1"), libraryDependencies ++= Seq( 
"de.sciss" %% "fingertree" % "1.5.2", "org.scala-stm" %% "scala-stm" % "0.8", @@ -194,7 +194,7 @@ lazy val cuttle = libraryDependencies ++= Seq( "org.tpolecat" %% "doobie-core", "org.tpolecat" %% "doobie-hikari" - ).map(_ % "0.5.0-M11"), + ).map(_ % "0.5.0"), libraryDependencies ++= Seq( "mysql" % "mysql-connector-java" % "6.0.6" ), @@ -226,8 +226,7 @@ lazy val cuttle = if (operatingSystem.indexOf("win") >= 0) { val yarnJsPath = ("where yarn.js" !!).trim() assert(s"""node "$yarnJsPath" install""" ! logger == 0, "yarn failed") - } - else { + } else { assert("yarn install" ! logger == 0, "yarn failed") } logger.out("Running webpack...") diff --git a/core/src/main/scala/com/criteo/cuttle/Database.scala b/core/src/main/scala/com/criteo/cuttle/Database.scala index 4b6f17f8a..f440d63ee 100644 --- a/core/src/main/scala/com/criteo/cuttle/Database.scala +++ b/core/src/main/scala/com/criteo/cuttle/Database.scala @@ -121,7 +121,7 @@ private[cuttle] object Database { (for { locks <- sql""" SELECT locked_by, locked_at FROM locks WHERE TIMESTAMPDIFF(MINUTE, locked_at, NOW()) < 5; - """.query[(String, Instant)].list + """.query[(String, Instant)].to[List] _ <- if (locks.isEmpty) { sql""" DELETE FROM locks; @@ -260,7 +260,7 @@ private[cuttle] trait Queries { fr"job", NonEmptyList.fromListUnsafe(jobs.toList)) ++ orderBy ++ sql""" LIMIT $limit OFFSET $offset""") .query[(String, String, Instant, Instant, Json, ExecutionStatus, Int)] - .list + .to[List] .map(_.map { case (id, job, startTime, endTime, context, status, waitingSeconds) => ExecutionLog(id, job, Some(startTime), Some(endTime), context, status, waitingSeconds = waitingSeconds) @@ -317,7 +317,7 @@ private[cuttle] trait Queries { where job=$jobId and end_time > DATE_SUB(CURDATE(), INTERVAL 30 DAY) order by start_time asc, end_time asc """ .query[(Instant, Instant, Int, Int, ExecutionStatus)] - .list + .to[List] .map(_.map { case (startTime, endTime, durationSeconds, waitingSeconds, status) => new ExecutionStat(startTime, 
endTime, durationSeconds, waitingSeconds, status) diff --git a/core/src/main/scala/com/criteo/cuttle/Executor.scala b/core/src/main/scala/com/criteo/cuttle/Executor.scala index 8cdb387a6..19a6b55e4 100644 --- a/core/src/main/scala/com/criteo/cuttle/Executor.scala +++ b/core/src/main/scala/com/criteo/cuttle/Executor.scala @@ -9,7 +9,7 @@ import scala.concurrent.duration._ import scala.concurrent.stm.Txn.ExternalDecider import scala.concurrent.stm._ import scala.concurrent.{Future, Promise} -import scala.reflect.{ClassTag, classTag} +import scala.reflect.{classTag, ClassTag} import scala.util.{Failure, Success, Try} import cats.Eq @@ -138,18 +138,19 @@ class CancellationListener private[cuttle] (execution: Execution[_], private[cut * @param executionContext The scoped `scala.concurrent.ExecutionContext` for this execution. */ case class Execution[S <: Scheduling]( - id: String, - job: Job[S], - context: S#Context, - streams: ExecutionStreams, - platforms: Seq[ExecutionPlatform], - projectName: String - )(implicit val executionContext: SideEffectExecutionContext) { + id: String, + job: Job[S], + context: S#Context, + streams: ExecutionStreams, + platforms: Seq[ExecutionPlatform], + projectName: String +)(implicit val executionContext: SideEffectExecutionContext) { private var waitingSeconds = 0 private[cuttle] var startTime: Option[Instant] = None private val cancelListeners = TSet.empty[CancellationListener] private val cancelled = Ref(false) + /** * An execution with forcedSuccess set to true will have its side effect return a successful Future instance even if the * user code raised an exception or returned a failed Future instance. 
@@ -208,14 +209,14 @@ case class Execution[S <: Scheduling]( hasBeenCancelled } - def forceSuccess()(implicit user: User): Unit = { + def forceSuccess()(implicit user: User): Unit = if (!atomic { implicit txn => - forcedSuccess.getAndTransform(_ => true) - }) { - streams.debug(s"""Possible execution failures will be ignored and final execution status will be marked as success. + forcedSuccess.getAndTransform(_ => true) + }) { + streams.debug( + s"""Possible execution failures will be ignored and final execution status will be marked as success. |Change initiated by user ${user.userId} at ${Instant.now().toString}.""".stripMargin) } - } private[cuttle] def toExecutionLog(status: ExecutionStatus, failing: Option[FailingJob] = None) = ExecutionLog( @@ -349,10 +350,11 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla // signals whether the instance is shutting down private val isShuttingDown: Ref[Boolean] = Ref(false) private val timer = new Timer("com.criteo.cuttle.Executor.timer") - private val executionsCounters: Ref[Counter[Long]] = Ref(Counter[Long]( - "cuttle_executions_total", - help = "The number of finished executions that we have in concrete states by job and by tag" - )) + private val executionsCounters: Ref[Counter[Long]] = Ref( + Counter[Long]( + "cuttle_executions_total", + help = "The number of finished executions that we have in concrete states by job and by tag" + )) // executions that failed recently and are now running private def retryingExecutions(filteredJobs: Set[String]): Seq[(Execution[S], FailingJob, ExecutionStatus)] = @@ -388,7 +390,8 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla .filter({ case (_, s) => s == ExecutionStatus.ExecutionWaiting }) .foreach({ case (e, _) => e.updateWaitingTime(intervalSeconds) }) }) - .run + .compile + .drain .unsafeRunAsync(_ => ()) } @@ -710,15 +713,15 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla 
private[cuttle] def updateFinishedExecutionCounters(execution: Execution[S], status: String): Unit = atomic { implicit txn => - val tagsLabel = if (execution.job.tags.nonEmpty) - Set("tags" -> execution.job.tags.map(_.name).mkString(",")) - else - Set.empty + val tagsLabel = + if (execution.job.tags.nonEmpty) + Set("tags" -> execution.job.tags.map(_.name).mkString(",")) + else + Set.empty executionsCounters() = executionsCounters().inc( Set("type" -> status, "job_id" -> execution.job.id) ++ tagsLabel ) } - private def run0(all: Seq[(Job[S], S#Context)]): Seq[(Execution[S], Future[Completed])] = { sealed trait NewExecution case object ToRunNow extends NewExecution @@ -896,7 +899,6 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla private[cuttle] def healthCheck(): Try[Boolean] = Try(queries.healthCheck.transact(xa).unsafeRunSync) - private case class ExecutionInfo(jobId: String, tags: Set[String], status: ExecutionStatus) /** @@ -922,8 +924,9 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla ) val (running: Seq[ExecutionInfo], waiting: Seq[ExecutionInfo]) = runningExecutions - .map { case (exec, status) => - ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), status) + .map { + case (exec, status) => + ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), status) } .partition { execution => execution.status == ExecutionStatus.ExecutionRunning @@ -956,21 +959,39 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla ) } - private def getMetricsByTag( - running: Seq[ExecutionInfo], - waiting: Seq[ExecutionInfo], - paused: Seq[ExecutionInfo], - failing: Seq[ExecutionInfo]): Metrics.Metric = { - ( // Explode by tag - running.flatMap { info => info.tags } - .groupBy(identity).mapValues("running" -> _.size).toList ++ - waiting.flatMap { info => info.tags } - .groupBy(identity).mapValues("waiting" -> _.size).toList ++ - paused.flatMap { info => info.tags } - 
.groupBy(identity).mapValues("paused" -> _.size).toList ++ - failing.flatMap { info => info.tags } - .groupBy(identity).mapValues("failing" -> _.size).toList - ).foldLeft( + private def getMetricsByTag(running: Seq[ExecutionInfo], + waiting: Seq[ExecutionInfo], + paused: Seq[ExecutionInfo], + failing: Seq[ExecutionInfo]): Metrics.Metric = + (// Explode by tag + running + .flatMap { info => + info.tags + } + .groupBy(identity) + .mapValues("running" -> _.size) + .toList ++ + waiting + .flatMap { info => + info.tags + } + .groupBy(identity) + .mapValues("waiting" -> _.size) + .toList ++ + paused + .flatMap { info => + info.tags + } + .groupBy(identity) + .mapValues("paused" -> _.size) + .toList ++ + failing + .flatMap { info => + info.tags + } + .groupBy(identity) + .mapValues("failing" -> _.size) + .toList).foldLeft( Gauge("cuttle_scheduler_stat_count_by_tag", "The number of executions that we have in concrete states by tag") ) { case (gauge, (tag, (status, count))) => @@ -978,23 +999,20 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla case (gauge, _) => gauge } - } - private def getMetricsByJob( - running: Seq[ExecutionInfo], - waiting: Seq[ExecutionInfo], - paused: Seq[ExecutionInfo], - failing: Seq[ExecutionInfo]): Metrics.Metric = { + private def getMetricsByJob(running: Seq[ExecutionInfo], + waiting: Seq[ExecutionInfo], + paused: Seq[ExecutionInfo], + failing: Seq[ExecutionInfo]): Metrics.Metric = ( running.groupBy(_.jobId).mapValues("running" -> _.size).toList ++ - waiting.groupBy(_.jobId).mapValues("waiting" -> _.size).toList ++ - paused.groupBy(_.jobId).mapValues("paused" -> _.size).toList ++ - failing.groupBy(_.jobId).mapValues("failing" -> _.size).toList + waiting.groupBy(_.jobId).mapValues("waiting" -> _.size).toList ++ + paused.groupBy(_.jobId).mapValues("paused" -> _.size).toList ++ + failing.groupBy(_.jobId).mapValues("failing" -> _.size).toList ).foldLeft( Gauge("cuttle_scheduler_stat_count_by_job", "The number of 
executions that we have in concrete states by job") ) { case (gauge, (jobId, (status, count))) => gauge.labeled(Set("job" -> jobId, "type" -> status), count) } - } } diff --git a/core/src/main/scala/com/criteo/cuttle/Metrics.scala b/core/src/main/scala/com/criteo/cuttle/Metrics.scala index a74fad099..e32907904 100644 --- a/core/src/main/scala/com/criteo/cuttle/Metrics.scala +++ b/core/src/main/scala/com/criteo/cuttle/Metrics.scala @@ -1,6 +1,5 @@ package com.criteo.cuttle -import scala.concurrent.stm.{TMap, atomic} import scala.math.Numeric /** Expose cuttle metrics via the [[https://prometheus.io prometheus]] protocol. */ @@ -50,12 +49,13 @@ object Metrics { * @param name The metric name. * @param help Metric description if provided. * @param labels2Value map of (label name, label value) pairs to counter values - */ + */ case class Counter[T]( - name: String, - help: String = "", - labels2Value: Map[Set[(String, String)], AnyVal] = Map.empty - )(implicit number: Numeric[T]) extends Metric { + name: String, + help: String = "", + labels2Value: Map[Set[(String, String)], AnyVal] = Map.empty + )(implicit number: Numeric[T]) + extends Metric { override val metricType: MetricType = counter diff --git a/core/src/main/scala/com/criteo/cuttle/platforms/RateLimiter.scala b/core/src/main/scala/com/criteo/cuttle/platforms/RateLimiter.scala index e525f34ab..58dce0962 100644 --- a/core/src/main/scala/com/criteo/cuttle/platforms/RateLimiter.scala +++ b/core/src/main/scala/com/criteo/cuttle/platforms/RateLimiter.scala @@ -44,7 +44,8 @@ class RateLimiter(tokens: Int, refillRateInMs: Int) extends WaitingExecutionQueu } fs2.Stream(runNext()) }) - .run + .compile + .drain .unsafeRunAsync(_ => ()) def canRunNextCondition(implicit txn: InTxn) = _tokens() >= 1 diff --git a/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala b/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala index 81d4c8a52..a6e3c1414 100644 --- a/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala +++ 
b/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala @@ -109,8 +109,8 @@ class ExecutorSpec extends FunSuite with TestScheduling { } private def buildJob(jobId: String, tags: Set[Tag] = Set.empty): Job[TestScheduling] = - Job(jobId, TestScheduling(), jobId, tags = tags) { - implicit execution => Future { Completed }(execution.executionContext) + Job(jobId, TestScheduling(), jobId, tags = tags) { implicit execution => + Future { Completed }(execution.executionContext) } private def buildExecutionForJob(job: Job[TestScheduling]): Execution[TestScheduling] = @@ -132,5 +132,5 @@ class ExecutorSpec extends FunSuite with TestScheduling { private val fooBarJob: Job[TestScheduling] = buildJob("foo_bar_job", Set(fooTag, barTag)) - private val untaggedJob :Job[TestScheduling] = buildJob("untagged_job") + private val untaggedJob: Job[TestScheduling] = buildJob("untagged_job") } diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Database.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Database.scala index df94c4d44..a8bfc0bef 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Database.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/Database.scala @@ -24,18 +24,23 @@ private[timeseries] object Database { val contextIdMigration: ConnectionIO[Unit] = { implicit val jobs: Set[TimeSeriesJob] = Set.empty - val chunkSize = 1024*10 + val chunkSize = 1024 * 10 val stream = sql"SELECT id, json FROM timeseries_contexts" - .query[(String, Json)].processWithChunkSize(chunkSize) + .query[(String, Json)] + .streamWithChunkSize(chunkSize) val insert = Update[(String, String)]("INSERT into tmp (id, new_id) VALUES (? 
, ?)") for { _ <- sql"CREATE TEMPORARY TABLE tmp (id VARCHAR(1000), new_id VARCHAR(1000))".update.run - _ <- - stream.chunkLimit(chunkSize).evalMap { oldContexts => - insert.updateMany(oldContexts.map { case (id, json) => - (id, json.as[TimeSeriesContext].right.get.toId) + _ <- stream + .chunkLimit(chunkSize) + .evalMap { oldContexts => + insert.updateMany(oldContexts.map { + case (id, json) => + (id, json.as[TimeSeriesContext].right.get.toId) }) - }.run + } + .compile + .drain _ <- sql"CREATE INDEX tmp_id ON tmp (id)".update.run _ <- sql"""UPDATE timeseries_contexts ctx JOIN tmp ON ctx.id = tmp.id SET ctx.id = tmp.new_id""".update.run @@ -209,7 +214,7 @@ private[timeseries] object Database { ORDER BY c.id DESC """ .query[(String, String, Instant, Instant, Json, ExecutionStatus, Int)] - .list + .to[List] .map(_.map { case (id, job, startTime, endTime, context, status, waitingSeconds) => ExecutionLog(id, job, Some(startTime), Some(endTime), context, status, waitingSeconds = waitingSeconds) diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala index 4b1c81a00..a7ca34811 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesApp.scala @@ -417,7 +417,7 @@ private[timeseries] trait TimeSeriesApp { self: TimeSeriesScheduler => case GET at url"/api/timeseries/backfills" => Database .queryBackfills() - .list + .to[List] .map(_.map { case (id, name, description, jobs, priority, start, end, created_at, status, created_by) => Json.obj( diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala index e863e36dc..b2e018863 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala +++ 
b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala @@ -438,7 +438,7 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] wit atomic { implicit txn => val incompleteBackfills = Database .queryBackfills(Some(sql"""status = 'RUNNING'""")) - .list + .to[List] .map(_.map { case (id, name, description, jobsIdsString, priority, start, end, _, status, createdBy) => val jobsIds = jobsIdsString.split(",")