[SPARK-18120][SQL] Call QueryExecutionListener callback methods for … #16664
Conversation
…DataFrameWriter methods

QueryExecutionListener has two methods, onSuccess() and onFailure(), that take a QueryExecution object as a parameter and get called when a query is executed. They are called for several of the Dataset methods like take, head, first, and collect, but are not called for any of the DataFrameWriter methods like saveAsTable and save. This commit fixes that and calls these two methods from the DataFrameWriter output methods. It also adds a new property, "spark.sql.queryExecutionListeners", that can be used to specify instances of QueryExecutionListener that should be attached to the SparkSession when the Spark application starts up. Testing was done using unit tests.
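For readers unfamiliar with the listener API, here is a minimal sketch of implementing and attaching one. The QueryExecutionListener trait and its two callback signatures are Spark's existing API; the class name MetricsListener and the println bodies are illustrative only, and the config key is the one this PR proposes.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Illustrative listener: logs how long each query took, or why it failed.
class MetricsListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    println(s"$funcName succeeded in ${durationNs / 1e6} ms")

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    println(s"$funcName failed: ${exception.getMessage}")
}

val spark = SparkSession.builder()
  .master("local[*]")
  // The property proposed by this PR: listeners listed here are attached at startup.
  .config("spark.sql.queryExecutionListeners", classOf[MetricsListener].getName)
  .getOrCreate()

// Listeners can also be registered programmatically on an existing session.
spark.listenerManager.register(new MetricsListener)
```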
ok to test
Test build #71741 has finished for PR 16664 at commit
   * called.
   */
  private def executeAndCallQEListener(
      funcName: String,
Formatting is wrong here.
   *
   * @param funcName A identifier for the method executing the query
   * @param qe the @see [[QueryExecution]] object associated with the
   *           query
Fits in the previous line.
/cc @liancheng
Test build #72177 has finished for PR 16664 at commit
Test build #72178 has finished for PR 16664 at commit
@yhuai @marmbrus @liancheng Can someone review my PR, please? Thanks.
@yhuai @marmbrus @liancheng If none of you are going to take a look, I'll give the code another pass and not wait for your feedback before pushing.
I think @sameeragarwal plans to review. I glanced and it looks fine.
Only minor things.
   * specified by using the @see [[org.apache.spark.sql.DataFrameWriter#option]] method
   * @param writeParams will contain any extra information that the write method wants to provide
   */
  case class OutputParams(
Add @DeveloperApi.
      Seq(1 -> 100).toDF("x", "y").write.saveAsTable("bar")
    }
    assert(onWriteSuccessCalled)
    spark.listenerManager.clear()
This needs to be in a finally block, no?
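A minimal sketch of what the reviewer is asking for, reusing the names from the snippet above; clear() then runs even if the write or the assertion throws:

```scala
spark.listenerManager.register(testQueryExecutionListener)
try {
  Seq(1 -> 100).toDF("x", "y").write.saveAsTable("bar")
  assert(testQueryExecutionListener.onWriteSuccessCalled)
} finally {
  // Always detach the listener so it cannot leak into later tests.
  spark.listenerManager.clear()
}
```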
      callSaveFunction(Seq(1 -> 100).toDF("x", "y"), path.getAbsolutePath)
    }
    assert(testQueryExecutionListener.onWriteSuccessCalled)
    spark.listenerManager.clear()
Same here. Feels like it should be in SharedSQLContext.afterEach.
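A sketch of that alternative, assuming SharedSQLContext follows the usual ScalaTest BeforeAndAfterEach pattern:

```scala
protected override def afterEach(): Unit = {
  try {
    // Detach any listeners a test registered, once, for every test.
    spark.listenerManager.clear()
  } finally {
    super.afterEach()
  }
}
```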
...
docs/sql-programming-guide.md (outdated)
@@ -1302,8 +1302,9 @@ Configuration of in-memory caching can be done using the `setConf` method on `Sp

 ## Other Configuration Options

-The following options can also be used to tune the performance of query execution. It is possible
-that these options will be deprecated in future release as more optimizations are performed automatically.
+The following options can also be used to tune the performance of query execution and attaching
I don't think this new option belongs in this section. It has nothing to do with performance and this description now sounds weird. A separate section for it would be better, even if it's the only option there.
  val SESSION_LOCAL_TIMEZONE =
    SQLConfigBuilder("spark.sql.session.timeZone")
      .doc("""The ID of session local timezone, e.g. "GMT", "America/Los_Angeles", etc.""")
      .stringConf
      .createWithDefault(TimeZone.getDefault().getID())

Nit: Please remove this empty line
   * methods.
   *
   * @param funcName A identifier for the method executing the query
   * @param qe the @see [[QueryExecution]] object associated with the query
Could you please fix the doc by following what #16013 did?
@marmbrus
@@ -190,6 +192,32 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }

+  /**
+   * Executes the query and calls the {@link org.apache.spark.sql.util.QueryExecutionListener}
+   * methods.
How about changing it to:

Wrap a DataFrameWriter action to track the QueryExecution and time cost, then report to the user-registered callback functions.
   * @param action the function that executes the query after which the listener methods gets
   *               called.
   */
  private def executeAndCallQEListener(
How about renaming it withAction? It is more consistent.
I believe you are saying rename the method executeAndCallQEListener to withAction?
Yes.
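For context, a sketch of the renamed helper under discussion. The timing-plus-callback shape mirrors Dataset's existing withAction; the extra OutputParams argument and the extended listener-manager hooks are this PR's proposal, so the exact signatures here are assumptions:

```scala
private def withAction(
    funcName: String,
    qe: QueryExecution,
    outputParams: OutputParams)(action: => Unit): Unit = {
  try {
    val start = System.nanoTime()
    action
    val end = System.nanoTime()
    // Extended onSuccess carrying the write metadata (per this PR's API change).
    df.sparkSession.listenerManager.onSuccess(funcName, qe, end - start, Some(outputParams))
  } catch {
    case e: Exception =>
      df.sparkSession.listenerManager.onFailure(funcName, qe, e, Some(outputParams))
      throw e
  }
}
```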
  val QUERY_EXECUTION_LISTENERS =
    ConfigBuilder("spark.sql.queryExecutionListeners")
      .doc("QueryExecutionListeners to be attached to the SparkSession")
Can you improve this line? Add what you wrote in the sql-programming-guide.md?
In this case I updated the doc to read "A comma-separated list of classes that implement QueryExecutionListener that will be attached to the SparkSession". I could attach the whole line I put in sql-programming-guide.md but it will make it look out of place compared to the docs for other properties in the same class.
We do not have a separate document for the Spark SQL configuration. We expect users to do it using the command set -v. This command will output the contents of doc.
@@ -514,6 +576,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   * shorten names(none, `snappy`, `gzip`, and `lzo`). This will override
   * `spark.sql.parquet.compression.codec`.</li>
   * </ul>
+  * Calls the callback methods in @see[[QueryExecutionListener]] methods after query execution with
+  * @see[[OutputParams]] having datasourceType set as string constant "parquet" and
+  * destination set as the path to which the data is written
I think we do not need to add these comments to all the functions.
      df.queryExecution,
      OutputParams(source, destination, extraOptions.toMap)) {
      dataSource.write(mode, df)
    }
Nit: the style issue.
withAction("save", df.queryExecution, OutputParams(source, destination, extraOptions.toMap)) {
dataSource.write(mode, df)
}
      qe,
      new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)) {
      qe.toRdd
    }
Nit: also the style issue.
val outputParms = OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)
withAction("insertInto", qe, outputParms)(qe.toRdd)
"saveAsTable", | ||
qe, | ||
new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)) { | ||
qe.toRdd |
No need to call new here. Please follow the above example. Thanks!
@@ -660,12 +660,21 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)

+  val QUERY_EXECUTION_LISTENERS =
I think we can put it into StaticSQLConf.
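A sketch of what that move might look like, following the builder style StaticSQLConf uses for session-static configs (buildStaticConf and the exact doc text are assumptions here):

```scala
val QUERY_EXECUTION_LISTENERS = buildStaticConf("spark.sql.queryExecutionListeners")
  .doc("A comma-separated list of classes that implement QueryExecutionListener " +
    "that will be attached to the SparkSession.")
  .stringConf
  .toSequence
  .createOptional
```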
    executeAndCallQEListener(
      "saveAsTable",
      qe,
      new OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)) {
source? Why not use a qualified table name?
source reflects the data source type to which the data is written, so for the parquet() and csv() methods it will be "parquet" and "csv". For saveAsTable(), should it be "hive" or "db", since a qualified table name is not actually a data source type?
I got your point. source looks ok to me.
I just quickly went over the code. It looks ok to me, but I will review it again when the comments are resolved. Thanks!
    executeAndCallQEListener(
      "save",
      df.queryExecution,
      OutputParams(source, destination, extraOptions.toMap)) {
When the source is JDBC, you will also pass credentials. Be careful with this.
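One way to act on this caution, sketched below: redact credential-like keys before the options reach any listener. The key names and the helper are illustrative, not part of the PR:

```scala
// Keys whose values should never be surfaced to listeners (illustrative list).
private val sensitiveKeys = Set("user", "password")

private def redactOptions(options: Map[String, String]): Map[String, String] =
  options.map { case (k, v) =>
    if (sensitiveKeys.contains(k.toLowerCase)) k -> "*********" else k -> v
  }

// e.g. OutputParams(source, destination, redactOptions(extraOptions.toMap))
```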
Test build #72329 has finished for PR 16664 at commit
.config("spark.sql.queryExecutionListeners", classOf[NoZeroArgConstructorListener].getName) | ||
.getOrCreate() | ||
} | ||
assert(!SparkSession.getDefaultSession.isDefined) |
assert(SparkSession.getDefaultSession.isEmpty)
.config("spark.sql.queryExecutionListeners", "non.existent.QueryExecutionListener") | ||
.getOrCreate() | ||
} | ||
assert(!SparkSession.getDefaultSession.isDefined) |
The same here: isEmpty.
      qe: QueryExecution,
      durationNs: Long,
      options: Option[OutputParams]
      ): Unit = {}
Nit: -> options: Option[OutputParams]): Unit = {}
Could you update the PR title to
Just finished this round of reviews. Thanks! This PR enables the QueryExecutionListener when users use the DataFrameWriter methods. However, it still misses the other code paths, especially the DDL statements. For example, CTAS when using the
I think it's ok to enable the listener for
Why do we need the new config
      qe: QueryExecution,
      outputParams: OutputParams)(action: => Unit) = {
    try {
      val start = System.nanoTime()
Dataset.withAction will reset metrics of physical plans; shall we do it here? And can we create a general function for both Dataset and DataFrameWriter?
   * @param writeParams will contain any extra information that the write method wants to provide
   */
  @DeveloperApi
  case class OutputParams(
It looks reasonable to provide more information to the listeners for write operations. However, this will be public, I think we should think about it more carefully to get a better design, can we do it later?
Sorry, the arguments to this class seem to have been picked pretty randomly. Can you explain why these parameters were picked?
Sorry, I'm really confused, probably because I haven't kept track of this PR. But the diff doesn't match the PR description. Are we fixing a bug here or introducing a bunch of new APIs? Actually, we are not only introducing new APIs, we are also breaking old APIs in this patch. Please separate the bug fix part from the API-changing part.
@@ -1300,10 +1300,28 @@ Configuration of in-memory caching can be done using the `setConf` method on `Sp

 </table>

+## QueryExecutionListener Options
This seems like a change completely unrelated to the bug fix.
I actually disagree that this particular change should be a separate PR. Part of exposing these new queries to the listener is providing information about what these queries are doing, and the current (developer) API does not have a way to expose that. We can discuss ways of exposing this information that don't break the existing API (I thought about a couple, but I didn't like any of them, so my preference was to just modify the existing developer API). But I strongly feel the bug fix is not complete without this information being exposed in some way.
That's probably because you are not familiar with the SQL component. The existing API already has references to the QueryExecution object, which actually includes all of the information your compatibility-breaking API is currently exposing.
The QueryExecution object doesn't have details related to the output metadata. For example, if I call df.write.parquet("/my/path"), the path to which the DataFrame is written, i.e. "/my/path", is not available in the QueryExecution object.
That's fair but not what I was told; if that's the case then great, but I'll let Salil comment since he's looked at this code way more than I have.
It does. It contains the entire plan.
@rxin, knowing the entire plan is not enough; it would be better if we also have these write options (provider, partitioning, extraOptions, etc.).
When I was working on this PR the output path wasn't there, but if you are confident that it is there then it might have been added recently. I can check and get back to you.
I think that's a separate "bug" we should fix, i.e. DataFrameWriter should use InsertIntoDataSourceCommand so we can consolidate the two paths.
Basically, I see no reason to add some specific parameter to a listener API that is meant to be generic and already contains a reference to QueryExecution. What are you going to do the next time you want to find some other information with a take or collect query? Do you go in and add another interface-breaking change for that? If the goal is to expose information for writing data out properly, then just make it work with the existing interface and fix the issue that using DataFrameWriter doesn't call the callback (and doesn't have the correct information set in QueryExecution).
Actually @cloud-fan, are you sure it is a problem right now? DataSource.write itself creates the commands, and if the information is propagated correctly, the QueryExecution object should have an InsertIntoHadoopFsRelationCommand.
Yea we should fix that.
Does that mean the information would show up in the plan? That would be great.
@vanzin yes, InsertXXX command will carry all the write options.
@cloud-fan From what I understand, we need to modify the InsertXXX commands to carry all the write options instead of the change suggested in this PR. Right now the QueryExecution object doesn't carry any of the output options. Am I correct?
@salilsurendran Yes, and we can send another PR to fix the InsertXXX command problem.
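Once the InsertXXX commands carry the write options, a listener could recover them from the plan itself rather than from a new OutputParams API. A hedged sketch against the Spark 2.x internals; treat the class and field names as assumptions of that era:

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.util.QueryExecutionListener

// Illustrative listener that reads the output path out of the physical plan.
class OutputPathListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    qe.sparkPlan.collectFirst {
      case ExecutedCommandExec(cmd: InsertIntoHadoopFsRelationCommand) =>
        println(s"$funcName wrote to ${cmd.outputPath}")
    }
    ()
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}
```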
…methods for DataFrameWriter methods

We only notify `QueryExecutionListener` for several `Dataset` operations, e.g. collect, take, etc. We should also do the notification for `DataFrameWriter` operations.

New regression test.

Close apache#16664

Author: Wenchen Fan <[email protected]>

Closes apache#16962 from cloud-fan/insert.
…DataFrameWriter methods
What changes were proposed in this pull request?
QueryExecutionListener has two methods, onSuccess() and onFailure(), that take a QueryExecution object as a parameter and get called when a query is executed. They are called for several of the Dataset methods like take, head, first, and collect, but are not called for any of the DataFrameWriter methods like saveAsTable and save. This commit fixes that and calls these two methods from the DataFrameWriter output methods.

Also, a new property, "spark.sql.queryExecutionListeners", was added that can be used to specify instances of QueryExecutionListener that should be attached to the SparkSession when the Spark application starts up.
How was this patch tested?
Testing was done using unit tests contained in two suites. The unit tests can be executed by:
test-only *SparkSQLQueryExecutionListenerSuite
test-only *DataFrameCallbackSuite