[SPARK-8379] [SQL] Avoid speculative tasks writing to the same file
JIRA issue: [SPARK-8379](https://issues.apache.org/jira/browse/SPARK-8379)

Currently, when we insert data into a dynamic partition with speculative execution enabled, we get the following exception:
```
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
Lease mismatch on /tmp/hive-jeanlyn/hive_2015-06-15_15-20-44_734_8801220787219172413-1/-ext-10000/ds=2015-06-15/type=2/part-00301.lzo
owned by DFSClient_attempt_201506031520_0011_m_000189_0_-1513487243_53
but is accessed by DFSClient_attempt_201506031520_0011_m_000042_0_-1275047721_57
```
HDFS grants a single-writer lease per file, so when a speculative attempt opens a file that another attempt of the same task is still writing, the lease check fails as shown above. This PR writes the data to a task-attempt-specific temporary directory when using dynamic partitioning, so speculative attempts no longer write to the same file.
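
For illustration, here is a minimal sketch of the difference between the two path constructions (this is not code from the PR; the `conf`, `partDir`, and `fileName` parameters are hypothetical stand-ins):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}

// Before: every attempt of the same task resolves to the SAME file under the
// shared job output directory, e.g.
//   ${outputdir}/ds=2015-06-15/type=2/part-00301.lzo
// so a speculative attempt collides with the original attempt's HDFS lease.
def sharedPath(conf: JobConf, partDir: String, fileName: String): Path = {
  val outputPath = FileOutputFormat.getOutputPath(conf)
  new Path(new Path(outputPath, partDir), fileName)
}

// After: each attempt writes under its own temporary work directory, e.g.
//   ${outputdir}/_temporary/_${attemptId}/ds=2015-06-15/type=2/part-00301.lzo
// so concurrent attempts of the same task never open the same file.
def attemptScopedPath(conf: JobConf, partDir: String, fileName: String): Path =
  FileOutputFormat.getTaskOutputPath(conf, partDir + "/" + fileName)
```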

Author: jeanlyn <[email protected]>

Closes apache#6833 from jeanlyn/speculation and squashes the following commits:

64bbfab [jeanlyn] use FileOutputFormat.getTaskOutputPath to get the path
8860af0 [jeanlyn] remove the never using code
e19a3bd [jeanlyn] avoid speculative tasks write same file
jeanlyn authored and liancheng committed Jun 21, 2015
1 parent 41ab285 commit a1e3649
Showing 2 changed files with 5 additions and 7 deletions.
First changed file (removes an unused `partVals` computation in `InsertIntoHiveTable`):

```diff
@@ -198,7 +198,6 @@ case class InsertIntoHiveTable(
       table.hiveQlTable.getPartCols().foreach { entry =>
         orderedPartitionSpec.put(entry.getName, partitionSpec.get(entry.getName).getOrElse(""))
       }
-      val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
 
       // inheritTableSpecs is set to true. It should be set to false for a IMPORT query
       // which is currently considered as a Hive native command.
```
Second changed file (the actual fix in `SparkHiveDynamicPartitionWriterContainer`):

```diff
@@ -228,12 +228,11 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
       newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
       newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)
 
-      val path = {
-        val outputPath = FileOutputFormat.getOutputPath(conf.value)
-        assert(outputPath != null, "Undefined job output-path")
-        val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
-        new Path(workPath, getOutputName)
-      }
+      // use the path like ${hive_tmp}/_temporary/${attemptId}/
+      // to avoid write to the same file when `spark.speculation=true`
+      val path = FileOutputFormat.getTaskOutputPath(
+        conf.value,
+        dynamicPartPath.stripPrefix("/") + "/" + getOutputName)
 
       HiveFileFormatUtils.getHiveRecordWriter(
         conf.value,
```
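
With `getTaskOutputPath`, each attempt's files land in the output committer's per-attempt work directory. Assuming the standard `FileOutputCommitter` flow, only the attempt that commits has its files promoted to the final partition directory, so a losing speculative attempt's output is simply discarded rather than contending for one file's lease.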
