Skip to content

Commit

Permalink
[SPARK-19065][SQL] Don't inherit expression id in dropDuplicates
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

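`dropDuplicates` creates an `Alias` that reuses the exprId of the attribute it wraps, so `StreamExecution` should also replace the `Alias` if necessary. This patch makes `dropDuplicates` stop passing the attribute's exprId to the `Alias` it creates.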

## How was this patch tested?

Added a new test to `StreamSuite`: `test("SPARK-19065: dropDuplicates should not create expressions using the same id")`.

Author: Shixiong Zhu <[email protected]>

Closes #16564 from zsxwing/SPARK-19065.
  • Loading branch information
zsxwing authored and cloud-fan committed Jan 17, 2017
1 parent 20e6280 commit a83accf
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 11 deletions.
5 changes: 1 addition & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2003,10 +2003,7 @@ class Dataset[T] private[sql](
if (groupColExprIds.contains(attr.exprId)) {
attr
} else {
// Removing duplicate rows should not change output attributes. We should keep
// the original exprId of the attribute. Otherwise, to select a column in original
// dataset will cause analysis exception due to unresolved attribute.
Alias(new First(attr).toAggregateExpression(), attr.name)(exprId = attr.exprId)
Alias(new First(attr).toAggregateExpression(), attr.name)()
}
}
Aggregate(groupCols, aggCols, logicalPlan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,13 +898,6 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
(1, 2), (1, 1), (2, 1), (2, 2))
}

// Deduplicates on "_1" and then projects through the ORIGINAL Dataset's column
// references (ds("_1"), ds("_2")), expecting one row per distinct "_1" value.
test("dropDuplicates should not change child plan output") {
  val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
  val deduplicated = ds.dropDuplicates("_1")
  // The columns are resolved against `ds`, not against `deduplicated`.
  val projected = deduplicated.select(ds("_1").as[String], ds("_2").as[Int])
  checkDataset(projected, ("a", 1), ("b", 1))
}

test("SPARK-16097: Encoders.tuple should handle null object correctly") {
val enc = Encoders.tuple(Encoders.tuple(Encoders.STRING, Encoders.STRING), Encoders.STRING)
val data = Seq((("a", "b"), "c"), (null, "d"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,32 @@ class StreamSuite extends StreamTest {
q.stop()
}
}

// Writes a small JSON dataset, reads it back as a stream, applies
// dropDuplicates("_1") and runs the query into a memory sink in complete mode.
// The test passes as long as the query processes the available data without
// reporting an exception; it makes no assertion on the sink contents.
test("SPARK-19065: dropDuplicates should not create expressions using the same id") {
  withTempPath { dir =>
    val jsonPath = dir.getCanonicalPath
    Seq((1, 2), (2, 3), (3, 4)).toDS.write.mode("overwrite").json(jsonPath)
    // Streaming file sources are given an explicit schema here, taken from a batch read.
    val inferredSchema = spark.read.json(jsonPath).schema
    val deduplicated = spark
      .readStream
      .schema(inferredSchema)
      .json(jsonPath)
      .dropDuplicates("_1")
    val query = deduplicated
      .writeStream
      .format("memory")
      .queryName("testquery")
      .outputMode("complete")
      .start()
    try {
      query.processAllAvailable()
      // Surface any failure recorded by the streaming query as a test failure.
      query.exception.foreach(e => throw e)
    } finally {
      query.stop()
    }
  }
}
}

/**
Expand Down

0 comments on commit a83accf

Please sign in to comment.