Table Scan: Add Row Group Skipping #558

Merged: 9 commits, Aug 29, 2024
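
This change teaches the table scan's Arrow reader to skip entire Parquet row groups whose column metrics (min/max statistics) show they cannot contain rows matching the scan predicate. The sketch below shows how a caller might toggle the new behaviour through the builder; the `with_*` methods come from this diff, while the construction path (`ArrowReaderBuilder::new`, `FileIOBuilder::new_fs_io`) is assumed for illustration and may differ; in practice the reader is normally obtained through a table scan.

```rust
// Hypothetical usage sketch. The builder options below are taken from this diff;
// the FileIO setup and direct ArrowReaderBuilder construction are assumptions.
use iceberg::arrow::{ArrowReader, ArrowReaderBuilder};
use iceberg::io::FileIOBuilder;
use iceberg::Result;

fn build_reader() -> Result<ArrowReader> {
    // Assumed: a local-filesystem FileIO for illustration.
    let file_io = FileIOBuilder::new_fs_io().build()?;

    Ok(ArrowReaderBuilder::new(file_io)
        // Cap the number of data files fetched concurrently.
        .with_data_file_concurrency_limit(4)
        // Limit the number of rows per Arrow RecordBatch.
        .with_batch_size(1024)
        // New in this PR: skip row groups whose min/max metrics show they
        // cannot contain matching rows. Enabled by default.
        .with_row_group_filtering_enabled(true)
        .build())
}
```
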
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -72,9 +72,11 @@ once_cell = "1"
opendal = "0.49"
ordered-float = "4"
parquet = "52"
paste = "1"
pilota = "0.11.2"
pretty_assertions = "1.4"
port_scanner = "0.1.5"
rand = "0.8"
regex = "1.10.5"
reqwest = { version = "0.12", default-features = false, features = ["json"] }
rust_decimal = "1.31"
2 changes: 2 additions & 0 deletions crates/iceberg/Cargo.toml
@@ -66,6 +66,7 @@ once_cell = { workspace = true }
opendal = { workspace = true }
ordered-float = { workspace = true }
parquet = { workspace = true, features = ["async"] }
paste = { workspace = true }
reqwest = { workspace = true }
rust_decimal = { workspace = true }
serde = { workspace = true }
@@ -84,5 +85,6 @@ ctor = { workspace = true }
iceberg-catalog-memory = { workspace = true }
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
pretty_assertions = { workspace = true }
rand = { workspace = true }
tempfile = { workspace = true }
tera = { workspace = true }
210 changes: 145 additions & 65 deletions crates/iceberg/src/arrow/reader.rs
@@ -23,7 +23,7 @@ use std::str::FromStr;
use std::sync::Arc;

use arrow_arith::boolean::{and, is_not_null, is_null, not, or};
use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch};
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
use arrow_string::like::starts_with;
@@ -32,7 +32,7 @@ use fnv::FnvHashSet;
use futures::channel::mpsc::{channel, Sender};
use futures::future::BoxFuture;
use futures::{try_join, SinkExt, StreamExt, TryFutureExt, TryStreamExt};
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ArrowReaderOptions, RowFilter};
use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
use parquet::file::metadata::ParquetMetaData;
@@ -41,6 +41,7 @@ use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::error::Result;
use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::runtime::spawn;
@@ -54,6 +55,7 @@ pub struct ArrowReaderBuilder {
batch_size: Option<usize>,
file_io: FileIO,
concurrency_limit_data_files: usize,
row_group_filtering_enabled: bool,
}

impl ArrowReaderBuilder {
@@ -65,13 +67,13 @@ impl ArrowReaderBuilder {
batch_size: None,
file_io,
concurrency_limit_data_files: num_cpus,
row_group_filtering_enabled: true,
}
}

/// Sets the maximum number of in-flight data files being fetched concurrently
pub fn with_data_file_concurrency_limit(mut self, val: usize) -> Self {
self.concurrency_limit_data_files = val;

self
}

@@ -82,12 +84,19 @@ impl ArrowReaderBuilder {
self
}

/// Determines whether to enable row group filtering.
pub fn with_row_group_filtering_enabled(mut self, row_group_filtering_enabled: bool) -> Self {
self.row_group_filtering_enabled = row_group_filtering_enabled;
self
}

/// Build the ArrowReader.
pub fn build(self) -> ArrowReader {
ArrowReader {
batch_size: self.batch_size,
file_io: self.file_io,
concurrency_limit_data_files: self.concurrency_limit_data_files,
row_group_filtering_enabled: self.row_group_filtering_enabled,
}
}
}
@@ -100,6 +109,8 @@ pub struct ArrowReader {

/// the maximum number of data files that can be fetched at the same time
concurrency_limit_data_files: usize,

row_group_filtering_enabled: bool,
}

impl ArrowReader {
@@ -109,6 +120,7 @@ impl ArrowReader {
let file_io = self.file_io.clone();
let batch_size = self.batch_size;
let concurrency_limit_data_files = self.concurrency_limit_data_files;
let row_group_filtering_enabled = self.row_group_filtering_enabled;

let (tx, rx) = channel(concurrency_limit_data_files);
let mut channel_for_error = tx.clone();
@@ -124,8 +136,14 @@ impl ArrowReader {
let file_path = task.data_file_path().to_string();

spawn(async move {
Self::process_file_scan_task(task, batch_size, file_io, tx)
.await
Self::process_file_scan_task(
task,
batch_size,
file_io,
tx,
row_group_filtering_enabled,
)
.await
})
.await
.map_err(|e| e.with_context("file_path", file_path))
@@ -149,55 +167,95 @@ impl ArrowReader {
batch_size: Option<usize>,
file_io: FileIO,
mut tx: Sender<Result<RecordBatch>>,
row_group_filtering_enabled: bool,
) -> Result<()> {
// Collect Parquet column indices from field ids
let mut collector = CollectFieldIdVisitor {
field_ids: HashSet::default(),
};

if let Some(predicates) = task.predicate() {
visit(&mut collector, predicates)?;
}

// Get the metadata for the Parquet file we need to read and build
// a reader for the data within
let parquet_file = file_io.new_input(task.data_file_path())?;

let (parquet_metadata, parquet_reader) =
try_join!(parquet_file.metadata(), parquet_file.reader())?;
let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

let mut batch_stream_builder =
ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
// Start creating the record batch stream, which wraps the parquet file reader
let mut record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
parquet_file_reader,
// Page index will be required in upcoming row selection PR
ArrowReaderOptions::new().with_page_index(false),
)
.await?;

let parquet_schema = batch_stream_builder.parquet_schema();
let arrow_schema = batch_stream_builder.schema();
// Create a projection mask for the batch stream to select which columns in the
// Parquet file we want in the response
let projection_mask = Self::get_arrow_projection_mask(
task.project_field_ids(),
task.schema(),
parquet_schema,
arrow_schema,
record_batch_stream_builder.parquet_schema(),
record_batch_stream_builder.schema(),
)?;
batch_stream_builder = batch_stream_builder.with_projection(projection_mask);

let parquet_schema = batch_stream_builder.parquet_schema();
let row_filter = Self::get_row_filter(task.predicate(), parquet_schema, &collector)?;

if let Some(row_filter) = row_filter {
batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
}
record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);

if let Some(batch_size) = batch_size {
batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
}

let mut batch_stream = batch_stream_builder.build()?;
if let Some(predicate) = task.predicate() {
let (iceberg_field_ids, field_id_map) = Self::build_field_id_set_and_map(
record_batch_stream_builder.parquet_schema(),
predicate,
)?;

let row_filter = Self::get_row_filter(
predicate,
record_batch_stream_builder.parquet_schema(),
&iceberg_field_ids,
&field_id_map,
)?;
record_batch_stream_builder = record_batch_stream_builder.with_row_filter(row_filter);

let mut selected_row_groups = None;
if row_group_filtering_enabled {
let result = Self::get_selected_row_group_indices(
predicate,
record_batch_stream_builder.metadata(),
&field_id_map,
task.schema(),
)?;

selected_row_groups = Some(result);
}

if let Some(selected_row_groups) = selected_row_groups {
record_batch_stream_builder =
record_batch_stream_builder.with_row_groups(selected_row_groups);
}
}

while let Some(batch) = batch_stream.try_next().await? {
// Build the batch stream and send all the RecordBatches that it generates
// to the requester.
let mut record_batch_stream = record_batch_stream_builder.build()?;
while let Some(batch) = record_batch_stream.try_next().await? {
tx.send(Ok(batch)).await?
}

Ok(())
}

fn build_field_id_set_and_map(
parquet_schema: &SchemaDescriptor,
predicate: &BoundPredicate,
) -> Result<(HashSet<i32>, HashMap<i32, usize>)> {
// Collects all Iceberg field IDs referenced in the filter predicate
let mut collector = CollectFieldIdVisitor {
field_ids: HashSet::default(),
};
visit(&mut collector, predicate)?;

let iceberg_field_ids = collector.field_ids();
let field_id_map = build_field_id_map(parquet_schema)?;

Ok((iceberg_field_ids, field_id_map))
}

fn get_arrow_projection_mask(
field_ids: &[i32],
iceberg_schema_of_task: &Schema,
@@ -269,43 +327,59 @@ impl ArrowReader {
}

fn get_row_filter(
predicates: Option<&BoundPredicate>,
predicates: &BoundPredicate,
parquet_schema: &SchemaDescriptor,
collector: &CollectFieldIdVisitor,
) -> Result<Option<RowFilter>> {
if let Some(predicates) = predicates {
let field_id_map = build_field_id_map(parquet_schema)?;

// Collect Parquet column indices from field ids.
// If the field id is not found in Parquet schema, it will be ignored due to schema evolution.
let mut column_indices = collector
.field_ids
.iter()
.filter_map(|field_id| field_id_map.get(field_id).cloned())
.collect::<Vec<_>>();

column_indices.sort();

// The converter that converts `BoundPredicates` to `ArrowPredicates`
let mut converter = PredicateConverter {
parquet_schema,
column_map: &field_id_map,
column_indices: &column_indices,
};

// After collecting required leaf column indices used in the predicate,
// creates the projection mask for the Arrow predicates.
let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
let predicate_func = visit(&mut converter, predicates)?;
let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
Ok(Some(RowFilter::new(vec![Box::new(arrow_predicate)])))
} else {
Ok(None)
iceberg_field_ids: &HashSet<i32>,
field_id_map: &HashMap<i32, usize>,
) -> Result<RowFilter> {
// Collect Parquet column indices from field ids.
// If the field id is not found in Parquet schema, it will be ignored due to schema evolution.
let mut column_indices = iceberg_field_ids
.iter()
.filter_map(|field_id| field_id_map.get(field_id).cloned())
.collect::<Vec<_>>();
column_indices.sort();

// The converter that converts `BoundPredicates` to `ArrowPredicates`
let mut converter = PredicateConverter {
parquet_schema,
column_map: field_id_map,
column_indices: &column_indices,
};

// After collecting required leaf column indices used in the predicate,
// creates the projection mask for the Arrow predicates.
let projection_mask = ProjectionMask::leaves(parquet_schema, column_indices.clone());
let predicate_func = visit(&mut converter, predicates)?;
let arrow_predicate = ArrowPredicateFn::new(projection_mask, predicate_func);
Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
}

fn get_selected_row_group_indices(
predicate: &BoundPredicate,
parquet_metadata: &Arc<ParquetMetaData>,
field_id_map: &HashMap<i32, usize>,
snapshot_schema: &Schema,
) -> Result<Vec<usize>> {
let row_groups_metadata = parquet_metadata.row_groups();
let mut results = Vec::with_capacity(row_groups_metadata.len());

for (idx, row_group_metadata) in row_groups_metadata.iter().enumerate() {
if RowGroupMetricsEvaluator::eval(
predicate,
row_group_metadata,
field_id_map,
snapshot_schema,
)? {
results.push(idx);
}
}

Ok(results)
}
}

/// Build the map of field id to Parquet column index in the schema.
/// Build the map of Parquet field id to Parquet column index in the schema.
fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<HashMap<i32, usize>> {
let mut column_map = HashMap::new();
for (idx, field) in parquet_schema.columns().iter().enumerate() {
@@ -345,6 +419,12 @@ struct CollectFieldIdVisitor {
field_ids: HashSet<i32>,
}

impl CollectFieldIdVisitor {
fn field_ids(self) -> HashSet<i32> {
self.field_ids
}
}

impl BoundPredicateVisitor for CollectFieldIdVisitor {
type T = ();

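
For reference, the pruning decision itself is delegated to `RowGroupMetricsEvaluator::eval`, which is imported above but not shown in this diff. The snippet below is a simplified, self-contained illustration of the general min/max pruning idea behind `get_selected_row_group_indices`, not the crate's actual evaluator: the structs and the toy `>` predicate are hypothetical stand-ins for the real row group metadata and bound predicate.

```rust
// Simplified illustration of stats-based row group pruning. NOT the crate's
// RowGroupMetricsEvaluator; it only shows the idea that a row group can be
// skipped when its column min/max bounds prove the predicate cannot match.

/// Hypothetical per-column bounds, as would be read from Parquet row group metadata.
struct ColumnBounds {
    min: i64,
    max: i64,
}

/// A toy predicate: `column_value > literal`.
struct GreaterThan {
    literal: i64,
}

/// Returns true if the row group *might* contain matching rows and must be read,
/// false if it can be skipped entirely.
fn row_group_might_match(bounds: &ColumnBounds, pred: &GreaterThan) -> bool {
    // If even the largest value in the row group is <= the literal,
    // no row can satisfy `value > literal`, so the group is skipped.
    bounds.max > pred.literal
}

fn main() {
    let groups = vec![
        ColumnBounds { min: 0, max: 10 },   // cannot match `> 100`: skipped
        ColumnBounds { min: 50, max: 500 }, // might match: kept
    ];
    let pred = GreaterThan { literal: 100 };

    // Mirrors get_selected_row_group_indices: collect indices of row groups to read.
    let selected: Vec<usize> = groups
        .iter()
        .enumerate()
        .filter(|(_, b)| row_group_might_match(b, &pred))
        .map(|(idx, _)| idx)
        .collect();

    for idx in &selected {
        println!(
            "row group {idx} kept: min={}, max={}",
            groups[*idx].min, groups[*idx].max
        );
    }
    assert_eq!(selected, vec![1usize]);
}
```
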