
[SPARK-20848][SQL] Shutdown the pool after reading parquet files #18073

Closed · wants to merge 3 commits
@@ -479,7 +479,8 @@ object ParquetFileFormat extends Logging {
partFiles: Seq[FileStatus],
ignoreCorruptFiles: Boolean): Seq[Footer] = {
val parFiles = partFiles.par
parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
val pool = new ForkJoinPool(8)
Contributor:
Would it be better to share one global thread pool? Creating a lot of thread pools may not increase concurrency.

Member Author (@viirya, May 24, 2017):

The main concern is that if we share a thread pool for parquet reading, we may limit concurrency, as @srowen pointed out in the JIRA.

If we have multiple parquet reads running in parallel, they would share one pool. Currently each owns its own pool.
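For illustration, the "one global thread pool" alternative raised above would look roughly like the sketch below: a single lazily created pool shared by every footer read, rather than a fresh `ForkJoinPool(8)` per `readFooters` call. The object and field names here are hypothetical, not code from this PR.

```scala
import java.util.concurrent.ForkJoinPool

// Hypothetical shape of the shared-pool alternative: all parquet footer
// reads would compete for these 8 threads, instead of each call owning its
// own pool (the behavior this PR keeps).
object FooterReadPool {
  lazy val shared: ForkJoinPool = new ForkJoinPool(8)
}
```

A shared pool avoids repeated pool creation and shutdown, at the cost of capping total parallelism across concurrent reads, which is exactly the trade-off discussed above.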

Member Author:

Not sure if using a shared pool would change the current behavior.

Contributor:

OK, let's keep the previous behavior.

parFiles.tasksupport = new ForkJoinTaskSupport(pool)
parFiles.flatMap { currentFile =>
try {
// Skips row group information since we only need the schema.
@@ -495,6 +496,8 @@
} else {
throw new IOException(s"Could not read footer for file: $currentFile", e)
}
} finally {
pool.shutdown()
Member:
Why do we terminate the pool inside flatMap?

Member:

Why not do it outside? For example:

    val parFiles = partFiles.par
    val pool = new ForkJoinPool(8)
    parFiles.tasksupport = new ForkJoinTaskSupport(pool)
    try {
      parFiles.flatMap { currentFile =>
        ...
      }.seq
    } finally {
      pool.shutdown()
    }

Member:

I would expect this to fail some test, but it didn't...

When you fix this error, could you call ThreadUtils.newForkJoinPool instead, to set a better thread name?

Member (@zsxwing, May 24, 2017):

> Why not doing it outside? For example?

Just realized the toSeq is lazy. But shutting down in flatMap is also not correct. NVM, I was wrong.

Member Author:

@zsxwing @gatorsmile

I was shutting it down outside at the beginning of this PR. I changed to the current way after @srowen's suggestion.

I initially thought it could be wrong, but it seems fine: I think the tasks are all invoked at the beginning and no more tasks are submitted later, so shutting down inside is OK.

I can submit a follow-up if you still think we need to change it. Thank you.

Member (@zsxwing, May 24, 2017):

I didn't check the details, but I guess the implementation submits tasks one by one. Then it's possible that when the first task is shutting down the pool, some tasks have not yet been submitted.
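The hazard described here can be shown with a minimal standalone sketch: once any task calls `shutdown()`, later external submissions to the same `ForkJoinPool` are rejected. This demo uses `java.util.concurrent` directly and is not code from the PR; the object and method names are illustrative.

```scala
import java.util.concurrent.{ForkJoinPool, RejectedExecutionException}

// Minimal sketch of the race: if a task shuts the pool down while the
// caller is still submitting later tasks, those submissions fail with
// RejectedExecutionException.
object ShutdownRaceDemo {
  def attemptAfterShutdown(): String = {
    val pool = new ForkJoinPool(8)
    // A submission before shutdown runs normally.
    pool.submit(new Runnable { def run(): Unit = () }).get()
    // Simulate an early shutdown triggered from inside the traversal.
    pool.shutdown()
    try {
      // A "later" submission, as the parallel collection might attempt.
      pool.submit(new Runnable { def run(): Unit = () })
      "submitted"
    } catch {
      case _: RejectedExecutionException => "rejected"
    }
  }

  def main(args: Array[String]): Unit =
    println(attemptAfterShutdown())
}
```

Whether the scenario actually occurs depends on how the parallel collection hands work to the pool, which is exactly the uncertainty raised in this comment.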

Member Author:

OK, we should take a safer approach. Let me submit a follow-up for this. Thanks @zsxwing.

}
}.seq
}
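The safer shape agreed on above (shut the pool down once, outside flatMap, after the parallel traversal completes) can be sketched as follows. This is a simplified standalone illustration on toy data, assuming Scala 2.12's built-in parallel collections, where `ForkJoinTaskSupport` accepts a `java.util.concurrent.ForkJoinPool`; in the real Spark code the pool would instead come from `ThreadUtils.newForkJoinPool`, as suggested in the review, to get named threads.

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

// Sketch of the follow-up pattern: create the pool, run the whole parallel
// traversal, then shut the pool down exactly once in a finally block.
object ReadFootersSketch {
  def squaresInParallel(xs: Seq[Int]): Seq[Int] = {
    val par = xs.par
    val pool = new ForkJoinPool(8)
    par.tasksupport = new ForkJoinTaskSupport(pool)
    try {
      // .seq forces the parallel result before the finally block runs.
      par.map(x => x * x).seq
    } finally {
      // Single shutdown, after every task has been submitted and completed.
      pool.shutdown()
    }
  }
}
```

Because the shutdown happens after the traversal returns, no task can observe a half-shut pool, which avoids the submission race discussed above.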