Skip to content

Commit

Permalink
Monitors waiting time for executions (#132)
Browse files Browse the repository at this point in the history
* Monitors waiting time for executions

- Monitors via executor polling
- Adds waitingSeconds info to execution model
- Adds a progressbar component to display waiting / running times

* Moves monitoring logic to executor & reformats

* Makes Executor public again.
  • Loading branch information
dufrannea authored and guillaumebort committed Jul 28, 2017
1 parent 48fbb36 commit 10d9f0e
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 21 deletions.
70 changes: 64 additions & 6 deletions core/src/main/javascript/app/pages/Execution.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import ExitFullscreenIcon from "react-icons/lib/md/fullscreen-exit";
import AutoScrollIcon from "react-icons/lib/md/arrow-downward";
import BreakIcon from "react-icons/lib/md/keyboard-control";
import CalendarIcon from "react-icons/lib/md/date-range";
import ReactTooltip from "react-tooltip";

import moment from "moment";

import Window from "../components/Window";
Expand Down Expand Up @@ -43,6 +45,52 @@ type State = {
autoScroll: boolean
};

type ProgressBarProps = {
classes: any,
totalTimeSeconds: number,
waitingTimeSeconds: number
};

const ProgressBarComponent = ({
classes,
totalTimeSeconds,
waitingTimeSeconds
}: ProgressBarProps) => {
const totalWidth = 200;
const height = 8;
const barWidth = totalTimeSeconds !== 0
? waitingTimeSeconds / totalTimeSeconds * totalWidth
: 0;

const tooltip =
`Waiting : ${moment.utc(waitingTimeSeconds * 1000).format("HH:mm:ss")} / ` +
`Running : ${moment
.utc((totalTimeSeconds - waitingTimeSeconds) * 1000)
.format("HH:mm:ss")}`;

return (
<span className={classes.main}>
<svg width={totalWidth} height={height} className={classes.svg}>
<g data-tip={tooltip}>
<rect width={totalWidth} height={height} fill="#00BCD4" />
<rect width={barWidth} height={height} fill="#ff9800" />
</g>
</svg>
<ReactTooltip className={classes.tooltip} effect="float" html={true} />
</span>
);
};

const ProgressBar = injectSheet({
main: {
"margin-left": "10px"
},
svg: {
lineHeight: "18px",
display: "inline-block"
}
})(ProgressBarComponent);

class Execution extends React.Component {
props: Props;
state: State;
Expand Down Expand Up @@ -241,13 +289,23 @@ class Execution extends React.Component {
<dt key="duration">Duration:</dt>,
<dd key="duration_">
{data.endTime
? moment
.utc(
moment(data.endTime).diff(
moment(data.startTime)
? [
moment
.utc(
moment(data.endTime).diff(
moment(data.startTime)
)
)
)
.format("HH:mm:ss")
.format("HH:mm:ss"),
<ProgressBar
totalTimeSeconds={
moment(data.endTime).diff(
moment(data.startTime)
) / 1000
}
waitingTimeSeconds={data.waitingSeconds}
/>
]
: <Clock time={data.startTime} humanize={false} />}
</dd>
]
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/javascript/datamodel.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ export type ExecutionLog = {
failing?: {
failedExecutions: Array<ExecutionLog>,
nextRetry: ?string
}
},
waitingSeconds : number
};

export type Paginated<A> = {
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/com/criteo/cuttle/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ private[cuttle] object App {
"failedExecutions" -> Json.fromValues(failedExecutions.map(_.asJson(executionLogEncoder))),
"nextRetry" -> nextRetry.asJson
)
}.asJson
}.asJson,
"waitingSeconds" -> execution.waitingSeconds.asJson
)
}

Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/com/criteo/cuttle/Cuttle.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class CuttleProject[S <: Scheduling] private[cuttle] (
) = {
val xa = Database.connect(databaseConfig)
val executor = new Executor[S](platforms, xa)(retryStrategy)

Server.listen(port = httpPort, onError = { e =>
e.printStackTrace()
InternalServerError(e.getMessage)
Expand Down
21 changes: 11 additions & 10 deletions core/src/main/scala/com/criteo/cuttle/Database.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ private[cuttle] object Database {
end_time DATETIME NOT NULL,
context_id VARCHAR(1000) NOT NULL,
success BOOLEAN NOT NULL,
waiting_seconds INT NOT NULL,
PRIMARY KEY (id)
) ENGINE = INNODB;

Expand Down Expand Up @@ -128,8 +129,8 @@ private[cuttle] trait Queries {
for {
contextId <- logContext
result <- sql"""
INSERT INTO executions (id, job, start_time, end_time, success, context_id)
VALUES (${e.id}, ${e.job}, ${e.startTime}, ${e.endTime}, ${e.status}, ${contextId})
INSERT INTO executions (id, job, start_time, end_time, success, context_id, waiting_seconds)
VALUES (${e.id}, ${e.job}, ${e.startTime}, ${e.endTime}, ${e.status}, ${contextId}, ${e.waitingSeconds})
""".update.run
} yield result

Expand Down Expand Up @@ -158,29 +159,29 @@ private[cuttle] trait Queries {
case _ => sql"ORDER BY end_time, id DESC"
}
(sql"""
SELECT executions.id, job, start_time, end_time, contexts.json AS context, success
SELECT executions.id, job, start_time, end_time, contexts.json AS context, success, executions.waiting_seconds
FROM executions INNER JOIN (""" ++ contextQuery ++ sql""") contexts
ON executions.context_id = contexts.id WHERE """ ++ Fragments.in(
fr"job",
NonEmptyList.fromListUnsafe(jobs.toList)) ++ orderBy ++ sql""" LIMIT $limit OFFSET $offset""")
.query[(String, String, Instant, Instant, Json, ExecutionStatus)]
.query[(String, String, Instant, Instant, Json, ExecutionStatus, Int)]
.list
.map(_.map {
case (id, job, startTime, endTime, context, status) =>
ExecutionLog(id, job, Some(startTime), Some(endTime), context, status)
case (id, job, startTime, endTime, context, status, waitingSeconds) =>
ExecutionLog(id, job, Some(startTime), Some(endTime), context, status, waitingSeconds = waitingSeconds)
})
}

def getExecutionById(contextQuery: Fragment, id: String): ConnectionIO[Option[ExecutionLog]] =
(sql"""
SELECT executions.id, job, start_time, end_time, contexts.json AS context, success
SELECT executions.id, job, start_time, end_time, contexts.json AS context, success, executions.waiting_seconds
FROM executions INNER JOIN (""" ++ contextQuery ++ sql""") contexts
ON executions.context_id = contexts.id WHERE executions.id = $id""")
.query[(String, String, Instant, Instant, Json, ExecutionStatus)]
.query[(String, String, Instant, Instant, Json, ExecutionStatus, Int)]
.option
.map(_.map {
case (id, job, startTime, endTime, context, status) =>
ExecutionLog(id, job, Some(startTime), Some(endTime), context, status)
case (id, job, startTime, endTime, context, status, waitingSeconds) =>
ExecutionLog(id, job, Some(startTime), Some(endTime), context, status, waitingSeconds = waitingSeconds)
})

def unpauseJob[S <: Scheduling](job: Job[S]): ConnectionIO[Int] =
Expand Down
25 changes: 23 additions & 2 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ private[cuttle] case class ExecutionLog(
endTime: Option[Instant],
context: Json,
status: ExecutionStatus,
failing: Option[FailingJob] = None
failing: Option[FailingJob] = None,
waitingSeconds: Int
)

private object ExecutionCancelledException extends RuntimeException("Execution cancelled")
Expand All @@ -72,6 +73,7 @@ case class Execution[S <: Scheduling](
private[cuttle] val cancelSignal = Promise[Nothing]
def isCancelled = cancelSignal.isCompleted
val cancelled = cancelSignal.future
var waitingSeconds = 0
def onCancelled(thunk: () => Unit) = cancelled.andThen {
case Failure(_) =>
thunk()
Expand All @@ -91,10 +93,15 @@ case class Execution[S <: Scheduling](
startTime,
None,
context.toJson,
status
status,
waitingSeconds = waitingSeconds
)

private[cuttle] val isParked = new AtomicBoolean(false)

private[cuttle] def updateWaitingTime(seconds: Int): Unit =
waitingSeconds += seconds

def park(duration: FiniteDuration): Future[Unit] =
if (isParked.get) {
sys.error(s"Already parked")
Expand Down Expand Up @@ -147,6 +154,8 @@ class Executor[S <: Scheduling] private[cuttle] (
private val recentFailures = TMap.empty[(Job[S], S#Context), (Option[Execution[S]], FailingJob)]
private val timer = new Timer("com.criteo.cuttle.Executor.timer")

startMonitoringExecutions()

private def flagWaitingExecutions(executions: Seq[Execution[S]]): Seq[(Execution[S], ExecutionStatus)] = {
val waitings = platforms.flatMap(_.waiting).toSet
executions.map { execution =>
Expand Down Expand Up @@ -520,4 +529,16 @@ class Executor[S <: Scheduling] private[cuttle] (
}
))
}

private def startMonitoringExecutions() = {
val SC = fs2.Scheduler.fromFixedDaemonPool(1, "com.criteo.cuttle.platforms.ExecutionMonitor.SC")

val intervalSeconds = 1

SC.scheduleAtFixedRate(intervalSeconds.second) {
runningExecutions
.filter({ case (_, s) => s == ExecutionStatus.ExecutionWaiting })
.foreach({ case (e, _) => e.updateWaitingTime(intervalSeconds) })
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ private[timeseries] trait TimeSeriesApp { self: TimeSeriesScheduler =>
(lo, hi) <- grid.split(interval)
} yield {
val context = TimeSeriesContext(grid.truncate(lo), grid.ceil(hi), maybeBackfill)
ExecutionLog("", job.id, None, None, context.asJson, ExecutionTodo, None)
ExecutionLog("", job.id, None, None, context.asJson, ExecutionTodo, None, 0)
}
val throttledExecutions = executor.allFailingExecutions
.filter(e => e.job == job && e.context.toInterval.intersects(requestedInterval))
Expand Down

0 comments on commit 10d9f0e

Please sign in to comment.