Skip to content

Commit

Permalink
adjust the projection statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
liukun4515 committed Aug 28, 2023
1 parent ea9144e commit 5aaa677
Showing 1 changed file with 92 additions and 14 deletions.
106 changes: 92 additions & 14 deletions datafusion/core/src/physical_plan/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ impl ExecutionPlan for ProjectionExec {
stats_projection(
self.input.statistics(),
self.expr.iter().map(|(e, _)| Arc::clone(e)),
self.schema.clone(),
)
}
}
Expand Down Expand Up @@ -395,9 +396,13 @@ fn get_field_metadata(
fn stats_projection(
stats: Statistics,
exprs: impl Iterator<Item = Arc<dyn PhysicalExpr>>,
schema: SchemaRef,
) -> Statistics {
let inner_exprs = exprs.map(|v| v).collect::<Vec<_>>();
let column_statistics = stats.column_statistics.map(|input_col_stats| {
exprs
inner_exprs
.clone()
.into_iter()
.map(|e| {
if let Some(col) = e.as_any().downcast_ref::<Column>() {
input_col_stats[col.index()].clone()
Expand All @@ -410,12 +415,39 @@ fn stats_projection(
.collect()
});

Statistics {
is_exact: stats.is_exact,
num_rows: stats.num_rows,
column_statistics,
// TODO stats: knowing the type of the new columns we can guess the output size
total_byte_size: None,
let primitive_row_size = inner_exprs
.clone()
.into_iter()
.map(|e| match e.data_type(schema.as_ref()) {
Ok(data_type) => data_type.primitive_width(),
Err(_) => None,
})
.fold(Some(0usize), |init, v| match (init, v) {
(Some(l), Some(r)) => Some(l + r),
_ => None,
});

match (primitive_row_size, stats.num_rows) {
(Some(row_size), Some(row_count)) => {
Statistics {
is_exact: stats.is_exact,
num_rows: stats.num_rows,
column_statistics,
// Use the row_size * row_count as the total byte size
total_byte_size: Some(row_size * row_count),
}
}
_ => {
Statistics {
is_exact: stats.is_exact,
num_rows: stats.num_rows,
column_statistics,
// TODO stats: knowing the type of the new columns we can guess the output size
// If we can't get the exact statistics for the project
// Before we get the exact result, we just use the child status
total_byte_size: stats.total_byte_size,
}
}
}
}

Expand Down Expand Up @@ -479,12 +511,12 @@ impl RecordBatchStream for ProjectionStream {

#[cfg(test)]
mod tests {

use super::*;
use crate::physical_plan::common::collect;
use crate::physical_plan::expressions::{self, col};
use crate::test::{self};
use crate::test_util;
use arrow_schema::DataType;
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::binary;
Expand Down Expand Up @@ -591,9 +623,8 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_stats_projection_columns_only() {
let source = Statistics {
fn get_stats() -> Statistics {
Statistics {
is_exact: true,
num_rows: Some(5),
total_byte_size: Some(23),
Expand All @@ -617,19 +648,31 @@ mod tests {
null_count: None,
},
]),
};
}
}

fn get_schema() -> Schema {
let field_0 = Field::new("col0", DataType::Int64, false);
let field_1 = Field::new("col1", DataType::Utf8, false);
let field_2 = Field::new("col2", DataType::Float32, false);
Schema::new(vec![field_0, field_1, field_2])
}
#[tokio::test]
async fn test_stats_projection_columns_only() {
let source = get_stats();
let schema = get_schema();

let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(expressions::Column::new("col1", 1)),
Arc::new(expressions::Column::new("col0", 0)),
];

let result = stats_projection(source, exprs.into_iter());
let result = stats_projection(source, exprs.into_iter(), Arc::new(schema));

let expected = Statistics {
is_exact: true,
num_rows: Some(5),
total_byte_size: None,
total_byte_size: Some(23),
column_statistics: Some(vec![
ColumnStatistics {
distinct_count: Some(1),
Expand All @@ -648,4 +691,39 @@ mod tests {

assert_eq!(result, expected);
}

#[tokio::test]
async fn test_stats_projection_column_with_primitive_width_only() {
let source = get_stats();
let schema = get_schema();

let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(expressions::Column::new("col2", 2)),
Arc::new(expressions::Column::new("col0", 0)),
];

let result = stats_projection(source, exprs.into_iter(), Arc::new(schema));

let expected = Statistics {
is_exact: true,
num_rows: Some(5),
total_byte_size: Some(60),
column_statistics: Some(vec![
ColumnStatistics {
distinct_count: None,
max_value: Some(ScalarValue::Float32(Some(1.1))),
min_value: Some(ScalarValue::Float32(Some(0.1))),
null_count: None,
},
ColumnStatistics {
distinct_count: Some(5),
max_value: Some(ScalarValue::Int64(Some(21))),
min_value: Some(ScalarValue::Int64(Some(-4))),
null_count: Some(0),
},
]),
};

assert_eq!(result, expected);
}
}

0 comments on commit 5aaa677

Please sign in to comment.