Skip to content

Commit

Permalink
test: add DeltaScan metric tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alexwilcoxson-rel committed Jun 27, 2024
1 parent b082d3d commit 5cf4a3e
Showing 1 changed file with 41 additions and 0 deletions.
41 changes: 41 additions & 0 deletions crates/core/tests/integration_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ mod local {
#[derive(Debug, Default)]
pub struct ExecutionMetricsCollector {
scanned_files: HashSet<Label>,
pub skip_count: usize,
pub keep_count: usize,
}

impl ExecutionMetricsCollector {
Expand All @@ -83,6 +85,15 @@ mod local {
if let Some(exec) = plan.as_any().downcast_ref::<ParquetExec>() {
let files = get_scanned_files(exec);
self.scanned_files.extend(files);
} else if let Some(exec) = plan.as_any().downcast_ref::<DeltaScan>() {
self.keep_count = exec
.metrics()
.and_then(|m| m.sum_by_name("files_scanned").map(|v| v.as_usize()))
.unwrap_or_default();
self.skip_count = exec
.metrics()
.and_then(|m| m.sum_by_name("files_pruned").map(|v| v.as_usize()))
.unwrap_or_default();
}
Ok(true)
}
Expand Down Expand Up @@ -440,6 +451,10 @@ mod local {
let task_ctx = Arc::new(TaskContext::from(state));
let _result = collect(plan.execute(0, task_ctx)?).await?;
visit_execution_plan(&plan, &mut metrics).unwrap();
} else {
// if scan produces no output from ParquetExec, we still want to visit DeltaScan
// to check its metrics
visit_execution_plan(scan.as_ref(), &mut metrics).unwrap();
}

Ok(metrics)
Expand Down Expand Up @@ -616,6 +631,8 @@ mod local {

let metrics = get_scan_metrics(&table, &state, &[]).await?;
assert_eq!(metrics.num_scanned_files(), 3);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 0);

// (Column name, value from file 1, value from file 2, value from file 3, non existent value)
let tests = [
Expand Down Expand Up @@ -662,25 +679,33 @@ mod local {
let e = col(column).eq(file1_value.clone());
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 1);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 2);

// Value does not exist
let e = col(column).eq(non_existent_value.clone());
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 0);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 3);

// Conjunction
let e = col(column)
.gt(file1_value.clone())
.and(col(column).lt(file2_value.clone()));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 2);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 1);

// Disjunction
let e = col(column)
.lt(file1_value.clone())
.or(col(column).gt(file3_value.clone()));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 2);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 1);
}

// Validate Boolean type
Expand All @@ -692,10 +717,14 @@ mod local {
let e = col("boolean").eq(lit(true));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 1);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 1);

let e = col("boolean").eq(lit(false));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 1);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 1);

let tests = [
TestCase::new_wrapped("utf8", |value| lit(value.to_string())),
Expand Down Expand Up @@ -762,23 +791,31 @@ mod local {
let e = col(column).eq(file1_value.clone());
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 1);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 8);

// Value does not exist
let e = col(column).eq(non_existent_value);
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 0);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 9);

// Conjunction
let e = col(column)
.gt(file1_value.clone())
.and(col(column).lt(file2_value));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 2);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 7);

// Disjunction
let e = col(column).lt(file1_value).or(col(column).gt(file3_value));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 2);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 7);

// TODO how to get an expression with the right datatypes eludes me ..
// Validate null pruning
Expand Down Expand Up @@ -808,10 +845,14 @@ mod local {
let e = col("boolean").eq(lit(true));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 1);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 1);

let e = col("boolean").eq(lit(false));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 1);
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
assert_eq!(metrics.skip_count, 1);

// Ensure that tables without stats and partition columns can be pruned for just partitions
// let table = open_table("./tests/data/delta-0.8.0-null-partition").await?;
Expand Down

0 comments on commit 5cf4a3e

Please sign in to comment.