
Commit

refactor: remove concurrency when processing individual ManifestEntries in a Manifest
sdd committed Jun 6, 2024
1 parent e135b3a commit 361ec59
Showing 2 changed files with 70 additions and 159 deletions.
155 changes: 36 additions & 119 deletions crates/iceberg/src/arrow/reader.rs
@@ -21,7 +21,6 @@ use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::scan::FileScanTask;
use crate::scan::{ArrowRecordBatchStream, FileScanTaskStream};
use crate::spec::{Datum, SchemaRef};
use crate::Result;
@@ -30,13 +29,12 @@ use arrow_arith::boolean::{and, is_not_null, is_null, not, or};
use arrow_array::{ArrayRef, BooleanArray, RecordBatch};
use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
use arrow_schema::{ArrowError, DataType, SchemaRef as ArrowSchemaRef};
use async_stream::try_stream;
use bytes::Bytes;
use fnv::FnvHashSet;
use futures::channel::mpsc::{channel, Sender};
use futures::future::BoxFuture;
use futures::stream::StreamExt;
use futures::{try_join, TryFutureExt};
use futures::{SinkExt, TryStreamExt};
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::async_reader::{AsyncFileReader, MetadataLoader};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask, PARQUET_FIELD_ID_META_KEY};
@@ -46,15 +44,11 @@ use std::collections::{HashMap, HashSet};
use std::ops::Range;
use std::str::FromStr;
use std::sync::Arc;
use tokio::spawn;

const CHANNEL_BUFFER_SIZE: usize = 10;
const CONCURRENCY_LIMIT_TASKS: usize = 10;

/// Builder to create ArrowReader
pub struct ArrowReaderBuilder {
batch_size: Option<usize>,
field_ids: Arc<Vec<usize>>,
field_ids: Vec<usize>,
file_io: FileIO,
schema: SchemaRef,
predicate: Option<BoundPredicate>,
@@ -65,7 +59,7 @@ impl ArrowReaderBuilder {
pub fn new(file_io: FileIO, schema: SchemaRef) -> Self {
ArrowReaderBuilder {
batch_size: None,
field_ids: Arc::new(vec![]),
field_ids: vec![],
file_io,
schema,
predicate: None,
@@ -81,10 +75,7 @@ impl ArrowReaderBuilder {

/// Sets the desired column projection with a list of field ids.
pub fn with_field_ids(mut self, field_ids: impl IntoIterator<Item = usize>) -> Self {
let field_ids = field_ids.into_iter().collect();
let field_ids_arc = Arc::new(field_ids);
self.field_ids = field_ids_arc;

self.field_ids = field_ids.into_iter().collect();
self
}

@@ -109,7 +100,7 @@ impl ArrowReaderBuilder {
/// Reads data from Parquet files
pub struct ArrowReader {
batch_size: Option<usize>,
field_ids: Arc<Vec<usize>>,
field_ids: Vec<usize>,
#[allow(dead_code)]
schema: SchemaRef,
file_io: FileIO,
@@ -119,9 +110,7 @@ pub struct ArrowReader {
impl ArrowReader {
/// Take a stream of FileScanTasks and reads all the files.
/// Returns a stream of Arrow RecordBatches containing the data from the files
pub fn read(self, tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);

pub fn read(self, mut tasks: FileScanTaskStream) -> Result<ArrowRecordBatchStream> {
// Collect Parquet column indices from field ids
let mut collector = CollectFieldIdVisitor {
field_ids: HashSet::default(),
@@ -130,116 +119,44 @@ impl ArrowReader {
visit(&mut collector, predicates)?;
}

let tasks = tasks.map(move |task| self.build_file_scan_task_context(task, sender.clone()));

spawn(async move {
tasks
.try_for_each_concurrent(CONCURRENCY_LIMIT_TASKS, Self::process_file_scan_task)
.await
});

Ok(receiver.boxed())
}

fn build_file_scan_task_context(
&self,
task: Result<FileScanTask>,
sender: Sender<Result<RecordBatch>>,
) -> Result<FileScanTaskContext> {
Ok(FileScanTaskContext::new(
task?,
self.file_io.clone(),
sender,
self.batch_size,
self.field_ids.clone(),
self.schema.clone(),
self.predicate.clone(),
))
}

async fn process_file_scan_task(mut context: FileScanTaskContext) -> Result<()> {
let file_scan_task = context.take_task();

// Collect Parquet column indices from field ids
let mut collector = CollectFieldIdVisitor {
field_ids: HashSet::default(),
};
if let Some(predicate) = &context.predicate {
visit(&mut collector, predicate)?;
}

let parquet_file = context
.file_io
.new_input(file_scan_task.data().data_file().file_path())?;
let (parquet_metadata, parquet_reader) =
try_join!(parquet_file.metadata(), parquet_file.reader())?;
let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

let mut batch_stream_builder =
ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;
Ok(try_stream! {
while let Some(Ok(task)) = tasks.next().await {

let parquet_schema = batch_stream_builder.parquet_schema();
let arrow_schema = batch_stream_builder.schema();
let parquet_file = self
.file_io
.new_input(task.data().data_file().file_path())?;
let (parquet_metadata, parquet_reader) =
try_join!(parquet_file.metadata(), parquet_file.reader())?;
let arrow_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

let projection_mask = context.get_arrow_projection_mask(parquet_schema, arrow_schema)?;
batch_stream_builder = batch_stream_builder.with_projection(projection_mask);
let mut batch_stream_builder =
ParquetRecordBatchStreamBuilder::new(arrow_file_reader).await?;

let parquet_schema = batch_stream_builder.parquet_schema();
let row_filter = context.get_row_filter(parquet_schema, &collector)?;

if let Some(row_filter) = row_filter {
batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
}
let parquet_schema = batch_stream_builder.parquet_schema();
let arrow_schema = batch_stream_builder.schema();

if let Some(batch_size) = context.batch_size {
batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
}
let projection_mask = self.get_arrow_projection_mask(parquet_schema, arrow_schema)?;
batch_stream_builder = batch_stream_builder.with_projection(projection_mask);

let mut batch_stream = batch_stream_builder.build()?;
let parquet_schema = batch_stream_builder.parquet_schema();
let row_filter = self.get_row_filter(parquet_schema, &collector)?;

while let Some(batch) = batch_stream.next().await {
context.sender.send(Ok(batch?)).await?;
}
if let Some(row_filter) = row_filter {
batch_stream_builder = batch_stream_builder.with_row_filter(row_filter);
}

Ok(())
}
}
if let Some(batch_size) = self.batch_size {
batch_stream_builder = batch_stream_builder.with_batch_size(batch_size);
}

struct FileScanTaskContext {
file_scan_task: Option<FileScanTask>,
file_io: FileIO,
sender: Sender<Result<RecordBatch>>,
batch_size: Option<usize>,
field_ids: Arc<Vec<usize>>,
schema: SchemaRef,
predicate: Option<BoundPredicate>,
}
let mut batch_stream = batch_stream_builder.build()?;

impl FileScanTaskContext {
fn new(
file_scan_task: FileScanTask,
file_io: FileIO,
sender: Sender<Result<RecordBatch>>,
batch_size: Option<usize>,
field_ids: Arc<Vec<usize>>,
schema: SchemaRef,
predicate: Option<BoundPredicate>,
) -> Self {
FileScanTaskContext {
file_scan_task: Some(file_scan_task),
file_io,
sender,
batch_size,
field_ids,
schema,
predicate,
while let Some(batch) = batch_stream.next().await {
yield batch?;
}
}
}
}

fn take_task(&mut self) -> FileScanTask {
let mut result = None;
std::mem::swap(&mut self.file_scan_task, &mut result);
result.unwrap()
.boxed())
}

fn get_arrow_projection_mask(
@@ -297,8 +214,8 @@ impl FileScanTaskContext {
}

let mut indices = vec![];
for field_id in self.field_ids.as_ref() {
if let Some(col_idx) = column_map.get(&(*field_id as i32)) {
for &field_id in &self.field_ids {
if let Some(col_idx) = column_map.get(&(field_id as i32)) {
indices.push(*col_idx);
} else {
return Err(Error::new(
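The net shape of the new ArrowReader::read is hard to follow in the interleaved rendering above, so here is a minimal, hedged sketch of the pattern the commit moves to: a sequential async_stream::try_stream! over the incoming tasks, in place of the old channel, spawn, and try_for_each_concurrent fan-out. The Task and Batch types and the open_batch_stream helper below are placeholders for the real FileScanTask/Parquet machinery, projection, row filters, and batch size handling are omitted, and only the futures and async-stream crates are assumed.

use async_stream::try_stream;
use futures::stream::{BoxStream, StreamExt};

// Placeholders for the crate's FileScanTask / RecordBatch / error types.
type Task = String;
type Batch = Vec<u64>;
type Error = std::io::Error;
type Result<T> = std::result::Result<T, Error>;

// Stand-in for opening a Parquet record-batch stream for one scan task.
async fn open_batch_stream(_task: &Task) -> Result<BoxStream<'static, Result<Batch>>> {
    let batches: Vec<Result<Batch>> = vec![Ok(vec![1, 2]), Ok(vec![3])];
    Ok(futures::stream::iter(batches).boxed())
}

// Sequential shape used after this commit: one task at a time, batches
// yielded in order, with no channel, spawn, or concurrency limit involved.
fn read(mut tasks: BoxStream<'static, Result<Task>>) -> Result<BoxStream<'static, Result<Batch>>> {
    Ok(try_stream! {
        while let Some(Ok(task)) = tasks.next().await {
            let mut batches = open_batch_stream(&task).await?;
            while let Some(batch) = batches.next().await {
                yield batch?;
            }
        }
    }
    .boxed())
}

fn main() {
    let tasks: Vec<Result<Task>> = vec![Ok("a.parquet".into()), Ok("b.parquet".into())];
    let stream = read(futures::stream::iter(tasks).boxed()).expect("build stream");
    let out = futures::executor::block_on(stream.collect::<Vec<_>>());
    println!("{out:?}");
}

One consequence of this shape is that files are read strictly one after another and batches are yielded in task order; the per-file concurrency that previously lived behind CONCURRENCY_LIMIT_TASKS is gone from the reader.
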
74 changes: 34 additions & 40 deletions crates/iceberg/src/scan.rs
@@ -40,7 +40,6 @@ use tokio::spawn;

const CHANNEL_BUFFER_SIZE: usize = 10;
const CONCURRENCY_LIMIT_MANIFEST_FILES: usize = 10;
const CONCURRENCY_LIMIT_MANIFEST_ENTRIES: usize = 10;
const SUPPORTED_MANIFEST_FILE_CONTENT_TYPES: [ManifestContentType; 1] = [ManifestContentType::Data];

/// A stream of [`FileScanTask`].
@@ -212,13 +211,13 @@ impl TableScan {
self.case_sensitive,
)?;

let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);

let manifest_list = context
.snapshot
.load_manifest_list(&context.file_io, &context.table_metadata)
.await?;

let (sender, receiver) = channel(CHANNEL_BUFFER_SIZE);

spawn(async move {
let _ = ConcurrentFileScanStreamContext::new(context, sender)
.run(manifest_list)
@@ -391,56 +390,51 @@ impl ConcurrentFileScanStreamContext {
Arc<FileScanStreamContext>,
),
) -> Result<()> {
let (manifest_file, file_io, sender, context) = manifest_and_file_io_and_sender;
let (manifest_file, file_io, mut sender, context) = manifest_and_file_io_and_sender;

let manifest = manifest_file.load_manifest(&file_io).await?;
for manifest_entry in manifest.entries() {
if !manifest_entry.is_alive() {
continue;
}

let manifest_entries = manifest
.entries()
.iter()
.filter(|x| x.is_alive())
.map(|manifest_entry| Ok((manifest_entry, sender.clone(), context.clone())));
Self::reject_unsupported_manifest_entry_content_types(manifest_entry)?;

futures::stream::iter(manifest_entries)
.try_for_each_concurrent(
CONCURRENCY_LIMIT_MANIFEST_ENTRIES,
Self::process_manifest_entry,
)
.await
if let Some(bound_predicate) = context.bound_filter() {
// reject any manifest entries whose data file's metrics don't match the filter.
if !InclusiveMetricsEvaluator::eval(
bound_predicate,
manifest_entry.data_file(),
false,
)? {
return Ok(());
}
}

// TODO: Apply ExpressionEvaluator

sender
.send(Ok(Self::manifest_entry_to_file_scan_task(manifest_entry)))
.await?;
}

Ok(())
}

async fn process_manifest_entry(
manifest_entry_and_sender: (
&ManifestEntryRef,
Sender<Result<FileScanTask>>,
Arc<FileScanStreamContext>,
),
fn reject_unsupported_manifest_entry_content_types(
manifest_entry: &ManifestEntryRef,
) -> Result<()> {
let (manifest_entry, mut sender, context) = manifest_entry_and_sender;

if !matches!(manifest_entry.content_type(), DataContentType::Data) {
return Err(Error::new(
if matches!(manifest_entry.content_type(), DataContentType::Data) {
Ok(())
} else {
Err(Error::new(
ErrorKind::FeatureUnsupported,
format!(
"Files of type '{:?}' are not supported yet.",
manifest_entry.content_type()
),
));
}

if let Some(bound_predicate) = context.bound_filter() {
// reject any manifest entries whose data file's metrics don't match the filter.
if !InclusiveMetricsEvaluator::eval(bound_predicate, manifest_entry.data_file(), false)?
{
return Ok(());
}
))
}

// TODO: Apply ExpressionEvaluator

Ok(sender
.send(Ok(Self::manifest_entry_to_file_scan_task(manifest_entry)))
.await?)
}

fn manifest_entry_to_file_scan_task(manifest_entry: &ManifestEntryRef) -> FileScanTask {
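On the scan side, process_manifest_file now walks each manifest's live entries in a plain for loop and sends one FileScanTask per supported entry down the existing channel, instead of fanning entries out with try_for_each_concurrent; the content-type check is pulled out into reject_unsupported_manifest_entry_content_types. Below is a rough sketch of that sequential shape with placeholder Entry and ScanTask types and a simplified reject_unsupported helper standing in for the real ones; only the futures crate is assumed, and the metrics-based predicate filtering is reduced to a comment.

use futures::channel::mpsc::{channel, Sender};
use futures::{SinkExt, StreamExt};

// Placeholders for the crate's ManifestEntryRef / FileScanTask / error types.
#[derive(Clone, Copy, PartialEq)]
enum ContentType { Data, Deletes }

struct Entry { alive: bool, content: ContentType, path: &'static str }
#[derive(Debug)]
struct ScanTask { path: String }
type Error = String;
type Result<T> = std::result::Result<T, Error>;

// Simplified analogue of reject_unsupported_manifest_entry_content_types:
// only plain data files are supported; anything else is an error.
fn reject_unsupported(entry: &Entry) -> Result<()> {
    if entry.content == ContentType::Data {
        Ok(())
    } else {
        Err("files of this content type are not supported yet".to_string())
    }
}

// Sequential per-entry processing: a single pass over the manifest's entries,
// sending a scan task for each live, supported entry down the channel.
async fn process_manifest_entries(entries: &[Entry], mut sender: Sender<Result<ScanTask>>) -> Result<()> {
    for entry in entries {
        if !entry.alive {
            continue;
        }
        reject_unsupported(entry)?;
        // (The real code also skips entries whose data-file metrics fail the
        // scan's bound predicate before sending.)
        sender
            .send(Ok(ScanTask { path: entry.path.to_string() }))
            .await
            .map_err(|e| e.to_string())?;
    }
    Ok(())
}

fn main() {
    let entries = [
        Entry { alive: true, content: ContentType::Data, path: "a.parquet" },
        Entry { alive: false, content: ContentType::Deletes, path: "b-deletes.parquet" },
    ];
    let (sender, receiver) = channel(10);
    futures::executor::block_on(async {
        let (sent, tasks) = futures::join!(
            process_manifest_entries(&entries, sender),
            receiver.collect::<Vec<_>>(),
        );
        sent.expect("all entries processed");
        println!("{} scan task(s): {tasks:?}", tasks.len());
    });
}

Concurrency across manifest files (CONCURRENCY_LIMIT_MANIFEST_FILES) is kept; only the per-entry concurrency inside a single manifest is removed, which is what the dropped CONCURRENCY_LIMIT_MANIFEST_ENTRIES constant reflects.
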
