feat(python, rust): expose custom_metadata for all operations #2032

Merged: 11 commits, Jan 7, 2024
23 changes: 23 additions & 0 deletions crates/deltalake-core/src/operations/constraints.rs
@@ -31,11 +31,18 @@ use super::datafusion_utils::into_expr;

/// Build a constraint to add to a table
pub struct ConstraintBuilder {
/// A snapshot of the table's state
snapshot: DeltaTableState,
/// Name of the constraint
name: Option<String>,
/// Constraint expression
expr: Option<Expression>,
/// Delta object store for handling data files
log_store: LogStoreRef,
/// Datafusion session state relevant for executing the input plan
state: Option<SessionState>,
/// Additional metadata to be added to commit
app_metadata: Option<HashMap<String, serde_json::Value>>,
}

impl ConstraintBuilder {
@@ -47,6 +54,7 @@ impl ConstraintBuilder {
snapshot,
log_store,
state: None,
app_metadata: None,
}
}

@@ -66,6 +74,15 @@ impl ConstraintBuilder {
self.state = Some(state);
self
}

/// Additional metadata to be added to commit info
pub fn with_metadata(
mut self,
metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
) -> Self {
self.app_metadata = Some(HashMap::from_iter(metadata));
self
}
}

impl std::future::IntoFuture for ConstraintBuilder {
@@ -174,13 +191,19 @@ impl std::future::IntoFuture for ConstraintBuilder {
expr: expr_str.clone(),
};

let app_metadata = match this.app_metadata {
Some(metadata) => metadata,
None => HashMap::default(),
};

let commit_info = CommitInfo {
timestamp: Some(Utc::now().timestamp_millis()),
operation: Some(operations.name().to_string()),
operation_parameters: Some(operational_parameters),
read_version: Some(this.snapshot.version()),
isolation_level: Some(IsolationLevel::Serializable),
is_blind_append: Some(false),
info: app_metadata,
..Default::default()
};

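Usage sketch (not part of the diff): attaching custom commit metadata when adding a constraint. Only with_metadata and the IntoFuture implementation are confirmed by the hunks above; the import paths, the ConstraintBuilder::new argument order, and the with_constraint setter are assumptions.

use deltalake_core::operations::constraints::ConstraintBuilder;
use deltalake_core::{DeltaResult, DeltaTable};
use serde_json::json;

// Sketch: only `with_metadata` is taken from this PR; the other calls are assumed.
async fn add_constraint_with_audit_info(
    log_store: deltalake_core::logstore::LogStoreRef,        // path assumed
    snapshot: deltalake_core::table::state::DeltaTableState, // path assumed
) -> DeltaResult<DeltaTable> {
    ConstraintBuilder::new(log_store, snapshot)               // argument order assumed
        .with_constraint("id_not_null", "id IS NOT NULL")     // setter name assumed
        .with_metadata(vec![(
            "triggeredBy".to_string(),
            json!("nightly-maintenance"),
        )])
        .await
}
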
25 changes: 22 additions & 3 deletions crates/deltalake-core/src/operations/filesystem_check.rs
@@ -42,6 +42,8 @@ pub struct FileSystemCheckBuilder {
log_store: LogStoreRef,
/// Don't commit remove actions to the table log. Just determine which files can be removed
dry_run: bool,
/// Additional metadata to be added to commit
app_metadata: Option<HashMap<String, serde_json::Value>>,
}

/// Details of the FSCK operation including which files were removed from the log
@@ -78,6 +80,7 @@ impl FileSystemCheckBuilder {
snapshot: state,
log_store,
dry_run: false,
app_metadata: None,
}
}

@@ -87,6 +90,15 @@ impl FileSystemCheckBuilder {
self
}

/// Additional metadata to be added to commit info
pub fn with_metadata(
mut self,
metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
) -> Self {
self.app_metadata = Some(HashMap::from_iter(metadata));
self
}

async fn create_fsck_plan(&self) -> DeltaResult<FileSystemCheckPlan> {
let mut files_relative: HashMap<&str, &Add> =
HashMap::with_capacity(self.snapshot.files().len());
@@ -126,7 +138,11 @@ impl FileSystemCheckBuilder {
}

impl FileSystemCheckPlan {
pub async fn execute(self, snapshot: &DeltaTableState) -> DeltaResult<FileSystemCheckMetrics> {
pub async fn execute(
self,
snapshot: &DeltaTableState,
app_metadata: Option<HashMap<String, serde_json::Value>>,
) -> DeltaResult<FileSystemCheckMetrics> {
if self.files_to_remove.is_empty() {
return Ok(FileSystemCheckMetrics {
dry_run: false,
@@ -159,7 +175,10 @@ impl FileSystemCheckPlan {
files_removed: removed_file_paths,
};

let mut app_metadata = HashMap::new();
let mut app_metadata = match app_metadata {
Some(meta) => meta,
None => HashMap::new(),
};

app_metadata.insert("readVersion".to_owned(), snapshot.version().into());
if let Ok(map) = serde_json::to_value(&metrics) {
@@ -199,7 +218,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder {
));
}

let metrics = plan.execute(&this.snapshot).await?;
let metrics = plan.execute(&this.snapshot, this.app_metadata).await?;
let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot);
table.update().await?;
Ok((table, metrics))
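Usage sketch (not part of the diff): the metadata given to FileSystemCheckBuilder is now forwarded into FileSystemCheckPlan::execute and merged with the readVersion and operationMetrics entries. The (table, metrics) return shape and with_metadata appear above; the constructor arguments and import paths are assumptions.

use deltalake_core::operations::filesystem_check::FileSystemCheckBuilder;
use deltalake_core::DeltaResult;
use serde_json::json;

async fn repair_with_audit_info(
    log_store: deltalake_core::logstore::LogStoreRef,        // path assumed
    state: deltalake_core::table::state::DeltaTableState,    // path assumed
) -> DeltaResult<()> {
    let (table, metrics) = FileSystemCheckBuilder::new(log_store, state) // argument order assumed
        .with_metadata(vec![(
            "reason".to_string(),
            json!("repair after external file deletion"),
        )])
        .await?;
    println!(
        "removed {} dangling add actions at version {}",
        metrics.files_removed.len(), // field visible in the metrics struct above
        table.version()
    );
    Ok(())
}
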
13 changes: 9 additions & 4 deletions crates/deltalake-core/src/operations/optimize.rs
@@ -282,6 +282,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
this.max_concurrent_tasks,
this.max_spill_size,
this.min_commit_interval,
this.app_metadata,
)
.await?;
let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot);
@@ -589,6 +590,7 @@ impl MergePlan {
#[allow(unused_variables)] // used behind a feature flag
max_spill_size: usize,
min_commit_interval: Option<Duration>,
app_metadata: Option<HashMap<String, serde_json::Value>>,
) -> Result<Metrics, DeltaTableError> {
let operations = std::mem::take(&mut self.operations);

@@ -702,14 +704,17 @@ impl MergePlan {
last_commit = now;

buffered_metrics.preserve_insertion_order = true;
let mut metadata = HashMap::new();
metadata.insert("readVersion".to_owned(), self.read_table_version.into());
let mut app_metadata = match app_metadata.clone() {
Some(meta) => meta,
None => HashMap::new(),
};
app_metadata.insert("readVersion".to_owned(), self.read_table_version.into());
let maybe_map_metrics = serde_json::to_value(std::mem::replace(
&mut buffered_metrics,
orig_metrics.clone(),
));
if let Ok(map) = maybe_map_metrics {
metadata.insert("operationMetrics".to_owned(), map);
app_metadata.insert("operationMetrics".to_owned(), map);
}

table.update_incremental(None).await?;
@@ -721,7 +726,7 @@ impl MergePlan {
&actions,
self.task_parameters.input_parameters.clone().into(),
table.get_state(),
Some(metadata),
Some(app_metadata.clone()),
)
.await?;
}
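Usage sketch (not part of the diff): OptimizeBuilder now threads this.app_metadata into MergePlan::execute, where it is cloned into every incremental commit alongside readVersion and operationMetrics. The with_metadata setter on OptimizeBuilder sits outside the hunks shown here and is assumed to mirror the other builders; the remaining calls and import paths are assumptions as well.

use deltalake_core::operations::optimize::OptimizeBuilder;
use deltalake_core::DeltaResult;
use serde_json::json;

async fn compact_with_audit_info(
    log_store: deltalake_core::logstore::LogStoreRef,        // path assumed
    snapshot: deltalake_core::table::state::DeltaTableState, // path assumed
) -> DeltaResult<()> {
    let (table, metrics) = OptimizeBuilder::new(log_store, snapshot)        // argument order assumed
        .with_metadata(vec![("jobId".to_string(), json!("compaction-42"))]) // setter assumed for this builder
        .await?;
    // When min_commit_interval is set, each intermediate commit carries the same entries.
    println!(
        "optimize added {} files; table is at version {}",
        metrics.num_files_added,
        table.version()
    );
    Ok(())
}
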
29 changes: 27 additions & 2 deletions crates/deltalake-core/src/operations/restore.rs
@@ -21,7 +21,7 @@
//! ````

use std::cmp::max;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::ops::BitXor;
use std::time::{SystemTime, UNIX_EPOCH};

@@ -84,6 +86,8 @@ pub struct RestoreBuilder {
ignore_missing_files: bool,
/// Protocol downgrade allowed
protocol_downgrade_allowed: bool,
/// Additional metadata to be added to commit
app_metadata: Option<HashMap<String, serde_json::Value>>,
}

impl RestoreBuilder {
@@ -96,6 +98,7 @@ impl RestoreBuilder {
datetime_to_restore: None,
ignore_missing_files: false,
protocol_downgrade_allowed: false,
app_metadata: None,
}
}

@@ -123,6 +126,15 @@ impl RestoreBuilder {
self.protocol_downgrade_allowed = protocol_downgrade_allowed;
self
}

/// Additional metadata to be added to commit info
pub fn with_metadata(
mut self,
metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
) -> Self {
self.app_metadata = Some(HashMap::from_iter(metadata));
self
}
}

async fn execute(
@@ -132,6 +144,7 @@ async fn execute(
datetime_to_restore: Option<DateTime<Utc>>,
ignore_missing_files: bool,
protocol_downgrade_allowed: bool,
app_metadata: Option<HashMap<String, serde_json::Value>>,
) -> DeltaResult<RestoreMetrics> {
if !(version_to_restore
.is_none()
@@ -234,6 +247,17 @@ async fn execute(
reader_features: snapshot.protocol().reader_features.clone(),
}
};
let mut app_metadata = match app_metadata {
Some(meta) => meta,
None => HashMap::new(),
};

app_metadata.insert("readVersion".to_owned(), snapshot.version().into());

if let Ok(map) = serde_json::to_value(&metrics) {
app_metadata.insert("operationMetrics".to_owned(), map);
}

actions.push(Action::Protocol(protocol));
actions.extend(files_to_add.into_iter().map(Action::Add));
actions.extend(files_to_remove.into_iter().map(Action::Remove));
@@ -245,7 +269,7 @@
datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }),
},
&actions,
None,
Some(app_metadata),
)
.await?;
let commit_version = snapshot.version() + 1;
@@ -297,6 +321,7 @@ impl std::future::IntoFuture for RestoreBuilder {
this.datetime_to_restore,
this.ignore_missing_files,
this.protocol_downgrade_allowed,
this.app_metadata,
)
.await?;
let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot);
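Usage sketch (not part of the diff): restore previously passed None as commit metadata; it now merges the caller-supplied map with readVersion and operationMetrics before committing. Only with_metadata is confirmed above; the constructor, the version setter, and the import paths are assumptions.

use deltalake_core::operations::restore::RestoreBuilder;
use deltalake_core::DeltaResult;
use serde_json::json;

async fn restore_with_audit_info(
    log_store: deltalake_core::logstore::LogStoreRef,        // path assumed
    snapshot: deltalake_core::table::state::DeltaTableState, // path assumed
) -> DeltaResult<()> {
    let (table, _metrics) = RestoreBuilder::new(log_store, snapshot) // argument order assumed
        .with_version_to_restore(5)                                  // setter name assumed
        .with_metadata(vec![(
            "reason".to_string(),
            json!("roll back bad backfill"),
        )])
        .await?;
    println!("restore committed; table is now at version {}", table.version());
    Ok(())
}
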
27 changes: 23 additions & 4 deletions crates/deltalake-core/src/operations/vacuum.rs
@@ -94,6 +94,8 @@ pub struct VacuumBuilder {
dry_run: bool,
/// Override the source of time
clock: Option<Arc<dyn Clock>>,
/// Additional metadata to be added to commit
app_metadata: Option<HashMap<String, serde_json::Value>>,
}

/// Details for the Vacuum operation including which files were
@@ -136,6 +138,7 @@ impl VacuumBuilder {
enforce_retention_duration: true,
dry_run: false,
clock: None,
app_metadata: None,
}
}

@@ -164,6 +167,15 @@ impl VacuumBuilder {
self
}

/// Additional metadata to be added to commit info
pub fn with_metadata(
mut self,
metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
) -> Self {
self.app_metadata = Some(HashMap::from_iter(metadata));
self
}

/// Determine which files can be deleted. Does not actually perform the deletion
async fn create_vacuum_plan(&self) -> Result<VacuumPlan, VacuumError> {
let min_retention = Duration::milliseconds(
@@ -240,7 +252,7 @@ impl std::future::IntoFuture for VacuumBuilder {
}

let metrics = plan
.execute(this.log_store.as_ref(), &this.snapshot)
.execute(this.log_store.as_ref(), &this.snapshot, this.app_metadata)
.await?;
Ok((
DeltaTable::new_with_state(this.log_store, this.snapshot),
@@ -270,6 +282,7 @@ impl VacuumPlan {
self,
store: &dyn LogStore,
snapshot: &DeltaTableState,
app_metadata: Option<HashMap<String, serde_json::Value>>,
) -> Result<VacuumMetrics, DeltaTableError> {
if self.files_to_delete.is_empty() {
return Ok(VacuumMetrics {
@@ -295,8 +308,10 @@

// Begin VACUUM START COMMIT
let mut commit_info = start_operation.get_commit_info();
let mut extra_info = HashMap::<String, Value>::new();

let mut extra_info = match app_metadata.clone() {
Some(meta) => meta,
None => HashMap::new(),
};
commit_info.timestamp = Some(Utc::now().timestamp_millis());
extra_info.insert(
"clientVersion".to_string(),
@@ -335,7 +350,11 @@

// Begin VACUUM END COMMIT
let mut commit_info = end_operation.get_commit_info();
let mut extra_info = HashMap::<String, Value>::new();

let mut extra_info = match app_metadata.clone() {
Some(meta) => meta,
None => HashMap::new(),
};

commit_info.timestamp = Some(Utc::now().timestamp_millis());
extra_info.insert(
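Usage sketch (not part of the diff): vacuum writes two commits (VACUUM START and VACUUM END), and the supplied metadata is merged into the extra_info of both, alongside clientVersion and the operation metrics. Constructor arguments, the retention setters, and the import paths are assumptions; with_metadata and the (table, metrics) return shape appear above.

use deltalake_core::operations::vacuum::VacuumBuilder;
use deltalake_core::DeltaResult;
use serde_json::json;

async fn vacuum_with_audit_info(
    log_store: deltalake_core::logstore::LogStoreRef,        // path assumed
    snapshot: deltalake_core::table::state::DeltaTableState, // path assumed
) -> DeltaResult<()> {
    let (table, _metrics) = VacuumBuilder::new(log_store, snapshot)  // argument order assumed
        .with_retention_period(chrono::Duration::days(7))            // setter name assumed
        .with_enforce_retention_duration(true)                       // setter name assumed
        .with_metadata(vec![("requestedBy".to_string(), json!("ops-team"))])
        .await?;
    // Both the START and END commit infos carry the custom entries.
    println!("vacuum finished; table is now at version {}", table.version());
    Ok(())
}
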
9 changes: 7 additions & 2 deletions crates/deltalake-core/tests/command_optimize.rs
@@ -306,7 +309,9 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> {
)
.await?;

let maybe_metrics = plan.execute(dt.log_store(), &dt.state, 1, 20, None).await;
let maybe_metrics = plan
.execute(dt.log_store(), &dt.state, 1, 20, None, None)
.await;

assert!(maybe_metrics.is_err());
assert_eq!(dt.version(), version + 1);
@@ -355,7 +357,9 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box<dyn Error>> {
)
.await?;

let metrics = plan.execute(dt.log_store(), &dt.state, 1, 20, None).await?;
let metrics = plan
.execute(dt.log_store(), &dt.state, 1, 20, None, None)
.await?;
assert_eq!(metrics.num_files_added, 1);
assert_eq!(metrics.num_files_removed, 2);

@@ -400,6 +404,7 @@ async fn test_commit_interval() -> Result<(), Box<dyn Error>> {
1,
20,
Some(Duration::from_secs(0)), // this will cause as many commits as num_files_added
None,
)
.await?;
assert_eq!(metrics.num_files_added, 2);