Skip to content

Commit

Permalink
introducing jobs id validation on workflow starting (#228)
Browse files Browse the repository at this point in the history
  • Loading branch information
eryshev authored Feb 13, 2018
1 parent 3367c46 commit 17488e1
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ object TimeSeriesCalendar {
* @param firstDay The first day of the week for these weeks.
*/
case class Weekly(tz: ZoneId, firstDay: DayOfWeek) extends TimeSeriesCalendar {
  // Moves `t` back to the most recent `firstDay` (or keeps it when it already falls on
  // that day of the week), then truncates the result to whole days.
  private def truncateToWeek(t: ZonedDateTime): ZonedDateTime =
    t.`with`(TemporalAdjusters.previousOrSame(firstDay)).truncatedTo(DAYS)

  /** Truncates `t`, interpreted in `tz`, to the beginning of its week. */
  def truncate(t: Instant): Instant = truncateToWeek(t.atZone(tz)).toInstant

  /** Beginning of the week after the one containing `t`: the truncated week start plus one week. */
  def next(t: Instant): Instant = truncateToWeek(t.atZone(tz)).plus(1, WEEKS).toInstant
}
Expand Down Expand Up @@ -720,9 +721,10 @@ private[timeseries] object TimeSeriesUtils {
val UTC: ZoneId = ZoneId.of("UTC")

/**
* Validation of cycle absence in workflow DAG and an absence the (execution, dependency) tuple that execution has
* a start date after an execution's start date.
* It's implemented based on Kahn's algorithm.
* Validation of:
* - cycle absence in workflow DAG, implemented based on Kahn's algorithm
* - absence of any (child, parent) tuple in which the child has a start date before the parent's start date
* - absence of jobs with the same id
* @param workflow workflow to be validated
* @return either a list of validation errors or a unit
*/
Expand Down Expand Up @@ -755,6 +757,12 @@ private[timeseries] object TimeSeriesUtils {

if (edges.nonEmpty) errors += "Workflow has at least one cycle"

// Group the workflow's vertices by job id and report every id shared by more than one job.
for ((id: String, sameIdJobs) <- workflow.vertices.groupBy(_.id) if sameIdJobs.size > 1)
  errors += s"Id $id is used by more than 1 job"

if (errors.nonEmpty) Left(errors.toList)
else Right(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,22 @@ class TimeSeriesSpec extends FunSuite with TestScheduling {
assert(validationRes.isLeft, "workflow passed start date validation")
assert(validationRes.left.get === List("Workflow has at least one cycle"), "errors messages are bad")
}

test("it shouldn't validate a workflow that contains jobs with same ids") {
  // Two distinct jobs that deliberately share one id; the duplicate starts a day earlier.
  val sharedId = "badJob"
  val original = Job(sharedId, hourly(date"2117-03-25T02:00:00Z"))(completed)
  val duplicate = Job(sharedId, hourly(date"2117-03-24T02:00:00Z"))(completed)

  // Both workflow shapes are expected to produce exactly this single error.
  val expectedErrors = List("Id badJob is used by more than 1 job")

  // Case 1: the duplicates are linked through a dependency.
  val parentChildResult = TimeSeriesUtils.validate(original dependsOn duplicate)
  assert(parentChildResult.isLeft, "it means that workflow passed duplicate id validation")
  assert(parentChildResult.left.get === expectedErrors, "it means that errors messages are bad")

  // Case 2: the duplicates are combined with `and` (the "siblings" case).
  val siblingsResult = TimeSeriesUtils.validate(original and duplicate)
  assert(siblingsResult.isLeft, "it means that workflow passed duplicate id validation")
  assert(siblingsResult.left.get === expectedErrors, "it means that errors messages are bad")
}
}

0 comments on commit 17488e1

Please sign in to comment.