diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index f7f8b0f4524d..12c89eee1931 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -336,6 +336,7 @@ impl ExecutionPlan for ProjectionExec {
         stats_projection(
             self.input.statistics(),
             self.expr.iter().map(|(e, _)| Arc::clone(e)),
+            self.schema.clone(),
         )
     }
 }
@@ -395,9 +396,13 @@ fn get_field_metadata(
 fn stats_projection(
     stats: Statistics,
     exprs: impl Iterator<Item = Arc<dyn PhysicalExpr>>,
+    schema: SchemaRef,
 ) -> Statistics {
+    let inner_exprs = exprs.collect::<Vec<_>>();
     let column_statistics = stats.column_statistics.map(|input_col_stats| {
-        exprs
+        inner_exprs
+            .clone()
+            .into_iter()
             .map(|e| {
                 if let Some(col) = e.as_any().downcast_ref::<Column>() {
                     input_col_stats[col.index()].clone()
@@ -410,12 +415,35 @@ fn stats_projection(
             .collect()
     });
 
-    Statistics {
-        is_exact: stats.is_exact,
-        num_rows: stats.num_rows,
-        column_statistics,
-        // TODO stats: knowing the type of the new columns we can guess the output size
-        total_byte_size: None,
+    let primitive_row_size = inner_exprs
+        .into_iter()
+        .map(|e| match e.data_type(schema.as_ref()) {
+            Ok(data_type) => data_type.primitive_width(),
+            Err(_) => None,
+        })
+        .try_fold(0usize, |init, v| v.map(|value| init + value));
+
+    match (primitive_row_size, stats.num_rows) {
+        (Some(row_size), Some(row_count)) => {
+            Statistics {
+                is_exact: stats.is_exact,
+                num_rows: stats.num_rows,
+                column_statistics,
+                // Use the row_size * row_count as the total byte size
+                total_byte_size: Some(row_size * row_count),
+            }
+        }
+        _ => {
+            Statistics {
+                is_exact: stats.is_exact,
+                num_rows: stats.num_rows,
+                column_statistics,
+                // TODO stats: knowing the type of the new columns we can guess the output size
+                // If we can't get the exact statistics for the project
+                // Before we get the exact result, we just use the child status
+                total_byte_size: stats.total_byte_size,
+            }
+        }
     }
 }

@@ -479,12 +507,12 @@ impl RecordBatchStream for ProjectionStream {
 
 #[cfg(test)]
 mod tests {
-    use super::*;
     use crate::physical_plan::common::collect;
     use crate::physical_plan::expressions::{self, col};
     use crate::test::{self};
     use crate::test_util;
 
+    use arrow_schema::DataType;
     use datafusion_common::ScalarValue;
     use datafusion_expr::Operator;
     use datafusion_physical_expr::expressions::binary;
@@ -591,9 +619,8 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_stats_projection_columns_only() {
-        let source = Statistics {
+    fn get_stats() -> Statistics {
+        Statistics {
             is_exact: true,
             num_rows: Some(5),
             total_byte_size: Some(23),
@@ -617,19 +644,31 @@ mod tests {
                 null_count: None,
             },
         ]),
-        };
+        }
+    }
+
+    fn get_schema() -> Schema {
+        let field_0 = Field::new("col0", DataType::Int64, false);
+        let field_1 = Field::new("col1", DataType::Utf8, false);
+        let field_2 = Field::new("col2", DataType::Float32, false);
+        Schema::new(vec![field_0, field_1, field_2])
+    }
 
+    #[tokio::test]
+    async fn test_stats_projection_columns_only() {
+        let source = get_stats();
+        let schema = get_schema();
         let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
             Arc::new(expressions::Column::new("col1", 1)),
             Arc::new(expressions::Column::new("col0", 0)),
         ];
 
-        let result = stats_projection(source, exprs.into_iter());
+        let result = stats_projection(source, exprs.into_iter(), Arc::new(schema));
 
         let expected = Statistics {
             is_exact: true,
             num_rows: Some(5),
-            total_byte_size: None,
+            total_byte_size: Some(23),
             column_statistics: Some(vec![
                 ColumnStatistics {
                     distinct_count: Some(1),
@@ -648,4 +687,39 @@ mod tests {
 
         assert_eq!(result, expected);
     }
+
+    #[tokio::test]
+    async fn test_stats_projection_column_with_primitive_width_only() {
+        let source = get_stats();
+        let schema = get_schema();
+
+        let exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
+            Arc::new(expressions::Column::new("col2", 2)),
+            Arc::new(expressions::Column::new("col0", 0)),
+        ];
+
+        let result = stats_projection(source, exprs.into_iter(), Arc::new(schema));
+
+        let expected = Statistics {
+            is_exact: true,
+            num_rows: Some(5),
+            total_byte_size: Some(60),
+            column_statistics: Some(vec![
+                ColumnStatistics {
+                    distinct_count: None,
+                    max_value: Some(ScalarValue::Float32(Some(1.1))),
+                    min_value: Some(ScalarValue::Float32(Some(0.1))),
+                    null_count: None,
+                },
+                ColumnStatistics {
+                    distinct_count: Some(5),
+                    max_value: Some(ScalarValue::Int64(Some(21))),
+                    min_value: Some(ScalarValue::Int64(Some(-4))),
+                    null_count: Some(0),
+                },
+            ]),
+        };
+
+        assert_eq!(result, expected);
+    }
 }
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 5bc36e4c6d7b..cd93e51951a8 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -1500,15 +1500,16 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
 --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]
-----CoalesceBatchesExec: target_batch_size=4096
------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t1.t1_id + UInt32(12)@2, join_t2.t2_id + UInt32(1)@1)]
---------CoalescePartitionsExec
+----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, join_t1.t1_id + UInt32(12)@4 as join_t1.t1_id + UInt32(12), t2_id@0 as t2_id, join_t2.t2_id + UInt32(1)@1 as join_t2.t2_id + UInt32(1)]
+------CoalesceBatchesExec: target_batch_size=4096
+--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id + UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)]
+----------CoalescePartitionsExec
+------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)]
+--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
 ----------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
 ------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 --------------MemoryExec: partitions=1, partition_sizes=[1]
---------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)]
-----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-------------MemoryExec: partitions=1, partition_sizes=[1]
 
 statement ok
 set datafusion.optimizer.repartition_joins = true;
@@ -1527,18 +1528,19 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
 --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@3 as t2_id]
-----CoalesceBatchesExec: target_batch_size=4096
------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t1.t1_id + UInt32(12)@2, join_t2.t2_id + UInt32(1)@1)]
---------CoalesceBatchesExec: target_batch_size=4096
----------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(12)@2], 2), input_partitions=2
------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
---------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-----------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
-----------RepartitionExec: partitioning=Hash([join_t2.t2_id + UInt32(1)@1], 2), input_partitions=2
-------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)]
---------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-----------------MemoryExec: partitions=1, partition_sizes=[1]
+----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, join_t1.t1_id + UInt32(12)@4 as join_t1.t1_id + UInt32(12), t2_id@0 as t2_id, join_t2.t2_id + UInt32(1)@1 as join_t2.t2_id + UInt32(1)]
+------CoalesceBatchesExec: target_batch_size=4096
+--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id + UInt32(1)@1, join_t1.t1_id + UInt32(12)@2)]
+----------CoalesceBatchesExec: target_batch_size=4096
+------------RepartitionExec: partitioning=Hash([join_t2.t2_id + UInt32(1)@1], 2), input_partitions=2
+--------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 + 1 as join_t2.t2_id + UInt32(1)]
+----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+------------------MemoryExec: partitions=1, partition_sizes=[1]
+----------CoalesceBatchesExec: target_batch_size=4096
+------------RepartitionExec: partitioning=Hash([join_t1.t1_id + UInt32(12)@2], 2), input_partitions=2
+--------------ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t1_id@0 + 12 as join_t1.t1_id + UInt32(12)]
+----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+------------------MemoryExec: partitions=1, partition_sizes=[1]
 
 # Left side expr key inner join
 
@@ -1619,10 +1621,13 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
 --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as t2_id]
-----CoalesceBatchesExec: target_batch_size=4096
------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(t1_id@0, join_t2.t2_id - UInt32(11)@1)]
---------MemoryExec: partitions=1, partition_sizes=[1]
---------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
+----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, t2_id@0 as t2_id, join_t2.t2_id - UInt32(11)@1 as join_t2.t2_id - UInt32(11)]
+------CoalesceBatchesExec: target_batch_size=4096
+--------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(join_t2.t2_id - UInt32(11)@1, t1_id@0)]
+----------CoalescePartitionsExec
+------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
+--------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------------MemoryExec: partitions=1, partition_sizes=[1]
 ----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 ------------MemoryExec: partitions=1, partition_sizes=[1]
 
@@ -1644,15 +1649,16 @@ Projection: join_t1.t1_id, join_t2.t2_id, join_t1.t1_name
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, t2_id@2 as t2_id, t1_name@1 as t1_name]
 --ProjectionExec: expr=[t1_id@0 as t1_id, t1_name@1 as t1_name, t2_id@2 as t2_id]
-----CoalesceBatchesExec: target_batch_size=4096
------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(t1_id@0, join_t2.t2_id - UInt32(11)@1)]
---------CoalesceBatchesExec: target_batch_size=4096
-----------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
-------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
---------------MemoryExec: partitions=1, partition_sizes=[1]
---------CoalesceBatchesExec: target_batch_size=4096
-----------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@1], 2), input_partitions=2
-------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
+----ProjectionExec: expr=[t1_id@2 as t1_id, t1_name@3 as t1_name, t2_id@0 as t2_id, join_t2.t2_id - UInt32(11)@1 as join_t2.t2_id - UInt32(11)]
+------CoalesceBatchesExec: target_batch_size=4096
+--------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(join_t2.t2_id - UInt32(11)@1, t1_id@0)]
+----------CoalesceBatchesExec: target_batch_size=4096
+------------RepartitionExec: partitioning=Hash([join_t2.t2_id - UInt32(11)@1], 2), input_partitions=2
+--------------ProjectionExec: expr=[t2_id@0 as t2_id, t2_id@0 - 11 as join_t2.t2_id - UInt32(11)]
+----------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+------------------MemoryExec: partitions=1, partition_sizes=[1]
+----------CoalesceBatchesExec: target_batch_size=4096
+------------RepartitionExec: partitioning=Hash([t1_id@0], 2), input_partitions=2
 --------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
 ----------------MemoryExec: partitions=1, partition_sizes=[1]
 
diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt
index d88291b56fb1..fe074da1bba0 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -177,17 +177,18 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int) AS t2_sum
 ----------TableScan: t2 projection=[t2_id, t2_int]
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 as t2_sum]
---CoalesceBatchesExec: target_batch_size=8192
-----HashJoinExec: mode=Partitioned, join_type=Left, on=[(t1_id@0, t2_id@1)]
------CoalesceBatchesExec: target_batch_size=8192
--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
---------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as t2_id]
-------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
---------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4
-----------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int)@0 as SUM(t2.t2_int), t2_id@1 as t2_id]
+----CoalesceBatchesExec: target_batch_size=8192
+------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
+--------ProjectionExec: expr=[SUM(t2.t2_int)@1 as SUM(t2.t2_int), t2_id@0 as t2_id]
+----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
+------------CoalesceBatchesExec: target_batch_size=8192
+--------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4
+----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int)]
+------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
+------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
 
 query II rowsort
 SELECT t1_id, (SELECT sum(t2_int) FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1
@@ -211,17 +212,18 @@ Projection: t1.t1_id, __scalar_sq_1.SUM(t2.t2_int * Float64(1)) + Int64(1) AS t2
 ----------TableScan: t2 projection=[t2_id, t2_int]
 physical_plan
 ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int * Float64(1)) + Int64(1)@1 as t2_sum]
---CoalesceBatchesExec: target_batch_size=8192
-----HashJoinExec: mode=Partitioned, join_type=Left, on=[(t1_id@0, t2_id@1)]
------CoalesceBatchesExec: target_batch_size=8192
--------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
---------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
------ProjectionExec: expr=[SUM(t2.t2_int * Float64(1))@1 + 1 as SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@0 as t2_id]
-------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int * Float64(1))]
---------CoalesceBatchesExec: target_batch_size=8192
----------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4
----------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int * Float64(1))]
------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--ProjectionExec: expr=[t1_id@2 as t1_id, SUM(t2.t2_int * Float64(1)) + Int64(1)@0 as SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@1 as t2_id]
+----CoalesceBatchesExec: target_batch_size=8192
+------HashJoinExec: mode=Partitioned, join_type=Right, on=[(t2_id@1, t1_id@0)]
+--------ProjectionExec: expr=[SUM(t2.t2_int * Float64(1))@1 + 1 as SUM(t2.t2_int * Float64(1)) + Int64(1), t2_id@0 as t2_id]
+----------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int * Float64(1))]
+------------CoalesceBatchesExec: target_batch_size=8192
+--------------RepartitionExec: partitioning=Hash([t2_id@0], 4), input_partitions=4
+----------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id], aggr=[SUM(t2.t2_int * Float64(1))]
+------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([t1_id@0], 4), input_partitions=4
+------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
 
 query IR rowsort
 SELECT t1_id, (SELECT sum(t2_int * 1.0) + 1 FROM t2 WHERE t2.t2_id = t1.t1_id) as t2_sum from t1