Skip to content

Commit

Permalink
Minor: provide default implementation for ExecutionPlan::statistics (#…
Browse files Browse the repository at this point in the history
…7911)

* Minor: provide default implementation for ExecutionPlan::statistics

* fix: update statistics
  • Loading branch information
alamb authored Oct 24, 2023
1 parent 9e848bf commit ba50a8b
Show file tree
Hide file tree
Showing 10 changed files with 13 additions and 58 deletions.
6 changes: 1 addition & 5 deletions datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{
project_schema, DisplayAs, DisplayFormatType, ExecutionPlan,
SendableRecordBatchStream, Statistics,
SendableRecordBatchStream,
};
use datafusion::prelude::*;
use datafusion_expr::{Expr, LogicalPlanBuilder};
Expand Down Expand Up @@ -270,8 +270,4 @@ impl ExecutionPlan for CustomExec {
None,
)?))
}

fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}
}
8 changes: 1 addition & 7 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2057,9 +2057,7 @@ mod tests {
use super::*;
use crate::datasource::file_format::options::CsvReadOptions;
use crate::datasource::MemTable;
use crate::physical_plan::{
expressions, DisplayFormatType, Partitioning, Statistics,
};
use crate::physical_plan::{expressions, DisplayFormatType, Partitioning};
use crate::physical_plan::{DisplayAs, SendableRecordBatchStream};
use crate::physical_planner::PhysicalPlanner;
use crate::prelude::{SessionConfig, SessionContext};
Expand Down Expand Up @@ -2670,10 +2668,6 @@ mod tests {
) -> Result<SendableRecordBatchStream> {
unimplemented!("NoOpExecutionPlan::execute");
}

fn statistics(&self) -> Result<Statistics> {
unimplemented!("NoOpExecutionPlan::statistics");
}
}

// Produces an execution plan where the schema is mismatched from
Expand Down
6 changes: 1 addition & 5 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::prelude::{CsvReadOptions, SessionContext};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::{Statistics, TableReference};
use datafusion_common::TableReference;
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_physical_expr::PhysicalSortExpr;

Expand Down Expand Up @@ -238,10 +238,6 @@ impl ExecutionPlan for UnboundedExec {
batch: self.batch.clone(),
}))
}

fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}
}

#[derive(Debug)]
Expand Down
7 changes: 1 addition & 6 deletions datafusion/physical-plan/src/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use super::{DisplayAs, Distribution, SendableRecordBatchStream};

use crate::display::DisplayableExecutionPlan;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics};
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};

use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::{internal_err, DataFusionError, Result};
Expand Down Expand Up @@ -195,11 +195,6 @@ impl ExecutionPlan for AnalyzeExec {
futures::stream::once(output),
)))
}

fn statistics(&self) -> Result<Statistics> {
// Statistics of an ANALYZE plan are not relevant
Ok(Statistics::new_unknown(&self.schema()))
}
}

/// Creates the output of AnalyzeExec as a RecordBatch
Expand Down
7 changes: 1 addition & 6 deletions datafusion/physical-plan/src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::Arc;
use super::expressions::PhysicalSortExpr;
use super::{DisplayAs, SendableRecordBatchStream};
use crate::stream::RecordBatchStreamAdapter;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics};
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};

use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::display::StringifiedPlan;
Expand Down Expand Up @@ -167,11 +167,6 @@ impl ExecutionPlan for ExplainExec {
futures::stream::iter(vec![Ok(record_batch)]),
)))
}

fn statistics(&self) -> Result<Statistics> {
// Statistics of an EXPLAIN plan are not relevant
Ok(Statistics::new_unknown(&self.schema()))
}
}

/// If this plan should be shown, given the previous plan that was
Expand Down
5 changes: 0 additions & 5 deletions datafusion/physical-plan/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use std::sync::Arc;
use super::expressions::PhysicalSortExpr;
use super::{
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
};
use crate::metrics::MetricsSet;
use crate::stream::RecordBatchStreamAdapter;
Expand Down Expand Up @@ -276,10 +275,6 @@ impl ExecutionPlan for FileSinkExec {
stream,
)))
}

fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}
}

/// Create an output record batch with a count
Expand Down
8 changes: 6 additions & 2 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,12 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
None
}

/// Returns the global output statistics for this `ExecutionPlan` node.
fn statistics(&self) -> Result<Statistics>;
/// Returns statistics for this `ExecutionPlan` node. If statistics are not
/// available, implementations should return [`Statistics::new_unknown`]
/// (which is what the default implementation does) rather than an error.
fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}
}

/// Indicate whether a data exchange is needed for the input of `plan`, which will be very helpful
Expand Down
6 changes: 1 addition & 5 deletions datafusion/physical-plan/src/streaming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::stream::RecordBatchStreamAdapter;
use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream};

use arrow::datatypes::SchemaRef;
use datafusion_common::{internal_err, plan_err, DataFusionError, Result, Statistics};
use datafusion_common::{internal_err, plan_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};

Expand Down Expand Up @@ -187,8 +187,4 @@ impl ExecutionPlan for StreamingTableExec {
None => stream,
})
}

fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}
}
12 changes: 0 additions & 12 deletions datafusion/physical-plan/src/test/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,10 +453,6 @@ impl ExecutionPlan for ErrorExec {
) -> Result<SendableRecordBatchStream> {
internal_err!("ErrorExec, unsurprisingly, errored in partition {partition}")
}

fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}
}

/// A mock execution plan that simply returns the provided statistics
Expand Down Expand Up @@ -627,10 +623,6 @@ impl ExecutionPlan for BlockingExec {
_refs: Arc::clone(&self.refs),
}))
}

fn statistics(&self) -> Result<Statistics> {
unimplemented!()
}
}

/// A [`RecordBatchStream`] that is pending forever.
Expand Down Expand Up @@ -764,10 +756,6 @@ impl ExecutionPlan for PanicExec {
ready: false,
}))
}

fn statistics(&self) -> Result<Statistics> {
unimplemented!()
}
}

/// A [`RecordBatchStream`] that yields every other batch and panics
Expand Down
6 changes: 1 addition & 5 deletions datafusion/physical-plan/src/unnest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use super::DisplayAs;
use crate::{
expressions::Column, DisplayFormatType, Distribution, EquivalenceProperties,
ExecutionPlan, Partitioning, PhysicalExpr, PhysicalSortExpr, RecordBatchStream,
SendableRecordBatchStream, Statistics,
SendableRecordBatchStream,
};

use arrow::array::{
Expand Down Expand Up @@ -159,10 +159,6 @@ impl ExecutionPlan for UnnestExec {
unnest_time: 0,
}))
}

fn statistics(&self) -> Result<Statistics> {
Ok(Statistics::new_unknown(&self.schema()))
}
}

/// A stream that issues [RecordBatch]es with unnested column data.
Expand Down

0 comments on commit ba50a8b

Please sign in to comment.