Skip to content

Commit

Permalink
[SPARK-7795] [CORE] Speed up task scheduling in standalone mode by re…
Browse files Browse the repository at this point in the history
…using serializer

My experiments with scheduling very short tasks in standalone cluster mode indicated that a significant amount of time was being spent in scheduling the tasks (>500ms for 256 tasks).  I found that most of the time was being spent in creating a new instance of serializer for each task.  Changing this to just one serializer brought down the scheduling time to 8ms.

Author: Akshat Aranya <[email protected]>

Closes apache#6323 from coolfrood/master and squashes the following commits:

12d8c9e [Akshat Aranya] Reduce visibility of serializer
bd4a5dd [Akshat Aranya] Style fix
0b8ca93 [Akshat Aranya] Incorporate review comments
fe530cd [Akshat Aranya] Speed up task scheduling in standalone mode by reusing serializer instead of creating a new one for each task.
  • Loading branch information
coolfrood authored and jeanlyn committed Jun 12, 2015
1 parent d01915b commit 2a8131e
Showing 1 changed file with 8 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {

// If this DriverEndpoint is changed to support multiple threads,
// then this may need to be changed so that we don't share the serializer
// instance across threads
private val ser = SparkEnv.get.closureSerializer.newInstance()

override protected def log = CoarseGrainedSchedulerBackend.this.log

private val addressToExecutorId = new HashMap[RpcAddress, String]
Expand Down Expand Up @@ -163,7 +168,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}

// Make fake resource offers on all executors
def makeOffers() {
private def makeOffers() {
launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toSeq))
Expand All @@ -175,16 +180,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}

// Make fake resource offers on just one executor
def makeOffers(executorId: String) {
private def makeOffers(executorId: String) {
val executorData = executorDataMap(executorId)
launchTasks(scheduler.resourceOffers(
Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))))
}

// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
val ser = SparkEnv.get.closureSerializer.newInstance()
val serializedTask = ser.serialize(task)
if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
Expand Down

0 comments on commit 2a8131e

Please sign in to comment.