Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: checkpoint error with Azure Synapse #1848

Merged
merged 1 commit into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/deltalake-core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,6 @@ mod tests {
.with_update("value", col("value") + lit(1))
.await
.unwrap();

assert_eq!(table.version(), 1);
assert_eq!(table.get_file_uris().count(), 1);
assert_eq!(metrics.num_added_files, 1);
Expand Down
35 changes: 34 additions & 1 deletion crates/deltalake-core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,18 @@ pub struct CheckPoint {
pub(crate) version: i64, // 20 digits decimals
/// The number of actions that are stored in the checkpoint.
pub(crate) size: i64,
#[serde(skip_serializing_if = "Option::is_none")]
/// The number of fragments if the last checkpoint was written in multiple parts. This field is optional.
pub(crate) parts: Option<u32>, // 10 digits decimals
#[serde(skip_serializing_if = "Option::is_none")]
/// The number of bytes of the checkpoint. This field is optional.
pub(crate) size_in_bytes: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
/// The number of AddFile actions in the checkpoint. This field is optional.
pub(crate) num_of_add_files: Option<i64>,
}

#[derive(Default)]
/// Builder for CheckPoint
pub struct CheckPointBuilder {
/// Delta table version
Expand Down Expand Up @@ -118,7 +122,7 @@ impl CheckPoint {
Self {
version,
size,
parts,
parts: parts.or(None),
size_in_bytes: None,
num_of_add_files: None,
}
Expand Down Expand Up @@ -909,6 +913,35 @@ mod tests {
drop(tmp_dir);
}

#[tokio::test]
async fn checkpoint_without_added_files_and_no_parts() {
let (dt, tmp_dir) = create_test_table().await;
let check_point = CheckPointBuilder::new(0, 0).build();
let checkpoint_data_paths = dt.get_checkpoint_data_paths(&check_point);
assert_eq!(checkpoint_data_paths.len(), 1);
assert_eq!(
serde_json::to_string(&check_point).unwrap(),
"{\"version\":0,\"size\":0}"
);
drop(tmp_dir);
}

#[tokio::test]
async fn checkpoint_with_added_files() {
let num_of_file_added: i64 = 4;
let (dt, tmp_dir) = create_test_table().await;
let check_point = CheckPointBuilder::new(0, 0)
.with_num_of_add_files(num_of_file_added)
.build();
let checkpoint_data_paths = dt.get_checkpoint_data_paths(&check_point);
assert_eq!(checkpoint_data_paths.len(), 1);
assert_eq!(
serde_json::to_string(&check_point).unwrap(),
"{\"version\":0,\"size\":0,\"num_of_add_files\":4}"
);
drop(tmp_dir);
}

#[cfg(any(feature = "s3", feature = "s3-native-tls"))]
#[test]
fn normalize_table_uri_s3() {
Expand Down
Loading