Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switch to using scan_data version of scan for execute() as well #265

Merged
merged 14 commits into from
Jul 16, 2024

Conversation

nicklan
Copy link
Collaborator

@nicklan nicklan commented Jun 21, 2024

Previously we had two independent code paths for interacting with tables via a scan, execute() or scan_data().

scan_data is what most engines will use, and is a bit more complex in that it evaluates expressions over the returned add files. This meant that bugs like #261 could happen because our tests used execute which didn't catch the issue.

This PR makes execute use scan_data under the hood. It's a bit more complex, but now we won't need to maintain two code paths.

Until #271 merges, this PR will fail tests, because nullable columns are not filled in as expected.

@@ -242,7 +243,8 @@ impl DataSkippingFilter {
// 3. The selection evaluator does DISTINCT(col(predicate), 'false') to produce true (= keep) when
// the predicate is true/null and false (= skip) when the predicate is false.
let select_stats_evaluator = engine.get_expression_handler().get_evaluator(
stats_schema.clone(),
// safety: kernel is very broken if we don't have the schema for Add actions
get_log_schema().project(&[ADD_NAME]).unwrap(),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was incorrectly passing stats_schema as the input schema before. the input to this evaluator is the raw log data, so we want to add schema. it only worked before because we were ignoring the input schema

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I fully understand -- the Add schema has stats as a string (which happens to be a JSON object literal), so it would seem difficult to do data skipping over that? Or is the needed string-to-json parsing already injected somewhere else?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the input schema. This evaluator is called on the raw data read from the log, so we're asking for it to get add.stats out of the Add. STATS_EXPR is just Expr::column("add.stats"), and note the output is just a String.

Copy link

@scarman-db scarman-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGMT

kernel/src/engine/arrow_expression.rs Outdated Show resolved Hide resolved
kernel/src/scan/mod.rs Outdated Show resolved Hide resolved
Copy link
Collaborator

@hntd187 hntd187 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGMT but on the correct account this time

Comment on lines +262 to +263
scan_files =
state::visit_scan_files(data.as_ref(), &vec, scan_files, scan_data_callback)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remind me why we pass and return scan_files, instead of just passing a &mut scan_files?
(I have vague memories we discussed it at some point in the past, but I forgot the details)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inside visit_scan_files we construct a ScanFileVisitor which needs to store this context. Having it take ownership simplifies things significantly there (especially for FFI).

LMK if that's enough context, I can explain more deeply if needed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside: If this were a performance sensitive loop, passing and returning by value would defeat return value optimizations and cause object copying that the outparam (&mut) approach would not. But this is once per data chunk, so we're probably fine.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's true. as you say, this isn't in a hot loop so I think it's okay.

@@ -242,7 +243,8 @@ impl DataSkippingFilter {
// 3. The selection evaluator does DISTINCT(col(predicate), 'false') to produce true (= keep) when
// the predicate is true/null and false (= skip) when the predicate is false.
let select_stats_evaluator = engine.get_expression_handler().get_evaluator(
stats_schema.clone(),
// safety: kernel is very broken if we don't have the schema for Add actions
get_log_schema().project(&[ADD_NAME]).unwrap(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I fully understand -- the Add schema has stats as a string (which happens to be a JSON object literal), so it would seem difficult to do data skipping over that? Or is the needed string-to-json parsing already injected somewhere else?

kernel/src/scan/mod.rs Outdated Show resolved Hide resolved
kernel/src/scan/mod.rs Outdated Show resolved Hide resolved
@nicklan nicklan force-pushed the unify-scan-execute-and-scan-data branch from 3f99297 to e993a0d Compare July 9, 2024 23:02
@nicklan nicklan requested review from scovich and sppalkia July 9, 2024 23:48
Copy link
Collaborator

@scovich scovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work! Only nits left at this point.

dv_mask = rest;
results.push(scan_result);
}
let global_state = Arc::new(self.global_scan_state());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to be an Arc at all any more? The only use of global_state is a call to clone, and the resulting gs is passed by shared reference to a helper method?

(I'm actually a bit surprised that &gs even compiles, since the method wants a &GlobalState, not &Arc<GlobalState> -- I guess that's impl Deref and/or impl AsRef at work?)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kernel/src/scan/mod.rs Show resolved Hide resolved
Comment on lines +262 to +263
scan_files =
state::visit_scan_files(data.as_ref(), &vec, scan_files, scan_data_callback)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside: If this were a performance sensitive loop, passing and returning by value would defeat return value optimizations and cause object copying that the outparam (&mut) approach would not. But this is once per data chunk, so we're probably fine.

kernel/src/scan/mod.rs Show resolved Hide resolved
kernel/src/scan/mod.rs Outdated Show resolved Hide resolved
kernel/src/scan/mod.rs Show resolved Hide resolved
Copy link
Collaborator Author

@nicklan nicklan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks!

kernel/src/scan/mod.rs Show resolved Hide resolved
Comment on lines +262 to +263
scan_files =
state::visit_scan_files(data.as_ref(), &vec, scan_files, scan_data_callback)?;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's true. as you say, this isn't in a hot loop so I think it's okay.

kernel/src/scan/mod.rs Show resolved Hide resolved
global_state.read_schema.clone(),
None,
)?;
let gs = global_state.clone();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is correct/necessary? We're now cloning the state itself (it used to be an arc). Can we not just pass &global_state at L296 below?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh, I now remember why I had it as an Arc. We have to move the read_result into the iter, since we return it, but doing move |read_result| moves any captured variable, and we can't move global_state into an FnMut.

I think the best option is actually to go back to using an Arc, so I will do that.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it have also worked to capture a reference, e.g. hoist the initialization of gs outside the iter? Because shared refs are clone+copy?

@nicklan nicklan merged commit e8c0226 into delta-io:main Jul 16, 2024
9 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants