From 345b58ebc90beb4e30b5bde4c9b91153ef184ba6 Mon Sep 17 00:00:00 2001 From: "chenliang.lu" Date: Mon, 2 Dec 2024 12:47:15 +0800 Subject: [PATCH] [KYUUBI #6830] Allow indicate advisory shuffle partition size when merge small files --- .../main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala | 8 ++++++++ .../org/apache/kyuubi/sql/RebalanceBeforeWriting.scala | 3 ++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala index 4218c41fa16..8e0e74f2a15 100644 --- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala +++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala @@ -103,6 +103,14 @@ object KyuubiSQLConf { .booleanConf .createWithDefault(false) + val REBALANCE_ADVISORY_PARTITION_SIZE_IN_BYTES = + buildConf("spark.sql.optimizer.rebalanceAdvisoryPartitionSize") + .doc("The advisory size in bytes of the shuffle partition during merge small files. " + + s"It takes effect when Spark coalesces small shuffle partitions or splits skewed " + + s"shuffle partition.") + .version("1.10.1") + .fallbackConf(ADVISORY_PARTITION_SIZE_IN_BYTES) + val WATCHDOG_MAX_PARTITIONS = buildConf("spark.sql.watchdog.maxPartitions") .doc("Set the max partition number when spark scans a data source. " + diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala index 3cbacdd2f03..426088a1f22 100644 --- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala +++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala @@ -27,7 +27,8 @@ trait RepartitionBuilderWithRebalance extends RepartitionBuilder { query: LogicalPlan): LogicalPlan = { if (!conf.getConf(KyuubiSQLConf.INFER_REBALANCE_AND_SORT_ORDERS) || dynamicPartitionColumns.nonEmpty) { - RebalancePartitions(dynamicPartitionColumns, query) + val adviceBytes = conf.getConf(KyuubiSQLConf.REBALANCE_ADVISORY_PARTITION_SIZE_IN_BYTES) + RebalancePartitions(dynamicPartitionColumns, query, None, Option.apply(adviceBytes)) } else { val maxColumns = conf.getConf(KyuubiSQLConf.INFER_REBALANCE_AND_SORT_ORDERS_MAX_COLUMNS) val inferred = InferRebalanceAndSortOrders.infer(query)