Spark: Support creating views via SQL #9423
Conversation
Force-pushed from a01dff7 to f128c2a, then from c7410ca to ced7c0c.
    plan foreach {
      case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, _, _, _, query, _, _) =>
        val identifier = Spark3Util.toV1TableIdentifier(ident)
        ViewHelper.verifyTemporaryObjectsNotExists(false, identifier, query, Seq.empty)
Ideally we'd have these checks be part of CreateV2ViewExec, but by the time we reach CreateV2ViewExec, the underlying `query` doesn't contain the info needed to detect these cases here.
If the identifier is a ResolvedIdentifier, don't we already know that a temporary view does not exist? That's checked when resolving views. Does the UnresolvedIdentifier have allowTemp set to false?
What the check does is make sure that a view doesn't reference a temporary/global temp view. We specifically test for these two cases in createViewReferencingGlobalTempView() / createViewReferencingTempView().
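A minimal illustration of the case this check guards against (table and view names here are hypothetical, not from the PR's tests):

```scala
// A stored view must not capture a temporary view: the temporary view
// disappears with the session, which would leave the stored view broken.
spark.sql("CREATE TEMPORARY VIEW tmp AS SELECT id FROM db.tbl")

// Expected to fail analysis: the view body references the
// session-scoped temporary view `tmp`.
spark.sql("CREATE VIEW cat.db.v AS SELECT * FROM tmp")
```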
    comment: Option[String],
    properties: Map[String, String],
    originalText: Option[String],
    query: LogicalPlan,
Do we want the parsed logical plan here? Is the purpose to run the analyzer rules to make sure that it is valid?
My guess is that this came from the Spark implementation, which probably uses the logical plan here for when there is no SQL text (which would be why originalText is optional). Unless I'm missing something, I think what we should do is not have the parsed query. Instead, I would have a validation that parses and analyzes the query and throws an exception if analysis fails (because of bad references, missing tables, or reliance on temporary views).
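A rough sketch of that validation alternative, assuming Spark's session-state parser/analyzer APIs (this is not the implementation the PR settled on):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.QueryPlanningTracker

// Validate the view SQL eagerly instead of carrying the parsed plan on
// the command: analysis failures (bad references, missing tables,
// reliance on temporary views) surface here as AnalysisExceptions.
def validateViewSql(session: SparkSession, sql: String): Unit = {
  val parsed = session.sessionState.sqlParser.parsePlan(sql)
  // executeAndCheck analyzes the plan and runs the standard checks
  session.sessionState.analyzer.executeAndCheck(parsed, new QueryPlanningTracker)
}
```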
    val queryColumnNames = analyzedPlan.schema.fieldNames
    SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver)
Does this matter if the column aliases were specified?
I think we want to perform this check, because Spark does the same and fails if you provide duplicate columns in the underlying query:
    spark-sql (default)> create temporary view tempv2 as select id, id from iceberg1.foo WHERE id < 12;
    [COLUMN_ALREADY_EXISTS] The column `id` already exists. Consider to choose another name or rename the existing column.
I've left that check in and also added a test for it.
If there are aliases, we could succeed though.
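For example, explicit column aliases would make the duplicate projection unambiguous (table and view names here are hypothetical):

```scala
// Fails: the view schema would contain `id` twice.
// spark.sql("CREATE VIEW cat.db.v AS SELECT id, id FROM db.foo")

// Could succeed: the aliases give the two columns distinct names.
spark.sql("CREATE VIEW cat.db.v (first_id, second_id) AS SELECT id, id FROM db.foo")
```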
After looking into this a bit more, query does correspond to originalText. That's good because we don't have to call the parser separately, and this ensures that the query can be analyzed correctly. We should keep this design for the logical plan.
I think there are a few things that we can improve, though. Since the query plan is linked in like this, it will be automatically analyzed, so this should already catch things like trying to project a missing column (by the way, we should have a test for that). It will also catch missing relations (tables and other views), but we probably need to handle resolution a bit differently: we want view resolution to happen just like it would when loading a view. I think that means that we should call our own code from createViewRelation to rewrite identifiers. (Ideally, we could alias here as well, but it may need some special handling if GetColumnByOrdinal doesn't work with an undetermined type.)
Rewriting identifiers needs to happen immediately so that Spark doesn't substitute any temporary views. I think this needs to be done before ResolveViews, so we should do it in RewriteViewCommands.
To summarize:
- RewriteViewCommands should call our code to rewrite identifiers in the parsed query (and alias columns, if possible)
- If RewriteViewCommands can't apply the aliases, then we will need a rule that applies them once the query is resolved
- RewriteViewCommands should also check for temporary view references and fail if any are found (and maybe temporary functions as well?)
- Checking for the right number of columns should be done in the checker (validation phase) if it isn't done already, because we use GetColumnByOrdinal for aliases
- We should drop the query when possible because it shouldn't be needed after the analysis phase. Instead we should get the schema of the query (after aliasing) and pass that along.
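A rough sketch of how the first and third points might land in RewriteViewCommands (helper names like rewriteIdentifiers and verifyTemporaryObjectsDontExist are stand-ins, and the CreateIcebergView field list is abbreviated relative to the real plan):

```scala
// Runs as an early substitution rule, before Spark has a chance to
// swap temporary views into the plan.
object RewriteViewCommands extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case create: CreateIcebergView if !create.rewritten =>
      // fail fast if the view body references temporary views/functions
      verifyTemporaryObjectsDontExist(create.query)
      // qualify relation identifiers the same way view loading does,
      // so analysis here matches what happens when the view is read
      create.copy(query = rewriteIdentifiers(create.query))
  }
}
```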
Force-pushed from ced7c0c to 99a3439.
Force-pushed from 99a3439 to 2639053.
Force-pushed from 3be9df0 to 526ca0b, then from 526ca0b to 7b86876.
    case c @ CreateIcebergView(ResolvedIdentifier(_, ident), _, query, columnAliases, columnComments, _, _, _, _, _, _)
        if query.resolved && !c.rewritten =>
      val rewritten = rewriteIdentifiers(query, ident.asMultipartIdentifier)
@nastra, this can't be done here. It needs to be done before any of Spark's rules run. Otherwise, a temporary view may already have been substituted into the plan and could allow the checks to pass even though the view is invalid.
This rule is only responsible for applying the column aliases and comments. Rewriting the identifiers should be done in RewriteViewCommands.
    case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties,
        Some(queryText), query, allowExisting, replace) =>
      verifyTemporaryObjectsDontExist(resolved.identifier, query)
I think the query needs to have CTE substitution run before running the temporary object check. Otherwise, there could be conflicts between CTE substitution and temporary names. If there is a conflict, the CTE should be used.
You're absolutely right. I've done that and also added a test.
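The conflict case being tested looks roughly like this (names are hypothetical; per the discussion above, CTE substitution must win over the temporary view):

```scala
// A temporary view and a CTE share the name `v`; CTE substitution runs
// before temporary-view substitution, so the created view reads from
// the CTE, not the session-scoped temporary view.
spark.sql("CREATE TEMPORARY VIEW v AS SELECT 1 AS id")
spark.sql(
  "CREATE VIEW cat.db.viewWithConflict AS WITH v AS (SELECT 2 AS id) SELECT id FROM v")
// Reading cat.db.viewWithConflict should return the CTE's value, not 1.
```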
    queryText = queryText,
    columnAliases = columnAliases,
    columnComments = columnComments,
    queryColumnNames = queryColumnNames,
I think I'd also be fine with not tracking this and passing an empty array through. That's not correct for Spark, but it would work for us since Iceberg doesn't store these.
Technically we don't need to track it in this PR, but we'll eventually need it when we show the properties of views and such. I'd probably keep it here, but let me know if you'd like me to remove it and introduce it in an upcoming PR.
It seems reasonable to include it. Let's just leave it as-is since you even have a test for it.
    @Test
    public void createViewWithCTE() throws NoSuchTableException {
      insertRows(10);
      String viewName = "simpleViewWithCTE";
I think we want a test for a CTE that conflicts with a temporary table name.
Not sure if you meant a temp view name here (instead of a temp table name), but I added a test where a CTE and a temp view both have the same name; when the data is read, the CTE takes precedence.
That's correct. I did mean temporary view name. Thanks!
    case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _,
        _, _, _, _, _, _) =>
      verifyColumnCount(ident, columnAliases, query)
      SchemaUtils.checkColumnNameDuplication(query.schema.fieldNames, SQLConf.get.resolver)
Looks good.
Merged! Thanks for all the work on this, @nastra! It's great to have this done.
Creating a view exits early in ResolveSessionCatalog, so this performs a pre-substitution from CreateView to CreateIcebergView. Most of the code lives inside CreateV2ViewExec, to be aligned with how Spark implements the same functionality for V1 view creation in CreateViewCommand.
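In outline, the pre-substitution might look like the following (argument lists are approximate, and isViewCatalog is a hypothetical helper, not the actual code):

```scala
// In RewriteViewCommands: intercept Spark's CreateView before
// ResolveSessionCatalog claims it, and re-plan it as CreateIcebergView,
// which ultimately executes as CreateV2ViewExec.
case CreateView(ResolvedIdentifier(catalog, ident), userSpecifiedColumns, comment,
    properties, Some(queryText), query, allowExisting, replace)
    if isViewCatalog(catalog) =>
  CreateIcebergView(
    child = ResolvedIdentifier(catalog, ident),
    queryText = queryText,
    query = query,
    columnAliases = userSpecifiedColumns.map(_._1),
    columnComments = userSpecifiedColumns.map(_._2),
    comment = comment,
    properties = properties,
    allowExisting = allowExisting,
    replace = replace)
```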