diff --git a/build.sbt b/build.sbt
index 32619230b..14526e0ff 100644
--- a/build.sbt
+++ b/build.sbt
@@ -199,7 +199,8 @@ lazy val cuttle =
       "mysql" % "mysql-connector-java" % "6.0.6"
     ),
     libraryDependencies ++= Seq(
-      "org.scalatest" %% "scalatest" % "3.0.1"
+      "org.scalatest" %% "scalatest" % "3.0.1",
+      "org.mockito" % "mockito-all" % "1.10.19"
     ).map(_ % "test"),
     // Webpack
     resourceGenerators in Compile += Def.task {
diff --git a/core/src/main/scala/com/criteo/cuttle/Executor.scala b/core/src/main/scala/com/criteo/cuttle/Executor.scala
index a4d3afc0c..8cdb387a6 100644
--- a/core/src/main/scala/com/criteo/cuttle/Executor.scala
+++ b/core/src/main/scala/com/criteo/cuttle/Executor.scala
@@ -349,6 +349,10 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
   // signals whether the instance is shutting down
   private val isShuttingDown: Ref[Boolean] = Ref(false)
   private val timer = new Timer("com.criteo.cuttle.Executor.timer")
+  private val executionsCounters: Ref[Counter[Long]] = Ref(Counter[Long](
+    "cuttle_executions_total",
+    help = "The number of finished executions that we have in concrete states by job and by tag"
+  ))

   // executions that failed recently and are now running
   private def retryingExecutions(filteredJobs: Set[String]): Seq[(Execution[S], FailingJob, ExecutionStatus)] =
@@ -658,6 +662,7 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
         atomic { implicit txn =>
           runningState -= execution
           recentFailures -= (execution.job -> execution.context)
+          updateFinishedExecutionCounters(execution, "success")
         }
       case Failure(e) =>
         val stacktrace = new StringWriter()
@@ -666,6 +671,7 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
         execution.streams.error(stacktrace.toString)

         atomic { implicit tx =>
+          updateFinishedExecutionCounters(execution, "failure")
           // retain jobs in recent failures if last failure happened in [now - retryStrategy.retryWindow, now]
           recentFailures.retain {
             case (_, (retryExecution, failingJob)) =>
@@ -702,6 +708,16 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
       }
     })

+  private[cuttle] def updateFinishedExecutionCounters(execution: Execution[S], status: String): Unit =
+    atomic { implicit txn =>
+      val tagsLabel = if (execution.job.tags.nonEmpty)
+        Set("tags" -> execution.job.tags.map(_.name).mkString(","))
+      else
+        Set.empty
+      executionsCounters() = executionsCounters().inc(
+        Set("type" -> status, "job_id" -> execution.job.id) ++ tagsLabel
+      )
+    }
   private def run0(all: Seq[(Job[S], S#Context)]): Seq[(Execution[S], Future[Completed])] = {

     sealed trait NewExecution
@@ -880,41 +896,105 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
   private[cuttle] def healthCheck(): Try[Boolean] =
     Try(queries.healthCheck.transact(xa).unsafeRunSync)

-  override def getMetrics(jobs: Set[String], workflow: Workflow[S]): Seq[Metric] = {
-    val ((running, waiting), paused, failing) = getStateAtomic(jobs)
+
+  private case class ExecutionInfo(jobId: String, tags: Set[String], status: ExecutionStatus)
+
+  /**
+    * @param jobs list of job ids
+    * @param getStateAtomic Atomically get executor stats. Given a list of job ids, returns how many
+    *                       ((running, waiting), paused, failing) jobs are in concrete states
+    * @param runningExecutions executions which are either running or waiting for a free thread to start
+    * @param pausedExecutions paused executions
+    */
+  private[cuttle] def getMetrics(jobs: Set[String])(
+    getStateAtomic: Set[String] => ((Int, Int), Int, Int),
+    runningExecutions: Seq[(Execution[S], ExecutionStatus)],
+    pausedExecutions: Seq[Execution[S]],
+    failingExecutions: Seq[Execution[S]]
+  ): Seq[Metric] = {
+    val ((runningCount, waitingCount), pausedCount, failingCount) = getStateAtomic(jobs)
     val statMetrics = Seq(
       Gauge("cuttle_scheduler_stat_count", "The number of jobs that we have in concrete states")
-        .labeled("type" -> "running", running)
-        .labeled("type" -> "waiting", waiting)
-        .labeled("type" -> "paused", paused)
-        .labeled("type" -> "failing", failing))
+        .labeled("type" -> "running", runningCount)
+        .labeled("type" -> "waiting", waitingCount)
+        .labeled("type" -> "paused", pausedCount)
+        .labeled("type" -> "failing", failingCount)
+    )
+
+    val (running: Seq[ExecutionInfo], waiting: Seq[ExecutionInfo]) = runningExecutions
+      .map {
+        case (exec, status) =>
+          ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), status)
+      }
+      .partition { execution =>
+        execution.status == ExecutionStatus.ExecutionRunning
+      }
+
+    val paused: Seq[ExecutionInfo] = pausedExecutions.map { exec =>
+      ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), ExecutionStatus.ExecutionPaused)
+    }
+
+    val failing: Seq[ExecutionInfo] = failingExecutions.map { exec =>
+      ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), ExecutionStatus.ExecutionThrottled)
+    }
+
+    statMetrics ++
+      Seq(getMetricsByTag(running, waiting, paused, failing)) ++
+      Seq(getMetricsByJob(running, waiting, paused, failing)) ++
+      Seq(executionsCounters.single())
+  }
+
+  /**
+    * @param jobs the list of job ids
+    */
+  override def getMetrics(jobs: Set[String], workflow: Workflow[S]): Seq[Metric] =
    atomic { implicit txn =>
-      val (running, waiting) = runningExecutions
-        .flatMap {
-          case (exec, status) =>
-            exec.job.tags.map(_.name -> status)
-        }
-        .partition(_._2 == ExecutionStatus.ExecutionRunning)
-      statMetrics ++ Seq(
-        (
-          running.groupBy(_._1).mapValues("running" -> _.size).toList ++
-            waiting.groupBy(_._1).mapValues("waiting" -> _.size).toList ++
-            pausedState.values
-              .flatMap(_.keys.flatMap(_.job.tags.map(_.name)))
-              .groupBy(identity)
-              .mapValues("paused" -> _.size)
-              .toList ++
-            allFailingExecutions
-              .flatMap(_.job.tags.map(_.name))
-              .groupBy(identity)
-              .mapValues("failing" -> _.size)
-              .toList
-        ).foldLeft(
-          Gauge("cuttle_scheduler_stat_count_by_tag", "The number of jobs that we have in concrete states by tag")
-        ) {
-          case (gauge, (tag, (status, count))) =>
-            gauge.labeled(Set("tag" -> tag, "type" -> status), count)
-        })
+      getMetrics(jobs)(
+        getStateAtomic,
+        runningExecutions,
+        pausedState.values.flatMap(executionMap => executionMap.keys).toSeq,
+        allFailingExecutions
+      )
+    }
+
+  private def getMetricsByTag(
+    running: Seq[ExecutionInfo],
+    waiting: Seq[ExecutionInfo],
+    paused: Seq[ExecutionInfo],
+    failing: Seq[ExecutionInfo]): Metrics.Metric = {
+    ( // Explode by tag
+      running.flatMap { info => info.tags }
+        .groupBy(identity).mapValues("running" -> _.size).toList ++
+      waiting.flatMap { info => info.tags }
+        .groupBy(identity).mapValues("waiting" -> _.size).toList ++
+      paused.flatMap { info => info.tags }
+        .groupBy(identity).mapValues("paused" -> _.size).toList ++
+      failing.flatMap { info => info.tags }
+        .groupBy(identity).mapValues("failing" -> _.size).toList
+    ).foldLeft(
+      Gauge("cuttle_scheduler_stat_count_by_tag", "The number of executions that we have in concrete states by tag")
+    ) {
+      case (gauge, (tag, (status, count))) =>
+        gauge.labeled(Set("tag" -> tag, "type" -> status), count)
+      case (gauge, _) =>
+        gauge
+    }
+  }
+
+  private def getMetricsByJob(
+    running: Seq[ExecutionInfo],
+    waiting: Seq[ExecutionInfo],
+    paused: Seq[ExecutionInfo],
+    failing: Seq[ExecutionInfo]): Metrics.Metric = {
+    (
+      running.groupBy(_.jobId).mapValues("running" -> _.size).toList ++
+      waiting.groupBy(_.jobId).mapValues("waiting" -> _.size).toList ++
+      paused.groupBy(_.jobId).mapValues("paused" -> _.size).toList ++
+      failing.groupBy(_.jobId).mapValues("failing" -> _.size).toList
+    ).foldLeft(
+      Gauge("cuttle_scheduler_stat_count_by_job", "The number of executions that we have in concrete states by job")
+    ) {
+      case (gauge, (jobId, (status, count))) =>
+        gauge.labeled(Set("job" -> jobId, "type" -> status), count)
+    }
   }
 }
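Note how `updateFinishedExecutionCounters` collapses a job's tags into a single comma-separated `tags` label and omits that label entirely for untagged jobs. A minimal standalone sketch of that label-set logic (the job id and tag names are illustrative, not part of the diff):

```scala
object LabelSetSketch extends App {
  // Illustrative inputs standing in for execution.job.id and execution.job.tags.
  val jobId = "foo_bar_job"
  val tagNames = Set("foo", "bar")

  // Same shape as the body of updateFinishedExecutionCounters.
  val tagsLabel: Set[(String, String)] =
    if (tagNames.nonEmpty) Set("tags" -> tagNames.mkString(",")) else Set.empty
  val labels = Set("type" -> "success", "job_id" -> jobId) ++ tagsLabel

  // Prints: Set((type,success), (job_id,foo_bar_job), (tags,foo,bar))
  println(labels)
}
```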
diff --git a/core/src/main/scala/com/criteo/cuttle/Metrics.scala b/core/src/main/scala/com/criteo/cuttle/Metrics.scala
index 1907368d9..a74fad099 100644
--- a/core/src/main/scala/com/criteo/cuttle/Metrics.scala
+++ b/core/src/main/scala/com/criteo/cuttle/Metrics.scala
@@ -1,11 +1,15 @@
 package com.criteo.cuttle

+import scala.concurrent.stm.{TMap, atomic}
+import scala.math.Numeric
+
 /** Expose cuttle metrics via the [[https://prometheus.io prometheus]] protocol. */
 object Metrics {

   object MetricType extends Enumeration {
     type MetricType = Value
     val gauge = Value
+    val counter = Value
   }

   import MetricType._
@@ -20,11 +24,11 @@ object Metrics {
     def isDefined: Boolean = labels2Value.nonEmpty
   }

-  /** A __Gauge__ metric epresents a single numerical value that can arbitrarily go up and down.
+  /** A __Gauge__ metric represents a single numerical value that can arbitrarily go up and down.
    *
    * @param name The metric name.
    * @param help Metric description if provided.
-   * @param labels2Value The current value.s
+   * @param labels2Value map of (label name, label value) pairs to the current gauge values
    */
   case class Gauge(name: String, help: String = "", labels2Value: Map[Set[(String, String)], AnyVal] = Map.empty)
     extends Metric {
@@ -40,6 +44,27 @@ object Metrics {
     def set(value: AnyVal): Gauge = copy(labels2Value = labels2Value + (Set.empty[(String, String)] -> value))
   }

+  /**
+    * A __Counter__ contains a value that can only be incremented. Increments are not threadsafe.
+    *
+    * @param name The metric name.
+    * @param help Metric description if provided.
+    * @param labels2Value map of (label name, label value) pairs to counter values
+    */
+  case class Counter[T](
+    name: String,
+    help: String = "",
+    labels2Value: Map[Set[(String, String)], AnyVal] = Map.empty
+  )(implicit number: Numeric[T]) extends Metric {
+
+    override val metricType: MetricType = counter
+
+    def inc(label: Set[(String, String)]): Counter[T] = {
+      val currentCount = labels2Value.getOrElse(label, number.zero).asInstanceOf[T]
+      copy(labels2Value = labels2Value + (label -> number.plus(currentCount, number.one).asInstanceOf[AnyVal]))
+    }
+  }
+
   /** Components able to provide metrics. */
   trait MetricProvider[S <: Scheduling] {
     def getMetrics(jobs: Set[String], workflow: Workflow[S]): Seq[Metric]
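The new `Counter` is immutable: `inc` returns a copy with the value for the given label set bumped by one, which is why `Executor` wraps it in an STM `Ref` and swaps it atomically. A small usage sketch, assuming cuttle core is on the classpath:

```scala
import com.criteo.cuttle.Metrics.Counter

object CounterSketch extends App {
  val empty = Counter[Long]("cuttle_executions_total", help = "Finished executions")

  // Each inc returns a new Counter; the original is left untouched.
  val once  = empty.inc(Set("type" -> "success", "job_id" -> "foo_job"))
  val twice = once.inc(Set("type" -> "success", "job_id" -> "foo_job"))

  // The label set now maps to 2.
  println(twice.labels2Value) // Map(Set((type,success), (job_id,foo_job)) -> 2)
}
```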
diff --git a/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala b/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala
new file mode 100644
index 000000000..81d4c8a52
--- /dev/null
+++ b/core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala
@@ -0,0 +1,136 @@
+package com.criteo.cuttle
+
+import java.sql.{Connection, ResultSet}
+
+import scala.concurrent.Future
+
+import cats.effect.IO
+import com.mysql.cj.jdbc.PreparedStatement
+import doobie.util.transactor.Transactor
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.FunSuite
+
+import com.criteo.cuttle.ExecutionContexts.Implicits.sideEffectExecutionContext
+import com.criteo.cuttle.ExecutionContexts._
+import com.criteo.cuttle.Metrics.Prometheus
+
+class ExecutorSpec extends FunSuite with TestScheduling {
+  test("Executor should return metrics aggregated by job and tag") {
+    val connection: Connection = {
+      val mockConnection = mock(classOf[Connection])
+      val statement = mock(classOf[PreparedStatement])
+      val resultSet = mock(classOf[ResultSet])
+      when(mockConnection.prepareStatement(any(classOf[String]))).thenReturn(statement)
+      when(statement.executeQuery()).thenReturn(resultSet)
+      mockConnection
+    }
+
+    val testExecutor = new Executor[TestScheduling](
+      Seq.empty,
+      xa = Transactor.fromConnection[IO](connection).copy(strategy0 = doobie.util.transactor.Strategy.void),
+      logger,
+      "test_project"
+    )(RetryStrategy.ExponentialBackoffRetryStrategy)
+
+    testExecutor.updateFinishedExecutionCounters(buildExecutionForJob(fooJob), "success")
+    testExecutor.updateFinishedExecutionCounters(buildExecutionForJob(fooJob), "success")
+    testExecutor.updateFinishedExecutionCounters(buildExecutionForJob(untaggedJob), "success")
+    testExecutor.updateFinishedExecutionCounters(buildExecutionForJob(fooBarJob), "success")
+    testExecutor.updateFinishedExecutionCounters(buildExecutionForJob(untaggedJob), "failure")
+    testExecutor.updateFinishedExecutionCounters(buildExecutionForJob(fooBarJob), "failure")
+
+    val metrics = Prometheus.serialize(
+      testExecutor.getMetrics(Set("jobA"))(
+        getStateAtomic = _ => {
+          ((5, 1), 3, 2)
+        },
+        runningExecutions = Seq(
+          buildExecutionForJob(fooJob) -> ExecutionStatus.ExecutionRunning,
+          buildExecutionForJob(fooJob) -> ExecutionStatus.ExecutionRunning,
+          buildExecutionForJob(fooJob) -> ExecutionStatus.ExecutionWaiting,
+          buildExecutionForJob(fooBarJob) -> ExecutionStatus.ExecutionRunning,
+          buildExecutionForJob(untaggedJob) -> ExecutionStatus.ExecutionRunning,
+          buildExecutionForJob(untaggedJob) -> ExecutionStatus.ExecutionWaiting
+        ),
+        pausedExecutions = Seq(
+          buildExecutionForJob(fooJob),
+          buildExecutionForJob(fooBarJob),
+          buildExecutionForJob(untaggedJob)
+        ),
+        failingExecutions = Seq(
+          buildExecutionForJob(fooBarJob),
+          buildExecutionForJob(fooBarJob),
+          buildExecutionForJob(untaggedJob)
+        )
+      )
+    )
+
+    println(metrics)
+
+    val expectedMetrics =
+      """# HELP cuttle_scheduler_stat_count The number of jobs that we have in concrete states
+        |# TYPE cuttle_scheduler_stat_count gauge
+        |cuttle_scheduler_stat_count {type="running"} 5
+        |cuttle_scheduler_stat_count {type="waiting"} 1
+        |cuttle_scheduler_stat_count {type="paused"} 3
+        |cuttle_scheduler_stat_count {type="failing"} 2
+        |# HELP cuttle_scheduler_stat_count_by_tag The number of executions that we have in concrete states by tag
+        |# TYPE cuttle_scheduler_stat_count_by_tag gauge
|cuttle_scheduler_stat_count_by_tag {tag="bar", type="paused"} 1 + |cuttle_scheduler_stat_count_by_tag {tag="foo", type="waiting"} 1 + |cuttle_scheduler_stat_count_by_tag {tag="foo", type="running"} 3 + |cuttle_scheduler_stat_count_by_tag {tag="foo", type="paused"} 2 + |cuttle_scheduler_stat_count_by_tag {tag="bar", type="failing"} 2 + |cuttle_scheduler_stat_count_by_tag {tag="foo", type="failing"} 2 + |cuttle_scheduler_stat_count_by_tag {tag="bar", type="running"} 1 + |# HELP cuttle_scheduler_stat_count_by_job The number of executions that we have in concrete states by job + |# TYPE cuttle_scheduler_stat_count_by_job gauge + |cuttle_scheduler_stat_count_by_job {job="untagged_job", type="waiting"} 1 + |cuttle_scheduler_stat_count_by_job {job="foo_bar_job", type="running"} 1 + |cuttle_scheduler_stat_count_by_job {job="untagged_job", type="running"} 1 + |cuttle_scheduler_stat_count_by_job {job="foo_job", type="waiting"} 1 + |cuttle_scheduler_stat_count_by_job {job="foo_job", type="paused"} 1 + |cuttle_scheduler_stat_count_by_job {job="untagged_job", type="failing"} 1 + |cuttle_scheduler_stat_count_by_job {job="foo_job", type="running"} 2 + |cuttle_scheduler_stat_count_by_job {job="foo_bar_job", type="paused"} 1 + |cuttle_scheduler_stat_count_by_job {job="foo_bar_job", type="failing"} 2 + |cuttle_scheduler_stat_count_by_job {job="untagged_job", type="paused"} 1 + |# HELP cuttle_executions_total The number of finished executions that we have in concrete states by job and by tag + |# TYPE cuttle_executions_total counter + |cuttle_executions_total {type="success", job_id="foo_bar_job", tags="foo,bar"} 1 + |cuttle_executions_total {type="success", job_id="foo_job", tags="foo"} 2 + |cuttle_executions_total {type="failure", job_id="foo_bar_job", tags="foo,bar"} 1 + |cuttle_executions_total {type="success", job_id="untagged_job"} 1 + |cuttle_executions_total {type="failure", job_id="untagged_job"} 1 + |""".stripMargin + + assert(metrics == expectedMetrics) + } + + private def buildJob(jobId: String, tags: Set[Tag] = Set.empty): Job[TestScheduling] = + Job(jobId, TestScheduling(), jobId, tags = tags) { + implicit execution => Future { Completed }(execution.executionContext) + } + + private def buildExecutionForJob(job: Job[TestScheduling]): Execution[TestScheduling] = + Execution[TestScheduling]( + id = java.util.UUID.randomUUID.toString, + job = job, + context = TestContext(), + streams = new ExecutionStreams { + override private[cuttle] def writeln(str: CharSequence): Unit = ??? + }, + platforms = Seq.empty, + "foo-project" + ) + + private val fooTag = Tag("foo") + private val barTag = Tag("bar") + + private val fooJob: Job[TestScheduling] = buildJob("foo_job", Set(fooTag)) + + private val fooBarJob: Job[TestScheduling] = buildJob("foo_bar_job", Set(fooTag, barTag)) + + private val untaggedJob :Job[TestScheduling] = buildJob("untagged_job") +}