[SPARK-48356][SQL] Support for FOR statement #48794
base: master
Conversation
case m: Map[_, _] =>
  // arguments of CreateMap are in the format: (key1, val1, key2, val2, ...)
  val mapArgs = m.keys.toSeq.flatMap { key =>
    Seq(createExpressionFromValue(key), createExpressionFromValue(m(key)))
  }
  CreateMap(mapArgs, false)
case s: GenericRowWithSchema =>
  // struct types match this case
  // arguments of CreateNamedStruct are in the format: (name1, val1, name2, val2, ...)
  val namedStructArgs = s.schema.names.toSeq.flatMap { colName =>
    val valueExpression = createExpressionFromValue(s.getAs(colName))
    Seq(Literal(colName), valueExpression)
  }
  CreateNamedStruct(namedStructArgs)
case _ => Literal(value)
For my knowledge, can you explain what the case with the Map means exactly, i.e. when will this happen?
Also, how did we check that this is the complete list of relevant cases?
When a Map or Struct is in the result set of the query, we can't use Literal(value) to convert it to an expression, because Literals don't support those types. So for example, for a Map we recursively convert both keys and values to expressions first, and then create a map expression using CreateMap. The process is similar for structs.
The way I checked is I went through all the Spark data types and, for each one, checked in the code of Literal whether it's supported. I only found these two that are not. However, I agree we can't be completely sure, and new types may be added to Spark in the future which Literals may or may not support. Probably I should add an error message for a currently unsupported type, in case it comes up. Does that make sense to you?
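For illustration, a minimal check of the limitation described above (a hypothetical snippet, not code from this PR):

import org.apache.spark.sql.catalyst.expressions.Literal

// Literal.apply handles atomic values directly, but a Scala Map collected
// from a query result is not supported and fails with an "unsupported
// literal type" error, which is why the CreateMap path above is needed.
Literal(42)            // ok: integer literal
Literal(Map("a" -> 1)) // throws: unsupported literal type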
Yeah, I would say an internal error is fine in this case (i.e. no need to introduce a new error for this), since it would mean that we have a bug.
Other than that, this sounds fine to me, but let's wait for Max and/or Wenchen to comment if they have any concerns.
override def next(): CompoundStatementExec = state match {

  case ForState.VariableAssignment =>
    variablesMap = createVariablesMapFromRow(cachedQueryResult()(currRow))
Why do we need to create this every time? Can we fill variablesMap once and then reuse it?
We need to create it every time because the map is different for every row in the result set. You can see we call it on the currRow.
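For context, a rough sketch of what that per-row construction could look like, assuming the createExpressionFromValue helper from the earlier snippet (illustrative, not verbatim PR code):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.Expression

// Build one (column name -> expression) entry per column of the current row.
def createVariablesMapFromRow(row: Row): Map[String, Expression] =
  row.schema.fieldNames.map { colName =>
    colName -> createExpressionFromValue(row.getAs[Any](colName))
  }.toMap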
Can we rebase first to include the already merged changes regarding the label checks, logical plans, etc.? I'll review again afterwards.
…ave/iterate/normal case: force-pushed from 446fc05 to 2e10f0b
@davidm-db @miland-db Rebased, you can review again.
assert(statements === Seq(
  "statement1",
  "lbl1"
))
We don't have drop var statements here because they are dropped in handleLeaveStatement?
Is this the thing we talked about that will be properly resolved once proper execution and scopes are introduced?
Yes, that's right. In this case the variables are dropped immediately when the leave statement is encountered, instead of the usual behavior which is to return the dropVariable exec nodes from the iterator.
private var isResultCacheValid = false
private def cachedQueryResult(): Array[Row] = {
  if (!isResultCacheValid) {
    // collect the full result to the driver on first access, then cache it
    queryResult = query.buildDataFrame(session).collect()
    isResultCacheValid = true
  }
  queryResult
}
Food for thought: does DataFrame have a mechanism to partially collect the data, so we don't collect all the results in memory? Since we are already using the caching concept, this would be easy to add to the logic of cachedQueryResult.
Quickly researching, we can do something like:
sliced_df = df.offset(starting_index).limit(ending_index - starting_index)
but there might be something better...
I wouldn't block the PR on this, but I think we definitely need to consider something like this for a follow-up, e.g. along the lines of the sketch below.
cc: @cloud-fan @MaxGekk
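A minimal sketch of that batched collection; collectInBatches, batchSize and process are illustrative names, not from this PR:

import org.apache.spark.sql.{DataFrame, Row}

// Collect and process the result in fixed-size batches instead of pulling
// everything to the driver at once. Note that each offset/limit batch
// re-runs the query unless the DataFrame is cached.
def collectInBatches(df: DataFrame, batchSize: Int)(process: Row => Unit): Unit = {
  var offset = 0
  var batch = df.offset(offset).limit(batchSize).collect()
  while (batch.nonEmpty) {
    batch.foreach(process)
    offset += batchSize
    batch = df.offset(offset).limit(batchSize).collect()
  }
}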
That makes sense; currently the entire result is collected to the driver, so it would be problematic if the result is too large. We should definitely follow up on this.
Let's see what Wenchen and Max have to say and maybe create a follow-up work item so we don't forget it.
new Iterator[CompoundStatementExec] {

  override def hasNext: Boolean = {
    val resultSize = cachedQueryResult().length
Maybe have a cacheSize that's a class member and set only once in cachedQueryResult()?
I don't think this is necessary: queryResult is a fixed-size array, so .length is just a field access, and we don't actually calculate the length here.
I've left comments, but in general the approach looks good to me!
// create and execute declare var statements
variablesMap.keys.toSeq
  .map(colName => createDeclareVarExec(colName, variablesMap(colName)))
  .foreach(declareVarExec => declareVarExec.buildDataFrame(session).collect())
Should we set isExecuted = true here as well? Or is it not important since we don't return it anywhere?
Same question for line 722.
I don't think it's important since the execs are not persisted or used anywhere else.
What changes were proposed in this pull request?
In this PR, support for the FOR statement in SQL scripting is introduced. Example (an illustrative sketch of the new syntax; the loop variable is named row, as referenced in the notes below):
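// Illustrative sketch only, not the PR's original example; assumes SQL
// scripting is enabled and that scripts are submitted through spark.sql.
spark.conf.set("spark.sql.scripting.enabled", "true")
spark.sql("""
  BEGIN
    FOR row AS SELECT id AS intCol FROM range(3) DO
      SELECT row.intCol;
    END FOR;
  END
""")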
Implementation notes:
As local variables for SQL scripting are currently a work in progress, session variables are used to simulate them.
When FOR begins executing, session variables are declared for each column in the result set, and optionally for the FOR variable, if present ("row" in the example above).
On each iteration, these variables are overwritten with the values from the row currently being iterated.
The variables are dropped upon loop completion.
This means that if a session variable already exists whose name matches a column in the result set, the FOR statement will drop that variable after completion. If that variable were referenced after the FOR statement, the script would fail because the variable no longer exists. This limitation is already present in the current iteration of SQL scripting and will be fixed once local variables are introduced. With local variables, the implementation of the FOR statement will also be much simpler. A sketch of the variable lifecycle is below.
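A rough illustration of the lifecycle described above; the statement text is illustrative, not verbatim what the interpreter generates:

// Before the loop: declare one session variable per result column
// (and the optional FOR variable).
spark.sql("DECLARE VARIABLE intCol INT")
// On each iteration: overwrite the variable with the current row's value.
spark.sql("SET VAR intCol = 1")
// After the loop completes: drop the variables again.
spark.sql("DROP TEMPORARY VARIABLE intCol")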
Grammar/parser changes:
- forStatement grammar rule
- visitForStatement rule visitor
- ForStatement logical operator
Why are the changes needed?
The FOR statement is a part of SQL scripting control flow logic.
Does this PR introduce any user-facing change?
No
How was this patch tested?
New tests are introduced to all three scripting test suites: SqlScriptingParserSuite, SqlScriptingExecutionNodeSuite and SqlScriptingInterpreterSuite.
Was this patch authored or co-authored using generative AI tooling?
No