[SPARK-15988] [SQL] Implement DDL commands: Create/Drop temporary macro #13706
Conversation
Test build #60642 has finished for PR 13706 at commit
Test build #60654 has finished for PR 13706 at commit
@@ -97,6 +97,9 @@ statement
    | CREATE TEMPORARY? FUNCTION qualifiedName AS className=STRING
        (USING resource (',' resource)*)? #createFunction
    | DROP TEMPORARY? FUNCTION (IF EXISTS)? qualifiedName #dropFunction
    | CREATE TEMPORARY MACRO macroName=identifier
        '('(columns=colTypeList)?')' expression #createMacro
Nit: get rid of the parentheses; you can also just use colTypeList?
@hvanhovell I have addressed your comments. Thanks.
Test build #60780 has finished for PR 13706 at commit
Test build #60781 has finished for PR 13706 at commit
val arguments = Option(ctx.colTypeList).map(visitColTypeList(_))
  .getOrElse(Seq.empty[StructField]).map { col =>
    AttributeReference(col.name, col.dataType, col.nullable, col.metadata)() }
val colToIndex: Map[String, Int] = arguments.map(_.name).zipWithIndex.toMap
Move this into the CreateMacroCommand. This would also be relevant if we were to offer a different API for creating macros.
Why don't I move this into CreateMacroCommand? Because analyzer.checkAnalysis() checks whether the macroFunction of CreateMacroCommand is valid. The macroFunction contains UnresolvedAttributes, so analyzer.checkAnalysis() would throw an unresolved exception. If the UnresolvedAttributes are resolved beforehand, analyzer.checkAnalysis() does not throw an exception.
@hvanhovell So I think I will create a new wrapper class to avoid the unresolved exception, so that the DataFrame API can reuse this feature later.
Ah, I see. You could also move this code into the companion object of CreateMacroCommand. That would also work. It is just that this code isn't parser specific.
@lianhuiwang thanks for updating the PR. Could you implement the Macro removal by pattern matching on a (to be created)
    s"expected number of columns: ${columns.size} for Macro $macroName")
}
macroFunction.transformUp {
  case b: BoundReference => children(b.ordinal)
We do not validate the input type here. This would be entirely fine if macro arguments were defined without a DataType. I am not sure what we need to do here though. We have two options:
- Ignore the DataType and rely on the expressions' inputTypes to get casting done. This must be documented though.
- Introduce casts to make sure the input conforms to the required input.

What do you think?
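The second option can be illustrated with a small self-contained sketch (toy types and class names only, not Spark's actual Catalyst classes): each call argument whose type does not match the declared parameter type is wrapped in a cast node, so the analyzer either performs the conversion or rejects the query.

```scala
// Toy model of "introduce casts to conform the input" (assumption: names
// ToyType, Lit, Cast are illustrative, not from Spark).
sealed trait ToyType
case object IntType extends ToyType
case object DoubleType extends ToyType

sealed trait Expr { def dataType: ToyType }
case class Lit(value: Double, dataType: ToyType) extends Expr
case class Cast(child: Expr, dataType: ToyType) extends Expr

object CastDemo {
  // Wrap each argument in a cast to the declared parameter type when needed.
  def conform(args: Seq[Expr], declared: Seq[ToyType]): Seq[Expr] =
    args.zip(declared).map {
      case (a, t) if a.dataType == t => a        // already the right type
      case (a, t)                    => Cast(a, t)
    }

  def main(argv: Array[String]): Unit = {
    // plus(a int, b int) called as plus(1.0, 2): only the double gets a cast
    val out = conform(Seq(Lit(1.0, DoubleType), Lit(2, IntType)),
                      Seq(IntType, IntType))
    println(out.head.isInstanceOf[Cast])
    println(out(1).isInstanceOf[Cast])
  }
}
```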
@hvanhovell good points. Because the Analyzer checks each expression's checkInputDataTypes after ResolveFunctions, I think we do not need to validate the input type here. I do not think introducing casts has benefits right now; it may cause unnecessary casts. I will add some comments for it. Thanks.
Ok that is perfect.
@hvanhovell I have addressed your comments. Thanks.
Test build #60840 has finished for PR 13706 at commit
Test build #60841 has finished for PR 13706 at commit
Test build #77466 has finished for PR 13706 at commit
@gatorsmile sorry for the late reply. I have now merged with master. Thanks.
Test build #77468 has finished for PR 13706 at commit
Test build #77469 has finished for PR 13706 at commit
Test build #77467 has finished for PR 13706 at commit
Test build #77470 has finished for PR 13706 at commit
Test build #77472 has finished for PR 13706 at commit
This looks pretty good. I left a few comments. I think we need to refine the macro creation path, because the current approach has relatively bad UX when things go wrong.
/**
 * This class provides the arguments and body expression of the macro function.
 */
case class MacroFunctionWrapper(columns: Seq[StructField], macroFunction: Expression)
Why do we need this?
Because the Analyzer would reject the macroFunction as invalid if I do not use MacroFunctionWrapper.
case s: SubqueryExpression =>
  throw new AnalysisException(s"Cannot support Subquery: ${s} " +
    s"for CREATE TEMPORARY MACRO $macroName")
case u: UnresolvedGenerator =>
Is this what Hive does? I really don't see why we should not support this.
Please note that we cannot use generators if we decide that an expression has to be a fully resolved expression at creation time.
@@ -1516,6 +1516,35 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
  )
}

test("create/drop temporary macro") {
Can you use SQLQueryTestSuite instead?
Can you also add a case for a macro without parameters? E.g.: create temporary macro c() as 3E9
Should we also test a combination of temporary macros/functions...?
override def run(sparkSession: SparkSession): Seq[Row] = {
  val catalog = sparkSession.sessionState.catalog
  val columns = funcWrapper.columns.map { col =>
    AttributeReference(col.name, col.dataType, col.nullable, col.metadata)() }
Nit: put } on a new line.
}
val macroFunction = funcWrapper.macroFunction.transform {
  case u: UnresolvedAttribute =>
    val index = colToIndex.get(u.name).getOrElse(
We should respect the case-sensitivity settings here. So a lookup might not be the best idea.
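The idea behind this comment can be sketched in a few self-contained lines (names like `Resolver`, `findArgument` are illustrative, not Spark's actual API): instead of a plain `Map` lookup keyed on the raw name, compare names through a pluggable resolver so the session's case-sensitivity setting decides whether `COLA` matches `colA`.

```scala
// Toy sketch: resolve macro argument names through a configurable resolver
// rather than a case-sensitive Map lookup.
object ResolverDemo {
  type Resolver = (String, String) => Boolean
  val caseSensitive: Resolver   = _ == _
  val caseInsensitive: Resolver = _.equalsIgnoreCase(_)

  // Find the ordinal of the parameter an attribute reference should bind to.
  def findArgument(names: Seq[String], wanted: String, resolver: Resolver): Option[Int] =
    names.indexWhere(n => resolver(n, wanted)) match {
      case -1 => None
      case i  => Some(i)
    }

  def main(argv: Array[String]): Unit = {
    val params = Seq("colA", "colB")
    println(findArgument(params, "COLA", caseInsensitive))
    println(findArgument(params, "COLA", caseSensitive))
  }
}
```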
Yes, I will do it, thanks.
val macroInfo = columns.mkString(",") + " -> " + funcWrapper.macroFunction.toString
val info = new ExpressionInfo(macroInfo, macroName, true)
val builder = (children: Seq[Expression]) => {
  if (children.size != columns.size) {
It is slightly better to put columns.size in a separate variable, so we do not include columns in the closure.
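The suggestion can be shown with a minimal sketch (a toy builder over `Seq[Int]`, not the real FunctionBuilder): binding the arity to a local `val` means the closure captures one `Int` instead of holding a reference to the whole `columns` sequence.

```scala
// Toy sketch of capturing only the arity in the builder closure.
object ClosureDemo {
  def makeBuilder(columns: Seq[String]): Seq[Int] => Int = {
    // Captured by the closure instead of `columns` itself.
    val numColumns = columns.size
    (children: Seq[Int]) => {
      require(children.size == numColumns,
        s"expected number of columns: $numColumns")
      children.sum
    }
  }

  def main(argv: Array[String]): Unit = {
    val builder = makeBuilder(Seq("a", "b"))
    println(builder(Seq(1, 2)))
  }
}
```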
 * }}}
 */
override def visitCreateMacro(ctx: CreateMacroContext): LogicalPlan = withOrigin(ctx) {
  val arguments = Option(ctx.colTypeList).map(visitColTypeList(_))
Nit: you can avoid (_)...
@@ -107,6 +110,14 @@ class SimpleFunctionRegistry extends FunctionRegistry {
  functionBuilders.remove(name).isDefined
}

override def dropMacro(name: String): Boolean = synchronized {
A drop function can currently also drop a macro. Can you make sure that this cannot happen? Maybe we should consolidate this into a single drop function with a macro flag. cc @gatorsmile WDYT?
Hive can drop a temporary function using the DROP MACRO command, and it can also drop a temporary macro using the DROP TEMPORARY FUNCTION command.
    name: String,
    info: ExpressionInfo,
    functionBuilder: FunctionBuilder): Unit = {
  if (functionRegistry.functionExists(name)) {
I am not entirely sure if we should throw an exception here. It unfortunately depends on the semantics you follow: SQL will throw an exception, whereas the DataFrame API will just overwrite the function. Let's follow Hive for now.
hive> create temporary macro max(x int)
> x*x;
OK
Time taken: 0.014 seconds
hive> select max(3) from t1;
OK
9
Time taken: 0.468 seconds, Fetched: 1 row(s)
hive> select max(3,4) from t1;
FAILED: SemanticException [Error 10015]: Line 1:7 Arguments length mismatch '4': The macro max accepts exactly 1 arguments.
Hive overwrites the temporary function without issuing an exception.
    s"expected number of columns: ${columns.size} for Macro $macroName")
}
macroFunction.transform {
  // Skip input type validation here; it is checked at runtime.
How do we check at runtime? The current code does not seem to respect the types passed, and relies on the macro's expression to do some type validation; this means you can pass anything to the macro and the user can end up with an unexpected result:
create macro plus(a int, b int) as a + b;
select plus(1.0, 1.0) as result -- This returns a decimal, and not an int as expected
So I think we should at least validate the input expressions. The hacky way would be to add casts, and have the analyzer fail if the cast cannot be made (this is terrible UX). A better way would be to create some sentinel expression that makes sure the analyzer will insert the correct cast, and throws a relevant exception (mentioning the macro) when this blows up...
On a related note, we are currently not sure if the macro produces a valid expression. Maybe we should run analysis on the macro expression to make sure it does not fail every query later on, e.g.:
val resolvedMacroFunction = try {
val plan = Project(Alias(macroFunction, "m")() :: Nil, OneRowRelation)
val analyzed @ Project(Seq(named), OneRowRelation) =
sparkSession.sessionState.analyzer.execute(plan)
sparkSession.sessionState.analyzer.checkAnalysis(analyzed)
named.children.head
} catch {
case a: AnalysisException =>
...
}
Note that we cannot use generators if we use this approach...
Yes, I have now updated it with your ideas. Thanks.
cc @gatorsmile could you take a look at the way this interacts with the session catalog and the function registry?
Sure. Will do it. Thanks!
@@ -52,3 +52,6 @@ class NoSuchPartitionsException(db: String, table: String, specs: Seq[TableParti

class NoSuchTempFunctionException(func: String)
  extends AnalysisException(s"Temporary function '$func' not found")

class NoSuchTempMacroException(func: String)
Please remove it. For the reasons, see PR #17716.
Yes, thanks.
/** Drop a temporary macro. */
def dropTempMacro(name: String, ignoreIfNotExists: Boolean): Unit = {
  if (!functionRegistry.dropMacro(name) && !ignoreIfNotExists) {
    throw new NoSuchTempMacroException(name)
hive> DROP TEMPORARY MACRO max;
OK
Time taken: 0.01 seconds
hive> select max(3) from t1;
OK
3
After we drop the macro, the existing function works well. That means we did not delete the original built-in function: DROP TEMPORARY MACRO does not drop built-in functions. After the macro with the same name is dropped, max resolves to the original built-in function again.
Yes, I have updated it to handle this case.
Test build #77527 has finished for PR 13706 at commit
Test build #77533 has finished for PR 13706 at commit
Let me first clean up the existing function registry in #18142. Will ping you when it is ready. Thanks!
Test build #77544 has finished for PR 13706 at commit
@gatorsmile OK. Thanks.
@gatorsmile, I just wonder if this PR is ready to proceed further. It looks like the PR you linked has been merged.
@lianhuiwang Could you restart the work? Thanks!
@lianhuiwang Please feel free to reopen the PR if you have the bandwidth to restart the work! Thanks!
What changes were proposed in this pull request?
In https://issues.apache.org/jira/browse/HIVE-2655, Hive implemented Create/Drop temporary macro, so this PR adds native DDL commands to create and drop temporary macros in Spark SQL.
A temporary macro can be considered a special temporary function whose FunctionBuilder transforms the macro body into an expression that can be evaluated directly.
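The "special temporary function" idea can be illustrated with a self-contained toy sketch (the `Expr` ADT, `Arg`, and `MacroDemo` names are illustrative, not Spark's Catalyst classes): the macro is registered as a function builder that, at call time, substitutes the call's argument expressions into the stored macro body, yielding a plain expression tree that can be evaluated directly.

```scala
// Toy model of a macro stored as a FunctionBuilder that expands its body.
sealed trait Expr
case class Lit(v: Int) extends Expr
case class Arg(ordinal: Int) extends Expr   // plays the role of a bound argument slot
case class Add(l: Expr, r: Expr) extends Expr

object MacroDemo {
  type FunctionBuilder = Seq[Expr] => Expr

  // Substitute bound argument slots with the actual call arguments.
  def expand(body: Expr, args: Seq[Expr]): Expr = body match {
    case Arg(i)    => args(i)
    case Add(l, r) => Add(expand(l, args), expand(r, args))
    case other     => other
  }

  def eval(e: Expr): Int = e match {
    case Lit(v)    => v
    case Arg(_)    => sys.error("unbound macro argument")
    case Add(l, r) => eval(l) + eval(r)
  }

  def main(argv: Array[String]): Unit = {
    // CREATE TEMPORARY MACRO plus(a, b) AS a + b
    val plusMacro: FunctionBuilder = args => {
      require(args.size == 2, "expected number of columns: 2 for Macro plus")
      expand(Add(Arg(0), Arg(1)), args)
    }
    // SELECT plus(1, 2)
    println(eval(plusMacro(Seq(Lit(1), Lit(2)))))
  }
}
```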
How was this patch tested?
Added unit tests.