Skip to content

Commit

Permalink
[SPARK-47270][SQL] Dataset.isEmpty projects CommandResults locally
Browse files Browse the repository at this point in the history
  • Loading branch information
turboFei committed Nov 1, 2024
1 parent 7b67d5d commit da4df40
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.annotation.meta.getter

import org.apache.spark.{SPARK_VERSION, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.kyuubi.SparkDatasetHelper
import org.apache.spark.util.kvstore.KVIndex

import org.apache.kyuubi.Logging
Expand All @@ -48,7 +49,7 @@ object KyuubiSparkUtil extends Logging {
interruptOnCancel = true)
debug(s"Execute initialization sql: $sql")
try {
spark.sql(sql).take(1).isEmpty
SparkDatasetHelper.commandResultOptimized(spark.sql(sql)).isEmpty
} finally {
spark.sparkContext.clearJobGroup()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, GlobalLimit, LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
Expand Down Expand Up @@ -290,4 +290,13 @@ object SparkDatasetHelper extends Logging {
nodeName == "org.apache.spark.sql.execution.command.ExecutedCommandExec" ||
nodeName == "org.apache.spark.sql.execution.CommandResultExec"
}

/**
 * SPARK-47270: Returns an optimized dataset for a `CommandResult` plan by rewriting it
 * into a `LocalRelation`, so follow-up actions (e.g. `isEmpty`) are evaluated locally
 * instead of triggering a Spark job. Datasets whose plan is not a `CommandResult` are
 * returned unchanged.
 */
def commandResultOptimized[T](dataset: Dataset[T]): Dataset[T] = {
  dataset.logicalPlan match {
    case commandResult: CommandResult =>
      // The command has already been executed; its buffered rows can back a LocalRelation.
      val localPlan = LocalRelation(commandResult.output, commandResult.rows)
      Dataset(dataset.sparkSession, localPlan)(dataset.encoder)
    case _ =>
      dataset
  }
}
}

0 comments on commit da4df40

Please sign in to comment.