
[SPARK-10548] [SPARK-10563] [SQL] Fix concurrent SQL executions #8710

Closed · wants to merge 11 commits
18 changes: 16 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -29,7 +29,7 @@ import java.util.UUID.randomUUID
import scala.collection.JavaConverters._
import scala.collection.{Map, Set}
import scala.collection.generic.Growable
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{HashMap, HashSet}
import scala.reflect.{ClassTag, classTag}
import scala.util.control.NonFatal

@@ -348,10 +348,24 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

// Thread Local variable that can be used by users to pass information down the stack
private val localProperties = new InheritableThreadLocal[Properties] {
-override protected def childValue(parent: Properties): Properties = new Properties(parent)
+override protected def childValue(parent: Properties): Properties = {
Member:
@andrewor14 I'm thinking maybe we should not use new Properties(parent) here. Instead, we should always copy the parent's Properties into the child's Properties. Does the child thread really need to see further changes made to the parent thread's Properties after it is created?

This is really confusing when using an Executor such as ForkJoinPool (scala.concurrent.ExecutionContext.Implicits.global), in which thread A creates thread B even though thread B is not logically a child of thread A. Yet thread B can still see the changes made in thread A.

/cc @jerryshao since you added this line.
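A standalone sketch (outside Spark) of the behavior described above. The key name used here is illustrative; the point is that java.util.Properties(parent) installs the parent as a live defaults table rather than copying its entries:

```scala
import java.util.Properties

object PropertiesDefaultsDemo {
  def main(args: Array[String]): Unit = {
    val parent = new Properties()

    // `new Properties(parent)` does NOT copy: `parent` becomes the defaults
    // table, consulted on every lookup that misses the child's own entries.
    val viewChild = new Properties(parent)
    parent.setProperty("some.key", "set-after-child-created")
    // The child sees the parent's later mutation through the defaults chain.
    println(viewChild.getProperty("some.key")) // prints "set-after-child-created"

    // Copying instead snapshots the parent's entries at creation time.
    val copiedChild = new Properties()
    copiedChild.putAll(parent)
    parent.setProperty("some.key", "changed-again")
    // The copy is unaffected by mutations made after the snapshot.
    println(copiedChild.getProperty("some.key")) // prints "set-after-child-created"
  }
}
```

This is exactly the distinction the thread below settles on: a copy freezes the parent's properties at child-creation time, while the defaults-table constructor leaks later parent mutations into the child.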

Contributor:

Hi @zsxwing, the reasons for using InheritableThreadLocal can be seen in mesos/spark#937; mainly it is used for Spark Streaming with the FIFO scheduling strategy.

Member:

I see. However, new Properties(parent) keeps a reference to parent rather than copying it. So if we make any change to the parent thread's properties after creating the child thread, the child thread will see them.

Is it necessary for the child thread to keep tracking further updates to the parent thread's properties? I think copying them would be more reasonable.

I didn't mean removing this line; I meant changing it to:

    val child = new Properties()
    child.putAll(parent)
    child

Contributor:

Yes, I agree with you that copying is more reasonable. For now I cannot imagine any scenario that requires keeping track of the parent's properties, so I think it is OK to change it; we can always revisit this if a special scenario turns up.

Member:

@jerryshao Thanks :)

@andrewor14 how about just copying the parent properties rather than adding nonInheritedLocalProperties? It looks simpler.

Contributor Author:

I agree, I was actually going to do it in a separate patch. Incidentally @tdas @JoshRosen and I just talked about this last night and we all agreed to make it do a clone instead so the semantics are simpler.

However, my one concern is that doing so will change semantics for non-SQL users in 1.5.1, so my proposal is the following: I will make the changes in this patch and merge this patch ONLY into master. Then I'll create a new patch for branch 1.5 that will have the current changes (the ones where we don't clone except for SQL). I think that's the safest way forward.

Contributor Author:

OK, I have updated this in the latest commit, and filed SPARK-10563 for this issue.

// Note: make a clone such that changes in the parent properties aren't reflected in
// those of the children threads, which has confusing semantics (SPARK-10564).
val p = new Properties
val filtered = parent.asScala.filter { case (k, _) =>
!nonInheritedLocalProperties.contains(k)
}
p.putAll(filtered.asJava)
p
}
override protected def initialValue(): Properties = new Properties()
}

/**
* Keys of local properties that should not be inherited by children threads.
*/
private[spark] val nonInheritedLocalProperties: HashSet[String] = new HashSet[String]
Member:

Exposing a mutable HashSet in the thread-safe SparkContext looks dangerous. Actually, I suggest not to add nonInheritedLocalProperties in the master branch. How about just cloning the parent properties without adding the nonInheritedLocalProperties logic? I understand that we still need nonInheritedLocalProperties for 1.5 branch to avoid changing the semantics.

Contributor Author:

The whole point of this is to avoid inheriting the SQL execution ID, which fixes SPARK-10548. How can we fix this issue with just cloning?

Contributor Author:

I made this private in the latest commit and added a setter method for it. Does this address your concern?
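For illustration, the shape of that change might look like the following sketch. The class, field, and method names here are hypothetical and not taken from the actual commit; it only shows the pattern of hiding the mutable set behind a narrow, synchronized accessor:

```scala
import scala.collection.mutable.HashSet

// Hypothetical sketch: the mutable set is private, and callers can only add
// keys through a setter, so the set itself is never exposed for mutation.
class NonInheritedKeys {
  private val keys = new HashSet[String]

  // Register a key whose value should not be inherited by child threads.
  def add(key: String): Unit = synchronized {
    keys += key
  }

  // Check membership, e.g. when filtering properties in childValue.
  def contains(key: String): Boolean = synchronized {
    keys.contains(key)
  }
}
```

Synchronizing both methods addresses the thread-safety concern raised above, since SparkContext is shared across threads.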


/* ------------------------------------------------------------------------------------- *
| Initialization. This code initializes the context in a manner that is exception-safe. |
| All internal fields holding state are initialized here, and any error prompts the |
24 changes: 24 additions & 0 deletions core/src/test/scala/org/apache/spark/ThreadingSuite.scala
@@ -212,6 +212,30 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
throwable.foreach { t => throw t }
}

test("inheritance exclusions (SPARK-10548)") {
sc = new SparkContext("local", "test")
sc.nonInheritedLocalProperties.add("do-not-inherit-me")
sc.setLocalProperty("do-inherit-me", "parent")
sc.setLocalProperty("do-not-inherit-me", "parent")
var throwable: Option[Throwable] = None
val threads = (1 to 5).map { i =>
new Thread() {
override def run() {
// only the ones we intend to inherit will be passed to the children
try {
assert(sc.getLocalProperty("do-inherit-me") === "parent")
assert(sc.getLocalProperty("do-not-inherit-me") === null)
} catch {
case t: Throwable => throwable = Some(t)
}
}
}
}
threads.foreach(_.start())
threads.foreach(_.join())
throwable.foreach { t => throw t }
}

test("mutations to local properties should not affect submitted jobs (SPARK-6629)") {
val jobStarted = new Semaphore(0)
val jobEnded = new Semaphore(0)
5 changes: 5 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -78,6 +78,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
sparkContext.addSparkListener(listener)
sparkContext.ui.foreach(new SQLTab(this, _))

// Ensure query execution IDs are not inherited across the thread hierarchy, which is
// the default behavior for SparkContext local properties. Otherwise, we may confuse
// the listener as to which query is being executed. (SPARK-10548)
sparkContext.nonInheritedLocalProperties.add(SQLExecution.EXECUTION_ID_KEY)

/**
* Set Spark SQL configuration properties.
*
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.sql.test.SharedSQLContext

class SQLExecutionSuite extends SharedSQLContext {
import testImplicits._

test("query execution IDs are not inherited across threads") {
sparkContext.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, "123")
sparkContext.setLocalProperty("do-inherit-me", "some-value")
var throwable: Option[Throwable] = None
val thread = new Thread {
override def run(): Unit = {
try {
assert(sparkContext.getLocalProperty("do-inherit-me") === "some-value")
assert(sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) === null)
} catch {
case t: Throwable =>
throwable = Some(t)
}
}
}
thread.start()
thread.join()
throwable.foreach { t => throw t }
}

// This is the end-to-end version of the previous test.
test("parallel query execution (SPARK-10548)") {
(1 to 5).foreach { i =>
// Scala's parallel collections spawn new threads as children of the existing threads.
// We need to run this multiple times to ensure new threads are spawned. Without the fix
// for SPARK-10548, this usually fails on the second try.
val df = sparkContext.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b")
(1 to 10).par.foreach { _ => df.count() }
Member:

I thought DataFrame is not thread-safe and should not be used like this.

Contributor:

What makes you think that? SparkContext/SQLContext/DataFrames should be thread-safe.

Contributor:

Ah, I think we have fixed that, and if not I would also consider that a bug :)

}
}
}
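To see why the execution ID leaked into child threads in the first place, here is a minimal standalone sketch of InheritableThreadLocal's default behavior, independent of Spark (the value string is illustrative):

```scala
object InheritanceDemo {
  // By default, a child thread snapshots an InheritableThreadLocal's value at
  // Thread construction time. This is how Spark's local properties (including
  // the SQL execution ID) flowed into threads spawned by parallel collections.
  val local = new InheritableThreadLocal[String] {
    override def initialValue(): String = "unset"
  }

  def main(args: Array[String]): Unit = {
    local.set("execution-id-123")
    var seenByChild: String = null
    val child = new Thread {
      override def run(): Unit = { seenByChild = local.get() }
    }
    child.start()
    child.join()
    println(seenByChild) // prints "execution-id-123": the child inherited it
  }
}
```

The fix in this patch intervenes at exactly this inheritance point: childValue filters out registered keys (such as SQLExecution.EXECUTION_ID_KEY) before the child's snapshot is built.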