Skip to content

Commit

Permalink
[SPARK-10618] [Mesos] removing meetconstraints check as it is already…
Browse files Browse the repository at this point in the history
… done
  • Loading branch information
SleepyThread committed Dec 16, 2015
1 parent 288c698 commit 53e0600
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ private[spark] class CoarseMesosSchedulerBackend(
val cpus = getResource(offer.getResourcesList, "cpus").toInt
val id = offer.getId.getValue
if (meetsConstraints) {
if (isOfferSatisfiesRequirements(meetsConstraints, slaveId, mem, cpus, sc)) {
if (isOfferSatisfiesRequirements(slaveId, mem, cpus, sc)) {
// Launch an executor on the slave
val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
totalCoresAcquired += cpusToUse
Expand Down Expand Up @@ -304,9 +304,8 @@ private[spark] class CoarseMesosSchedulerBackend(
}

// ToDo: Abstract out each condition and log them.
def isOfferSatisfiesRequirements(meetsConstraints: Boolean,
slaveId: String, mem: Double,
cpusOffered: Int, sc: SparkContext): Boolean = {
def isOfferSatisfiesRequirements(slaveId: String, mem: Double, cpusOffered: Int,
sc: SparkContext): Boolean = {
val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
val meetsCPURequirements = cpusOffered >= 1
val needMoreCores = totalCoresAcquired < maxCores
Expand All @@ -317,7 +316,6 @@ private[spark] class CoarseMesosSchedulerBackend(
executorNotRunningOnSlave &&
taskOnEachSlaveLessThanExecutorLimit &&
needMoreCores &&
meetsConstraints &&
meetsMemoryRequirements &&
meetsCPURequirements &&
healthySlave
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,47 +197,42 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
test("isOfferSatisfiesRequirements return true when there is a valid offer") {
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)

assert(schedulerBackend.isOfferSatisfiesRequirements(true, "Slave1", 10000, 5, sc))
assert(schedulerBackend.isOfferSatisfiesRequirements("Slave1", 10000, 5, sc))
}

test("isOfferSatisfiesRequirements return false when offer do not meet constraints") {
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)

assert(schedulerBackend.isOfferSatisfiesRequirements(false, "Slave1", 10000, 5, sc) === false)
}

test("isOfferSatisfiesRequirements return false when memory in offer is less than required memory") {
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)

assert(schedulerBackend.isOfferSatisfiesRequirements(true, "Slave1", 1, 5, sc) === false)
assert(schedulerBackend.isOfferSatisfiesRequirements("Slave1", 1, 5, sc) === false)
}

test("isOfferSatisfiesRequirements return false when cpu in offer is less than required cpu") {
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)

assert(schedulerBackend.isOfferSatisfiesRequirements(true, "Slave1", 10000, 0, sc) === false)
assert(schedulerBackend.isOfferSatisfiesRequirements("Slave1", 10000, 0, sc) === false)
}

test("isOfferSatisfiesRequirements return false when offer is from slave already running" +
" an executor") {
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
schedulerBackend.slaveIdsWithExecutors += "Slave2"

assert(schedulerBackend.isOfferSatisfiesRequirements(true, "Slave2", 10000, 5, sc) === false)
assert(schedulerBackend.isOfferSatisfiesRequirements("Slave2", 10000, 5, sc) === false)
}

test("isOfferSatisfiesRequirements return false when task is failed more than " +
"MAX_SLAVE_FAILURES times on the given slave") {
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
schedulerBackend.failuresBySlaveId("Slave3") = 2

assert(schedulerBackend.isOfferSatisfiesRequirements(true, "Slave3", 10000, 5, sc) === false)
assert(schedulerBackend.isOfferSatisfiesRequirements("Slave3", 10000, 5, sc) === false)
}

test("isOfferSatisfiesRequirements return false when max core is already acquired") {
val schedulerBackend = createSchedulerBackendForGivenSparkConf(sc)
schedulerBackend.totalCoresAcquired = 10

assert(schedulerBackend.isOfferSatisfiesRequirements(true, "Slave1", 10000, 5, sc) === false)
assert(schedulerBackend.isOfferSatisfiesRequirements("Slave1", 10000, 5, sc) === false)
}
}

0 comments on commit 53e0600

Please sign in to comment.