Skip to content

Commit

Permalink
[SPARK-10618] [Mesos] Addressing comments on PR.
Browse files Browse the repository at this point in the history
  • Loading branch information
SleepyThread committed Dec 16, 2015
1 parent d733bf2 commit 288c698
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ private[spark] class CoarseMesosSchedulerBackend(
val cpus = getResource(offer.getResourcesList, "cpus").toInt
val id = offer.getId.getValue
if (meetsConstraints) {
if (isOfferValidForScheduling(meetsConstraints, slaveId, mem, cpus, sc)) {
if (isOfferSatisfiesRequirements(meetsConstraints, slaveId, mem, cpus, sc)) {
// Launch an executor on the slave
val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
totalCoresAcquired += cpusToUse
Expand Down Expand Up @@ -303,16 +303,24 @@ private[spark] class CoarseMesosSchedulerBackend(
}
}

def isOfferValidForScheduling(meetsConstraints: Boolean,
// ToDo: Abstract out each condition and log them.
def isOfferSatisfiesRequirements(meetsConstraints: Boolean,
slaveId: String, mem: Double,
cpus: Int, sc: SparkContext): Boolean = {
taskIdToSlaveId.size < executorLimit &&
totalCoresAcquired < maxCores &&
cpusOffered: Int, sc: SparkContext): Boolean = {
val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
val meetsCPURequirements = cpusOffered >= 1
val needMoreCores = totalCoresAcquired < maxCores
val healthySlave = failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES
val taskOnEachSlaveLessThanExecutorLimit = taskIdToSlaveId.size < executorLimit
val executorNotRunningOnSlave = !slaveIdsWithExecutors.contains(slaveId)

executorNotRunningOnSlave &&
taskOnEachSlaveLessThanExecutorLimit &&
needMoreCores &&
meetsConstraints &&
mem >= calculateTotalMemory(sc) &&
cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)
meetsMemoryRequirements &&
meetsCPURequirements &&
healthySlave
}

override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,14 +246,11 @@ private[spark] class MesosSchedulerBackend(
val slaveId = o.getSlaveId.getValue
val offerAttributes = toAttributeMap(o.getAttributesList)

// check if all constraints are satisfield
// 1. Attribute constraints
// 2. Memory requirements
// 3. CPU requirements - need at least 1 for executor, 1 for task
// check if Attribute constraints is satisfied
val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)

val meetsRequirements =
isOfferValidForScheduling(cpus, mem, slaveId, sc)
isOfferSatisfiesRequirements(cpus, mem, slaveId, sc)

// add some debug messaging
val debugstr = if (meetsRequirements) "Accepting" else "Declining"
Expand Down Expand Up @@ -330,7 +327,10 @@ private[spark] class MesosSchedulerBackend(
}
}

def isOfferValidForScheduling(cpusOffered: Double, memory : Double,
// check if all constraints are satisfied
// 1. Memory requirements
// 2. CPU requirements - need at least 1 for executor, 1 for task
def isOfferSatisfiesRequirements(cpusOffered: Double, memory : Double,
slaveId: String, sc : SparkContext): Boolean = {
val meetsMemoryRequirements = memory >= calculateTotalMemory(sc)
val meetsCPURequirements = cpusOffered >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,50 +194,50 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
verify(driver, times(1)).reviveOffers()
}

test("isOfferValidForScheduling return true when there is a valid offer") {
test("isOfferSatisfiesRequirements return true when there is a valid offer") {
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)

assert(schedulerBackend.isOfferValidForScheduling(true, "Slave1", 10000, 5, sc))
assert(schedulerBackend.isOfferSatisfiesRequirements(true, "Slave1", 10000, 5, sc))
}

test("isOfferValidForScheduling return false when offer do not meet constraints") {
test("isOfferSatisfiesRequirements return false when offer do not meet constraints") {
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)

assert(schedulerBackend.isOfferValidForScheduling(false, "Slave1", 10000, 5, sc) === false)
assert(schedulerBackend.isOfferSatisfiesRequirements(false, "Slave1", 10000, 5, sc) === false)
}

test("isOfferValidForScheduling return false when memory in offer is less than required memory") {
test("isOfferSatisfiesRequirements return false when memory in offer is less than required memory") {
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)

assert(schedulerBackend.isOfferValidForScheduling(true, "Slave1", 1, 5, sc) === false)
assert(schedulerBackend.isOfferSatisfiesRequirements(true, "Slave1", 1, 5, sc) === false)
}

test("isOfferValidForScheduling return false when cpu in offer is less than required cpu") {
test("isOfferSatisfiesRequirements return false when cpu in offer is less than required cpu") {
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)

assert(schedulerBackend.isOfferValidForScheduling(true, "Slave1", 10000, 0, sc) === false)
assert(schedulerBackend.isOfferSatisfiesRequirements(true, "Slave1", 10000, 0, sc) === false)
}

test("isOfferValidForScheduling return false when offer is from slave already running" +
test("isOfferSatisfiesRequirements return false when offer is from slave already running" +
" an executor") {
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
schedulerBackend.slaveIdsWithExecutors += "Slave2"

assert(schedulerBackend.isOfferValidForScheduling(true, "Slave2", 10000, 5, sc) === false)
assert(schedulerBackend.isOfferSatisfiesRequirements(true, "Slave2", 10000, 5, sc) === false)
}

test("isOfferValidForScheduling return false when task is failed more than " +
test("isOfferSatisfiesRequirements return false when task is failed more than " +
"MAX_SLAVE_FAILURES times on the given slave") {
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
schedulerBackend.failuresBySlaveId("Slave3") = 2

assert(schedulerBackend.isOfferValidForScheduling(true, "Slave3", 10000, 5, sc) === false)
assert(schedulerBackend.isOfferSatisfiesRequirements(true, "Slave3", 10000, 5, sc) === false)
}

test("isOfferValidForScheduling return false when max core is already acquired") {
test("isOfferSatisfiesRequirements return false when max core is already acquired") {
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
schedulerBackend.totalCoresAcquired = 10

assert(schedulerBackend.isOfferValidForScheduling(true, "Slave1", 10000, 5, sc) === false)
assert(schedulerBackend.isOfferSatisfiesRequirements(true, "Slave1", 10000, 5, sc) === false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -365,43 +365,43 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
new MesosSchedulerBackend(taskScheduler, sc, "master")
}

test("isOfferValidForScheduling return true when there offer meet cpu and memory requirement") {
test("isOfferSatisfiesRequirements return true when there offer meet cpu and memory requirement") {
val sc = mock[SparkContext]
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)

assert(schedulerBackend.isOfferValidForScheduling( 5, 10000, "Slave1", sc))
assert(schedulerBackend.isOfferSatisfiesRequirements( 5, 10000, "Slave1", sc))
}

test("isOfferValidForScheduling return false when memory in offer is less than required memory") {
test("isOfferSatisfiesRequirements return false when memory in offer is less than required memory") {
val sc = mock[SparkContext]
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)

assert(schedulerBackend.isOfferValidForScheduling(5, 10, "Slave1", sc) === false)
assert(schedulerBackend.isOfferSatisfiesRequirements(5, 10, "Slave1", sc) === false)
}

test("isOfferValidForScheduling return false when cpu in offer is less than required cpu") {
test("isOfferSatisfiesRequirements return false when cpu in offer is less than required cpu") {
val sc = mock[SparkContext]
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)

assert(schedulerBackend.isOfferValidForScheduling(0, 10000, "Slave1", sc) === false)
assert(schedulerBackend.isOfferSatisfiesRequirements(0, 10000, "Slave1", sc) === false)
}

test("isOfferValidForScheduling return true when offer is from slave already running and" +
test("isOfferSatisfiesRequirements return true when offer is from slave already running and" +
" cpu is less than minimum cpu per task an executor") {
val sc = mock[SparkContext]
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
schedulerBackend.slaveIdToExecutorInfo("Slave2") = null

assert(schedulerBackend.isOfferValidForScheduling(2, 10000, "Slave2", sc) === true)
assert(schedulerBackend.isOfferSatisfiesRequirements(2, 10000, "Slave2", sc) === true)
}

test("isOfferValidForScheduling return false when offer is from slave already running but" +
test("isOfferSatisfiesRequirements return false when offer is from slave already running but" +
" cpu is less than minimum cpu per task an executor") {
val sc = mock[SparkContext]
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
schedulerBackend.slaveIdToExecutorInfo("Slave2") = null

assert(schedulerBackend.isOfferValidForScheduling(1, 10000, "Slave2", sc) === false)
assert(schedulerBackend.isOfferSatisfiesRequirements(1, 10000, "Slave2", sc) === false)
}

}

0 comments on commit 288c698

Please sign in to comment.