-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixing snapshot creation for earlier versions than the latest checkpoint #322
base: main
Are you sure you want to change the base?
Conversation
This commit introduces a new unit test to verify that the Delta table implementation can correctly build a snapshot at a version that is earlier than the latest checkpoint. Specifically, it: - Tests snapshot creation at version 10 when later checkpoints exist - Adds delta dataset with multiple checkpoints as test data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like a great test, thanks!
Do I understand correctly, based on the test output, that snapshot creation just plain fails today, because the listing doesn't even find the older files the snapshot needs?
(we need to fix the formatting issues before merging)
kernel/src/snapshot.rs
Outdated
// assert_eq!( | ||
// checkpoint_meta.size_in_bytes, | ||
// Some(21857), | ||
// "Checkpoint size in bytes should be 21857" | ||
// ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why commented out, out of curiosity? Is the byte count wrong and needs to be updated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm getting a None
result for checkpoint_meta.size_in_bytes
; hence, the test is failing. I commented the code so that I first focus on the snapshot versioning bug! I'm looking into this one, too, now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The root cause of the issue is a mismatch between the casing conventions used in the JSON data and the struct definition. The struct is expecting camelCase (e.g., sizeInBytes
) due to the #[serde(rename_all = "camelCase")]
attribute, but the actual JSON data is using snake_case (size_in_bytes
).
Your short-term fix of adding an alias is a good temporary solution:
#[serde(alias = "size_in_bytes")]
pub(crate) size_in_bytes: Option<i64>,
However, for a long-term solution, we need to address the inconsistency between the Delta protocol specification and the implementation in delta-rs (I built my test data from delta-rs,
assuming that the other test data was also created using the same, including the last_checkpoint file).
According to the Delta protocol documentation, the last checkpoint file schema should indeed use camelCase for field names. The fact that delta-rs is writing the metadata in snake_case suggests a deviation from the protocol specification.
The long-term solution should involve:
- Align the delta-rs implementation with the Delta protocol specification by ensuring the fields in the last checkpoint file are written using camelCase field names.
- Updating the
CheckpointMetadata
struct to expect camelCase without needing aliases. - If any backward compatibility is required, consider implementing a more robust deserialization to handle camelCase and snake_case variants.
It would be worth investigating why delta-rs is writing the metadata in snake_case contrary to the protocol specification.
What are your thoughts @scovich ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Created a new issue to move this conversation #326 . Would love to know your thoughts on the issue link.
Yes, you're right! Currently, the code only considers the latest checkpoint and doesn't look into the older checkpoints, which leads to this issue. Also, I would love to know your thoughts on this issue too: Enroute sending the patch for the Snapshot issue along with the suggested formatting fixes. |
- Added serde alias for 'size_in_bytes' field in CheckpointMetadata struct - This allows deserialization of both camelCase and snake_case variants - Addresses issue with inconsistent field naming in _last_checkpoint file This is a temporary workaround for the issue described in delta-incubator#326. The long-term solution will involve aligning the checkpoint writing logic with the Delta protocol specification to use camelCase field names consistently. See delta-incubator#326 for full details.
- Added commit files for versions 25, 26, 27, and 28 to the multiple-checkpoint test dataset - Last checkpoint remains at version 24 - Purpose: Enable testing of snapshot creation for versions between the last checkpoint and the latest commit This change allows us to test scenarios where: 1. A snapshot is requested for a version after the last checkpoint 2. The behavior of version selection when commits exist beyond the last checkpoint 3. The correct handling of file listing and filtering for versions between checkpoints and the latest commit These additions will help ensure the snapshot creation logic correctly handles various version scenarios, particularly focusing on the interaction between checkpoints and subsequent commits.
This commit introduces a new unit test 'test_snapshot_with_version_after_last_checkpoint' to verify correct snapshot behavior when requesting a version that is after the last checkpoint but not the latest commit. Test data state: - Located in ./tests/data/multiple-checkpoint/ - Contains commits up to version 28 - Last checkpoint is at version 24 - Requested snapshot version is 26 The test ensures: 1. Snapshot creation succeeds for version 26 2. Correct commit files are included (versions 25 and 26) 3. Older commits are excluded (version 24 and earlier) 4. Newer commits are excluded (versions 27 and 28) 5. The correct checkpoint file (version 24) is used 6. The effective version of the snapshot is set correctly This test improves coverage of the snapshot creation logic, particularly for cases where the requested version falls between the last checkpoint and the latest commit.
This commit updates the snapshot creation process to more efficiently utilize the last checkpoint information. Key changes include: 1. Streamlined logic for determining which log files to list based on the presence of a checkpoint and the requested version. 2. Use checkpoint data to list files when available, regardless of the requested version, allowing for more efficient file retrieval. 3. Fall back to listing all log files when no checkpoint is found. This approach optimizes file reading operations, particularly for tables with long histories, while maintaining correct behavior for all version request scenarios. The subsequent filtering of commits based on the requested version remains unchanged, ensuring accurate snapshot creation.
This commit introduces a new unit test: 'test_snapshot_at_latest_checkpoint_version'. The test verifies that: 1. Snapshot creation succeeds when requesting the exact version of the latest checkpoint. 2. The created snapshot has the correct version. 3. The appropriate checkpoint file is used. 4. No commit files after the checkpoint version are included. 5. The effective version matches the checkpoint version. This test covers an important edge case in snapshot creation, ensuring correct behavior when the requested version aligns exactly with the latest checkpoint. It complements existing tests and improves coverage of the snapshot creation logic.
This commit updates the `list_log_files_with_checkpoint` function to incorporate version filtering, previously handled in `try_new`. Changes include: 1. Add `requested_version: Option<Version>` parameter to `list_log_files_with_checkpoint`. 2. Implement version filtering logic within the commit file selection process. 3. Remove redundant version filtering from `try_new`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for this. Had one basic question.
kernel/src/snapshot.rs
Outdated
@@ -152,12 +152,32 @@ impl Snapshot { | |||
(Some(cp), None) => { | |||
list_log_files_with_checkpoint(&cp, fs_client.as_ref(), &log_url)? | |||
} | |||
(Some(cp), Some(version)) if cp.version >= version => { | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
kernel/src/snapshot.rs
Outdated
(Some(cp), Some(version)) if cp.version <= version => { | ||
list_log_files_with_checkpoint(&cp, fs_client.as_ref(), &log_url)? | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note also that the tests fail on mac. |
- Merged list_log_files and list_log_files_with_checkpoint into a single function - Enhanced file filtering to correctly handle checkpoint boundaries - Updated test cases to cover all scenarios, including: * Initial commits without checkpoints * Checkpoint versions * Versions between checkpoints * Accumulating commits after checkpoints - Added detailed comments explaining each test case - Improved handling of requested versions at or near checkpoint versions - Optimized file sorting and filtering for better performance This refactor simplifies the codebase, improves test coverage, and ensures correct behavior for all Delta log file listing scenarios, particularly around checkpoint boundaries.
Fixed. |
- Optimize file selection based on checkpoints and requested versions - Ensure correct handling of commit files and checkpoints - Improve efficiency by leveraging most recent checkpoints - Add logic to handle cases before and after checkpoints
Created a unified It would be super helpful if you could review my assumptions about the correct behavior by examining the test cases. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for taking a stab at this! I had some ideas on how we might make the code simpler, PTAL.
kernel/src/snapshot.rs
Outdated
The logic ensures that: | ||
a) We never include commit files beyond the requested version. | ||
b) We use the most recent checkpoint as a starting point for efficiency. | ||
c) We include the checkpoint version file only if it's explicitly requested. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you elaborate what this means? Is it related to 1/ and 3/ examples above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this explanation is a overkill, removed it.
kernel/src/snapshot.rs
Outdated
) -> DeltaResult<(Vec<FileMeta>, Vec<FileMeta>)> { | ||
let version_prefix = format!("{:020}", 0); | ||
let start_from = log_root.join(&version_prefix)?; | ||
let start_from = log_root.join(&format!("{:020}", 0))?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we want to always start listing from 0? Shouldn't we prefer to start from last checkpoint version when possible?
kernel/src/snapshot.rs
Outdated
// If no requested version, try to get the last checkpoint | ||
let last_checkpoint = if requested_version.is_none() { | ||
read_last_checkpoint(fs_client, log_root)? | ||
} else { | ||
None | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't a last checkpoint hint still useful as long as it's not after the requested version?
// If no requested version, try to get the last checkpoint | |
let last_checkpoint = if requested_version.is_none() { | |
read_last_checkpoint(fs_client, log_root)? | |
} else { | |
None | |
}; | |
// Try to get the last checkpoint, and use it if not beyond the requested version. | |
let last_checkpoint = read_last_checkpoint(fs_client, log_root) | |
.filter(|hint| !requested_version.is_some_and(|version| version < hint.version)); | |
let start_from = last_checkpoint.unwrap_or(0u64); |
(note: Boolean negation is hard to read. Option.is_some_or would be better here, but it's not stable rust yet)
.filter(|hint| requested_version.is_none_or(|requested| hint.version <= requested));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, made the change to use the hint the requested version is more than the checkpoint version.
kernel/src/snapshot.rs
Outdated
// If we're using last_checkpoint and we've found the next checkpoint, stop | ||
if requested_version.is_none() | ||
&& last_checkpoint.is_some() | ||
&& file_version > last_checkpoint.as_ref().unwrap().version as i64 | ||
{ | ||
next_checkpoint_version = Some(file_version); | ||
break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think I understand this part of the code. Why would the presence of a last checkpoint hint decide whether we keep iterating or not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at the original code, I think it was missing adequate version checks, and also didn't truncate the existing commit file list after finding a new checkpoint? Something like this, perhaps?
for maybe_meta in fs_client.list_from(&start_from)? {
...
// NOTE: Because our listing hint was for a version file, the listing returns all version
// files before it returns any non-version file. So we should only continue if the current
// file has a version, and that version is not beyond the requested version (if any).
let file_version = match log_path.version {
Some(v) if !requested_version.is_some_and(|requested| requested < v) => v,
_ => break,
};
// A file version above the current checkpoint version needs special handling (see below)
let beyond_current_checkpoint = !last_checkpoint_version.is_some_and(|cv| cv == file_version);
// NOTE: v.checkpoint.parquet conveniently appears before v.json in the listing
if log_path.is_checkpoint {
if beyond_current_checkpoint {
// First file of a new checkpoint - start over
//
// TODO: We can't trust a new multi-part checkpoint until we prove it's complete. We
// must keep the previous checkpoint until we've seen all files for the new checkpoint.
last_checkpoint_version = Some(file_version);
current_version = Some(file_version);
checkpoint_files.clear();
commit_files.clear();
}
checkpoint_files.push(meta.clone());
} else if log_path.is_commit {
// Only include commits _after_ the current checkpoint. This may produce an
// empty commit file list, if the snapshot version we chose has a checkpoint.
if beyond_current_checkpoint {
// Blow up if this file presents a version gap:
// OK - [0.json] (first commit)
// BAD - [4.json] (missing checkpoint)
// OK - [4.checkpoint.parquet, 5.json] (first commit after checkpoint)
// BAD - [4.checkpoint.parquet, 6.json] (missing commit)
// OK - [..., 4.json, 5.json] (successive commits)
// BAD - [..., 4.json, 6.json] (missing commit)
match current_version {
Some(v) if v + 1 < file_version => /* missing commit error! */,
None if 0 < file_version => /* missing checkpoint error! */,
_ => (),
}
current_version = Some(file_version);
commit_files.push(meta.clone())
}
}
}
// Commit files are already filtered, but need to be sorted newest-first
commit_files.sort_unstable_by(|a, b| b.location.cmp(&a.location));
// TODO: Filter out all but one complete checkpoint, in case e.g. client A wrote
// a single-part checkpoint and client B also wrote a multi-part checkpoint.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just getting way too messy to handle all the scenarios, especially with multiple-multipart-multi-writer checkpointing!
I guess tonight I'll go for a quick implementation of group iterator from https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaLogGroupingIterator.scala ! I'll turn around in a few hours.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One thought about the grouping iterator: The scala code returns a (Version, Seq[FileStatus])
tuple. But (a) that's not super readable and (b) most versions won't have checkpoints. It might be worth creating an actual struct instead, so we can name the fields and avoid instantiating a bunch of single-element vectors?
struct VersionFiles {
version: Version,
commitFile: Option<LogPath>,
checkpointFiles: Vec<LogPath>,
}
kernel/src/snapshot.rs
Outdated
|
||
// Stop if we've passed the requested version | ||
if let Some(req_version) = requested_version { | ||
if file_version > req_version as i64 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any idea why the existing code was casting from u64 to i64 (which this new code seems to follow)?
I suspect it's because 0
is signed by default, and that passing 0u64
would eliminate type mismatch issues?
@@ -546,4 +659,646 @@ mod tests { | |||
Some(3) | |||
); | |||
} | |||
|
|||
#[test] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aside: @nicklan -- This file is getting pretty large. I wonder what the rust best practice is for where unit tests should live?
My experience from non-rust projects is that bulky unit tests should generally live in separate (test-only) source files -- especially tests that only use public interfaces. In-file testing would make more sense for exercising private/internal code that isn't accessible outside the source file.
- Correct explanation for requested version matching checkpoint - Clarify that both commit and checkpoint files are included - Align comment with existing test cases and implementation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm going for a quick implementation of a group iterator: https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaLogGroupingIterator.scala. It makes it much easier to handle the multipart concurrent write scenarios; handling all the corners is getting too messy! @scovich. I will turn around soon!
kernel/src/snapshot.rs
Outdated
The logic ensures that: | ||
a) We never include commit files beyond the requested version. | ||
b) We use the most recent checkpoint as a starting point for efficiency. | ||
c) We include the checkpoint version file only if it's explicitly requested. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this explanation is a overkill, removed it.
kernel/src/snapshot.rs
Outdated
// If no requested version, try to get the last checkpoint | ||
let last_checkpoint = if requested_version.is_none() { | ||
read_last_checkpoint(fs_client, log_root)? | ||
} else { | ||
None | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, made the change to use the hint the requested version is more than the checkpoint version.
kernel/src/snapshot.rs
Outdated
// If we're using last_checkpoint and we've found the next checkpoint, stop | ||
if requested_version.is_none() | ||
&& last_checkpoint.is_some() | ||
&& file_version > last_checkpoint.as_ref().unwrap().version as i64 | ||
{ | ||
next_checkpoint_version = Some(file_version); | ||
break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just getting way too messy to handle all the scenarios, especially with multiple-multipart-multi-writer checkpointing!
I guess tonight I'll go for a quick implementation of group iterator from https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaLogGroupingIterator.scala ! I'll turn around in a few hours.
Fair enough. I was hoping we could just work in a simple fix for the immediate problem before tackling the full solution, but at some point it's no longer "simple." Hopefully the grouping iterator makes life easier all around. |
This commit introduces the DeltaLogGroupingIterator, a crucial component for processing Delta Lake log files. The iterator groups log files into checkpoint nodes, handling various scenarios including single-part checkpoints, multi-part checkpoints, and commits without checkpoints. Key features and improvements: 1. Efficient sorting and processing of log files: - Files are sorted by version and type (checkpoints before commits) - Handles version gaps and ensures proper sequencing of files 2. Flexible checkpoint handling: - Supports both single-part and multi-part checkpoints - Correctly groups multi-part checkpoint files - Detects and reports incomplete multi-part checkpoints 3. Robust error handling: - Detects and reports version gaps in the log - Ensures the log starts from version 0 when required - Reports incomplete multi-part checkpoints 4. Memory-efficient linked list structure: - Uses Rc<RefCell<>> for shared ownership and interior mutability - Allows for easy traversal of the log structure 5. Iterator implementation: - Provides a standard Rust iterator interface for easy consumption of log data
This commit enhances the LogPath struct with new functionality to handle multi-part checkpoint files in Delta Lake log processing. Two new methods have been added to improve the identification and parsing of multi-part checkpoint files: 1. is_multi_part_checkpoint(): - Determines if a file is a multi-part checkpoint - Handles both single-part and multi-part checkpoint file formats - Returns a boolean indicating if the file is a multi-part checkpoint 2. get_checkpoint_part_numbers(): - Extracts part number and total parts for multi-part checkpoints - Returns Option<(u64, u64)> representing (part_number, total_parts) - Returns None for single-part checkpoints or non-checkpoint files Key improvements: - Robust parsing of checkpoint filenames - Clear distinction between single-part and multi-part checkpoints - Efficient extraction of part information from multi-part checkpoints
- Introduce new Error variant for invalid Delta Log structures - Improve error reporting for log processing issues - Supports recent changes in DeltaLogGroupingIterator and LogPath
- Replace manual file processing with DeltaLogGroupingIterator - Improve handling of multi-part checkpoints and version requests - Enhance error handling for invalid Delta log structures - Optimize file filtering and sorting for different scenarios - Update comments to explain complex logic and edge cases - Maintain backwards compatibility with existing test cases
hey @hackintoshrao we are prioritizing #344 which should unblock this work, sorry for the delay! |
@hackintoshrao #344 finally merged which hopefully unblocks you here |
Implement DeltaLogGroupingIterator for Efficient Snapshot Creation
This PR implements the DeltaLogGroupingIterator, significantly improving the snapshot creation process. This new approach offers a more efficient and robust method for handling Delta log files, particularly tables with multiple checkpoints and versions.
Key Improvements:
DeltaLogGroupingIterator Implementation:
Linked List Structure:
The Delta log is now represented as a linked list of checkpoint nodes:
Node 1 (No Checkpoint)
->Node 2 (Checkpoint A)
->Node 3 (Checkpoint B)
-> ...Each node contains:
Memory Efficiency:
Improved Version Handling:
a) Requested version after the last checkpoint
b) Requested version between checkpoints
c) Requested version before any checkpoint