-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-23939][SQL] Add transform_keys function #22013
Conversation
ok to test |
Test build #94315 has finished for PR 22013 at commit
|
while (i < arr.numElements) { | ||
keyVar.value.set(arr.keyArray().get(i, keyVar.dataType)) | ||
valueVar.value.set(arr.valueArray().get(i, valueVar.dataType)) | ||
resultKeys.update(i, f.eval(input)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assumes that the transformation will return a unique key right? If it doesn't you'll break the map semantics. For example: transform_keys(some_map, (k, v) -> 0)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
override def dataType: DataType = { | ||
val valueType = input.dataType.asInstanceOf[MapType].valueType | ||
MapType(function.dataType, valueType, input.nullable) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think here input.nullable
is wrong. This should indicate whether the value contains null, not whether the returned object can be null or not.
usage = "_FUNC_(expr, func) - Transforms elements in a map using the function.", | ||
examples = """ | ||
Examples: | ||
> SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: missing space -> k, v
@@ -365,3 +365,69 @@ case class ArrayAggregate( | |||
|
|||
override def prettyName: String = "aggregate" | |||
} | |||
|
|||
/** | |||
* Transform Keys in a map using the transform_keys function. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe a better comment?
while (i < arr.numElements) { | ||
keyVar.value.set(arr.keyArray().get(i, keyVar.dataType)) | ||
valueVar.value.set(arr.valueArray().get(i, valueVar.dataType)) | ||
resultKeys.update(i, f.eval(input)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe I'm missing something, but couldn't f.eval(input)
be evaluated to null
? Keys are not allowed to benull
. Other functions have usually a null
check and throw RuntimeException
for such cases.
dfExample5.cache() | ||
dfExample6.cache() | ||
// Test with cached relation, the Project will be evaluated with codegen | ||
testMapOfPrimitiveTypesCombination() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have do that if the expression implements CodegenFallback
?
|
||
test("TransformKeys") { | ||
val ai0 = Literal.create( | ||
Map(1 -> 1, 2 -> 2, 3 -> 3), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's maybe irrelevant but WDYT about adding test cases with null
values?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for catching this!
Included test cases, both here and in DataFrameFunctionsSuite.
Test build #94404 has finished for PR 22013 at commit
|
Test build #94405 has finished for PR 22013 at commit
|
Test build #94451 has finished for PR 22013 at commit
|
Test build #94455 has finished for PR 22013 at commit
|
Test build #94457 has finished for PR 22013 at commit
|
@hvanhovell @mn-mikke @mgaido91 Thanks for the review! I have addressed all your comments and added appropriate test cases for the same. |
usage = "_FUNC_(expr, func) - Transforms elements in a map using the function.", | ||
examples = """ | ||
Examples: | ||
> SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we need one more right parenthesis after the second array(1, 2, 3)
?
Examples: | ||
> SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + 1); | ||
map(array(2, 3, 4), array(1, 2, 3)) | ||
> SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
def transformKeys(expr: Expression, f: (Expression, Expression) => Expression): Expression = { | ||
val valueType = expr.dataType.asInstanceOf[MapType].valueType | ||
val keyType = expr.dataType.asInstanceOf[MapType].keyType | ||
TransformKeys(expr, createLambda(keyType, false, valueType, true, f)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use valueContainsNull
instead of true
?
test("TransformKeys") { | ||
val ai0 = Literal.create( | ||
Map(1 -> 1, 2 -> 2, 3 -> 3), | ||
MapType(IntegerType, IntegerType)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add valueContainsNull
explicitly?
MapType(IntegerType, IntegerType)) | ||
val ai2 = Literal.create( | ||
Map(1 -> 1, 2 -> null, 3 -> 3), | ||
MapType(IntegerType, IntegerType)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add tests for Literal.create(null, MapType(IntegerType, IntegerType))
?
@@ -2117,6 +2117,198 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { | |||
assert(ex4.getMessage.contains("data type mismatch: argument 3 requires int type")) | |||
} | |||
|
|||
test("transform keys function - test various primitive data types combinations") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need so many cases here. We only need to verify the api works end to end.
Evaluation checks of the function should be in HigherOrderFunctionsSuite
.
Test build #94518 has finished for PR 22013 at commit
|
Thanks for the review @ueshin! I have addressed all your comments. |
Test build #94758 has finished for PR 22013 at commit
|
Test build #94765 has finished for PR 22013 at commit
|
Test build #94775 has finished for PR 22013 at commit
|
keyVar.value.set(map.keyArray().get(i, keyVar.dataType)) | ||
valueVar.value.set(map.valueArray().get(i, valueVar.dataType)) | ||
val result = f.eval(inputRow) | ||
if (result == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: extra space between ==
and null
.
MapType(function.dataType, map.valueType, map.valueContainsNull) | ||
} | ||
|
||
@transient val MapType(keyType, valueType, valueContainsNull) = argument.dataType |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lazy val
?
Could you add a test when argument
is not a map in invalid cases of DataFrameFunctionsSuite
?
|
||
override def dataType: DataType = { | ||
val map = argument.dataType.asInstanceOf[MapType] | ||
MapType(function.dataType, map.valueType, map.valueContainsNull) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use valueType
and valueContainsNull
from the following val?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about this?
} | ||
|
||
override def prettyName: String = "transform_keys" | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: indent
|
||
def testInvalidLambdaFunctions(): Unit = { | ||
val ex1 = intercept[AnalysisException] { | ||
dfExample1.selectExpr("transform_keys(i, k -> k )") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: extra space after k -> k
.
dfExample2.selectExpr("transform_keys(j, (k, v, x) -> k + 1)") | ||
} | ||
assert(ex2.getMessage.contains( | ||
"The number of lambda function arguments '3' does not match")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: indent
).toDF("i") | ||
|
||
val dfExample2 = Seq( | ||
Map[Int, Double](1 -> 1.0E0, 2 -> 1.4E0, 3 -> 1.7E0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need E0
?
|
||
create or replace temporary view nested as values | ||
(1, map(1,1,2,2,3,3)), | ||
(2, map(4,4,5,5,6,6)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
(1, map(1, 1, 2, 2, 3, 3)),
(2, map(4, 4, 5, 5, 6, 6))
MapType(StringType, StringType, valueContainsNull = true)) | ||
val as2 = Literal.create(null, | ||
MapType(StringType, StringType, valueContainsNull = false)) | ||
val asn = Literal.create(Map.empty[StringType, StringType], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as3
?
testInvalidLambdaFunctions() | ||
dfExample1.cache() | ||
dfExample2.cache() | ||
testInvalidLambdaFunctions() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need dfExample3.cache()
as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ueshin I would like to ask you a generic question regarding higher-order functions. Is it necessary to perform checks with codegen paths if all the newly added functions extends from CodegenFallback
? Eventually, is there a plan to add coden for these functions in future?
Btw, we need one more right parenthesis after the second |
val LambdaFunction( | ||
_, (keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: Nil, _) = function | ||
(keyVar, valueVar) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: how about:
@transient lazy val LambdaFunction(_,
(keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: Nil, _) = function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I meant we don't need to surround by:
@transient lazy val (keyVar, valueVar) = {
...
(keyVar, valueVar)
}
just
@transient lazy val LambdaFunction(_,
(keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: Nil, _) = function
should work.
Test build #94788 has finished for PR 22013 at commit
|
@transient lazy val MapType(keyType, valueType, valueContainsNull) = argument.dataType | ||
|
||
override def dataType: DataType = { | ||
MapType(function.dataType, valueType, valueContainsNull) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: just in one line?
|
||
override def nullSafeEval(inputRow: InternalRow, argumentValue: Any): Any = { | ||
val map = argumentValue.asInstanceOf[MapData] | ||
val f = functionForEval |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we use functionForEval
directly?
Test build #94811 has finished for PR 22013 at commit
|
Test build #94817 has finished for PR 22013 at commit
|
Test build #94819 has finished for PR 22013 at commit
|
Thanks! merging to master. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, just one very minor comment, thanks
function: Expression) | ||
extends MapBasedSimpleHigherOrderFunction with CodegenFallback { | ||
|
||
override def nullable: Boolean = argument.nullable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can be moved to SimpleHigherOrderFunction
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense.
Let's have wrap-up prs for higher-order functions after the remaining 2 prs are merged.
## What changes were proposed in this pull request? - Revert [SPARK-23935][SQL] Adding map_entries function: #21236 - Revert [SPARK-23937][SQL] Add map_filter SQL function: #21986 - Revert [SPARK-23940][SQL] Add transform_values SQL function: #22045 - Revert [SPARK-23939][SQL] Add transform_keys function: #22013 - Revert [SPARK-23938][SQL] Add map_zip_with function: #22017 - Revert the changes of map_entries in [SPARK-24331][SPARKR][SQL] Adding arrays_overlap, array_repeat, map_entries to SparkR: #21434 ## How was this patch tested? The existing tests. Closes #22827 from gatorsmile/revertMap2.4. Authored-by: gatorsmile <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
This pr adds transform_keys function which applies the function to each entry of the map and transforms the keys.
How was this patch tested?
Added tests.