Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate TaskDescription constructors. #2

Merged
merged 1 commit into from
Feb 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,40 +54,27 @@ private[spark] class TaskDescription(
val index: Int, // Index within this task's TaskSet
val addedFiles: Map[String, Long],
val addedJars: Map[String, Long],
val properties: Properties) extends Logging {

def this(
taskId: Long,
attemptNumber: Int,
executorId: String,
name: String,
index: Int, // Index within this task's TaskSet
addedFiles: Map[String, Long],
addedJars: Map[String, Long],
properties: Properties,
task: Task[_]) {
this(taskId, attemptNumber, executorId, name, index,
addedFiles, addedJars, properties)
task_ = task
}

def serializedTask: ByteBuffer = {
// This is where we serialize the task on the driver before sending it to the executor.
// This is not done when creating the TaskDescription so we can postpone this serialization
// to later in the scheduling process -- particularly,
// so it can happen in another thread by the CoarseGrainedSchedulerBackend.
try {
ByteBuffer.wrap(Utils.serialize(task_))
} catch {
case NonFatal(e) =>
val msg = s"Failed to serialize task $taskId, not attempting to retry it."
logError(msg, e)
throw new TaskNotSerializableException(e)
}
val properties: Properties,
// Task object corresponding to the TaskDescription. This is only defined on the master; on
// the worker, the Task object is handled separately from the TaskDescription so that it can
// be deserialized after the TaskDescription is deserialized.
@transient private val task: Task[_] = null) extends Logging {

/**
* Serializes the task for this TaskDescription and returns the serialized task.
*
* This method should only be used on the master (to serialize a task to send to a worker).
*/
def serializeTask(): ByteBuffer = {
try {
ByteBuffer.wrap(Utils.serialize(task))
} catch {
case NonFatal(e) =>
val msg = s"Failed to serialize task ${taskId}."
logError(msg, e)
throw new TaskNotSerializableException(e)
}

private var task_ : Task[_] = null

}
override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
}

Expand All @@ -101,7 +88,7 @@ private[spark] object TaskDescription {
}

@throws[TaskNotSerializableException]
def encode(taskDescription: TaskDescription, serializedTask: ByteBuffer): ByteBuffer = {
def encode(taskDescription: TaskDescription): ByteBuffer = {
val bytesOut = new ByteBufferOutputStream(4096)
val dataOut = new DataOutputStream(bytesOut)

Expand All @@ -124,8 +111,8 @@ private[spark] object TaskDescription {
dataOut.writeUTF(value)
}

// Write the task. The task is already serialized, so write it directly to the byte buffer.
Utils.writeByteBuffer(serializedTask, bytesOut)
// Serialize and write the task.
Utils.writeByteBuffer(taskDescription.serializeTask(), bytesOut)

dataOut.close()
bytesOut.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val serializedTasks = tasks.flatten.map { task =>
var serializedTask: ByteBuffer = null
try {
serializedTask = TaskDescription.encode(task, task.serializedTask)
serializedTask = TaskDescription.encode(task)
if (serializedTask.limit >= maxRpcMessageSize) {
val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.rpc.message.maxSize (%d bytes). Consider increasing " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ private[spark] class LocalEndpoint(
val serializedTasks = scheduler.resourceOffers(offers).flatten.map { task =>
var serializedTask: ByteBuffer = null
try {
serializedTask = task.serializedTask
serializedTask = task.serializeTask
} catch {
case NonFatal(e) =>
abortTaskSetManager(scheduler, task.taskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class TaskDescriptionSuite extends SparkFunSuite {
originalProperties.put("property1", "18")
originalProperties.put("property2", "test value")

// Create a dummy byte buffer for the task.
val taskBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))

val originalTaskDescription = new TaskDescription(
Expand All @@ -48,10 +47,15 @@ class TaskDescriptionSuite extends SparkFunSuite {
index = 19,
originalFiles,
originalJars,
originalProperties
)
originalProperties,
// Pass in null for the task, because we override the serialize method below anyway (which
// is the only time task is used).
task = null
) {
override def serializeTask() = taskBuffer
}

val serializedTaskDescription = TaskDescription.encode(originalTaskDescription, taskBuffer)
val serializedTaskDescription = TaskDescription.encode(originalTaskDescription)
val (decodedTaskDescription, serializedTask) = TaskDescription.decode(serializedTaskDescription)

// Make sure that all of the fields in the decoded task description match the original.
Expand Down