Allow schema pruning for delete first pass
Resolves #1411

Re-orders the delete command's find-files-to-rewrite query so that the projection down to just `input_file_name()` comes before the non-deterministic filter UDF. This allows top-level schema pruning, and in Spark 3.4 it should allow nested pruning as well. Also updates the formatting (and `Column(InputFileName())` -> `input_file_name()`) to match the Update command.

New unit tests.

Improves performance of DELETE with a condition on data columns.
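As a rough illustration of the reordering, here is a standalone sketch (not Delta's actual code: `markTouched` is a hypothetical stand-in for the non-deterministic file-tracking UDF, and a plain Parquet read stands in for the Delta scan). It builds the two query shapes side by side so the ReadSchema that `explain()` prints for each file scan can be compared; the pruning claim itself comes from #1411 and the tests in the diff below.

    import java.nio.file.Files
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, input_file_name, udf}

    object FindFilesSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("find-files-sketch").getOrCreate()
        import spark.implicits._

        // Write a small Parquet table so explain() shows a file scan with its ReadSchema.
        val path = Files.createTempDirectory("find-files-sketch").resolve("t").toString
        Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value").write.parquet(path)
        val data = spark.read.parquet(path)

        // Hypothetical stand-in for the non-deterministic UDF that marks touched rows.
        val markTouched = udf(() => true).asNondeterministic()

        // Shape before this change: condition filter, then the non-deterministic UDF,
        // then the projection to input_file_name(). Per #1411 this shape prevented
        // schema pruning in the Delta DELETE path.
        data.filter(col("key") === 2)
          .filter(markTouched())
          .select(input_file_name()).distinct()
          .as[String]
          .explain()

        // Shape after this change: project to input_file_name() right after the condition
        // filter, so only `key` is referenced below the non-deterministic filter.
        data.filter(col("key") === 2)
          .select(input_file_name())
          .filter(markTouched())
          .distinct()
          .as[String]
          .explain()

        spark.stop()
      }
    }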

Closes #1412

Signed-off-by: Shixiong Zhu <[email protected]>
GitOrigin-RevId: abfee4cf9f8d8ffaef9e397ad9c237c576b8b807
Kimahriman authored and scottsand-db committed Oct 14, 2022
1 parent 6bca231 commit 9017ac0
Showing 2 changed files with 50 additions and 5 deletions.
@@ -26,13 +26,14 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EqualNullSafe, Expression, If, InputFileName, Literal, Not}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, EqualNullSafe, Expression, If, Literal, Not}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{DeltaDelete, LogicalPlan}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.metric.SQLMetrics.{createMetric, createTimingMetric}
+import org.apache.spark.sql.functions.input_file_name
 import org.apache.spark.sql.types.LongType
 
 trait DeleteCommandMetrics { self: LeafRunnableCommand =>
@@ -215,11 +216,12 @@ case class DeleteCommand(
       if (candidateFiles.isEmpty) {
         Array.empty[String]
       } else {
-        data
-          .filter(new Column(cond))
+        data.filter(new Column(cond))
+          .select(input_file_name())
           .filter(deletedRowUdf())
-          .select(new Column(InputFileName())).distinct()
-          .as[String].collect()
+          .distinct()
+          .as[String]
+          .collect()
       }
     }
 
@@ -24,7 +24,10 @@ import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.functions.struct
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 
 abstract class DeleteSuiteBase extends QueryTest
@@ -356,6 +359,46 @@ abstract class DeleteSuiteBase extends QueryTest
     assert(e4.contains("Subqueries are not supported"))
   }
 
+  test("schema pruning on data condition") {
+    val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
+    append(input, Nil)
+
+    val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
+      checkDelete(Some("key = 2"),
+        Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil)
+    }
+
+    val scans = executedPlans.flatMap(_.collect {
+      case f: FileSourceScanExec => f
+    })
+
+    // The first scan is for finding files to delete. We only match against the key,
+    // so that should be the only field in the schema.
+    assert(scans.head.schema.findNestedField(Seq("key")).nonEmpty)
+    assert(scans.head.schema.findNestedField(Seq("value")).isEmpty)
+  }
+
+
+  test("nested schema pruning on data condition") {
+    val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
+      .select(struct("key", "value").alias("nested"))
+    append(input, Nil)
+
+    val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
+      checkDelete(Some("nested.key = 2"),
+        Row(Row(1, 4)) :: Row(Row(1, 1)) :: Row(Row(0, 3)) :: Nil)
+    }
+
+    val scans = executedPlans.flatMap(_.collect {
+      case f: FileSourceScanExec => f
+    })
+
+    // Currently nested schemas can't be pruned, but Spark 3.4 loosens some of the
+    // restrictions on non-deterministic expressions, so after upgrading this should be
+    // pruned to just "nested STRUCT<key: int>".
+    assert(scans.head.schema == StructType.fromDDL("nested STRUCT<key: int, value: int>"))
+  }
+
   /**
    * @param function the unsupported function.
    * @param functionType The type of the unsupported expression to be tested.
