Merge pull request #936 from mesosphere/wip-902-drexin
fixes #902 - optimized task queueing behavior
drexin committed Jan 13, 2015
2 parents 45a9bba + c3065de commit 3c26da7
Showing 13 changed files with 180 additions and 143 deletions.
48 changes: 17 additions & 31 deletions src/main/scala/mesosphere/marathon/MarathonScheduler.scala
@@ -1,5 +1,6 @@
package mesosphere.marathon

import java.util
import javax.inject.{ Inject, Named }

import akka.actor.{ ActorRef, ActorSystem }
@@ -105,44 +106,26 @@ class MarathonScheduler @Inject() (
try {
log.debug("Received offer %s".format(offer))

val queuedTasks: Seq[QueuedTask] = taskQueue.removeAll()

val withTaskInfos: collection.Seq[(QueuedTask, (TaskInfo, Seq[Long]))] =
queuedTasks.view.flatMap { case qt => newTask(qt.app, offer).map(qt -> _) }

val launchedTask = withTaskInfos.find {
case (qt, (taskInfo, ports)) =>
val timeLeft = qt.delay.timeLeft
if (timeLeft.toNanos <= 0) {
true
}
else {
log.info(s"Delaying task ${taskInfo.getTaskId.getValue} due to backoff. Time left: $timeLeft.")
false
}
val matchingTask = taskQueue.pollMatching { app =>
newTask(app, offer).map(app -> _)
}

launchedTask.foreach {
case (qt, (taskInfo, ports)) =>
val taskInfos = Seq(taskInfo)
log.debug("Launching tasks: " + taskInfos)

matchingTask.foreach {
case (app, (taskInfo, ports)) =>
val marathonTask = MarathonTasks.makeTask(
taskInfo.getTaskId.getValue, offer.getHostname, ports,
offer.getAttributesList.asScala, qt.app.version)
offer.getAttributesList.asScala, app.version)

taskTracker.created(qt.app.id, marathonTask)
driver.launchTasks(Seq(offer.getId).asJava, taskInfos.asJava)
log.debug("Launching task: " + taskInfo)

taskTracker.created(app.id, marathonTask)
driver.launchTasks(Seq(offer.getId).asJava, util.Arrays.asList(taskInfo))

// here it is assumed that the health checks for the current
// version are already running.
}

// put unscheduled tasks back in the queue
val launchedTaskSeq: Seq[QueuedTask] = launchedTask.map(_._1).to[Seq]
taskQueue.addAll(queuedTasks diff launchedTaskSeq)

if (launchedTask.isEmpty) {
if (matchingTask.isEmpty) {
log.debug("Offer doesn't match request. Declining.")
driver.declineOffer(offer.getId)
}
@@ -193,9 +176,13 @@ class MarathonScheduler @Inject() (
}

case TASK_RUNNING =>
taskQueue.rateLimiter.resetDelay(appId)
taskTracker.running(appId, status).onComplete {
case Success(task) => postEvent(status, task)
case Success(task) =>
appRepo.app(PathId(task.getId), Timestamp(task.getVersion)).onSuccess {
case maybeApp => maybeApp.foreach(taskQueue.rateLimiter.resetDelay)
}
postEvent(status, task)

case Failure(t) =>
log.warn(s"Couldn't post event for ${status.getTaskId}", t)
log.warn(s"Killing task ${status.getTaskId}")
@@ -289,7 +276,6 @@ class MarathonScheduler @Inject() (
private def newTask(
app: AppDefinition,
offer: Offer): Option[(TaskInfo, Seq[Long])] = {
// TODO this should return a MarathonTask
new TaskBuilder(app, taskIdUtil.newTaskId, taskTracker, config, mapper).buildIfMatches(offer)
}
}
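
Taken together, the hunks above replace the old drain-and-refill cycle (removeAll, match, re-add leftovers) with a single in-place query against the queue. A condensed sketch of the new resourceOffers flow, reusing the names from the diff:

    taskQueue.pollMatching { app =>
      newTask(app, offer).map(app -> _)
    } match {
      case Some((app, (taskInfo, ports))) =>
        // exactly one task is launched against this offer
        driver.launchTasks(Seq(offer.getId).asJava, util.Arrays.asList(taskInfo))
      case None =>
        // no queued app matches, or all matching apps are still backing off
        driver.declineOffer(offer.getId)
    }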
src/main/scala/mesosphere/marathon/MarathonSchedulerActor.scala
@@ -246,8 +246,6 @@ class MarathonSchedulerActor(
val ids = plan.affectedApplicationIds

performAsyncWithLockFor(ids, origSender, cmd, isBlocking = blocking) {
ids.foreach(taskQueue.rateLimiter.resetDelay)

val res = deploy(driver, plan)
if (origSender != Actor.noSender) origSender ! cmd.answer
res
@@ -379,7 +377,7 @@ class SchedulerActions(
}
taskQueue.purge(app.id)
taskTracker.shutdown(app.id)
taskQueue.rateLimiter.resetDelay(app.id)
taskQueue.rateLimiter.resetDelay(app)
// TODO after all tasks have been killed we should remove the app from taskTracker

eventBus.publish(AppTerminatedEvent(app.id))
@@ -483,8 +481,7 @@ class SchedulerActions(

if (toQueue > 0) {
log.info(s"Queueing $toQueue new tasks for ${app.id} ($queuedCount queued)")
for (i <- 0 until toQueue)
taskQueue.add(app)
taskQueue.add(app, toQueue)
}
else {
log.info(s"Already queued $queuedCount tasks for ${app.id}. Not scaling.")
@@ -522,7 +519,6 @@ class SchedulerActions(
val updatedApp = appUpdate(currentVersion)

taskQueue.purge(id)
taskQueue.rateLimiter.resetDelay(id)

appRepository.store(updatedApp).map { _ =>
update(driver, updatedApp, appUpdate)
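The scaling hunk above queues the whole shortfall with one call instead of a loop. A hedged reconstruction of the surrounding scale logic (the counts, and app.instances as the expected size, stand in for context folded out of the diff):

    val queuedCount  = taskQueue.count(app)
    val runningCount = taskTracker.count(app.id)
    val toQueue      = app.instances - (queuedCount + runningCount)

    if (toQueue > 0) {
      log.info(s"Queueing $toQueue new tasks for ${app.id} ($queuedCount queued)")
      taskQueue.add(app, toQueue)  // one counter bump instead of toQueue queue insertions
    }
    else {
      log.info(s"Already queued $queuedCount tasks for ${app.id}. Not scaling.")
    }
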
123 changes: 87 additions & 36 deletions src/main/scala/mesosphere/marathon/tasks/TaskQueue.scala
@@ -1,14 +1,39 @@
package mesosphere.marathon.tasks

import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.atomic.AtomicInteger

import mesosphere.marathon.state.{ AppDefinition, PathId }
import mesosphere.marathon.state.{ AppDefinition, PathId, Timestamp }
import mesosphere.util.RateLimiter
import org.apache.log4j.Logger

import scala.concurrent.duration.Deadline
import scala.collection.mutable
import scala.annotation.tailrec
import scala.collection.concurrent.TrieMap
import scala.collection.immutable.Seq
import scala.collection.JavaConverters._

object TaskQueue {
protected[marathon] case class QueuedTask(app: AppDefinition, count: AtomicInteger)

protected[tasks] implicit object AppConstraintsOrdering extends Ordering[QueuedTask] {
def compare(t1: QueuedTask, t2: QueuedTask): Int =
t2.app.constraints.size compare t1.app.constraints.size
}

protected[tasks] implicit class AtomicIntDecrementIfPositive(val value: AtomicInteger) extends AnyVal {
@tailrec
final def decrementIfPositive(): Boolean = {
val num = value.get()
if (num <= 0) {
false
}
else if (value.compareAndSet(num, num - 1)) {
true
}
else {
decrementIfPositive()
}
}
}
}
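
The decrementIfPositive helper added above is a lock-free "take one if any remain" primitive: it retries its compareAndSet until it either observes a non-positive count and gives up, or wins the race and decrements. A standalone sketch of its semantics (the implicit class is package-private, so this assumes code living under mesosphere.marathon.tasks):

    import java.util.concurrent.atomic.AtomicInteger
    import mesosphere.marathon.tasks.TaskQueue._

    val counter = new AtomicInteger(1)
    counter.decrementIfPositive()  // true: CAS 1 -> 0 succeeds
    counter.decrementIfPositive()  // false: the count never goes negative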

/**
* Utility class to stage tasks before they get scheduled
@@ -17,58 +42,84 @@ class TaskQueue {

import mesosphere.marathon.tasks.TaskQueue._

private val log = Logger.getLogger(getClass)
protected[marathon] val rateLimiter = new RateLimiter

// we used SynchronizedPriorityQueue before, but it has been deprecated
// because it is not safe to use
protected[tasks] var queue =
new PriorityBlockingQueue[QueuedTask](11, AppConstraintsOrdering.reverse)
protected[tasks] var apps = TrieMap.empty[(PathId, Timestamp), QueuedTask]

def list: Seq[QueuedTask] = queue.asScala.to[scala.collection.immutable.Seq]
def list: Seq[QueuedTask] = apps.values.to[Seq]

def listApps: Seq[AppDefinition] = list.map(_.app)

def poll(): Option[QueuedTask] = Option(queue.poll())
def poll(): Option[QueuedTask] = {
// TODO: make prioritization pluggable
// Marathon prioritizes tasks by number of constraints, so we have to sort here
apps.values.toSeq.sorted.find {
case QueuedTask(_, count) => count.decrementIfPositive()
}
}

def add(app: AppDefinition): Unit =
queue.add(QueuedTask(app, rateLimiter.getDelay(app)))
def add(app: AppDefinition): Unit = add(app, 1)

def add(app: AppDefinition, count: Int): Unit = {
val queuedTask = apps.getOrElseUpdate(
(app.id, app.version),
QueuedTask(app, new AtomicInteger(0)))
queuedTask.count.addAndGet(count)
}

// TODO: should only return the count for the same version
/**
* Number of tasks in the queue for the given app
*
* @param app The app
* @return count
*/
def count(app: AppDefinition): Int = queue.asScala.count(_.app.id == app.id)
def count(app: AppDefinition): Int = apps.values.foldLeft(0) {
case (count, task) if task.app.id == app.id => count + task.count.get()
case (count, _) => count
}

def purge(appId: PathId): Unit = {
val retained = queue.asScala.filterNot(_.app.id == appId)
removeAll()
queue.addAll(retained.asJavaCollection)
for {
QueuedTask(app, _) <- apps.values
if app.id == appId
} {
apps.remove(app.id -> app.version)
rateLimiter.resetDelay(app)
}
}

/**
* Retains only elements that satisfy the supplied predicate.
*/
def retain(f: (QueuedTask => Boolean)): Unit =
queue.iterator.asScala.foreach { qt => if (!f(qt)) queue.remove(qt) }

def addAll(xs: Seq[QueuedTask]): Unit = queue.addAll(xs.asJavaCollection)

def removeAll(): Seq[QueuedTask] = {
val builder = new java.util.ArrayList[QueuedTask]()
queue.drainTo(builder)
builder.asScala.to[Seq]
}

}

object TaskQueue {

protected[marathon] case class QueuedTask(app: AppDefinition, delay: Deadline)

protected object AppConstraintsOrdering extends Ordering[QueuedTask] {
def compare(t1: QueuedTask, t2: QueuedTask): Int =
t1.app.constraints.size compare t2.app.constraints.size
apps.values.foreach {
case qt @ QueuedTask(app, _) => if (!f(qt)) apps.remove(app.id -> app.version)
}

def pollMatching[B](f: AppDefinition => Option[B]): Option[B] = {
val sorted = apps.values.toList.sorted

@tailrec
def findMatching(xs: List[QueuedTask]): Option[B] = xs match {
case Nil => None
case head :: tail => head match {
case QueuedTask(app, _) if rateLimiter.getDelay(app).hasTimeLeft() =>
log.info(s"Delaying ${app.id} due to backoff. Time left: ${rateLimiter.getDelay(app).timeLeft}.")
findMatching(tail)

case QueuedTask(app, count) =>
val res = f(app)
if (res.isDefined && count.decrementIfPositive()) {
res
}
else {
findMatching(tail)
}
}
}

findMatching(sorted)
}
}
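
pollMatching visits queued apps with more constraints first (per the reversed AppConstraintsOrdering), skips apps whose backoff delay still has time left, and decrements a pending count only when the caller's function produces a match. A hypothetical usage, assuming AppDefinition and PathId can be built as sketched here:

    val queue = new TaskQueue
    val a = AppDefinition(id = PathId("/a"))
    val b = AppDefinition(id = PathId("/b"))
    queue.add(a)
    queue.add(b)

    // Pretend only /b fits the current offer: /b's count is decremented,
    // while /a stays queued untouched.
    val picked: Option[PathId] = queue.pollMatching { app =>
      if (app.id == PathId("/b")) Some(app.id) else None
    }
    // picked == Some(PathId("/b"))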
src/main/scala/mesosphere/marathon/upgrade/StartingBehavior.scala
@@ -75,7 +75,7 @@ trait StartingBehavior { this: Actor with ActorLogging =>
val actualSize = taskQueue.count(app) + taskTracker.count(app.id)

if (actualSize < expectedSize) {
for (_ <- 0 until (expectedSize - actualSize)) taskQueue.add(app)
taskQueue.add(app, expectedSize - actualSize)
}
context.system.scheduler.scheduleOnce(5.seconds, self, Sync)
}
src/main/scala/mesosphere/marathon/upgrade/TaskReplaceActor.scala
@@ -44,7 +44,7 @@ class TaskReplaceActor(
driver.killTask(taskId)
}

for (_ <- 0 until app.instances) taskQueue.add(app)
taskQueue.add(app, app.instances)
}

override def postStop(): Unit = {
src/main/scala/mesosphere/marathon/upgrade/TaskStartActor.scala
@@ -23,7 +23,7 @@ class TaskStartActor(
var running: Int = 0

override def initializeStart(): Unit = {
for (_ <- 0 until nrToStart) taskQueue.add(app)
taskQueue.add(app, nrToStart)
}

override def postStop(): Unit = {
20 changes: 10 additions & 10 deletions src/main/scala/mesosphere/util/RateLimiter.scala
@@ -1,6 +1,6 @@
package mesosphere.util

import mesosphere.marathon.state.{ AppDefinition, PathId }
import mesosphere.marathon.state.{ Timestamp, AppDefinition, PathId }

import scala.concurrent.duration._
import scala.util.Try
@@ -17,14 +17,14 @@ class RateLimiter {

protected[this] val maxLaunchDelay = 1.hour

protected[this] var taskLaunchDelays = Map[PathId, Delay]()
protected[this] var taskLaunchDelays = Map[(PathId, Timestamp), Delay]()

def getDelay(app: AppDefinition): Deadline =
taskLaunchDelays.get(app.id).map(_.current.fromNow) getOrElse Deadline.now
taskLaunchDelays.get(app.id -> app.version).map(_.current.fromNow) getOrElse Deadline.now

def addDelay(app: AppDefinition): Unit = {
val newDelay = taskLaunchDelays.get(app.id) match {
case Some(Delay(current, future)) => Delay(future.next, future)
val newDelay = taskLaunchDelays.get(app.id -> app.version) match {
case Some(Delay(current, future)) => Delay(future.next(), future)
case None => Delay(
app.backoff,
durations(app.backoff, app.backoffFactor)
@@ -33,13 +33,13 @@ class RateLimiter {

log.info(s"Task launch delay for [${app.id}] is now [${newDelay.current.toSeconds}] seconds")

taskLaunchDelays = taskLaunchDelays + (app.id -> newDelay)
taskLaunchDelays += ((app.id, app.version) -> newDelay)
}

def resetDelay(appId: PathId): Unit = {
if (taskLaunchDelays contains appId)
log.info(s"Task launch delay for [${appId}] reset to zero")
taskLaunchDelays = taskLaunchDelays - appId
def resetDelay(app: AppDefinition): Unit = {
if (taskLaunchDelays contains (app.id -> app.version))
log.info(s"Task launch delay for [${app.id} - ${app.version}}] reset to zero")
taskLaunchDelays = taskLaunchDelays - (app.id -> app.version)
}

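The durations helper that seeds addDelay is folded out of this view; its call site (future.next() on the stored Delay) implies an Iterator[FiniteDuration]. A minimal sketch under that assumption, with the cap-at-maxLaunchDelay behavior as a further guess:

    import scala.concurrent.duration._

    // Assumed shape: an endless series of delays growing by `factor`,
    // clamped at the one-hour maxLaunchDelay defined above.
    protected[this] def durations(
      initial: FiniteDuration, factor: Double): Iterator[FiniteDuration] =
      Iterator.iterate(initial) { current =>
        val next = Duration.fromNanos((current.toNanos * factor).toLong)
        if (next > maxLaunchDelay) maxLaunchDelay else next
      }

    // durations(1.second, 2.0).take(4).toList
    //   => List(1 second, 2 seconds, 4 seconds, 8 seconds)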