From 5172dd6b3185e4e99d5939ed8b35c2ded5656a79 Mon Sep 17 00:00:00 2001 From: David Blajda Date: Thu, 6 Jul 2023 15:54:02 -0400 Subject: [PATCH] refactor: unify with_predicate for delete ops (#1512) # Description A small refactor on the delete op to have the same interface as the update op. Allows a single method to be used for predicates represented as a Datafusion `Expr` or as a string. --- rust/src/operations/delete.rs | 29 ++++++++++++++--------------- rust/src/operations/mod.rs | 34 ++++++++++++++++++++++++++++++++-- rust/src/operations/update.rs | 27 +-------------------------- 3 files changed, 47 insertions(+), 43 deletions(-) diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index 37cff07ba4..7f363daf8e 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -42,11 +42,13 @@ use crate::storage::{DeltaObjectStore, ObjectStoreRef}; use crate::table_state::DeltaTableState; use crate::DeltaTable; +use super::datafusion_utils::Expression; + /// Delete Records from the Delta Table. 
/// See this module's documentaiton for more information pub struct DeleteBuilder { /// Which records to delete - predicate: Option, + predicate: Option, /// A snapshot of the table's state snapshot: DeltaTableState, /// Delta object store for handling data files @@ -92,22 +94,11 @@ impl DeleteBuilder { } /// A predicate that determines if a record is deleted - pub fn with_predicate(mut self, predicate: Expr) -> Self { - self.predicate = Some(predicate); + pub fn with_predicate>(mut self, predicate: E) -> Self { + self.predicate = Some(predicate.into()); self } - /// Parse the provided query into a Datafusion expression - pub fn with_str_predicate( - mut self, - predicate: impl AsRef, - ) -> Result { - let expr = self.snapshot.parse_predicate_expression(predicate)?; - self.predicate = Some(expr); - - Ok(self) - } - /// The Datafusion session state to use pub fn with_session_state(mut self, state: SessionState) -> Self { self.state = Some(state); @@ -302,8 +293,16 @@ impl std::future::IntoFuture for DeleteBuilder { session.state() }); + let predicate = match this.predicate { + Some(predicate) => match predicate { + Expression::DataFusion(expr) => Some(expr), + Expression::String(s) => Some(this.snapshot.parse_predicate_expression(s)?), + }, + None => None, + }; + let ((actions, version), metrics) = execute( - this.predicate, + predicate, this.store.clone(), &this.snapshot, state, diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs index 2307a197e8..8382335e47 100644 --- a/rust/src/operations/mod.rs +++ b/rust/src/operations/mod.rs @@ -25,9 +25,9 @@ pub mod vacuum; #[cfg(feature = "datafusion")] use self::{delete::DeleteBuilder, load::LoadBuilder, update::UpdateBuilder, write::WriteBuilder}; #[cfg(feature = "datafusion")] -use arrow::record_batch::RecordBatch; +pub use ::datafusion::physical_plan::common::collect as collect_sendable_stream; #[cfg(feature = "datafusion")] -pub use datafusion::physical_plan::common::collect as collect_sendable_stream; 
+use arrow::record_batch::RecordBatch; #[cfg(all(feature = "arrow", feature = "parquet"))] use optimize::OptimizeBuilder; use restore::RestoreBuilder; @@ -176,3 +176,33 @@ impl AsRef for DeltaOps { &self.0 } } + +#[cfg(feature = "datafusion")] +mod datafusion_utils { + use datafusion_expr::Expr; + + /// Used to represent user input of either a Datafusion expression or string expression + pub enum Expression { + /// Datafusion Expression + DataFusion(Expr), + /// String Expression + String(String), + } + + impl From for Expression { + fn from(val: Expr) -> Self { + Expression::DataFusion(val) + } + } + + impl From<&str> for Expression { + fn from(val: &str) -> Self { + Expression::String(val.to_string()) + } + } + impl From for Expression { + fn from(val: String) -> Self { + Expression::String(val) + } + } +} diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index a9cf28abb5..26500fd208 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -56,32 +56,7 @@ use crate::{ DeltaResult, DeltaTable, DeltaTableError, }; -use super::{transaction::commit, write::write_execution_plan}; - -/// Used to represent user input of either a Datafusion expression or string expression -pub enum Expression { - /// Datafusion Expression - DataFusion(Expr), - /// String Expression - String(String), -} - -impl From for Expression { - fn from(val: Expr) -> Self { - Expression::DataFusion(val) - } -} - -impl From<&str> for Expression { - fn from(val: &str) -> Self { - Expression::String(val.to_string()) - } -} -impl From for Expression { - fn from(val: String) -> Self { - Expression::String(val) - } -} +use super::{datafusion_utils::Expression, transaction::commit, write::write_execution_plan}; /// Updates records in the Delta Table. /// See this module's documentation for more information