From 17488e1b186aeee68bd5b0027f8bb2c0fbb2e4bb Mon Sep 17 00:00:00 2001 From: eryshev Date: Tue, 13 Feb 2018 17:51:35 +0100 Subject: [PATCH] introducing jobs id validation on workflow starting (#228) --- .../timeseries/TimeSeriesScheduler.scala | 16 ++++++++++++---- .../cuttle/timeseries/TimeSeriesSpec.scala | 18 ++++++++++++++++++ 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala index 94d537911..e863e36dc 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala @@ -103,7 +103,8 @@ object TimeSeriesCalendar { * @param firstDay The first day of the week for these weeks. */ case class Weekly(tz: ZoneId, firstDay: DayOfWeek) extends TimeSeriesCalendar { - private def truncateToWeek(t: ZonedDateTime) = t.`with`(TemporalAdjusters.previousOrSame(firstDay)).truncatedTo(DAYS) + private def truncateToWeek(t: ZonedDateTime) = + t.`with`(TemporalAdjusters.previousOrSame(firstDay)).truncatedTo(DAYS) def truncate(t: Instant) = truncateToWeek(t.atZone(tz)).toInstant def next(t: Instant) = truncateToWeek(t.atZone(tz)).plus(1, WEEKS).toInstant } @@ -720,9 +721,10 @@ private[timeseries] object TimeSeriesUtils { val UTC: ZoneId = ZoneId.of("UTC") /** - * Validation of cycle absence in workflow DAG and an absence the (execution, dependency) tuple that execution has - * a start date after an execution's start date. - * It's implemented based on Kahn's algorithm. 
+ * Validation of: + * - cycle absence in workflow DAG, implemented based on Kahn's algorithm + * - absence of any (child, parent) pair in which the child has a start date before its parent's start date + * - absence of jobs with the same id * @param workflow workflow to be validated * @return either a validation errors list or an unit */ @@ -755,6 +757,12 @@ private[timeseries] object TimeSeriesUtils { if (edges.nonEmpty) errors += "Workflow has at least one cycle" + workflow.vertices.groupBy(_.id).collect { + case (id: String, jobs) if jobs.size > 1 => id + } foreach (id => { + errors += s"Id $id is used by more than 1 job" + }) + if (errors.nonEmpty) Left(errors.toList) else Right(()) } diff --git a/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSpec.scala b/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSpec.scala index 99c4bfea2..fee886e3b 100644 --- a/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSpec.scala +++ b/timeseries/src/test/scala/com/criteo/cuttle/timeseries/TimeSeriesSpec.scala @@ -81,4 +81,22 @@ class TimeSeriesSpec extends FunSuite with TestScheduling { assert(validationRes.isLeft, "workflow passed start date validation") assert(validationRes.left.get === List("Workflow has at least one cycle"), "errors messages are bad") } + + test("it shouldn't validate a workflow that contains jobs with same ids") { + val id = "badJob" + val badJob = Job(id, hourly(date"2117-03-25T02:00:00Z"))(completed) + val badJobClone = Job(id, hourly(date"2117-03-24T02:00:00Z"))(completed) + val workflowParentChild = badJob dependsOn badJobClone + val workflowSiblings = badJob and badJobClone + + val validationParentChild = TimeSeriesUtils.validate(workflowParentChild) + assert(validationParentChild.isLeft, "it means that workflow passed duplicate id validation") + assert(validationParentChild.left.get === List(s"Id badJob is used by more than 1 job"), + "it means that errors messages are bad") + + val validationSiblings = 
TimeSeriesUtils.validate(workflowSiblings) + assert(validationSiblings.isLeft, "it means that workflow passed duplicate id validation") + assert(validationSiblings.left.get === List(s"Id badJob is used by more than 1 job"), + "it means that errors messages are bad") + } }