Skip to content

Commit

Permalink
always return state
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Apr 26, 2024
1 parent 04a91e2 commit 4544772
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 112 deletions.
10 changes: 4 additions & 6 deletions crates/core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,10 @@ impl std::future::IntoFuture for ConstraintBuilder {
.build(Some(&this.snapshot), this.log_store.clone(), operation)?
.await?;

let table = if let Some(new_snapshot) = commit.snapshot() {
DeltaTable::new_with_state(this.log_store, new_snapshot)
} else {
DeltaTable::new_with_state(this.log_store, this.snapshot)
};
Ok(table)
Ok(DeltaTable::new_with_state(
this.log_store,
commit.snapshot(),
))
})
}
}
Expand Down
24 changes: 11 additions & 13 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,16 @@ async fn excute_non_empty_expr(
async fn execute(
predicate: Option<Expr>,
log_store: LogStoreRef,
snapshot: &DeltaTableState,
snapshot: DeltaTableState,
state: SessionState,
writer_properties: Option<WriterProperties>,
mut commit_properties: CommitProperties,
) -> DeltaResult<(Option<DeltaTableState>, DeleteMetrics)> {
) -> DeltaResult<(DeltaTableState, DeleteMetrics)> {
let exec_start = Instant::now();
let mut metrics = DeleteMetrics::default();

let scan_start = Instant::now();
let candidates = find_files(snapshot, log_store.clone(), &state, predicate.clone()).await?;
let candidates = find_files(&snapshot, log_store.clone(), &state, predicate.clone()).await?;
metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis();

let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true))));
Expand All @@ -208,7 +208,7 @@ async fn execute(
} else {
let write_start = Instant::now();
let add = excute_non_empty_expr(
snapshot,
&snapshot,
log_store.clone(),
&state,
&predicate,
Expand Down Expand Up @@ -261,12 +261,12 @@ async fn execute(
predicate: Some(fmt_expr_to_sql(&predicate)?),
};
if actions.is_empty() {
return Ok((None, metrics));
return Ok((snapshot.clone(), metrics));
}

let commit = CommitBuilder::from(commit_properties)
.with_actions(actions)
.build(Some(snapshot), log_store, operation)?
.build(Some(&snapshot), log_store, operation)?
.await?;
Ok((commit.snapshot(), metrics))
}
Expand Down Expand Up @@ -304,19 +304,17 @@ impl std::future::IntoFuture for DeleteBuilder {
let (new_snapshot, metrics) = execute(
predicate,
this.log_store.clone(),
&this.snapshot,
this.snapshot,
state,
this.writer_properties,
this.commit_properties,
)
.await?;

let table = if let Some(new_snapshot) = new_snapshot {
DeltaTable::new_with_state(this.log_store, new_snapshot)
} else {
DeltaTable::new_with_state(this.log_store, this.snapshot)
};
Ok((table, metrics))
Ok((
DeltaTable::new_with_state(this.log_store, new_snapshot),
metrics,
))
})
}
}
Expand Down
10 changes: 4 additions & 6 deletions crates/core/src/operations/drop_constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,10 @@ impl std::future::IntoFuture for DropConstraintBuilder {
.build(Some(&this.snapshot), this.log_store.clone(), operation)?
.await?;

let table = if let Some(new_snapshot) = commit.snapshot() {
DeltaTable::new_with_state(this.log_store, new_snapshot)
} else {
DeltaTable::new_with_state(this.log_store, this.snapshot)
};
Ok(table)
Ok(DeltaTable::new_with_state(
this.log_store,
commit.snapshot(),
))
})
}
}
Expand Down
28 changes: 13 additions & 15 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ async fn execute(
predicate: Expression,
source: DataFrame,
log_store: LogStoreRef,
snapshot: &DeltaTableState,
snapshot: DeltaTableState,
state: SessionState,
writer_properties: Option<WriterProperties>,
mut commit_properties: CommitProperties,
Expand All @@ -941,7 +941,7 @@ async fn execute(
match_operations: Vec<MergeOperationConfig>,
not_match_target_operations: Vec<MergeOperationConfig>,
not_match_source_operations: Vec<MergeOperationConfig>,
) -> DeltaResult<(Option<DeltaTableState>, MergeMetrics)> {
) -> DeltaResult<(DeltaTableState, MergeMetrics)> {
let mut metrics = MergeMetrics::default();
let exec_start = Instant::now();

Expand Down Expand Up @@ -986,7 +986,7 @@ async fn execute(
let scan_config = DeltaScanConfigBuilder::default()
.with_file_column(true)
.with_parquet_pushdown(false)
.build(snapshot)?;
.build(&snapshot)?;

let target_provider = Arc::new(DeltaTableProvider::try_new(
snapshot.clone(),
Expand Down Expand Up @@ -1016,7 +1016,7 @@ async fn execute(
} else {
try_construct_early_filter(
predicate.clone(),
snapshot,
&snapshot,
&state,
&source,
&source_name,
Expand Down Expand Up @@ -1369,7 +1369,7 @@ async fn execute(

let rewrite_start = Instant::now();
let add_actions = write_execution_plan(
Some(snapshot),
Some(&snapshot),
state.clone(),
write,
table_partition_cols.clone(),
Expand Down Expand Up @@ -1448,12 +1448,12 @@ async fn execute(
};

if actions.is_empty() {
return Ok((None, metrics));
return Ok((snapshot, metrics));
}

let commit = CommitBuilder::from(commit_properties)
.with_actions(actions)
.build(Some(snapshot), log_store.clone(), operation)?
.build(Some(&snapshot), log_store.clone(), operation)?
.await?;
Ok((commit.snapshot(), metrics))
}
Expand Down Expand Up @@ -1512,11 +1512,11 @@ impl std::future::IntoFuture for MergeBuilder {
session.state()
});

let (new_snapshot, metrics) = execute(
let (snapshot, metrics) = execute(
this.predicate,
this.source,
this.log_store.clone(),
&this.snapshot,
this.snapshot,
state,
this.writer_properties,
this.commit_properties,
Expand All @@ -1529,12 +1529,10 @@ impl std::future::IntoFuture for MergeBuilder {
)
.await?;

let table = if let Some(new_snapshot) = new_snapshot {
DeltaTable::new_with_state(this.log_store, new_snapshot)
} else {
DeltaTable::new_with_state(this.log_store, this.snapshot)
};
Ok((table, metrics))
Ok((
DeltaTable::new_with_state(this.log_store, snapshot),
metrics,
))
})
}
}
Expand Down
88 changes: 42 additions & 46 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ pub trait TableReference: Send + Sync {
fn metadata(&self) -> &Metadata;

/// Try to cast this table reference to a `EagerSnapshot`
fn eager_snapshot(&self) -> Option<&EagerSnapshot>;
fn eager_snapshot(&self) -> &EagerSnapshot;
}

impl TableReference for EagerSnapshot {
Expand All @@ -223,8 +223,8 @@ impl TableReference for EagerSnapshot {
self.table_config()
}

fn eager_snapshot(&self) -> Option<&EagerSnapshot> {
Some(self)
fn eager_snapshot(&self) -> &EagerSnapshot {
self
}
}

Expand All @@ -241,8 +241,8 @@ impl TableReference for DeltaTableState {
self.snapshot.metadata()
}

fn eager_snapshot(&self) -> Option<&EagerSnapshot> {
Some(&self.snapshot)
fn eager_snapshot(&self) -> &EagerSnapshot {
&self.snapshot
}
}

Expand Down Expand Up @@ -512,13 +512,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {

// unwrap() is safe here due to the above check
// TODO: refactor to only depend on TableReference Trait
let read_snapshot =
this.table_data
.unwrap()
.eager_snapshot()
.ok_or(DeltaTableError::Generic(
"Expected an instance of EagerSnapshot".to_owned(),
))?;
let read_snapshot = this.table_data.unwrap().eager_snapshot();

let mut attempt_number = 1;
while attempt_number <= this.max_retries {
Expand Down Expand Up @@ -595,34 +589,38 @@ pub struct PostCommit<'a> {

impl<'a> PostCommit<'a> {
/// Runs the post commit activities
async fn run_post_commit_hook(&self) -> DeltaResult<Option<DeltaTableState>> {
async fn run_post_commit_hook(&self) -> DeltaResult<DeltaTableState> {
if let Some(table) = self.table_data {
if let Some(mut snapshot) = table.eager_snapshot().cloned() {
if self.version - snapshot.version() > 1 {
// This may only occur during concurrent write actions. We need to update the state first to - 1
// then we can advance.
snapshot
.update(self.log_store.clone(), Some(self.version - 1))
.await?;
snapshot.advance(vec![&self.data])?;
} else {
snapshot.advance(vec![&self.data])?;
}
let state = DeltaTableState {
app_transaction_version: HashMap::new(),
snapshot,
};
// Execute each hook
if self.create_checkpoint {
self.create_checkpoint(&state, &self.log_store, self.version)
.await?;
}
return Ok(Some(state));
let mut snapshot = table.eager_snapshot().clone();
if self.version - snapshot.version() > 1 {
// This may only occur during concurrent write actions. We need to update the state first to - 1
// then we can advance.
snapshot
.update(self.log_store.clone(), Some(self.version - 1))
.await?;
snapshot.advance(vec![&self.data])?;
} else {
return Ok(None);
snapshot.advance(vec![&self.data])?;
}
let state = DeltaTableState {
app_transaction_version: HashMap::new(),
snapshot,
};
// Execute each hook
if self.create_checkpoint {
self.create_checkpoint(&state, &self.log_store, self.version)
.await?;
}
Ok(state)
} else {
return Ok(None);
let state = DeltaTableState::try_new(
&Path::default(),
self.log_store.object_store(),
Default::default(),
Some(self.version),
)
.await?;
Ok(state)
}
}
async fn create_checkpoint(
Expand All @@ -642,15 +640,15 @@ impl<'a> PostCommit<'a> {
/// A commit that successfully completed
pub struct FinalizedCommit {
/// The new table state after a commmit
pub snapshot: Option<DeltaTableState>,
pub snapshot: DeltaTableState,

/// Version of the finalized commit
pub version: i64,
}

impl FinalizedCommit {
/// The new table state after a commmit
pub fn snapshot(&self) -> Option<DeltaTableState> {
pub fn snapshot(&self) -> DeltaTableState {
self.snapshot.clone()
}
/// Version of the finalized commit
Expand All @@ -668,14 +666,12 @@ impl<'a> std::future::IntoFuture for PostCommit<'a> {

Box::pin(async move {
match this.run_post_commit_hook().await {
Ok(snapshot) => {
return Ok(FinalizedCommit {
snapshot,
version: this.version,
})
}
Err(err) => return Err(err),
};
Ok(snapshot) => Ok(FinalizedCommit {
snapshot,
version: this.version,
}),
Err(err) => Err(err),
}
})
}
}
Expand Down
Loading

0 comments on commit 4544772

Please sign in to comment.