diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index ba2df2f77c..d1ffd9a548 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -198,12 +198,10 @@ impl std::future::IntoFuture for ConstraintBuilder { .build(Some(&this.snapshot), this.log_store.clone(), operation)? .await?; - let table = if let Some(new_snapshot) = commit.snapshot() { - DeltaTable::new_with_state(this.log_store, new_snapshot) - } else { - DeltaTable::new_with_state(this.log_store, this.snapshot) - }; - Ok(table) + Ok(DeltaTable::new_with_state( + this.log_store, + commit.snapshot(), + )) }) } } diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 1487a6b23c..df61fa2e6b 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -189,16 +189,16 @@ async fn excute_non_empty_expr( async fn execute( predicate: Option, log_store: LogStoreRef, - snapshot: &DeltaTableState, + snapshot: DeltaTableState, state: SessionState, writer_properties: Option, mut commit_properties: CommitProperties, -) -> DeltaResult<(Option, DeleteMetrics)> { +) -> DeltaResult<(DeltaTableState, DeleteMetrics)> { let exec_start = Instant::now(); let mut metrics = DeleteMetrics::default(); let scan_start = Instant::now(); - let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files(&snapshot, log_store.clone(), &state, predicate.clone()).await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis(); let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); @@ -208,7 +208,7 @@ async fn execute( } else { let write_start = Instant::now(); let add = excute_non_empty_expr( - snapshot, + &snapshot, log_store.clone(), &state, &predicate, @@ -261,12 +261,12 @@ async fn execute( predicate: Some(fmt_expr_to_sql(&predicate)?), }; if actions.is_empty() { - return Ok((None, metrics)); + return Ok((snapshot.clone(), metrics)); } let commit = CommitBuilder::from(commit_properties) .with_actions(actions) - .build(Some(snapshot), log_store, operation)? + .build(Some(&snapshot), log_store, operation)? .await?; Ok((commit.snapshot(), metrics)) } @@ -304,19 +304,17 @@ impl std::future::IntoFuture for DeleteBuilder { let (new_snapshot, metrics) = execute( predicate, this.log_store.clone(), - &this.snapshot, + this.snapshot, state, this.writer_properties, this.commit_properties, ) .await?; - let table = if let Some(new_snapshot) = new_snapshot { - DeltaTable::new_with_state(this.log_store, new_snapshot) - } else { - DeltaTable::new_with_state(this.log_store, this.snapshot) - }; - Ok((table, metrics)) + Ok(( + DeltaTable::new_with_state(this.log_store, new_snapshot), + metrics, + )) }) } } diff --git a/crates/core/src/operations/drop_constraints.rs b/crates/core/src/operations/drop_constraints.rs index 4f0e82bfb5..f1d320d2ff 100644 --- a/crates/core/src/operations/drop_constraints.rs +++ b/crates/core/src/operations/drop_constraints.rs @@ -91,12 +91,10 @@ impl std::future::IntoFuture for DropConstraintBuilder { .build(Some(&this.snapshot), this.log_store.clone(), operation)? .await?; - let table = if let Some(new_snapshot) = commit.snapshot() { - DeltaTable::new_with_state(this.log_store, new_snapshot) - } else { - DeltaTable::new_with_state(this.log_store, this.snapshot) - }; - Ok(table) + Ok(DeltaTable::new_with_state( + this.log_store, + commit.snapshot(), + )) }) } } diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index f4b1dcb0ef..150438be9a 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -931,7 +931,7 @@ async fn execute( predicate: Expression, source: DataFrame, log_store: LogStoreRef, - snapshot: &DeltaTableState, + snapshot: DeltaTableState, state: SessionState, writer_properties: Option, mut commit_properties: CommitProperties, @@ -941,7 +941,7 @@ async fn execute( match_operations: Vec, not_match_target_operations: Vec, not_match_source_operations: Vec, -) -> DeltaResult<(Option, MergeMetrics)> { +) -> DeltaResult<(DeltaTableState, MergeMetrics)> { let mut metrics = MergeMetrics::default(); let exec_start = Instant::now(); @@ -986,7 +986,7 @@ async fn execute( let scan_config = DeltaScanConfigBuilder::default() .with_file_column(true) .with_parquet_pushdown(false) - .build(snapshot)?; + .build(&snapshot)?; let target_provider = Arc::new(DeltaTableProvider::try_new( snapshot.clone(), @@ -1016,7 +1016,7 @@ async fn execute( } else { try_construct_early_filter( predicate.clone(), - snapshot, + &snapshot, &state, &source, &source_name, @@ -1369,7 +1369,7 @@ async fn execute( let rewrite_start = Instant::now(); let add_actions = write_execution_plan( - Some(snapshot), + Some(&snapshot), state.clone(), write, table_partition_cols.clone(), @@ -1448,12 +1448,12 @@ async fn execute( }; if actions.is_empty() { - return Ok((None, metrics)); + return Ok((snapshot, metrics)); } let commit = CommitBuilder::from(commit_properties) .with_actions(actions) - .build(Some(snapshot), log_store.clone(), operation)? + .build(Some(&snapshot), log_store.clone(), operation)? .await?; Ok((commit.snapshot(), metrics)) } @@ -1512,11 +1512,11 @@ impl std::future::IntoFuture for MergeBuilder { session.state() }); - let (new_snapshot, metrics) = execute( + let (snapshot, metrics) = execute( this.predicate, this.source, this.log_store.clone(), - &this.snapshot, + this.snapshot, state, this.writer_properties, this.commit_properties, @@ -1529,12 +1529,10 @@ impl std::future::IntoFuture for MergeBuilder { ) .await?; - let table = if let Some(new_snapshot) = new_snapshot { - DeltaTable::new_with_state(this.log_store, new_snapshot) - } else { - DeltaTable::new_with_state(this.log_store, this.snapshot) - }; - Ok((table, metrics)) + Ok(( + DeltaTable::new_with_state(this.log_store, snapshot), + metrics, + )) }) } } diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 4057ce362b..6606a8c339 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -207,7 +207,7 @@ pub trait TableReference: Send + Sync { fn metadata(&self) -> &Metadata; /// Try to cast this table reference to a `EagerSnapshot` - fn eager_snapshot(&self) -> Option<&EagerSnapshot>; + fn eager_snapshot(&self) -> &EagerSnapshot; } impl TableReference for EagerSnapshot { @@ -223,8 +223,8 @@ impl TableReference for EagerSnapshot { self.table_config() } - fn eager_snapshot(&self) -> Option<&EagerSnapshot> { - Some(self) + fn eager_snapshot(&self) -> &EagerSnapshot { + self } } @@ -241,8 +241,8 @@ impl TableReference for DeltaTableState { self.snapshot.metadata() } - fn eager_snapshot(&self) -> Option<&EagerSnapshot> { - Some(&self.snapshot) + fn eager_snapshot(&self) -> &EagerSnapshot { + &self.snapshot } } @@ -512,13 +512,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { // unwrap() is safe here due to the above check // TODO: refactor to only depend on TableReference Trait - let read_snapshot = - this.table_data - .unwrap() - .eager_snapshot() - .ok_or(DeltaTableError::Generic( - "Expected an instance of EagerSnapshot".to_owned(), - ))?; + let read_snapshot = this.table_data.unwrap().eager_snapshot(); let mut attempt_number = 1; while attempt_number <= this.max_retries { @@ -595,34 +589,38 @@ pub struct PostCommit<'a> { impl<'a> PostCommit<'a> { /// Runs the post commit activities - async fn run_post_commit_hook(&self) -> DeltaResult> { + async fn run_post_commit_hook(&self) -> DeltaResult { if let Some(table) = self.table_data { - if let Some(mut snapshot) = table.eager_snapshot().cloned() { - if self.version - snapshot.version() > 1 { - // This may only occur during concurrent write actions. We need to update the state first to - 1 - // then we can advance. - snapshot - .update(self.log_store.clone(), Some(self.version - 1)) - .await?; - snapshot.advance(vec![&self.data])?; - } else { - snapshot.advance(vec![&self.data])?; - } - let state = DeltaTableState { - app_transaction_version: HashMap::new(), - snapshot, - }; - // Execute each hook - if self.create_checkpoint { - self.create_checkpoint(&state, &self.log_store, self.version) - .await?; - } - return Ok(Some(state)); + let mut snapshot = table.eager_snapshot().clone(); + if self.version - snapshot.version() > 1 { + // This may only occur during concurrent write actions. We need to update the state first to - 1 + // then we can advance. + snapshot + .update(self.log_store.clone(), Some(self.version - 1)) + .await?; + snapshot.advance(vec![&self.data])?; } else { - return Ok(None); + snapshot.advance(vec![&self.data])?; } + let state = DeltaTableState { + app_transaction_version: HashMap::new(), + snapshot, + }; + // Execute each hook + if self.create_checkpoint { + self.create_checkpoint(&state, &self.log_store, self.version) + .await?; + } + Ok(state) } else { - return Ok(None); + let state = DeltaTableState::try_new( + &Path::default(), + self.log_store.object_store(), + Default::default(), + Some(self.version), + ) + .await?; + Ok(state) } } async fn create_checkpoint( @@ -642,7 +640,7 @@ impl<'a> PostCommit<'a> { /// A commit that successfully completed pub struct FinalizedCommit { /// The new table state after a commmit - pub snapshot: Option, + pub snapshot: DeltaTableState, /// Version of the finalized commit pub version: i64, @@ -650,7 +648,7 @@ pub struct FinalizedCommit { impl FinalizedCommit { /// The new table state after a commmit - pub fn snapshot(&self) -> Option { + pub fn snapshot(&self) -> DeltaTableState { self.snapshot.clone() } /// Version of the finalized commit @@ -668,14 +666,12 @@ impl<'a> std::future::IntoFuture for PostCommit<'a> { Box::pin(async move { match this.run_post_commit_hook().await { - Ok(snapshot) => { - return Ok(FinalizedCommit { - snapshot, - version: this.version, - }) - } - Err(err) => return Err(err), - }; + Ok(snapshot) => Ok(FinalizedCommit { + snapshot, + version: this.version, + }), + Err(err) => Err(err), + } }) } } diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 1d14a27ce4..69b289e290 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -169,12 +169,12 @@ async fn execute( predicate: Option, updates: HashMap, log_store: LogStoreRef, - snapshot: &DeltaTableState, + snapshot: DeltaTableState, state: SessionState, writer_properties: Option, mut commit_properties: CommitProperties, safe_cast: bool, -) -> DeltaResult<(Option, UpdateMetrics)> { +) -> DeltaResult<(DeltaTableState, UpdateMetrics)> { // Validate the predicate and update expressions. // // If the predicate is not set, then all files need to be updated. @@ -190,7 +190,7 @@ async fn execute( let version = snapshot.version(); if updates.is_empty() { - return Ok((None, metrics)); + return Ok((snapshot, metrics)); } let predicate = match predicate { @@ -215,11 +215,11 @@ async fn execute( let table_partition_cols = current_metadata.partition_columns.clone(); let scan_start = Instant::now(); - let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files(&snapshot, log_store.clone(), &state, predicate.clone()).await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis() as u64; if candidates.candidates.is_empty() { - return Ok((None, metrics)); + return Ok((snapshot, metrics)); } let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); @@ -227,7 +227,7 @@ async fn execute( let execution_props = state.execution_props(); // For each rewrite evaluate the predicate and then modify each expression // to either compute the new value or obtain the old one then write these batches - let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state) + let scan = DeltaScanBuilder::new(&snapshot, log_store.clone(), &state) .with_files(&candidates.candidates) .build() .await?; @@ -350,7 +350,7 @@ async fn execute( )?); let add_actions = write_execution_plan( - Some(snapshot), + Some(&snapshot), state.clone(), projection.clone(), table_partition_cols.clone(), @@ -416,7 +416,7 @@ async fn execute( let commit = CommitBuilder::from(commit_properties) .with_actions(actions) - .build(Some(snapshot), log_store, operation)? + .build(Some(&snapshot), log_store, operation)? .await?; Ok((commit.snapshot(), metrics)) @@ -442,11 +442,11 @@ impl std::future::IntoFuture for UpdateBuilder { session.state() }); - let (new_snapshot, metrics) = execute( + let (snapshot, metrics) = execute( this.predicate, this.updates, this.log_store.clone(), - &this.snapshot, + this.snapshot, state, this.writer_properties, this.commit_properties, @@ -454,12 +454,10 @@ impl std::future::IntoFuture for UpdateBuilder { ) .await?; - let table = if let Some(new_snapshot) = new_snapshot { - DeltaTable::new_with_state(this.log_store, new_snapshot) - } else { - DeltaTable::new_with_state(this.log_store, this.snapshot) - }; - Ok((table, metrics)) + Ok(( + DeltaTable::new_with_state(this.log_store, snapshot), + metrics, + )) }) } } diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 1babf6ca01..ab4f7b731c 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -810,16 +810,7 @@ impl std::future::IntoFuture for WriteBuilder { )? .await?; - // TODO we do not have the table config available, but since we are merging only our newly - // created actions, it may be safe to assume, that we want to include all actions. - // then again, having only some tombstones may be misleading. - if let (Some(mut snapshot), Some(new_snapshot)) = (this.snapshot, commit.snapshot) { - Ok(DeltaTable::new_with_state(this.log_store, new_snapshot)) - } else { - let mut table = DeltaTable::new(this.log_store, Default::default()); - table.update().await?; - Ok(table) - } + Ok(DeltaTable::new_with_state(this.log_store, commit.snapshot)) }) } }