
[SPARK-26936][SQL] Fix bug of insert overwrite local dir can not create temporary path in local staging directory #23841

Closed
wants to merge 2 commits into from

Conversation

@beliefer (Contributor) commented Feb 20, 2019

What changes were proposed in this pull request?

The environment of my cluster is as follows:

OS:Linux version 2.6.32-220.7.1.el6.x86_64 ([email protected]) (gcc version 4.4.6 20110731 (Red Hat 4.4.6-3) (GCC) ) #1 SMP Wed Mar 7 00:52:02 GMT 2012
Hadoop: 2.7.2
Spark: 2.3.0 or 3.0.0(master branch)
Hive: 1.2.1

My Spark application runs in yarn-client deploy mode.

If I execute the SQL insert overwrite local directory '/home/test/call_center/' select * from call_center, a HiveException appears as follows:
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.io.IOException: Mkdirs failed to create file:/home/xitong/hive/stagingdir_hive_2019-02-19_17-31-00_678_1816816774691551856-1/-ext-10000/_temporary/0/_temporary/attempt_20190219173233_0002_m_000000_3 (exists=false, cwd=file:/data10/yarn/nm-local-dir/usercache/xitong/appcache/application_1543893582405_6126857/container_e124_1543893582405_6126857_01_000011) at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:249)
Currently, Spark SQL generates a local temporary path in the local staging directory. The scheme of this local temporary path starts with file:, so the HiveException appears.
This PR changes the local temporary path to an HDFS temporary path, and uses a DistributedFileSystem instance to copy the data from the HDFS temporary path to the local directory.
If Spark runs in local deploy mode, 'insert overwrite local directory' works fine.
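For illustration only, here is a minimal sketch of the flow described above, written against the standard Hadoop FileSystem API; the paths and the inline structure are hypothetical and this is not the PR's exact code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val hadoopConf = new Configuration()
// Hypothetical staging dir on distributed storage, reachable from every executor.
val tmpPath = new Path("hdfs://namenode:9000/user/test/.hive-staging/-ext-10000")
// Local target directory named in the INSERT OVERWRITE LOCAL DIRECTORY statement.
val writeToPath = new Path("file:///home/test/call_center")

val dfs = tmpPath.getFileSystem(hadoopConf)   // DistributedFileSystem
val localFs = FileSystem.getLocal(hadoopConf)

// ... executors write the query result under tmpPath ...

// Driver side: ensure the local directory exists, then copy each staged file down
// from distributed storage and clean up the staging directory.
localFs.mkdirs(writeToPath)
dfs.listStatus(tmpPath).foreach { status =>
  FileUtil.copy(dfs, status.getPath, localFs, writeToPath,
    /* deleteSource = */ false, /* overwrite = */ true, hadoopConf)
}
dfs.delete(tmpPath, true)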

How was this patch tested?

A UT cannot cover yarn-client mode. The fix was tested in my production environment.

@maropu (Member) commented Feb 20, 2019

Can you add tests before running tests in Jenkins?

@beliefer (Contributor, Author) commented Feb 21, 2019

> Can you add tests before running tests in Jenkins?

Existing unit tests cover the behavior, and I added a unit test for creating a local directory that does not exist.

@maropu (Member) commented Feb 22, 2019

ok to test

@SparkQA commented Feb 22, 2019

Test build #102610 has finished for PR 23841 at commit 8799a86.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 22, 2019

Test build #102612 has finished for PR 23841 at commit 747296e.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 22, 2019

Test build #102614 has finished for PR 23841 at commit 19698df.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 22, 2019

Test build #102642 has finished for PR 23841 at commit 41215c2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@beliefer (Contributor, Author) commented Feb 26, 2019

@maropu Please review this PR again, thanks!


- val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, writeToPath)
+ // The temporary path must be a HDFS path, not a local path.
+ val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, qualifiedPath)
Member
In case of inserts from non-hive tables, we still need to use a non-local path?

@beliefer (Contributor, Author), Feb 26, 2019

If the target path is local, we still need to use a non-local path.

val path = dir.toURI.getPath
val notExistsPath = s"${path}/src/result"

sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${notExistsPath}' SELECT * FROM src where key < 10")
Member

nit: ${notExistsPath} -> $notExistsPath

Contributor Author

OK, I have adjusted it.


sql(
s"""
|INSERT OVERWRITE LOCAL DIRECTORY '${notExistsPath}'
Member

ditto

Contributor Author

OK, I have adjusted it.

|CREATE TEMPORARY VIEW orc_source
|USING org.apache.spark.sql.hive.orc
|OPTIONS (
| PATH '${notExistsPath}'
Member

ditto

Contributor Author

OK, I have adjusted it.


checkAnswer(
sql("select * from orc_source"),
sql("select * from src where key < 10"))
Member

You should set literals in the expected answer.

Contributor Author

This code follows the style of other existing UTs.

@@ -581,6 +581,38 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
}
}

test("insert overwrite to not exists dir from hive metastore table") {
Member

What's the difference from the "insert overwrite to not exist local dir" test?

Contributor Author

One tests whether the result is correct; the other tests whether the non-existent path gets created.

@maropu (Member), Feb 26, 2019

Can you gather into a single test?

Contributor Author

Yes, I can remove the "insert overwrite to not exist local dir" test and retain the other.

@maropu (Member) commented Feb 26, 2019

cc: @dongjoon-hyun

@SparkQA commented Feb 26, 2019

Test build #102786 has finished for PR 23841 at commit 8d81fb6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 26, 2019

Test build #102788 has finished for PR 23841 at commit 90633d0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu (Member) commented Feb 26, 2019

I realized that the test you added passes on master without your fix... Can you check again?
Is your example query like this?

$ls /tmp/noexistdir
ls: /tmp/noexistdir: No such file or directory

scala> sql("""create table t(c0 int, c1 int)""")
scala> spark.table("t").explain
== Physical Plan ==
Scan hive default.t [c0#5, c1#6], HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c0#5, c1#6]

scala> sql("""insert into t values(1, 1)""")
scala> sql("""select * from t""").show
+---+---+
| c0| c1|
+---+---+
|  1|  1|
+---+---+

scala> sql("""insert overwrite local directory '/tmp/noexistdir/t' select * from t""")

$ls /tmp/noexistdir/t/
_SUCCESS  part-00000-bbea4213-071a-49b4-aac8-8510e7263d45-c000

@beliefer (Contributor, Author) commented Feb 27, 2019

Does the parent path /tmp/noexistdir exist or not? Please make sure that both the parent path and the child path do not exist.
What is the deploy mode your query runs in?

@SparkQA commented Feb 27, 2019

Test build #102810 has finished for PR 23841 at commit 8bb8e56.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu (Member) commented Feb 27, 2019

My test case was that /tmp/noexistdir didn't exist first.

> What is the deploy mode your query runs in?

./bin/spark-shell --master=local[*]

If you have any precondition for the failure, can you update the PR description?

@beliefer (Contributor, Author)

OK, I have supplemented the deploy mode in the PR description.

@beliefer (Contributor, Author) commented Feb 27, 2019

I have run the SQL you provided in local[*] deploy mode and the inconsistent behavior still appears.
The Spark version under test is 2.3.0.

ls /tmp/noexistdir
ls: cannot access /tmp/noexistdir: No such file or directory

scala> sql("""create table t(c0 int, c1 int)""")
res0: org.apache.spark.sql.DataFrame = []
scala> spark.table("t").explain
== Physical Plan ==
HiveTableScan [c0#5, c1#6], HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c0#5, c1#6]

scala> sql("""insert into t values(1, 1)""")
scala> sql("""select * from t""").show
+---+---+                                                                       
| c0| c1|
+---+---+
|  1|  1|
+---+---+

scala> sql("""insert overwrite local directory '/tmp/noexistdir/t' select * from t""")
res1: org.apache.spark.sql.DataFrame = [] 

ls /tmp/noexistdir/t/
/tmp/noexistdir/t

vi /tmp/noexistdir/t
  1 

'/tmp/noexistdir/t' is not a directory but a file.
Normally, the temp path contains a result file and a '_SUCCESS' file.
I suspect the result file and the '_SUCCESS' file overwrite each other through the following code:
fs.listStatus(tmpPath).foreach { tmpFile => fs.rename(tmpFile.getPath, writeToPath) }
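As a reading aid for this suspicion (not part of the PR; paths and setup are hypothetical), a small sketch of what that loop does when writeToPath is not an existing directory: every staged file is renamed onto the same target path, so at most one file can end up there.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.getLocal(new Configuration())
val tmpPath = new Path("/tmp/demo-staging")       // hypothetical staging directory
val writeToPath = new Path("/tmp/noexistdir/t")   // target path; its parent does not exist yet

fs.mkdirs(tmpPath)
fs.create(new Path(tmpPath, "_SUCCESS")).close()
fs.create(new Path(tmpPath, "part-00000")).close()

// Same loop shape as in the command: each staged file is renamed to writeToPath.
// Because writeToPath is not an existing directory, ChecksumFileSystem renames each
// source onto that exact path instead of moving it into a directory, so
// /tmp/noexistdir/t can only end up as a single file, not a directory of results.
fs.listStatus(tmpPath).foreach { tmpFile =>
  fs.rename(tmpFile.getPath, writeToPath)
}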

@beliefer (Contributor, Author) commented Feb 27, 2019

Then I pulled the master branch, compiled it, and deployed it on my Hadoop cluster. I get the inconsistent behavior again.
The Spark version under test is 3.0.0.

ls /tmp/noexistdir
ls: cannot access /tmp/noexistdir: No such file or directory
Java HotSpot(TM) 64-Bit Server VM warning: Using the ParNew young collector with the Serial old collector is deprecated and will likely be removed in a future release
Spark context Web UI available at http://10.198.66.204:55326
Spark context available as 'sc' (master = local[*], app id = local-1551259036573).
Spark session available as 'spark'.
Welcome to spark version 3.0.0-SNAPSHOT
Using Scala version 2.12.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sql("""select * from t""").show
+---+---+                                                                       
| c0| c1|
+---+---+
|  1|  1|
+---+---+


scala> sql("""insert overwrite local directory '/tmp/noexistdir/t' select * from t""")
res1: org.apache.spark.sql.DataFrame = []                                       

scala> 
ll /tmp/noexistdir/t
-rw-r--r-- 1 xitong xitong 0 Feb 27 17:19 /tmp/noexistdir/t
vi /tmp/noexistdir/t
  1

/tmp/noexistdir/t is a file too.

@beliefer (Contributor, Author)

The original code of InsertIntoHiveDirCommand.scala does not create the parent path on the local FileSystem.

@beliefer (Contributor, Author)

I have updated the PR and the associated JIRA.

@beliefer (Contributor, Author)

@maropu Could you review the PR again? Thanks.

@beliefer (Contributor, Author) commented Feb 28, 2019

I have checked the source of Hadoop's LocalFileSystem. LocalFileSystem does not implement the rename method; it extends ChecksumFileSystem, and the latter implements rename.
The rename method of ChecksumFileSystem is as follows:

  public boolean rename(Path src, Path dst) throws IOException {
    if (fs.isDirectory(src)) {
      return fs.rename(src, dst);
    } else {
      if (fs.isDirectory(dst)) {
        dst = new Path(dst, src.getName());
      }

      boolean value = fs.rename(src, dst);
      if (!value)
        return false;

      Path srcCheckFile = getChecksumFile(src);
      Path dstCheckFile = getChecksumFile(dst);
      if (fs.exists(srcCheckFile)) { //try to rename checksum
        value = fs.rename(srcCheckFile, dstCheckFile);
      } else if (fs.exists(dstCheckFile)) {
        // no src checksum, so remove dst checksum
        value = fs.delete(dstCheckFile, true); 
      }

      return value;
    }
  }

If the target path is a directory, ChecksumFileSystem moves the source file into the target path.
If the target path is not a directory, ChecksumFileSystem renames the source file to the target file.
The variable named fs is a RawLocalFileSystem, and RawLocalFileSystem ultimately calls the rename method of UNIXFileSystem or WinNTFileSystem.

@maropu (Member) commented Mar 1, 2019

Does this issue only happen in yarn-client mode?

@beliefer (Contributor, Author) commented Mar 1, 2019

> Does this issue only happen in yarn-client mode?

This SQL first fails with org.apache.hadoop.hive.ql.metadata.HiveException: java.io.IOException: Mkdirs failed to create file:/ in yarn-client mode.
This SQL shows the inconsistent behavior in local mode.

@maropu (Member) commented Mar 1, 2019

You need to first check the test you added in this pr... it still passes without your fix.

@beliefer (Contributor, Author) commented Mar 1, 2019

> You need to first check the test you added in this pr... it still passes without your fix.

Maybe the UT runs locally and depends on the operating system or other environmental differences.
I have checked the test framework and found that the UT indeed runs in local mode.

object TestHive
  extends TestHiveContext(
    new SparkContext(
      System.getProperty("spark.sql.test.master", "local[1]"),
      "TestSQLContext",
      new SparkConf()
        .set("spark.sql.test", "")
        .set(SQLConf.CODEGEN_FALLBACK.key, "false")
        .set("spark.sql.hive.metastore.barrierPrefixes",
          "org.apache.spark.sql.hive.execution.PairSerDe")
        .set("spark.sql.warehouse.dir", TestHiveContext.makeWarehouseDir().toURI.getPath)
        // SPARK-8910
        .set("spark.ui.enabled", "false")
        .set("spark.unsafe.exceptionOnMemoryLeak", "true")))

@SparkQA commented Mar 1, 2019

Test build #102907 has finished for PR 23841 at commit 0efa4cf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 1, 2019

Test build #102911 has finished for PR 23841 at commit ce23ec7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen (Member) commented Mar 26, 2019

You're changing all code paths though, ones that work. I am not clear that's valid. Why not work out why this happens only in yarn-client mode?

@beliefer (Contributor, Author)

> You're changing all code paths though, ones that work. I am not clear that's valid. Why not work out why this happens only in yarn-client mode?

I have pasted the full stack trace below:
Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2037) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194) ... 36 more Caused by: org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.io.IOException: Mkdirs failed to create file:/home/xitong/hive/stagingdir_hive_2019-02-19_17-31-00_678_1816816774691551856-1/-ext-10000/_temporary/0/_temporary/attempt_20190219173233_0002_m_000000_3 (exists=false, cwd=file:/data10/yarn/nm-local-dir/usercache/xitong/appcache/application_1543893582405_6126857/container_e124_1543893582405_6126857_01_000011) at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:249) at org.apache.spark.sql.hive.execution.HiveOutputWriter.<init>(HiveFileFormat.scala:123) at org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:103) at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:367) at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:378) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269) at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272) ... 8 more Caused by: java.io.IOException: Mkdirs failed to create file:/home/xitong/hive/stagingdir_hive_2019-02-19_17-31-00_678_1816816774691551856-1/-ext-10000/_temporary/0/_temporary/attempt_20190219173233_0002_m_000000_3 (exists=false, cwd=file:/data10/yarn/nm-local-dir/usercache/xitong/appcache/application_1543893582405_6126857/container_e124_1543893582405_6126857_01_000011) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:449) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:435) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:928) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:821) at org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.getHiveRecordWriter(HiveIgnoreKeyTextOutputFormat.java:80) at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:261) at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:246) ... 16 more

@beliefer (Contributor, Author) commented Mar 26, 2019

@srowen
According to the error:
FileFormatWriter creates a SingleDirectoryWriteTask and calls its execute method.
The SingleDirectoryWriteTask runs on one of the nodes in the YARN cluster.
The original code uses a temp path starting with file:/, but that path is not valid on the executor's node; the temp path is only valid on the driver node.

@cloud-fan (Contributor)
Will we hit this bug when we deploy spark in cluster? Seems to me it's not specific to yarn.

@beliefer (Contributor, Author) commented Mar 26, 2019

> Will we hit this bug when we deploy spark in cluster? Seems to me it's not specific to yarn.

Yes. If Spark runs in yarn-client deploy mode, this bug occurs.

@srowen (Member) commented Mar 26, 2019

That makes more sense if this isn't YARN-specific, but isn't this still using a local path as if it's remote, or am I misreading?

@beliefer (Contributor, Author)

> That makes more sense if this isn't YARN-specific, but isn't this still using a local path as if it's remote, or am I misreading?

The SQL starting with insert overwrite local directory means the target path is local.
There is an intermediate step that writes the result data to a temp path first. After that step, the result data is moved from the temp path to the local target path.
The original code uses a local path as the temp path.
This PR uses a distributed path as the temp path.

@srowen (Member) commented Mar 27, 2019

OK it's probably that I don't know this code well. Maybe my question is this: when isLocal == true, storage.locationUri is a local path, so targetPath points to a local directory. It is now used as a path on distributed storage here. It may not exist or be usable that way. Is that OK? It may happen to work but seems a little wrong. Or else I am misunderstanding how the temp dir gets created and it's not a big deal.

@meteorchenwu

It is great. I have merged it into my production environment.

@beliefer (Contributor, Author) commented Mar 28, 2019

> OK it's probably that I don't know this code well. Maybe my question is this: when isLocal == true, storage.locationUri is a local path, so targetPath points to a local directory. It is now used as a path on distributed storage here. It may not exist or be usable that way. Is that OK? It may happen to work but seems a little wrong. Or else I am misunderstanding how the temp dir gets created and it's not a big deal.

Yes, when isLocal == true, storage.locationUri is a local path, so targetPath points to a local directory. It is not used as a path on distributed storage here.
You mean targetPath may not exist or be usable that way, so this PR creates the local directory explicitly.
Let me explain another way:
The Driver and the Executors run on different nodes (machines). The SQL starting with insert overwrite local directory writes data to a temp path first. The original code uses a local temp path; that temp path is valid on the node running the Driver but invalid on the nodes running the Executors.
So this PR changes the local temp path to a distributed storage path that can be recognized by the Driver and all the Executors.

@srowen (Member) commented Mar 28, 2019

Yes I get all that, but here:
val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf)
targetPath is local. Why would you know it's a valid path on distributed storage?

@beliefer (Contributor, Author)

> It is great. I have merged it into my production environment.

I added some code to guarantee robustness.

@beliefer (Contributor, Author) commented Mar 28, 2019

> Yes I get all that, but here:
> val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf)
> targetPath is local. Why would you know it's a valid path on distributed storage?

In yarn-client mode, qualifiedPath is a distributed path although targetPath is a local path.
In local mode, qualifiedPath is a local path.
I have debugged and logged these variables in our production environment.

The variables in our production environment are as follows:
targetPath /home/xitong/gja_test/call_center
qualifiedPath hdfs://namenode.********:9000/home/xitong/gja_test/call_center

@SparkQA commented Mar 28, 2019

Test build #104033 has finished for PR 23841 at commit 0913a45.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen (Member) commented Mar 28, 2019

After the change, qualifiedPath is constructed as:
val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf)
Before it was qualified w.r.t. the local file system.
Doesn't this become a path on the distributed file system? that's the point of this change. But, if targetPath is "/tmp/foo", this assumes it's valid to use "/tmp/foo" on the distributed storage. That doesn't seem technically right. Am I missing something so far?

@beliefer (Contributor, Author) commented Mar 29, 2019

> After the change, qualifiedPath is constructed as:
> val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf)
> Before it was qualified w.r.t. the local file system.
> Doesn't this become a path on the distributed file system? that's the point of this change. But, if targetPath is "/tmp/foo", this assumes it's valid to use "/tmp/foo" on the distributed storage. That doesn't seem technically right. Am I missing something so far?

The key issue is not targetPath but tmpPath.
Before this PR's change, the variables in our production environment in local mode were as follows:
targetPath /home/***/gja_test/call_center
qualifiedPath(not used in fact) file:/home/***/gja_test/call_center
writeToPath file:/home/***/gja_test/call_center
tmpPath file:/home/***/hive/stagingdir_hive_2019-02-19_17-31-00_678_1816816774691551856-1/-ext-10000
tmpPath is constructed from writeToPath. Both paths are on the Driver node.

Before this PR's change, the variables in our production environment in yarn-client mode were as follows:
targetPath /home/***/gja_test/call_center
qualifiedPath(not used in fact) hdfs://namenode.***:9000/home/***/gja_test/call_center
writeToPath file:/home/***/gja_test/call_center
tmpPath file:/home/***/hive/stagingdir_hive_2019-02-19_17-31-00_678_1816816774691551856-1/-ext-10000
tmpPath is constructed from writeToPath. Both paths are on the Driver node, so the Executors cannot recognize tmpPath.

After this PR's change, the variables in our production environment in yarn-client mode are as follows:
targetPath /home/***/gja_test/call_center
qualifiedPath hdfs://namenode.***:9000/home/***/gja_test/call_center
writeToPath file:/home/***/gja_test/call_center
tmpPath hdfs://namenode.***:9000/home/***/hive/stagingdir_hive_2019-02-19_17-18-29_777_5647666931466990865-1/-ext-10000
tmpPath is constructed from qualifiedPath. writeToPath belongs to the Driver, and tmpPath belongs to distributed storage.

@beliefer (Contributor, Author)

> After the change, qualifiedPath is constructed as:
> val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf)
> Before it was qualified w.r.t. the local file system.
> Doesn't this become a path on the distributed file system? that's the point of this change. But, if targetPath is "/tmp/foo", this assumes it's valid to use "/tmp/foo" on the distributed storage. That doesn't seem technically right. Am I missing something so far?

If targetPath is "/tmp/foo", this assumes it's valid to use "hdfs://tmp/foo" as the tmpPath on distributed storage.
But the original code uses "file:/tmp/foo" as the tmpPath.

@srowen (Member) commented Mar 29, 2019

Yeah, I think that's a potential problem. We don't know whether that same path is valid on a distributed store. I agree there's a problem to fix here. Is there any standard tmp dir you can instead write to in this case rather than reusing that local path as a distributed path?

@beliefer (Contributor, Author) commented Mar 29, 2019

> Yeah, I think that's a potential problem. We don't know whether that same path is valid on a distributed store. I agree there's a problem to fix here. Is there any standard tmp dir you can instead write to in this case rather than reusing that local path as a distributed path?

The purpose of executing this command is to write the data to a local path, so targetPath should be a local path. writeToPath, created with localFileSystem.makeQualified(targetPath), determines the final path that stores the result data; since localFileSystem is a LocalFileSystem, writeToPath is the locally qualified targetPath. qualifiedPath is always a distributed path in yarn-client deploy mode. tmpPath, created by getExternalTmpPath from qualifiedPath, serves as temporary storage for the result data. I think getExternalTmpPath generates a standard tmp dir: if qualifiedPath is a local path, the tmpPath it creates is a local path too; if qualifiedPath is a distributed path, the tmpPath is a distributed path too. qualifiedPath is not the same as writeToPath or targetPath, so the local path is not reused.
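As a reading aid only, reconstructed from the variable names used in this thread rather than the actual InsertIntoHiveDirCommand source, the path relationships might be sketched as a hypothetical helper:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils

// Hypothetical helper, only to illustrate how the paths in this thread relate;
// the real logic lives inside InsertIntoHiveDirCommand.
def resolvePaths(targetDir: String, isLocal: Boolean, hadoopConf: Configuration): (Path, Path, FileSystem) = {
  val targetPath = new Path(targetDir)   // e.g. /home/xitong/gja_test/call_center
  // Qualified against the default FileSystem: hdfs://... in yarn-client mode, file:/... in local mode.
  val qualifiedPath = FileUtils.makeQualified(targetPath, hadoopConf)
  // The final destination: the driver-local file system when LOCAL DIRECTORY is used.
  val (writeToPath, fs) =
    if (isLocal) {
      val localFileSystem = FileSystem.getLocal(hadoopConf)
      (localFileSystem.makeQualified(targetPath), localFileSystem)
    } else {
      (qualifiedPath, qualifiedPath.getFileSystem(hadoopConf))
    }
  // After this PR, the staging path is derived from qualifiedPath rather than writeToPath,
  // e.g. tmpPath = getExternalTmpPath(sparkSession, hadoopConf, qualifiedPath), so in
  // yarn-client mode it lands on distributed storage that every executor can reach.
  (qualifiedPath, writeToPath, fs)
}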

@srowen (Member) commented Mar 29, 2019

OK, if you're saying this change doesn't cause the temp path to be some function of the (local) write path, then I get it. If it just comes up with a standard tmp location that is now on distributed storage, I get it.

@beliefer (Contributor, Author)

> OK, if you're saying this change doesn't cause the temp path to be some function of the (local) write path, then I get it. If it just comes up with a standard tmp location that is now on distributed storage, I get it.

Yes, it is.

@srowen (Member) commented Apr 1, 2019

@maropu @cloud-fan does this sound reasonable to you? given the extended discussion above and the last couple comments here

@beliefer (Contributor, Author) commented Apr 3, 2019

@maropu @cloud-fan srowen and I have had a lot of discussion above; could you take a look at the last couple of comments?

@beliefer (Contributor, Author) commented Apr 4, 2019

@cloud-fan Please look at this PR, thanks!

@beliefer (Contributor, Author) commented Apr 4, 2019

> @maropu @cloud-fan does this sound reasonable to you? given the extended discussion above and the last couple comments here

This PR has been going on for more than 40 days. Please help me advance it. Thanks.

@srowen (Member) commented Apr 5, 2019

Given the discussion, my understanding, passed tests, and reports that this fixes the problem in a prod env, I'm merging this to master.

@srowen srowen closed this in 979bb90 Apr 5, 2019
@beliefer (Contributor, Author) commented Apr 6, 2019

> Given the discussion, my understanding, passed tests, and reports that this fixes the problem in a prod env, I'm merging this to master.

Thank you very much. Your rigor and attitude throughout the discussion are very helpful to contributors.
