fix(rust, python): delay opening files in streaming engine (pola-rs#9251)
ritchie46 authored and c-peters committed Jul 14, 2023
1 parent 8d13f7e commit 0d4a81e
Showing 3 changed files with 90 additions and 35 deletions.
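
The change is the same in both source files: the constructor no longer opens the file, it only stores the path, options, and schema, and the reader is created on the first call to get_batches. That way a pipeline scanning many files does not open them all up front and hit the OS "Too many open files" limit. A minimal sketch of the pattern, using hypothetical names rather than the exact Polars types:

use std::fs::File;
use std::io::{BufReader, Result};
use std::path::PathBuf;

// Hypothetical source that delays opening its file until the first batch is requested.
struct LazyFileSource {
    path: Option<PathBuf>,           // consumed on first use
    reader: Option<BufReader<File>>, // created lazily
}

impl LazyFileSource {
    fn new(path: PathBuf) -> Self {
        // No file handle is created here, so constructing many sources is cheap and safe.
        LazyFileSource { path: Some(path), reader: None }
    }

    fn init_reader(&mut self) -> Result<()> {
        // `take()` consumes the stored path, so initialization can only happen once.
        let path = self.path.take().expect("init_reader called twice");
        self.reader = Some(BufReader::new(File::open(path)?));
        Ok(())
    }

    fn next_batch(&mut self) -> Result<&mut BufReader<File>> {
        if self.reader.is_none() {
            self.init_reader()?;
        }
        Ok(self.reader.as_mut().unwrap())
    }
}
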
64 changes: 45 additions & 19 deletions polars/polars-lazy/polars-pipe/src/executors/sources/csv.rs
@@ -15,19 +15,23 @@ pub(crate) struct CsvSource {
#[allow(dead_code)]
// this exist because we need to keep ownership
schema: SchemaRef,
reader: *mut CsvReader<'static, File>,
batched_reader: Either<*mut BatchedCsvReaderMmap<'static>, *mut BatchedCsvReaderRead<'static>>,
reader: Option<*mut CsvReader<'static, File>>,
batched_reader:
Option<Either<*mut BatchedCsvReaderMmap<'static>, *mut BatchedCsvReaderRead<'static>>>,
n_threads: usize,
chunk_index: IdxSize,
path: Option<PathBuf>,
options: Option<CsvParserOptions>,
verbose: bool,
}

impl CsvSource {
pub(crate) fn new(
path: PathBuf,
schema: SchemaRef,
options: CsvParserOptions,
verbose: bool,
) -> PolarsResult<Self> {
// Delay initializing the reader
// otherwise all files would be opened during construction of the pipeline
// leading to Too many Open files error
fn init_reader(&mut self) -> PolarsResult<()> {
let options = self.options.take().unwrap();
let path = self.path.take().unwrap();
let mut with_columns = options.with_columns;
let mut projected_len = 0;
with_columns.as_ref().map(|columns| {
@@ -38,25 +42,25 @@ impl CsvSource {
if projected_len == 0 {
with_columns = None;
}
let n_rows = _set_n_rows_for_scan(options.n_rows);

let n_cols = if projected_len > 0 {
projected_len
} else {
schema.len()
self.schema.len()
};
let n_rows = _set_n_rows_for_scan(options.n_rows);
// inversely scale the chunk size by the number of threads so that we reduce memory pressure
// in streaming
let chunk_size = determine_chunk_size(n_cols, POOL.current_num_threads())?;

if verbose {
if self.verbose {
eprintln!("STREAMING CHUNK SIZE: {chunk_size} rows")
}

let reader = CsvReader::from_path(&path)
.unwrap()
.has_header(options.has_header)
.with_schema(schema.clone())
.with_schema(self.schema.clone())
.with_delimiter(options.delimiter)
.with_ignore_errors(options.ignore_errors)
.with_skip_rows(options.skip_rows)
@@ -69,7 +73,8 @@
.with_quote_char(options.quote_char)
.with_end_of_line_char(options.eol_char)
.with_encoding(options.encoding)
.with_rechunk(options.rechunk)
// never rechunk in streaming
.with_rechunk(false)
.with_chunk_size(chunk_size)
.with_row_count(options.row_count)
.with_try_parse_dates(options.try_parse_dates);
@@ -86,13 +91,26 @@
let batched_reader = Box::leak(batched_reader) as *mut BatchedCsvReaderMmap;
Either::Left(batched_reader)
};
self.reader = Some(reader);
self.batched_reader = Some(batched_reader);
Ok(())
}

pub(crate) fn new(
path: PathBuf,
schema: SchemaRef,
options: CsvParserOptions,
verbose: bool,
) -> PolarsResult<Self> {
Ok(CsvSource {
schema,
reader,
batched_reader,
reader: None,
batched_reader: None,
n_threads: POOL.current_num_threads(),
chunk_index: 0,
path: Some(path),
options: Some(options),
verbose,
})
}
}
@@ -101,14 +119,18 @@ impl Drop for CsvSource {
fn drop(&mut self) {
unsafe {
match self.batched_reader {
Either::Left(ptr) => {
Some(Either::Left(ptr)) => {
let _to_drop = Box::from_raw(ptr);
}
Either::Right(ptr) => {
Some(Either::Right(ptr)) => {
let _to_drop = Box::from_raw(ptr);
}
// nothing initialized, nothing to drop
_ => {}
}
if let Some(ptr) = self.reader {
let _to_drop = Box::from_raw(ptr);
}
let _to_drop = Box::from_raw(self.reader);
};
}
}
@@ -118,7 +140,11 @@ unsafe impl Sync for CsvSource {}

impl Source for CsvSource {
fn get_batches(&mut self, _context: &PExecutionContext) -> PolarsResult<SourceResult> {
let batches = match self.batched_reader {
if self.reader.is_none() {
self.init_reader()?
}

let batches = match self.batched_reader.unwrap() {
Either::Left(batched_reader) => {
let reader = unsafe { &mut *batched_reader };

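
CsvSource hands out its batched readers as raw pointers created with Box::leak (which is how it obtains the 'static lifetime the batched reader types want), so once those fields become Option the Drop impl must only reclaim what was actually initialized. A stand-alone sketch of that leak/from_raw pairing, with a placeholder type instead of the real reader:

struct Inner {
    data: Vec<u8>,
}

struct Holder {
    // Raw pointer to a leaked Box; None until lazily initialized.
    inner: Option<*mut Inner>,
}

impl Holder {
    fn init(&mut self) {
        let boxed = Box::new(Inner { data: vec![1, 2, 3] });
        // Leak the Box to obtain a pointer that is not tied to any borrow scope.
        self.inner = Some(Box::leak(boxed) as *mut Inner);
    }
}

impl Drop for Holder {
    fn drop(&mut self) {
        // Reclaim the allocation only if it was created; rebuilding the Box
        // frees the leaked memory exactly once.
        if let Some(ptr) = self.inner {
            unsafe {
                let _to_drop = Box::from_raw(ptr);
            }
        }
    }
}

fn main() {
    let mut holder = Holder { inner: None };
    holder.init();
    // Dropping `holder` here runs Drop and frees the leaked Inner.
}
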
59 changes: 44 additions & 15 deletions polars/polars-lazy/polars-pipe/src/executors/sources/parquet.rs
@@ -15,20 +15,25 @@ use crate::operators::{DataChunk, PExecutionContext, Source, SourceResult};
use crate::pipeline::determine_chunk_size;

pub struct ParquetSource {
batched_reader: BatchedParquetReader,
batched_reader: Option<BatchedParquetReader>,
n_threads: usize,
chunk_index: IdxSize,
path: Option<PathBuf>,
options: Option<ParquetOptions>,
#[allow(dead_code)]
cloud_options: Option<CloudOptions>,
schema: Option<SchemaRef>,
verbose: bool,
}

impl ParquetSource {
#[allow(unused_variables)]
pub(crate) fn new(
path: PathBuf,
options: ParquetOptions,
cloud_options: Option<CloudOptions>,
schema: &Schema,
verbose: bool,
) -> PolarsResult<Self> {
// Delay initializing the reader
// otherwise all files would be opened during construction of the pipeline
// leading to Too many Open files error
fn init_reader(&mut self) -> PolarsResult<()> {
let path = self.path.take().unwrap();
let options = self.options.take().unwrap();
let schema = self.schema.take().unwrap();
let projection: Option<Vec<_>> = options.with_columns.map(|with_columns| {
with_columns
.iter()
@@ -37,10 +42,9 @@ impl ParquetSource {
});

let n_cols = projection.as_ref().map(|v| v.len()).unwrap_or(schema.len());
let n_threads = POOL.current_num_threads();
let chunk_size = determine_chunk_size(n_cols, n_threads)?;
let chunk_size = determine_chunk_size(n_cols, self.n_threads)?;

if verbose {
if self.verbose {
eprintln!("STREAMING CHUNK SIZE: {chunk_size} rows")
}

@@ -54,7 +58,7 @@
#[cfg(feature = "async")]
{
let uri = path.to_string_lossy();
ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref())?
ParquetAsyncReader::from_uri(&uri, self.cloud_options.as_ref())?
.with_n_rows(options.n_rows)
.with_row_count(options.row_count)
.with_projection(projection)
@@ -71,18 +75,43 @@
.use_statistics(options.use_statistics)
.batched(chunk_size)?
};
self.batched_reader = Some(batched_reader);
Ok(())
}

#[allow(unused_variables)]
pub(crate) fn new(
path: PathBuf,
options: ParquetOptions,
cloud_options: Option<CloudOptions>,
schema: SchemaRef,
verbose: bool,
) -> PolarsResult<Self> {
let n_threads = POOL.current_num_threads();

Ok(ParquetSource {
batched_reader,
batched_reader: None,
n_threads,
chunk_index: 0,
options: Some(options),
path: Some(path),
cloud_options,
schema: Some(schema),
verbose,
})
}
}

impl Source for ParquetSource {
fn get_batches(&mut self, _context: &PExecutionContext) -> PolarsResult<SourceResult> {
let batches = self.batched_reader.next_batches(self.n_threads)?;
if self.batched_reader.is_none() {
self.init_reader()?;
}
let batches = self
.batched_reader
.as_mut()
.unwrap()
.next_batches(self.n_threads)?;
Ok(match batches {
None => SourceResult::Finished,
Some(batches) => SourceResult::GotMoreData(
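
One small difference between the two sources is how the lazily created reader is accessed afterwards: CsvSource stores Copy raw pointers, so self.batched_reader.unwrap() simply copies the pointer out of the Option, while ParquetSource owns its BatchedParquetReader and has to borrow it with as_mut(). Roughly, with placeholder types:

#[derive(Clone, Copy)]
struct PtrLike(*mut u8); // Copy, like a raw reader pointer

struct Owned(Vec<u8>); // not Copy, like an owned batched reader

struct Sources {
    copyable: Option<PtrLike>,
    owned: Option<Owned>,
}

impl Sources {
    fn use_both(&mut self) {
        // A Copy value can be taken out of the Option by value; the field keeps its copy.
        let _ptr: PtrLike = self.copyable.unwrap();

        // A non-Copy value must be borrowed in place; calling `unwrap()` by value
        // here would try to move out of `self` and fail to compile.
        let owned: &mut Owned = self.owned.as_mut().unwrap();
        owned.0.push(42);
    }
}

fn main() {
    let mut sources = Sources {
        copyable: Some(PtrLike(std::ptr::null_mut())),
        owned: Some(Owned(Vec::new())),
    };
    sources.use_both();
}
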
2 changes: 1 addition & 1 deletion polars/polars-lazy/polars-pipe/src/pipeline/convert.rs
@@ -105,7 +105,7 @@ where
path,
options,
cloud_options,
&file_info.schema,
file_info.schema,
verbose,
)?;
Ok(Box::new(src) as Box<dyn Source>)
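
The convert.rs tweak follows from the new constructor signature: ParquetSource now stores the schema in an Option for the delayed init_reader, so it needs an owned SchemaRef (an Arc<Schema> in Polars) rather than a &Schema borrow it could not keep past the call. Cloning an Arc only bumps a reference count, so handing each source its own handle stays cheap; a rough illustration with a stand-in Schema type:

use std::sync::Arc;

// Stand-in for the real polars Schema; SchemaRef is an Arc over it.
struct Schema {
    fields: Vec<String>,
}
type SchemaRef = Arc<Schema>;

struct Source {
    // Owned handle, kept until a delayed init step consumes it.
    schema: Option<SchemaRef>,
}

fn main() {
    let schema: SchemaRef = Arc::new(Schema {
        fields: vec!["a".into(), "b".into()],
    });

    // The clone only increments the reference count; the Schema itself is shared.
    let source = Source { schema: Some(schema.clone()) };

    assert_eq!(Arc::strong_count(&schema), 2);
    assert_eq!(source.schema.as_ref().unwrap().fields.len(), 2);
}
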
