Improve ParquetExec and related documentation #10647

Merged · 8 commits · May 28, 2024
106 changes: 98 additions & 8 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -75,7 +75,79 @@ pub use metrics::ParquetFileMetrics;
pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
pub use statistics::{RequestedStatistics, StatisticsConverter};

/// Execution plan for scanning one or more Parquet partitions
/// Execution plan for reading one or more Parquet files.
///
/// ```text
/// ▲
/// │
/// │ Produce a stream of
/// │ RecordBatches
/// │
/// ┌───────────────────────┐
/// │ │
/// │ ParquetExec │
/// │ │
/// └───────────────────────┘
/// ▲
/// │ Asynchronously read from one
/// │ or more parquet files via
/// │ ObjectStore interface
/// │
/// │
/// .───────────────────.
/// │ )
/// │`───────────────────'│
/// │ ObjectStore │
/// │.───────────────────.│
/// │ )
/// `───────────────────'
///
/// ```
/// # Features
///
/// Supports the following optimizations:
///
/// * Multi-threaded (aka multi-partition): read from one or more files in
/// parallel. Can read concurrently from multiple row groups from a single file.
Contributor:

I would call this "concurrency" instead of "multi-threading". IIRC we don't implement ANY threading in this operator and solely rely on tokio to dispatch concurrent bits for us. I think it's fine to mention that the concurrency in this operator CAN lead to multi-core usage under specific circumstances.

///
/// * Predicate push down: skips row groups and pages based on
/// min/max/null_counts in the row group metadata, the page index and bloom
/// filters.
///
/// * Projection pushdown: reads and decodes only the columns required.
///
/// * Limit pushdown: stop execution early after some number of rows are read.
///
/// * Custom readers: controls I/O for accessing pages. See
Contributor:

Suggested change:
- /// * Custom readers: controls I/O for accessing pages. See
+ /// * Custom readers: implements I/O for accessing byte ranges and the metadata object. See

It's not steering the IO process, it's actually responsible for performing (or not performing) it. For example, a custom impl. could totally NOT use an object store (which is esp. interesting for the metadata bit, see other comment below).

Contributor Author:

good call -- updated

/// [`ParquetFileReaderFactory`] for more details.
///
/// * Schema adapters: read parquet files with different schemas into a unified
/// table schema. This can be used to implement "schema evolution". See
/// [`SchemaAdapterFactory`] for more details.
///
/// * metadata_size_hint: controls the number of bytes read from the end of the
Contributor:

FWIW this is passed on to the reader (custom or builtin) and the reader uses that to gather the metadata. The reader CAN however use another more precise source for this information or not read the metadata from object store at all (e.g. it could use an extra service, a dataset-based source or some sort of cache).

/// file in the initial I/O.
///
/// # Execution Overview
///
/// * Step 1: [`ParquetExec::execute`] is called, returning a [`FileStream`]
/// configured to open parquet files with a [`ParquetOpener`].
///
/// * Step 2: When the stream is polled, the [`ParquetOpener`] is called to open
/// the file.
///
/// * Step 3: The `ParquetOpener` gets the file metadata by reading the footer,
/// and applies any predicates and projections to determine what pages must be
/// read.
Contributor:

It gets the metadata from the ParquetFileReaderFactory or more specifically the AsyncFileReader that this factory returns. The ParquetOpener doesn't care where the metadata comes from.

///
/// * Step 4: The stream begins reading data, fetching the required pages
/// and incrementally decoding them.
///
/// * Step 5: As each [`RecordBatch`] is read, it may be adapted by a
/// [`SchemaAdapter`] to match the table schema. By default missing columns are
/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
#[derive(Debug, Clone)]
pub struct ParquetExec {
/// Base configuration for this scan
@@ -85,9 +85,9 @@ pub struct ParquetExec {
metrics: ExecutionPlanMetricsSet,
/// Optional predicate for row filtering during parquet scan
predicate: Option<Arc<dyn PhysicalExpr>>,
/// Optional predicate for pruning row groups
/// Optional predicate for pruning row groups (derived from `predicate`)
pruning_predicate: Option<Arc<PruningPredicate>>,
/// Optional predicate for pruning pages
/// Optional predicate for pruning pages (derived from `predicate`)
page_pruning_predicate: Option<Arc<PagePruningPredicate>>,
/// Optional hint for the size of the parquet metadata
metadata_size_hint: Option<usize>,
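
For context, not part of this diff: a minimal sketch of how the pushdowns described above are exercised through the public `SessionContext` API. The file path `data/example.parquet` is a hypothetical example and a `tokio` runtime is assumed.

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Registering a Parquet file creates a listing table; queries against it
    // are planned as a ParquetExec scan.
    ctx.register_parquet("t", "data/example.parquet", ParquetReadOptions::default())
        .await?;

    // Projection and limit are pushed into the scan (only columns `a` and `b`
    // are decoded, and reading can stop early), and the `a > 5` predicate can
    // prune row groups and pages whose statistics show they cannot match.
    let df = ctx.sql("SELECT a, b FROM t WHERE a > 5 LIMIT 10").await?;
    df.show().await?;
    Ok(())
}
```

Running `EXPLAIN` on the same query shows the resulting `ParquetExec` node together with the `projection`, `predicate`, and derived `pruning_predicate` it was configured with.
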
@@ -642,11 +714,22 @@ fn should_enable_page_index(
.unwrap_or(false)
}

/// Factory of parquet file readers.
/// Interface for creating [`AsyncFileReader`]s to read parquet files.
///
/// This interface is used by [`ParquetOpener`] in order to create readers for
/// parquet files. Implementations of this trait can be used to provide custom
Contributor:

What's "this trait" in this case? I guess you're referring to AsyncFileReader, not ParquetFileReaderFactory here. To avoid confusion and give the user more freedom in how/where they implement "pre-cached data, I/O ..." etc., I suggest starting a new paragraph and saying:

The combined implementations of [`ParquetFileReaderFactory`] and [`AsyncFileReader`]
can be used to provide custom data access operations such as
pre-cached data, I/O coalescing, etc.

Contributor Author:

Excellent idea. I did so

/// data access operations such as pre-cached data, I/O coalescing, etc.
///
/// Provides means to implement custom data access interface.
/// [`DefaultParquetFileReaderFactory`] by default returns a
/// [`ParquetObjectReader`].
pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
/// Provides `AsyncFileReader` over parquet file specified in `FileMeta`
/// Provides an `AsyncFileReader` for reading data from a parquet file specified in `FileMeta`
///
/// # Arguments
/// * partition_index - Index of the partition (for reporting metrics)
/// * file_meta - The file to be read
/// * metadata_size_hint - If specified, the first IO reads this many bytes from the footer
/// * metrics - Execution metrics
fn create_reader(
&self,
partition_index: usize,
@@ -663,13 +746,20 @@ pub struct DefaultParquetFileReaderFactory {
}

impl DefaultParquetFileReaderFactory {
/// Create a factory.
/// Create a new `DefaultParquetFileReaderFactory`.
pub fn new(store: Arc<dyn ObjectStore>) -> Self {
Self { store }
}
}

/// Implements [`AsyncFileReader`] for a parquet file in object storage
/// Implements [`AsyncFileReader`] for a parquet file in object storage.
///
/// This implementation uses the [`ParquetObjectReader`] to read data from the
/// object store on demand, as required, tracking the number of bytes read.
///
/// This implementation does not coalesce I/O operations or cache bytes. Such
/// optimizations can be done either at the object store level or by providing a
/// custom implementation of [`ParquetFileReaderFactory`].
pub(crate) struct ParquetFileReader {
file_metrics: ParquetFileMetrics,
inner: ParquetObjectReader,
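
Also for context, not part of this diff: a sketch of a custom `ParquetFileReaderFactory` that delegates all I/O to the built-in `DefaultParquetFileReaderFactory` while logging each file it is asked to open. The `create_reader` signature is reproduced from the DataFusion API of this era (the diff above collapses part of it); the factory name and import paths are illustrative and may need adjusting.

```rust
use std::sync::Arc;

use datafusion::datasource::physical_plan::parquet::{
    DefaultParquetFileReaderFactory, ParquetFileReaderFactory,
};
use datafusion::datasource::physical_plan::FileMeta;
use datafusion::error::Result;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use object_store::ObjectStore;
use parquet::arrow::async_reader::AsyncFileReader;

/// Delegates to the default factory, but logs every file that is opened.
/// A real implementation could instead serve bytes or parquet metadata from
/// a cache or some other service.
#[derive(Debug)]
struct LoggingReaderFactory {
    inner: DefaultParquetFileReaderFactory,
}

impl LoggingReaderFactory {
    fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self {
            inner: DefaultParquetFileReaderFactory::new(store),
        }
    }
}

impl ParquetFileReaderFactory for LoggingReaderFactory {
    fn create_reader(
        &self,
        partition_index: usize,
        file_meta: FileMeta,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> Result<Box<dyn AsyncFileReader + Send>> {
        println!(
            "partition {partition_index}: opening {} (metadata size hint: {metadata_size_hint:?})",
            file_meta.location()
        );
        self.inner
            .create_reader(partition_index, file_meta, metadata_size_hint, metrics)
    }
}
```

Such a factory would be installed on the scan in place of the default one (in this version via `ParquetExec::with_parquet_file_reader_factory`, if that method matches your DataFusion release).
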
datafusion/core/src/datasource/physical_plan/parquet/schema_adapter.rs
@@ -20,35 +20,38 @@ use arrow_schema::{Schema, SchemaRef};
use std::fmt::Debug;
use std::sync::Arc;

/// Factory of schema adapters.
/// Factory for creating [`SchemaAdapter`]
///
/// Provides means to implement custom schema adaptation.
/// This interface provides a way to implement custom schema adaptation logic
/// for ParquetExec (for example, to fill missing columns with a default value
/// other than null).
pub trait SchemaAdapterFactory: Debug + Send + Sync + 'static {
/// Provides `SchemaAdapter` for the ParquetExec.
fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter>;
}

/// A utility which can adapt file-level record batches to a table schema which may have a schema
/// Adapt file-level [`RecordBatch`]es to a table schema, which may have been
/// obtained from merging multiple file-level schemas.
///
/// This is useful for enabling schema evolution in partitioned datasets.
///
/// This has to be done in two stages.
///
/// 1. Before reading the file, we have to map projected column indexes from the table schema to
/// the file schema.
/// 1. Before reading the file, we have to map projected column indexes from the
/// table schema to the file schema.
///
/// 2. After reading a record batch we need to map the read columns back to the expected columns
/// indexes and insert null-valued columns wherever the file schema was missing a colum present
/// in the table schema.
/// 2. After reading a record batch, map the read columns back to the expected
/// column indexes and insert null-valued columns wherever the file schema was
/// missing a column present in the table schema.
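
To make the two-stage adaptation concrete, here is a standalone arrow-rs sketch of the kind of transformation a `SchemaMapper` ultimately performs: columns present in the file are passed through, and columns missing from the file are filled with nulls. This is an illustration only, not DataFusion's implementation, which also handles projection index mapping and type casting.

```rust
use std::sync::Arc;

use arrow::array::{new_null_array, ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

/// Pad a file-level batch out to the table schema: matching columns are
/// passed through by name, missing columns are filled with nulls.
fn adapt_to_table_schema(batch: &RecordBatch, table_schema: &SchemaRef) -> Result<RecordBatch> {
    let columns: Vec<ArrayRef> = table_schema
        .fields()
        .iter()
        .map(|field| match batch.column_by_name(field.name()) {
            Some(col) => Arc::clone(col),
            None => new_null_array(field.data_type(), batch.num_rows()),
        })
        .collect();
    RecordBatch::try_new(Arc::clone(table_schema), columns)
}

fn main() -> Result<()> {
    // The file only contains column "a"; the table schema also has nullable "b".
    let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let table_schema: SchemaRef = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Utf8, true),
    ]));

    let file_batch = RecordBatch::try_new(
        file_schema,
        vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    let adapted = adapt_to_table_schema(&file_batch, &table_schema)?;
    assert_eq!(adapted.schema(), table_schema);
    assert_eq!(adapted.column(1).null_count(), 3);
    Ok(())
}
```
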
pub trait SchemaAdapter: Send + Sync {
/// Map a column index in the table schema to a column index in a particular
/// file schema
///
/// Panics if index is not in range for the table schema
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize>;

/// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
/// Creates a `SchemaMapping` that can be used to cast or map the columns
/// from the file schema to the table schema.
///
/// If the provided `file_schema` contains columns of a different type to the expected
/// `table_schema`, the method will attempt to cast the array data from the file schema
Expand All @@ -62,7 +65,8 @@ pub trait SchemaAdapter: Send + Sync {
) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>;
}

/// Transforms a RecordBatch from Parquet to a RecordBatch that meets the table schema.
/// Transforms a [`RecordBatch`] read from a Parquet file to a [`RecordBatch`]
/// that has the table schema.
pub trait SchemaMapper: Send + Sync {
/// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;