From 646e0fd300935169b0600ac22cf5fa0d1a94aa33 Mon Sep 17 00:00:00 2001 From: Dario Rexin Date: Mon, 5 Jan 2015 17:59:25 +0100 Subject: [PATCH 1/4] fixes #902 - optimized task queueing behavior --- .../marathon/MarathonScheduler.scala | 42 +++------ .../marathon/MarathonSchedulerActor.scala | 3 +- .../mesosphere/marathon/tasks/TaskQueue.scala | 88 +++++++++++++------ .../marathon/upgrade/StartingBehavior.scala | 2 +- .../marathon/upgrade/TaskReplaceActor.scala | 2 +- .../marathon/upgrade/TaskStartActor.scala | 2 +- .../marathon/MarathonSchedulerActorTest.scala | 4 +- .../marathon/MarathonSchedulerTest.scala | 13 ++- .../marathon/tasks/TaskQueueTest.scala | 41 ++++----- .../upgrade/DeploymentActorTest.scala | 20 ++--- .../marathon/upgrade/TaskStartActorTest.scala | 21 ++--- 11 files changed, 127 insertions(+), 111 deletions(-) diff --git a/src/main/scala/mesosphere/marathon/MarathonScheduler.scala b/src/main/scala/mesosphere/marathon/MarathonScheduler.scala index e1785dc1e78..01986e9f921 100644 --- a/src/main/scala/mesosphere/marathon/MarathonScheduler.scala +++ b/src/main/scala/mesosphere/marathon/MarathonScheduler.scala @@ -1,5 +1,6 @@ package mesosphere.marathon +import java.util import javax.inject.{ Inject, Named } import akka.actor.{ ActorRef, ActorSystem } @@ -97,7 +98,7 @@ class MarathonScheduler @Inject() ( Await.result(appRepo.currentAppVersions(), config.zkTimeoutDuration) taskQueue.retain { - case QueuedTask(app, _) => + case QueuedTask(app, _, _) => appVersions.get(app.id) contains app.version } @@ -105,44 +106,26 @@ class MarathonScheduler @Inject() ( try { log.debug("Received offer %s".format(offer)) - val queuedTasks: Seq[QueuedTask] = taskQueue.removeAll() - - val withTaskInfos: collection.Seq[(QueuedTask, (TaskInfo, Seq[Long]))] = - queuedTasks.view.flatMap { case qt => newTask(qt.app, offer).map(qt -> _) } - - val launchedTask = withTaskInfos.find { - case (qt, (taskInfo, ports)) => - val timeLeft = qt.delay.timeLeft - if (timeLeft.toNanos <= 0) { 
- true - } - else { - log.info(s"Delaying task ${taskInfo.getTaskId.getValue} due to backoff. Time left: $timeLeft.") - false - } + val matchingTask = taskQueue.pollMatching { app => + newTask(app, offer).map(app -> _) } - launchedTask.foreach { - case (qt, (taskInfo, ports)) => - val taskInfos = Seq(taskInfo) - log.debug("Launching tasks: " + taskInfos) - + matchingTask.foreach { + case (app, (taskInfo, ports)) => val marathonTask = MarathonTasks.makeTask( taskInfo.getTaskId.getValue, offer.getHostname, ports, - offer.getAttributesList.asScala, qt.app.version) + offer.getAttributesList.asScala, app.version) + + log.debug("Launching task: " + taskInfo) - taskTracker.created(qt.app.id, marathonTask) - driver.launchTasks(Seq(offer.getId).asJava, taskInfos.asJava) + taskTracker.created(app.id, marathonTask) + driver.launchTasks(Seq(offer.getId).asJava, util.Arrays.asList(taskInfo)) // here it is assumed that the health checks for the current // version are already running. } - // put unscheduled tasks back in the queue - val launchedTaskSeq: Seq[QueuedTask] = launchedTask.map(_._1).to[Seq] - taskQueue.addAll(queuedTasks diff launchedTaskSeq) - - if (launchedTask.isEmpty) { + if (matchingTask.isEmpty) { log.debug("Offer doesn't match request. 
Declining.") driver.declineOffer(offer.getId) } @@ -289,7 +272,6 @@ class MarathonScheduler @Inject() ( private def newTask( app: AppDefinition, offer: Offer): Option[(TaskInfo, Seq[Long])] = { - // TODO this should return a MarathonTask new TaskBuilder(app, taskIdUtil.newTaskId, taskTracker, config, mapper).buildIfMatches(offer) } } diff --git a/src/main/scala/mesosphere/marathon/MarathonSchedulerActor.scala b/src/main/scala/mesosphere/marathon/MarathonSchedulerActor.scala index cec03fbf568..6ed6c5ef760 100644 --- a/src/main/scala/mesosphere/marathon/MarathonSchedulerActor.scala +++ b/src/main/scala/mesosphere/marathon/MarathonSchedulerActor.scala @@ -483,8 +483,7 @@ class SchedulerActions( if (toQueue > 0) { log.info(s"Queueing $toQueue new tasks for ${app.id} ($queuedCount queued)") - for (i <- 0 until toQueue) - taskQueue.add(app) + taskQueue.add(app, toQueue) } else { log.info(s"Already queued $queuedCount tasks for ${app.id}. Not scaling.") diff --git a/src/main/scala/mesosphere/marathon/tasks/TaskQueue.scala b/src/main/scala/mesosphere/marathon/tasks/TaskQueue.scala index aba37b232e7..06fe266eb21 100644 --- a/src/main/scala/mesosphere/marathon/tasks/TaskQueue.scala +++ b/src/main/scala/mesosphere/marathon/tasks/TaskQueue.scala @@ -1,14 +1,15 @@ package mesosphere.marathon.tasks -import java.util.concurrent.PriorityBlockingQueue +import java.util.concurrent.atomic.AtomicInteger -import mesosphere.marathon.state.{ AppDefinition, PathId } +import mesosphere.marathon.state.{ Timestamp, AppDefinition, PathId } import mesosphere.util.RateLimiter +import org.apache.log4j.Logger +import scala.annotation.tailrec +import scala.collection.concurrent.TrieMap import scala.concurrent.duration.Deadline -import scala.collection.mutable import scala.collection.immutable.Seq -import scala.collection.JavaConverters._ /** * Utility class to stage tasks before they get scheduled @@ -17,21 +18,31 @@ class TaskQueue { import mesosphere.marathon.tasks.TaskQueue._ + private val log 
= Logger.getLogger(getClass) protected[marathon] val rateLimiter = new RateLimiter - // we used SynchronizedPriorityQueue before, but it has been deprecated - // because it is not safe to use - protected[tasks] var queue = - new PriorityBlockingQueue[QueuedTask](11, AppConstraintsOrdering.reverse) + protected[tasks] var apps = TrieMap.empty[(PathId, Timestamp), QueuedTask] - def list: Seq[QueuedTask] = queue.asScala.to[scala.collection.immutable.Seq] + def list: Seq[QueuedTask] = apps.values.to[Seq] def listApps: Seq[AppDefinition] = list.map(_.app) - def poll(): Option[QueuedTask] = Option(queue.poll()) - - def add(app: AppDefinition): Unit = - queue.add(QueuedTask(app, rateLimiter.getDelay(app))) + def poll(): Option[QueuedTask] = + apps.values.toSeq.sortWith { + case (a, b) => + a.app.constraints.size > b.app.constraints.size + }.find { + case QueuedTask(_, count, _) => count.decrementAndGet() >= 0 + } + + def add(app: AppDefinition): Unit = add(app, 1) + + def add(app: AppDefinition, count: Int): Unit = { + val queuedTask = apps.getOrElseUpdate( + (app.id, app.version), + QueuedTask(app, new AtomicInteger(0), rateLimiter.getDelay(app))) + queuedTask.count.addAndGet(count) + } /** * Number of tasks in the queue for the given app @@ -39,33 +50,56 @@ class TaskQueue { * @param app The app * @return count */ - def count(app: AppDefinition): Int = queue.asScala.count(_.app.id == app.id) + def count(app: AppDefinition): Int = apps.get((app.id, app.version)).map(_.count.get()).getOrElse(0) def purge(appId: PathId): Unit = { - val retained = queue.asScala.filterNot(_.app.id == appId) - removeAll() - queue.addAll(retained.asJavaCollection) + for { + QueuedTask(app, _, _) <- apps.values + if app.id == appId + } apps.remove(app.id -> app.version) } /** * Retains only elements that satisfy the supplied predicate. 
*/ def retain(f: (QueuedTask => Boolean)): Unit = - queue.iterator.asScala.foreach { qt => if (!f(qt)) queue.remove(qt) } - - def addAll(xs: Seq[QueuedTask]): Unit = queue.addAll(xs.asJavaCollection) - - def removeAll(): Seq[QueuedTask] = { - val builder = new java.util.ArrayList[QueuedTask]() - queue.drainTo(builder) - builder.asScala.to[Seq] + apps.values.foreach { + case qt @ QueuedTask(app, _, _) => if (!f(qt)) apps.remove(app.id -> app.version) + } + + def pollMatching[B](f: AppDefinition => Option[B]): Option[B] = { + val sorted = apps.values.toList.sortWith { (a, b) => + a.app.constraints.size > b.app.constraints.size + } + + @tailrec + def findMatching(xs: List[QueuedTask]): Option[B] = xs match { + case Nil => None + case head :: tail => head match { + case QueuedTask(app, _, delay) if delay.hasTimeLeft() => + log.info(s"Delaying ${app.id} due to backoff. Time left: ${delay.timeLeft}.") + findMatching(tail) + + case QueuedTask(app, count, delay) => + val res = f(app) + if (res.isDefined && count.decrementAndGet() >= 0) { + res + } + else { + // app count is 0, so we can remove this app from the queue + apps.remove(app.id -> app.version) + findMatching(tail) + } + } + } + + findMatching(sorted) } - } object TaskQueue { - protected[marathon] case class QueuedTask(app: AppDefinition, delay: Deadline) + protected[marathon] case class QueuedTask(app: AppDefinition, count: AtomicInteger, delay: Deadline) protected object AppConstraintsOrdering extends Ordering[QueuedTask] { def compare(t1: QueuedTask, t2: QueuedTask): Int = diff --git a/src/main/scala/mesosphere/marathon/upgrade/StartingBehavior.scala b/src/main/scala/mesosphere/marathon/upgrade/StartingBehavior.scala index 34db1419ba0..76fb7954a5d 100644 --- a/src/main/scala/mesosphere/marathon/upgrade/StartingBehavior.scala +++ b/src/main/scala/mesosphere/marathon/upgrade/StartingBehavior.scala @@ -75,7 +75,7 @@ trait StartingBehavior { this: Actor with ActorLogging => val actualSize = taskQueue.count(app) + 
taskTracker.count(app.id) if (actualSize < expectedSize) { - for (_ <- 0 until (expectedSize - actualSize)) taskQueue.add(app) + taskQueue.add(app, expectedSize - actualSize) } context.system.scheduler.scheduleOnce(5.seconds, self, Sync) } diff --git a/src/main/scala/mesosphere/marathon/upgrade/TaskReplaceActor.scala b/src/main/scala/mesosphere/marathon/upgrade/TaskReplaceActor.scala index 98e080f7b9a..b5fea4a58df 100644 --- a/src/main/scala/mesosphere/marathon/upgrade/TaskReplaceActor.scala +++ b/src/main/scala/mesosphere/marathon/upgrade/TaskReplaceActor.scala @@ -44,7 +44,7 @@ class TaskReplaceActor( driver.killTask(taskId) } - for (_ <- 0 until app.instances) taskQueue.add(app) + taskQueue.add(app, app.instances) } override def postStop(): Unit = { diff --git a/src/main/scala/mesosphere/marathon/upgrade/TaskStartActor.scala b/src/main/scala/mesosphere/marathon/upgrade/TaskStartActor.scala index 515a94245b8..6a27c0b2ad9 100644 --- a/src/main/scala/mesosphere/marathon/upgrade/TaskStartActor.scala +++ b/src/main/scala/mesosphere/marathon/upgrade/TaskStartActor.scala @@ -23,7 +23,7 @@ class TaskStartActor( var running: Int = 0 override def initializeStart(): Unit = { - for (_ <- 0 until nrToStart) taskQueue.add(app) + taskQueue.add(app, nrToStart) } override def postStop(): Unit = { diff --git a/src/test/scala/mesosphere/marathon/MarathonSchedulerActorTest.scala b/src/test/scala/mesosphere/marathon/MarathonSchedulerActorTest.scala index f12936bf70e..6d5a83529eb 100644 --- a/src/test/scala/mesosphere/marathon/MarathonSchedulerActorTest.scala +++ b/src/test/scala/mesosphere/marathon/MarathonSchedulerActorTest.scala @@ -144,7 +144,7 @@ class MarathonSchedulerActorTest extends TestKit(ActorSystem("System")) awaitAssert({ verify(tracker).shutdown("nope".toPath) - verify(queue).add(app) + verify(queue).add(app, 1) verify(driver).killTask(TaskID("task_a")) }, 5.seconds, 10.millis) } @@ -168,7 +168,7 @@ class MarathonSchedulerActorTest extends 
TestKit(ActorSystem("System")) schedulerActor ! ScaleApp("test-app".toPath) awaitAssert({ - verify(queue).add(app) + verify(queue).add(app, 1) }, 5.seconds, 10.millis) expectMsg(5.seconds, AppScaled(app.id)) diff --git a/src/test/scala/mesosphere/marathon/MarathonSchedulerTest.scala b/src/test/scala/mesosphere/marathon/MarathonSchedulerTest.scala index dc8149e70df..54379a3c28f 100644 --- a/src/test/scala/mesosphere/marathon/MarathonSchedulerTest.scala +++ b/src/test/scala/mesosphere/marathon/MarathonSchedulerTest.scala @@ -1,5 +1,7 @@ package mesosphere.marathon +import java.util.concurrent.atomic.AtomicInteger + import akka.actor.ActorSystem import akka.event.EventStream import akka.testkit.{ TestKit, TestProbe } @@ -45,7 +47,7 @@ class MarathonSchedulerTest extends TestKit(ActorSystem("System")) with Marathon repo = mock[AppRepository] hcManager = mock[HealthCheckManager] tracker = mock[TaskTracker] - queue = mock[TaskQueue] + queue = spy(new TaskQueue) frameworkIdUtil = mock[FrameworkIdUtil] config = defaultConfig() taskIdUtil = mock[TaskIdUtil] @@ -81,17 +83,15 @@ class MarathonSchedulerTest extends TestKit(ActorSystem("System")) with Marathon ports = Seq(8080), version = now ) - val queuedTask = QueuedTask(app, Deadline.now) + val queuedTask = QueuedTask(app, new AtomicInteger(app.instances), Deadline.now) val list = Vector(queuedTask) val allApps = Vector(app) + queue.add(app) + when(taskIdUtil.newTaskId("testOffers".toRootPath)) .thenReturn(TaskID.newBuilder.setValue("testOffers_0-1234").build) when(tracker.checkStagedTasks).thenReturn(Seq()) - when(queue.poll()).thenReturn(Some(queuedTask)) - when(queue.list).thenReturn(list) - when(queue.removeAll()).thenReturn(list) - when(queue.listApps).thenReturn(allApps) when(repo.currentAppVersions()) .thenReturn(Future.successful(Map(app.id -> app.version))) @@ -103,7 +103,6 @@ class MarathonSchedulerTest extends TestKit(ActorSystem("System")) with Marathon verify(driver).launchTasks(offersCaptor.capture(), 
taskInfosCaptor.capture()) verify(tracker).created(same(app.id), marathonTaskCaptor.capture()) - verify(queue).addAll(Seq.empty) assert(1 == offersCaptor.getValue.size()) assert(offer.getId == offersCaptor.getValue.get(0)) diff --git a/src/test/scala/mesosphere/marathon/tasks/TaskQueueTest.scala b/src/test/scala/mesosphere/marathon/tasks/TaskQueueTest.scala index 5954f38bb3c..47b234461a3 100644 --- a/src/test/scala/mesosphere/marathon/tasks/TaskQueueTest.scala +++ b/src/test/scala/mesosphere/marathon/tasks/TaskQueueTest.scala @@ -7,9 +7,6 @@ import mesosphere.marathon.state.AppDefinition import mesosphere.marathon.state.PathId.StringPathId import mesosphere.marathon.tasks.TaskQueue.QueuedTask -import scala.collection.immutable.Seq -import scala.concurrent.duration.Deadline - class TaskQueueTest extends MarathonSpec { val app1 = AppDefinition(id = "app1".toPath, constraints = Set.empty) val app2 = AppDefinition(id = "app2".toPath, constraints = Set(buildConstraint("hostname", "UNIQUE"), buildConstraint("rack_id", "CLUSTER", "rack-1"))) @@ -46,33 +43,37 @@ class TaskQueueTest extends MarathonSpec { queue.add(app3) assert(queue.list.size == 3, "Queue should contain 3 elements.") - queue.retain { case QueuedTask(app, _) => app.id == app2.id } + queue.retain { case QueuedTask(app, _, _) => app.id == app2.id } assert(queue.list.size == 1, "Queue should contain 1 elements.") } - test("RemoveAll") { + test("pollMatching") { queue.add(app1) queue.add(app2) queue.add(app3) - val res = queue.removeAll().map(_.app) - - assert(Vector(app2, app3, app1) == res, s"Should return all elements in correct order.") - assert(queue.queue.isEmpty, "TaskQueue should be empty.") + assert(Some(app1) == queue.pollMatching { + case x if x.id == "app1".toPath => Some(x) + case _ => None + }) } - test("AddAll") { - val queue = new TaskQueue + test("pollMatching Priority") { + queue.add(app1) + queue.add(app2) + queue.add(app3) - queue.addAll(Seq( - QueuedTask(app1, Deadline.now), - 
QueuedTask(app2, Deadline.now), - QueuedTask(app3, Deadline.now) - )) + assert(Some(app2) == queue.pollMatching(Some(_))) + } - assert(queue.list.size == 3, "Queue should contain 3 elements.") - assert(queue.count(app1) == 1, s"Queue should contain $app1.") - assert(queue.count(app2) == 1, s"Queue should contain $app2.") - assert(queue.count(app3) == 1, s"Queue should contain $app3.") + test("pollMatching no match") { + queue.add(app1) + queue.add(app2) + queue.add(app3) + + assert(None == queue.pollMatching { + case x if x.id == "DOES_NOT_EXIST".toPath => Some(x) + case _ => None + }) } } diff --git a/src/test/scala/mesosphere/marathon/upgrade/DeploymentActorTest.scala b/src/test/scala/mesosphere/marathon/upgrade/DeploymentActorTest.scala index d8caea3d300..d0393835e7d 100644 --- a/src/test/scala/mesosphere/marathon/upgrade/DeploymentActorTest.scala +++ b/src/test/scala/mesosphere/marathon/upgrade/DeploymentActorTest.scala @@ -18,7 +18,7 @@ import mesosphere.mesos.protos.Implicits._ import mesosphere.mesos.protos.TaskID import org.apache.mesos.Protos.Status import org.apache.mesos.SchedulerDriver -import org.mockito.Matchers.any +import org.mockito.Matchers.{ any, same } import org.mockito.Mockito.{ times, verify, when } import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer @@ -100,9 +100,11 @@ class DeploymentActorTest } }) - when(queue.add(app2New)).thenAnswer(new Answer[Boolean] { + when(queue.add(same(app2New), any[Int])).thenAnswer(new Answer[Boolean] { def answer(invocation: InvocationOnMock): Boolean = { - system.eventStream.publish(MesosStatusUpdateEvent("", UUID.randomUUID().toString, "TASK_RUNNING", "", app2.id, "", Nil, app2New.version.toString)) + println(invocation.getArguments.toSeq) + for (i <- 0 until invocation.getArguments()(1).asInstanceOf[Int]) + system.eventStream.publish(MesosStatusUpdateEvent("", UUID.randomUUID().toString, "TASK_RUNNING", "", app2.id, "", Nil, app2New.version.toString)) true } }) @@ -194,17 
+196,15 @@ class DeploymentActorTest }) val taskIDs = Iterator.from(3) - var taskCount = 0 when(queue.count(appNew)).thenAnswer(new Answer[Int] { - override def answer(p1: InvocationOnMock): Int = taskCount + override def answer(p1: InvocationOnMock): Int = appNew.instances }) - when(queue.add(appNew)).thenAnswer(new Answer[Boolean] { + when(queue.add(same(appNew), any[Int])).thenAnswer(new Answer[Boolean] { def answer(invocation: InvocationOnMock): Boolean = { - if (taskCount >= 2) throw new Exception("Too many invocations.") - taskCount += 1 - system.eventStream.publish(MesosStatusUpdateEvent("", s"task1_${taskIDs.next()}", "TASK_RUNNING", "", app.id, "", Nil, appNew.version.toString)) + for (i <- 0 until invocation.getArguments()(1).asInstanceOf[Int]) + system.eventStream.publish(MesosStatusUpdateEvent("", s"task1_${taskIDs.next()}", "TASK_RUNNING", "", app.id, "", Nil, appNew.version.toString)) true } }) @@ -233,7 +233,7 @@ class DeploymentActorTest verify(driver).killTask(TaskID(task1_1.getId)) verify(driver).killTask(TaskID(task1_2.getId)) - verify(queue, times(2)).add(appNew) + verify(queue).add(appNew, 2) } finally { system.shutdown() diff --git a/src/test/scala/mesosphere/marathon/upgrade/TaskStartActorTest.scala b/src/test/scala/mesosphere/marathon/upgrade/TaskStartActorTest.scala index 7d377a77948..eea4b96140e 100644 --- a/src/test/scala/mesosphere/marathon/upgrade/TaskStartActorTest.scala +++ b/src/test/scala/mesosphere/marathon/upgrade/TaskStartActorTest.scala @@ -3,14 +3,14 @@ package mesosphere.marathon.upgrade import akka.actor.{ ActorSystem, Props } import akka.testkit.{ TestActorRef, TestKit } import com.codahale.metrics.MetricRegistry -import mesosphere.marathon.{ MarathonConf, SchedulerActions, TaskUpgradeCanceledException } import mesosphere.marathon.event.{ HealthStatusChanged, MesosStatusUpdateEvent } import mesosphere.marathon.state.AppDefinition import mesosphere.marathon.state.PathId._ -import mesosphere.marathon.tasks.{ TaskTracker, 
TaskQueue } +import mesosphere.marathon.tasks.{ TaskQueue, TaskTracker } +import mesosphere.marathon.{ MarathonConf, SchedulerActions, TaskUpgradeCanceledException } import org.apache.mesos.SchedulerDriver import org.apache.mesos.state.InMemoryState -import org.mockito.Mockito.{ spy, times, verify } +import org.mockito.Mockito.{ times, spy, verify } import org.scalatest.mock.MockitoSugar import org.scalatest.{ BeforeAndAfterAll, FunSuiteLike, Matchers } @@ -54,7 +54,7 @@ class TaskStartActorTest awaitCond(taskQueue.count(app) == 5, 3.seconds) - for ((task, i) <- taskQueue.removeAll().zipWithIndex) + for (i <- 0 until taskQueue.count(app)) system.eventStream.publish(MesosStatusUpdateEvent("", s"task-$i", "TASK_RUNNING", "", app.id, "", Nil, app.version.toString)) Await.result(promise.future, 3.seconds) should be(()) @@ -115,8 +115,8 @@ class TaskStartActorTest awaitCond(taskQueue.count(app) == 5, 3.seconds) - for ((_, i) <- taskQueue.removeAll().zipWithIndex) - system.eventStream.publish(HealthStatusChanged(app.id, s"task_${i}", app.version.toString, true)) + for (i <- 0 until taskQueue.count(app)) + system.eventStream.publish(HealthStatusChanged(app.id, s"task_$i", app.version.toString, alive = true)) Await.result(promise.future, 3.seconds) should be(()) @@ -208,14 +208,15 @@ class TaskStartActorTest awaitCond(taskQueue.count(app) == 1, 3.seconds) - for (task <- taskQueue.removeAll()) - system.eventStream.publish(MesosStatusUpdateEvent("", "", "TASK_FAILED", "", app.id, "", Nil, app.version.toString)) + taskQueue.purge(app.id) + + system.eventStream.publish(MesosStatusUpdateEvent("", "", "TASK_FAILED", "", app.id, "", Nil, app.version.toString)) awaitCond(taskQueue.count(app) == 1, 3.seconds) - verify(taskQueue, times(2)).add(app) + verify(taskQueue, times(2)).add(app, 1) - for (task <- taskQueue.removeAll()) + for (i <- 0 until taskQueue.count(app)) system.eventStream.publish(MesosStatusUpdateEvent("", "", "TASK_RUNNING", "", app.id, "", Nil, 
app.version.toString)) Await.result(promise.future, 3.seconds) should be(()) From 6b24290007d0f3b897e31de31a2f7c6fdf808177 Mon Sep 17 00:00:00 2001 From: drexin Date: Tue, 6 Jan 2015 16:03:14 +0100 Subject: [PATCH 2/4] changed according to feedback and fixed a possible race in TaskQueue --- .../marathon/MarathonScheduler.scala | 2 +- .../mesosphere/marathon/tasks/TaskQueue.scala | 73 +++++++++++-------- .../marathon/MarathonSchedulerTest.scala | 9 +-- .../marathon/tasks/TaskQueueTest.scala | 12 ++- 4 files changed, 55 insertions(+), 41 deletions(-) diff --git a/src/main/scala/mesosphere/marathon/MarathonScheduler.scala b/src/main/scala/mesosphere/marathon/MarathonScheduler.scala index 01986e9f921..82c30b9c17e 100644 --- a/src/main/scala/mesosphere/marathon/MarathonScheduler.scala +++ b/src/main/scala/mesosphere/marathon/MarathonScheduler.scala @@ -98,7 +98,7 @@ class MarathonScheduler @Inject() ( Await.result(appRepo.currentAppVersions(), config.zkTimeoutDuration) taskQueue.retain { - case QueuedTask(app, _, _) => + case QueuedTask(app, _) => appVersions.get(app.id) contains app.version } diff --git a/src/main/scala/mesosphere/marathon/tasks/TaskQueue.scala b/src/main/scala/mesosphere/marathon/tasks/TaskQueue.scala index 06fe266eb21..10c1eb06275 100644 --- a/src/main/scala/mesosphere/marathon/tasks/TaskQueue.scala +++ b/src/main/scala/mesosphere/marathon/tasks/TaskQueue.scala @@ -11,6 +11,31 @@ import scala.collection.concurrent.TrieMap import scala.concurrent.duration.Deadline import scala.collection.immutable.Seq +object TaskQueue { + protected[marathon] case class QueuedTask(app: AppDefinition, count: AtomicInteger) + + protected[tasks] implicit object AppConstraintsOrdering extends Ordering[QueuedTask] { + def compare(t1: QueuedTask, t2: QueuedTask): Int = + t2.app.constraints.size compare t1.app.constraints.size + } + + protected[tasks] implicit class AtomicIntDecrementIfPositive(val value: AtomicInteger) extends AnyVal { + @tailrec + final def 
decrementIfPositive(): Boolean = { + val num = value.get() + if (num <= 0) { + false + } + else if (value.compareAndSet(num, num - 1)) { + true + } + else { + decrementIfPositive() + } + } + } +} + /** * Utility class to stage tasks before they get scheduled */ @@ -27,34 +52,38 @@ class TaskQueue { def listApps: Seq[AppDefinition] = list.map(_.app) - def poll(): Option[QueuedTask] = - apps.values.toSeq.sortWith { - case (a, b) => - a.app.constraints.size > b.app.constraints.size - }.find { - case QueuedTask(_, count, _) => count.decrementAndGet() >= 0 + def poll(): Option[QueuedTask] = { + // TODO: make prioritization pluggable + // Marathon prioritizes tasks by number of constraints, so we have to sort here + apps.values.toSeq.sorted.find { + case QueuedTask(_, count) => count.decrementIfPositive() } + } def add(app: AppDefinition): Unit = add(app, 1) def add(app: AppDefinition, count: Int): Unit = { val queuedTask = apps.getOrElseUpdate( (app.id, app.version), - QueuedTask(app, new AtomicInteger(0), rateLimiter.getDelay(app))) + QueuedTask(app, new AtomicInteger(0))) queuedTask.count.addAndGet(count) } + // TODO: should only return the count for the same version /** * Number of tasks in the queue for the given app * * @param app The app * @return count */ - def count(app: AppDefinition): Int = apps.get((app.id, app.version)).map(_.count.get()).getOrElse(0) + def count(app: AppDefinition): Int = apps.values.foldLeft(0) { + case (count, task) if task.app.id == app.id => count + task.count.get() + case (count, _) => count + } def purge(appId: PathId): Unit = { for { - QueuedTask(app, _, _) <- apps.values + QueuedTask(app, _) <- apps.values if app.id == appId } apps.remove(app.id -> app.version) } @@ -64,30 +93,26 @@ class TaskQueue { */ def retain(f: (QueuedTask => Boolean)): Unit = apps.values.foreach { - case qt @ QueuedTask(app, _, _) => if (!f(qt)) apps.remove(app.id -> app.version) + case qt @ QueuedTask(app, _) => if (!f(qt)) apps.remove(app.id -> app.version) 
} def pollMatching[B](f: AppDefinition => Option[B]): Option[B] = { - val sorted = apps.values.toList.sortWith { (a, b) => - a.app.constraints.size > b.app.constraints.size - } + val sorted = apps.values.toList.sorted @tailrec def findMatching(xs: List[QueuedTask]): Option[B] = xs match { case Nil => None case head :: tail => head match { - case QueuedTask(app, _, delay) if delay.hasTimeLeft() => - log.info(s"Delaying ${app.id} due to backoff. Time left: ${delay.timeLeft}.") + case QueuedTask(app, _) if rateLimiter.getDelay(app).hasTimeLeft() => + log.info(s"Delaying ${app.id} due to backoff. Time left: ${rateLimiter.getDelay(app).timeLeft}.") findMatching(tail) - case QueuedTask(app, count, delay) => + case QueuedTask(app, count) => val res = f(app) - if (res.isDefined && count.decrementAndGet() >= 0) { + if (res.isDefined && count.decrementIfPositive()) { res } else { - // app count is 0, so we can remove this app from the queue - apps.remove(app.id -> app.version) findMatching(tail) } } @@ -96,13 +121,3 @@ class TaskQueue { findMatching(sorted) } } - -object TaskQueue { - - protected[marathon] case class QueuedTask(app: AppDefinition, count: AtomicInteger, delay: Deadline) - - protected object AppConstraintsOrdering extends Ordering[QueuedTask] { - def compare(t1: QueuedTask, t2: QueuedTask): Int = - t1.app.constraints.size compare t2.app.constraints.size - } -} diff --git a/src/test/scala/mesosphere/marathon/MarathonSchedulerTest.scala b/src/test/scala/mesosphere/marathon/MarathonSchedulerTest.scala index 54379a3c28f..c3c9466a0bb 100644 --- a/src/test/scala/mesosphere/marathon/MarathonSchedulerTest.scala +++ b/src/test/scala/mesosphere/marathon/MarathonSchedulerTest.scala @@ -1,7 +1,5 @@ package mesosphere.marathon -import java.util.concurrent.atomic.AtomicInteger - import akka.actor.ActorSystem import akka.event.EventStream import akka.testkit.{ TestKit, TestProbe } @@ -13,7 +11,6 @@ import mesosphere.marathon.event.{ SchedulerRegisteredEvent, 
SchedulerReregister import mesosphere.marathon.health.HealthCheckManager import mesosphere.marathon.state.PathId._ import mesosphere.marathon.state.{ AppDefinition, AppRepository, Timestamp } -import mesosphere.marathon.tasks.TaskQueue.QueuedTask import mesosphere.marathon.tasks.{ TaskIdUtil, TaskQueue, TaskTracker } import mesosphere.mesos.util.FrameworkIdUtil import org.apache.mesos.Protos._ @@ -26,7 +23,6 @@ import org.scalatest.BeforeAndAfterAll import scala.collection.JavaConverters._ import scala.collection.immutable.Seq import scala.concurrent.Future -import scala.concurrent.duration.Deadline class MarathonSchedulerTest extends TestKit(ActorSystem("System")) with MarathonSpec with BeforeAndAfterAll { @@ -76,16 +72,13 @@ class MarathonSchedulerTest extends TestKit(ActorSystem("System")) with Marathon val driver = mock[SchedulerDriver] val offer = makeBasicOffer(cpus = 4, mem = 1024, disk = 4000, beginPort = 31000, endPort = 32000).build val offers = Lists.newArrayList(offer) - val now = Timestamp.now + val now = Timestamp.now() val app = AppDefinition( id = "testOffers".toRootPath, executor = "//cmd", ports = Seq(8080), version = now ) - val queuedTask = QueuedTask(app, new AtomicInteger(app.instances), Deadline.now) - val list = Vector(queuedTask) - val allApps = Vector(app) queue.add(app) diff --git a/src/test/scala/mesosphere/marathon/tasks/TaskQueueTest.scala b/src/test/scala/mesosphere/marathon/tasks/TaskQueueTest.scala index 47b234461a3..a6322bc7e5e 100644 --- a/src/test/scala/mesosphere/marathon/tasks/TaskQueueTest.scala +++ b/src/test/scala/mesosphere/marathon/tasks/TaskQueueTest.scala @@ -1,6 +1,5 @@ package mesosphere.marathon.tasks -import com.codahale.metrics.MetricRegistry import mesosphere.marathon.MarathonSpec import mesosphere.marathon.Protos.Constraint import mesosphere.marathon.state.AppDefinition @@ -15,7 +14,6 @@ class TaskQueueTest extends MarathonSpec { var queue: TaskQueue = null before { - val metricRegistry = new MetricRegistry queue 
= new TaskQueue() } @@ -43,10 +41,18 @@ class TaskQueueTest extends MarathonSpec { queue.add(app3) assert(queue.list.size == 3, "Queue should contain 3 elements.") - queue.retain { case QueuedTask(app, _, _) => app.id == app2.id } + queue.retain { case QueuedTask(app, _) => app.id == app2.id } assert(queue.list.size == 1, "Queue should contain 1 elements.") } + test("poll") { + queue.add(app1, 3) + + assert(queue.count(app1) == 3) + assert(queue.poll().map(_.app) == Some(app1)) + assert(queue.count(app1) == 2) + } + test("pollMatching") { queue.add(app1) queue.add(app2) From 135e850b75235a0254811a3d21f7c9654e1a9f9e Mon Sep 17 00:00:00 2001 From: Dario Rexin Date: Wed, 7 Jan 2015 11:14:44 +0100 Subject: [PATCH 3/4] make RateLimiter use app versions --- .../marathon/MarathonScheduler.scala | 8 ++++++-- .../marathon/MarathonSchedulerActor.scala | 5 +---- .../mesosphere/marathon/tasks/TaskQueue.scala | 8 +++++--- .../scala/mesosphere/util/RateLimiter.scala | 20 +++++++++---------- .../marathon/MarathonSchedulerActorTest.scala | 6 +++++- 5 files changed, 27 insertions(+), 20 deletions(-) diff --git a/src/main/scala/mesosphere/marathon/MarathonScheduler.scala b/src/main/scala/mesosphere/marathon/MarathonScheduler.scala index 82c30b9c17e..ba7ab0f0a12 100644 --- a/src/main/scala/mesosphere/marathon/MarathonScheduler.scala +++ b/src/main/scala/mesosphere/marathon/MarathonScheduler.scala @@ -176,9 +176,13 @@ class MarathonScheduler @Inject() ( } case TASK_RUNNING => - taskQueue.rateLimiter.resetDelay(appId) taskTracker.running(appId, status).onComplete { - case Success(task) => postEvent(status, task) + case Success(task) => + appRepo.app(PathId(task.getId), Timestamp(task.getVersion)).onSuccess { + case maybeApp => maybeApp.foreach(taskQueue.rateLimiter.resetDelay) + } + postEvent(status, task) + case Failure(t) => log.warn(s"Couldn't post event for ${status.getTaskId}", t) log.warn(s"Killing task ${status.getTaskId}") diff --git 
a/src/main/scala/mesosphere/marathon/MarathonSchedulerActor.scala b/src/main/scala/mesosphere/marathon/MarathonSchedulerActor.scala index 6ed6c5ef760..d623a87c6f9 100644 --- a/src/main/scala/mesosphere/marathon/MarathonSchedulerActor.scala +++ b/src/main/scala/mesosphere/marathon/MarathonSchedulerActor.scala @@ -246,8 +246,6 @@ class MarathonSchedulerActor( val ids = plan.affectedApplicationIds performAsyncWithLockFor(ids, origSender, cmd, isBlocking = blocking) { - ids.foreach(taskQueue.rateLimiter.resetDelay) - val res = deploy(driver, plan) if (origSender != Actor.noSender) origSender ! cmd.answer res @@ -379,7 +377,7 @@ class SchedulerActions( } taskQueue.purge(app.id) taskTracker.shutdown(app.id) - taskQueue.rateLimiter.resetDelay(app.id) + taskQueue.rateLimiter.resetDelay(app) // TODO after all tasks have been killed we should remove the app from taskTracker eventBus.publish(AppTerminatedEvent(app.id)) @@ -521,7 +519,6 @@ class SchedulerActions( val updatedApp = appUpdate(currentVersion) taskQueue.purge(id) - taskQueue.rateLimiter.resetDelay(id) appRepository.store(updatedApp).map { _ => update(driver, updatedApp, appUpdate) diff --git a/src/main/scala/mesosphere/marathon/tasks/TaskQueue.scala b/src/main/scala/mesosphere/marathon/tasks/TaskQueue.scala index 10c1eb06275..32dca677b36 100644 --- a/src/main/scala/mesosphere/marathon/tasks/TaskQueue.scala +++ b/src/main/scala/mesosphere/marathon/tasks/TaskQueue.scala @@ -2,13 +2,12 @@ package mesosphere.marathon.tasks import java.util.concurrent.atomic.AtomicInteger -import mesosphere.marathon.state.{ Timestamp, AppDefinition, PathId } +import mesosphere.marathon.state.{ AppDefinition, PathId, Timestamp } import mesosphere.util.RateLimiter import org.apache.log4j.Logger import scala.annotation.tailrec import scala.collection.concurrent.TrieMap -import scala.concurrent.duration.Deadline import scala.collection.immutable.Seq object TaskQueue { @@ -85,7 +84,10 @@ class TaskQueue { for { QueuedTask(app, _) <- 
apps.values if app.id == appId - } apps.remove(app.id -> app.version) + } { + apps.remove(app.id -> app.version) + rateLimiter.resetDelay(app) + } } /** diff --git a/src/main/scala/mesosphere/util/RateLimiter.scala b/src/main/scala/mesosphere/util/RateLimiter.scala index 6684754e83a..a13772d8b01 100644 --- a/src/main/scala/mesosphere/util/RateLimiter.scala +++ b/src/main/scala/mesosphere/util/RateLimiter.scala @@ -1,6 +1,6 @@ package mesosphere.util -import mesosphere.marathon.state.{ AppDefinition, PathId } +import mesosphere.marathon.state.{ Timestamp, AppDefinition, PathId } import scala.concurrent.duration._ import scala.util.Try @@ -17,14 +17,14 @@ class RateLimiter { protected[this] val maxLaunchDelay = 1.hour - protected[this] var taskLaunchDelays = Map[PathId, Delay]() + protected[this] var taskLaunchDelays = Map[(PathId, Timestamp), Delay]() def getDelay(app: AppDefinition): Deadline = - taskLaunchDelays.get(app.id).map(_.current.fromNow) getOrElse Deadline.now + taskLaunchDelays.get(app.id -> app.version).map(_.current.fromNow) getOrElse Deadline.now def addDelay(app: AppDefinition): Unit = { - val newDelay = taskLaunchDelays.get(app.id) match { - case Some(Delay(current, future)) => Delay(future.next, future) + val newDelay = taskLaunchDelays.get(app.id -> app.version) match { + case Some(Delay(current, future)) => Delay(future.next(), future) case None => Delay( app.backoff, durations(app.backoff, app.backoffFactor) @@ -33,13 +33,13 @@ class RateLimiter { log.info(s"Task launch delay for [${app.id}] is now [${newDelay.current.toSeconds}] seconds") - taskLaunchDelays = taskLaunchDelays + (app.id -> newDelay) + taskLaunchDelays += ((app.id, app.version) -> newDelay) } - def resetDelay(appId: PathId): Unit = { - if (taskLaunchDelays contains appId) - log.info(s"Task launch delay for [${appId}] reset to zero") - taskLaunchDelays = taskLaunchDelays - appId + def resetDelay(app: AppDefinition): Unit = { + if (taskLaunchDelays contains (app.id -> app.version)) 
+ log.info(s"Task launch delay for [${app.id} - ${app.version}] reset to zero") + taskLaunchDelays = taskLaunchDelays - (app.id -> app.version) } /** diff --git a/src/test/scala/mesosphere/marathon/MarathonSchedulerActorTest.scala b/src/test/scala/mesosphere/marathon/MarathonSchedulerActorTest.scala index 6d5a83529eb..47d4e4e0783 100644 --- a/src/test/scala/mesosphere/marathon/MarathonSchedulerActorTest.scala +++ b/src/test/scala/mesosphere/marathon/MarathonSchedulerActorTest.scala @@ -24,7 +24,7 @@ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer import org.scalatest.{ BeforeAndAfterAll, Matchers } -import scala.collection.immutable.Seq +import scala.collection.immutable.{ Seq, Set } import scala.collection.mutable import scala.concurrent.Future import scala.concurrent.duration._ @@ -260,6 +260,7 @@ class MarathonSchedulerActorTest extends TestKit(ActorSystem("System")) test("Deployment resets rate limiter for affected apps") { val probe = TestProbe() val app = AppDefinition(id = PathId("app1"), cmd = Some("cmd"), instances = 2, upgradeStrategy = UpgradeStrategy(0.5), version = Timestamp(0)) + val taskA = MarathonTask.newBuilder().setId("taskA_id").build() val origGroup = Group(PathId("/foo/bar"), Set(app)) val appNew = app.copy(cmd = Some("cmd new"), version = Timestamp(1000)) @@ -268,6 +269,9 @@ class MarathonSchedulerActorTest extends TestKit(ActorSystem("System")) val plan = DeploymentPlan("foo", origGroup, targetGroup, List(DeploymentStep(List(StopApplication(app)))), Timestamp.now()) + when(tracker.get(app.id)).thenReturn(Set(taskA)) + when(repo.expunge(app.id)).thenReturn(Future.successful(Seq(true))) + system.eventStream.subscribe(probe.ref, classOf[UpgradeEvent]) queue.rateLimiter.addDelay(app) From c3065de274ab58699220905ee9589a7ed193f29b Mon Sep 17 00:00:00 2001 From: Dario Rexin Date: Mon, 12 Jan 2015 17:02:02 +0100 Subject: [PATCH 4/4] fixed TaskReplaceActorTest after rebase with master ---
.../mesosphere/marathon/upgrade/TaskReplaceActorTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/mesosphere/marathon/upgrade/TaskReplaceActorTest.scala b/src/test/scala/mesosphere/marathon/upgrade/TaskReplaceActorTest.scala index eb75a2ca95f..c43f38b2f47 100644 --- a/src/test/scala/mesosphere/marathon/upgrade/TaskReplaceActorTest.scala +++ b/src/test/scala/mesosphere/marathon/upgrade/TaskReplaceActorTest.scala @@ -145,7 +145,7 @@ class TaskReplaceActorTest watch(ref) // all new tasks are queued directly - eventually { verify(queue, times(3)).add(_) } + eventually { app: AppDefinition => verify(queue, times(3)).add(app) } // ceiling(minimumHealthCapacity * 3) = 2 are left running assert(oldTaskCount == 2)