
Commit

Merge branch 'master' into graph_text
jedirandy authored Feb 16, 2018
2 parents 0eb6359 + 1ae9921 commit 54ca2f5
Showing 10 changed files with 365 additions and 74 deletions.
14 changes: 7 additions & 7 deletions build.sbt
@@ -1,7 +1,7 @@
val devMode = settingKey[Boolean]("Some build optimization are applied in devMode.")
val writeClasspath = taskKey[File]("Write the project classpath to a file.")

val VERSION = "0.3.1"
val VERSION = "0.3.2"

lazy val commonSettings = Seq(
organization := "com.criteo.cuttle",
@@ -179,9 +179,9 @@ lazy val cuttle =
"com.criteo.lolhttp" %% "lolhttp",
"com.criteo.lolhttp" %% "loljson",
"com.criteo.lolhttp" %% "lolhtml"
).map(_ % "0.9.0"),
).map(_ % "0.9.2"),
libraryDependencies ++= Seq("core", "generic", "parser")
.map(module => "io.circe" %% s"circe-${module}" % "0.9.0-M3"),
.map(module => "io.circe" %% s"circe-${module}" % "0.9.1"),
libraryDependencies ++= Seq(
"de.sciss" %% "fingertree" % "1.5.2",
"org.scala-stm" %% "scala-stm" % "0.8",
@@ -194,12 +194,13 @@ lazy val cuttle =
libraryDependencies ++= Seq(
"org.tpolecat" %% "doobie-core",
"org.tpolecat" %% "doobie-hikari"
).map(_ % "0.5.0-M11"),
).map(_ % "0.5.0"),
libraryDependencies ++= Seq(
"mysql" % "mysql-connector-java" % "6.0.6"
),
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "3.0.1"
"org.scalatest" %% "scalatest" % "3.0.1",
"org.mockito" % "mockito-all" % "1.10.19"
).map(_ % "test"),
// Webpack
resourceGenerators in Compile += Def.task {
@@ -225,8 +226,7 @@ lazy val cuttle =
if (operatingSystem.indexOf("win") >= 0) {
val yarnJsPath = ("where yarn.js" !!).trim()
assert(s"""node "$yarnJsPath" install""" ! logger == 0, "yarn failed")
}
else {
} else {
assert("yarn install" ! logger == 0, "yarn failed")
}
logger.out("Running webpack...")
6 changes: 3 additions & 3 deletions core/src/main/scala/com/criteo/cuttle/Database.scala
@@ -121,7 +121,7 @@ private[cuttle] object Database {
(for {
locks <- sql"""
SELECT locked_by, locked_at FROM locks WHERE TIMESTAMPDIFF(MINUTE, locked_at, NOW()) < 5;
""".query[(String, Instant)].list
""".query[(String, Instant)].to[List]
_ <- if (locks.isEmpty) {
sql"""
DELETE FROM locks;
@@ -260,7 +260,7 @@ private[cuttle] trait Queries {
fr"job",
NonEmptyList.fromListUnsafe(jobs.toList)) ++ orderBy ++ sql""" LIMIT $limit OFFSET $offset""")
.query[(String, String, Instant, Instant, Json, ExecutionStatus, Int)]
.list
.to[List]
.map(_.map {
case (id, job, startTime, endTime, context, status, waitingSeconds) =>
ExecutionLog(id, job, Some(startTime), Some(endTime), context, status, waitingSeconds = waitingSeconds)
@@ -317,7 +317,7 @@ private[cuttle] trait Queries {
where job=$jobId and end_time > DATE_SUB(CURDATE(), INTERVAL 30 DAY) order by start_time asc, end_time asc
"""
.query[(Instant, Instant, Int, Int, ExecutionStatus)]
.list
.to[List]
.map(_.map {
case (startTime, endTime, durationSeconds, waitingSeconds, status) =>
new ExecutionStat(startTime, endTime, durationSeconds, waitingSeconds, status)
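
The .list to .to[List] changes in this file follow the doobie 0.5 query API, where results are accumulated into a collection via .to instead of the removed .list combinator. A minimal sketch of the new call style, using an illustrative table and query that are not part of this repository:

import doobie._
import doobie.implicits._

object DoobieQuerySketch {
  // Hypothetical query, shown only to illustrate the doobie 0.5 accumulation API.
  def jobIds: ConnectionIO[List[String]] =
    sql"SELECT id FROM jobs".query[String].to[List] // was .query[String].list in doobie 0.4.x
}
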
192 changes: 145 additions & 47 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
@@ -9,7 +9,7 @@ import scala.concurrent.duration._
import scala.concurrent.stm.Txn.ExternalDecider
import scala.concurrent.stm._
import scala.concurrent.{Future, Promise}
import scala.reflect.{ClassTag, classTag}
import scala.reflect.{classTag, ClassTag}
import scala.util.{Failure, Success, Try}

import cats.Eq
@@ -138,18 +138,19 @@ class CancellationListener private[cuttle] (execution: Execution[_], private[cut
* @param executionContext The scoped `scala.concurrent.ExecutionContext` for this execution.
*/
case class Execution[S <: Scheduling](
id: String,
job: Job[S],
context: S#Context,
streams: ExecutionStreams,
platforms: Seq[ExecutionPlatform],
projectName: String
)(implicit val executionContext: SideEffectExecutionContext) {
id: String,
job: Job[S],
context: S#Context,
streams: ExecutionStreams,
platforms: Seq[ExecutionPlatform],
projectName: String
)(implicit val executionContext: SideEffectExecutionContext) {

private var waitingSeconds = 0
private[cuttle] var startTime: Option[Instant] = None
private val cancelListeners = TSet.empty[CancellationListener]
private val cancelled = Ref(false)

/**
* An execution with forcedSuccess set to true will have its side effect return a successful Future instance even if the
* user code raised an exception or returned a failed Future instance.
@@ -208,14 +209,14 @@ case class Execution[S <: Scheduling](
hasBeenCancelled
}

def forceSuccess()(implicit user: User): Unit = {
def forceSuccess()(implicit user: User): Unit =
if (!atomic { implicit txn =>
forcedSuccess.getAndTransform(_ => true)
}) {
streams.debug(s"""Possible execution failures will be ignored and final execution status will be marked as success.
forcedSuccess.getAndTransform(_ => true)
}) {
streams.debug(
s"""Possible execution failures will be ignored and final execution status will be marked as success.
|Change initiated by user ${user.userId} at ${Instant.now().toString}.""".stripMargin)
}
}

private[cuttle] def toExecutionLog(status: ExecutionStatus, failing: Option[FailingJob] = None) =
ExecutionLog(
@@ -349,6 +350,11 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
// signals whether the instance is shutting down
private val isShuttingDown: Ref[Boolean] = Ref(false)
private val timer = new Timer("com.criteo.cuttle.Executor.timer")
private val executionsCounters: Ref[Counter[Long]] = Ref(
Counter[Long](
"cuttle_executions_total",
help = "The number of finished executions that we have in concrete states by job and by tag"
))

// executions that failed recently and are now running
private def retryingExecutions(filteredJobs: Set[String]): Seq[(Execution[S], FailingJob, ExecutionStatus)] =
Expand Down Expand Up @@ -384,7 +390,8 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
.filter({ case (_, s) => s == ExecutionStatus.ExecutionWaiting })
.foreach({ case (e, _) => e.updateWaitingTime(intervalSeconds) })
})
.run
.compile
.drain
.unsafeRunAsync(_ => ())
}

@@ -658,6 +665,7 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
atomic { implicit txn =>
runningState -= execution
recentFailures -= (execution.job -> execution.context)
updateFinishedExecutionCounters(execution, "success")
}
case Failure(e) =>
val stacktrace = new StringWriter()
@@ -666,6 +674,7 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
execution.streams.error(stacktrace.toString)
atomic {
implicit tx =>
updateFinishedExecutionCounters(execution, "failure")
// retain jobs in recent failures if last failure happened in [now - retryStrategy.retryWindow, now]
recentFailures.retain {
case (_, (retryExecution, failingJob)) =>
@@ -702,7 +711,17 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
}
})


private[cuttle] def updateFinishedExecutionCounters(execution: Execution[S], status: String): Unit =
atomic { implicit txn =>
val tagsLabel =
if (execution.job.tags.nonEmpty)
Set("tags" -> execution.job.tags.map(_.name).mkString(","))
else
Set.empty
executionsCounters() = executionsCounters().inc(
Set("type" -> status, "job_id" -> execution.job.id) ++ tagsLabel
)
}
private def run0(all: Seq[(Job[S], S#Context)]): Seq[(Execution[S], Future[Completed])] = {
sealed trait NewExecution
case object ToRunNow extends NewExecution
@@ -880,41 +899,120 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
private[cuttle] def healthCheck(): Try[Boolean] =
Try(queries.healthCheck.transact(xa).unsafeRunSync)

override def getMetrics(jobs: Set[String], workflow: Workflow[S]): Seq[Metric] = {
val ((running, waiting), paused, failing) = getStateAtomic(jobs)
private case class ExecutionInfo(jobId: String, tags: Set[String], status: ExecutionStatus)

/**
* @param jobs list of job IDs
* @param getStateAtomic Atomically get executor stats. Given a list of job ids, returns how many
* jobs are in each of the states ((running, waiting), paused, failing)
* @param runningExecutions executions which are either running or waiting for a free thread to start
* @param pausedExecutions
*/
private[cuttle] def getMetrics(jobs: Set[String])(
getStateAtomic: Set[String] => ((Int, Int), Int, Int),
runningExecutions: Seq[(Execution[S], ExecutionStatus)],
pausedExecutions: Seq[Execution[S]],
failingExecutions: Seq[Execution[S]]
): Seq[Metric] = {
val ((runningCount, waitingCount), pausedCount, failingCount) = getStateAtomic(jobs)
val statMetrics = Seq(
Gauge("cuttle_scheduler_stat_count", "The number of jobs that we have in concrete states")
.labeled("type" -> "running", running)
.labeled("type" -> "waiting", waiting)
.labeled("type" -> "paused", paused)
.labeled("type" -> "failing", failing))
.labeled("type" -> "running", runningCount)
.labeled("type" -> "waiting", waitingCount)
.labeled("type" -> "paused", pausedCount)
.labeled("type" -> "failing", failingCount)
)

val (running: Seq[ExecutionInfo], waiting: Seq[ExecutionInfo]) = runningExecutions
.map {
case (exec, status) =>
ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), status)
}
.partition { execution =>
execution.status == ExecutionStatus.ExecutionRunning
}

val paused: Seq[ExecutionInfo] = pausedExecutions.map { exec =>
ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), ExecutionStatus.ExecutionPaused)
}

val failing: Seq[ExecutionInfo] = failingExecutions.map { exec =>
ExecutionInfo(exec.job.id, exec.job.tags.map(_.name), ExecutionStatus.ExecutionThrottled)
}

statMetrics ++
Seq(getMetricsByTag(running, waiting, paused, failing)) ++
Seq(getMetricsByJob(running, waiting, paused, failing)) ++
Seq(executionsCounters.single())
}

/**
* @param jobs the list of jobs ids
*/
override def getMetrics(jobs: Set[String], workflow: Workflow[S]): Seq[Metric] =
atomic { implicit txn =>
val (running, waiting) = runningExecutions
.flatMap {
case (exec, status) =>
exec.job.tags.map(_.name -> status)
getMetrics(jobs)(
getStateAtomic,
runningExecutions,
pausedState.values.flatMap(executionMap => executionMap.keys).toSeq,
allFailingExecutions
)
}

private def getMetricsByTag(running: Seq[ExecutionInfo],
waiting: Seq[ExecutionInfo],
paused: Seq[ExecutionInfo],
failing: Seq[ExecutionInfo]): Metrics.Metric =
(// Explode by tag
running
.flatMap { info =>
info.tags
}
.groupBy(identity)
.mapValues("running" -> _.size)
.toList ++
waiting
.flatMap { info =>
info.tags
}
.partition(_._2 == ExecutionStatus.ExecutionRunning)
statMetrics ++ Seq(
(
running.groupBy(_._1).mapValues("running" -> _.size).toList ++
waiting.groupBy(_._1).mapValues("waiting" -> _.size).toList ++
pausedState.values
.flatMap(_.keys.flatMap(_.job.tags.map(_.name)))
.groupBy(identity)
.mapValues("paused" -> _.size)
.toList ++
allFailingExecutions
.flatMap(_.job.tags.map(_.name))
.groupBy(identity)
.mapValues("failing" -> _.size)
.toList
).foldLeft(
Gauge("cuttle_scheduler_stat_count_by_tag", "The number of jobs that we have in concrete states by tag")
) {
case (gauge, (tag, (status, count))) =>
gauge.labeled(Set("tag" -> tag, "type" -> status), count)
})
.groupBy(identity)
.mapValues("waiting" -> _.size)
.toList ++
paused
.flatMap { info =>
info.tags
}
.groupBy(identity)
.mapValues("paused" -> _.size)
.toList ++
failing
.flatMap { info =>
info.tags
}
.groupBy(identity)
.mapValues("failing" -> _.size)
.toList).foldLeft(
Gauge("cuttle_scheduler_stat_count_by_tag", "The number of executions that we have in concrete states by tag")
) {
case (gauge, (tag, (status, count))) =>
gauge.labeled(Set("tag" -> tag, "type" -> status), count)
case (gauge, _) =>
gauge
}

private def getMetricsByJob(running: Seq[ExecutionInfo],
waiting: Seq[ExecutionInfo],
paused: Seq[ExecutionInfo],
failing: Seq[ExecutionInfo]): Metrics.Metric =
(
running.groupBy(_.jobId).mapValues("running" -> _.size).toList ++
waiting.groupBy(_.jobId).mapValues("waiting" -> _.size).toList ++
paused.groupBy(_.jobId).mapValues("paused" -> _.size).toList ++
failing.groupBy(_.jobId).mapValues("failing" -> _.size).toList
).foldLeft(
Gauge("cuttle_scheduler_stat_count_by_job", "The number of executions that we have in concrete states by job")
) {
case (gauge, (jobId, (status, count))) =>
gauge.labeled(Set("job" -> jobId, "type" -> status), count)
}
}
}
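
The new getMetricsByTag and getMetricsByJob helpers both follow the same pattern: explode the execution infos, group them by key, count each group, and fold the counts into a single labeled gauge. A self-contained sketch of that grouping step, using placeholder types and values rather than cuttle's own (all names here are illustrative only):

object GroupingSketch {
  final case class Info(jobId: String, status: String)

  // Count executions per (job, status) pair and attach Prometheus-style label sets.
  def countsByJob(infos: Seq[Info]): Map[Set[(String, String)], Int] =
    infos
      .groupBy(info => (info.jobId, info.status))
      .map { case ((jobId, status), group) => Set("job" -> jobId, "type" -> status) -> group.size }

  def main(args: Array[String]): Unit = {
    val infos = Seq(Info("job-a", "running"), Info("job-a", "running"), Info("job-b", "failing"))
    // Prints one (label set, count) pair per line, e.g. (Set((job,job-a), (type,running)),2)
    countsByJob(infos).foreach(println)
  }
}
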
29 changes: 27 additions & 2 deletions core/src/main/scala/com/criteo/cuttle/Metrics.scala
@@ -1,11 +1,14 @@
package com.criteo.cuttle

import scala.math.Numeric

/** Expose cuttle metrics via the [[https://prometheus.io prometheus]] protocol. */
object Metrics {

object MetricType extends Enumeration {
type MetricType = Value
val gauge = Value
val counter = Value
}

import MetricType._
@@ -20,11 +23,11 @@ object Metrics {
def isDefined: Boolean = labels2Value.nonEmpty
}

/** A __Gauge__ metric epresents a single numerical value that can arbitrarily go up and down.
/** A __Gauge__ metric represents a single numerical value that can arbitrarily go up and down.
*
* @param name The metric name.
* @param help Metric description if provided.
* @param labels2Value The current value.s
* @param labels2Value map of (label name, label value) pairs to the current gauge values
*/
case class Gauge(name: String, help: String = "", labels2Value: Map[Set[(String, String)], AnyVal] = Map.empty)
extends Metric {
@@ -40,6 +43,28 @@ object Metrics {
def set(value: AnyVal): Gauge = copy(labels2Value = labels2Value + (Set.empty[(String, String)] -> value))
}

/**
* A __Counter__ contains a value that can only be incremented. Increments are not threadsafe.
*
* @param name The metric name.
* @param help Metric description if provided.
* @param labels2Value map of (label name, label value) pairs to counter values
*/
case class Counter[T](
name: String,
help: String = "",
labels2Value: Map[Set[(String, String)], AnyVal] = Map.empty
)(implicit number: Numeric[T])
extends Metric {

override val metricType: MetricType = counter

def inc(label: Set[(String, String)]): Counter[T] = {
val currentCount = labels2Value.getOrElse(label, number.zero).asInstanceOf[T]
copy(labels2Value = labels2Value + (label -> number.plus(currentCount, number.one).asInstanceOf[AnyVal]))
}
}

/** Components able to provide metrics. */
trait MetricProvider[S <: Scheduling] {
def getMetrics(jobs: Set[String], workflow: Workflow[S]): Seq[Metric]
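
A short usage sketch for the Counter added above, assuming the definitions shown in this diff (the label values are made up): each inc call returns a new Counter with the count for that exact label set bumped by one, which is how the executor maintains cuttle_executions_total per job and status.

import com.criteo.cuttle.Metrics.Counter

object CounterSketch {
  def main(args: Array[String]): Unit = {
    val counter = Counter[Long]("cuttle_executions_total", help = "finished executions by job and status")
      .inc(Set("type" -> "success", "job_id" -> "hello-world"))
      .inc(Set("type" -> "success", "job_id" -> "hello-world"))
      .inc(Set("type" -> "failure", "job_id" -> "hello-world"))

    // Two successes and one failure for the same job end up under two distinct label sets.
    println(counter.labels2Value)
  }
}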