[SPARK-24250][SQL] support accessing SQLConf inside tasks #21299
Conversation
…sed only on the driver" This reverts commit a4206d5.
}.getOrElse(CreateJacksonParser.internalRow(_: JsonFactory, _: InternalRow))

-      JsonInferSchema.infer(rdd, parsedOptions, rowParser)
+      JsonInferSchema.infer(sampled, parsedOptions, CreateJacksonParser.string)
@HyukjinKwon @MaxGekk can you take a look at the json changes? Thanks!
The String-based reader of JacksonParser can be slower than the specialized UTF8StreamJsonParser, which can be used when you pass an array of bytes (even though it spends some time on encoding detection). You can check that with JsonBenchmarks.
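For illustration, here is a hedged sketch of the two parser-construction paths this thread contrasts; the Jackson class names are real, but the wiring around them is simplified:

import java.io.{ByteArrayInputStream, InputStreamReader}
import com.fasterxml.jackson.core.JsonFactory

val factory = new JsonFactory()
val bytes = """{"a": 1}""".getBytes("UTF-8")
// Byte-array input: Jackson can detect the encoding and pick the
// specialized UTF8StreamJsonParser internally.
val byteParser = factory.createParser(bytes)
// Reader input: forces the generic reader-based parser, which the
// benchmarks in this thread found to be slower.
val readerParser = factory.createParser(new InputStreamReader(new ByteArrayInputStream(bytes), "UTF-8"))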
I reran the benchmarks on your branch. There is a difference in the schema-inference benchmarks:
JSON schema inferring: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
No encoding 46348 / 46679 2.2 463.5 1.0X
UTF-8 is set 45651 / 45731 2.2 456.5 1.0X
Before your changes:
JSON schema inferring: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------
No encoding 38902 / 39282 2.6 389.0 1.0X
UTF-8 is set 56959 / 57261 1.8 569.6 0.7X
As I wrote above, the "array-based" parser is faster than the "string-based" one, but the "string-based" parser is faster than the "reader-based" one.
import org.apache.spark.{TaskContext, TaskContextImpl}
import org.apache.spark.internal.config.{ConfigEntry, ConfigProvider, ConfigReader}

class ReadOnlySQLConf(context: TaskContext) extends SQLConf {
This is nice :)
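For context, one plausible way a read-only conf can resolve values on executors is through a ConfigProvider backed by the task's local properties; this is a hedged sketch based on the imports above, not necessarily the exact wiring in this PR:

// Hypothetical provider: looks up config values in the task's local
// properties, which the driver populated before submitting the job.
class TaskContextConfigProvider(context: TaskContext) extends ConfigProvider {
  override def get(key: String): Option[String] = Option(context.getLocalProperty(key))
}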
  override def clear(): Unit = {
    throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.")
  }
}
Do we need to allow clone? clone will create a mutable SQLConf.
I don't think we need to clone or copy SQLConf in tasks; let's ban it.
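A minimal sketch of what banning it could look like, assuming clone and copy are overridable on SQLConf like the mutators above:

override def clone(): SQLConf = {
  throw new UnsupportedOperationException("Cannot clone/copy ReadOnlySQLConf.")
}

override def copy(entries: (ConfigEntry[_], Any)*): SQLConf = {
  throw new UnsupportedOperationException("Cannot clone/copy ReadOnlySQLConf.")
}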
Test build #90511 has finished for PR 21299 at commit
Test build #90512 has finished for PR 21299 at commit
Test build #90508 has finished for PR 21299 at commit
Test build #90519 has finished for PR 21299 at commit
Test build #90520 has finished for PR 21299 at commit
Retest this please.
val allConfigs = sparkSession.sessionState.conf.getAllConfs
allConfigs.foreach {
  // Excludes external configs defined by users.
  case (key, value) if key.startsWith("spark") => sc.setLocalProperty(key, value)
This causes a scala.MatchError. We need to cover the other case, too.
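A sketch of the suggested fix: add a catch-all case so that keys not starting with "spark" are simply skipped instead of throwing:

allConfigs.foreach {
  // Excludes external configs defined by users.
  case (key, value) if key.startsWith("spark") => sc.setLocalProperty(key, value)
  // Without this catch-all, any other key throws scala.MatchError.
  case _ =>
}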
Test build #90542 has finished for PR 21299 at commit
-    val sd = getStreamDecoder(enc, binary, binary.length)
-    jsonFactory.createParser(sd)
  }
Why were these two lines removed? It looks like no SQLConf is involved here.
val allConfigs = sparkSession.sessionState.conf.getAllConfs
allConfigs.foreach {
  // Excludes external configs defined by users.
  case (key, value) if key.startsWith("spark") => sc.setLocalProperty(key, value)
Should we only propagate config values that have been set to something other than the default value?
Oh, I see. getAllConfs only returns configs that have been explicitly set.
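A quick way to see that behavior from the public API (the config key is just for illustration):

// spark.conf.getAll delegates to getAllConfs: only explicitly set
// entries show up; unset configs silently fall back to their defaults.
spark.conf.set("spark.sql.shuffle.partitions", "10")
spark.conf.getAll.foreach { case (k, v) => println(s"$k = $v") }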
@@ -898,7 +898,6 @@ object SparkSession extends Logging {
    * @since 2.0.0
    */
   def getOrCreate(): SparkSession = synchronized {
-    assertOnDriver()
Does this mean we can now create a SparkSession outside the driver?
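For reference, the removed check presumably looked something like this sketch: creation fails whenever a TaskContext is present, i.e. inside a task:

private def assertOnDriver(): Unit = {
  if (TaskContext.get != null) {
    // A non-null TaskContext means we are running inside a task on an executor.
    throw new IllegalStateException("SparkSession should only be created and accessed on the driver.")
  }
}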
Test build #90549 has finished for PR 21299 at commit
retest this please
Test build #90550 has finished for PR 21299 at commit
The idea looks good to me; it should be very useful to have this feature.
    body
  } finally {
    sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
    allConfigs.foreach {
      case (key, _) => sparkSession.sparkContext.setLocalProperty(key, null)
Shouldn't this be set back to the original value?
Good point, although it's very unlikely that users set SQL configs as local properties. Let me change it.
Test build #90568 has finished for PR 21299 at commit
Retest this please.
Test build #90570 has finished for PR 21299 at commit
retest this please
Test build #90587 has finished for PR 21299 at commit
Test build #90590 has finished for PR 21299 at commit
also cc @squito
…sed only on the driver" This reverts commit a4206d5. This is from apache#21299 and to ease the review of it. Author: Wenchen Fan <[email protected]> Closes apache#21341 from cloud-fan/revert.
def withSQLConfPropagated[T](sparkSession: SparkSession)(body: => T): T = {
  // Set all the specified SQL configs to local properties, so that they can be available at
  // the executor side.
Properties are serialized per task. How unusual would it be for there to be a large list of properties? If that would be reasonable, then it might make more sense to use a Broadcast.
(Separately, task serialization should probably avoid re-serializing the properties every time, but this could make that existing issue much worse.)
Technically a broadcast is faster than local properties if there are a lot of properties, but one problem is that you need to carry the broadcast handle everywhere, which I don't think is applicable to SQLConf.get.
BTW, we currently have hundreds of SQL configs; even if a user set all of them for a job, the overhead is low. I tried

sc.makeRDD(Seq(1, 2, 3)).collect
1.to(100).foreach(i => sc.setLocalProperty(i.toString * 10, i.toString * 10))
sc.makeRDD(Seq(1, 2, 3)).collect

and didn't observe a performance difference.
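To put rough numbers on such an experiment, a hedged sketch of a timing harness around the same calls (the time helper is ad hoc, not a Spark API):

def time[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(s"$label: ${(System.nanoTime() - start) / 1e6} ms")
  result
}

time("baseline collect") { sc.makeRDD(Seq(1, 2, 3)).collect() }
1.to(100).foreach(i => sc.setLocalProperty(i.toString * 10, i.toString * 10))
time("collect with 100 large local properties") { sc.makeRDD(Seq(1, 2, 3)).collect() }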
We can avoid serializing local properties for each task, but that's a general optimization for local properties; we can do it in another PR.
Test build #90681 has finished for PR 21299 at commit
Test build #90683 has finished for PR 21299 at commit
  }
}

def withSQLConfPropagated[T](sparkSession: SparkSession)(body: => T): T = {
Maybe it's cleaner in the following way:
def withSQLConfPropagated[T](sparkSession: SparkSession)(body: => T): T = {
val sc = sparkSession.sparkContext
// Set all the specified SQL configs to local properties, so that they can be available at
// the executor side.
val allConfigs = sparkSession.sessionState.conf.getAllConfs
val originalLocalProps = allConfigs.collect {
case (key, value) if key.startsWith("spark") =>
val originalValue = sc.getLocalProperty(key)
sc.setLocalProperty(key, value)
(key, originalValue)
}
try {
body
} finally {
originalLocalProps.foreach {
case (key, value) => sc.setLocalProperty(key, value)
}
}
}
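For what it's worth, a hedged usage sketch of the helper: wrap an action so that executors see the session's SQL configs as local properties for the duration of the body (the DataFrame df and the action are illustrative):

withSQLConfPropagated(sparkSession) {
  // Inside the body, tasks can rebuild a read-only SQLConf from the
  // propagated local properties, so SQLConf.get works on executors.
  df.rdd.map(row => row.length).count()
}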
LGTM
Test build #90786 has finished for PR 21299 at commit
thanks, merging to master!
Hi, @cloud-fan. cc @gatorsmile
I've reverted it, will re-submit it soon.
What changes were proposed in this pull request?
Previously in #20136 we decided to forbid tasks from accessing SQLConf, because it doesn't work and always gives you the default conf value. In #21190 we fixed the check and all the places that violated it.
Currently the pattern for accessing configs at the executor side is: read the configs at the driver side, then access the variables holding the config values in the RDD closure, so that they will be serialized to the executor side. Something like the sketch below.
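(A hedged reconstruction of the pattern; the config entry and the process function are illustrative.)

// Driver side: read the config value into a local variable.
val threshold = sparkSession.sessionState.conf.getConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD)
rdd.map { row =>
  // Executor side: threshold was captured in the closure and serialized
  // with the task, so no SQLConf access is needed here.
  process(row, threshold)
}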
However, this pattern is hard to apply if the config needs to be propagated through a long call stack. An example is DataType.sameType; see how many changes were made in #21190. When it comes to code generation, it's even worse: I tried it locally, and we would need to change a ton of files to propagate configs to the code generators.
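To make the pain concrete, a hedged sketch of the two styles with simplified signatures (not the actual DataType code):

// Without task-side SQLConf: the flag must be threaded through every
// layer that eventually calls sameType.
def sameType(a: DataType, b: DataType, caseSensitive: Boolean): Boolean = ???

// With task-side SQLConf: read the flag exactly where it is needed.
def sameType(a: DataType, b: DataType): Boolean = {
  val caseSensitive = SQLConf.get.caseSensitiveAnalysis
  ???
}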
This PR proposes to allow tasks to access SQLConf. The idea is, we can save all the SQL configs to job properties when a SQL execution is triggered. At the executor side we rebuild the SQLConf from the job properties.
How was this patch tested?
A new test suite.
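For flavor, the kind of check such a suite could contain; this is a hedged sketch, and the config key and exact execution path are illustrative:

spark.conf.set("spark.sql.shuffle.partitions", "33")
// Dataset actions run inside a SQL execution, so the configs are
// propagated as local properties and visible via SQLConf.get in tasks.
spark.range(1).foreach { _ =>
  assert(SQLConf.get.getConfString("spark.sql.shuffle.partitions") == "33")
}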