[WIP] Add constructor for `Tablechanges` to get a CDF scan #459
Log segment is going to be used by both snapshot and table changes. It makes sense to separate it into its own module
…nto snapshot_cleanup
Co-authored-by: Ryan Johnson <[email protected]>
This reverts commit 470446b.
Looking really good so far, are we starting with versions and adding timestamps later?
@@ -28,6 +28,8 @@ pub(crate) const METADATA_NAME: &str = "metaData";
pub(crate) const PROTOCOL_NAME: &str = "protocol";
pub(crate) const SET_TRANSACTION_NAME: &str = "txn";
pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo";
#[allow(unused)]
I assume when we're ready to go here this allow won't be needed?
Yep, I'll go ahead and add a comment
@@ -171,6 +171,9 @@ pub enum Error {
    /// The file already exists at the path, prohibiting a non-overwrite write
    #[error("File already exists: {0}")]
    FileAlreadyExists(String),

    #[error("Change data feed is disabled in range: {0} to {}", _1.map_or("end".into(), |ver| ver.to_string()))]
The syntax of this error message is confusing, and I think the wording should be clarified as well. The problem is not that CDF is disabled in the range (or perhaps in the entire range); there is nothing wrong with disabling it. The problem is that a CDF read cannot work if CDF was disabled anywhere in the requested range.
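One way the clearer wording could look is sketched below with a hand-written `Display` impl, so the formatting logic is spelled out rather than packed into an attribute. The `ChangeDataFeedUnsupported` type and its field names are hypothetical and are not the kernel's actual `Error` variant.

```rust
use std::fmt;

type Version = u64;

/// Hypothetical error type for illustration; not the kernel's `Error` enum.
#[derive(Debug)]
struct ChangeDataFeedUnsupported {
    start: Version,
    /// `None` means the range is open-ended.
    end: Option<Version>,
}

impl fmt::Display for ChangeDataFeedUnsupported {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // An open-ended range is rendered as "end" instead of a version number.
        let end = self
            .end
            .map_or_else(|| "end".to_string(), |v| v.to_string());
        write!(
            f,
            "Change data feed must be enabled for every version in the range {} to {}",
            self.start, end
        )
    }
}
```

The message states the requirement (enabled for every version) instead of implying that disabling CDF is itself an error.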
kernel/src/log_segment.rs
Outdated
        self
    }
    pub(crate) fn with_end_version(mut self, version: Version) -> Self {
        let _ = self.end_version.insert(version);
Why not just `self.end_version = Some(version);`? It's a bit clearer about what you are doing, and you don't need the `let _ =` syntax.
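A minimal sketch of the two variants side by side, assuming a stripped-down builder with only the one field (the real `LogSegmentBuilder` has more state, and `with_end_version_insert` is a name invented here to hold the original form):

```rust
type Version = u64;

/// Hypothetical stand-in for the builder under review; only one field is modeled.
#[derive(Debug, Default, PartialEq)]
struct LogSegmentBuilder {
    end_version: Option<Version>,
}

impl LogSegmentBuilder {
    /// Variant from the diff: `Option::insert` returns a `&mut Version`,
    /// which is discarded with `let _ =`.
    fn with_end_version_insert(mut self, version: Version) -> Self {
        let _ = self.end_version.insert(version);
        self
    }

    /// Suggested variant: plain assignment, nothing to discard.
    fn with_end_version(mut self, version: Version) -> Self {
        self.end_version = Some(version);
        self
    }
}
```

Both leave the builder in the same state, so the change is purely about readability.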
// We assume listing returned ordered. If `in_order_commit_files` is false, we want reverse order.
if !in_order_commit_files {
    // We assume listing returned ordered, we want reverse order
    commit_files.reverse();
Are we assuming the listing returns reversed order, so you just want non-reversed order? Sorry, nothing is wrong here (though big tables might not like the perf), but this reads in a somewhat confusing way to me.
let version_prefix = format!("{:020}", begin_version);
let start_from = log_root.join(&version_prefix)?;

let mut max_checkpoint_version = version.map_or(-1, |x| x as i64);
Is `-1` a valid value here?
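If the `-1` is only meant to say "no checkpoint version is known yet" (an assumption; the diff does not say), an `Option` can carry that meaning without a sentinel or a cast to `i64`. The helper name below is hypothetical.

```rust
type Version = u64;

/// Hypothetical helper: decides whether a checkpoint at `candidate` should
/// replace the best one seen so far. `None` means "no checkpoint yet" and
/// takes the place of the `-1` sentinel.
fn is_newer_checkpoint(current_max: Option<Version>, candidate: Version) -> bool {
    current_max.map_or(true, |max| candidate > max)
}
```

With this shape, version 0 is handled like any other version and no signed arithmetic is needed.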
let start_flag = start_snapshot.metadata().configuration.get(CDF_ENABLE_FLAG);
let end_flag = end_snapshot.metadata().configuration.get(CDF_ENABLE_FLAG);

// Verify CDF is enabled at the beginning and end of the interval
I believe (correct me if I'm wrong) that CDF needs to remain enabled for the ENTIRE span of versions being requested. I do not think it's enough to validate just the start and end.
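A rough sketch of what a whole-range check could look like. It assumes the effective table configuration for each version in the range is already available as a map, which the PR does not yet provide; the function name is hypothetical, and the property string is an assumption, since the diff shows only the constant name `CDF_ENABLE_FLAG`.

```rust
use std::collections::HashMap;

type Version = u64;

/// Assumed property name; the diff only shows the constant `CDF_ENABLE_FLAG`.
const CDF_ENABLE_FLAG: &str = "delta.enableChangeDataFeed";

/// Hypothetical helper: returns the first version in the range whose
/// configuration does not enable CDF, or `None` if every version enables it.
fn first_version_without_cdf<'a>(
    configs: impl IntoIterator<Item = (Version, &'a HashMap<String, String>)>,
) -> Option<Version> {
    configs.into_iter().find_map(|(version, config)| {
        let enabled = config.get(CDF_ENABLE_FLAG).map(String::as_str) == Some("true");
        // Report this version only when CDF is not enabled for it.
        (!enabled).then_some(version)
    })
}
```

Returning the offending version, rather than a plain boolean, would let the error message point at exactly where CDF was turned off.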
kernel/src/table_changes/mod.rs
Outdated
let start_snapshot =
    Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?;
let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?;
println!("Start snapshot: {:?}", start_snapshot);
Eventually, replace the print with log messages.
// Get a log segment for the CDF range
let fs_client = engine.get_file_system_client();
let mut builder = LogSegmentBuilder::new(fs_client, &table_root);
builder = builder.with_start_version(start_version);
I would change the builder to accept an optional end version, so you can drop the guard here and turn this into one long call chain. It'll be a bit easier to read.
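As a sketch of that shape, assuming a simplified builder with only the two version fields (`SegmentBuilder` and `build_range` are hypothetical names; the real builder also takes a file system client and a table root):

```rust
type Version = u64;

/// Hypothetical, simplified builder; only the version bounds are modeled.
#[derive(Debug, Default, PartialEq)]
struct SegmentBuilder {
    start_version: Option<Version>,
    end_version: Option<Version>,
}

impl SegmentBuilder {
    fn with_start_version(mut self, version: Version) -> Self {
        self.start_version = Some(version);
        self
    }

    /// Accepts the end version as an `Option`, so callers holding an
    /// `Option<Version>` need no `if let` guard around the call.
    fn with_end_version(mut self, version: Option<Version>) -> Self {
        self.end_version = version;
        self
    }
}

/// The caller side collapses into a single chain.
fn build_range(start: Version, end: Option<Version>) -> SegmentBuilder {
    SegmentBuilder::default()
        .with_start_version(start)
        .with_end_version(end)
}
```

The trade-off is that callers with a concrete end version have to write `Some(v)`, which seems acceptable for a crate-private builder.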
Ok(TableChanges {
    snapshot: start_snapshot,
    log_segment,
    schema: end_snapshot.schema().clone(),
If the snapshot already holds most of these struct members, why not just keep the end snapshot instead of making all the clones?
What changes are proposed in this pull request?
This PR introduces the `TableChanges` struct, which represents a Change Data Feed scan. `TableChanges` is constructed from a `Table` and performs two protocol and metadata scans. The first is for the start version and ensures that CDF is enabled at the beginning version. The second is for the end version; it is used to extract the schema at the end version and to ensure that the final version has CDF enabled.

We also introduce `LogSegment::replay_commits`, which returns an iterator over the commits. Each commit in turn provides an iterator over all the actions in that commit. This will be useful for iterating over log files when performing the scan.

Depends on: #457
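The nested-iterator shape described for `replay_commits` can be illustrated with a toy in-memory model. Everything below is a simplification for illustration: the `Action` enum, the `commits` field, and the return type are assumptions, and the real method reads log files through the engine.

```rust
type Version = u64;

/// Hypothetical, simplified action type.
#[derive(Debug, Clone, PartialEq)]
enum Action {
    Add(String),
    Remove(String),
}

/// Toy log segment that holds its commits in memory, in ascending version order.
struct LogSegment {
    commits: Vec<(Version, Vec<Action>)>,
}

impl LogSegment {
    /// The outer iterator yields one item per commit; each item carries the
    /// commit version and an inner iterator over that commit's actions.
    fn replay_commits(
        &self,
    ) -> impl Iterator<Item = (Version, std::slice::Iter<'_, Action>)> + '_ {
        self.commits
            .iter()
            .map(|(version, actions)| (*version, actions.iter()))
    }
}
```

A scan can then process one commit at a time, for example by counting or filtering the actions of each commit before moving on to the next.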
How was this change tested?