-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist #15153
Conversation
Test build #65611 has finished for PR 15153 at commit
|
cc @cloud-fan |
LGTM, merging to master! @brkyvz this conflicts with 2.0, can you send a new PR? thanks! |
@cloud-fan Something similar to this already exists in Branch 2.0. In branch 2.0, we're going to need something similar to this: |
## What changes were proposed in this pull request? This Backports PR #15153 and PR #15122 to Spark 2.0 branch for Structured Streaming. It is structured a bit differently because similar code paths already existed in the 2.0 branch. The unit test makes sure that both behaviors don't break. Author: Burak Yavuz <[email protected]> Closes #15202 from brkyvz/backports-to-streaming.
@brkyvz the change here only affects the serial version, and not the parallel version, does it? Wouldn't that be a problem? |
@petermaxlee It is true that the parallel version can fail as well, the same kind of race condition can bite people there |
… doesn't exist ## What changes were proposed in this pull request? This PR proposes to follow up #15153 and complete SPARK-17599. `FileSystem` operation (`fs.getFileBlockLocations`) can still fail if the file path does not exist. For example see the exception message below: ``` Error occurred while processing: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv ... java.io.FileNotFoundException: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv ... org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:249) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:229) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:314) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:297) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles(InMemoryFileIndex.scala:297) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:174) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:173) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles(InMemoryFileIndex.scala:173) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:126) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:91) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:67) at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:161) at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$tempFileIndex$1(DataSource.scala:152) at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:166) at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:261) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:94) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:94) at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33) at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:196) at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:206) at com.hwx.StreamTest$.main(StreamTest.scala:97) at com.hwx.StreamTest.main(StreamTest.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:906) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv ... ``` So, it fixes it to make a warning instead. ## How was this patch tested? It's hard to write a test. Manually tested multiple times. Author: hyukjinkwon <[email protected]> Closes #21408 from HyukjinKwon/missing-files. (cherry picked from commit 8a54582) Signed-off-by: hyukjinkwon <[email protected]>
… doesn't exist ## What changes were proposed in this pull request? This PR proposes to follow up #15153 and complete SPARK-17599. `FileSystem` operation (`fs.getFileBlockLocations`) can still fail if the file path does not exist. For example see the exception message below: ``` Error occurred while processing: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv ... java.io.FileNotFoundException: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv ... org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:249) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:229) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:314) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:297) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles(InMemoryFileIndex.scala:297) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:174) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:173) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles(InMemoryFileIndex.scala:173) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:126) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:91) at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:67) at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:161) at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$tempFileIndex$1(DataSource.scala:152) at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:166) at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:261) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:94) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:94) at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33) at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:196) at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:206) at com.hwx.StreamTest$.main(StreamTest.scala:97) at com.hwx.StreamTest.main(StreamTest.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:906) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv ... ``` So, it fixes it to make a warning instead. ## How was this patch tested? It's hard to write a test. Manually tested multiple times. Author: hyukjinkwon <[email protected]> Closes #21408 from HyukjinKwon/missing-files.
…es.ignoreMissingFiles ## What changes were proposed in this pull request? Spark's `InMemoryFileIndex` contains two places where `FileNotFound` exceptions are caught and logged as warnings (during [directory listing](https://github.com/apache/spark/blob/bcd3b61c4be98565352491a108e6394670a0f413/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L274) and [block location lookup](https://github.com/apache/spark/blob/bcd3b61c4be98565352491a108e6394670a0f413/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L333)). This logic was added in apache#15153 and apache#21408. I think that this is a dangerous default behavior because it can mask bugs caused by race conditions (e.g. overwriting a table while it's being read) or S3 consistency issues (there's more discussion on this in the [JIRA ticket](https://issues.apache.org/jira/browse/SPARK-27676)). Failing fast when we detect missing files is not sufficient to make concurrent table reads/writes or S3 listing safe (there are other classes of eventual consistency issues to worry about), but I think it's still beneficial to throw exceptions and fail-fast on the subset of inconsistencies / races that we _can_ detect because that increases the likelihood that an end user will notice the problem and investigate further. There may be some cases where users _do_ want to ignore missing files, but I think that should be an opt-in behavior via the existing `spark.sql.files.ignoreMissingFiles` flag (the current behavior is itself race-prone because a file might be be deleted between catalog listing and query execution time, triggering FileNotFoundExceptions on executors (which are handled in a way that _does_ respect `ignoreMissingFIles`)). This PR updates `InMemoryFileIndex` to guard the log-and-ignore-FileNotFoundException behind the existing `spark.sql.files.ignoreMissingFiles` flag. **Note**: this is a change of default behavior, so I think it needs to be mentioned in release notes. ## How was this patch tested? New unit tests to simulate file-deletion race conditions, tested with both values of the `ignoreMissingFIles` flag. Closes apache#24668 from JoshRosen/SPARK-27676. Lead-authored-by: Josh Rosen <[email protected]> Co-authored-by: Josh Rosen <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
…es.ignoreMissingFiles ## What changes were proposed in this pull request? Spark's `InMemoryFileIndex` contains two places where `FileNotFound` exceptions are caught and logged as warnings (during [directory listing](https://github.com/apache/spark/blob/bcd3b61c4be98565352491a108e6394670a0f413/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L274) and [block location lookup](https://github.com/apache/spark/blob/bcd3b61c4be98565352491a108e6394670a0f413/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala#L333)). This logic was added in apache#15153 and apache#21408. I think that this is a dangerous default behavior because it can mask bugs caused by race conditions (e.g. overwriting a table while it's being read) or S3 consistency issues (there's more discussion on this in the [JIRA ticket](https://issues.apache.org/jira/browse/SPARK-27676)). Failing fast when we detect missing files is not sufficient to make concurrent table reads/writes or S3 listing safe (there are other classes of eventual consistency issues to worry about), but I think it's still beneficial to throw exceptions and fail-fast on the subset of inconsistencies / races that we _can_ detect because that increases the likelihood that an end user will notice the problem and investigate further. There may be some cases where users _do_ want to ignore missing files, but I think that should be an opt-in behavior via the existing `spark.sql.files.ignoreMissingFiles` flag (the current behavior is itself race-prone because a file might be be deleted between catalog listing and query execution time, triggering FileNotFoundExceptions on executors (which are handled in a way that _does_ respect `ignoreMissingFIles`)). This PR updates `InMemoryFileIndex` to guard the log-and-ignore-FileNotFoundException behind the existing `spark.sql.files.ignoreMissingFiles` flag. **Note**: this is a change of default behavior, so I think it needs to be mentioned in release notes. ## How was this patch tested? New unit tests to simulate file-deletion race conditions, tested with both values of the `ignoreMissingFIles` flag. Closes apache#24668 from JoshRosen/SPARK-27676. Lead-authored-by: Josh Rosen <[email protected]> Co-authored-by: Josh Rosen <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
What changes were proposed in this pull request?
The
ListingFileCatalog
lists files given a set of resolved paths. If a folder is deleted at any time between the paths were resolved and the file catalog can check for the folder, the Spark job fails. This may abruptly stop long running StructuredStreaming jobs for example.Folders may be deleted by users or automatically by retention policies. These cases should not prevent jobs from successfully completing.
How was this patch tested?
Unit test in
FileCatalogSuite