From 7e500eb0c9b9e8bb19ca5541229b3d5ecb4c5088 Mon Sep 17 00:00:00 2001 From: Abhay Agarwal Date: Mon, 2 Feb 2015 23:00:50 -0800 Subject: [PATCH 1/2] Adds event integration for app offer declines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This creates a kv-store for data about declined offers for apps. If an app’s resource requirements cannot be supplied by the current system, there should eventually be some notification back to the executor. Note: Manually adding event types to a bucket style method isn’t fun. --- .../mesosphere/marathon/MarathonModule.scala | 11 +++ .../marathon/MarathonScheduler.scala | 24 +++--- .../marathon/MarathonSchedulerActor.scala | 14 ++-- .../marathon/api/v2/json/Formats.scala | 2 +- .../mesosphere/marathon/event/Events.scala | 6 ++ .../marathon/event/HistoryActor.scala | 9 ++- .../marathon/event/http/HttpEventActor.scala | 1 + .../state/TaskOfferDeclinedRepository.scala | 19 +++++ .../scala/mesosphere/mesos/TaskBuilder.scala | 17 +++-- .../marathon/event/HistoryActorTest.scala | 6 +- .../marathon/upgrade/AppStopActorTest.scala | 7 +- .../mesosphere/mesos/TaskBuilderTest.scala | 75 +++++++++++-------- 12 files changed, 130 insertions(+), 61 deletions(-) create mode 100644 src/main/scala/mesosphere/marathon/state/TaskOfferDeclinedRepository.scala diff --git a/src/main/scala/mesosphere/marathon/MarathonModule.scala b/src/main/scala/mesosphere/marathon/MarathonModule.scala index 5885feb0dd0..61436c0a152 100644 --- a/src/main/scala/mesosphere/marathon/MarathonModule.scala +++ b/src/main/scala/mesosphere/marathon/MarathonModule.scala @@ -94,6 +94,7 @@ class MarathonModule(conf: MarathonConf, http: HttpConf, zk: ZooKeeperClient) storage: StorageProvider, @Named(EventModule.busName) eventBus: EventStream, taskFailureRepository: TaskFailureRepository, + taskOffersDeclinedRepository: TaskOffersDeclinedRepository, config: MarathonConf): ActorRef = { val supervision = OneForOneStrategy() { case NonFatal(_) => Restart @@ -113,6 +114,7 @@ class MarathonModule(conf: MarathonConf, http: HttpConf, zk: ZooKeeperClient) storage, eventBus, taskFailureRepository, + taskOffersDeclinedRepository, config).withRouter(RoundRobinPool(nrOfInstances = 1, supervisorStrategy = supervision)), "MarathonScheduler") } @@ -161,6 +163,15 @@ class MarathonModule(conf: MarathonConf, http: HttpConf, zk: ZooKeeperClient) ) } + @Provides + @Singleton + def provideTaskOffersDeclinedRepository( + state: State, + conf: MarathonConf, + registry: MetricRegistry): TaskOffersDeclinedRepository = { + new TaskOffersDeclinedRepository() + } + @Provides @Singleton def provideAppRepository( diff --git a/src/main/scala/mesosphere/marathon/MarathonScheduler.scala b/src/main/scala/mesosphere/marathon/MarathonScheduler.scala index 13432b184da..97989549e31 100644 --- a/src/main/scala/mesosphere/marathon/MarathonScheduler.scala +++ b/src/main/scala/mesosphere/marathon/MarathonScheduler.scala @@ -107,11 +107,22 @@ class MarathonScheduler @Inject() ( log.debug("Received offer %s".format(offer)) val matchingTask = taskQueue.pollMatching { app => - newTask(app, offer).map(app -> _) + val result: TaskBuilder.BuildResult = newTask(app, offer) + result match { + case declined: TaskBuilder.BuildDeclined => { + eventBus.publish(TaskOfferDeclinedEvent(app.id, declined.reason.getBytes)) + None + } + case success: TaskBuilder.BuildSuccess => Some((app, success)) + } } - matchingTask.foreach { - case (app, (taskInfo, ports)) => + matchingTask match { + case None => { + log.debug("Offer doesn't match request. Declining.") + driver.declineOffer(offer.getId) + } + case Some((app, TaskBuilder.BuildSuccess(taskInfo, ports))) => val marathonTask = MarathonTasks.makeTask( taskInfo.getTaskId.getValue, offer.getHostname, ports, offer.getAttributesList.asScala, app.version) @@ -124,11 +135,6 @@ class MarathonScheduler @Inject() ( // here it is assumed that the health checks for the current // version are already running. } - - if (matchingTask.isEmpty) { - log.debug("Offer doesn't match request. Declining.") - driver.declineOffer(offer.getId) - } } catch { case t: Throwable => @@ -276,7 +282,7 @@ class MarathonScheduler @Inject() ( private def newTask( app: AppDefinition, - offer: Offer): Option[(TaskInfo, Seq[Long])] = { + offer: Offer): TaskBuilder.BuildResult = { new TaskBuilder(app, taskIdUtil.newTaskId, taskTracker, config, mapper).buildIfMatches(offer) } } diff --git a/src/main/scala/mesosphere/marathon/MarathonSchedulerActor.scala b/src/main/scala/mesosphere/marathon/MarathonSchedulerActor.scala index 2237b38b3d1..f2e7a722b10 100644 --- a/src/main/scala/mesosphere/marathon/MarathonSchedulerActor.scala +++ b/src/main/scala/mesosphere/marathon/MarathonSchedulerActor.scala @@ -5,6 +5,7 @@ import java.util.concurrent.Semaphore import akka.actor._ import akka.event.EventStream import com.fasterxml.jackson.databind.ObjectMapper +import mesosphere.mesos.TaskBuilder.BuildResult import org.apache.mesos.Protos.{ TaskState, TaskID, TaskStatus, TaskInfo } import org.apache.mesos.SchedulerDriver import org.slf4j.LoggerFactory @@ -40,6 +41,7 @@ class MarathonSchedulerActor( storage: StorageProvider, eventBus: EventStream, taskFailureRepository: TaskFailureRepository, + taskOffersDeclinedRepository: TaskOffersDeclinedRepository, config: MarathonConf) extends Actor with ActorLogging with Stash { import context.dispatcher @@ -79,7 +81,7 @@ class MarathonSchedulerActor( ) historyActor = context.actorOf( - Props(classOf[HistoryActor], eventBus, taskFailureRepository), "HistoryActor") + Props(classOf[HistoryActor], eventBus, taskFailureRepository, taskOffersDeclinedRepository), "HistoryActor") } def receive: Receive = suspended @@ -447,7 +449,7 @@ class SchedulerActions( } healthCheckManager.reconcileWith(app.id) private def newTask(app: AppDefinition, - offer: Offer): Option[(TaskInfo, Seq[Long])] = { + offer: Offer): BuildResult = { // TODO this should return a MarathonTask val builder = new TaskBuilder( app, @@ -457,10 +459,10 @@ class SchedulerActions( mapper ) - builder.buildIfMatches(offer) map { - case (task, ports) => - val taskBuilder = task.toBuilder - taskBuilder.build -> ports + builder.buildIfMatches(offer) match { + case TaskBuilder.BuildSuccess(task, ports) => + TaskBuilder.BuildSuccess(task.toBuilder.build, ports) + case declined: TaskBuilder.BuildDeclined => declined } } diff --git a/src/main/scala/mesosphere/marathon/api/v2/json/Formats.scala b/src/main/scala/mesosphere/marathon/api/v2/json/Formats.scala index 61b6bfbcae1..6621b2c5113 100644 --- a/src/main/scala/mesosphere/marathon/api/v2/json/Formats.scala +++ b/src/main/scala/mesosphere/marathon/api/v2/json/Formats.scala @@ -231,7 +231,7 @@ trait DeploymentFormats { trait EventFormats { import Formats._ - + implicit lazy val TaskOfferDeclinedEventWrites: Writes[TaskOfferDeclinedEvent] = Json.writes[TaskOfferDeclinedEvent] implicit lazy val AppTerminatedEventWrites: Writes[AppTerminatedEvent] = Json.writes[AppTerminatedEvent] implicit lazy val ApiPostEventWrites: Writes[ApiPostEvent] = Json.writes[ApiPostEvent] implicit lazy val SubscribeWrites: Writes[Subscribe] = Json.writes[Subscribe] diff --git a/src/main/scala/mesosphere/marathon/event/Events.scala b/src/main/scala/mesosphere/marathon/event/Events.scala index 8bd82b8502f..b0239ecac86 100644 --- a/src/main/scala/mesosphere/marathon/event/Events.scala +++ b/src/main/scala/mesosphere/marathon/event/Events.scala @@ -203,3 +203,9 @@ case class MesosFrameworkMessageEvent( message: Array[Byte], eventType: String = "framework_message_event", timestamp: String = Timestamp.now().toString) extends MarathonEvent + +case class TaskOfferDeclinedEvent( + appId: PathId, + message: Array[Byte], + eventType: String = "app_failed_to_start_event", + timestamp: String = Timestamp.now().toString) extends MarathonEvent \ No newline at end of file diff --git a/src/main/scala/mesosphere/marathon/event/HistoryActor.scala b/src/main/scala/mesosphere/marathon/event/HistoryActor.scala index 442b144dce2..1a410441f29 100644 --- a/src/main/scala/mesosphere/marathon/event/HistoryActor.scala +++ b/src/main/scala/mesosphere/marathon/event/HistoryActor.scala @@ -2,14 +2,17 @@ package mesosphere.marathon.event import akka.actor.{ Actor, ActorLogging } import akka.event.EventStream -import mesosphere.marathon.state.{ TaskFailure, TaskFailureRepository } +import mesosphere.marathon.state.{ TaskOffersDeclinedRepository, TaskFailure, TaskFailureRepository } -class HistoryActor(eventBus: EventStream, taskFailureRepository: TaskFailureRepository) +class HistoryActor(eventBus: EventStream, + taskFailureRepository: TaskFailureRepository, + taskOfferDeclinedRepository: TaskOffersDeclinedRepository) extends Actor with ActorLogging { override def preStart(): Unit = { eventBus.subscribe(self, classOf[MesosStatusUpdateEvent]) eventBus.subscribe(self, classOf[AppTerminatedEvent]) + eventBus.subscribe(self, classOf[TaskOfferDeclinedEvent]) } def receive: Receive = { @@ -20,6 +23,8 @@ class HistoryActor(eventBus: EventStream, taskFailureRepository: TaskFailureRepo case AppTerminatedEvent(appId, eventType, timestamp) => taskFailureRepository.expunge(appId) + + case e: TaskOfferDeclinedEvent => taskOfferDeclinedRepository.store(e.appId, e) } } diff --git a/src/main/scala/mesosphere/marathon/event/http/HttpEventActor.scala b/src/main/scala/mesosphere/marathon/event/http/HttpEventActor.scala index ce48e9378f2..9cc401333e5 100644 --- a/src/main/scala/mesosphere/marathon/event/http/HttpEventActor.scala +++ b/src/main/scala/mesosphere/marathon/event/http/HttpEventActor.scala @@ -54,6 +54,7 @@ class HttpEventActor(val subscribersKeeper: ActorRef) extends Actor with ActorLo } def eventToJson(event: MarathonEvent): JsValue = event match { + case event: TaskOfferDeclinedEvent => Json.toJson(event) case event: AppTerminatedEvent => Json.toJson(event) case event: ApiPostEvent => Json.toJson(event) case event: Subscribe => Json.toJson(event) diff --git a/src/main/scala/mesosphere/marathon/state/TaskOfferDeclinedRepository.scala b/src/main/scala/mesosphere/marathon/state/TaskOfferDeclinedRepository.scala new file mode 100644 index 00000000000..8842805c4f0 --- /dev/null +++ b/src/main/scala/mesosphere/marathon/state/TaskOfferDeclinedRepository.scala @@ -0,0 +1,19 @@ +package mesosphere.marathon.state + +import mesosphere.marathon.event.TaskOfferDeclinedEvent + +import scala.collection.mutable + +class TaskOffersDeclinedRepository() { + + protected[this] val tasksDeclined = mutable.Map[PathId, TaskOfferDeclinedEvent]() + + def store(id: PathId, value: TaskOfferDeclinedEvent): Unit = + synchronized { tasksDeclined(id) = value } + + def expunge(id: PathId): Unit = + synchronized { tasksDeclined -= id } + + def current(id: PathId): Option[TaskOfferDeclinedEvent] = tasksDeclined.get(id) + +} diff --git a/src/main/scala/mesosphere/mesos/TaskBuilder.scala b/src/main/scala/mesosphere/mesos/TaskBuilder.scala index 6422832cb9b..7adb3c62055 100644 --- a/src/main/scala/mesosphere/mesos/TaskBuilder.scala +++ b/src/main/scala/mesosphere/mesos/TaskBuilder.scala @@ -31,7 +31,7 @@ class TaskBuilder(app: AppDefinition, val log = Logger.getLogger(getClass.getName) - def buildIfMatches(offer: Offer): Option[(TaskInfo, Seq[Long])] = { + def buildIfMatches(offer: Offer): TaskBuilder.BuildResult = { var cpuRole = "" var memRole = "" var diskRole = "" @@ -44,11 +44,10 @@ class TaskBuilder(app: AppDefinition, diskRole = disk portsResource = ranges case _ => - log.info( - s"No matching offer for ${app.id} (need cpus=${app.cpus}, mem=${app.mem}, " + - s"disk=${app.disk}, ports=${app.hostPorts}) : " + offer - ) - return None + val reason = s"No matching offer for ${app.id} (need cpus=${app.cpus}, mem=${app.mem}, " + + s"disk=${app.disk}, ports=${app.hostPorts}) : " + offer + log.info(reason) + return TaskBuilder.BuildDeclined(reason) } val executor: Executor = if (app.executor == "") { @@ -143,7 +142,7 @@ class TaskBuilder(app: AppDefinition, mesosHealthChecks.headOption.foreach(builder.setHealthCheck) - Some(builder.build -> ports) + TaskBuilder.BuildSuccess(builder.build, ports) } private def offerMatches(offer: Offer): Option[(String, String, String, RangesResource)] = { @@ -203,6 +202,10 @@ class TaskBuilder(app: AppDefinition, object TaskBuilder { + sealed trait BuildResult + case class BuildDeclined(reason: String) extends BuildResult + case class BuildSuccess(info: TaskInfo, ports: Seq[Long]) extends BuildResult + def commandInfo(app: AppDefinition, taskId: Option[TaskID], host: Option[String], ports: Seq[Long]): CommandInfo = { val containerPorts = for (pms <- app.portMappings) yield pms.map(_.containerPort) val declaredPorts = containerPorts.getOrElse(app.ports) diff --git a/src/test/scala/mesosphere/marathon/event/HistoryActorTest.scala b/src/test/scala/mesosphere/marathon/event/HistoryActorTest.scala index 2633c78206a..6f101d355f4 100644 --- a/src/test/scala/mesosphere/marathon/event/HistoryActorTest.scala +++ b/src/test/scala/mesosphere/marathon/event/HistoryActorTest.scala @@ -4,7 +4,7 @@ import akka.actor.{ ActorRef, ActorSystem, Props } import akka.testkit.{ ImplicitSender, TestActorRef, TestKit } import mesosphere.marathon.MarathonSpec import mesosphere.marathon.state.PathId._ -import mesosphere.marathon.state.{ TaskFailure, TaskFailureRepository, Timestamp } +import mesosphere.marathon.state.{ TaskFailure, TaskFailureRepository, TaskOffersDeclinedRepository, Timestamp } import org.apache.mesos.Protos.TaskState import org.mockito.Matchers.any import org.mockito.Mockito._ @@ -22,11 +22,13 @@ class HistoryActorTest var historyActor: ActorRef = _ var failureRepo: TaskFailureRepository = _ + var declinedRepo: TaskOffersDeclinedRepository = _ before { failureRepo = mock[TaskFailureRepository] + declinedRepo = mock[TaskOffersDeclinedRepository] historyActor = TestActorRef(Props( - new HistoryActor(system.eventStream, failureRepo) + new HistoryActor(system.eventStream, failureRepo, declinedRepo) )) } diff --git a/src/test/scala/mesosphere/marathon/upgrade/AppStopActorTest.scala b/src/test/scala/mesosphere/marathon/upgrade/AppStopActorTest.scala index 73b6e7e6de3..0b18f482476 100644 --- a/src/test/scala/mesosphere/marathon/upgrade/AppStopActorTest.scala +++ b/src/test/scala/mesosphere/marathon/upgrade/AppStopActorTest.scala @@ -4,7 +4,7 @@ import akka.actor.{ ActorSystem, Props } import akka.testkit.{ TestProbe, TestActorRef, TestKit } import mesosphere.marathon.Protos.MarathonTask import mesosphere.marathon.event.{ HistoryActor, AppTerminatedEvent, MesosStatusUpdateEvent } -import mesosphere.marathon.state.{ TaskFailure, TaskFailureRepository, AppDefinition, PathId } +import mesosphere.marathon.state._ import mesosphere.marathon.tasks.TaskTracker import mesosphere.marathon.upgrade.StoppingBehavior.SynchronizeTasks import mesosphere.marathon.{ MarathonSpec, SchedulerActions, TaskUpgradeCanceledException } @@ -27,12 +27,14 @@ class AppStopActorTest var scheduler: SchedulerActions = _ var taskTracker: TaskTracker = _ var taskFailureRepository: TaskFailureRepository = _ + var declinedRepository: TaskOffersDeclinedRepository = _ before { driver = mock[SchedulerDriver] scheduler = mock[SchedulerActions] taskTracker = mock[TaskTracker] taskFailureRepository = mock[TaskFailureRepository] + declinedRepository = mock[TaskOffersDeclinedRepository] } test("Stop App") { @@ -59,7 +61,8 @@ class AppStopActorTest Props( new HistoryActor( system.eventStream, - taskFailureRepository + taskFailureRepository, + declinedRepository ) ) ) diff --git a/src/test/scala/mesosphere/mesos/TaskBuilderTest.scala b/src/test/scala/mesosphere/mesos/TaskBuilderTest.scala index 8ee17f954e6..6f37d27325d 100644 --- a/src/test/scala/mesosphere/mesos/TaskBuilderTest.scala +++ b/src/test/scala/mesosphere/mesos/TaskBuilderTest.scala @@ -29,7 +29,7 @@ class TaskBuilderTest extends MarathonSpec { .addResources(ScalarResource("disk", 2000)) .build - val task: Option[(TaskInfo, Seq[Long])] = buildIfMatches( + val task: TaskBuilder.BuildResult = buildIfMatches( offer, AppDefinition( id = "/product/frontend".toPath, @@ -42,9 +42,10 @@ class TaskBuilderTest extends MarathonSpec { ) ) - assert(task.isDefined) + assert(task.isInstanceOf[TaskBuilder.BuildSuccess]) - val (taskInfo, taskPorts) = task.get + val success = task.asInstanceOf[TaskBuilder.BuildSuccess] + val (taskInfo, taskPorts) = (success.info, success.ports) val range = taskInfo.getResourcesList.asScala .find(r => r.getName == Resource.PORTS) .map(r => r.getRanges.getRange(0)) @@ -92,7 +93,7 @@ class TaskBuilderTest extends MarathonSpec { .addResources(ScalarResource("disk", 2000)) .build - val task: Option[(TaskInfo, Seq[Long])] = buildIfMatches( + val task: TaskBuilder.BuildResult = buildIfMatches( offer, AppDefinition( id = "testApp".toPath, @@ -105,9 +106,10 @@ class TaskBuilderTest extends MarathonSpec { ) ) - assert(task.isDefined) + assert(task.isInstanceOf[TaskBuilder.BuildSuccess]) - val (taskInfo, taskPorts) = task.get + val success = task.asInstanceOf[TaskBuilder.BuildSuccess] + val (taskInfo, taskPorts) = (success.info, success.ports) val range = taskInfo.getResourcesList.asScala .find(r => r.getName == Resource.PORTS) .map(r => r.getRanges.getRange(0)) @@ -136,7 +138,7 @@ class TaskBuilderTest extends MarathonSpec { .addResources(ScalarResource("disk", 2000)) .build - val task: Option[(TaskInfo, Seq[Long])] = buildIfMatches( + val task: TaskBuilder.BuildResult = buildIfMatches( offer, AppDefinition( id = "testApp".toPath, @@ -149,9 +151,10 @@ class TaskBuilderTest extends MarathonSpec { ) ) - assert(task.isDefined) + assert(task.isInstanceOf[TaskBuilder.BuildSuccess]) - val (taskInfo, taskPorts) = task.get + val success = task.asInstanceOf[TaskBuilder.BuildSuccess] + val (taskInfo, taskPorts) = (success.info, success.ports) assert(taskInfo.hasExecutor) assert(!taskInfo.hasCommand) @@ -169,7 +172,7 @@ class TaskBuilderTest extends MarathonSpec { .addResources(ScalarResource("disk", 2000)) .build - val task: Option[(TaskInfo, Seq[Long])] = buildIfMatches( + val task: TaskBuilder.BuildResult = buildIfMatches( offer, AppDefinition( id = "testApp".toPath, @@ -182,9 +185,10 @@ class TaskBuilderTest extends MarathonSpec { ) ) - assert(task.isDefined) + assert(task.isInstanceOf[TaskBuilder.BuildSuccess]) - val (taskInfo, taskPorts) = task.get + val success = task.asInstanceOf[TaskBuilder.BuildSuccess] + val (taskInfo, taskPorts) = (success.info, success.ports) val cmd = taskInfo.getExecutor.getCommand assert(!taskInfo.hasCommand) @@ -202,7 +206,7 @@ class TaskBuilderTest extends MarathonSpec { .addResources(RangesResource(Resource.PORTS, Seq(protos.Range(33000, 34000)), "marathon")) .build - val task: Option[(TaskInfo, Seq[Long])] = buildIfMatches( + val task: TaskBuilder.BuildResult = buildIfMatches( offer, AppDefinition( id = "testApp".toPath, @@ -214,9 +218,10 @@ class TaskBuilderTest extends MarathonSpec { ) ) - assert(task.isDefined) + assert(task.isInstanceOf[TaskBuilder.BuildSuccess]) - val (taskInfo, taskPorts) = task.get + val success = task.asInstanceOf[TaskBuilder.BuildSuccess] + val (taskInfo, taskPorts) = (success.info, success.ports) val range = taskInfo.getResourcesList.asScala .find(r => r.getName == Resource.PORTS) .map(r => r.getRanges.getRange(0)) @@ -243,7 +248,7 @@ class TaskBuilderTest extends MarathonSpec { .addResources(RangesResource(Resource.PORTS, Seq(protos.Range(33000, 34000)), "marathon")) .build - val task: Option[(TaskInfo, Seq[Long])] = buildIfMatches( + val task: TaskBuilder.BuildResult = buildIfMatches( offer, AppDefinition( id = "testApp".toPath, @@ -255,9 +260,10 @@ class TaskBuilderTest extends MarathonSpec { ) ) - assert(task.isDefined) + assert(task.isInstanceOf[TaskBuilder.BuildSuccess]) - val (taskInfo, taskPorts) = task.get + val success = task.asInstanceOf[TaskBuilder.BuildSuccess] + val (taskInfo, taskPorts) = (success.info, success.ports) val range = taskInfo.getResourcesList.asScala .find(r => r.getName == Resource.PORTS) .map(r => r.getRanges.getRange(0)) @@ -301,7 +307,7 @@ class TaskBuilderTest extends MarathonSpec { val task = builder.buildIfMatches(offer) - assert(task.isDefined) + assert(task.isInstanceOf[TaskBuilder.BuildSuccess]) // TODO test for resources etc. } @@ -326,20 +332,22 @@ class TaskBuilderTest extends MarathonSpec { s => TaskID(s.toString), taskTracker, defaultConfig()) def shouldBuildTask(message: String, offer: Offer) { - val tupleOption = builder.buildIfMatches(offer) - assert(tupleOption.isDefined, message) + val task = builder.buildIfMatches(offer) + assert(task.isInstanceOf[TaskBuilder.BuildSuccess], message) + + val success = task.asInstanceOf[TaskBuilder.BuildSuccess] val marathonTask = MarathonTasks.makeTask( - tupleOption.get._1.getTaskId.getValue, + success.info.getTaskId.getValue, offer.getHostname, - tupleOption.get._2, + success.ports, offer.getAttributesList.asScala.toList, Timestamp.now) runningTasks += marathonTask } def shouldNotBuildTask(message: String, offer: Offer) { - val tupleOption = builder.buildIfMatches(offer) - assert(tupleOption.isEmpty, message) + val task = builder.buildIfMatches(offer) + assert(task.isInstanceOf[TaskBuilder.BuildDeclined], message) } val offerRack1HostA = makeBasicOffer() @@ -387,19 +395,22 @@ class TaskBuilderTest extends MarathonSpec { s => TaskID(s.toString), taskTracker, defaultConfig()) def shouldBuildTask(message: String, offer: Offer) { - val tupleOption = builder.buildIfMatches(offer) - assert(tupleOption.isDefined, message) + val task = builder.buildIfMatches(offer) + assert(task.isInstanceOf[TaskBuilder.BuildSuccess], message) + + val success = task.asInstanceOf[TaskBuilder.BuildSuccess] val marathonTask = MarathonTasks.makeTask( - tupleOption.get._1.getTaskId.getValue, + success.info.getTaskId.getValue, offer.getHostname, - tupleOption.get._2, - offer.getAttributesList.asScala.toList, Timestamp.now) + success.ports, + offer.getAttributesList.asScala.toList, + Timestamp.now) runningTasks += marathonTask } def shouldNotBuildTask(message: String, offer: Offer) { - val tupleOption = builder.buildIfMatches(offer) - assert(tupleOption.isEmpty, message) + val task = builder.buildIfMatches(offer) + assert(task.isInstanceOf[TaskBuilder.BuildDeclined], message) } val offerHostA = makeBasicOffer() From c1a8dc00cbe78e1ee6751c197b19cedca08529e1 Mon Sep 17 00:00:00 2001 From: Abhay Agarwal Date: Mon, 2 Feb 2015 23:11:00 -0800 Subject: [PATCH 2/2] Adding testing file missing from previous --- .../mesosphere/marathon/MarathonSchedulerActorTest.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/test/scala/mesosphere/marathon/MarathonSchedulerActorTest.scala b/src/test/scala/mesosphere/marathon/MarathonSchedulerActorTest.scala index 624048b15e1..50e7bdf12cb 100644 --- a/src/test/scala/mesosphere/marathon/MarathonSchedulerActorTest.scala +++ b/src/test/scala/mesosphere/marathon/MarathonSchedulerActorTest.scala @@ -45,6 +45,7 @@ class MarathonSchedulerActorTest extends TestKit(ActorSystem("System")) var taskIdUtil: TaskIdUtil = _ var storage: StorageProvider = _ var taskFailureEventRepository: TaskFailureRepository = _ + var taskOffersDeclinedRepository: TaskOffersDeclinedRepository = _ implicit val defaultTimeout: Timeout = 5.seconds @@ -60,6 +61,7 @@ class MarathonSchedulerActorTest extends TestKit(ActorSystem("System")) taskIdUtil = new TaskIdUtil storage = mock[StorageProvider] taskFailureEventRepository = mock[TaskFailureRepository] + taskOffersDeclinedRepository = mock[TaskOffersDeclinedRepository] when(deploymentRepo.store(any())).thenAnswer(new Answer[Future[DeploymentPlan]] { override def answer(p1: InvocationOnMock): Future[DeploymentPlan] = { @@ -88,6 +90,7 @@ class MarathonSchedulerActorTest extends TestKit(ActorSystem("System")) storage, system.eventStream, taskFailureEventRepository, + taskOffersDeclinedRepository, mock[MarathonConf] )) } @@ -350,6 +353,7 @@ class MarathonSchedulerActorTest extends TestKit(ActorSystem("System")) storage, system.eventStream, taskFailureEventRepository, + taskOffersDeclinedRepository, mock[MarathonConf] ))