Skip to content

Commit

Permalink
Incorporate review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
coolfrood committed May 21, 2015
1 parent fe530cd commit 0b8ca93
Showing 1 changed file with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val maxRegisteredWaitingTimeMs =
conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
val createTime = System.currentTimeMillis()
// If this CoarseGrainedSchedulerBackend is changed to support multiple threads, then this may need
// to be changed so that we don't share the serializer instance across threads
private val ser = SparkEnv.get.closureSerializer.newInstance()

private val executorDataMap = new HashMap[String, ExecutorData]

Expand Down Expand Up @@ -163,7 +166,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}

// Make fake resource offers on all executors
def makeOffers() {
private def makeOffers() {
launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toSeq))
Expand All @@ -175,15 +178,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}

// Make fake resource offers on just one executor
def makeOffers(executorId: String) {
private def makeOffers(executorId: String) {
val executorData = executorDataMap(executorId)
launchTasks(scheduler.resourceOffers(
Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))))
}

// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
val ser = SparkEnv.get.closureSerializer.newInstance()
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
val serializedTask = ser.serialize(task)
if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
Expand Down

0 comments on commit 0b8ca93

Please sign in to comment.