Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
update based on review
Browse files Browse the repository at this point in the history
  • Loading branch information
nh13 committed Mar 18, 2020
1 parent 1e24919 commit c3dc024
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 73 deletions.
35 changes: 22 additions & 13 deletions core/src/main/scala/dagr/core/execsystem/TaskManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,22 @@ object TaskManagerDefaults extends LazyLogging {
object TaskManager extends LazyLogging {
import dagr.core.execsystem.TaskManagerDefaults._

// Tuning constants for the adaptive wait between attempts to schedule tasks.
/** The initial time in milliseconds to wait between scheduling tasks. */
val InitialSleepMillis: Int = 100
/** The minimum time in milliseconds to wait between scheduling tasks (lower bound when the wait is reduced). */
val MinSleepMillis: Int = 10
/** The maximum time in milliseconds to wait between scheduling tasks (upper bound when the wait is increased). */
val MaxSleepMillis: Int = 1000
/** The amount in milliseconds by which the wait between scheduling tasks is increased after nothing could be done (linear increase). */
val StepSleepMillis: Int = 50
/** The factor by which the wait between scheduling tasks is divided after something could be done (exponential decrease). */
val BackoffSleepFactor: Float = 2f
/** The duration in seconds of a single execution step above which a warning is logged. */
val SlowStepTimeSeconds: Int = 30

/** Runs a given task to either completion, failure, or inability to schedule. This will terminate tasks that were still running before returning.
*
* @param task the task to run
* @param sleepMilliseconds the time in milliseconds to wait between attempts to schedule tasks.
* @param taskManagerResources the set of task manager resources, otherwise we use the default
* @param scriptsDirectory the scripts directory, otherwise we use the default
* @param logDirectory the log directory, otherwise we use the default
Expand All @@ -93,7 +105,6 @@ object TaskManager extends LazyLogging {
* @return a bi-directional map from the set of tasks to their execution information.
*/
def run(task: Task,
sleepMilliseconds: Int = 1000,
taskManagerResources: Option[SystemResources] = Some(defaultTaskManagerResources),
scriptsDirectory: Option[Path] = None,
logDirectory: Option[Path] = None,
Expand All @@ -106,15 +117,15 @@ object TaskManager extends LazyLogging {
scriptsDirectory = scriptsDirectory,
logDirectory = logDirectory,
scheduler = scheduler.getOrElse(defaultScheduler),
simulate = simulate,
sleepMilliseconds = sleepMilliseconds
simulate = simulate
)

taskManager.addTask(task = task)
taskManager.runToCompletion(failFast=failFast)

taskManager.taskToInfoBiMapFor
}

}

/** A manager of tasks.
Expand All @@ -126,18 +137,16 @@ object TaskManager extends LazyLogging {
* @param logDirectory the log directory, otherwise a temporary directory will be used
* @param scheduler the scheduler, otherwise we use the default
* @param simulate true if we are to simulate running tasks, false otherwise
* @param sleepMilliseconds the time in milliseconds to wait between attempts to schedule tasks.
*/
class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.defaultTaskManagerResources,
scriptsDirectory: Option[Path] = None,
logDirectory: Option[Path] = None,
scheduler: Scheduler = TaskManagerDefaults.defaultScheduler,
simulate: Boolean = false,
sleepMilliseconds: Int = 250
simulate: Boolean = false

) extends TaskManagerLike with TaskTracker with FinalStatusReporter with LazyLogging {

private var curSleepMilliseconds: Int = sleepMilliseconds
private val slowStepTimeSeconds: Int = 30
private var curSleepMilliseconds: Int = TaskManager.InitialSleepMillis

private val actualScriptsDirectory = scriptsDirectory getOrElse Io.makeTempDir("scripts")
protected val actualLogsDirectory = logDirectory getOrElse Io.makeTempDir("logs")
Expand Down Expand Up @@ -579,10 +588,10 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de

// Update the current sleep time: exponential reduction if we could do **anything**, otherwise linear increase.
if (canDoAnything) {
curSleepMilliseconds = curSleepMilliseconds / 2
curSleepMilliseconds = Math.max(TaskManager.MinSleepMillis, (curSleepMilliseconds / TaskManager.BackoffSleepFactor).toInt)
}
else {
curSleepMilliseconds = Math.min(sleepMilliseconds, curSleepMilliseconds + (sleepMilliseconds / 10))
curSleepMilliseconds = Math.min(TaskManager.MaxSleepMillis, curSleepMilliseconds + TaskManager.StepSleepMillis)
}

(
Expand All @@ -603,9 +612,9 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de

// Warn if the single step in execution "took a long time"
val stepExecutionDuration = Duration.between(startTime, Instant.now()).getSeconds
if (stepExecutionDuration > slowStepTimeSeconds) {
if (stepExecutionDuration > TaskManager.SlowStepTimeSeconds) {
logger.warning("*" * 80)
logger.warning(s"A single step in execution was > ${slowStepTimeSeconds}s (${stepExecutionDuration}s).")
logger.warning(s"A single step in execution was > ${TaskManager.SlowStepTimeSeconds}s (${stepExecutionDuration}s).")
val infosByStatus: Map[execsystem.TaskStatus.Value, Iterable[TaskExecutionInfo]] = this.taskToInfoBiMapFor.values.groupBy(_.status)
TaskStatus.values.filter(infosByStatus.contains).foreach { status =>
logger.warning(s"Found ${infosByStatus(status).size} tasks with status: $status")
Expand Down
54 changes: 16 additions & 38 deletions core/src/main/scala/dagr/core/execsystem/TaskTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,21 @@ trait TaskTracker extends TaskManagerLike with LazyLogging {
private val idToTask: mutable.Map[TaskId, Task] = mutable.Map[TaskId, Task]()
private val idToNode: mutable.Map[TaskId, GraphNode] = mutable.Map[TaskId, GraphNode]()

override def addTask(task: Task): TaskId = {
addTask(task=task, enclosingNode=None, ignoreExists=false)
}

/** Adds a task to be managed, but does not check for cycles or whether the next identifier is already in use. These
* checks should be performed by the caller.
*/
private def addTaskNoChecking(task: Task, enclosingNode: Option[GraphNode] = None): TaskId = {
// set the task id
val id = yieldAndThen(nextId) {nextId += 1}
// set the task info
require(task._taskInfo.isEmpty) // should not have any info!
val info = new TaskExecutionInfo(
task=task,
taskId=id,
status=UNKNOWN,
script=scriptPathFor(task=task, id=id, attemptIndex=1),
logFile=logPathFor(task=task, id=id, attemptIndex=1),
submissionDate=Some(Instant.now())
task = task,
taskId = id,
status = UNKNOWN,
script = scriptPathFor(task=task, id=id, attemptIndex=1),
logFile = logPathFor(task=task, id=id, attemptIndex=1),
submissionDate = Some(Instant.now())
)
task._taskInfo = Some(info)

Expand All @@ -98,33 +97,10 @@ trait TaskTracker extends TaskManagerLike with LazyLogging {
id
}

/** Adds a task to be managed
*
* Throws an [[IllegalArgumentException]] if a cycle was found after logging each strongly connected component with
* a cycle in the graph.
*
* @param ignoreExists true if we just return the task id for already added tasks, false if we are to throw an [[IllegalArgumentException]]
* @param task the given task.
* @return the task identifier.
*/
protected[execsystem] def addTask(task: Task, enclosingNode: Option[GraphNode], ignoreExists: Boolean = false): TaskId = {
// Make sure the id we will assign the task are not being tracked.
if (idToTask.contains(nextId)) throw new IllegalArgumentException(s"Task '${task.name}' with id '$nextId' was already added!")

taskFor(task) match {
case Some(id) if ignoreExists => id
case Some(id) => throw new IllegalArgumentException(s"Task '${task.name}' with id '$id' was already added!")
case None =>
// check for cycles
checkForCycles(task = task)
// add the task
addTaskNoChecking(task, enclosingNode)
}
}

/** Adds tasks to be managed
/** Adds tasks to be managed.
*
* @param tasks the given tasks.
* @param enclosingNode the graph node of the parent task that generated this task (if any)
* @param ignoreExists true if we just return the task id for already added tasks, false if we are to throw an [[IllegalArgumentException]]
* @return the task identifiers.
*/
Expand All @@ -145,9 +121,11 @@ trait TaskTracker extends TaskManagerLike with LazyLogging {
tasks.map { task => taskFor(task).getOrElse(addTaskNoChecking(task, enclosingNode)) }
}

override def addTasks(tasks: Task*): Seq[TaskId] = {
this.addTasks(tasks, enclosingNode=None, ignoreExists=false)
}
/** Adds a task to be managed. */
override def addTask(task: Task): TaskId = addTasks(task).head

/** Adds one or more tasks to be managed. */
override def addTasks(tasks: Task*): Seq[TaskId] = this.addTasks(tasks, enclosingNode=None, ignoreExists=false)

override def taskFor(id: TaskId): Option[Task] = idToTask.get(id)

Expand Down
7 changes: 6 additions & 1 deletion core/src/main/scala/dagr/core/tasksystem/Pipeline.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ abstract class Pipeline(val outputDirectory: Option[Path] = None,

/** Recursively navigates dependencies, starting from the supplied task, and add all children to this.tasks. */
private def addChildren(task : Task) : Unit = {
// 1. find all tasks connected to this task
// Developer note: we may have very deep dependency graphs, so this implementation avoids stack overflows
// Developer note: we use a Set here so that we do not recurse on the same task twice.
// Suppose we have `A ==> (B :: C)` and `B ==> C`. Even though this could be simplified to `A ==> B ==> C`, that's
// up to the caller (or to any post-processing of the DAG). So when `addChildren` gets called on `A`, it recurses on
// `B` and `C`. Since `C` depends on `B`, without the uniqueness check we would recurse on `C` a second time in the
// `addChildren` call on `B`.
val toVisit: mutable.Set[Task] = mutable.HashSet[Task](task)
while (toVisit.nonEmpty) {
val nextTask: Task = toVisit.head
Expand Down
27 changes: 12 additions & 15 deletions core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,9 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B
override def beforeAll(): Unit = Logger.level = LogLevel.Fatal
override def afterAll(): Unit = Logger.level = LogLevel.Info

def getDefaultTaskManager(sleepMilliseconds: Int = 10): TestTaskManager = new TaskManager(
def getDefaultTaskManager(): TestTaskManager = new TaskManager(
taskManagerResources = SystemResources.infinite,
scriptsDirectory = None,
sleepMilliseconds = sleepMilliseconds
scriptsDirectory = None
) with TestTaskManager

private def runSchedulerOnce(taskManager: TestTaskManager,
Expand Down Expand Up @@ -98,7 +97,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B
val task: UnitTask = new ShellCommand("exit", "0") withName "exit 0" requires ResourceSet.empty
val taskManager: TestTaskManager = getDefaultTaskManager()
taskManager.addTasks(tasks=Seq(task, task), enclosingNode=None, ignoreExists=true) shouldBe List(0, 0)
an[IllegalArgumentException] should be thrownBy taskManager.addTask(task=task, enclosingNode=None, ignoreExists=false)
an[IllegalArgumentException] should be thrownBy taskManager.addTasks(tasks=Seq(task), enclosingNode=None, ignoreExists=false)
}

it should "get the task status for only tracked tasks" in {
Expand Down Expand Up @@ -190,7 +189,6 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B
def runSimpleEndToEnd(task: UnitTask = new ShellCommand("exit", "0") withName "exit 0", simulate: Boolean): Unit = {
val map: BiMap[Task, TaskExecutionInfo] = TaskManager.run(
task = task,
sleepMilliseconds = 10,
taskManagerResources = Some(SystemResources.infinite),
scriptsDirectory = None,
simulate = simulate,
Expand Down Expand Up @@ -222,7 +220,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B
val longTask: UnitTask = new ShellCommand("sleep", "1000") withName "sleep 1000"
val failedTask: UnitTask = new ShellCommand("exit", "1") withName "exit 1"

val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds=1)
val taskManager: TestTaskManager = getDefaultTaskManager()
taskManager.addTasks(longTask, failedTask)
taskManager.runToCompletion(failFast=true)
taskManager.taskStatusFor(failedTask).value should be(TaskStatus.FAILED_COMMAND)
Expand All @@ -233,7 +231,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B
it should "not schedule and run tasks that have failed dependencies" in {
val List(a, b, c) = List(0,1,0).map(c => new ShellCommand("exit", c.toString))
a ==> b ==> c
val tm = getDefaultTaskManager(sleepMilliseconds=1)
val tm = getDefaultTaskManager()
tm.addTasks(a, b, c)
tm.runToCompletion(failFast=false)
tm.taskStatusFor(a).value shouldBe TaskStatus.SUCCEEDED
Expand All @@ -247,7 +245,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B
it should "not schedule and run tasks that have failed dependencies and complete all when failed tasks are manually succeeded" in {
val List(a, b, c) = List(0,1,0).map(c => new ShellCommand("exit", c.toString))
a ==> b ==> c
val tm = getDefaultTaskManager(sleepMilliseconds=1)
val tm = getDefaultTaskManager()
tm.addTasks(a, b, c)
tm.runToCompletion(failFast=false)
tm.taskStatusFor(a).value shouldBe TaskStatus.SUCCEEDED
Expand Down Expand Up @@ -882,7 +880,6 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B

TaskManager.run(
new HungryPipeline,
sleepMilliseconds = 1,
taskManagerResources = Some(SystemResources(systemCores, Resource.parseSizeToBytes("8g").toLong, 0.toLong)),
failFast=true
)
Expand Down Expand Up @@ -915,7 +912,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B
}

// add the tasks to the task manager
val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1)
val taskManager: TestTaskManager = getDefaultTaskManager()
taskManager.addTasks(tasks)

// run the tasks
Expand Down Expand Up @@ -948,7 +945,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B
}

// add the tasks to the task manager
val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1)
val taskManager: TestTaskManager = getDefaultTaskManager()
taskManager.addTasks(pipeline)

// run the tasks
Expand Down Expand Up @@ -987,7 +984,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B
// NB: the execution is really: root ==> firstTask ==> secondTask

// add the tasks to the task manager
val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1)
val taskManager: TestTaskManager = getDefaultTaskManager()
taskManager.addTasks(outerPipeline)

// run the tasks
Expand Down Expand Up @@ -1022,7 +1019,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B

it should "mark a task as failed when one of its children fails" in {
val parent = new ParentFailTask()
val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1)
val taskManager: TestTaskManager = getDefaultTaskManager()
taskManager.addTask(parent)
taskManager.runToCompletion(failFast=true)
Seq(parent.child, parent).foreach { task =>
Expand All @@ -1037,7 +1034,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B

it should "mark a pipeline as failed when one of its children fails" in {
val pipeline = new FailPipeline()
val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1)
val taskManager: TestTaskManager = getDefaultTaskManager()
taskManager.addTask(pipeline)
taskManager.runToCompletion(failFast=true)
Seq(pipeline.child, pipeline).foreach { task =>
Expand All @@ -1055,7 +1052,7 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B
tasks += pipeline
pipeline
}
val taskManager: TestTaskManager = getDefaultTaskManager(sleepMilliseconds = 1)
val taskManager: TestTaskManager = getDefaultTaskManager()
taskManager.addTask(root)
taskManager.runToCompletion(failFast=true)
tasks.foreach { task =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,9 @@ class TopLikeStatusReporterTest extends UnitSpec with CaptureSystemStreams with
}
}

private def getDefaultTaskManager(sleepMilliseconds: Int = 10): TaskManager = new TaskManager(
private def getDefaultTaskManager(): TaskManager = new TaskManager(
taskManagerResources = SystemResources.infinite,
scriptsDirectory = None,
sleepMilliseconds = sleepMilliseconds
scriptsDirectory = None
)

"Terminal" should "support ANSI codes" in {
Expand Down Expand Up @@ -156,8 +155,7 @@ class TopLikeStatusReporterTest extends UnitSpec with CaptureSystemStreams with
val printMethod: String => Unit = (str: String) => output.append(str)
val taskManager = new TaskManager(
taskManagerResources = SystemResources(1.0, Long.MaxValue, Long.MaxValue), // one task at a time
scriptsDirectory = None,
sleepMilliseconds = 10
scriptsDirectory = None
)
val reporter = new TopLikeStatusReporter(taskManager = taskManager, print = printMethod) with TestTerminal

Expand Down
2 changes: 1 addition & 1 deletion tasks/src/test/scala/dagr/tasks/ScatterGatherTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import org.scalatest.BeforeAndAfterAll
class ScatterGatherTests extends UnitSpec with LazyLogging with BeforeAndAfterAll {
override def beforeAll(): Unit = Logger.level = LogLevel.Fatal
override def afterAll(): Unit = Logger.level = LogLevel.Info
def buildTaskManager: TaskManager = new TaskManager(taskManagerResources = SystemResources.infinite, scriptsDirectory = None, sleepMilliseconds=1)
def buildTaskManager: TaskManager = new TaskManager(taskManagerResources = SystemResources.infinite, scriptsDirectory = None)

def tmp(prefix: Option[String] = None): Path = {
val path = Files.createTempFile(prefix.getOrElse("testScatterGather."), ".txt")
Expand Down

0 comments on commit c3dc024

Please sign in to comment.