Skip to content

Commit

Permalink
FileStream: Open next file in parallel while decoding (#5161)
Browse files Browse the repository at this point in the history
* FileStream: Open next file in parallel while decoding

* Apply suggestions from code review

Co-authored-by: Andrew Lamb <[email protected]>

* more descriptive method name

* formatting

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
thinkharderdev and alamb authored Feb 7, 2023
1 parent 48732b4 commit 816a0f8
Showing 1 changed file with 67 additions and 19 deletions.
86 changes: 67 additions & 19 deletions datafusion/core/src/physical_plan/file_format/file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
//! compliant with the `SendableRecordBatchStream` trait.

use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;
Expand Down Expand Up @@ -98,6 +99,11 @@ enum FileStreamState {
partition_values: Vec<ScalarValue>,
/// The reader instance
reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
/// A [`FileOpenFuture`] for the next file to be processed,
/// and its corresponding partition column values, if any.
/// This allows the next file to be opened in parallel while the
/// current file is read.
next: Option<(FileOpenFuture, Vec<ScalarValue>)>,
},
/// Encountered an error
Error,
Expand Down Expand Up @@ -202,30 +208,39 @@ impl<F: FileOpener> FileStream<F> {
})
}

/// Starts opening the next queued file, if there is one.
///
/// Opening a file is mostly IO (possibly several sequential IO operations),
/// so kicking it off early lets it overlap with decoding of the file that
/// `FileStream` is currently reading.
///
/// Returns:
/// * `None` when `self.file_iter` has no more files,
/// * `Some(Err(_))` when `self.file_reader.open` fails,
/// * `Some(Ok((future, partition_values)))` otherwise, pairing the open
///   future with the partition values of the file that was dequeued.
fn start_next_file(&mut self) -> Option<Result<(FileOpenFuture, Vec<ScalarValue>)>> {
    // Bail out with `None` as soon as the queue is exhausted.
    let next_file = self.file_iter.pop_front()?;

    // Keep the partition values aside; the rest of the entry describes
    // the file to open.
    let partition_values = next_file.partition_values;
    let meta = FileMeta {
        object_meta: next_file.object_meta,
        range: next_file.range,
        extensions: next_file.extensions,
    };

    let opened = match self.file_reader.open(meta) {
        Ok(future) => Ok((future, partition_values)),
        Err(e) => Err(e),
    };

    Some(opened)
}

fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
loop {
match &mut self.state {
FileStreamState::Idle => {
let part_file = match self.file_iter.pop_front() {
Some(file) => file,
None => return Poll::Ready(None),
};

let file_meta = FileMeta {
object_meta: part_file.object_meta,
range: part_file.range,
extensions: part_file.extensions,
};

self.file_stream_metrics.time_opening.start();

match self.file_reader.open(file_meta) {
Ok(future) => {
match self.start_next_file().transpose() {
Ok(Some((future, partition_values))) => {
self.state = FileStreamState::Open {
future,
partition_values: part_file.partition_values,
partition_values,
}
}
Ok(None) => return Poll::Ready(None),
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
Expand All @@ -237,13 +252,34 @@ impl<F: FileOpener> FileStream<F> {
partition_values,
} => match ready!(future.poll_unpin(cx)) {
Ok(reader) => {
let partition_values = mem::take(partition_values);

let next = self.start_next_file().transpose();

self.file_stream_metrics.time_opening.stop();
self.file_stream_metrics.time_scanning_until_data.start();
self.file_stream_metrics.time_scanning_total.start();
self.state = FileStreamState::Scan {
partition_values: std::mem::take(partition_values),
reader,
};

match next {
Ok(Some((next_future, next_partition_values))) => {
self.state = FileStreamState::Scan {
partition_values,
reader,
next: Some((next_future, next_partition_values)),
};
}
Ok(None) => {
self.state = FileStreamState::Scan {
reader,
partition_values,
next: None,
};
}
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e)));
}
}
}
Err(e) => {
self.state = FileStreamState::Error;
Expand All @@ -253,6 +289,7 @@ impl<F: FileOpener> FileStream<F> {
FileStreamState::Scan {
reader,
partition_values,
next,
} => match ready!(reader.poll_next_unpin(cx)) {
Some(result) => {
self.file_stream_metrics.time_scanning_until_data.stop();
Expand Down Expand Up @@ -287,7 +324,18 @@ impl<F: FileOpener> FileStream<F> {
None => {
self.file_stream_metrics.time_scanning_until_data.stop();
self.file_stream_metrics.time_scanning_total.stop();
self.state = FileStreamState::Idle;

match mem::take(next) {
Some((future, partition_values)) => {
self.file_stream_metrics.time_opening.start();

self.state = FileStreamState::Open {
future,
partition_values,
}
}
None => return Poll::Ready(None),
}
}
},
FileStreamState::Error | FileStreamState::Limit => {
Expand Down

0 comments on commit 816a0f8

Please sign in to comment.