[SPARK-8406] [SQL] Adding UUID to output file name to avoid accidental overwriting #6864
Conversation
Background and alternative solutions for this issue can be a little bit complex. Will give a summary of offline discussion with @yhuai here later.
Test build #35066 has finished for PR 6864 at commit
```scala
// more cores, the issue can be reproduced steadily. Fortunately our Jenkins builder meets this
// requirement. We probably want to move this test case to spark-integration-tests or spark-perf
// later.
test("SPARK-8406") {
```
Can you add a description in addition to the JIRA?
retest this please
The last build failure looks pretty weird: a large part of the Jenkins build log output is replaced by tens of thousands of lines of integer triples, and none of the 5 test failures can be reproduced locally.
OK, found out that those integers are printed by
Test build #35077 has finished for PR 6864 at commit
(Removed the original comment as it was just the background knowledge and discussion summary which had already been moved into the PR description.)
Thank you @liancheng for the summary, which is clear to me as someone who hadn't dived into this part before. One thing I was thinking about while reviewing the code in #6833 is how to remove the redundant files when the user switches on speculation.
@chenghao-intel Thanks for the comment! Speculation is a great point that I didn't notice. Updated this PR to use a job-level UUID instead of a task-level one, because essentially what we want is to avoid name collisions between different write jobs (potentially issued by different Spark applications). Within a single write job, we can always avoid name collisions with the help of the task ID.
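To make the distinction concrete, here is a toy sketch (made-up helper names, not code from this PR) contrasting the two naming schemes:

```scala
import java.util.UUID

// Task-level uniqueness: a fresh UUID per task *attempt*. A speculative
// attempt therefore writes to a different file than the original attempt,
// and duplicate output survives if both attempts commit.
def taskLevelName(taskId: Int): String =
  s"part-r-$taskId-${UUID.randomUUID()}.gz.parquet"

// Job-level uniqueness: one UUID drawn per write job. Every attempt of a
// given task targets the same name, while files written by different jobs
// can never collide.
def jobLevelName(jobUUID: String, taskId: Int): String =
  s"part-r-$taskId-$jobUUID.gz.parquet"
```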
Test build #35258 has finished for PR 6864 at commit
@yhuai Updated PR description with an updated version of the summary commented above. This is ready for review.
Test build #35259 has finished for PR 6864 at commit
```diff
@@ -70,7 +71,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
       relation.paths.length == 1,
       s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")

-    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+    val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
```
Do we need this? We already do `val job = new Job(hadoopConf)` below. BTW, we need to add a comment to explain that `new Job` will clone the conf.
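A quick illustration of that point (a sketch, not this PR's code; `Job.getInstance` is the non-deprecated equivalent of `new Job(conf)`): Hadoop's `Job` copies the `Configuration` it is given, so settings made through the job never leak back into the shared conf.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job

val sharedConf = new Configuration()

// Job copies the configuration internally, so this job gets its own conf.
val job = Job.getInstance(sharedConf)
job.getConfiguration.set("example.key", "job-local value")

// The shared configuration is untouched by the mutation above.
assert(sharedConf.get("example.key") == null)
```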
Test build #936 has finished for PR 6864 at commit
retest this please
Retesting to gather more test failure logs for diagnosis.
Test build #35313 has finished for PR 6864 at commit
Test build #35344 has finished for PR 6864 at commit
Thanks @liancheng. @chenghao-intel, there is another situation that needs to be considered when using the data source interface: when some tasks have finished but the job fails because other tasks failed, all output files of this job need to be removed.
@lianhuiwang Yeah, thanks for the reminder. We are also working on this issue; it will be addressed in another PR. At first, appending jobs with output committers like
With help from @yhuai, I finally found the root cause. The reason why it shows up in this PR and couldn't be reproduced locally on my laptop is that I changed the thread count of the local `TestHive` `SparkContext`. For this reason, I made two more updates:
Test build #35361 has finished for PR 6864 at commit
```diff
@@ -49,7 +49,7 @@ import scala.collection.JavaConversions._
 object TestHive
   extends TestHiveContext(
     new SparkContext(
-      System.getProperty("spark.sql.test.master", "local[2]"),
+      System.getProperty("spark.sql.test.master", "local[32]"),
```
Maybe we should still use `local[*]`?
I think we'd better use a fixed number here to improve determinism (if we had used 32 from the beginning, the ORC bug would have been much easier to reproduce).
Test build #35417 has finished for PR 6864 at commit
It's proven that increasing the thread count of the local
Test build #35423 timed out for PR 6864 at commit
test this please
```scala
      super.abortTask()
      throw new RuntimeException("Failed to commit task", cause)
    } catch { case cause: Throwable =>
      throw new RuntimeException("Failed to commit task", cause)
```
This exception will be caught in `writeRows`, right? If so, can we add a comment and also explain how we will handle this exception?
Actually, I think we need to also add doc to `InsertIntoHadoopFsRelation` to explain the flow of this command and how we handle different kinds of failures/errors.
Right, it's handled in `writeRows`. Agreed on adding more comments; I made multiple mistakes here myself...
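For readers following along, a schematic of the flow under discussion (assumed shape and stand-in types, not the PR's exact code): an exception thrown during writing or committing, including the `RuntimeException` rethrown by `commitTask` above, lands in the task body, which aborts the task and rethrows so Spark fails the job.

```scala
// Minimal stand-ins so the sketch is self-contained.
trait TaskWriter {
  def write(row: Seq[Any]): Unit
  def commitTask(): Unit
  def abortTask(): Unit
}

def writeRows(iterator: Iterator[Seq[Any]], writer: TaskWriter): Unit = {
  try {
    // Any failure here, including one thrown by commitTask(), is caught below.
    iterator.foreach(writer.write)
    writer.commitTask()
  } catch {
    case cause: Throwable =>
      // Clean up this task's output, then rethrow so the scheduler marks
      // the task as failed and the whole job can be aborted.
      writer.abortTask()
      throw new RuntimeException("Task failed while writing rows", cause)
  }
}
```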
LGTM. Left two comments regarding adding comments/docs.
Test build #35429 has finished for PR 6864 at commit
#6932 was opened to backport this PR to branch-1.4.
Any chance this can be merged today?
Test build #35437 has finished for PR 6864 at commit
@nemccarthy Yeah. This one should be in today. I'm taking a final look now.
LGTM. I am merging it to master.
Author: Cheng Lian <[email protected]>

Closes #6932 from liancheng/spark-8406-for-1.4 and squashes the following commits:

a0168fe [Cheng Lian] Backports SPARK-8406 and PR #6864 to branch-1.4
…l overwriting

Author: Cheng Lian <[email protected]>

Closes apache#6864 from liancheng/spark-8406 and squashes the following commits:

db7a46a [Cheng Lian] More comments
f5c1133 [Cheng Lian] Addresses comments
85c478e [Cheng Lian] Workarounds SPARK-8513
088c76c [Cheng Lian] Adds comment about SPARK-8501
99a5e7e [Cheng Lian] Uses job level UUID in SimpleTextRelation and avoids double task abortion
4088226 [Cheng Lian] Works around SPARK-8501
1d7d206 [Cheng Lian] Adds more logs
8966bbb [Cheng Lian] Fixes Scala style issue
18b7003 [Cheng Lian] Uses job level UUID to take speculative tasks into account
3806190 [Cheng Lian] Lets TestHive use all cores by default
748dbd7 [Cheng Lian] Adding UUID to output file name to avoid accidental overwriting
This PR fixes a Parquet output file name collision bug which may cause data loss. Changes made:

1. Identify each write job issued by `InsertIntoHadoopFsRelation` with a UUID. All concrete data sources which extend `HadoopFsRelation` (Parquet and ORC for now) must use this UUID to generate task output file paths to avoid name collisions.

2. Make `TestHive` use a local mode `SparkContext` with 32 threads to increase parallelism. The major reason for this is that the original parallelism of 2 is too low to reproduce the data loss issue. Also, higher concurrency may catch more concurrency bugs during the testing phase. (It did help us spot SPARK-8501.)

3. `OrcSourceSuite` was updated to work around SPARK-8501, which we detected along the way.

NOTE: This PR is a little bit more complicated than expected because we hit two other bugs on the way and had to work around them. See [SPARK-8501][1] and [SPARK-8513][2].

[1]: https://github.com/liancheng/spark/tree/spark-8501
[2]: https://github.com/liancheng/spark/tree/spark-8513
Some background and a summary of the offline discussion with @yhuai about this issue, for better understanding:

In 1.4.0, we added `HadoopFsRelation` to abstract partition support for all data sources that are based on the Hadoop `FileSystem` interface. Specifically, this makes partition discovery, partition pruning, and writing dynamic partitions much easier for data sources.

To support appending, the Parquet data source tries to find out the max part number of part-files in the destination directory (i.e., `<id>` in the output file name `part-r-<id>.gz.parquet`) at the beginning of the write job. In 1.3.0, this step happens on the driver side before any files are written. However, in 1.4.0, it was moved to the task side. Unfortunately, tasks scheduled later may see a wrong max part number because of files newly written by other tasks that finished earlier within the same job. This is a race condition. In most cases, it only causes nonconsecutive part numbers in output file names. But when the DataFrame contains thousands of RDD partitions, two tasks are likely to choose the same part number, and then one of them gets overwritten by the other.
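To see the race concretely, here is a toy version of the task-side detection (assumed names, not the actual Spark code):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Every task lists the destination directory on its own, so two tasks
// that both start before either has written a file compute the same result.
val partFilePattern = """part-r-(\d+).*""".r

def maxPartNumber(fs: FileSystem, outputDir: Path): Int = {
  val partNumbers = fs.listStatus(outputDir).map(_.getPath.getName).collect {
    case partFilePattern(id) => id.toInt
  }
  if (partNumbers.isEmpty) 0 else partNumbers.max
}

// Tasks A and B both call maxPartNumber, both see e.g. 41, and both
// write part-r-00042.gz.parquet; one silently overwrites the other.
```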
Before `HadoopFsRelation`, Spark SQL already supported appending data to Hive tables. From a user's perspective, the two look similar, but they differ a lot internally. When data are inserted into Hive tables via Spark SQL, `InsertIntoHiveTable` simulates Hive's behavior:

1. Write data to a temporary location.
2. Move data from the temporary location to the final destination using
   - `Hive.loadTable()` for non-partitioned tables
   - `Hive.loadPartition()` for static partitions
   - `Hive.loadDynamicPartitions()` for dynamic partitions

The important part is that `Hive.copyFiles()` is invoked in step 2 to move the data to the destination directory (I find the name kind of confusing, since no "copying" occurs here; we are just moving and renaming stuff). If a file in the source directory and another file in the destination directory happen to have the same name, say `part-r-00001.parquet`, the former is moved to the destination directory and renamed with a `_copy_N` postfix (`part-r-00001_copy_1.parquet`). That's how Hive handles appending and avoids name collisions between different write jobs.
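The renaming step can be sketched like this (an illustrative helper, not Hive's actual code):

```scala
import java.io.File

// Pick a destination file name, appending _copy_N with the smallest N
// that does not collide with a file already in the destination directory.
def resolveCollision(destDir: File, fileName: String): File = {
  val candidate = new File(destDir, fileName)
  if (!candidate.exists()) candidate
  else {
    val (base, ext) = fileName.lastIndexOf('.') match {
      case -1 => (fileName, "")
      case i  => (fileName.substring(0, i), fileName.substring(i))
    }
    Iterator.from(1)
      .map(n => new File(destDir, s"${base}_copy_$n$ext"))
      .find(!_.exists())
      .get
  }
}

// resolveCollision(dir, "part-r-00001.parquet") yields part-r-00001.parquet
// if the name is free, otherwise part-r-00001_copy_1.parquet, and so on.
```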
Some alternative fixes considered for this issue:

1. Use a similar approach as Hive.

   This approach is not preferred in Spark 1.4.0, mainly because file metadata operations in S3 tend to be slow, especially for tables with lots of files and/or partitions. That's why `InsertIntoHadoopFsRelation` inserts into the destination directory directly, and is often used together with `DirectParquetOutputCommitter` to reduce latency when working with S3. This means we never get the chance to rename files, and must avoid name collisions from the very beginning.

2. Same as 1.3, just move max part number detection back to the driver side.

   This isn't doable because, unlike 1.3, 1.4 also takes dynamic partitioning into account. When inserting into dynamic partitions, we don't know which partition directories will be touched on the driver side before issuing the write job. Checking all partition directories is simply too expensive for tables with thousands of partitions.
3. Add an extra component to output file names to avoid name collisions.

   This seems to be the only reasonable solution for now. To be more specific, we need a JOB-level unique identifier for all write jobs issued by `InsertIntoHadoopFsRelation`. Notice that TASK-level unique identifiers can NOT be used, because a speculative task would then write to a different output file than the original task; if both tasks succeed, duplicate output is left behind. Currently, the ORC data source adds `System.currentTimeMillis` to the output file name for uniqueness, which doesn't work for exactly the same reason.

That's why this PR adds a job-level random UUID in `BaseWriterContainer` (which is used by `InsertIntoHadoopFsRelation` to issue write jobs). The drawback is that record order is no longer preserved (output files of a later job may be listed before those of an earlier job). However, we never promised to preserve record order when writing data, and Hive doesn't promise this either, because the `_copy_N` trick breaks the order.
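A minimal sketch of the adopted scheme (assumed key and method names; the real change lives in `BaseWriterContainer`): the driver draws one random UUID per write job and ships it to executors through the Hadoop configuration, so every task of the job embeds the same UUID in its output file names.

```scala
import java.util.UUID
import org.apache.hadoop.mapreduce.Job

class WriterContainerSketch(job: Job) extends Serializable {
  // Generated exactly once, on the driver, then serialized to all tasks.
  val uniqueWriteJobId: String = UUID.randomUUID().toString
  job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId)

  // Called on executors: the task ID keeps names unique within a job,
  // the UUID keeps them unique across jobs (and across applications).
  def outputFileName(taskId: Int, extension: String): String =
    f"part-r-$taskId%05d-$uniqueWriteJobId$extension"
}
```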