Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Various Task Manage Speedups #373

Merged
merged 10 commits into from
Mar 25, 2020
Merged

Various Task Manage Speedups #373

merged 10 commits into from
Mar 25, 2020

Conversation

nh13
Copy link
Member

@nh13 nh13 commented Mar 18, 2020

I am using the pipeline below to do some stress testing:

Test Pipeline

Place in pipelines/src/main/scala/dagr/pipelines/TestingPipeline.scala

/*
 * The MIT License
 *
 * Copyright (c) $year Fulcrum Genomics
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 *
 */

package dagr.pipelines

import dagr.core.cmdline.Pipelines
import dagr.core.tasksystem._
import com.fulcrumgenomics.sopt.{arg, clp}
import dagr.core.execsystem.{Cores, Memory, ResourceSet}
import com.fulcrumgenomics.commons.CommonsDef.forloop

private trait GreedyResourcePicking extends UnitTask {
  override def pickResources(availableResources: ResourceSet): Option[ResourceSet] = {
    val mem = Memory("1g")
    val cores = Cores(1)
    val desiredResources = ResourceSet(cores, mem)
    availableResources.subset(desiredResources).map { _ => desiredResources }
  }
}


private class SleepProcessTask(seconds: Int = 1) extends ProcessTask with GreedyResourcePicking {
  override def args: Seq[Any] = "sleep" :: s"$seconds" :: Nil
}

private class SleepInJvmTask(seconds: Int = 1) extends SimpleInJvmTask with GreedyResourcePicking {
  def run(): Unit = {
    logger.info(s"Sleeping for $seconds")
    Thread.sleep(seconds * 1000)
    logger.info(s"I'm awake!")
  }
}

/**
  * Very simple example pipeline that creates random tasks and dependencies
  */
@clp(description="A bunch of sleep tasks.", group = classOf[Pipelines])
class TestingPipeline
( @arg(flag='j', doc="Use JVM tasks") val jvmTask: Boolean = false,
  @arg(flag='n', doc="The number of tasks to create") val numTasks: Int = 100,
  @arg(flag='p', doc="The probability of creating a dependency") val dependencyProbability: Double = 0.1,
  @arg(flag='s', doc="The seed for the random number generator") val seed: Option[Long] = None,
  @arg(flag='S', doc="The time for each task to sleep in seconds") val sleepSeconds: Int = 1,
  @arg(flag='f', doc="The failure rate of tasks") val failureRate: Double = 0.0
) extends Pipeline {
  private val randomNumberGenerator = seed match {
    case Some(s) => new scala.util.Random(s)
    case None    => scala.util.Random
  }

  private def toATask: (Int) => Task = (s) => {
    if (randomNumberGenerator.nextFloat() < failureRate) {
      ShellCommand("exit", "1")
    }
    else {
      new SleepProcessTask(s)
    }
  }
  private def toBTask: Int => Task = (s) => {
    if (randomNumberGenerator.nextFloat() < failureRate) {
      SimpleInJvmTask.apply(name = "Name", f = { if (true) throw new IllegalArgumentException("failed") else Unit })
    }
    else {
      new SleepInJvmTask(s)
    }
  }
  private val toTask   = if (jvmTask) toBTask else toATask
  private val taskType = if (jvmTask) "JVM" else "Shell"


  override def build(): Unit = {
    // create the tasks
    val tasks: Seq[Task] = for (i <- 0 to numTasks) yield toTask(sleepSeconds) withName s"task-${taskType}-$i"

    // make them depend on previous tasks
    var rootTasks = Seq.range(start=0, numTasks).toSet
    forloop(from = 0, until = numTasks) { i =>
      forloop(from = 0, until = i) { j =>
        if (randomNumberGenerator.nextFloat < dependencyProbability) {
          require(i != j)
          logger.info(s"Task $i will depend on task $j")
          tasks(j) ==> tasks(i)
          rootTasks = rootTasks - i
        }
      }
    }

    require(rootTasks.nonEmpty)
    rootTasks.foreach { i =>
      root ==> tasks(i)
    }
  }
}

In particular, these options:

> dagr TestingPipeline -n 3000 -S 0 -p 0.1 -j 
> dagr TestingPipeline -n 3000 -S 0 -p 1 -j 
> dagr TestingPipeline -n 5000 -S 0 -p 0.1 -j 
> dagr TestingPipeline -n 25000 -S 0 -p 0.1 -j 

When a task returns a list of tasks via getTasks, we currently check for
cyclical dependencies on each task independently.  If they are all
connected in the DAG, then this is really slow!  This avoids that
by finding the strongly connected components jointly across the new *to
be added* tasks.
also change a info to a debug in TaskManager
@nh13 nh13 requested a review from tfenne March 18, 2020 08:14
@codecov-io
Copy link

codecov-io commented Mar 18, 2020

Codecov Report

Merging #373 into master will decrease coverage by 0.41%.
The diff coverage is 91.66%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #373      +/-   ##
==========================================
- Coverage   91.95%   91.53%   -0.42%     
==========================================
  Files          31       31              
  Lines        1156     1182      +26     
  Branches       65       73       +8     
==========================================
+ Hits         1063     1082      +19     
- Misses         93      100       +7     
Impacted Files Coverage Δ
.../main/scala/dagr/core/execsystem/TaskManager.scala 90.79% <73.91%> (-2.07%) ⬇️
.../main/scala/dagr/core/execsystem/TaskTracker.scala 93.97% <96.29%> (-1.09%) ⬇️
...rc/main/scala/dagr/core/execsystem/GraphNode.scala 94.11% <100.00%> (+0.36%) ⬆️
...src/main/scala/dagr/core/tasksystem/Pipeline.scala 88.00% <100.00%> (+3.00%) ⬆️
...ore/src/main/scala/dagr/core/tasksystem/Task.scala 95.71% <100.00%> (+0.19%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update ad14fa1...c3dc024. Read the comment docs.

Copy link
Member

@tfenne tfenne left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lots of good stuff here. A few questions/suggestions.

@@ -78,8 +78,10 @@ abstract class Pipeline(val outputDirectory: Option[Path] = None,

/** Recursively navigates dependencies, starting from the supplied task, and add all children to this.tasks. */
private def addChildren(task : Task) : Unit = {
tasks ++= task.tasksDependingOnThisTask
task.tasksDependingOnThisTask.foreach(addChildren)
task.tasksDependingOnThisTask.filterNot(tasks.contains).foreach { child =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than doing a filterNot here, I think you could write this as:

task.tasksDependingOnThisTask.foreach { child =>
  if (tasks.add(child)) addChildren(child)
}

The bonus being that for tasks that have not been previously added you're not checking the set twice.

tasks += child
addChildren(child)
// 1. find all tasks connected to this task
val toVisit: mutable.Set[Task] = mutable.HashSet[Task](task)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use a Set here instead of a Stack? Are you gaining something from the uniqueness checking? Otherwise it's just overhead.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are gaining from uniqueness checking (I'll add a comment). Suppose we have A ==> (B :: C) and B ==> C. Even thought this could be simplified to A ==> B ==> C, that's up to the caller, and we post-processing of the DAG. So when addChildren gets called on A, it recurses on B and C. Since C depends on C, without the uniqueness check we recurse on C in the addChildren call on B.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks!

core/src/main/scala/dagr/core/execsystem/TaskTracker.scala Outdated Show resolved Hide resolved
core/src/main/scala/dagr/core/execsystem/TaskManager.scala Outdated Show resolved Hide resolved
core/src/main/scala/dagr/core/execsystem/TaskManager.scala Outdated Show resolved Hide resolved
core/src/main/scala/dagr/core/execsystem/TaskManager.scala Outdated Show resolved Hide resolved
core/src/main/scala/dagr/core/execsystem/TaskManager.scala Outdated Show resolved Hide resolved
core/src/main/scala/dagr/core/execsystem/TaskManager.scala Outdated Show resolved Hide resolved
@@ -80,10 +81,22 @@ object TaskManagerDefaults extends LazyLogging {
object TaskManager extends LazyLogging {
import dagr.core.execsystem.TaskManagerDefaults._

/** The initial time to wait between scheduling tasks. */
val InitialSleepMillis: Int = 100
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for doing this!

tasks += child
addChildren(child)
// 1. find all tasks connected to this task
val toVisit: mutable.Set[Task] = mutable.HashSet[Task](task)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, thanks!

@tfenne tfenne assigned nh13 and unassigned tfenne Mar 23, 2020
…zations (#374)

* ResourceSet bug fix

If the minimum to subset to is fractional, with a different fractional
value than the maximum, it could be missed.

Also added a small performance optimization.

* Task Manager optimizations

* A few NaiveScheduler simplifications
@nh13 nh13 merged commit 9313a7e into master Mar 25, 2020
@nh13 nh13 deleted the nh_task_manager_speedups branch March 25, 2020 20:08
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants