Skip to content

Commit

Permalink
refactor: unify with_predicate for delete ops (#1512)
Browse files Browse the repository at this point in the history
# Description
A small refactor on the delete op to have the same interface as the
update op

Allows a single method to be used for predicates represented as a
DataFusion `Expr` or as a string.
  • Loading branch information
Blajda authored Jul 6, 2023
1 parent 6c47273 commit 5172dd6
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 43 deletions.
29 changes: 14 additions & 15 deletions rust/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ use crate::storage::{DeltaObjectStore, ObjectStoreRef};
use crate::table_state::DeltaTableState;
use crate::DeltaTable;

use super::datafusion_utils::Expression;

/// Delete Records from the Delta Table.
/// See this module's documentation for more information
pub struct DeleteBuilder {
/// Which records to delete
predicate: Option<Expr>,
predicate: Option<Expression>,
/// A snapshot of the table's state
snapshot: DeltaTableState,
/// Delta object store for handling data files
Expand Down Expand Up @@ -92,22 +94,11 @@ impl DeleteBuilder {
}

/// A predicate that determines if a record is deleted
pub fn with_predicate(mut self, predicate: Expr) -> Self {
self.predicate = Some(predicate);
pub fn with_predicate<E: Into<Expression>>(mut self, predicate: E) -> Self {
self.predicate = Some(predicate.into());
self
}

/// Parse the provided query into a Datafusion expression
pub fn with_str_predicate(
mut self,
predicate: impl AsRef<str>,
) -> Result<Self, DeltaTableError> {
let expr = self.snapshot.parse_predicate_expression(predicate)?;
self.predicate = Some(expr);

Ok(self)
}

/// The Datafusion session state to use
pub fn with_session_state(mut self, state: SessionState) -> Self {
self.state = Some(state);
Expand Down Expand Up @@ -302,8 +293,16 @@ impl std::future::IntoFuture for DeleteBuilder {
session.state()
});

let predicate = match this.predicate {
Some(predicate) => match predicate {
Expression::DataFusion(expr) => Some(expr),
Expression::String(s) => Some(this.snapshot.parse_predicate_expression(s)?),
},
None => None,
};

let ((actions, version), metrics) = execute(
this.predicate,
predicate,
this.store.clone(),
&this.snapshot,
state,
Expand Down
34 changes: 32 additions & 2 deletions rust/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ pub mod vacuum;
#[cfg(feature = "datafusion")]
use self::{delete::DeleteBuilder, load::LoadBuilder, update::UpdateBuilder, write::WriteBuilder};
#[cfg(feature = "datafusion")]
use arrow::record_batch::RecordBatch;
pub use ::datafusion::physical_plan::common::collect as collect_sendable_stream;
#[cfg(feature = "datafusion")]
pub use datafusion::physical_plan::common::collect as collect_sendable_stream;
use arrow::record_batch::RecordBatch;
#[cfg(all(feature = "arrow", feature = "parquet"))]
use optimize::OptimizeBuilder;
use restore::RestoreBuilder;
Expand Down Expand Up @@ -176,3 +176,33 @@ impl AsRef<DeltaTable> for DeltaOps {
&self.0
}
}

#[cfg(feature = "datafusion")]
mod datafusion_utils {
    use datafusion_expr::Expr;

    /// A predicate supplied by the caller, either as an already-built
    /// DataFusion expression or as a string that has not been parsed yet.
    ///
    /// The `From` impls below allow builder methods to take
    /// `impl Into<Expression>`, so an `Expr`, a `&str`, or a `String` can be
    /// passed to the same method. No parsing or validation happens during the
    /// conversion; a `String` variant is stored verbatim.
    #[derive(Debug, Clone)]
    pub enum Expression {
        /// Datafusion Expression
        DataFusion(Expr),
        /// String Expression
        String(String),
    }

    impl From<Expr> for Expression {
        /// Wraps the expression unchanged in [`Expression::DataFusion`].
        fn from(val: Expr) -> Self {
            Expression::DataFusion(val)
        }
    }

    impl From<&str> for Expression {
        /// Copies the borrowed text into an owned [`Expression::String`].
        fn from(val: &str) -> Self {
            Expression::String(val.to_string())
        }
    }

    impl From<String> for Expression {
        /// Moves the owned string into [`Expression::String`] without copying.
        fn from(val: String) -> Self {
            Expression::String(val)
        }
    }
}
27 changes: 1 addition & 26 deletions rust/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,32 +56,7 @@ use crate::{
DeltaResult, DeltaTable, DeltaTableError,
};

use super::{transaction::commit, write::write_execution_plan};

/// Used to represent user input of either a Datafusion expression or string expression
pub enum Expression {
/// Datafusion Expression
DataFusion(Expr),
/// String Expression
String(String),
}

impl From<Expr> for Expression {
fn from(val: Expr) -> Self {
Expression::DataFusion(val)
}
}

impl From<&str> for Expression {
fn from(val: &str) -> Self {
Expression::String(val.to_string())
}
}
impl From<String> for Expression {
fn from(val: String) -> Self {
Expression::String(val)
}
}
use super::{datafusion_utils::Expression, transaction::commit, write::write_execution_plan};

/// Updates records in the Delta Table.
/// See this module's documentation for more information
Expand Down

0 comments on commit 5172dd6

Please sign in to comment.