Skip to content

Commit

Permalink
avoid speculative tasks writing to the same file
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanlyn committed Jun 15, 2015
1 parent 4c5889e commit e19a3bd
Showing 1 changed file with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private[hive] class SparkHiveWriterContainer(
@transient private var writer: FileSinkOperator.RecordWriter = null
@transient protected lazy val committer = conf.value.getOutputCommitter
@transient protected lazy val jobContext = newJobContext(conf.value, jID.value)
@transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
@transient protected lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
@transient private lazy val outputFormat =
conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]

Expand Down Expand Up @@ -230,7 +230,15 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
val path = {
val outputPath = FileOutputFormat.getOutputPath(conf.value)
assert(outputPath != null, "Undefined job output-path")
val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
var workPath = outputPath
if(committer.isInstanceOf[FileOutputCommitter]) {
// use a path like $outputPath/_temporary/${attemptId}/
// to avoid writing to the same file when `spark.speculation=true`
workPath = committer
.asInstanceOf[FileOutputCommitter]
.getWorkPath(taskContext, outputPath)
}
workPath = new Path(workPath, dynamicPartPath.stripPrefix("/"))
new Path(workPath, getOutputName)
}

Expand Down

0 comments on commit e19a3bd

Please sign in to comment.