Skip to content

Commit

Permalink
Refactor metrics aggregation and add aggregated metrics by job (#221)
Browse files Browse the repository at this point in the history
* Refactor metrics aggregation and add aggregated metrics by job
for running, waiting, failing and paused executions

* Compute and report metrics on finished executions aggregated by tag and job type
  • Loading branch information
Masuzu authored Feb 14, 2018
1 parent 17488e1 commit 9dbd4f0
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 35 deletions.
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ lazy val cuttle =
"mysql" % "mysql-connector-java" % "6.0.6"
),
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "3.0.1"
"org.scalatest" %% "scalatest" % "3.0.1",
"org.mockito" % "mockito-all" % "1.10.19"
).map(_ % "test"),
// Webpack
resourceGenerators in Compile += Def.task {
Expand Down
144 changes: 112 additions & 32 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,10 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
// signals whether the instance is shutting down
private val isShuttingDown: Ref[Boolean] = Ref(false)
private val timer = new Timer("com.criteo.cuttle.Executor.timer")
// Counter of finished executions, keyed by label set. It is incremented by
// updateFinishedExecutionCounters and exported together with the gauges built in getMetrics.
private val executionsCounters: Ref[Counter[Long]] = Ref(Counter[Long](
"cuttle_executions_total",
help = "The number of finished executions that we have in concrete states by job and by tag"
))

// executions that failed recently and are now running
private def retryingExecutions(filteredJobs: Set[String]): Seq[(Execution[S], FailingJob, ExecutionStatus)] =
Expand Down Expand Up @@ -658,6 +662,7 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
atomic { implicit txn =>
runningState -= execution
recentFailures -= (execution.job -> execution.context)
updateFinishedExecutionCounters(execution, "success")
}
case Failure(e) =>
val stacktrace = new StringWriter()
Expand All @@ -666,6 +671,7 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
execution.streams.error(stacktrace.toString)
atomic {
implicit tx =>
updateFinishedExecutionCounters(execution, "failure")
// retain jobs in recent failures if last failure happened in [now - retryStrategy.retryWindow, now]
recentFailures.retain {
case (_, (retryExecution, failingJob)) =>
Expand Down Expand Up @@ -702,6 +708,16 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
}
})

/**
  * Increment the finished-executions counter for the series identified by the given execution.
  *
  * The series is labelled with the completion status, the job id and, when the job has at least
  * one tag, a `tags` label holding the comma-separated tag names.
  *
  * @param execution the execution that has just finished
  * @param status value of the `type` label (callers in this class pass "success" or "failure")
  */
private[cuttle] def updateFinishedExecutionCounters(execution: Execution[S], status: String): Unit =
  atomic { implicit txn =>
    val tagNames = execution.job.tags.map(_.name)
    // "type" and "job_id" come first and "tags" is appended last: the label set keeps this order.
    val baseLabels = Set("type" -> status, "job_id" -> execution.job.id)
    val labels =
      if (tagNames.isEmpty) baseLabels
      else baseLabels + ("tags" -> tagNames.mkString(","))
    executionsCounters.transform(_.inc(labels))
  }

private def run0(all: Seq[(Job[S], S#Context)]): Seq[(Execution[S], Future[Completed])] = {
sealed trait NewExecution
Expand Down Expand Up @@ -880,41 +896,105 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
private[cuttle] def healthCheck(): Try[Boolean] =
Try(queries.healthCheck.transact(xa).unsafeRunSync)

override def getMetrics(jobs: Set[String], workflow: Workflow[S]): Seq[Metric] = {
val ((running, waiting), paused, failing) = getStateAtomic(jobs)

/** Summary of an execution used for metrics aggregation: its job id, its tag names and its status. */
private case class ExecutionInfo(jobId: String, tags: Set[String], status: ExecutionStatus)

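/**
* Build the metrics reported by the executor from the given execution state.
*
* @param jobs set of job IDs
* @param getStateAtomic atomically gets executor stats: given a set of job IDs, returns how many
*                       ((running, waiting), paused, failing) jobs are in concrete states
* @param runningExecutions executions which are either running or waiting for a free thread to start
* @param pausedExecutions executions which are currently paused
* @param failingExecutions executions which are currently failing
*/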
private[cuttle] def getMetrics(jobs: Set[String])(
getStateAtomic: Set[String] => ((Int, Int), Int, Int),
runningExecutions: Seq[(Execution[S], ExecutionStatus)],
pausedExecutions: Seq[Execution[S]],
failingExecutions: Seq[Execution[S]]
): Seq[Metric] = {
val ((runningCount, waitingCount), pausedCount, failingCount) = getStateAtomic(jobs)
val statMetrics = Seq(
Gauge("cuttle_scheduler_stat_count", "The number of jobs that we have in concrete states")
.labeled("type" -> "running", running)
.labeled("type" -> "waiting", waiting)
.labeled("type" -> "paused", paused)
.labeled("type" -> "failing", failing))
.labeled("type" -> "running", runningCount)
.labeled("type" -> "waiting", waitingCount)
.labeled("type" -> "paused", pausedCount)
.labeled("type" -> "failing", failingCount)
)

val (running: Seq[ExecutionInfo], waiting: Seq[ExecutionInfo]) = runningExecutions
.map { case (exec, status) =>
ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), status)
}
.partition { execution =>
execution.status == ExecutionStatus.ExecutionRunning
}

val paused: Seq[ExecutionInfo] = pausedExecutions.map { exec =>
ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), ExecutionStatus.ExecutionPaused)
}

val failing: Seq[ExecutionInfo] = failingExecutions.map { exec =>
ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), ExecutionStatus.ExecutionThrottled)
}

statMetrics ++
Seq(getMetricsByTag(running, waiting, paused, failing)) ++
Seq(getMetricsByJob(running, waiting, paused, failing)) ++
Seq(executionsCounters.single())
}

/**
* @param jobs the set of job IDs
*/
override def getMetrics(jobs: Set[String], workflow: Workflow[S]): Seq[Metric] =
atomic { implicit txn =>
val (running, waiting) = runningExecutions
.flatMap {
case (exec, status) =>
exec.job.tags.map(_.name -> status)
}
.partition(_._2 == ExecutionStatus.ExecutionRunning)
statMetrics ++ Seq(
(
running.groupBy(_._1).mapValues("running" -> _.size).toList ++
waiting.groupBy(_._1).mapValues("waiting" -> _.size).toList ++
pausedState.values
.flatMap(_.keys.flatMap(_.job.tags.map(_.name)))
.groupBy(identity)
.mapValues("paused" -> _.size)
.toList ++
allFailingExecutions
.flatMap(_.job.tags.map(_.name))
.groupBy(identity)
.mapValues("failing" -> _.size)
.toList
).foldLeft(
Gauge("cuttle_scheduler_stat_count_by_tag", "The number of jobs that we have in concrete states by tag")
) {
case (gauge, (tag, (status, count))) =>
gauge.labeled(Set("tag" -> tag, "type" -> status), count)
})
getMetrics(jobs)(
getStateAtomic,
runningExecutions,
pausedState.values.flatMap(executionMap => executionMap.keys).toSeq,
allFailingExecutions
)
}

/**
  * Build the `cuttle_scheduler_stat_count_by_tag` gauge.
  *
  * Each execution is counted once per tag it carries, so an execution with several tags
  * contributes to several series and an execution without tags contributes to none.
  *
  * @param running executions counted under type "running"
  * @param waiting executions counted under type "waiting"
  * @param paused executions counted under type "paused"
  * @param failing executions counted under type "failing"
  * @return a gauge holding one value per (tag, type) pair that has at least one execution
  */
private def getMetricsByTag(running: Seq[ExecutionInfo],
                            waiting: Seq[ExecutionInfo],
                            paused: Seq[ExecutionInfo],
                            failing: Seq[ExecutionInfo]): Metrics.Metric = {
  // Explode the executions by tag, then count how many times each tag occurs for this status.
  def countByTag(executions: Seq[ExecutionInfo], status: String): List[(String, (String, Int))] =
    executions
      .flatMap(_.tags)
      .groupBy(identity)
      .toList
      .map { case (tag, occurrences) => tag -> (status -> occurrences.size) }

  val countsByTag =
    countByTag(running, "running") ++
      countByTag(waiting, "waiting") ++
      countByTag(paused, "paused") ++
      countByTag(failing, "failing")

  val emptyGauge =
    Gauge("cuttle_scheduler_stat_count_by_tag", "The number of executions that we have in concrete states by tag")

  // The pattern below matches every element, so no fallback case is needed.
  countsByTag.foldLeft(emptyGauge) {
    case (gauge, (tag, (status, count))) =>
      gauge.labeled(Set("tag" -> tag, "type" -> status), count)
  }
}

/**
  * Build the `cuttle_scheduler_stat_count_by_job` gauge.
  *
  * @param running executions counted under type "running"
  * @param waiting executions counted under type "waiting"
  * @param paused executions counted under type "paused"
  * @param failing executions counted under type "failing"
  * @return a gauge holding one value per (job, type) pair that has at least one execution
  */
private def getMetricsByJob(running: Seq[ExecutionInfo],
                            waiting: Seq[ExecutionInfo],
                            paused: Seq[ExecutionInfo],
                            failing: Seq[ExecutionInfo]): Metrics.Metric = {
  // Statuses are processed in this fixed order: running, waiting, paused, failing.
  val executionsByStatus = List(
    "running" -> running,
    "waiting" -> waiting,
    "paused" -> paused,
    "failing" -> failing
  )

  val countsByJob = executionsByStatus.flatMap {
    case (status, executions) =>
      executions
        .groupBy(_.jobId)
        .toList
        .map { case (jobId, sameJob) => jobId -> (status -> sameJob.size) }
  }

  val emptyGauge =
    Gauge("cuttle_scheduler_stat_count_by_job", "The number of executions that we have in concrete states by job")

  countsByJob.foldLeft(emptyGauge) {
    case (gauge, (jobId, (status, count))) =>
      gauge.labeled(Set("job" -> jobId, "type" -> status), count)
  }
}
}
29 changes: 27 additions & 2 deletions core/src/main/scala/com/criteo/cuttle/Metrics.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package com.criteo.cuttle

import scala.concurrent.stm.{TMap, atomic}
import scala.math.Numeric

/** Expose cuttle metrics via the [[https://prometheus.io prometheus]] protocol. */
object Metrics {

object MetricType extends Enumeration {
type MetricType = Value
val gauge = Value
val counter = Value
}

import MetricType._
Expand All @@ -20,11 +24,11 @@ object Metrics {
def isDefined: Boolean = labels2Value.nonEmpty
}

/** A __Gauge__ metric epresents a single numerical value that can arbitrarily go up and down.
/** A __Gauge__ metric represents a single numerical value that can arbitrarily go up and down.
*
* @param name The metric name.
* @param help Metric description if provided.
* @param labels2Value The current value.s
* @param labels2Value map of (label name, label value) pairs to the current gauge values
*/
case class Gauge(name: String, help: String = "", labels2Value: Map[Set[(String, String)], AnyVal] = Map.empty)
extends Metric {
Expand All @@ -40,6 +44,27 @@ object Metrics {
def set(value: AnyVal): Gauge = copy(labels2Value = labels2Value + (Set.empty[(String, String)] -> value))
}

/**
  * A __Counter__ holds, per label set, a value that can only be incremented.
  *
  * The counter itself is never modified: `inc` returns an updated copy. Nothing here synchronises
  * concurrent increments, so callers sharing a counter have to coordinate updates themselves.
  *
  * @param name The metric name.
  * @param help Metric description if provided.
  * @param labels2Value map of (label name, label value) pairs to counter values
  */
case class Counter[T](
  name: String,
  help: String = "",
  labels2Value: Map[Set[(String, String)], AnyVal] = Map.empty
)(implicit number: Numeric[T])
    extends Metric {

  override val metricType: MetricType = counter

  /**
    * Return a copy of this counter in which the value stored for `label` is increased by one.
    * A label set that has no value yet starts from zero.
    *
    * @param label the (label name, label value) pairs identifying the series to increment
    */
  def inc(label: Set[(String, String)]): Counter[T] = {
    // Values are stored as AnyVal, hence the casts to and from T.
    val previous: T = labels2Value.get(label) match {
      case Some(value) => value.asInstanceOf[T]
      case None        => number.zero
    }
    val incremented = number.plus(previous, number.one)
    copy(labels2Value = labels2Value.updated(label, incremented.asInstanceOf[AnyVal]))
  }
}

/** Components able to provide metrics. */
trait MetricProvider[S <: Scheduling] {
def getMetrics(jobs: Set[String], workflow: Workflow[S]): Seq[Metric]
Expand Down
136 changes: 136 additions & 0 deletions core/src/test/scala/com/criteo/cuttle/ExecutorSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package com.criteo.cuttle

import java.sql.{Connection, ResultSet}

import scala.concurrent.Future

import cats.effect.IO
import com.mysql.cj.jdbc.PreparedStatement
import doobie.util.transactor.Transactor
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.FunSuite

import com.criteo.cuttle.ExecutionContexts.Implicits.sideEffectExecutionContext
import com.criteo.cuttle.ExecutionContexts._
import com.criteo.cuttle.Metrics.Prometheus

/**
  * Checks the Prometheus serialisation of the metrics reported by an [[Executor]]: the global
  * gauge, the gauges aggregated by tag and by job, and the counter of finished executions.
  */
class ExecutorSpec extends FunSuite with TestScheduling {

  private val fooTag = Tag("foo")
  private val barTag = Tag("bar")

  private val fooJob: Job[TestScheduling] = buildJob("foo_job", Set(fooTag))
  private val fooBarJob: Job[TestScheduling] = buildJob("foo_bar_job", Set(fooTag, barTag))
  private val untaggedJob: Job[TestScheduling] = buildJob("untagged_job")

  test("Executor should return metrics aggregated by job and tag") {
    val testExecutor = new Executor[TestScheduling](
      Seq.empty,
      xa = Transactor
        .fromConnection[IO](mockedConnection())
        .copy(strategy0 = doobie.util.transactor.Strategy.void),
      logger,
      "test_project"
    )(RetryStrategy.ExponentialBackoffRetryStrategy)

    // Finished executions: these feed the cuttle_executions_total counter.
    Seq(
      fooJob -> "success",
      fooJob -> "success",
      untaggedJob -> "success",
      fooBarJob -> "success",
      untaggedJob -> "failure",
      fooBarJob -> "failure"
    ).foreach {
      case (job, status) =>
        testExecutor.updateFinishedExecutionCounters(buildExecutionForJob(job), status)
    }

    val metrics = Prometheus.serialize(
      testExecutor.getMetrics(Set("jobA"))(
        getStateAtomic = _ => ((5, 1), 3, 2),
        runningExecutions = Seq(
          buildExecutionForJob(fooJob) -> ExecutionStatus.ExecutionRunning,
          buildExecutionForJob(fooJob) -> ExecutionStatus.ExecutionRunning,
          buildExecutionForJob(fooJob) -> ExecutionStatus.ExecutionWaiting,
          buildExecutionForJob(fooBarJob) -> ExecutionStatus.ExecutionRunning,
          buildExecutionForJob(untaggedJob) -> ExecutionStatus.ExecutionRunning,
          buildExecutionForJob(untaggedJob) -> ExecutionStatus.ExecutionWaiting
        ),
        pausedExecutions = Seq(
          buildExecutionForJob(fooJob),
          buildExecutionForJob(fooBarJob),
          buildExecutionForJob(untaggedJob)
        ),
        failingExecutions = Seq(
          buildExecutionForJob(fooBarJob),
          buildExecutionForJob(fooBarJob),
          buildExecutionForJob(untaggedJob)
        )
      )
    )

    val expectedMetrics =
      """# HELP cuttle_scheduler_stat_count The number of jobs that we have in concrete states
        |# TYPE cuttle_scheduler_stat_count gauge
        |cuttle_scheduler_stat_count {type="running"} 5
        |cuttle_scheduler_stat_count {type="waiting"} 1
        |cuttle_scheduler_stat_count {type="paused"} 3
        |cuttle_scheduler_stat_count {type="failing"} 2
        |# HELP cuttle_scheduler_stat_count_by_tag The number of executions that we have in concrete states by tag
        |# TYPE cuttle_scheduler_stat_count_by_tag gauge
        |cuttle_scheduler_stat_count_by_tag {tag="bar", type="paused"} 1
        |cuttle_scheduler_stat_count_by_tag {tag="foo", type="waiting"} 1
        |cuttle_scheduler_stat_count_by_tag {tag="foo", type="running"} 3
        |cuttle_scheduler_stat_count_by_tag {tag="foo", type="paused"} 2
        |cuttle_scheduler_stat_count_by_tag {tag="bar", type="failing"} 2
        |cuttle_scheduler_stat_count_by_tag {tag="foo", type="failing"} 2
        |cuttle_scheduler_stat_count_by_tag {tag="bar", type="running"} 1
        |# HELP cuttle_scheduler_stat_count_by_job The number of executions that we have in concrete states by job
        |# TYPE cuttle_scheduler_stat_count_by_job gauge
        |cuttle_scheduler_stat_count_by_job {job="untagged_job", type="waiting"} 1
        |cuttle_scheduler_stat_count_by_job {job="foo_bar_job", type="running"} 1
        |cuttle_scheduler_stat_count_by_job {job="untagged_job", type="running"} 1
        |cuttle_scheduler_stat_count_by_job {job="foo_job", type="waiting"} 1
        |cuttle_scheduler_stat_count_by_job {job="foo_job", type="paused"} 1
        |cuttle_scheduler_stat_count_by_job {job="untagged_job", type="failing"} 1
        |cuttle_scheduler_stat_count_by_job {job="foo_job", type="running"} 2
        |cuttle_scheduler_stat_count_by_job {job="foo_bar_job", type="paused"} 1
        |cuttle_scheduler_stat_count_by_job {job="foo_bar_job", type="failing"} 2
        |cuttle_scheduler_stat_count_by_job {job="untagged_job", type="paused"} 1
        |# HELP cuttle_executions_total The number of finished executions that we have in concrete states by job and by tag
        |# TYPE cuttle_executions_total counter
        |cuttle_executions_total {type="success", job_id="foo_bar_job", tags="foo,bar"} 1
        |cuttle_executions_total {type="success", job_id="foo_job", tags="foo"} 2
        |cuttle_executions_total {type="failure", job_id="foo_bar_job", tags="foo,bar"} 1
        |cuttle_executions_total {type="success", job_id="untagged_job"} 1
        |cuttle_executions_total {type="failure", job_id="untagged_job"} 1
        |""".stripMargin

    assert(metrics == expectedMetrics)
  }

  /** A mocked JDBC connection whose prepared statements return a mocked result set. */
  private def mockedConnection(): Connection = {
    val mockConnection = mock(classOf[Connection])
    val statement = mock(classOf[PreparedStatement])
    val resultSet = mock(classOf[ResultSet])
    when(mockConnection.prepareStatement(any(classOf[String]))).thenReturn(statement)
    when(statement.executeQuery()).thenReturn(resultSet)
    mockConnection
  }

  /** A job that completes immediately; the job id is also passed as the third argument of `Job`. */
  private def buildJob(jobId: String, tags: Set[Tag] = Set.empty): Job[TestScheduling] =
    Job(jobId, TestScheduling(), jobId, tags = tags) { implicit execution =>
      Future { Completed }(execution.executionContext)
    }

  /** A fresh execution of `job` with a random id; writing to its streams is not implemented. */
  private def buildExecutionForJob(job: Job[TestScheduling]): Execution[TestScheduling] =
    Execution[TestScheduling](
      id = java.util.UUID.randomUUID.toString,
      job = job,
      context = TestContext(),
      streams = new ExecutionStreams {
        override private[cuttle] def writeln(str: CharSequence): Unit = ???
      },
      platforms = Seq.empty,
      "foo-project"
    )
}

0 comments on commit 9dbd4f0

Please sign in to comment.