[minor fix]: adjust the projection statistics (#7428)
* adjust the projection statistics

* update the sql test case

* fix clippy
liukun4515 authored Aug 31, 2023
1 parent 6b618d2 commit 58fc80e
Showing 3 changed files with 149 additions and 67 deletions.
102 changes: 88 additions & 14 deletions datafusion/core/src/physical_plan/projection.rs
@@ -336,6 +336,7 @@ impl ExecutionPlan for ProjectionExec {
stats_projection(
self.input.statistics(),
self.expr.iter().map(|(e, _)| Arc::clone(e)),
self.schema.clone(),
)
}
}
@@ -395,9 +396,13 @@ fn get_field_metadata(
fn stats_projection(
stats: Statistics,
exprs: impl Iterator<Item = Arc<dyn PhysicalExpr>>,
schema: SchemaRef,
) -> Statistics {
let inner_exprs = exprs.collect::<Vec<_>>();
let column_statistics = stats.column_statistics.map(|input_col_stats| {
exprs
inner_exprs
.clone()
.into_iter()
.map(|e| {
if let Some(col) = e.as_any().downcast_ref::<Column>() {
input_col_stats[col.index()].clone()
@@ -410,12 +415,35 @@ fn stats_projection(
.collect()
});

Statistics {
is_exact: stats.is_exact,
num_rows: stats.num_rows,
column_statistics,
// TODO stats: knowing the type of the new columns we can guess the output size
total_byte_size: None,
let primitive_row_size = inner_exprs
.into_iter()
.map(|e| match e.data_type(schema.as_ref()) {
Ok(data_type) => data_type.primitive_width(),
Err(_) => None,
})
.try_fold(0usize, |init, v| v.map(|value| init + value));

match (primitive_row_size, stats.num_rows) {
(Some(row_size), Some(row_count)) => {
Statistics {
is_exact: stats.is_exact,
num_rows: stats.num_rows,
column_statistics,
// Use the row_size * row_count as the total byte size
total_byte_size: Some(row_size * row_count),
}
}
_ => {
Statistics {
is_exact: stats.is_exact,
num_rows: stats.num_rows,
column_statistics,
// TODO stats: knowing the type of the new columns we can guess the output size
// If we can't compute an exact byte size for the projection,
// fall back to the child's total_byte_size until a better estimate is available
total_byte_size: stats.total_byte_size,
}
}
}
}

@@ -479,12 +507,12 @@ impl RecordBatchStream for ProjectionStream {

#[cfg(test)]
mod tests {

use super::*;
use crate::physical_plan::common::collect;
use crate::physical_plan::expressions::{self, col};
use crate::test::{self};
use crate::test_util;
use arrow_schema::DataType;
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::binary;
@@ -591,9 +619,8 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_stats_projection_columns_only() {
let source = Statistics {
fn get_stats() -> Statistics {
Statistics {
is_exact: true,
num_rows: Some(5),
total_byte_size: Some(23),
@@ -617,19 +644,31 @@
null_count: None,
},
]),
};
}
}

fn get_schema() -> Schema {
let field_0 = Field::new("col0", DataType::Int64, false);
let field_1 = Field::new("col1", DataType::Utf8, false);
let field_2 = Field::new("col2", DataType::Float32, false);
Schema::new(vec![field_0, field_1, field_2])
}
#[tokio::test]
async fn test_stats_projection_columns_only() {
let source = get_stats();
let schema = get_schema();

let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(expressions::Column::new("col1", 1)),
Arc::new(expressions::Column::new("col0", 0)),
];

let result = stats_projection(source, exprs.into_iter());
let result = stats_projection(source, exprs.into_iter(), Arc::new(schema));

let expected = Statistics {
is_exact: true,
num_rows: Some(5),
total_byte_size: None,
total_byte_size: Some(23),
column_statistics: Some(vec![
ColumnStatistics {
distinct_count: Some(1),
@@ -648,4 +687,39 @@

assert_eq!(result, expected);
}

#[tokio::test]
async fn test_stats_projection_column_with_primitive_width_only() {
let source = get_stats();
let schema = get_schema();

let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(expressions::Column::new("col2", 2)),
Arc::new(expressions::Column::new("col0", 0)),
];

let result = stats_projection(source, exprs.into_iter(), Arc::new(schema));

let expected = Statistics {
is_exact: true,
num_rows: Some(5),
total_byte_size: Some(60),
column_statistics: Some(vec![
ColumnStatistics {
distinct_count: None,
max_value: Some(ScalarValue::Float32(Some(1.1))),
min_value: Some(ScalarValue::Float32(Some(0.1))),
null_count: None,
},
ColumnStatistics {
distinct_count: Some(5),
max_value: Some(ScalarValue::Int64(Some(21))),
min_value: Some(ScalarValue::Int64(Some(-4))),
null_count: Some(0),
},
]),
};

assert_eq!(result, expected);
}
}
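
The core of this change is the new byte-size estimate: when every projected expression has a fixed-width output type, the projection reports total_byte_size = primitive row size × row count instead of discarding the estimate. Below is a minimal, self-contained sketch of that calculation; it mirrors the patch but is not the DataFusion code itself, and the function name and plain-slice input are illustrative only.

use arrow_schema::DataType;

// A minimal sketch (not the DataFusion implementation): estimate the projected
// output's total byte size the way the patch does. `primitive_width()` returns
// Some(bytes) for fixed-width types (Int64 -> 8, Float32 -> 4, ...) and None
// for variable-width types such as Utf8, in which case no estimate is produced.
fn estimate_total_byte_size(
    projected_types: &[DataType],
    num_rows: Option<usize>,
) -> Option<usize> {
    let row_size = projected_types
        .iter()
        .map(|dt| dt.primitive_width())
        .try_fold(0usize, |acc, width| width.map(|w| acc + w))?;
    Some(row_size * num_rows?)
}

fn main() {
    // Mirrors test_stats_projection_column_with_primitive_width_only:
    // Float32 (4 bytes) + Int64 (8 bytes) = 12 bytes per row, 5 rows -> 60 bytes.
    assert_eq!(
        estimate_total_byte_size(&[DataType::Float32, DataType::Int64], Some(5)),
        Some(60)
    );
    // A Utf8 column has no fixed width, so no estimate is produced and the
    // projection keeps its child's total_byte_size instead.
    assert_eq!(
        estimate_total_byte_size(&[DataType::Utf8, DataType::Int64], Some(5)),
        None
    );
}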
68 changes: 37 additions & 31 deletions datafusion/sqllogictest/test_files/joins.slt
@@ -1500,15 +1500,16 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]
----CoalesceBatchesExec: target_batch_size=4096
------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + UInt32(12)@2, join_t2.t2_id + UInt32(1)@1)]
--------CoalescePartitionsExec
----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, join_t1.t1_id + UInt32(12)@4 as join_t1.t1_id + UInt32(12), t2_id@0 as t2_id, join_t2.t2_id + UInt32(1)@1 as join_t2.t2_id + UInt32(1)]
------CoalesceBatchesExec: target_batch_size=4096
--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id + UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)]
----------CoalescePartitionsExec
------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)]
--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------------MemoryExec: partitions=1, partition_sizes=[1]
----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
--------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)]
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------MemoryExec: partitions=1, partition_sizes=[1]

statement ok
set datafusion.optimizer.repartition_joins = true;
@@ -1527,18 +1528,19 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]
----CoalesceBatchesExec: target_batch_size=4096
------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t1.t1_id + UInt32(12)@2, join_t2.t2_id + UInt32(1)@1)]
--------CoalesceBatchesExec: target_batch_size=4096
----------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(12)@2], 2), input_partitions=2
------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------------MemoryExec: partitions=1, partition_sizes=[1]
--------CoalesceBatchesExec: target_batch_size=4096
----------RepartitionExec: partitioning=Hash([join_t2.t2_id + UInt32(1)@1], 2), input_partitions=2
------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)]
--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------------MemoryExec: partitions=1, partition_sizes=[1]
----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, join_t1.t1_id + UInt32(12)@4 as join_t1.t1_id + UInt32(12), t2_id@0 as t2_id, join_t2.t2_id + UInt32(1)@1 as join_t2.t2_id + UInt32(1)]
------CoalesceBatchesExec: target_batch_size=4096
--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id + UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)]
----------CoalesceBatchesExec: target_batch_size=4096
------------RepartitionExec: partitioning=Hash([join_t2.t2_id + UInt32(1)@1], 2), input_partitions=2
--------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)]
----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------------MemoryExec: partitions=1, partition_sizes=[1]
----------CoalesceBatchesExec: target_batch_size=4096
------------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(12)@2], 2), input_partitions=2
--------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------------MemoryExec: partitions=1, partition_sizes=[1]

# Left side expr key inner join

@@ -1619,10 +1621,13 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as t2_id]
----CoalesceBatchesExec: target_batch_size=4096
------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0, join_t2.t2_id - UInt32(11)@1)]
--------MemoryExec: partitions=1, partition_sizes=[1]
--------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, t2_id@0 as t2_id, join_t2.t2_id - UInt32(11)@1 as join_t2.t2_id - UInt32(11)]
------CoalesceBatchesExec: target_batch_size=4096
--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id - UInt32(11)@1, t1_id@0)]
----------CoalescePartitionsExec
------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------------MemoryExec: partitions=1, partition_sizes=[1]
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------MemoryExec: partitions=1, partition_sizes=[1]

@@ -1644,15 +1649,16 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
--ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as t2_id]
----CoalesceBatchesExec: target_batch_size=4096
------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, join_t2.t2_id - UInt32(11)@1)]
--------CoalesceBatchesExec: target_batch_size=4096
----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
--------------MemoryExec: partitions=1, partition_sizes=[1]
--------CoalesceBatchesExec: target_batch_size=4096
----------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@1], 2), input_partitions=2
------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, t2_id@0 as t2_id, join_t2.t2_id - UInt32(11)@1 as join_t2.t2_id - UInt32(11)]
------CoalesceBatchesExec: target_batch_size=4096
--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id - UInt32(11)@1, t1_id@0)]
----------CoalesceBatchesExec: target_batch_size=4096
------------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@1], 2), input_partitions=2
--------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
------------------MemoryExec: partitions=1, partition_sizes=[1]
----------CoalesceBatchesExec: target_batch_size=4096
------------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
----------------MemoryExec: partitions=1, partition_sizes=[1]

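
The plan changes in joins.slt above (and in subquery.slt below) appear to be a downstream effect: once projections report a total_byte_size, join selection can compare the estimated sizes of the two inputs and swap which side is collected or used as the hash-join build side, which is why the input order (and, in the subquery plans, the join type) flips in the expected output. The sketch below illustrates only the general idea; the enum and function are hypothetical and not DataFusion's API.

// Illustrative only -- not DataFusion's internal API. Given a byte-size
// estimate for each join input, build the hash table on the smaller side and
// probe with the larger one; without estimates, keep a fixed default.
#[derive(Debug, Clone, Copy, PartialEq)]
enum BuildSide {
    Left,
    Right,
}

fn choose_build_side(left_bytes: Option<usize>, right_bytes: Option<usize>) -> BuildSide {
    match (left_bytes, right_bytes) {
        // With estimates on both sides, build on the smaller input.
        (Some(left), Some(right)) if right < left => BuildSide::Right,
        // Otherwise (including missing estimates), fall back to the left side.
        _ => BuildSide::Left,
    }
}

fn main() {
    // A small projected input (e.g. the 60-byte estimate from the new test)
    // gets collected and built against a larger probe side.
    assert_eq!(choose_build_side(Some(10_000), Some(60)), BuildSide::Right);
    // With no estimate available, keep the default build side.
    assert_eq!(choose_build_side(None, Some(60)), BuildSide::Left);
}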
46 changes: 24 additions & 22 deletions datafusion/sqllogictest/test_files/subquery.slt
@@ -177,17 +177,18 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS t2_sum
----------TableScan: t2 projection=[t2_id, t2_int]
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
--CoalesceBatchesExec: target_batch_size=8192
----HashJoinExec: mode=Partitioned, join_type=Left, on=[(t1_id@0, t2_id@1)]
------CoalesceBatchesExec: target_batch_size=8192
--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as t2_id]
--------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
----------CoalesceBatchesExec: target_batch_size=8192
------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4
--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int)@0 as SUM(t2.t2_int), t2_id@1 as t2_id]
----CoalesceBatchesExec: target_batch_size=8192
------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
--------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as t2_id]
----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
------------CoalesceBatchesExec: target_batch_size=8192
--------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4
----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]

query II rowsort
SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1
@@ -211,17 +212,18 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int * Float64(1)) + Int64(1) AS t2
----------TableScan: t2 projection=[t2_id, t2_int]
physical_plan
ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int * Float64(1)) + Int64(1)@1 as t2_sum]
--CoalesceBatchesExec: target_batch_size=8192
----HashJoinExec: mode=Partitioned, join_type=Left, on=[(t1_id@0, t2_id@1)]
------CoalesceBatchesExec: target_batch_size=8192
--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
----------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
------ProjectionExec: expr=[SUM(t2.t2_int * Float64(1))@1 + 1 as SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@0 as t2_id]
--------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int * Float64(1))]
----------CoalesceBatchesExec: target_batch_size=8192
------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4
--------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int * Float64(1))]
----------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int * Float64(1)) + Int64(1)@0 as SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@1 as t2_id]
----CoalesceBatchesExec: target_batch_size=8192
------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
--------ProjectionExec: expr=[SUM(t2.t2_int * Float64(1))@1 + 1 as SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@0 as t2_id]
----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int * Float64(1))]
------------CoalesceBatchesExec: target_batch_size=8192
--------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4
----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int * Float64(1))]
------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
--------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]

query IR rowsort
SELECT t1_id, (SELECT sum(t2_int * 1.0) + 1 FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1
