[SPARK-10548] [SPARK-10563] [SQL] Fix concurrent SQL executions #8710
such as, cough cough, the SQL execution ID. This was a problem because Scala's parallel collections spawn threads as children of the existing threads, causing the execution ID to be inherited when it shouldn't be.
Because java.util.Properties' remove method takes in an Any instead of a String, there were some issues with matching the key's hashCode, so removing was not successful in unit tests. Instead, this commit fixes it by manually filtering out the keys and adding them to the child thread's properties.
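One java.util.Properties behaviour that can make a removal appear to fail is sketched below. Whether this is what happened in the unit tests is an assumption; the thread does not confirm it. When a child is built with new Properties(parent), the parent is only the child's defaults table, so remove on the child cannot hide a key that lives in the parent. The object and method names are hypothetical.

```scala
import java.util.Properties

object RemoveInheritedKeyDemo {
  /** Returns what a defaults-backed child still reports for `key` after remove is called on it. */
  def valueAfterRemove(key: String, value: String): String = {
    val parent = new Properties()
    parent.setProperty(key, value)
    val child = new Properties(parent) // parent becomes the child's defaults table
    child.remove(key)                  // removes only from the child's own (empty) table
    child.getProperty(key)             // lookup falls through to the parent's entry
  }
}
```

Copying only the wanted keys into a fresh Properties object, as the commit does, sidesteps this because nothing is left to fall through to.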
Test build #42307 has finished for PR 8710 at commit
// We need to run this multiple times to ensure new threads are spawned. Without the fix
// for SPARK-10548, this usually fails on the second try.
val df = sparkContext.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b")
(1 to 10).par.foreach { _ => df.count() }
I thought DataFrame is not thread-safe and should not be used like this.
What makes you think that? SparkContext/SQLContext/DataFrames should be threadsafe.
Ah, I think we have fixed that, and if not I would also consider that a bug :)
Ah, I see. So the issue is:
@@ -348,10 +348,27 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
  // Thread Local variable that can be used by users to pass information down the stack
  private val localProperties = new InheritableThreadLocal[Properties] {
    override protected def childValue(parent: Properties): Properties = new Properties(parent)
@andrewor14 I'm thinking maybe we should not use new Properties(parent) here. Instead, always copy the parent's Properties to the child's Properties. Do you think the child thread needs to see further changes to the parent thread's Properties after it is created?
This is really confusing when using an Executor like ForkJoinPool (scala.concurrent.ExecutionContext.Implicits.global), in which thread A creates thread B but thread B is not a child of thread A. But thread B can still see the changes in thread A.
/cc @jerryshao since you added this line.
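The confusion with long-lived pool threads can be reproduced in a small, Spark-free sketch. It is a simplified analogue rather than the ForkJoinPool case itself: a single-thread java.util.concurrent pool stands in for the executor, and localProperties here is a hypothetical stand-in for the field in SparkContext, using the same linking childValue as the diff.

```scala
import java.util.Properties
import java.util.concurrent.{Callable, Executors}

object PoolInheritanceDemo {
  // Hypothetical stand-in for SparkContext.localProperties (linking variant).
  private val localProperties = new InheritableThreadLocal[Properties] {
    override protected def childValue(parent: Properties): Properties = new Properties(parent)
    override protected def initialValue(): Properties = new Properties()
  }

  /** Value of `key` as seen by a pool worker that was created before `key` was set. */
  def workerView(key: String, value: String): String = {
    val pool = Executors.newFixedThreadPool(1)
    try {
      localProperties.get() // make sure the submitting thread has properties to inherit
      // The first submission creates the single worker thread; childValue runs now.
      pool.submit(new Callable[String] { def call(): String = "warm-up" }).get()
      // This change happens after the worker already exists.
      localProperties.get().setProperty(key, value)
      pool.submit(new Callable[String] {
        def call(): String = localProperties.get().getProperty(key)
      }).get()
    } finally {
      localProperties.get().remove(key)
      pool.shutdown()
    }
  }
}
```

With the linking childValue the worker reports the value that was set after it was created, because its Properties object reads through to the submitting thread's live object.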
Hi @zsxwing, the reasons for using InheritableThreadLocal can be seen here (mesos/spark#937). Mainly it is used for Spark Streaming with the FIFO scheduling strategy.
I see. However, new Properties(parent) keeps a reference to parent rather than copying it. So if we make any change to the parent thread's properties after creating the child thread, the child thread will see it.
Is it necessary for the child thread to keep track of further updates to the parent thread's properties? I think copying them would be more reasonable.
I didn't mean removing this line. I mean changing it to
val child = new Properties()
child.putAll(parent)
child
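The difference between the two constructions can be checked without any threads. The sketch below is illustrative only; the object and method names are hypothetical. It copies key by key through stringPropertyNames instead of putAll, an assumption made purely so the snippet compiles across Scala and JDK combinations. Note that this also flattens any defaults the parent itself has.

```scala
import java.util.Properties

object PropertiesCopyDemo {
  /** Child backed by `parent` as its defaults table: lookups fall through to the live parent. */
  def linkedChild(parent: Properties): Properties = new Properties(parent)

  /** Child holding a snapshot of the parent's string properties at call time. */
  def copiedChild(parent: Properties): Properties = {
    val child = new Properties()
    val keys = parent.stringPropertyNames().iterator()
    while (keys.hasNext) {
      val key = keys.next()
      child.setProperty(key, parent.getProperty(key))
    }
    child
  }

  def main(args: Array[String]): Unit = {
    val parent = new Properties()
    parent.setProperty("job.group", "etl")
    val linked = linkedChild(parent)
    val copied = copiedChild(parent)
    parent.setProperty("execution.id", "7") // changed after both children exist
    println(linked.getProperty("execution.id")) // the linked child sees the later change
    println(copied.getProperty("execution.id")) // the copied child does not
  }
}
```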
Yes, I agree with you that copying is more reasonable. For now I cannot imagine any scenario that requires keeping track of the parent's properties, so it is OK with me to change it. We can always fix this if a special scenario comes up.
@jerryshao Thanks :)
@andrewor14 how about just copying the parent properties rather than adding nonInheritedLocalProperties? It looks simpler.
I agree, I was actually going to do it in a separate patch. Incidentally @tdas @JoshRosen and I just talked about this last night and we all agreed to make it do a clone instead so the semantics are simpler.
However, my one concern is that doing so will change semantics for non-SQL users in 1.5.1, so my proposal is the following: I will make the changes in this patch and merge this patch ONLY into master. Then I'll create a new patch for branch 1.5 that will have the current changes (the ones where we don't clone except for SQL). I think that's the safest way forward.
OK, I have updated this in the latest commit, and filed SPARK-10563 for this issue.
Test build #42308 has finished for PR 8710 at commit
correct!
... to make the behavior more consistent in SQL vs non-SQL cases.
As of the latest commit this patch should only be merged into master. I consider the fix for SPARK-10563 a little too risky for 1.5.1, so I will open a separate patch for branch-1.5 without that fix.
retest this please
Test build #42342 has finished for PR 8710 at commit
Test build #42357 has finished for PR 8710 at commit
…executions Conflicts: core/src/test/scala/org/apache/spark/ThreadingSuite.scala
Test build #42361 has finished for PR 8710 at commit
/**
 * Keys of local properties that should not be inherited by children threads.
 */
private[spark] val nonInheritedLocalProperties: HashSet[String] = new HashSet[String]
Exposing a mutable HashSet in the thread-safe SparkContext looks dangerous. Actually, I suggest not adding nonInheritedLocalProperties in the master branch. How about just cloning the parent properties without adding the nonInheritedLocalProperties logic? I understand that we still need nonInheritedLocalProperties for the 1.5 branch to avoid changing the semantics.
The whole point of this is to avoid inheriting the SQL execution ID, which fixes SPARK-10548. How can we fix this issue with just cloning?
I made this private in the latest commit and added a setter method for it. Does this address your concern?
@zsxwing I just noticed a potential source of confusion. If I understand correctly your view is that we should just clone the properties instead of having the
I believe that this is not the cause of If there is no execution id in the local properties when creating a child thread and we change it to clone the properties, then the child thread won't see the execution id that is set by the parent thread.
Test build #42379 has finished for PR 8710 at commit
The fix for SPARK-10548 can be simplified by just cloning the parent properties on inherit rather than excluding specific properties from ever being inherited. This is safe because the child thread must be created BEFORE the parent thread runs a query.
Ah, I see. You're saying the child thread must be created before the query is run, not while it's running. That makes sense. Previously I accounted for the case where the child thread is created in the middle of the query, which I suppose is not possible. I have updated the code based on your suggestion.
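The timing argument can be sketched with plain threads. This is a minimal simulation, not Spark code: the thread-local is a hypothetical stand-in for SparkContext's local properties, setting and removing the key imitates a query starting and finishing, and the only name taken from this thread is the property key from the exception message.

```scala
import java.util.Properties
import java.util.concurrent.atomic.AtomicBoolean

object ExecutionIdTimingDemo {
  // Key name taken from the exception message quoted in this thread.
  val ExecutionIdKey = "spark.sql.execution.id"

  /** Linking strategy: the child reads through to the live parent object. */
  val link: Properties => Properties = parent => new Properties(parent)

  /** Cloning strategy: the child gets a snapshot taken when the thread is constructed. */
  val snapshot: Properties => Properties = parent => {
    val child = new Properties()
    val keys = parent.stringPropertyNames().iterator()
    while (keys.hasNext) {
      val key = keys.next()
      child.setProperty(key, parent.getProperty(key))
    }
    child
  }

  /** True if a child created BEFORE the parent's "query" sees the parent's execution ID. */
  def childSeesParentId(inherit: Properties => Properties): Boolean = {
    // Hypothetical stand-in for SparkContext's local properties.
    val local = new InheritableThreadLocal[Properties] {
      override protected def childValue(parent: Properties): Properties = inherit(parent)
      override protected def initialValue(): Properties = new Properties()
    }
    local.get() // materialise the parent's properties so there is something to inherit
    val seen = new AtomicBoolean(false)
    // childValue runs here, in the Thread constructor, before any ID is set.
    val child = new Thread(new Runnable {
      def run(): Unit = seen.set(local.get().getProperty(ExecutionIdKey) != null)
    })
    local.get().setProperty(ExecutionIdKey, "0") // parent "starts a query"
    child.start()
    child.join()
    local.get().remove(ExecutionIdKey)           // parent "finishes the query"
    seen.get()
  }
}
```

Calling childSeesParentId(link) models the old behaviour, where the child would find an ID already set; childSeesParentId(snapshot) models cloning on inherit, where the child starts with no ID.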
b4bcc3c to fce3819
Test build #42441 has finished for PR 8710 at commit
Test build #42447 has finished for PR 8710 at commit
Test build #42452 has finished for PR 8710 at commit
Test build #1753 has finished for PR 8710 at commit
Test build #1751 has finished for PR 8710 at commit
Test build #1752 has finished for PR 8710 at commit
LGTM
Thanks, I'm merging this into master.
…nch-1.5
*Note: this is for branch-1.5 only*
This is the same as #8710 but affects only SQL. The more general fix for SPARK-10563 is considered risky to backport into a maintenance release, so it is disabled by default and enabled only in SQL.
Author: Andrew Or <[email protected]>
Closes #8721 from andrewor14/concurrent-sql-executions-1.5 and squashes the following commits:
3b9b462 [Andrew Or] Merge branch 'branch-1.5' of github.com:apache/spark into concurrent-sql-executions-1.5
4435db7 [Andrew Or] Clone properties only for SQL for backward compatibility
0b7e5ab [Andrew Or] Clone parent local properties on inherit
We are still experiencing this. See SPARK-10548. I've verified that we are indeed using a version of Spark that includes the SPARK-10548 fix, yet the issue is still reproducible. In fact, if in the test case, you:
you can anticipate when a thread will throw the exception.
This still seems to be around. java.lang.IllegalArgumentException: "spark.sql.execution.id is already set" Trace:
@d-ee do you have a reproducer? Let's move the discussion to JIRA instead of here.
We're seeing this exception too. We're also running our operations in serial (at least on the surface it seems as if we are). If we execute a
This specifically happens when we load Avro files from S3 and save them as Parquet back to S3. The loading works fine but the saving fails on 2nd attempt. Furthermore, if we simply generate a
We're using Java 1.8, Scala 2.10.5, with our Spark codebase at commit 15de51c.
Our exact reproduction steps are:
1. Run a Spark Shell with appropriate dependencies
2. Run the following setup code within the shell
3. Run this twice - but leaving time for the first execution to finish (so the operations are serialised)
Result:
@ljwagerfield it should be fixed in #11586
Yes, unfortunately that is only available in the upcoming 2.0 so you will have to upgrade to fix the problem.
…nch-1.5
*Note: this is for branch-1.5 only*
This is the same as apache#8710 but affects only SQL. The more general fix for SPARK-10563 is considered risky to backport into a maintenance release, so it is disabled by default and enabled only in SQL.
Author: Andrew Or <[email protected]>
Closes apache#8721 from andrewor14/concurrent-sql-executions-1.5 and squashes the following commits:
3b9b462 [Andrew Or] Merge branch 'branch-1.5' of github.com:apache/spark into concurrent-sql-executions-1.5
4435db7 [Andrew Or] Clone properties only for SQL for backward compatibility
0b7e5ab [Andrew Or] Clone parent local properties on inherit
(cherry picked from commit 997be78)
Note: this is for master branch only. The fix for branch-1.5 is at #8721.
The query execution ID is currently passed from a thread to its children, which is not the intended behavior. This led to IllegalArgumentException: spark.sql.execution.id is already set when running queries in parallel, e.g.:
The cause is that SparkContext's local properties are inherited by default. This patch adds a way to exclude keys we don't want to be inherited, and makes SQL go through that code path.
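The exclusion approach described here can be sketched as a filtering copy. This is an illustration under assumptions, not the code from the patch: inheritExcept and its excluded parameter are hypothetical names, and a function like this would be called from the childValue override of the thread-local.

```scala
import java.util.Properties

object FilteredInheritDemo {
  /** Copies every string property from `parent` except the keys listed in `excluded`. */
  def inheritExcept(parent: Properties, excluded: Set[String]): Properties = {
    val child = new Properties()
    val keys = parent.stringPropertyNames().iterator()
    while (keys.hasNext) {
      val key = keys.next()
      if (!excluded.contains(key)) {
        child.setProperty(key, parent.getProperty(key))
      }
    }
    child
  }
}
```

For example, FilteredInheritDemo.inheritExcept(parentProps, Set("spark.sql.execution.id")) would hand a child thread everything except the execution ID.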