Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve ParquetExec and related documentation #10647

Merged
merged 8 commits into from
May 28, 2024
127 changes: 114 additions & 13 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,84 @@ use crate::datasource::schema_adapter::{
pub use metrics::ParquetFileMetrics;
pub use statistics::{RequestedStatistics, StatisticsConverter};

/// Execution plan for scanning one or more Parquet partitions
/// Execution plan for reading one or more Parquet files.
///
/// ```text
/// ▲
/// │
/// │ Produce a stream of
/// │ RecordBatches
/// │
/// ┌───────────────────────┐
/// │ │
/// │ ParquetExec │
/// │ │
/// └───────────────────────┘
/// ▲
/// │ Asynchronously read from one
/// │ or more parquet files via
/// │ ObjectStore interface
/// │
/// │
/// .───────────────────.
/// │ )
/// │`───────────────────'│
/// │ ObjectStore │
/// │.───────────────────.│
/// │ )
/// `───────────────────'
///
/// ```
/// # Features
///
/// Supports the following optimizations:
///
/// * Concurrent reads: Can read from one or more files in parallel as multiple
/// partitions, including concurrently reading multiple row groups from a single
/// file.
///
/// * Predicate push down: skips row groups and pages based on
/// min/max/null_counts in the row group metadata, the page index and bloom
/// filters.
///
/// * Projection pushdown: reads and decodes only the columns required.
///
/// * Limit pushdown: stop execution early after some number of rows are read.
///
/// * Custom readers: customize reading parquet files, e.g. to cache metadata,
/// coalesce I/O operations, etc. See [`ParquetFileReaderFactory`] for more
/// details.
///
/// * Schema adapters: read parquet files with different schemas into a unified
/// table schema. This can be used to implement "schema evolution". See
/// [`SchemaAdapterFactory`] for more details.
///
/// * metadata_size_hint: controls the number of bytes read from the end of the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW this is passed on to the reader (custom or builtin) and the reader uses that to gather the metadata. The reader CAN however use another more precise source for this information or not read the metadata from object store at all (e.g. it could use an extra service, a dataset-based source or some sort of cache).

/// file in the initial I/O when the default [`ParquetFileReaderFactory`]. If a
/// custom reader is used, it supplies the metadata directly and this parameter
/// is ignored. See [`Self::with_parquet_file_reader_factory`] for more details.
///
/// # Execution Overview
///
/// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`]
/// configured to open parquet files with a [`ParquetOpener`].
///
/// * Step 2: When the stream is polled, the [`ParquetOpener`] is called to open
/// the file.
///
/// * Step 3: The `ParquetOpener` gets the file metadata via
/// [`ParquetFileReaderFactory`] and applies any predicates
/// and projections to determine what pages must be read.
///
/// * Step 4: The stream begins reading data, fetching the required pages
/// and incrementally decoding them.
///
/// * Step 5: As each [`RecordBatch]` is read, it may be adapted by a
/// [`SchemaAdapter`] to match the table schema. By default missing columns are
/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
/// [`SchemaAdapter`]: crate::datasource::schema_adapter::SchemaAdapter
#[derive(Debug, Clone)]
pub struct ParquetExec {
/// Base configuration for this scan
Expand All @@ -86,9 +163,9 @@ pub struct ParquetExec {
metrics: ExecutionPlanMetricsSet,
/// Optional predicate for row filtering during parquet scan
predicate: Option<Arc<dyn PhysicalExpr>>,
/// Optional predicate for pruning row groups
/// Optional predicate for pruning row groups (derived from `predicate`)
pruning_predicate: Option<Arc<PruningPredicate>>,
/// Optional predicate for pruning pages
/// Optional predicate for pruning pages (derived from `predicate`)
page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
/// Optional hint for the size of the parquet metadata
metadata_size_hint: Option<usize>,
Expand Down Expand Up @@ -190,11 +267,13 @@ impl ParquetExec {

/// Optional user defined parquet file reader factory.
///
/// `ParquetFileReaderFactory` complements `TableProvider`, It enables users to provide custom
/// implementation for data access operations.
/// You can use [`ParquetFileReaderFactory`] to more precisely control how
/// data is read from parquet files (e.g. skip re-reading metadata, coalesce
/// I/O operations, etc).
///
/// If custom `ParquetFileReaderFactory` is provided, then data access operations will be routed
/// to this factory instead of `ObjectStore`.
/// The default reader factory reads directly from an [`ObjectStore`]
/// instance using individual I/O operations for the footer and then for
/// each page.
pub fn with_parquet_file_reader_factory(
mut self,
parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
Expand Down Expand Up @@ -643,11 +722,21 @@ fn should_enable_page_index(
.unwrap_or(false)
}

/// Factory of parquet file readers.
/// Interface for reading parquet files.
///
/// Provides means to implement custom data access interface.
/// The combined implementations of [`ParquetFileReaderFactory`] and
/// [`AsyncFileReader`] can be used to provide custom data access operations
/// such as pre-cached data, I/O coalescing, etc.
///
/// See [`DefaultParquetFileReaderFactory`] for a simple implementation.
pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
/// Provides `AsyncFileReader` over parquet file specified in `FileMeta`
/// Provides an `AsyncFileReader` for reading data from a parquet file specified
///
/// # Arguments
/// * partition_index - Index of the partition (for reporting metrics)
/// * file_meta - The file to be read
/// * metadata_size_hint - If specified, the first IO reads this many bytes from the footer
/// * metrics - Execution metrics
fn create_reader(
&self,
partition_index: usize,
Expand All @@ -657,20 +746,32 @@ pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
) -> Result<Box<dyn AsyncFileReader + Send>>;
}

/// Default parquet reader factory.
/// Default implementation of [`ParquetFileReaderFactory`]
///
/// This implementation:
/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance.
/// 2. Reads the footer and page metadata on demand.
/// 3. Does not cache metadata or coalesce I/O operations.
#[derive(Debug)]
pub struct DefaultParquetFileReaderFactory {
store: Arc<dyn ObjectStore>,
}

impl DefaultParquetFileReaderFactory {
/// Create a factory.
/// Create a new `DefaultParquetFileReaderFactory`.
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self { store }
}
}

/// Implements [`AsyncFileReader`] for a parquet file in object storage
/// Implements [`AsyncFileReader`] for a parquet file in object storage.
///
/// This implementation uses the [`ParquetObjectReader`] to read data from the
/// object store on demand, as required, tracking the number of bytes read.
///
/// This implementation does not coalesce I/O operations or cache bytes. Such
/// optimizations can be done either at the object store level or by providing a
/// custom implementation of [`ParquetFileReaderFactory`].
pub(crate) struct ParquetFileReader {
file_metrics: ParquetFileMetrics,
inner: ParquetObjectReader,
Expand Down
28 changes: 17 additions & 11 deletions datafusion/core/src/datasource/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.

//! Schema Adapter provides a method of translating the RecordBatches that come out of the
//! [`SchemaAdapter`] and [`SchemaAdapterFactory`] to adapt file-level record batches to a table schema.
//!
//! Adapter provides a method of translating the RecordBatches that come out of the
//! physical format into how they should be used by DataFusion. For instance, a schema
//! can be stored external to a parquet file that maps parquet logical types to arrow types.

Expand All @@ -26,35 +28,38 @@ use datafusion_common::plan_err;
use std::fmt::Debug;
use std::sync::Arc;

/// Factory of schema adapters.
/// Factory for creating [`SchemaAdapter`]
///
/// Provides means to implement custom schema adaptation.
/// This interface provides a way to implement custom schema adaptation logic
/// for ParquetExec (for example, to fill missing columns with default value
/// other than null)
pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
/// Provides `SchemaAdapter`.
fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
}

/// A utility which can adapt file-level record batches to a table schema which may have a schema
/// Adapt file-level [`RecordBatch`]es to a table schema, which may have a schema
/// obtained from merging multiple file-level schemas.
///
/// This is useful for enabling schema evolution in partitioned datasets.
///
/// This has to be done in two stages.
///
/// 1. Before reading the file, we have to map projected column indexes from the table schema to
/// the file schema.
/// 1. Before reading the file, we have to map projected column indexes from the
/// table schema to the file schema.
///
/// 2. After reading a record batch we need to map the read columns back to the expected columns
/// indexes and insert null-valued columns wherever the file schema was missing a colum present
/// in the table schema.
/// 2. After reading a record batch map the read columns back to the expected
/// columns indexes and insert null-valued columns wherever the file schema was
/// missing a column present in the table schema.
pub trait SchemaAdapter: Send + Sync {
/// Map a column index in the table schema to a column index in a particular
/// file schema
///
/// Panics if index is not in range for the table schema
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;

/// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
/// Creates a `SchemaMapping` that can be used to cast or map the columns
/// from the file schema to the table schema.
///
/// If the provided `file_schema` contains columns of a different type to the expected
/// `table_schema`, the method will attempt to cast the array data from the file schema
Expand All @@ -68,7 +73,8 @@ pub trait SchemaAdapter: Send + Sync {
) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
}

/// Transforms a RecordBatch from the physical layer to a RecordBatch that meets the table schema.
/// Creates a `SchemaMapping` that can be used to cast or map the columns
/// from the file schema to the table schema.
pub trait SchemaMapper: Send + Sync {
/// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;
Expand Down
Loading