diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index 80be16e088..6d0899e4d3 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -42,11 +42,13 @@ use crate::storage::{DeltaObjectStore, ObjectStoreRef}; use crate::table_state::DeltaTableState; use crate::DeltaTable; +use super::datafusion_utils::Expression; + /// Delete Records from the Delta Table. /// See this module's documentaiton for more information pub struct DeleteBuilder { /// Which records to delete - predicate: Option, + predicate: Option, /// A snapshot of the table's state snapshot: DeltaTableState, /// Delta object store for handling data files @@ -92,22 +94,11 @@ impl DeleteBuilder { } /// A predicate that determines if a record is deleted - pub fn with_predicate(mut self, predicate: Expr) -> Self { - self.predicate = Some(predicate); + pub fn with_predicate>(mut self, predicate: E) -> Self { + self.predicate = Some(predicate.into()); self } - /// Parse the provided query into a Datafusion expression - pub fn with_str_predicate( - mut self, - predicate: impl AsRef, - ) -> Result { - let expr = self.snapshot.parse_predicate_expression(predicate)?; - self.predicate = Some(expr); - - Ok(self) - } - /// The Datafusion session state to use pub fn with_session_state(mut self, state: SessionState) -> Self { self.state = Some(state); @@ -302,8 +293,16 @@ impl std::future::IntoFuture for DeleteBuilder { session.state() }); + let predicate = match this.predicate { + Some(predicate) => match predicate { + Expression::DataFusion(expr) => Some(expr), + Expression::String(s) => Some(this.snapshot.parse_predicate_expression(s)?), + }, + None => None, + }; + let ((actions, version), metrics) = execute( - this.predicate, + predicate, this.store.clone(), &this.snapshot, state, diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs index 6bf1584708..48c33bc634 100644 --- a/rust/src/operations/mod.rs +++ b/rust/src/operations/mod.rs @@ -24,9 +24,9 @@ pub mod vacuum; #[cfg(feature = "datafusion")] use self::{delete::DeleteBuilder, load::LoadBuilder, update::UpdateBuilder, write::WriteBuilder}; #[cfg(feature = "datafusion")] -use arrow::record_batch::RecordBatch; +pub use ::datafusion::physical_plan::common::collect as collect_sendable_stream; #[cfg(feature = "datafusion")] -pub use datafusion::physical_plan::common::collect as collect_sendable_stream; +use arrow::record_batch::RecordBatch; #[cfg(all(feature = "arrow", feature = "parquet"))] use optimize::OptimizeBuilder; @@ -168,3 +168,33 @@ impl AsRef for DeltaOps { &self.0 } } + +#[cfg(feature = "datafusion")] +mod datafusion_utils { + use datafusion_expr::Expr; + + /// Used to represent user input of either a Datafusion expression or string expression + pub enum Expression { + /// Datafusion Expression + DataFusion(Expr), + /// String Expression + String(String), + } + + impl From for Expression { + fn from(val: Expr) -> Self { + Expression::DataFusion(val) + } + } + + impl From<&str> for Expression { + fn from(val: &str) -> Self { + Expression::String(val.to_string()) + } + } + impl From for Expression { + fn from(val: String) -> Self { + Expression::String(val) + } + } +} diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index 34dbf27113..0d0024c61e 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -56,32 +56,7 @@ use crate::{ DeltaResult, DeltaTable, DeltaTableError, }; -use super::{transaction::commit, write::write_execution_plan}; - -/// Used to represent user input of either a Datafusion expression or string expression -pub enum Expression { - /// Datafusion Expression - DataFusion(Expr), - /// String Expression - String(String), -} - -impl From for Expression { - fn from(val: Expr) -> Self { - Expression::DataFusion(val) - } -} - -impl From<&str> for Expression { - fn from(val: &str) -> Self { - Expression::String(val.to_string()) - } -} -impl From for Expression { - fn from(val: String) -> Self { - Expression::String(val) - } -} +use super::{datafusion_utils::Expression, transaction::commit, write::write_execution_plan}; /// Updates records in the Delta Table. /// See this module's documentation for more information