-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-10548] [SPARK-10563] [SQL] Fix concurrent SQL executions #8710
Changes from 3 commits
8ceae42
3ec715c
d48c114
bbda199
5297f79
3c00cc6
801fbe0
35bb6f0
984a92f
fce3819
75a8d90
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,7 +29,7 @@ import java.util.UUID.randomUUID | |
import scala.collection.JavaConverters._ | ||
import scala.collection.{Map, Set} | ||
import scala.collection.generic.Growable | ||
import scala.collection.mutable.HashMap | ||
import scala.collection.mutable.{HashMap, HashSet} | ||
import scala.reflect.{ClassTag, classTag} | ||
import scala.util.control.NonFatal | ||
|
||
|
@@ -348,10 +348,27 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli | |
|
||
// Thread Local variable that can be used by users to pass information down the stack | ||
private val localProperties = new InheritableThreadLocal[Properties] { | ||
override protected def childValue(parent: Properties): Properties = new Properties(parent) | ||
override protected def childValue(parent: Properties): Properties = { | ||
if (nonInheritedLocalProperties.nonEmpty) { | ||
// If there are properties that should not be inherited, filter them out | ||
val p = new Properties | ||
val filtered = parent.asScala.filter { case (k, _) => | ||
!nonInheritedLocalProperties.contains(k) | ||
} | ||
p.putAll(filtered.asJava) | ||
p | ||
} else { | ||
new Properties(parent) | ||
} | ||
} | ||
override protected def initialValue(): Properties = new Properties() | ||
} | ||
|
||
/** | ||
* Keys of local properties that should not be inherited by children threads. | ||
*/ | ||
private[spark] val nonInheritedLocalProperties: HashSet[String] = new HashSet[String] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exposing a mutable There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The whole point of this is to avoid inheriting the SQL execution ID, which fixes SPARK-10548. How can we fix this issue with just cloning? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I made this private in the latest commit and added a setter method for it. Does this address your concern? |
||
|
||
/* ------------------------------------------------------------------------------------- * | ||
| Initialization. This code initializes the context in a manner that is exception-safe. | | ||
| All internal fields holding state are initialized here, and any error prompts the | | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.execution | ||
|
||
import org.apache.spark.sql.test.SharedSQLContext | ||
|
||
class SQLExecutionSuite extends SharedSQLContext { | ||
import testImplicits._ | ||
|
||
test("query execution IDs are not inherited across threads") { | ||
sparkContext.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, "123") | ||
sparkContext.setLocalProperty("do-inherit-me", "some-value") | ||
var throwable: Option[Throwable] = None | ||
val thread = new Thread { | ||
override def run(): Unit = { | ||
try { | ||
assert(sparkContext.getLocalProperty("do-inherit-me") === "some-value") | ||
assert(sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) === null) | ||
} catch { | ||
case t: Throwable => | ||
throwable = Some(t) | ||
} | ||
} | ||
} | ||
thread.start() | ||
thread.join() | ||
throwable.foreach { t => throw t } | ||
} | ||
|
||
// This is the end-to-end version of the previous test. | ||
test("parallel query execution (SPARK-10548)") { | ||
(1 to 5).foreach { i => | ||
// Scala's parallel collections spawns new threads as children of the existing threads. | ||
// We need to run this multiple times to ensure new threads are spawned. Without the fix | ||
// for SPARK-10548, this usually fails on the second try. | ||
val df = sparkContext.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b") | ||
(1 to 10).par.foreach { _ => df.count() } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought DataFrame is not thread-safe and should not be used like this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What makes you think that? SparkContext/SQLContext/DataFrames should be threadsafe. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, I think we have fixed that, and if not I would also consider that a bug :) |
||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@andrewor14 I'm thinking maybe we should not use
new Properties(parent)
here. Instead, always copy the parent's Properties to the child's Properties. Do you think if the child thread needs to see the further changes to the parent thread's Properties after creating?This is really confusing when using
Executor
likeForkJoinPool
(scala.concurrent.ExecutionContext.Implicits.global), in which thread A creates thread B but thread B is not a child of thread A. But thread B still can see the changes in thread A./cc @jerryshao since you added this line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @zsxwing , reasons to use
InheritableThreadLocal
can be seen here (mesos/spark#937). mainly it is used for Spark Streaming with FIFO scheduling strategy.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. However,
new Properties(parent)
keeps a reference to parent rather than copying them. So If we make any change to the parent thread's properties after creating the child thread, the child thread will see them.Is it necessary that the child thread keeps track of the further updates of the parent thread's properties? I think copying them would be more reasonable.
I didn't mean removing this line. I mean changing it to
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I agree with you that copying is more reasonable, for now I cannot image any scenario which requires to keep track of parent's properties, I think it is OK for me to change it, we can always fix this if there's any special scenario.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jerryshao Thanks :)
@andrewor14 how about just copying the parent properties rather than adding
nonInheritedLocalProperties
? It looks simpler.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, I was actually going to do it in a separate patch. Incidentally @tdas @JoshRosen and I just talked about this last night and we all agreed to make it do a clone instead so the semantics are simpler.
However, my one concern is that doing so will change semantics for non-SQL users in 1.5.1, so my proposal is the following: I will make the changes in this patch and merge this patch ONLY into master. Then I'll create a new patch for branch 1.5 that will have the current changes (the ones where we don't clone except for SQL). I think that's the safest way forward.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I have updated this in the latest commit, and filed SPARK-10563 for this issue.