From fd4dc69a37826927d7a0e1ceeb8036109f6f09f1 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Wed, 13 Mar 2019 11:09:01 +0800 Subject: [PATCH 1/2] Fix bug -- insert overwrite local dir in yarn-client mode. --- .../hive/execution/InsertIntoHiveDirCommand.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala index 0c694910b06d4..b2dfa425d93b5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala @@ -86,12 +86,12 @@ case class InsertIntoHiveDirCommand( val jobConf = new JobConf(hadoopConf) val targetPath = new Path(storage.locationUri.get) + val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf) val writeToPath = if (isLocal) { val localFileSystem = FileSystem.getLocal(jobConf) localFileSystem.makeQualified(targetPath) } else { - val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf) val dfs = qualifiedPath.getFileSystem(jobConf) if (!dfs.exists(qualifiedPath)) { dfs.mkdirs(qualifiedPath.getParent) @@ -99,7 +99,8 @@ case class InsertIntoHiveDirCommand( qualifiedPath } - val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, writeToPath) + // The temporary path must be a HDFS path, not a local path. + val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, qualifiedPath) val fileSinkConf = new org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc( tmpPath.toString, tableDesc, false) @@ -119,7 +120,12 @@ case class InsertIntoHiveDirCommand( } fs.listStatus(tmpPath).foreach { - tmpFile => fs.rename(tmpFile.getPath, writeToPath) + tmpFile => + if (isLocal) { + fs.copyToLocalFile(tmpFile.getPath, writeToPath) + } else { + fs.rename(tmpFile.getPath, writeToPath) + } } } catch { case e: Throwable => From 0913a45ac701b99fc1c8d8e556646a45f30c330f Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Thu, 28 Mar 2019 10:02:11 +0800 Subject: [PATCH 2/2] create local dir explicitly. --- .../execution/InsertIntoHiveDirCommand.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala index b2dfa425d93b5..1825af6191ce0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala @@ -87,17 +87,17 @@ case class InsertIntoHiveDirCommand( val targetPath = new Path(storage.locationUri.get) val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf) - val writeToPath = + val (writeToPath: Path, fs: FileSystem) = if (isLocal) { val localFileSystem = FileSystem.getLocal(jobConf) - localFileSystem.makeQualified(targetPath) + (localFileSystem.makeQualified(targetPath), localFileSystem) } else { - val dfs = qualifiedPath.getFileSystem(jobConf) - if (!dfs.exists(qualifiedPath)) { - dfs.mkdirs(qualifiedPath.getParent) - } - qualifiedPath + val dfs = qualifiedPath.getFileSystem(hadoopConf) + (qualifiedPath, dfs) } + if (!fs.exists(writeToPath)) { + fs.mkdirs(writeToPath) + } // The temporary path must be a HDFS path, not a local path. val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, qualifiedPath) @@ -112,19 +112,19 @@ case class InsertIntoHiveDirCommand( fileSinkConf = fileSinkConf, outputLocation = tmpPath.toString) - val fs = writeToPath.getFileSystem(hadoopConf) if (overwrite && fs.exists(writeToPath)) { fs.listStatus(writeToPath).foreach { existFile => if (Option(existFile.getPath) != createdTempDir) fs.delete(existFile.getPath, true) } } - fs.listStatus(tmpPath).foreach { + val dfs = tmpPath.getFileSystem(hadoopConf) + dfs.listStatus(tmpPath).foreach { tmpFile => if (isLocal) { - fs.copyToLocalFile(tmpFile.getPath, writeToPath) + dfs.copyToLocalFile(tmpFile.getPath, writeToPath) } else { - fs.rename(tmpFile.getPath, writeToPath) + dfs.rename(tmpFile.getPath, writeToPath) } } } catch {