[SPARK-26936][SQL] Fix bug of insert overwrite local dir can not create temporary path in local staging directory

## What changes were proposed in this pull request?
The environment of my cluster is as follows:
```
OS: Linux version 2.6.32-220.7.1.el6.x86_64 (mockbuild@c6b18n3.bsys.dev.centos.org) (gcc version 4.4.6 20110731 (Red Hat 4.4.6-3) (GCC) ) #1 SMP Wed Mar 7 00:52:02 GMT 2012
Hadoop: 2.7.2
Spark: 2.3.0 or 3.0.0 (master branch)
Hive: 1.2.1
```

My Spark jobs run in yarn-client deploy mode.

If I execute the SQL `insert overwrite local directory '/home/test/call_center/' select * from call_center`, a HiveException appears:
```
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.io.IOException: Mkdirs failed to create file:/home/xitong/hive/stagingdir_hive_2019-02-19_17-31-00_678_1816816774691551856-1/-ext-10000/_temporary/0/_temporary/attempt_20190219173233_0002_m_000000_3 (exists=false, cwd=file:/data10/yarn/nm-local-dir/usercache/xitong/appcache/application_1543893582405_6126857/container_e124_1543893582405_6126857_01_000011)
    at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:249)
```
Currently, Spark SQL generates the temporary path inside the local staging directory. The scheme of that temporary path starts with `file`, so the HiveException above is thrown.
This PR changes the local temporary path to an HDFS temporary path, and uses a DistributedFileSystem instance to copy the data from the HDFS temporary path to the local directory.
If Spark runs in local deploy mode, `insert overwrite local directory` works fine.
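
The core of the fix is the final move step in the diff below. As a minimal standalone sketch of that logic (the helper name `moveStagedOutput` is hypothetical, not part of the patch): results are staged under an HDFS temporary path, then either renamed within the same filesystem or copied out to the driver-local destination.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch of the move step: tmpPath is the HDFS staging directory holding the
// query output; writeToPath is the final destination (local when isLocal).
def moveStagedOutput(
    hadoopConf: Configuration,
    tmpPath: Path,
    writeToPath: Path,
    isLocal: Boolean): Unit = {
  val dfs = tmpPath.getFileSystem(hadoopConf)
  dfs.listStatus(tmpPath).foreach { tmpFile =>
    if (isLocal) {
      // A rename cannot cross filesystems, so copy the file from HDFS
      // onto the driver's local filesystem.
      dfs.copyToLocalFile(tmpFile.getPath, writeToPath)
    } else {
      // Source and destination are on the same filesystem: a rename suffices.
      dfs.rename(tmpFile.getPath, writeToPath)
    }
  }
}
```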
## How was this patch tested?

Unit tests cannot cover yarn-client mode; the change was tested in my production environment.
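
For reference, a manual check along those lines (assuming a Hive table named `call_center`, as in the example above) can be run from a `spark-shell` started with `--master yarn`:

```scala
// Before this patch the statement failed with the Mkdirs HiveException shown
// above; with it, the output files land in the driver-local directory.
spark.sql(
  "insert overwrite local directory '/home/test/call_center/' " +
  "select * from call_center")
```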

Closes apache#23841 from beliefer/fix-bug-of-insert-overwrite-local-dir.

Authored-by: gengjiaan <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
beliefer authored and srowen committed Apr 5, 2019
1 parent 39f75b4 commit 979bb90
Showing 1 changed file with 18 additions and 12 deletions.
```diff
@@ -86,20 +86,21 @@ case class InsertIntoHiveDirCommand(
     val jobConf = new JobConf(hadoopConf)
 
     val targetPath = new Path(storage.locationUri.get)
-    val writeToPath =
+    val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf)
+    val (writeToPath: Path, fs: FileSystem) =
       if (isLocal) {
         val localFileSystem = FileSystem.getLocal(jobConf)
-        localFileSystem.makeQualified(targetPath)
+        (localFileSystem.makeQualified(targetPath), localFileSystem)
       } else {
-        val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf)
-        val dfs = qualifiedPath.getFileSystem(jobConf)
-        if (!dfs.exists(qualifiedPath)) {
-          dfs.mkdirs(qualifiedPath.getParent)
-        }
-        qualifiedPath
+        val dfs = qualifiedPath.getFileSystem(hadoopConf)
+        (qualifiedPath, dfs)
       }
+    if (!fs.exists(writeToPath)) {
+      fs.mkdirs(writeToPath)
+    }
 
-    val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, writeToPath)
+    // The temporary path must be a HDFS path, not a local path.
+    val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, qualifiedPath)
     val fileSinkConf = new org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc(
       tmpPath.toString, tableDesc, false)
 
@@ -111,15 +112,20 @@ case class InsertIntoHiveDirCommand(
         fileSinkConf = fileSinkConf,
         outputLocation = tmpPath.toString)
 
-      val fs = writeToPath.getFileSystem(hadoopConf)
       if (overwrite && fs.exists(writeToPath)) {
         fs.listStatus(writeToPath).foreach { existFile =>
           if (Option(existFile.getPath) != createdTempDir) fs.delete(existFile.getPath, true)
         }
       }
 
-      fs.listStatus(tmpPath).foreach {
-        tmpFile => fs.rename(tmpFile.getPath, writeToPath)
+      val dfs = tmpPath.getFileSystem(hadoopConf)
+      dfs.listStatus(tmpPath).foreach {
+        tmpFile =>
+          if (isLocal) {
+            dfs.copyToLocalFile(tmpFile.getPath, writeToPath)
+          } else {
+            dfs.rename(tmpFile.getPath, writeToPath)
+          }
       }
     } catch {
       case e: Throwable =>
```
