Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python, rust): expose custom_metadata for all operations #2032

Merged
merged 11 commits into from
Jan 7, 2024
19 changes: 18 additions & 1 deletion crates/deltalake-core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,18 @@ use super::datafusion_utils::into_expr;

/// Build a CHECK constraint to add to a Delta table.
///
/// Configure via the `with_*` methods, then await the builder (it implements
/// `IntoFuture`) to validate existing data and commit the constraint.
pub struct ConstraintBuilder {
/// A snapshot of the table's state the constraint is validated against
snapshot: DeltaTableState,
/// Name of the constraint
name: Option<String>,
/// Constraint expression (string or parsed form; see `Expression`)
expr: Option<Expression>,
/// Reference to the table's log store, used to commit the constraint
// NOTE(review): prior comment called this a "Delta object store"; the
// `LogStoreRef` type indicates it is the log store handle.
log_store: LogStoreRef,
/// Datafusion session state relevant for executing the input plan
state: Option<SessionState>,
/// Additional metadata to be added to commit
app_metadata: Option<HashMap<String, serde_json::Value>>,
}

impl ConstraintBuilder {
Expand All @@ -47,6 +54,7 @@ impl ConstraintBuilder {
snapshot,
log_store,
state: None,
app_metadata: None,
}
}

Expand All @@ -66,6 +74,15 @@ impl ConstraintBuilder {
self.state = Some(state);
self
}

/// Additional metadata to be added to commit info
pub fn with_metadata(
mut self,
metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
) -> Self {
self.app_metadata = Some(HashMap::from_iter(metadata));
self
}
}

impl std::future::IntoFuture for ConstraintBuilder {
Expand Down Expand Up @@ -195,7 +212,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
&actions,
operations,
&this.snapshot,
None,
this.app_metadata,
)
.await?;

Expand Down
29 changes: 27 additions & 2 deletions crates/deltalake-core/src/operations/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
//! ````

use std::cmp::max;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::ops::BitXor;
use std::time::{SystemTime, UNIX_EPOCH};

Expand Down Expand Up @@ -84,6 +84,8 @@ pub struct RestoreBuilder {
ignore_missing_files: bool,
/// Protocol downgrade allowed
protocol_downgrade_allowed: bool,
/// Additional metadata to be added to commit
app_metadata: Option<HashMap<String, serde_json::Value>>,
}

impl RestoreBuilder {
Expand All @@ -96,6 +98,7 @@ impl RestoreBuilder {
datetime_to_restore: None,
ignore_missing_files: false,
protocol_downgrade_allowed: false,
app_metadata: None,
}
}

Expand Down Expand Up @@ -123,6 +126,15 @@ impl RestoreBuilder {
self.protocol_downgrade_allowed = protocol_downgrade_allowed;
self
}

/// Additional metadata to be added to commit info
pub fn with_metadata(
mut self,
metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
) -> Self {
self.app_metadata = Some(HashMap::from_iter(metadata));
self
}
}

async fn execute(
Expand All @@ -132,6 +144,7 @@ async fn execute(
datetime_to_restore: Option<DateTime<Utc>>,
ignore_missing_files: bool,
protocol_downgrade_allowed: bool,
app_metadata: Option<HashMap<String, serde_json::Value>>,
) -> DeltaResult<RestoreMetrics> {
if !(version_to_restore
.is_none()
Expand Down Expand Up @@ -234,6 +247,17 @@ async fn execute(
reader_features: snapshot.protocol().reader_features.clone(),
}
};
let mut app_metadata = match app_metadata {
Some(meta) => meta,
None => HashMap::new(),
};

app_metadata.insert("readVersion".to_owned(), snapshot.version().into());

if let Ok(map) = serde_json::to_value(&metrics) {
app_metadata.insert("operationMetrics".to_owned(), map);
}

actions.push(Action::Protocol(protocol));
actions.extend(files_to_add.into_iter().map(Action::Add));
actions.extend(files_to_remove.into_iter().map(Action::Remove));
Expand All @@ -245,7 +269,7 @@ async fn execute(
datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }),
},
&actions,
None,
Some(app_metadata),
)
.await?;
let commit_version = snapshot.version() + 1;
Expand Down Expand Up @@ -297,6 +321,7 @@ impl std::future::IntoFuture for RestoreBuilder {
this.datetime_to_restore,
this.ignore_missing_files,
this.protocol_downgrade_allowed,
this.app_metadata,
)
.await?;
let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot);
Expand Down
26 changes: 23 additions & 3 deletions crates/deltalake-core/src/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ pub struct VacuumBuilder {
dry_run: bool,
/// Override the source of time
clock: Option<Arc<dyn Clock>>,
/// Additional metadata to be added to commit
app_metadata: Option<HashMap<String, serde_json::Value>>,
}

/// Details for the Vacuum operation including which files were
Expand Down Expand Up @@ -136,6 +138,7 @@ impl VacuumBuilder {
enforce_retention_duration: true,
dry_run: false,
clock: None,
app_metadata: None,
}
}

Expand Down Expand Up @@ -164,6 +167,15 @@ impl VacuumBuilder {
self
}

/// Additional metadata to be added to commit info
pub fn with_metadata(
mut self,
metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
) -> Self {
self.app_metadata = Some(HashMap::from_iter(metadata));
self
}

/// Determine which files can be deleted. Does not actually peform the deletion
async fn create_vacuum_plan(&self) -> Result<VacuumPlan, VacuumError> {
let min_retention = Duration::milliseconds(
Expand Down Expand Up @@ -240,7 +252,7 @@ impl std::future::IntoFuture for VacuumBuilder {
}

let metrics = plan
.execute(this.log_store.as_ref(), &this.snapshot)
.execute(this.log_store.as_ref(), &this.snapshot, this.app_metadata)
.await?;
Ok((
DeltaTable::new_with_state(this.log_store, this.snapshot),
Expand Down Expand Up @@ -270,6 +282,7 @@ impl VacuumPlan {
self,
store: &dyn LogStore,
snapshot: &DeltaTableState,
app_metadata: Option<HashMap<String, serde_json::Value>>,
) -> Result<VacuumMetrics, DeltaTableError> {
if self.files_to_delete.is_empty() {
return Ok(VacuumMetrics {
Expand Down Expand Up @@ -309,7 +322,14 @@ impl VacuumPlan {

let start_actions = vec![Action::CommitInfo(commit_info)];

commit(store, &start_actions, start_operation, snapshot, None).await?;
commit(
store,
&start_actions,
start_operation,
snapshot,
app_metadata.clone(),
)
.await?;
// Finish VACUUM START COMMIT

let locations = futures::stream::iter(self.files_to_delete)
Expand Down Expand Up @@ -349,7 +369,7 @@ impl VacuumPlan {

let end_actions = vec![Action::CommitInfo(commit_info)];

commit(store, &end_actions, end_operation, snapshot, None).await?;
commit(store, &end_actions, end_operation, snapshot, app_metadata).await?;
// Finish VACUUM END COMMIT

Ok(VacuumMetrics {
Expand Down
11 changes: 10 additions & 1 deletion python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class RawDeltaTable:
dry_run: bool,
retention_hours: Optional[int],
enforce_retention_duration: bool,
custom_metadata: Optional[Dict[str, str]],
) -> List[str]: ...
def compact_optimize(
self,
Expand All @@ -65,6 +66,7 @@ class RawDeltaTable:
max_concurrent_tasks: Optional[int],
min_commit_interval: Optional[int],
writer_properties: Optional[Dict[str, Optional[str]]],
custom_metadata: Optional[Dict[str, str]],
) -> str: ...
def z_order_optimize(
self,
Expand All @@ -75,16 +77,19 @@ class RawDeltaTable:
max_spill_size: Optional[int],
min_commit_interval: Optional[int],
writer_properties: Optional[Dict[str, Optional[str]]],
custom_metadata: Optional[Dict[str, str]],
) -> str: ...
def add_constraints(
self,
constraints: Dict[str, str],
custom_metadata: Optional[Dict[str, str]],
) -> None: ...
def restore(
self,
target: Optional[Any],
ignore_missing_files: bool,
protocol_downgrade_allowed: bool,
custom_metadata: Optional[Dict[str, str]],
) -> str: ...
def history(self, limit: Optional[int]) -> List[str]: ...
def update_incremental(self) -> None: ...
Expand All @@ -97,14 +102,16 @@ class RawDeltaTable:
self,
predicate: Optional[str],
writer_properties: Optional[Dict[str, Optional[str]]],
custom_metadata: Optional[Dict[str, str]],
) -> str: ...
def repair(self, dry_run: bool) -> str: ...
def update(
self,
updates: Dict[str, str],
predicate: Optional[str],
writer_properties: Optional[Dict[str, Optional[str]]],
safe_cast: bool = False,
safe_cast: bool,
custom_metadata: Optional[Dict[str, str]],
) -> str: ...
def merge_execute(
self,
Expand All @@ -113,6 +120,7 @@ class RawDeltaTable:
source_alias: Optional[str],
target_alias: Optional[str],
writer_properties: Optional[Dict[str, Optional[str]]],
custom_metadata: Optional[Dict[str, str]],
safe_cast: bool,
matched_update_updates: Optional[List[Dict[str, str]]],
matched_update_predicate: Optional[List[Optional[str]]],
Expand Down Expand Up @@ -186,6 +194,7 @@ def create_deltalake(
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
custom_metadata: Optional[Dict[str, str]],
) -> None: ...
def batch_distinct(batch: pyarrow.RecordBatch) -> pyarrow.RecordBatch: ...

Expand Down
Loading
Loading