[SPARK-20848][SQL] Shutdown the pool after reading parquet files #18073
Conversation
cc @srowen
parFiles.flatMap { currentFile =>
val readParquetTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
parFiles.tasksupport = readParquetTaskSupport
val footers = parFiles.flatMap { currentFile =>
You could probably put the shutdown in a finally block to avoid adding this new reference. I think you can rearrange to use the existing one below. It does help shut it down in case of an error too. Also I think you can hold a reference just to the ForkJoinPool in order to shut it down rather than a ref to ForkJoinTaskSupport. No big deal
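The shape being suggested here might look roughly like this (a minimal sketch, assuming Scala 2.12; the real method reads parquet footers, replaced below by a stand-in):

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

// Sketch of the suggested rearrangement: hold a reference to the ForkJoinPool
// itself (not the ForkJoinTaskSupport) and shut it down in a finally block,
// so the pool is released on the error path as well.
def readFilesInParallel(files: Seq[String]): Seq[String] = {
  val parFiles = files.par
  val pool = new ForkJoinPool(8)
  parFiles.tasksupport = new ForkJoinTaskSupport(pool)
  try {
    parFiles.map(f => "footer-of-" + f).seq // stand-in for the real footer read
  } finally {
    pool.shutdown() // runs whether the read succeeded or threw
  }
}
```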
Test build #77254 has finished for PR 18073 at commit
cc @cloud-fan @gatorsmile This is a regression in 2.1. I think we may want to include this in 2.2. Please take a look. Thanks.
val numThreadAfter = Thread.activeCount
// Hard to test a correct thread number,
// but it shouldn't increase more than a reasonable number.
assert(numThreadAfter - numThreadBefore < 20)
after waiting long enough, can we expect this to be 0?
It reduces to a few (about 3) after waiting long enough. The number returned by Thread.activeCount is only an estimate, so we may not expect this to be 0.
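As an illustration of why such a test can only assert a loose bound rather than an exact count (a standalone sketch, not the PR's test; it assumes a plain JVM and uses `java.util.concurrent.ForkJoinPool` directly):

```scala
import java.util.concurrent.ForkJoinPool

// Thread.activeCount returns an estimate of live threads in the current
// thread group, so a test can bound the growth but not pin an exact number.
def threadGrowthAroundPoolUse(): Int = {
  val before = Thread.activeCount
  val pool = new ForkJoinPool(8)
  // Run a task so at least one worker thread is actually created.
  pool.submit(new Runnable { override def run(): Unit = () }).get()
  pool.shutdown()
  Thread.activeCount - before
}
```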
this looks hacky, can we think of a better way to test it? If not, I suggest removing this test, as the fix is straightforward and we can verify it manually with profiling tools.
OK. Let's remove the test.
Test build #77279 has finished for PR 18073 at commit
@@ -479,7 +479,8 @@ object ParquetFileFormat extends Logging {
partFiles: Seq[FileStatus],
ignoreCorruptFiles: Boolean): Seq[Footer] = {
val parFiles = partFiles.par
parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
val pool = new ForkJoinPool(8)
would it be better to share one global thread pool? Creating a lot of thread pools may not increase concurrency
The main concern is that if we share a thread pool for parquet reading, we may limit the concurrency, as @srowen pointed out in the JIRA.
If we have multiple parquet reads in parallel, they would share one pool; currently each owns its own pool.
Not sure if using a shared one will change current behavior.
ok let's keep the previous behavior
@viirya can you test it manually with JVisualVM or other tools and attach the screenshot in this PR? thanks!
Test build #77296 has finished for PR 18073 at commit
@cloud-fan My dev environment is not convenient for running GUI-based tools like jconsole. I used a command-line tool, jvmtop. Screenshots (the column "#T" is the number of threads):
## What changes were proposed in this pull request?

From JIRA: On each call to spark.read.parquet, a new ForkJoinPool is created. One of the threads in the pool is kept in the WAITING state, and never stopped, which leads to unbounded growth in the number of threads. We should shut down the pool after reading parquet files.

## How was this patch tested?

Added a test to ParquetFileFormatSuite.

Author: Liang-Chi Hsieh <[email protected]>

Closes #18073 from viirya/SPARK-20848.

(cherry picked from commit f72ad30)
Signed-off-by: Wenchen Fan <[email protected]>
thanks, merging to master/2.2/2.1!
@@ -495,6 +496,8 @@ object ParquetFileFormat extends Logging {
} else {
throw new IOException(s"Could not read footer for file: $currentFile", e)
}
} finally {
pool.shutdown()
Why do we terminate `pool` inside `flatMap`?
Why not do it outside? For example:
val parFiles = partFiles.par
val pool = new ForkJoinPool(8)
parFiles.tasksupport = new ForkJoinTaskSupport(pool)
try {
parFiles.flatMap { currentFile =>
...
}.seq
} finally {
pool.shutdown()
}
I would expect this to fail some test, but it didn't...
When you fix this error, could you call `ThreadUtils.newForkJoinPool` instead to set a better thread name?
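For reference, a helper along the lines of `ThreadUtils.newForkJoinPool` can be sketched as below (a hypothetical re-implementation using a custom worker-thread factory; Spark's actual helper may differ in its details):

```scala
import java.util.concurrent.{ForkJoinPool, ForkJoinWorkerThread}
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory

// Give worker threads a recognizable name prefix so they are easy to spot
// in jstack, JVisualVM, or jvmtop output. The naming scheme is illustrative.
def namedForkJoinPool(prefix: String, maxThreads: Int): ForkJoinPool = {
  val factory = new ForkJoinWorkerThreadFactory {
    override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
      val t = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool)
      t.setName(prefix + "-" + t.getPoolIndex)
      t
    }
  }
  new ForkJoinPool(maxThreads, factory, null, false)
}
```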
Why not do it outside? For example?

NVM, I was wrong. Just realized `toSeq` is lazy. But shutting down in `flatMap` is also not correct.
I was shutting it down outside at the beginning of this PR. I changed to the current way after @srowen's suggestion.
Initially I thought it could be wrong, but it seems fine: I think the tasks are all invoked at the beginning and no more tasks are submitted later, so shutting down inside is ok.
I can submit a follow-up if you still think we need to change it. Thank you.
I didn't check the details, but I guess the implementation submits tasks one by one. Then it's possible that when the first task is shutting down the pool, some tasks have not yet been submitted.
Ok. We should take a safer approach. Let me submit a follow-up for this. Thanks @zsxwing.
… files

## What changes were proposed in this pull request?

This is a follow-up to #18073, taking a safer approach to shutting down the pool to prevent a possible issue. Also using `ThreadUtils.newForkJoinPool` instead to set a better thread name.

## How was this patch tested?

Manually tested.

Author: Liang-Chi Hsieh <[email protected]>

Closes #18100 from viirya/SPARK-20848-followup.

(cherry picked from commit 6b68d61)
Signed-off-by: Wenchen Fan <[email protected]>