[SPARK-9435][SQL] Reuse function in Java UDF to correctly support expressions that require equality comparison between ScalaUDF #16553
cc @marmbrus, I just saw you in the JIRA. Could you please take a look?
@@ -488,219 +488,241 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
    * @since 1.3.0
    */
   def register(name: String, f: UDF1[_, _], returnType: DataType): Unit = {
+    val func = f.asInstanceOf[UDF1[Any, Any]].call(_: Any)
There is commented-out code above that's used to generate these functions. We should update it or delete it.
Ah, sure. Thanks!
Test build #71224 has finished for PR 16553 at commit
Test build #71225 has finished for PR 16553 at commit
 |  functionRegistry.registerFunction(
 |    name,
-|    (e: Seq[Expression]) => ScalaUDF(f$anyCast.call($anyParams), returnType, e))
+|    (e: Seq[Expression]) => ScalaUDF(func, returnType, e))
I verified this by copying and pasting the generated code over the current changes and checking that there was no diff.
I can confirm they are the same.
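For readers unfamiliar with the generator discussed in this thread, here is a minimal sketch of how such a template might emit the `register` overloads, with the function hoisted into a single `func` value. It is written in Java purely for illustration; the class name `RegisterTemplateDemo` and the method `generate` are hypothetical, the template text is only loosely modeled on the fragments quoted in this review, and the real generator is Scala code kept in a comment in `UDFRegistration`.

```java
import java.util.Collections;

public class RegisterTemplateDemo {
    /**
     * Emits the source text of one `register` overload for the given UDF arity.
     * Hypothetical template: only the lines quoted in the review are taken from the PR.
     */
    public static String generate(int arity) {
        // UDFn takes n argument types plus one return type.
        String wildcards = String.join(", ", Collections.nCopies(arity + 1, "_"));
        String anyTypes = String.join(", ", Collections.nCopies(arity + 1, "Any"));
        String anyParams = String.join(", ", Collections.nCopies(arity, "_: Any"));

        return "def register(name: String, f: UDF" + arity + "[" + wildcards
                + "], returnType: DataType): Unit = {\n"
                // The function is built once here, outside the expression builder.
                + "  val func = f.asInstanceOf[UDF" + arity + "[" + anyTypes + "]].call("
                + anyParams + ")\n"
                + "  functionRegistry.registerFunction(\n"
                + "    name,\n"
                + "    (e: Seq[Expression]) => ScalaUDF(func, returnType, e))\n"
                + "}\n";
    }

    public static void main(String[] args) {
        // Print the overloads for UDF1 and UDF2 as a quick visual check.
        for (int arity = 1; arity <= 2; arity++) {
            System.out.println(generate(arity));
        }
    }
}
```

Regenerating the overloads from the template and diffing them against the checked-in code is the kind of check described in the comments above.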
Test build #71235 has finished for PR 16553 at commit
@marmbrus, could you take another look when you have some time?
LGTM. cc @marmbrus for final sign off
@gatorsmile Thanks!
    }, DataTypes.LongType);

    spark.range(10).toDF("x").createOrReplaceTempView("tmp");
    List<Row> results = spark.sql("SELECT inc(x) FROM tmp GROUP BY inc(x)").collectAsList();
It is not obvious what this test is checking. Can we add a few comments explaining that?
Sure, makes sense. Thanks!
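To make the intent of the test concrete, the following is a simplified, Spark-free sketch of the check it exercises: the `inc(x)` in the SELECT list must compare equal to the `inc(x)` in the GROUP BY clause. Everything here is a hypothetical stand-in (`Udf1`, `UdfExpr`, and the two builder methods are not Spark classes), and the equality rule is an assumption chosen to mirror the behavior described in this PR, not Spark's actual `semanticEquals` logic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

public class GroupByCheckDemo {
    /** Hypothetical stand-in for Spark's Java UDF1 interface. */
    public interface Udf1<T, R> {
        R call(T value);
    }

    /** Hypothetical expression node: equal only when the wrapped functions are equal. */
    public static final class UdfExpr {
        final Function<Long, Long> function;

        UdfExpr(Function<Long, Long> function) {
            this.function = function;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof UdfExpr && function.equals(((UdfExpr) o).function);
        }

        @Override
        public int hashCode() {
            return function.hashCode();
        }
    }

    /** The UDF used by the demo: adds one to its input. */
    public static final Udf1<Long, Long> INC = i -> i + 1;

    /** Always allocates a new wrapper object around the UDF. */
    static Function<Long, Long> wrap(final Udf1<Long, Long> udf) {
        return new Function<Long, Long>() {
            @Override
            public Long apply(Long value) {
                return udf.call(value);
            }
        };
    }

    /** Models the old behavior: a new wrapper every time an expression is built. */
    public static Supplier<UdfExpr> builderWithFreshFunction(final Udf1<Long, Long> udf) {
        return () -> new UdfExpr(wrap(udf));
    }

    /** Models the fix: the wrapper is created once and shared by all built expressions. */
    public static Supplier<UdfExpr> builderWithReusedFunction(Udf1<Long, Long> udf) {
        final Function<Long, Long> func = wrap(udf);
        return () -> new UdfExpr(func);
    }

    /** Simplified grouping check: the selected expression must appear in the GROUP BY list. */
    public static boolean selectMatchesGroupBy(Supplier<UdfExpr> builder) {
        UdfExpr inSelect = builder.get();   // inc(x) in the SELECT list
        UdfExpr inGroupBy = builder.get();  // inc(x) in the GROUP BY clause
        List<UdfExpr> groupBy = Arrays.asList(inGroupBy);
        return groupBy.contains(inSelect);
    }

    public static void main(String[] args) {
        System.out.println(selectMatchesGroupBy(builderWithFreshFunction(INC)));   // prints false
        System.out.println(selectMatchesGroupBy(builderWithReusedFunction(INC)));  // prints true
    }
}
```

In this model the first case corresponds to the query being rejected and the second to the query passing, which is the behavior the test is meant to pin down.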
One minor comment, otherwise LGTM.
Test build #71398 has finished for PR 16553 at commit
@marmbrus Can this be merged by any chance?
gentle ping..
retest this please
Maybe we can merge it now, and you can resolve any extra comments from @marmbrus as a follow-up.
@gatorsmile Thanks!
Test build #71885 has finished for PR 16553 at commit
(FWIW, I checked |
…ressions that require equality comparison between ScalaUDF

## What changes were proposed in this pull request?

Currently, running the following code in Java

```java
spark.udf().register("inc", new UDF1<Long, Long>() {
  @Override
  public Long call(Long i) {
    return i + 1;
  }
}, DataTypes.LongType);

spark.range(10).toDF("x").createOrReplaceTempView("tmp");
Row result = spark.sql("SELECT inc(x) FROM tmp GROUP BY inc(x)").head();
Assert.assertEquals(7, result.getLong(0));
```

fails as below:

```
org.apache.spark.sql.AnalysisException: expression 'tmp.`x`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Aggregate [UDF(x#19L)], [UDF(x#19L) AS UDF(x)#23L]
+- SubqueryAlias tmp, `tmp`
   +- Project [id#16L AS x#19L]
      +- Range (0, 10, step=1, splits=Some(8))

	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
```

The root cause is that we were creating a new function every time the expression needed to be built, as below:

```scala
scala> def inc(i: Int) = i + 1
inc: (i: Int)Int

scala> (inc(_: Int)).hashCode
res15: Int = 1231799381

scala> (inc(_: Int)).hashCode
res16: Int = 2109839984

scala> (inc(_: Int)) == (inc(_: Int))
res17: Boolean = false
```

This seems to lead to the comparison failure between `ScalaUDF`s created from the Java UDF API, for example, in `Expression.semanticEquals`.

In the case of the Scala API, it seems to be fine already.

Both can be tested easily as below if any reviewer is more comfortable with Scala:

```scala
val df = Seq((1, 10), (2, 11), (3, 12)).toDF("x", "y")
val javaUDF = new UDF1[Int, Int] {
  override def call(i: Int): Int = i + 1
}
// spark.udf.register("inc", javaUDF, IntegerType) // Uncomment this for Java API
// spark.udf.register("inc", (i: Int) => i + 1)    // Uncomment this for Scala API
df.createOrReplaceTempView("tmp")
spark.sql("SELECT inc(y) FROM tmp GROUP BY inc(y)").show()
```

## How was this patch tested?

Unit test in `JavaUDFSuite.java` and `./dev/lint-java`.

Author: hyukjinkwon <[email protected]>

Closes #16553 from HyukjinKwon/SPARK-9435.

(cherry picked from commit e576c1e)
Signed-off-by: gatorsmile <[email protected]>
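The Scala REPL session in the commit message has a rough Java analogue, sketched below for readers who think in Java. The class `FunctionIdentityDemo` and the method `newInc` are hypothetical names introduced only for this illustration. An anonymous class is used deliberately, because `new` always allocates a distinct object, whereas the identity of objects produced by Java lambda expressions is implementation-dependent.

```java
import java.util.function.Function;

public class FunctionIdentityDemo {
    /** Returns a new function object wrapping the same "add one" logic on every call. */
    public static Function<Long, Long> newInc() {
        return new Function<Long, Long>() {
            @Override
            public Long apply(Long i) {
                return i + 1;
            }
        };
    }

    public static void main(String[] args) {
        Function<Long, Long> first = newInc();
        Function<Long, Long> second = newInc();

        // The two objects behave identically...
        System.out.println(first.apply(1L).equals(second.apply(1L)));  // prints true
        // ...but equals() is inherited from Object, so it compares references.
        System.out.println(first.equals(second));                      // prints false
        // Holding on to a single instance is what makes the comparison succeed.
        System.out.println(first.equals(first));                       // prints true
    }
}
```

This is why creating the function once and reusing it, rather than rebuilding it per expression, lets two occurrences of the same UDF call compare equal.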
Thanks! Merging to master.
Thank you @gatorsmile