From dd248f5d62ac3e647e1109f6185c0677ba6faf55 Mon Sep 17 00:00:00 2001
From: Robert Kruszewski
Date: Mon, 23 Sep 2024 17:13:48 +0100
Subject: [PATCH] VortexScanExec stats are computed only once

---
 bench-vortex/src/bin/tpch_benchmark.rs |  3 +
 vortex-datafusion/src/lib.rs           | 80 ++++++++------------------
 vortex-datafusion/src/memory.rs        |  8 +--
 vortex-datafusion/src/statistics.rs    | 58 +++++++++++++++++++
 4 files changed, 88 insertions(+), 61 deletions(-)
 create mode 100644 vortex-datafusion/src/statistics.rs

diff --git a/bench-vortex/src/bin/tpch_benchmark.rs b/bench-vortex/src/bin/tpch_benchmark.rs
index e8207476b..7baf43723 100644
--- a/bench-vortex/src/bin/tpch_benchmark.rs
+++ b/bench-vortex/src/bin/tpch_benchmark.rs
@@ -70,6 +70,9 @@ async fn bench_main(
     let formats = [
         Format::Arrow,
         Format::Parquet,
+        Format::InMemoryVortex {
+            enable_pushdown: false,
+        },
         Format::InMemoryVortex {
             enable_pushdown: true,
         },
diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs
index a42c5ddd3..cfb501ad4 100644
--- a/vortex-datafusion/src/lib.rs
+++ b/vortex-datafusion/src/lib.rs
@@ -12,30 +12,27 @@ use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Schema, SchemaRef};
 use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
 use datafusion::prelude::{DataFrame, SessionContext};
-use datafusion_common::stats::Precision;
-use datafusion_common::{
-    exec_datafusion_err, ColumnStatistics, DataFusionError, Result as DFResult, ScalarValue,
-    Statistics,
-};
+use datafusion_common::{exec_datafusion_err, DataFusionError, Result as DFResult, Statistics};
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_expr::{Expr, Operator};
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
 use futures::Stream;
-use itertools::Itertools;
 use memory::{VortexMemTable, VortexMemTableOptions};
 use persistent::config::VortexTableOptions;
 use persistent::provider::VortexFileTableProvider;
 use vortex::array::ChunkedArray;
-use vortex::stats::{ArrayStatistics, Stat};
 use vortex::{Array, ArrayDType, IntoArrayVariant};
 use vortex_dtype::field::Field;
-use vortex_error::{vortex_err, VortexExpect, VortexResult};
+use vortex_error::{vortex_err, VortexResult};
+
+use crate::statistics::chunked_array_df_stats;
 
 pub mod memory;
 pub mod persistent;
 
 mod datatype;
 mod plans;
+mod statistics;
 
 const SUPPORTED_BINARY_OPS: &[Operator] = &[
     Operator::Eq,
@@ -183,6 +180,23 @@ struct VortexScanExec {
     array: ChunkedArray,
     scan_projection: Vec<usize>,
     plan_properties: PlanProperties,
+    statistics: Statistics,
+}
+
+impl VortexScanExec {
+    pub fn try_new(
+        array: ChunkedArray,
+        scan_projection: Vec<usize>,
+        plan_properties: PlanProperties,
+    ) -> VortexResult<Self> {
+        let statistics = chunked_array_df_stats(&array, &scan_projection)?;
+        Ok(Self {
+            array,
+            scan_projection,
+            plan_properties,
+            statistics,
+        })
+    }
 }
 
 impl Debug for VortexScanExec {
@@ -294,54 +308,6 @@ impl ExecutionPlan for VortexScanExec {
     }
 
     fn statistics(&self) -> DFResult<Statistics> {
-        let mut nbytes: usize = 0;
-        let column_statistics = self.array.as_ref().with_dyn(|a| {
-            let struct_arr = a
-                .as_struct_array()
-                .ok_or_else(|| vortex_err!("Not a struct array"))?;
-            self.scan_projection
-                .iter()
-                .map(|i| {
-                    struct_arr
-                        .field(*i)
-                        .ok_or_else(|| vortex_err!("Projection references unknown field {i}"))
-                })
-                .map_ok(|arr| {
-                    nbytes += arr.nbytes();
-                    ColumnStatistics {
-                        null_count: arr
-                            .statistics()
-                            .get_as::<u64>(Stat::NullCount)
-                            .map(|n| n as usize)
-                            .map(Precision::Exact)
-                            .unwrap_or(Precision::Absent),
-                        max_value: arr
-                            .statistics()
-                            .get(Stat::Max)
-                            .map(|n| {
-                                ScalarValue::try_from(n)
-                                    .vortex_expect("cannot convert scalar to df scalar")
-                            })
-                            .map(Precision::Exact)
-                            .unwrap_or(Precision::Absent),
-                        min_value: arr
-                            .statistics()
-                            .get(Stat::Min)
-                            .map(|n| {
-                                ScalarValue::try_from(n)
-                                    .vortex_expect("cannot convert scalar to df scalar")
-                            })
-                            .map(Precision::Exact)
-                            .unwrap_or(Precision::Absent),
-                        distinct_count: Precision::Absent,
-                    }
-                })
-                .collect::<VortexResult<Vec<_>>>()
-        })?;
-        Ok(Statistics {
-            num_rows: Precision::Exact(self.array.len()),
-            total_byte_size: Precision::Exact(nbytes),
-            column_statistics,
-        })
+        Ok(self.statistics.clone())
     }
 }
diff --git a/vortex-datafusion/src/memory.rs b/vortex-datafusion/src/memory.rs
index 61380793b..8bda8d7a4 100644
--- a/vortex-datafusion/src/memory.rs
+++ b/vortex-datafusion/src/memory.rs
@@ -125,11 +125,11 @@ impl TableProvider for VortexMemTable {
                     ExecutionMode::Bounded,
                 );
 
-                Ok(Arc::new(VortexScanExec {
-                    array: self.array.clone(),
-                    scan_projection: output_projection.clone(),
+                Ok(Arc::new(VortexScanExec::try_new(
+                    self.array.clone(),
+                    output_projection.clone(),
                     plan_properties,
-                }))
+                )?))
             }
         }
     }
diff --git a/vortex-datafusion/src/statistics.rs b/vortex-datafusion/src/statistics.rs
new file mode 100644
index 000000000..fa41bb2ba
--- /dev/null
+++ b/vortex-datafusion/src/statistics.rs
@@ -0,0 +1,58 @@
+use datafusion_common::stats::Precision;
+use datafusion_common::{ColumnStatistics, Result as DFResult, ScalarValue, Statistics};
+use itertools::Itertools;
+use vortex::array::ChunkedArray;
+use vortex::stats::{ArrayStatistics, Stat};
+use vortex_error::{vortex_err, VortexExpect, VortexResult};
+
+pub fn chunked_array_df_stats(array: &ChunkedArray, projection: &[usize]) -> DFResult<Statistics> {
+    let mut nbytes: usize = 0;
+    let column_statistics = array.as_ref().with_dyn(|a| {
+        let struct_arr = a
+            .as_struct_array()
+            .ok_or_else(|| vortex_err!("Not a struct array"))?;
+        projection
+            .iter()
+            .map(|i| {
+                struct_arr
+                    .field(*i)
+                    .ok_or_else(|| vortex_err!("Projection references unknown field {i}"))
+            })
+            .map_ok(|arr| {
+                nbytes += arr.nbytes();
+                ColumnStatistics {
+                    null_count: arr
+                        .statistics()
+                        .get_as::<u64>(Stat::NullCount)
+                        .map(|n| n as usize)
+                        .map(Precision::Exact)
+                        .unwrap_or(Precision::Absent),
+                    max_value: arr
+                        .statistics()
+                        .get(Stat::Max)
+                        .map(|n| {
+                            ScalarValue::try_from(n)
+                                .vortex_expect("cannot convert scalar to df scalar")
+                        })
+                        .map(Precision::Exact)
+                        .unwrap_or(Precision::Absent),
+                    min_value: arr
+                        .statistics()
+                        .get(Stat::Min)
+                        .map(|n| {
+                            ScalarValue::try_from(n)
+                                .vortex_expect("cannot convert scalar to df scalar")
+                        })
+                        .map(Precision::Exact)
+                        .unwrap_or(Precision::Absent),
+                    distinct_count: Precision::Absent,
+                }
+            })
+            .collect::<VortexResult<Vec<_>>>()
+    })?;
+    Ok(Statistics {
+        num_rows: Precision::Exact(array.len()),
+        total_byte_size: Precision::Exact(nbytes),
+        column_statistics,
+    })
+}
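
For readers skimming the diff, the short sketch below (not part of the patch) illustrates the caching pattern it adopts: derive the table statistics once while constructing the scan node, then return a cheap clone from every later statistics() call instead of rescanning the array. The Stats, Column and ScanExec types are hypothetical stand-ins for datafusion_common::Statistics, the projected Vortex columns and VortexScanExec; only the compute-once pattern is taken from the patch.

// Illustrative sketch only -- not part of the patch above.
#[derive(Clone, Debug)]
struct Stats {
    num_rows: usize,
    total_byte_size: usize,
}

struct Column {
    values: Vec<i64>,
}

struct ScanExec {
    #[allow(dead_code)] // kept to mirror VortexScanExec holding onto its array
    columns: Vec<Column>,
    statistics: Stats, // filled in once by `try_new`, never recomputed
}

impl ScanExec {
    fn try_new(columns: Vec<Column>) -> Result<ScanExec, String> {
        // Walk the columns a single time at construction to derive the stats.
        let num_rows = columns.first().map(|c| c.values.len()).unwrap_or(0);
        let total_byte_size = columns
            .iter()
            .map(|c| c.values.len() * std::mem::size_of::<i64>())
            .sum();
        Ok(ScanExec {
            columns,
            statistics: Stats { num_rows, total_byte_size },
        })
    }

    // Counterpart of `ExecutionPlan::statistics`: just clone the cached value.
    fn statistics(&self) -> Stats {
        self.statistics.clone()
    }
}

fn main() {
    let exec = ScanExec::try_new(vec![Column { values: vec![1, 2, 3] }]).unwrap();
    println!("{:?}", exec.statistics());
    println!("{:?}", exec.statistics()); // second call reuses the cached stats
}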