[SPARK-13747][SQL] Fix concurrent query with fork-join pool #11586

Closed
wants to merge 3 commits into from

andrewor14
Contributor

What changes were proposed in this pull request?

Fix this use case, which was already fixed in SPARK-10548 in 1.6 but was broken in master due to #9264:

(1 to 100).par.foreach { _ => sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count() }

This threw IllegalArgumentException consistently before this patch. For more detail, see the JIRA.

How was this patch tested?

New test in SQLExecutionSuite.

@andrewor14
Contributor Author

@zsxwing

@zsxwing
Member

zsxwing commented Mar 8, 2016

LGTM

@SparkQA

SparkQA commented Mar 8, 2016

Test build #52687 has finished for PR 11586 at commit 661c28e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

try {
  // Should not throw IllegalArgumentException
  (1 to 100).par.foreach { _ =>
    sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count()
Member

This is missing the import of sqlContext.implicits._, which toDF needs.

@SparkQA

SparkQA commented Mar 10, 2016

Test build #52772 has finished for PR 11586 at commit 9273874.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented Mar 10, 2016

Merging to master

@asfgit asfgit closed this in 37fcda3 Mar 10, 2016
@andrewor14 andrewor14 deleted the fix-concurrent-sql branch March 10, 2016 01:49
@@ -613,7 +613,12 @@ class DAGScheduler(
       properties: Properties): Unit = {
     val start = System.nanoTime
     val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
-    Await.ready(waiter.completionFuture, atMost = Duration.Inf)
+    // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
Contributor

BlockManager#doPut() does this on line 891:

Await.ready(replicationFuture, Duration.Inf)

Should that be replaced as well?

Member

Not necessary. The issue here is that ThreadLocal properties don't work well with ForkJoinPool: a new task may see ThreadLocal properties left behind by another task, because both run on the same thread.
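
To make the thread-reuse point concrete, here is a minimal sketch of one task's ThreadLocal value being seen by the next task on the same pooled thread. It deliberately uses a single-thread executor instead of a ForkJoinPool so that the reuse is deterministic. In the fork-join case the reuse is assumed to happen when a blocked worker is handed another task. The names `ThreadLocalLeak`, `executionId`, and `secondTaskSees` are hypothetical and are not taken from Spark.

```scala
import java.util.concurrent.{Callable, Executors}

object ThreadLocalLeak {
  // Hypothetical stand-in for a per-thread property such as an execution id.
  private val executionId = new ThreadLocal[String]

  /** Runs two tasks back to back on one pooled thread and returns what the second task sees. */
  def secondTaskSees(cleanUp: Boolean): Option[String] = {
    // One worker thread, so both tasks are guaranteed to share it.
    val pool = Executors.newSingleThreadExecutor()
    try {
      // First task: sets the thread-local and optionally clears it before finishing.
      pool.submit(new Runnable {
        override def run(): Unit = {
          executionId.set("query-1")
          if (cleanUp) executionId.remove()
        }
      }).get()

      // Second task: never sets the value itself, it only reads what is on the thread.
      pool.submit(new Callable[Option[String]] {
        override def call(): Option[String] = Option(executionId.get())
      }).get()
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"without cleanup: ${secondTaskSees(cleanUp = false)}")
    println(s"with cleanup:    ${secondTaskSees(cleanUp = true)}")
  }
}
```

Without cleanup the second task observes the first task's value, and with cleanup it observes nothing. Code that does not rely on thread-locals at all is unaffected by this kind of reuse.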

Does BlockManager use ThreadLocal?

Contributor

Currently no.
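
For readers following the diff comment about `Await.ready` calling `scala.concurrent.blocking`, the sketch below shows one way to observe that difference with only the Scala standard library. It installs a counting `BlockContext` on the current thread and then waits on a never-completed future, first through `Await.ready` and then by calling `ready` on the future directly. The diff excerpt above is cut off, so passing a null `CanAwait` is an assumption about one possible way to bypass `blocking`, not necessarily what the patch does. `AwaitWithoutBlocking`, `CountingBlockContext`, and `blockingCalls` are hypothetical names.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, BlockContext, CanAwait, Future, Promise}
import scala.concurrent.duration._

object AwaitWithoutBlocking {
  /** Hypothetical helper: counts entries into `scala.concurrent.blocking` on this thread. */
  final class CountingBlockContext extends BlockContext {
    var calls = 0
    override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
      calls += 1
      thunk
    }
  }

  /** Waits briefly on a never-completed future and reports how often `blocking` was entered. */
  def blockingCalls(useAwait: Boolean): Int = {
    val ctx = new CountingBlockContext
    val never: Future[Int] = Promise[Int]().future // never completed, so the wait times out
    BlockContext.withBlockContext(ctx) {
      try {
        if (useAwait) {
          // Goes through `blocking`, which delegates to the installed BlockContext.
          Await.ready(never, 10.millis)
        } else {
          // Assumption: the permit is only a compile-time marker, so null is passed here.
          val permit = null.asInstanceOf[CanAwait]
          never.ready(10.millis)(permit)
        }
        ()
      } catch {
        case _: TimeoutException => () // expected: the future never completes
      }
    }
    ctx.calls
  }

  def main(args: Array[String]): Unit = {
    println(s"Await.ready entered blocking ${blockingCalls(useAwait = true)} time(s)")
    println(s"future.ready entered blocking ${blockingCalls(useAwait = false)} time(s)")
  }
}
```

The direct call skips the `BlockContext` entirely, so a fork-join pool is never told that the thread is about to block. That also means the pool cannot compensate for the blocked thread, a trade-off to weigh before using this pattern elsewhere.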

roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016
Author: Andrew Or

Closes apache#11586 from andrewor14/fix-concurrent-sql.