Skip to content

Commit

Permalink
New feature: can pause specific jobs upon startup (#524)
Browse files Browse the repository at this point in the history
  • Loading branch information
brisssou authored Feb 19, 2020
1 parent 28ee3c8 commit 406078a
Showing 1 changed file with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class CuttleProject private[cuttle] (val name: String,
* @param stateRetention If specified, automatically clean the timeseries state older than the given duration.
* @param logsRetention If specified, automatically clean the execution logs older than the given duration.
* @param maxVersionsHistory If specified keep only the version information for the x latest versions.
* @param jobsToBePausedOnStartup Automatically pause those jobs at startup. Ignored if `paused == true`.
*/
def start(
platforms: Seq[ExecutionPlatform] = CuttleProject.defaultPlatforms,
Expand All @@ -40,7 +41,8 @@ class CuttleProject private[cuttle] (val name: String,
paused: Boolean = false,
stateRetention: Option[Duration] = None,
logsRetention: Option[Duration] = None,
maxVersionsHistory: Option[Int] = None
maxVersionsHistory: Option[Int] = None,
jobsToBePausedOnStartup: Set[Job[TimeSeries]] = Set.empty
): Unit = {
val (routes, startScheduler) =
build(platforms, databaseConfig, retryStrategy, paused, stateRetention, logsRetention, maxVersionsHistory)
Expand All @@ -66,6 +68,7 @@ class CuttleProject private[cuttle] (val name: String,
* @param stateRetention If specified, automatically clean the timeseries state older than the given duration.
* @param logsRetention If specified, automatically clean the execution logs older than the given duration.
* @param maxVersionsHistory If specified keep only the version information for the x latest versions.
* @param jobsToBePausedOnStartup Automatically pause those jobs at startup. Ignored if `paused == true`.
*
* @return a tuple with cuttleRoutes (needed to start a server) and a function to start the scheduler
*/
Expand All @@ -76,7 +79,8 @@ class CuttleProject private[cuttle] (val name: String,
paused: Boolean = false,
stateRetention: Option[Duration] = None,
logsRetention: Option[Duration] = None,
maxVersionsHistory: Option[Int] = None
maxVersionsHistory: Option[Int] = None,
jobsToBePausedOnStartup: Set[Job[TimeSeries]] = Set.empty
): (Service, () => Unit) = {
val xa = CuttleDatabase.connect(databaseConfig)(logger)
val executor = new Executor[TimeSeries](platforms, xa, logger, name, version, logsRetention)(retryStrategy)
Expand All @@ -86,6 +90,12 @@ class CuttleProject private[cuttle] (val name: String,
if (paused) {
logger.info("Pausing workflow")
scheduler.pauseJobs(jobs.all, executor, xa)(Auth.User("Startup"))
} else {
val toPause = jobs.all.intersect(jobsToBePausedOnStartup)
if (toPause.nonEmpty) {
logger.info(s"Pausing ${toPause.map(j => s"${j.name} (${j.id})").mkString(", ")}")
scheduler.pauseJobs(toPause, executor, xa)(Auth.User("Startup"))
}
}
logger.info("Start workflow")
scheduler.start(jobs, executor, xa, logger)
Expand Down

0 comments on commit 406078a

Please sign in to comment.