Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the endpoint /api/metrics for Prometheus. #175

Merged
merged 14 commits into from
Oct 2, 2017
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 23 additions & 19 deletions core/src/main/scala/com/criteo/cuttle/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

import authentication._
import logging._
import ExecutionStatus._

private[cuttle] object App {
Expand Down Expand Up @@ -136,10 +135,13 @@ private[cuttle] object App {
}
}

private[cuttle] case class App[S <: Scheduling](project: CuttleProject[S], executor: Executor[S], xa: XA, logger: Logger) {
private[cuttle] case class App[S <: Scheduling](project: CuttleProject[S], executor: Executor[S], xa: XA,
logger: Logger) {
import App._
import project.{scheduler, workflow}

// Ids of all vertices of the project's workflow; used as the fallback job set
// when a request does not supply an explicit job filter.
private def allJobs = workflow.vertices.map(_.id)

val publicApi: PartialService = {

case GET at url"/api/status" => {
Expand All @@ -157,40 +159,42 @@ private[cuttle] case class App[S <: Scheduling](project: CuttleProject[S], execu
case GET at url"/api/statistics?events=$events&jobs=$jobs" =>
val filteredJobs = Try(jobs.split(",").toSeq.filter(_.nonEmpty)).toOption
.filter(_.nonEmpty)
.getOrElse(workflow.vertices.map(_.id))
.getOrElse(allJobs)
.toSet
def getStats() =
Some(
(executor.runningExecutionsSizes(filteredJobs),
executor.pausedExecutionsSize(filteredJobs),
executor.failingExecutionsSize(filteredJobs)))
def asJson(x: ((Int, Int), Int, Int)) = x match {
case ((running, waiting), paused, failing) =>
Json.obj(
"running" -> running.asJson,
"waiting" -> waiting.asJson,
"paused" -> paused.asJson,
"failing" -> failing.asJson,
"scheduler" -> scheduler.getStats(filteredJobs)
)

// Collects the executor and scheduler statistics for the filtered jobs as a pair;
// a non-fatal failure in either call yields None instead of propagating.
def getStats() =
  Try {
    val executorStats = executor.getStats(filteredJobs)
    val schedulerStats = scheduler.getStats(filteredJobs)
    (executorStats, schedulerStats)
  }.toOption

// Merges the scheduler statistics into the executor statistics document
// under the "scheduler" key.
def asJson(x: (Json, Json)) = {
  val (executorStats, schedulerStats) = x
  val schedulerSection = Json.obj("scheduler" -> schedulerStats)
  executorStats.deepMerge(schedulerSection)
}

events match {
case "true" | "yes" =>
sse(getStats _, asJson)
case _ =>
Ok(asJson(getStats().get))
getStats().map(stat => Ok(asJson(stat))).getOrElse(InternalServerError)
}

case GET at url"/api/statistics/$jobName" =>
Ok(executor.jobStatsForLastThirtyDays(jobName).asJson)

case GET at "/metrics" => {
val metrics = executor.getMetrics(allJobs) ++ scheduler.getMetrics(allJobs)
Ok(Prometheus.format(metrics))
}

case GET at url"/api/executions/status/$kind?limit=$l&offset=$o&events=$events&sort=$sort&order=$a&jobs=$jobs" =>
val limit = Try(l.toInt).toOption.getOrElse(25)
val offset = Try(o.toInt).toOption.getOrElse(0)
val asc = (a.toLowerCase == "asc")
val filteredJobs = Try(jobs.split(",").toSeq.filter(_.nonEmpty)).toOption
.filter(_.nonEmpty)
.getOrElse(workflow.vertices.map(_.id))
.getOrElse(allJobs)
.toSet
def getExecutions() = kind match {
case "started" =>
Expand Down
7 changes: 3 additions & 4 deletions core/src/main/scala/com/criteo/cuttle/Cuttle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.criteo.cuttle

import lol.http._
import scala.concurrent.ExecutionContext.Implicits.global
import com.criteo.cuttle.logging.Logger

class CuttleProject[S <: Scheduling] private[cuttle] (
val name: String,
Expand All @@ -19,7 +18,7 @@ class CuttleProject[S <: Scheduling] private[cuttle] (
httpPort: Int = 8888,
databaseConfig: DatabaseConfig = DatabaseConfig.fromEnv,
retryStrategy: RetryStrategy = RetryStrategy.ExponentialBackoffRetryStrategy
) = {
): Unit = {
val xa = Database.connect(databaseConfig)
val executor = new Executor[S](platforms, xa, logger = logger)(retryStrategy)

Expand All @@ -37,8 +36,8 @@ object CuttleProject {
version: String = "",
description: String = "",
env: (String, Boolean) = ("", false),
authenticator: Authenticator = GuestAuth)
(workflow: Workflow[S])(implicit scheduler: Scheduler[S], logger: Logger): CuttleProject[S] =
authenticator: Authenticator = GuestAuth)(
workflow: Workflow[S])(implicit scheduler: Scheduler[S], logger: Logger): CuttleProject[S] =
new CuttleProject(name, version, description, env, workflow, scheduler, authenticator, logger)

private[CuttleProject] def defaultPlatforms: Seq[ExecutionPlatform] = {
Expand Down
38 changes: 30 additions & 8 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import lol.http.PartialService
import doobie.imports._
import cats.implicits._
import io.circe._
import io.circe.syntax._

import authentication._
import logging._

trait RetryStrategy {
def apply[S <: Scheduling](job: Job[S], context: S#Context, previouslyFailing: List[String]): Duration
Expand Down Expand Up @@ -207,7 +207,7 @@ private[cuttle] object ExecutionPlatform {
class Executor[S <: Scheduling] private[cuttle] (
val platforms: Seq[ExecutionPlatform],
xa: XA,
logger: Logger)(implicit retryStrategy: RetryStrategy) {
logger: Logger)(implicit retryStrategy: RetryStrategy) extends MetricProvider {

val queries = new Queries {
val appLogger = logger
Expand Down Expand Up @@ -274,9 +274,7 @@ class Executor[S <: Scheduling] private[cuttle] (
flagWaitingExecutions(runningState.single.keys.toSeq)

private[cuttle] def runningExecutionsSizeTotal(filteredJobs: Set[String]): Int =
runningState.single.keys
.filter(e => filteredJobs.contains(e.job.id))
.size
runningState.single.keys.count(e => filteredJobs.contains(e.job.id))

private[cuttle] def runningExecutionsSizes(filteredJobs: Set[String]): (Int, Int) = {
val statuses =
Expand Down Expand Up @@ -378,6 +376,32 @@ class Executor[S <: Scheduling] private[cuttle] (
limit: Int): Seq[ExecutionLog] =
queries.getExecutionLog(queryContexts, jobs, sort, asc, offset, limit).transact(xa).unsafePerformIO

private[cuttle] def getStats(jobs: Set[String]): Json = {
val (running, waiting) = runningExecutionsSizes(jobs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just wanted to point out that the provided figures can be inconsistent because the STM accesses are not enclosed in a single
transaction. Not sure this is critical though.

val paused = pausedExecutionsSize(jobs)
val failing = failingExecutionsSize(jobs)
val finished = archivedExecutionsSize(jobs)
Map(
"running" -> running,
"waiting" -> waiting,
"paused" -> paused,
"failing" -> failing,
"finished" -> finished
).asJson
}

/**
  * Exposes the executor's execution counts for the given jobs as gauges.
  *
  * Every gauge is named "scheduler_stat_count" and carries a "type" tag
  * (running, waiting, paused, failing, finished) identifying the counter.
  *
  * @param jobs ids of the jobs to take into account
  * @return one gauge per execution state, in the order listed above
  */
override private[cuttle] def getMetrics(jobs: Set[String]): Seq[Metric] = {
  // Single place where the shared metric name and tag key are spelled out.
  def statGauge(kind: String, count: Long): Gauge =
    Gauge("scheduler_stat_count", count, Seq("type" -> kind))

  val (running, waiting) = runningExecutionsSizes(jobs)

  Seq(
    statGauge("running", running),
    statGauge("waiting", waiting),
    statGauge("paused", pausedExecutionsSize(jobs)),
    statGauge("failing", failingExecutionsSize(jobs)),
    statGauge("finished", archivedExecutionsSize(jobs))
  )
}

// Keys of the current `pausedState` snapshot, materialised as a sequence.
// NOTE(review): presumably these keys are the ids of the paused jobs — confirm against `pausedState`'s definition.
private[cuttle] def pausedJobs: Seq[String] =
pausedState.single.keys.toSeq

Expand All @@ -391,9 +415,7 @@ class Executor[S <: Scheduling] private[cuttle] (
private[cuttle] def getExecution(queryContexts: Fragment, executionId: String): Option[ExecutionLog] =
atomic { implicit tx =>
val predicate = (e: Execution[S]) => e.id == executionId
pausedState.values
.map(_.keys)
.flatten
pausedState.values.flatMap(_.keys)
.find(predicate)
.map(_.toExecutionLog(ExecutionPaused))
.orElse(throttledState.keys
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.criteo.cuttle.logging
package com.criteo.cuttle

trait Logger {
def debug(message: => String): Unit
Expand Down
22 changes: 22 additions & 0 deletions core/src/main/scala/com/criteo/cuttle/Metrics.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.criteo.cuttle
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we scope all this into a Metrics object, so in the public API it's all coherent?


/** Closed family of exportable metrics; `Gauge` is the only implementation visible in this file. */
sealed trait Metric {
// NOTE(review): every object already inherits `toString`, so this abstract re-declaration may be redundant — confirm intent.
def toString: String
}

/**
  * A named integer sample with optional key/value tags.
  *
  * @param name  metric name
  * @param value sampled value
  * @param tags  ordered key/value pairs attached to the sample; empty by default
  */
case class Gauge(name: String, value: Long, tags: Seq[(String, String)] = Seq.empty) extends Metric

trait MetricProvider {
private[cuttle] def getMetrics(jobs: Set[String]): Seq[Metric]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't be private[cuttle] if it is exposed as part of the Scheduler API.

}

object Prometheus {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one should be private[cuttle] as it is an implementation detail.

/**
  * Renders metrics as Prometheus text exposition lines, one sample per line,
  * followed by a trailing line feed.
  *
  * Each gauge is written as `name {key="value", ...} value`. Tag values are
  * escaped (backslash, double quote and line feed) so that an arbitrary value,
  * such as a job name, cannot break the line or the label syntax.
  *
  * @param metrics metrics to render; an empty sequence yields a single line feed
  * @return the rendered text
  */
def format(metrics: Seq[Metric]): String = {
  // The backslash must be escaped first, otherwise the backslashes introduced
  // by the two following replacements would be escaped a second time.
  def escapeLabelValue(raw: String): String =
    raw
      .replace("\\", "\\\\")
      .replace("\"", "\\\"")
      .replace("\n", "\\n")

  val prometheusMetrics = metrics.map {
    case Gauge(name, value, tags) =>
      val labels = tags
        .map { case (key, tagValue) => s"""$key="${escapeLabelValue(tagValue)}"""" }
        .mkString(", ")
      s"$name {$labels} $value"
  }

  s"${prometheusMetrics.mkString("\n")}\n"
}
}
4 changes: 2 additions & 2 deletions core/src/main/scala/com/criteo/cuttle/Scheduling.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import io.circe.Json
import doobie.imports._
import java.util.Comparator

import logging._
import authentication._

trait Scheduler[S <: Scheduling] {
trait Scheduler[S <: Scheduling] extends MetricProvider {
def start(workflow: Workflow[S], executor: Executor[S], xa: XA, logger: Logger): Unit
private[cuttle] def publicRoutes(workflow: Workflow[S], executor: Executor[S], xa: XA): PartialService =
PartialFunction.empty
private[cuttle] def privateRoutes(workflow: Workflow[S], executor: Executor[S], xa: XA): AuthenticatedService =
PartialFunction.empty
val allContexts: Fragment

def getStats(jobs: Set[String]): Json
}

Expand Down
1 change: 0 additions & 1 deletion core/src/main/scala/com/criteo/cuttle/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package com.criteo
import scala.concurrent._
import doobie.imports._
import cats.free._
import cuttle.logging.Logger

package cuttle {
sealed trait Completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.criteo.cuttle.timeseries

import Internal._
import com.criteo.cuttle._
import com.criteo.cuttle.logging._

import scala.concurrent._
import scala.concurrent.duration.{Duration => ScalaDuration}
Expand Down Expand Up @@ -434,15 +433,56 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] wit
}
}

override def getStats(jobs: Set[String]) = {
private def getRunningBackfillsSize(jobs: Set[String]) = {
val runningBackfills = state match {
case (_, backfills) =>
backfills.filter(
bf =>
bf.status == "RUNNING" &&
bf.jobs.map(_.id).intersect(jobs).nonEmpty)
}
Map("backfills" -> runningBackfills.size).asJson

runningBackfills.size
}

/**
  * Computes, for each requested job, the last instant at which the job was in a valid state.
  *
  * The job's intervals are scanned in order and the scan stops at the first interval that is
  * `Running` or `Todo(None)`; the upper bound of the last interval seen before that point is
  * reported. An upper bound that is not finite is reported as `Instant.MAX`.
  *
  * @param jobs set of job ids to process
  * @return pairs of job and last valid instant; a job whose first interval already stops the
  *         scan (or that has no interval) is omitted
  */
private def getTimeOfLastSuccess(jobs: Set[String]) = {
  val lastValidInstantPerJob = _state.single().collect {
    case (job, intervals) if jobs.contains(job.id) =>
      intervals.toList
        .takeWhile {
          case (_, Running(_)) | (_, Todo(None)) => false
          case _ => true
        }
        .lastOption
        .map {
          case (interval, _) =>
            val upperBound = interval.hi match {
              case Finite(instant) => instant
              case _ => Instant.MAX
            }
            job -> upperBound
        }
  }

  // Drop the jobs for which no valid interval was found.
  lastValidInstantPerJob.flatten
}

/**
  * Exposes the time series scheduler metrics for the given jobs.
  *
  * Produces one "scheduler_stat_count" gauge tagged `type=backfills` holding the number of
  * running backfills, followed by one "scheduler_last_success_epoch_seconds" gauge per job
  * returned by `getTimeOfLastSuccess`, tagged with the job id and name.
  *
  * NOTE(review): the per-job value is the number of seconds elapsed since the last valid
  * instant, not an epoch timestamp as the metric name suggests — confirm which is intended.
  * NOTE(review): when `getTimeOfLastSuccess` reports `Instant.MAX` the value is a large
  * negative number — confirm that consumers expect this.
  *
  * @param jobs ids of the jobs to take into account
  * @return the backfill gauge followed by the per-job gauges
  */
override private[cuttle] def getMetrics(jobs: Set[String]): Seq[Metric] = {
  // Read the clock once so that every gauge of a scrape is measured against the same instant.
  val nowEpochSecond = Instant.now().getEpochSecond

  val timeOfLastSuccessMetrics = getTimeOfLastSuccess(jobs).map {
    case (job, instant) =>
      Gauge("scheduler_last_success_epoch_seconds",
            nowEpochSecond - instant.getEpochSecond,
            Seq("job_id" -> job.id, "job_name" -> job.name))
  }

  Seq(Gauge("scheduler_stat_count", getRunningBackfillsSize(jobs), Seq("type" -> "backfills"))) ++
    timeOfLastSuccessMetrics
}

// Scheduler statistics for the given jobs: the number of running backfills,
// encoded as a JSON object under the "backfills" key.
override def getStats(jobs: Set[String]): Json =
  Map("backfills" -> getRunningBackfillsSize(jobs)).asJson
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ private[timeseries] object IntervalMap {
m <- this.intersect(interval).toList
} yield m): _*))
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are adding code you do not use. If you really feel this is something that needs to be done, I would suggest moving it to another PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaned it

// First entry of the underlying `tree`.
// NOTE(review): presumably fails when the map is empty, as `head` usually does — confirm for `tree`'s type.
def head: (Interval[A], B) = tree.head

// Last entry of the underlying `tree`.
// NOTE(review): presumably fails when the map is empty, as `last` usually does — confirm for `tree`'s type.
def last: (Interval[A], B) = tree.last
}

implicit def functorFilterInstance[K: Ordering] =
Expand Down Expand Up @@ -162,4 +166,6 @@ private[timeseries] sealed trait IntervalMap[A, B] {
def mapKeys[K: Ordering](f: A => K): IntervalMap[K, B]
def whenIsDef[C](other: IntervalMap[A, C]): IntervalMap[A, B]
def whenIsUndef[C](other: IntervalMap[A, C]): IntervalMap[A, B]
// First (interval, value) entry of the map.
// NOTE(review): behaviour on an empty map is not specified here — confirm with the implementation.
def head: (Interval[A], B)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

// Last (interval, value) entry of the map.
// NOTE(review): behaviour on an empty map is not specified here — confirm with the implementation.
def last: (Interval[A], B)
}