Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable logs & timeseries state retention #357

Merged
merged 1 commit into from
Feb 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/src/main/scala/com/criteo/cuttle/Database.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.criteo.cuttle

import java.time._
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.{Duration => ScalaDuration}

import scala.util._

Expand Down Expand Up @@ -353,6 +354,15 @@ private[cuttle] case class Queries(logger: Logger) {
(${id}, ${streams})
""".update.run

def applyLogsRetention(logsRetention: ScalaDuration): ConnectionIO[Int] =
sql"""
guillaumebort marked this conversation as resolved.
Show resolved Hide resolved
DELETE es
FROM executions_streams es
JOIN executions e
ON es.id = e.id
WHERE end_time < ${Instant.now.minusSeconds(logsRetention.toSeconds)}
""".update.run

def archivedStreams(id: String): ConnectionIO[Option[String]] =
sql"SELECT streams FROM executions_streams WHERE id = ${id}"
.query[String]
Expand Down
16 changes: 11 additions & 5 deletions core/src/main/scala/com/criteo/cuttle/ExecutionStreams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,17 @@ private[cuttle] object ExecutionStreams {
logFile(id).delete()
}

def archive(id: ExecutionId, queries: Queries, xa: XA): Unit = {
queries
.archiveStreams(id, streamsAsString(id).getOrElse(sys.error(s"Cannot archive streams for execution $id")))
.transact(xa)
.unsafeRunSync()
def archive(id: ExecutionId, queries: Queries, logsRetention: Option[Duration], xa: XA): Unit = {
(for {
_ <-
queries
.archiveStreams(id, streamsAsString(id).getOrElse(sys.error(s"Cannot archive streams for execution $id")))
.transact(xa)
_ <-
logsRetention.map { retention =>
queries.applyLogsRetention(retention).transact(xa)
}.getOrElse(IO.unit)
} yield ()).unsafeRunSync()
discard(id)
}
}
6 changes: 4 additions & 2 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.{Timer, TimerTask}

import scala.concurrent.duration._
import scala.concurrent.duration.{Duration => ScalaDuration}
import scala.concurrent.stm._
import scala.concurrent.{Future, Promise}
import scala.reflect.{ClassTag, classTag}
Expand Down Expand Up @@ -411,7 +412,8 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
xa: XA,
logger: Logger,
val projectName: String,
val projectVersion: String)(implicit retryStrategy: RetryStrategy)
val projectVersion: String,
logsRetention: Option[ScalaDuration] = None)(implicit retryStrategy: RetryStrategy)
extends MetricProvider[S] {

import ExecutionStatus._
Expand Down Expand Up @@ -723,7 +725,7 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
.andThen {
case result =>
try {
ExecutionStreams.archive(execution.id, queries, xa)
ExecutionStreams.archive(execution.id, queries, logsRetention, xa)
} catch {
case e: Throwable =>
e.printStackTrace()
Expand Down
6 changes: 4 additions & 2 deletions cron/src/main/scala/com/criteo/cuttle/cron/CronProject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,20 @@ class CronProject private[cuttle] (val name: String,
* @param port The port to use for the HTTP daemon.
* @param databaseConfig JDBC configuration for MySQL server
* @param retryStrategy The strategy to use for execution retry. Default to exponential backoff.
* @param logsRetention If specified, automatically clean the execution logs older than the given duration.
*/
def start(
platforms: Seq[ExecutionPlatform] = CronProject.defaultPlatforms,
port: Int = CronProject.port,
databaseConfig: DatabaseConfig = CronProject.databaseConfig,
retryStrategy: RetryStrategy = CronProject.retryStrategy
retryStrategy: RetryStrategy = CronProject.retryStrategy,
logsRetention: Option[Duration] = None
): Unit = {
logger.info("Connecting to database")
implicit val transactor = com.criteo.cuttle.Database.connect(databaseConfig)(logger)

logger.info("Creating Executor")
val executor = new Executor[CronScheduling](platforms, transactor, logger, name, version)(retryStrategy)
val executor = new Executor[CronScheduling](platforms, transactor, logger, name, version, logsRetention)(retryStrategy)

logger.info("Scheduler starting workflow")
scheduler.start(workload, executor, transactor, logger)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,6 @@ object HelloTimeSeries {
world dependsOn (hello1 and hello2 and hello3)
}.
// Starts the scheduler and an HTTP server.
start()
start(logsRetention = Some(1.minute))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import lol.http._
import com.criteo.cuttle._
import com.criteo.cuttle.{Database => CuttleDatabase}
import com.criteo.cuttle.ThreadPools._, Implicits.serverThreadPool
import scala.concurrent.duration.Duration

/**
* A cuttle project is a workflow to execute with the appropriate scheduler.
Expand All @@ -15,7 +16,6 @@ class CuttleProject private[cuttle] (val name: String,
val description: String,
val env: (String, Boolean),
val jobs: Workflow,
val scheduler: TimeSeriesScheduler,
val authenticator: Auth.Authenticator,
val logger: Logger) {

Expand All @@ -27,30 +27,28 @@ class CuttleProject private[cuttle] (val name: String,
* @param httpPort The port to use for the HTTP daemon.
* @param databaseConfig JDBC configuration for MySQL server 5.7.
* @param retryStrategy The strategy to use for execution retry. Default to exponential backoff.
* @param paused Automatically pause all jobs at startup.
* @param stateRetention If specified, automatically clean the timeseries state older than the given duration.
* @param logsRetention If specified, automatically clean the execution logs older than the given duration.
*/
def start(
platforms: Seq[ExecutionPlatform] = CuttleProject.defaultPlatforms,
httpPort: Int = 8888,
databaseConfig: DatabaseConfig = DatabaseConfig.fromEnv,
retryStrategy: RetryStrategy = RetryStrategy.ExponentialBackoffRetryStrategy,
paused: Boolean = false
paused: Boolean = false,
stateRetention: Option[Duration] = None,
logsRetention: Option[Duration] = None
): Unit = {
val xa = CuttleDatabase.connect(databaseConfig)(logger)
val executor = new Executor[TimeSeries](platforms, xa, logger, name, version)(retryStrategy)

if (paused) {
logger.info("Pausing workflow")
scheduler.pauseJobs(jobs.all, executor, xa)(Auth.User("Startup"))
}
val (routes, startScheduler) = build(platforms, databaseConfig, retryStrategy, paused, stateRetention, logsRetention)

logger.info("Start workflow")
scheduler.start(jobs, executor, xa, logger)
startScheduler()

logger.info("Start server")
Server.listen(port = httpPort, onError = { e =>
e.printStackTrace()
InternalServerError(e.getMessage)
})(TimeSeriesApp(this, executor, xa).routes)
})(routes)

logger.info(s"Listening on http://localhost:$httpPort")
}
Expand All @@ -61,23 +59,34 @@ class CuttleProject private[cuttle] (val name: String,
* @param platforms The configured [[ExecutionPlatform ExecutionPlatforms]] to use to execute jobs.
* @param databaseConfig JDBC configuration for MySQL server 5.7.
* @param retryStrategy The strategy to use for execution retry. Default to exponential backoff.
* @param paused Automatically pause all jobs at startup.
* @param stateRetention If specified, automatically clean the timeseries state older than the given duration.
* @param logsRetention If specified, automatically clean the execution logs older than the given duration.
*
* @return a tuple with cuttleRoutes (needed to start a server) and a function to start the scheduler
*/
def build(
platforms: Seq[ExecutionPlatform] = CuttleProject.defaultPlatforms,
databaseConfig: DatabaseConfig = DatabaseConfig.fromEnv,
retryStrategy: RetryStrategy = RetryStrategy.ExponentialBackoffRetryStrategy
retryStrategy: RetryStrategy = RetryStrategy.ExponentialBackoffRetryStrategy,
paused: Boolean = false,
stateRetention: Option[Duration] = None,
logsRetention: Option[Duration] = None
): (Service, () => Unit) = {
val xa = CuttleDatabase.connect(databaseConfig)(logger)
val executor = new Executor[TimeSeries](platforms, xa, logger, name, version)(retryStrategy)
val executor = new Executor[TimeSeries](platforms, xa, logger, name, version, logsRetention)(retryStrategy)
val scheduler = new TimeSeriesScheduler(logger, stateRetention)

val startScheduler = () => {
if (paused) {
logger.info("Pausing workflow")
scheduler.pauseJobs(jobs.all, executor, xa)(Auth.User("Startup"))
}
logger.info("Start workflow")
scheduler.start(jobs, executor, xa, logger)
}

(TimeSeriesApp(this, executor, xa).routes, startScheduler)
(TimeSeriesApp(this, executor, scheduler, xa).routes, startScheduler)
}
}

Expand All @@ -95,16 +104,14 @@ object CuttleProject {
* if the environment is a production one).
* @param authenticator The way to authenticate HTTP request for the UI and the private API.
* @param jobs The workflow to run in this project.
* @param scheduler The scheduler instance to use to schedule the Workflow jobs.
 * @param logger The logger to use to log internal debug information.
*/
def apply(name: String,
version: String = "",
description: String = "",
env: (String, Boolean) = ("", false),
authenticator: Auth.Authenticator = Auth.GuestAuth)(jobs: Workflow)(implicit scheduler: TimeSeriesScheduler,
logger: Logger): CuttleProject =
new CuttleProject(name, version, description, env, jobs, scheduler, authenticator, logger)
authenticator: Auth.Authenticator = Auth.GuestAuth)(jobs: Workflow)(implicit logger: Logger): CuttleProject =
new CuttleProject(name, version, description, env, jobs, authenticator, logger)

private[CuttleProject] def defaultPlatforms: Seq[ExecutionPlatform] = {
import java.util.concurrent.TimeUnit.SECONDS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.criteo.cuttle._
import Internal._

import java.time._
import scala.concurrent.duration.Duration

import cats.Applicative
import cats.data.{NonEmptyList, OptionT}
Expand Down Expand Up @@ -48,36 +49,6 @@ private[timeseries] object Database {
} yield ()
}

val cleanBigTimeseriesState: ConnectionIO[Unit] = {
sql"""
CREATE TABLE IF NOT EXISTS timeseries_state_temp (
state JSON NOT NULL,
date DATETIME NOT NULL,
PRIMARY KEY (date),
KEY timeseries_state_by_date (date)
) ENGINE = INNODB;

TRUNCATE TABLE timeseries_state_temp;

INSERT INTO timeseries_state_temp
SELECT
*
FROM timeseries_state
WHERE date > (NOW() - INTERVAL 1 WEEK);

TRUNCATE TABLE timeseries_state;

INSERT INTO timeseries_state
SELECT
*
FROM timeseries_state_temp;

DROP TABLE timeseries_state_temp;

OPTIMIZE table timeseries_state;
""".update.run.map(_ => Unit)
}

val schema = List(
sql"""
CREATE TABLE timeseries_state (
Expand Down Expand Up @@ -115,7 +86,7 @@ private[timeseries] object Database {
CREATE INDEX timeseries_backfills_by_status ON timeseries_backfills (status);
""".update.run,
contextIdMigration,
cleanBigTimeseriesState
NoUpdate // We removed this migration, so we reserve this slot
)

val doSchemaUpdates: ConnectionIO[Unit] = utils.updateSchema("timeseries", schema)
Expand Down Expand Up @@ -161,10 +132,16 @@ private[timeseries] object Database {
.value
}

def serializeState(state: State): ConnectionIO[Int] = {
def serializeState(state: State, retention: Option[Duration]): ConnectionIO[Int] = {
import JobState.{Done, Todo}

val now = Instant.now()
val cleanStateBefore = retention.map { duration =>
if(duration.toSeconds <= 0 )
sys.error(s"State retention is badly configured: ${duration}")
else
now.minusSeconds(duration.toSeconds)
}
val stateJson = state.toList.map {
case (job, im) =>
(job.id, im.toList.filter {
Expand All @@ -178,8 +155,11 @@ private[timeseries] object Database {
}.asJson

for {
// First clean state older than 1 week ago
_ <- sql"DELETE FROM timeseries_state where date < (${now} - INTERVAL 1 WEEK)".update.run
// Apply state retention if needed
_ <-
cleanStateBefore.map { t =>
sql"DELETE FROM timeseries_state where date < ${t}".update.run
}.getOrElse(NoUpdate)
// Insert the latest state
x <- sql"INSERT INTO timeseries_state (state, date) VALUES (${stateJson}, ${now})".update.run
} yield x
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private[timeseries] object TimeSeriesApp {
"name" -> project.name.asJson,
"version" -> Option(project.version).filterNot(_.isEmpty).asJson,
"description" -> Option(project.description).filterNot(_.isEmpty).asJson,
"scheduler" -> project.scheduler.name.asJson,
"scheduler" -> "timeseries".asJson,
"env" -> Json.obj(
"name" -> Option(project.env._1).filterNot(_.isEmpty).asJson,
"critical" -> project.env._2.asJson
Expand Down Expand Up @@ -85,9 +85,9 @@ private[timeseries] object TimeSeriesApp {

}

private[timeseries] case class TimeSeriesApp(project: CuttleProject, executor: Executor[TimeSeries], xa: XA) {
private[timeseries] case class TimeSeriesApp(project: CuttleProject, executor: Executor[TimeSeries], scheduler: TimeSeriesScheduler, xa: XA) {

import project.{jobs, scheduler}
import project.{jobs}

import JobState._
import TimeSeriesApp._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,7 @@ case class TimeSeries(calendar: TimeSeriesCalendar, start: Instant, end: Option[
}

/** [[TimeSeries]] utilities. */
object TimeSeries {
/* Provide a default [[TimeSeriesScheduler]] for [[TimeSeries]] scheduling. */
implicit def scheduler(implicit logger: Logger) = TimeSeriesScheduler(logger)
}
object TimeSeries

private[timeseries] sealed trait JobState
private[timeseries] object JobState {
Expand Down Expand Up @@ -328,7 +325,7 @@ private[timeseries] object JobState {
* The scheduler also allow to [[Backfill]] already computed partitions. The [[Backfill]] can be recursive
* or not and an audit log of backfills is kept.
*/
case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] {
case class TimeSeriesScheduler(logger: Logger, stateRetention: Option[ScalaDuration] = None) extends Scheduler[TimeSeries] {
guillaumebort marked this conversation as resolved.
Show resolved Hide resolved
import JobState.{Done, Running, Todo}
import TimeSeriesUtils._

Expand Down Expand Up @@ -651,7 +648,7 @@ case class TimeSeriesScheduler(logger: Logger) extends Scheduler[TimeSeries] {
}

if (completed.nonEmpty || toRun.nonEmpty)
runOrLogAndDie(Database.serializeState(stateSnapshot).transact(xa).unsafeRunSync,
runOrLogAndDie(Database.serializeState(stateSnapshot, stateRetention).transact(xa).unsafeRunSync,
"TimeseriesScheduler, cannot serialize state, shutting down")

if (completedBackfills.nonEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object TimeSeriesSchedulerIntegrationTests {
val executor =
new Executor[TimeSeries](Seq(LocalPlatform(maxForkedProcesses = 10)), xa, logger, project.name, project.version)(
retryImmediatelyStrategy)
val scheduler = project.scheduler
val scheduler = new TimeSeriesScheduler(logger)

scheduler.initialize(project.jobs, xa, logger)

Expand Down