From 361ec590b7105aa609de4b126302ae03cf723651 Mon Sep 17 00:00:00 2001
From: Scott Donnelly
Date: Tue, 14 May 2024 20:27:14 +0100
Subject: [PATCH] refactor: remove concurrency when processing individual ManifestEntries in a Manifest

ManifestEntries within a single Manifest are now filtered and converted
to FileScanTasks sequentially rather than being fanned out over a channel
to a pool of concurrent tasks. The ArrowReader similarly drops its
channel and spawned-task machinery, streaming RecordBatches from each
FileScanTask via an async try_stream! block.

---
 crates/iceberg/src/arrow/reader.rs | 155 +++++++----------------------
 crates/iceberg/src/scan.rs         |  74 +++++++-------
 2 files changed, 70 insertions(+), 159 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 8eb2dfae3..999fc602d 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -21,7 +21,6 @@ use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
-use crate::scan::FileScanTask;
 use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
 use crate::spec::{Datum, SchemaRef};
 use crate::Result;
@@ -30,13 +29,12 @@ use arrow_arith::boolean::{and, is_not_null, is_null, not, or};
 use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
 use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
 use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
+use async_stream::try_stream;
 use bytes::Bytes;
 use fnv::FnvHashSet;
-use futures::channel::mpsc::{channel, Sender};
 use futures::future::BoxFuture;
 use futures::stream::StreamExt;
 use futures::{try_join, TryFutureExt};
-use futures::{SinkExt, TryStreamExt};
 use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
 use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
@@ -46,15 +44,11 @@ use std::collections::{HashMap, HashSet};
 use std::ops::Range;
 use std::str::FromStr;
 use std::sync::Arc;
-use tokio::spawn;
-
-const CHANNEL_BUFFER_SIZE: usize = 10;
-const CONCURRENCY_LIMIT_TASKS: usize = 10;
 
 /// Builder to create ArrowReader
 pub struct ArrowReaderBuilder {
     batch_size: Option<usize>,
-    field_ids: Arc<Vec<usize>>,
+    field_ids: Vec<usize>,
     file_io: FileIO,
     schema: SchemaRef,
     predicate: Option<BoundPredicate>,
@@ -65,7 +59,7 @@ impl ArrowReaderBuilder {
     pub fn new(file_io: FileIO, schema: SchemaRef) -> Self {
         ArrowReaderBuilder {
             batch_size: None,
-            field_ids: Arc::new(vec![]),
+            field_ids: vec![],
             file_io,
             schema,
             predicate: None,
@@ -81,10 +75,7 @@ impl ArrowReaderBuilder {
 
     /// Sets the desired column projection with a list of field ids.
     pub fn with_field_ids(mut self, field_ids: impl IntoIterator<Item = usize>) -> Self {
-        let field_ids = field_ids.into_iter().collect();
-        let field_ids_arc = Arc::new(field_ids);
-        self.field_ids = field_ids_arc;
-
+        self.field_ids = field_ids.into_iter().collect();
         self
     }
 
@@ -109,7 +100,7 @@ impl ArrowReaderBuilder {
 /// Reads data from Parquet files
 pub struct ArrowReader {
     batch_size: Option<usize>,
-    field_ids: Arc<Vec<usize>>,
+    field_ids: Vec<usize>,
     #[allow(dead_code)]
     schema: SchemaRef,
     file_io: FileIO,
@@ -119,9 +110,7 @@ pub struct ArrowReader {
 impl ArrowReader {
     /// Take a stream of FileScanTasks and reads all the files.
     /// Returns a stream of Arrow RecordBatches containing the data from the files
-    pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
-        let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
-
+    pub fn read(self, mut tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
         // Collect Parquet column indices from field ids
         let mut collector = CollectFieldIdVisitor {
             field_ids: HashSet::default(),
@@ -130,116 +119,44 @@ impl ArrowReader {
             visit(&mut collector, predicates)?;
         }
 
-        let tasks = tasks.map(move |task| self.build_file_scan_task_context(task, sender.clone()));
-
-        spawn(async move {
-            tasks
-                .try_for_each_concurrent(CONCURRENCY_LIMIT_TASKS, Self::process_file_scan_task)
-                .await
-        });
-
-        Ok(receiver.boxed())
-    }
-
-    fn build_file_scan_task_context(
-        &self,
-        task: Result<FileScanTask>,
-        sender: Sender<Result<RecordBatch>>,
-    ) -> Result<FileScanTaskContext> {
-        Ok(FileScanTaskContext::new(
-            task?,
-            self.file_io.clone(),
-            sender,
-            self.batch_size,
-            self.field_ids.clone(),
-            self.schema.clone(),
-            self.predicate.clone(),
-        ))
-    }
-
-    async fn process_file_scan_task(mut context: FileScanTaskContext) -> Result<()> {
-        let file_scan_task = context.take_task();
-
-        // Collect Parquet column indices from field ids
-        let mut collector = CollectFieldIdVisitor {
-            field_ids: HashSet::default(),
-        };
-        if let Some(predicate) = &context.predicate {
-            visit(&mut collector, predicate)?;
-        }
-
-        let parquet_file = context
-            .file_io
-            .new_input(file_scan_task.data().data_file().file_path())?;
-        let (parquet_metadata, parquet_reader) =
-            try_join!(parquet_file.metadata(), parquet_file.reader())?;
-        let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
-
-        let mut batch_stream_builder =
-            ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
+        Ok(try_stream! {
+            while let Some(Ok(task)) = tasks.next().await {
-        let parquet_schema = batch_stream_builder.parquet_schema();
-        let arrow_schema = batch_stream_builder.schema();
+                let parquet_file = self
+                    .file_io
+                    .new_input(task.data().data_file().file_path())?;
+                let (parquet_metadata, parquet_reader) =
+                    try_join!(parquet_file.metadata(), parquet_file.reader())?;
+                let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);
-        let projection_mask = context.get_arrow_projection_mask(parquet_schema, arrow_schema)?;
-        batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
+                let mut batch_stream_builder =
+                    ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
-        let parquet_schema = batch_stream_builder.parquet_schema();
-        let row_filter = context.get_row_filter(parquet_schema, &collector)?;
-
-        if let Some(row_filter) = row_filter {
-            batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
-        }
+                let parquet_schema = batch_stream_builder.parquet_schema();
+                let arrow_schema = batch_stream_builder.schema();
-        if let Some(batch_size) = context.batch_size {
-            batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
-        }
+                let projection_mask = self.get_arrow_projection_mask(parquet_schema, arrow_schema)?;
+                batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
-        let mut batch_stream = batch_stream_builder.build()?;
+                let parquet_schema = batch_stream_builder.parquet_schema();
+                let row_filter = self.get_row_filter(parquet_schema, &collector)?;
-        while let Some(batch) = batch_stream.next().await {
-            context.sender.send(Ok(batch?)).await?;
-        }
+                if let Some(row_filter) = row_filter {
+                    batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
+                }
-        Ok(())
-    }
-}
+                if let Some(batch_size) = self.batch_size {
+                    batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
+                }
-struct FileScanTaskContext {
-    file_scan_task: Option<FileScanTask>,
-    file_io: FileIO,
-    sender: Sender<Result<RecordBatch>>,
-    batch_size: Option<usize>,
-    field_ids: Arc<Vec<usize>>,
-    schema: SchemaRef,
-    predicate: Option<BoundPredicate>,
-}
+                let mut batch_stream = batch_stream_builder.build()?;
-impl FileScanTaskContext {
-    fn new(
-        file_scan_task: FileScanTask,
-        file_io: FileIO,
-        sender: Sender<Result<RecordBatch>>,
-        batch_size: Option<usize>,
-        field_ids: Arc<Vec<usize>>,
-        schema: SchemaRef,
-        predicate: Option<BoundPredicate>,
-    ) -> Self {
-        FileScanTaskContext {
-            file_scan_task: Some(file_scan_task),
-            file_io,
-            sender,
-            batch_size,
-            field_ids,
-            schema,
-            predicate,
+                while let Some(batch) = batch_stream.next().await {
+                    yield batch?;
+                }
+            }
         }
-    }
-
-    fn take_task(&mut self) -> FileScanTask {
-        let mut result = None;
-        std::mem::swap(&mut self.file_scan_task, &mut result);
-        result.unwrap()
+        .boxed())
     }
 
     fn get_arrow_projection_mask(
@@ -297,8 +214,8 @@ impl FileScanTaskContext {
         }
 
         let mut indices = vec![];
-        for field_id in self.field_ids.as_ref() {
-            if let Some(col_idx) = column_map.get(&(*field_id as i32)) {
+        for &field_id in &self.field_ids {
+            if let Some(col_idx) = column_map.get(&(field_id as i32)) {
                 indices.push(*col_idx);
             } else {
                 return Err(Error::new(
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index c90715d4f..ecef4badd 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -40,7 +40,6 @@ use tokio::spawn;
 
 const CHANNEL_BUFFER_SIZE: usize = 10;
 const CONCURRENCY_LIMIT_MANIFEST_FILES: usize = 10;
-const CONCURRENCY_LIMIT_MANIFEST_ENTRIES: usize = 10;
 const SUPPORTED_MANIFEST_FILE_CONTENT_TYPES: [ManifestContentType; 1] =
     [ManifestContentType::Data];
 
 /// A stream of [`FileScanTask`].
@@ -212,13 +211,13 @@ impl TableScan {
             self.case_sensitive,
         )?;
 
-        let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
-
         let manifest_list = context
             .snapshot
             .load_manifest_list(&context.file_io, &context.table_metadata)
             .await?;
 
+        let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);
+
         spawn(async move {
             let _ = ConcurrentFileScanStreamContext::new(context, sender)
                 .run(manifest_list)
@@ -391,56 +390,51 @@
             Arc<FileScanStreamContext>,
         ),
     ) -> Result<()> {
-        let (manifest_file, file_io, sender, context) = manifest_and_file_io_and_sender;
+        let (manifest_file, file_io, mut sender, context) = manifest_and_file_io_and_sender;
         let manifest = manifest_file.load_manifest(&file_io).await?;
+        for manifest_entry in manifest.entries() {
+            if !manifest_entry.is_alive() {
+                continue;
+            }
 
-        let manifest_entries = manifest
-            .entries()
-            .iter()
-            .filter(|x| x.is_alive())
-            .map(|manifest_entry| Ok((manifest_entry, sender.clone(), context.clone())));
+            Self::reject_unsupported_manifest_entry_content_types(manifest_entry)?;
 
-        futures::stream::iter(manifest_entries)
-            .try_for_each_concurrent(
-                CONCURRENCY_LIMIT_MANIFEST_ENTRIES,
-                Self::process_manifest_entry,
-            )
-            .await
+            if let Some(bound_predicate) = context.bound_filter() {
+                // reject any manifest entries whose data file's metrics don't match the filter.
+                if !InclusiveMetricsEvaluator::eval(
+                    bound_predicate,
+                    manifest_entry.data_file(),
+                    false,
+                )? {
+                    return Ok(());
+                }
+            }
+
+            // TODO: Apply ExpressionEvaluator
+
+            sender
+                .send(Ok(Self::manifest_entry_to_file_scan_task(manifest_entry)))
+                .await?;
+        }
+
+        Ok(())
     }
 
-    async fn process_manifest_entry(
-        manifest_entry_and_sender: (
-            &ManifestEntryRef,
-            Sender<Result<FileScanTask>>,
-            Arc<FileScanStreamContext>,
-        ),
+    fn reject_unsupported_manifest_entry_content_types(
+        manifest_entry: &ManifestEntryRef,
     ) -> Result<()> {
-        let (manifest_entry, mut sender, context) = manifest_entry_and_sender;
-
-        if !matches!(manifest_entry.content_type(), DataContentType::Data) {
-            return Err(Error::new(
+        if matches!(manifest_entry.content_type(), DataContentType::Data) {
+            Ok(())
+        } else {
+            Err(Error::new(
                 ErrorKind::FeatureUnsupported,
                 format!(
                     "Files of type '{:?}' are not supported yet.",
                     manifest_entry.content_type()
                 ),
-            ));
-        }
-
-        if let Some(bound_predicate) = context.bound_filter() {
-            // reject any manifest entries whose data file's metrics don't match the filter.
-            if !InclusiveMetricsEvaluator::eval(bound_predicate, manifest_entry.data_file(), false)?
-            {
-                return Ok(());
-            }
+            ))
         }
-
-        // TODO: Apply ExpressionEvaluator
-
-        Ok(sender
-            .send(Ok(Self::manifest_entry_to_file_scan_task(manifest_entry)))
-            .await?)
     }
 
     fn manifest_entry_to_file_scan_task(manifest_entry: &ManifestEntryRef) -> FileScanTask {