Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

687 expose recent scheduling decisions #1130

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/main/scala/mesosphere/marathon/MarathonModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class MarathonModule(conf: MarathonConf, http: HttpConf, zk: ZooKeeperClient)
storage: StorageProvider,
@Named(EventModule.busName) eventBus: EventStream,
taskFailureRepository: TaskFailureRepository,
taskOffersDeclinedRepository: TaskOffersDeclinedRepository,
config: MarathonConf): ActorRef = {
val supervision = OneForOneStrategy() {
case NonFatal(_) => Restart
Expand All @@ -113,6 +114,7 @@ class MarathonModule(conf: MarathonConf, http: HttpConf, zk: ZooKeeperClient)
storage,
eventBus,
taskFailureRepository,
taskOffersDeclinedRepository,
config).withRouter(RoundRobinPool(nrOfInstances = 1, supervisorStrategy = supervision)),
"MarathonScheduler")
}
Expand Down Expand Up @@ -161,6 +163,15 @@ class MarathonModule(conf: MarathonConf, http: HttpConf, zk: ZooKeeperClient)
)
}

/**
 * Dependency-injection provider (`@Provides`, `@Singleton`) for the
 * [[TaskOffersDeclinedRepository]].
 *
 * The repository is created with its no-argument constructor; the `state`, `conf`
 * and `registry` parameters are accepted but never read by the body.
 */
@Provides
@Singleton
def provideTaskOffersDeclinedRepository(
state: State,
conf: MarathonConf,
registry: MetricRegistry): TaskOffersDeclinedRepository = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These parameters are unused, please remove them.

new TaskOffersDeclinedRepository()
}

@Provides
@Singleton
def provideAppRepository(
Expand Down
24 changes: 15 additions & 9 deletions src/main/scala/mesosphere/marathon/MarathonScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,22 @@ class MarathonScheduler @Inject() (
log.debug("Received offer %s".format(offer))

val matchingTask = taskQueue.pollMatching { app =>
newTask(app, offer).map(app -> _)
val result: TaskBuilder.BuildResult = newTask(app, offer)
result match {
case declined: TaskBuilder.BuildDeclined => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for curly braces here.

eventBus.publish(TaskOfferDeclinedEvent(app.id, declined.reason.getBytes))
None
}
case success: TaskBuilder.BuildSuccess => Some((app, success))
}
}

matchingTask.foreach {
case (app, (taskInfo, ports)) =>
matchingTask match {
case None => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for curly braces here.

log.debug("Offer doesn't match request. Declining.")
driver.declineOffer(offer.getId)
}
case Some((app, TaskBuilder.BuildSuccess(taskInfo, ports))) =>
val marathonTask = MarathonTasks.makeTask(
taskInfo.getTaskId.getValue, offer.getHostname, ports,
offer.getAttributesList.asScala, app.version)
Expand All @@ -124,11 +135,6 @@ class MarathonScheduler @Inject() (
// here it is assumed that the health checks for the current
// version are already running.
}

if (matchingTask.isEmpty) {
log.debug("Offer doesn't match request. Declining.")
driver.declineOffer(offer.getId)
}
}
catch {
case t: Throwable =>
Expand Down Expand Up @@ -276,7 +282,7 @@ class MarathonScheduler @Inject() (

private def newTask(
app: AppDefinition,
offer: Offer): Option[(TaskInfo, Seq[Long])] = {
offer: Offer): TaskBuilder.BuildResult = {
new TaskBuilder(app, taskIdUtil.newTaskId, taskTracker, config, mapper).buildIfMatches(offer)
}
}
14 changes: 8 additions & 6 deletions src/main/scala/mesosphere/marathon/MarathonSchedulerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import java.util.concurrent.Semaphore
import akka.actor._
import akka.event.EventStream
import com.fasterxml.jackson.databind.ObjectMapper
import mesosphere.mesos.TaskBuilder.BuildResult
import org.apache.mesos.Protos.{ TaskState, TaskID, TaskStatus, TaskInfo }
import org.apache.mesos.SchedulerDriver
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -40,6 +41,7 @@ class MarathonSchedulerActor(
storage: StorageProvider,
eventBus: EventStream,
taskFailureRepository: TaskFailureRepository,
taskOffersDeclinedRepository: TaskOffersDeclinedRepository,
config: MarathonConf) extends Actor with ActorLogging with Stash {
import context.dispatcher

Expand Down Expand Up @@ -79,7 +81,7 @@ class MarathonSchedulerActor(
)

historyActor = context.actorOf(
Props(classOf[HistoryActor], eventBus, taskFailureRepository), "HistoryActor")
Props(classOf[HistoryActor], eventBus, taskFailureRepository, taskOffersDeclinedRepository), "HistoryActor")
}

def receive: Receive = suspended
Expand Down Expand Up @@ -447,7 +449,7 @@ class SchedulerActions(
} healthCheckManager.reconcileWith(app.id)

private def newTask(app: AppDefinition,
offer: Offer): Option[(TaskInfo, Seq[Long])] = {
offer: Offer): BuildResult = {
// TODO this should return a MarathonTask
val builder = new TaskBuilder(
app,
Expand All @@ -457,10 +459,10 @@ class SchedulerActions(
mapper
)

builder.buildIfMatches(offer) map {
case (task, ports) =>
val taskBuilder = task.toBuilder
taskBuilder.build -> ports
builder.buildIfMatches(offer) match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this pattern match doesn't do anything. In both cases, the result (or the equivalent) of the call to buildIfMatches is returned. Why not just return that value instead?

case TaskBuilder.BuildSuccess(task, ports) =>
TaskBuilder.BuildSuccess(task.toBuilder.build, ports)
case declined: TaskBuilder.BuildDeclined => declined
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ trait DeploymentFormats {

trait EventFormats {
import Formats._

implicit lazy val TaskOfferDeclinedEventWrites: Writes[TaskOfferDeclinedEvent] = Json.writes[TaskOfferDeclinedEvent]
implicit lazy val AppTerminatedEventWrites: Writes[AppTerminatedEvent] = Json.writes[AppTerminatedEvent]
implicit lazy val ApiPostEventWrites: Writes[ApiPostEvent] = Json.writes[ApiPostEvent]
implicit lazy val SubscribeWrites: Writes[Subscribe] = Json.writes[Subscribe]
Expand Down
6 changes: 6 additions & 0 deletions src/main/scala/mesosphere/marathon/event/Events.scala
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,9 @@ case class MesosFrameworkMessageEvent(
message: Array[Byte],
eventType: String = "framework_message_event",
timestamp: String = Timestamp.now().toString) extends MarathonEvent

/**
 * Event describing a declined task offer for the app identified by `appId`.
 *
 * `message` holds the decline reason as raw bytes (the scheduler publishes it
 * from the reason string's `getBytes`).
 *
 * NOTE(review): because `message` is an `Array[Byte]`, the generated case-class
 * `equals`/`hashCode` compare that field by reference, not by content.
 *
 * @param appId     id of the app whose task could not be built from the offer
 * @param message   decline reason, as bytes
 * @param eventType event type tag; defaults to "app_failed_to_start_event"
 * @param timestamp creation time as a string; defaults to `Timestamp.now()`
 */
case class TaskOfferDeclinedEvent(
appId: PathId,
message: Array[Byte],
eventType: String = "app_failed_to_start_event",
timestamp: String = Timestamp.now().toString) extends MarathonEvent
9 changes: 7 additions & 2 deletions src/main/scala/mesosphere/marathon/event/HistoryActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package mesosphere.marathon.event

import akka.actor.{ Actor, ActorLogging }
import akka.event.EventStream
import mesosphere.marathon.state.{ TaskFailure, TaskFailureRepository }
import mesosphere.marathon.state.{ TaskOffersDeclinedRepository, TaskFailure, TaskFailureRepository }

class HistoryActor(eventBus: EventStream, taskFailureRepository: TaskFailureRepository)
class HistoryActor(eventBus: EventStream,
taskFailureRepository: TaskFailureRepository,
taskOfferDeclinedRepository: TaskOffersDeclinedRepository)
extends Actor with ActorLogging {

override def preStart(): Unit = {
eventBus.subscribe(self, classOf[MesosStatusUpdateEvent])
eventBus.subscribe(self, classOf[AppTerminatedEvent])
eventBus.subscribe(self, classOf[TaskOfferDeclinedEvent])
}

def receive: Receive = {
Expand All @@ -20,6 +23,8 @@ class HistoryActor(eventBus: EventStream, taskFailureRepository: TaskFailureRepo

case AppTerminatedEvent(appId, eventType, timestamp) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upon receipt of an AppTerminatedEvent, the entry for the terminated app should also be expunged from the taskOfferDeclinedRepository here.

// The app is gone: drop both its recorded task failure and its last declined
// offer, otherwise the declined-offer entry would outlive the terminated app.
taskFailureRepository.expunge(appId)
taskOfferDeclinedRepository.expunge(appId)

case e: TaskOfferDeclinedEvent => taskOfferDeclinedRepository.store(e.appId, e)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class HttpEventActor(val subscribersKeeper: ActorRef) extends Actor with ActorLo
}

def eventToJson(event: MarathonEvent): JsValue = event match {
case event: TaskOfferDeclinedEvent => Json.toJson(event)
case event: AppTerminatedEvent => Json.toJson(event)
case event: ApiPostEvent => Json.toJson(event)
case event: Subscribe => Json.toJson(event)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package mesosphere.marathon.state

import mesosphere.marathon.event.TaskOfferDeclinedEvent

import scala.collection.mutable

class TaskOffersDeclinedRepository() {

protected[this] val tasksDeclined = mutable.Map[PathId, TaskOfferDeclinedEvent]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use TrieMap instead and remove the synchronized blocks.


/** Records `value` as the entry for `id`, replacing any entry already present. */
def store(id: PathId, value: TaskOfferDeclinedEvent): Unit = synchronized {
  tasksDeclined.update(id, value)
}

/** Drops the entry stored for `id`; does nothing when there is none. */
def expunge(id: PathId): Unit = synchronized {
  tasksDeclined.remove(id)
  ()
}

/**
 * Returns the event currently stored for `id`, or `None` when there is none.
 *
 * The lookup takes the same monitor as `store` and `expunge`: the backing
 * `mutable.Map` is not safe to read while another thread is modifying it.
 */
def current(id: PathId): Option[TaskOfferDeclinedEvent] =
  synchronized { tasksDeclined.get(id) }

}
17 changes: 10 additions & 7 deletions src/main/scala/mesosphere/mesos/TaskBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class TaskBuilder(app: AppDefinition,

val log = Logger.getLogger(getClass.getName)

def buildIfMatches(offer: Offer): Option[(TaskInfo, Seq[Long])] = {
def buildIfMatches(offer: Offer): TaskBuilder.BuildResult = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

var cpuRole = ""
var memRole = ""
var diskRole = ""
Expand All @@ -44,11 +44,10 @@ class TaskBuilder(app: AppDefinition,
diskRole = disk
portsResource = ranges
case _ =>
log.info(
s"No matching offer for ${app.id} (need cpus=${app.cpus}, mem=${app.mem}, " +
s"disk=${app.disk}, ports=${app.hostPorts}) : " + offer
)
return None
val reason = s"No matching offer for ${app.id} (need cpus=${app.cpus}, mem=${app.mem}, " +
s"disk=${app.disk}, ports=${app.hostPorts}) : " + offer
log.info(reason)
return TaskBuilder.BuildDeclined(reason)
}

val executor: Executor = if (app.executor == "") {
Expand Down Expand Up @@ -143,7 +142,7 @@ class TaskBuilder(app: AppDefinition,

mesosHealthChecks.headOption.foreach(builder.setHealthCheck)

Some(builder.build -> ports)
TaskBuilder.BuildSuccess(builder.build, ports)
}

private def offerMatches(offer: Offer): Option[(String, String, String, RangesResource)] = {
Expand Down Expand Up @@ -203,6 +202,10 @@ class TaskBuilder(app: AppDefinition,

object TaskBuilder {

sealed trait BuildResult
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding isSuccess and isFailure methods to this trait? It could clean up some of the use sites where you only care about which projection holds for your value.

/** Outcome when no task was built for the offer; `reason` explains why. */
final case class BuildDeclined(reason: String) extends BuildResult

/** Outcome when a task was built: the resulting `TaskInfo` and its associated ports. */
final case class BuildSuccess(info: TaskInfo, ports: Seq[Long]) extends BuildResult
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make both case classes final.


def commandInfo(app: AppDefinition, taskId: Option[TaskID], host: Option[String], ports: Seq[Long]): CommandInfo = {
val containerPorts = for (pms <- app.portMappings) yield pms.map(_.containerPort)
val declaredPorts = containerPorts.getOrElse(app.ports)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class MarathonSchedulerActorTest extends TestKit(ActorSystem("System"))
var taskIdUtil: TaskIdUtil = _
var storage: StorageProvider = _
var taskFailureEventRepository: TaskFailureRepository = _
var taskOffersDeclinedRepository: TaskOffersDeclinedRepository = _

implicit val defaultTimeout: Timeout = 5.seconds

Expand All @@ -60,6 +61,7 @@ class MarathonSchedulerActorTest extends TestKit(ActorSystem("System"))
taskIdUtil = new TaskIdUtil
storage = mock[StorageProvider]
taskFailureEventRepository = mock[TaskFailureRepository]
taskOffersDeclinedRepository = mock[TaskOffersDeclinedRepository]

when(deploymentRepo.store(any())).thenAnswer(new Answer[Future[DeploymentPlan]] {
override def answer(p1: InvocationOnMock): Future[DeploymentPlan] = {
Expand Down Expand Up @@ -88,6 +90,7 @@ class MarathonSchedulerActorTest extends TestKit(ActorSystem("System"))
storage,
system.eventStream,
taskFailureEventRepository,
taskOffersDeclinedRepository,
mock[MarathonConf]
))
}
Expand Down Expand Up @@ -350,6 +353,7 @@ class MarathonSchedulerActorTest extends TestKit(ActorSystem("System"))
storage,
system.eventStream,
taskFailureEventRepository,
taskOffersDeclinedRepository,
mock[MarathonConf]
))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.testkit.{ ImplicitSender, TestActorRef, TestKit }
import mesosphere.marathon.MarathonSpec
import mesosphere.marathon.state.PathId._
import mesosphere.marathon.state.{ TaskFailure, TaskFailureRepository, Timestamp }
import mesosphere.marathon.state.{ TaskFailure, TaskFailureRepository, TaskOffersDeclinedRepository, Timestamp }
import org.apache.mesos.Protos.TaskState
import org.mockito.Matchers.any
import org.mockito.Mockito._
Expand All @@ -22,11 +22,13 @@ class HistoryActorTest

var historyActor: ActorRef = _
var failureRepo: TaskFailureRepository = _
var declinedRepo: TaskOffersDeclinedRepository = _

before {
failureRepo = mock[TaskFailureRepository]
declinedRepo = mock[TaskOffersDeclinedRepository]
historyActor = TestActorRef(Props(
new HistoryActor(system.eventStream, failureRepo)
new HistoryActor(system.eventStream, failureRepo, declinedRepo)
))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.actor.{ ActorSystem, Props }
import akka.testkit.{ TestProbe, TestActorRef, TestKit }
import mesosphere.marathon.Protos.MarathonTask
import mesosphere.marathon.event.{ HistoryActor, AppTerminatedEvent, MesosStatusUpdateEvent }
import mesosphere.marathon.state.{ TaskFailure, TaskFailureRepository, AppDefinition, PathId }
import mesosphere.marathon.state._
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this change. Wildcard imports are to be avoided, especially for only four items.

import mesosphere.marathon.tasks.TaskTracker
import mesosphere.marathon.upgrade.StoppingBehavior.SynchronizeTasks
import mesosphere.marathon.{ MarathonSpec, SchedulerActions, TaskUpgradeCanceledException }
Expand All @@ -27,12 +27,14 @@ class AppStopActorTest
var scheduler: SchedulerActions = _
var taskTracker: TaskTracker = _
var taskFailureRepository: TaskFailureRepository = _
var declinedRepository: TaskOffersDeclinedRepository = _

before {
driver = mock[SchedulerDriver]
scheduler = mock[SchedulerActions]
taskTracker = mock[TaskTracker]
taskFailureRepository = mock[TaskFailureRepository]
declinedRepository = mock[TaskOffersDeclinedRepository]
}

test("Stop App") {
Expand All @@ -59,7 +61,8 @@ class AppStopActorTest
Props(
new HistoryActor(
system.eventStream,
taskFailureRepository
taskFailureRepository,
declinedRepository
)
)
)
Expand Down
Loading