From a83accfcfd6a92afac5040c50577258ab83d10dd Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 18 Jan 2017 01:57:12 +0800 Subject: [PATCH] [SPARK-19065][SQL] Don't inherit expression id in dropDuplicates ## What changes were proposed in this pull request? `dropDuplicates` will create an Alias using the same exprId, so `StreamExecution` should also replace Alias if necessary. ## How was this patch tested? test("SPARK-19065: dropDuplicates should not create expressions using the same id") Author: Shixiong Zhu Closes #16564 from zsxwing/SPARK-19065. --- .../scala/org/apache/spark/sql/Dataset.scala | 5 +--- .../org/apache/spark/sql/DatasetSuite.scala | 7 ----- .../spark/sql/streaming/StreamSuite.scala | 26 +++++++++++++++++++ 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 1a7a5ba798077..24b9b810fc5ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2003,10 +2003,7 @@ class Dataset[T] private[sql]( if (groupColExprIds.contains(attr.exprId)) { attr } else { - // Removing duplicate rows should not change output attributes. We should keep - // the original exprId of the attribute. Otherwise, to select a column in original - // dataset will cause analysis exception due to unresolved attribute. - Alias(new First(attr).toAggregateExpression(), attr.name)(exprId = attr.exprId) + Alias(new First(attr).toAggregateExpression(), attr.name)() } } Aggregate(groupCols, aggCols, logicalPlan) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 731a28c237bae..b37bf131e8dce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -898,13 +898,6 @@ class DatasetSuite extends QueryTest with SharedSQLContext { (1, 2), (1, 1), (2, 1), (2, 2)) } - test("dropDuplicates should not change child plan output") { - val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS() - checkDataset( - ds.dropDuplicates("_1").select(ds("_1").as[String], ds("_2").as[Int]), - ("a", 1), ("b", 1)) - } - test("SPARK-16097: Encoders.tuple should handle null object correctly") { val enc = Encoders.tuple(Encoders.tuple(Encoders.STRING, Encoders.STRING), Encoders.STRING) val data = Seq((("a", "b"), "c"), (null, "d")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index e964e646d22aa..f31dc8add48d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -304,6 +304,32 @@ class StreamSuite extends StreamTest { q.stop() } } + + test("SPARK-19065: dropDuplicates should not create expressions using the same id") { + withTempPath { testPath => + val data = Seq((1, 2), (2, 3), (3, 4)) + data.toDS.write.mode("overwrite").json(testPath.getCanonicalPath) + val schema = spark.read.json(testPath.getCanonicalPath).schema + val query = spark + .readStream + .schema(schema) + .json(testPath.getCanonicalPath) + .dropDuplicates("_1") + .writeStream + .format("memory") + .queryName("testquery") + .outputMode("complete") + .start() + try { + query.processAllAvailable() + if (query.exception.isDefined) { + throw query.exception.get + } + } finally { + query.stop() + } + } + } } /**