From e19a3bd77b6b9f44479e51659e244e9809b2963d Mon Sep 17 00:00:00 2001
From: jeanlyn
Date: Tue, 16 Jun 2015 00:38:16 +0800
Subject: [PATCH] avoid speculative tasks write same file

---
 .../apache/spark/sql/hive/hiveWriterContainers.scala | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index ee440e304ec19..a48813a0ae79e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -68,7 +68,7 @@ private[hive] class SparkHiveWriterContainer(
   @transient private var writer: FileSinkOperator.RecordWriter = null
   @transient protected lazy val committer = conf.value.getOutputCommitter
   @transient protected lazy val jobContext = newJobContext(conf.value, jID.value)
-  @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
+  @transient protected lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
   @transient private lazy val outputFormat =
     conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
 
@@ -230,7 +230,15 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
       val path = {
         val outputPath = FileOutputFormat.getOutputPath(conf.value)
         assert(outputPath != null, "Undefined job output-path")
-        val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
+        var workPath = outputPath
+        if(committer.isInstanceOf[FileOutputCommitter]) {
+          // use the path like $outputPath/_temporary/${attemptId}/
+          // to avoid write to the same file when `spark.speculation=true`
+          workPath = committer
+            .asInstanceOf[FileOutputCommitter]
+            .getWorkPath(taskContext, outputPath)
+        }
+        workPath = new Path(workPath, dynamicPartPath.stripPrefix("/"))
         new Path(workPath, getOutputName)
       }
 