Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switch to using scan_data version of scan for execute() as well #265

Merged
merged 14 commits into from
Jul 16, 2024
4 changes: 2 additions & 2 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ fn try_main() -> DeltaResult<()> {
let scan_data = scan.scan_data(engine.as_ref())?;

// get any global state associated with this scan
let global_state = scan.global_scan_state();
let global_state = Arc::new(scan.global_scan_state());

// create the channels we'll use. record_batch_[t/r]x are used for the threads to send back the
// processed RecordBatches to themain thread
Expand Down Expand Up @@ -221,7 +221,7 @@ fn try_main() -> DeltaResult<()> {
// this is the work each thread does
fn do_work(
engine: Arc<dyn Engine>,
scan_state: GlobalScanState,
scan_state: Arc<GlobalScanState>,
record_batch_tx: Sender<RecordBatch>,
scan_file_rx: spmc::Receiver<ScanFile>,
) {
Expand Down
4 changes: 3 additions & 1 deletion kernel/src/scan/data_skipping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use tracing::debug;

use crate::actions::visitors::SelectionVectorVisitor;
use crate::actions::{get_log_schema, ADD_NAME};
use crate::error::DeltaResult;
use crate::expressions::{BinaryOperator, Expression as Expr, UnaryOperator, VariadicOperator};
use crate::schema::{DataType, PrimitiveType, SchemaRef, StructField, StructType};
Expand Down Expand Up @@ -242,7 +243,8 @@ impl DataSkippingFilter {
// 3. The selection evaluator does DISTINCT(col(predicate), 'false') to produce true (= keep) when
// the predicate is true/null and false (= skip) when the predicate is false.
let select_stats_evaluator = engine.get_expression_handler().get_evaluator(
stats_schema.clone(),
// safety: kernel is very broken if we don't have the schema for Add actions
get_log_schema().project(&[ADD_NAME]).unwrap(),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was incorrectly passing stats_schema as the input schema before. the input to this evaluator is the raw log data, so we want to add schema. it only worked before because we were ignoring the input schema

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I fully understand -- the Add schema has stats as a string (which happens to be a JSON object literal), so it would seem difficult to do data skipping over that? Or is the needed string-to-json parsing already injected somewhere else?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the input schema. This evaluator is called on the raw data read from the log, so we're asking for it to get add.stats out of the Add. STATS_EXPR is just Expr::column("add.stats"), and note the output is just a String.

STATS_EXPR.clone(),
DataType::STRING,
);
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/scan/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ lazy_static! {
// for `scan_row_schema` in scan/mod.rs! You'll also need to update ScanFileVisitor as the
// indexes will be off
pub(crate) static ref SCAN_ROW_SCHEMA: Arc<StructType> = Arc::new(StructType::new(vec!(
StructField::new("path", DataType::STRING, true),
StructField::new("path", DataType::STRING, false),
StructField::new("size", DataType::LONG, true),
StructField::new("modificationTime", DataType::LONG, true),
StructField::new("stats", DataType::STRING, true),
Expand Down
253 changes: 128 additions & 125 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
//! Functionality to create and execute scans (reads) over data stored in a delta table

use std::collections::HashMap;
use std::sync::Arc;

use itertools::Itertools;
use tracing::debug;
use url::Url;

use self::log_replay::{log_replay_iter, scan_action_iter};
use self::log_replay::scan_action_iter;
use self::state::GlobalScanState;
use crate::actions::deletion_vector::{split_vector, treemap_to_bools, DeletionVectorDescriptor};
use crate::actions::{get_log_schema, Add, ADD_NAME, REMOVE_NAME};
use crate::actions::{get_log_schema, ADD_NAME, REMOVE_NAME};
use crate::column_mapping::ColumnMappingMode;
use crate::expressions::{Expression, Scalar};
use crate::scan::state::{DvInfo, Stats};
use crate::schema::{DataType, Schema, SchemaRef, StructField, StructType};
use crate::snapshot::Snapshot;
use crate::{DeltaResult, Engine, EngineData, Error, FileMeta};
Expand Down Expand Up @@ -177,30 +179,6 @@ impl Scan {
&self.predicate
}

/// Get an iterator of Add actions that should be included in scan for a query. This handles
/// log-replay, reconciling Add and Remove actions, and applying data skipping (if possible)
pub(crate) fn files(
&self,
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<Add>> + Send> {
let commit_read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?;
let checkpoint_read_schema = get_log_schema().project(&[ADD_NAME])?;

let log_iter = self.snapshot.log_segment.replay(
engine,
commit_read_schema,
checkpoint_read_schema,
self.predicate.clone(),
)?;

Ok(log_replay_iter(
engine,
log_iter,
&self.logical_schema,
&self.predicate,
))
}

/// Get an iterator of [`EngineData`]s that should be included in scan for a query. This handles
/// log-replay, reconciling Add and Remove actions, and applying data skipping (if
/// possible). Each item in the returned iterator is a tuple of:
Expand Down Expand Up @@ -256,99 +234,87 @@ impl Scan {
// This calls [`Scan::files`] to get a set of `Add` actions for the scan, and then uses the
// `engine`'s [`crate::ParquetHandler`] to read the actual table data.
pub fn execute(&self, engine: &dyn Engine) -> DeltaResult<Vec<ScanResult>> {
struct ScanFile {
path: String,
size: i64,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
}
fn scan_data_callback(
batches: &mut Vec<ScanFile>,
path: &str,
size: i64,
_: Option<Stats>,
dv_info: DvInfo,
partition_values: HashMap<String, String>,
) {
batches.push(ScanFile {
path: path.to_string(),
size,
dv_info,
partition_values,
});
scovich marked this conversation as resolved.
Show resolved Hide resolved
}

debug!(
"Executing scan with logical schema {:#?} and physical schema {:#?}",
self.logical_schema, self.physical_schema
);
let output_schema = DataType::from(self.schema().clone());
let parquet_handler = engine.get_parquet_handler();

let mut results: Vec<ScanResult> = vec![];
let files = self.files(engine)?;
for add_result in files {
let add = add_result?;
let meta = FileMeta {
last_modified: add.modification_time,
size: add.size as usize,
location: self.snapshot.table_root.join(&add.path)?,
};

let read_results =
parquet_handler.read_parquet_files(&[meta], self.physical_schema.clone(), None)?;

let read_expression = if self.have_partition_cols
|| self.snapshot.column_mapping_mode != ColumnMappingMode::None
{
// Loop over all fields and create the correct expressions for them
let all_fields = self
.all_fields
.iter()
.map(|field| match field {
ColumnType::Partition(field_idx) => {
let field = self.logical_schema.fields.get_index(*field_idx).ok_or_else(|| {
Error::generic("logical schema did not contain expected field, can't execute scan")
})?.1;
let name = field.physical_name(self.snapshot.column_mapping_mode)?;
let value_expression = parse_partition_value(
add.partition_values.get(name),
field.data_type(),
)?;
Ok::<Expression, Error>(Expression::Literal(value_expression))
}
ColumnType::Selected(field_name) => Ok(Expression::column(field_name)),
})
.try_collect()?;
Some(Expression::Struct(all_fields))
} else {
None
};
debug!("Final expression for read: {read_expression:?}");

let dv_treemap = add
.deletion_vector
.as_ref()
.map(|dv_descriptor| {
let fs_client = engine.get_file_system_client();
dv_descriptor.read(fs_client, &self.snapshot.table_root)
})
.transpose()?;

let mut dv_mask = dv_treemap.map(treemap_to_bools);

for read_result in read_results {
let len = if let Ok(ref res) = read_result {
res.length()
} else {
0
};

let read_result = match read_expression {
Some(ref read_expression) => engine
.get_expression_handler()
.get_evaluator(
self.physical_schema.clone(),
read_expression.clone(),
output_schema.clone(),
)
.evaluate(read_result?.as_ref()),
None => {
// if we don't have partition columns, the result is just what we read
read_result
}
};

// need to split the dv_mask. what's left in dv_mask covers this result, and rest
// will cover the following results
let rest = split_vector(dv_mask.as_mut(), len, None);
let scan_result = ScanResult {
raw_data: read_result,
mask: dv_mask,
};
dv_mask = rest;
results.push(scan_result);
}
let global_state = Arc::new(self.global_scan_state());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to be an Arc at all any more? The only use of global_state is a call to clone, and the resulting gs is passed by shared reference to a helper method?

(I'm actually a bit surprised that &gs even compiles, since the method wants a &GlobalState, not &Arc<GlobalState> -- I guess that's impl Deref and/or impl AsRef at work?)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let scan_data = self.scan_data(engine)?;
let mut scan_files = vec![];
for data in scan_data {
let (data, vec) = data?;
scan_files =
state::visit_scan_files(data.as_ref(), &vec, scan_files, scan_data_callback)?;
Comment on lines +269 to +270
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remind me why we pass and return scan_files, instead of just passing a &mut scan_files?
(I have vague memories we discussed it at some point in the past, but I forgot the details)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inside visit_scan_files we construct a ScanFileVisitor which needs to store this context. Having it take ownership simplifies things significantly there (especially for FFI).

LMK if that's enough context, I can explain more deeply if needed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside: If this were a performance sensitive loop, passing and returning by value would defeat return value optimizations and cause object copying that the outparam (&mut) approach would not. But this is once per data chunk, so we're probably fine.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's true. as you say, this isn't in a hot loop so I think it's okay.

scovich marked this conversation as resolved.
Show resolved Hide resolved
}
Ok(results)
scan_files
.into_iter()
.map(|scan_file| -> DeltaResult<_> {
let file_path = self.snapshot.table_root.join(&scan_file.path)?;
let mut selection_vector = scan_file
.dv_info
.get_selection_vector(engine, &self.snapshot.table_root)?;
let meta = FileMeta {
last_modified: 0,
size: scan_file.size as usize,
location: file_path,
};
let read_result_iter = engine.get_parquet_handler().read_parquet_files(
&[meta],
global_state.read_schema.clone(),
None,
)?;
let gs = global_state.clone(); // Arc clone
Ok(read_result_iter.into_iter().map(move |read_result| {
let read_result = read_result?;
// to transform the physical data into the correct logical form
let logical = transform_to_logical_internal(
engine,
read_result,
&gs,
&scan_file.partition_values,
&self.all_fields,
self.have_partition_cols,
);
let len = logical.as_ref().map_or(0, |res| res.length());
// need to split the dv_mask. what's left in dv_mask covers this result, and rest
// will cover the following results. we `take()` out of `selection_vector` to avoid
// trying to return a captured variable. We're going to reassign `selection_vector`
// to `rest` in a moment anyway
let mut sv = selection_vector.take();
let rest = split_vector(sv.as_mut(), len, None);
let result = ScanResult {
raw_data: logical,
mask: sv,
};
selection_vector = rest;
Ok(result)
}))
})
.flatten_ok()
.try_collect()?
}
}

Expand Down Expand Up @@ -438,17 +404,39 @@ pub fn selection_vector(
Ok(treemap_to_bools(dv_treemap))
}

/// Transform the raw data read from parquet into the correct logical form, based on the provided
/// global scan state and partition values
pub fn transform_to_logical(
engine: &dyn Engine,
data: Box<dyn EngineData>,
global_state: &GlobalScanState,
partition_values: &std::collections::HashMap<String, String>,
partition_values: &HashMap<String, String>,
) -> DeltaResult<Box<dyn EngineData>> {
let (all_fields, _read_fields, have_partition_cols) = get_state_info(
&global_state.logical_schema,
&global_state.partition_columns,
global_state.column_mapping_mode,
)?;
transform_to_logical_internal(
engine,
data,
global_state,
partition_values,
&all_fields,
have_partition_cols,
)
}

// We have this function because `execute` can save `all_fields` and `have_partition_cols` in the
// scan, and then reuse them for each batch transform
fn transform_to_logical_internal(
engine: &dyn Engine,
data: Box<dyn EngineData>,
global_state: &GlobalScanState,
partition_values: &std::collections::HashMap<String, String>,
scovich marked this conversation as resolved.
Show resolved Hide resolved
all_fields: &[ColumnType],
have_partition_cols: bool,
) -> DeltaResult<Box<dyn EngineData>> {
let read_schema = global_state.read_schema.clone();
if have_partition_cols || global_state.column_mapping_mode != ColumnMappingMode::None {
// need to add back partition cols and/or fix-up mapped columns
Expand Down Expand Up @@ -596,8 +584,29 @@ mod tests {
use crate::schema::PrimitiveType;
use crate::Table;

fn get_files_for_scan(scan: Scan, engine: &dyn Engine) -> DeltaResult<Vec<String>> {
let scan_data = scan.scan_data(engine)?;
fn scan_data_callback(
paths: &mut Vec<String>,
path: &str,
_size: i64,
_: Option<Stats>,
dv_info: DvInfo,
_partition_values: HashMap<String, String>,
) {
paths.push(path.to_string());
assert!(dv_info.deletion_vector.is_none());
}
let mut files = vec![];
for data in scan_data {
let (data, vec) = data?;
files = state::visit_scan_files(data.as_ref(), &vec, files, scan_data_callback)?;
}
Ok(files)
}

#[test]
fn test_scan_files() {
fn test_scan_data_paths() {
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
Expand All @@ -606,14 +615,12 @@ mod tests {
let table = Table::new(url);
let snapshot = table.snapshot(&engine, None).unwrap();
let scan = snapshot.into_scan_builder().build().unwrap();
let files: Vec<Add> = scan.files(&engine).unwrap().try_collect().unwrap();

let files = get_files_for_scan(scan, &engine).unwrap();
assert_eq!(files.len(), 1);
assert_eq!(
&files[0].path,
files[0],
"part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet"
);
assert!(&files[0].deletion_vector.is_none());
}

#[test]
Expand Down Expand Up @@ -689,8 +696,7 @@ mod tests {
let table = Table::new(url);
let snapshot = table.snapshot(&engine, None)?;
let scan = snapshot.into_scan_builder().build()?;
let files: Vec<DeltaResult<Add>> = scan.files(&engine)?.collect();

let files = get_files_for_scan(scan, &engine)?;
// test case:
//
// commit0: P and M, no add/remove
Expand All @@ -701,10 +707,7 @@ mod tests {
//
// thus replay should produce only file-70b
assert_eq!(
files
.into_iter()
.map(|file| file.unwrap().path)
.collect::<Vec<_>>(),
files,
vec!["part-00000-70b1dcdf-0236-4f63-a072-124cdbafd8a0-c000.snappy.parquet"]
);
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct GlobalScanState {
/// this struct can be used by an engine to materialize a selection vector
#[derive(Debug)]
pub struct DvInfo {
deletion_vector: Option<DeletionVectorDescriptor>,
pub(crate) deletion_vector: Option<DeletionVectorDescriptor>,
}

/// Give engines an easy way to consume stats
Expand Down
Loading