[GLUTEN-4668][CH] Merge two-phase hash-based aggregate into one aggregate in the Spark plan when there is no shuffle #4669

Merged · 1 commit · Feb 21, 2024
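The idea behind the change: a hash aggregate normally runs in two phases, a Partial-mode aggregate before the shuffle and a Final-mode aggregate after it. When the planner puts both phases into the same stage (no exchange between them, e.g. because the child already satisfies the required distribution), the pair can be collapsed into a single Complete-mode aggregate, so the data is aggregated once instead of twice. A minimal sketch of that rewrite as a physical-plan rule follows; the rule name, match conditions, and fields touched are illustrative assumptions, not Gluten's implementation, and a real rule needs extra guards (backend opt-in, grouping-key equality, distinct aggregates, AQE interaction).

// Sketch only: fuse a Final-mode HashAggregateExec whose direct child is the
// matching Partial-mode HashAggregateExec (i.e. no shuffle in between) into a
// single Complete-mode aggregate. Names and guards are illustrative.
import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Final, Partial}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

object MergeTwoPhaseAggregateSketch extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case finalAgg: HashAggregateExec
        if finalAgg.aggregateExpressions.nonEmpty &&
          finalAgg.aggregateExpressions.forall(_.mode == Final) =>
      finalAgg.child match {
        // The child is the partial aggregate itself, so no exchange sits
        // between the two phases and they can be fused.
        case partialAgg: HashAggregateExec
            if partialAgg.aggregateExpressions.forall(_.mode == Partial) =>
          finalAgg.copy(
            aggregateExpressions = finalAgg.aggregateExpressions.map(_.copy(mode = Complete)),
            // The merged operator reads raw input rows again, not partial buffers.
            initialInputBufferOffset = 0,
            child = partialAgg.child)
        case _ => finalAgg
      }
  }
}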
@@ -282,4 +282,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
   override def enableNativeWriteFiles(): Boolean = {
     GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
   }
+
+  override def mergeTwoPhasesHashBaseAggregateIfNeed(): Boolean = true
 }
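This override is the per-backend switch for the rewrite: the CH backend opts in here. Presumably the hook is declared on BackendSettingsApi with a conservative default, roughly like the sketch below (the default shown is an assumption, not quoted from the source):

// Assumed shape of the hook: backends that cannot execute a single-phase
// (Complete-mode) native aggregate simply keep the default.
trait BackendSettingsApi {
  def mergeTwoPhasesHashBaseAggregateIfNeed(): Boolean = false
}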
@@ -109,8 +109,8 @@ case class CHHashAggregateExecTransformer(
     val typeList = new util.ArrayList[TypeNode]()
     val nameList = new util.ArrayList[String]()
     val (inputAttrs, outputAttrs) = {
-      if (modes.isEmpty) {
-        // When there is no aggregate function, it does not need
+      if (modes.isEmpty || modes.forall(_ == Complete)) {
+        // When there is no aggregate function or there is complete mode, it does not need
         // to handle outputs according to the AggregateMode
         for (attr <- child.output) {
           typeList.add(ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
@@ -212,7 +212,7 @@ case class CHHashAggregateExecTransformer(
       val aggregateFunc = aggExpr.aggregateFunction
       val childrenNodeList = new util.ArrayList[ExpressionNode]()
       val childrenNodes = aggExpr.mode match {
-        case Partial =>
+        case Partial | Complete =>
           aggregateFunc.children.toList.map(
             expr => {
               ExpressionConverter
@@ -446,7 +446,7 @@ case class CHHashAggregateExecPullOutHelper(
         }
         resIndex += aggBufferAttr.size
         resIndex
-      case Final =>
+      case Final | Complete =>
        aggregateAttr += aggregateAttributeList(resIndex)
        resIndex += 1
        resIndex
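The pattern in these two hunks: Complete shares the Partial branch when binding children and the Final branch when mapping result attributes. That follows directly from Spark's AggregateMode semantics, which determine what an aggregate operator consumes and produces; Complete does both halves in one operator. For reference (standard Spark behavior, summarized as comments):

import org.apache.spark.sql.catalyst.expressions.aggregate._
// Partial:      raw input rows  -> partial aggregation buffers
// PartialMerge: partial buffers -> merged partial buffers
// Final:        partial buffers -> final result values
// Complete:     raw input rows  -> final result values
// So a Complete-mode function takes raw child expressions as inputs (like
// Partial) and emits one final result attribute (like Final).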
@@ -139,7 +139,13 @@ class GlutenClickHouseColumnarShuffleAQESuite
   }
 
   test("TPCH Q18") {
-    runTPCHQuery(18) { df => }
+    runTPCHQuery(18) {
+      df =>
+        val hashAggregates = collect(df.queryExecution.executedPlan) {
+          case hash: HashAggregateExecBaseTransformer => hash
+        }
+        assert(hashAggregates.size == 3)
+    }
   }
 
   test("TPCH Q19") {
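Two different traversal helpers count the aggregates in these suites. The AQE suite above uses collect(plan) { ... }, presumably from Spark's AdaptiveSparkPlanHelper, which looks through AdaptiveSparkPlanExec wrappers; the DSV2 suite below uses executedPlan.collectWithSubqueries, the standard QueryPlan helper that also descends into subquery plans, which is why queries with scalar subqueries (e.g. Q11, Q15) count their subquery aggregates too. A quick illustration, assuming a DataFrame df and the Gluten transformer class on the classpath:

// collect walks only the given plan tree; collectWithSubqueries additionally
// walks every subquery plan hanging off it.
val mainOnly = df.queryExecution.executedPlan.collect {
  case agg: HashAggregateExecBaseTransformer => agg
}
val withSubqueries = df.queryExecution.executedPlan.collectWithSubqueries {
  case agg: HashAggregateExecBaseTransformer => agg
}
assert(withSubqueries.size >= mainOnly.size)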
@@ -42,7 +42,13 @@ class GlutenClickHouseDSV2ColumnarShuffleSuite extends GlutenClickHouseTPCHAbstr
   }
 
   test("TPCH Q3") {
-    runTPCHQuery(3) { df => }
+    runTPCHQuery(3) {
+      df =>
+        val aggs = df.queryExecution.executedPlan.collectWithSubqueries {
+          case agg: HashAggregateExecBaseTransformer => agg
+        }
+        assert(aggs.size == 1)
+    }
   }
 
   test("TPCH Q4") {
@@ -74,43 +80,91 @@
   }
 
   test("TPCH Q11") {
-    runTPCHQuery(11) { df => }
+    runTPCHQuery(11) {
+      df =>
+        val aggs = df.queryExecution.executedPlan.collectWithSubqueries {
+          case agg: HashAggregateExecBaseTransformer => agg
+        }
+        assert(aggs.size == 3)
+    }
   }
 
   test("TPCH Q12") {
     runTPCHQuery(12) { df => }
   }
 
   test("TPCH Q13") {
-    runTPCHQuery(13) { df => }
+    runTPCHQuery(13) {
+      df =>
+        val aggs = df.queryExecution.executedPlan.collectWithSubqueries {
+          case agg: HashAggregateExecBaseTransformer => agg
+        }
+        assert(aggs.size == 3)
+    }
   }
 
   test("TPCH Q14") {
-    runTPCHQuery(14) { df => }
+    runTPCHQuery(14) {
+      df =>
+        val aggs = df.queryExecution.executedPlan.collectWithSubqueries {
+          case agg: HashAggregateExecBaseTransformer => agg
+        }
+        assert(aggs.size == 1)
+    }
   }
 
   test("TPCH Q15") {
-    runTPCHQuery(15) { df => }
+    runTPCHQuery(15) {
+      df =>
+        val aggs = df.queryExecution.executedPlan.collectWithSubqueries {
+          case agg: HashAggregateExecBaseTransformer => agg
+        }
+        assert(aggs.size == 4)
+    }
   }
 
   test("TPCH Q16") {
     runTPCHQuery(16, noFallBack = false) { df => }
   }
 
   test("TPCH Q17") {
-    runTPCHQuery(17) { df => }
+    runTPCHQuery(17) {
+      df =>
+        val aggs = df.queryExecution.executedPlan.collectWithSubqueries {
+          case agg: HashAggregateExecBaseTransformer => agg
+        }
+        assert(aggs.size == 3)
+    }
   }
 
   test("TPCH Q18") {
-    runTPCHQuery(18) { df => }
+    runTPCHQuery(18) {
+      df =>
+        val aggs = df.queryExecution.executedPlan.collectWithSubqueries {
+          case agg: HashAggregateExecBaseTransformer => agg
+        }
+        assert(aggs.size == 4)
+    }
   }
 
   test("TPCH Q19") {
-    runTPCHQuery(19) { df => }
+    runTPCHQuery(19) {
+      df =>
+        val aggs = df.queryExecution.executedPlan.collectWithSubqueries {
+          case agg: HashAggregateExecBaseTransformer => agg
+        }
+        assert(aggs.size == 1)
+    }
   }
 
   test("TPCH Q20") {
-    runTPCHQuery(20) { df => }
+    runTPCHQuery(20) {
+      df =>
+        val aggs = df.queryExecution.executedPlan.collectWithSubqueries {
+          case agg: HashAggregateExecBaseTransformer => agg
+        }
+        assert(aggs.size == 1)
+    }
   }
 
   test("TPCH Q21") {
@@ -74,7 +74,6 @@ class GlutenClickHouseNativeWriteTableSuite
       .set("spark.gluten.sql.enable.native.validation", "false")
       // TODO: support default ANSI policy
       .set("spark.sql.storeAssignmentPolicy", "legacy")
-      // .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "debug")
       .set("spark.sql.warehouse.dir", getWarehouseDir)
       .setMaster("local[1]")
   }
@@ -46,8 +46,6 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
       .set("spark.memory.offHeap.size", "4g")
       .set("spark.gluten.sql.validation.logLevel", "ERROR")
      .set("spark.gluten.sql.validation.printStackOnFailure", "true")
-      // .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", "debug")
-      // .setMaster("local[1]")
   }
 
   executeTPCDSTest(false)