From 8607fd09b727bd0e5fad5d32a5d699d40ce36bfc Mon Sep 17 00:00:00 2001
From: ritchie <ritchie46@gmail.com>
Date: Fri, 15 Mar 2024 09:35:45 +0100
Subject: [PATCH 1/2] feat: Async parquet: Decode parquet on a blocking thread
 pool

---
 crates/polars-io/src/parquet/read_impl.rs | 88 +++++++++++++++++------
 1 file changed, 67 insertions(+), 21 deletions(-)

diff --git a/crates/polars-io/src/parquet/read_impl.rs b/crates/polars-io/src/parquet/read_impl.rs
index 101caab5f6ad..2093514fe371 100644
--- a/crates/polars-io/src/parquet/read_impl.rs
+++ b/crates/polars-io/src/parquet/read_impl.rs
@@ -518,7 +518,7 @@ pub struct BatchedParquetReader {
     #[allow(dead_code)]
     row_group_fetcher: RowGroupFetcher,
     limit: usize,
-    projection: Vec<usize>,
+    projection: Arc<[usize]>,
     schema: ArrowSchemaRef,
     metadata: FileMetaDataRef,
     predicate: Option<Arc<dyn PhysicalIoExpr>>,
@@ -530,7 +530,7 @@ pub struct BatchedParquetReader {
     parallel: ParallelStrategy,
     chunk_size: usize,
     use_statistics: bool,
-    hive_partition_columns: Option<Vec<Series>>,
+    hive_partition_columns: Option<Arc<[Series]>>,
     /// Has returned at least one materialized frame.
     has_returned: bool,
 }
@@ -551,7 +551,9 @@ impl BatchedParquetReader {
         mut parallel: ParallelStrategy,
     ) -> PolarsResult<Self> {
         let n_row_groups = metadata.row_groups.len();
-        let projection = projection.unwrap_or_else(|| (0usize..schema.len()).collect::<Vec<_>>());
+        let projection = projection
+            .map(Arc::from)
+            .unwrap_or_else(|| (0usize..schema.len()).collect::<Arc<[_]>>());
 
         parallel = match parallel {
             ParallelStrategy::Auto => {
@@ -583,7 +585,7 @@ impl BatchedParquetReader {
             parallel,
             chunk_size,
             use_statistics,
-            hive_partition_columns,
+            hive_partition_columns: hive_partition_columns.map(Arc::from),
             has_returned: false,
         })
     }
@@ -632,21 +634,65 @@ impl BatchedParquetReader {
                 .fetch_row_groups(row_group_start..row_group_end)
                 .await?;
 
-            let dfs = rg_to_dfs(
-                &store,
-                &mut self.rows_read,
-                row_group_start,
-                row_group_end,
-                &mut self.limit,
-                &self.metadata,
-                &self.schema,
-                self.predicate.as_deref(),
-                self.row_index.clone(),
-                self.parallel,
-                &self.projection,
-                self.use_statistics,
-                self.hive_partition_columns.as_deref(),
-            )?;
+            let dfs = match store {
+                ColumnStore::Local(_) => rg_to_dfs(
+                    &store,
+                    &mut self.rows_read,
+                    row_group_start,
+                    row_group_end,
+                    &mut self.limit,
+                    &self.metadata,
+                    &self.schema,
+                    self.predicate.as_deref(),
+                    self.row_index.clone(),
+                    self.parallel,
+                    &self.projection,
+                    self.use_statistics,
+                    self.hive_partition_columns.as_deref(),
+                ),
+                ColumnStore::Fetched(b) => {
+                    // This branch we spawn th decoding and decompression of the bytes on a rayon task.
+                    // This will ensure we don't block the aysnc thread.
+
+                    // Reconstruct as that makes it a static.
+                    let store = ColumnStore::Fetched(b);
+                    let (tx, rx) = tokio::sync::oneshot::channel();
+
+                    // Make everything 'static.
+                    let mut rows_read = self.rows_read;
+                    let mut limit = self.limit;
+                    let row_index = self.row_index.clone();
+                    let predicate = self.predicate.clone();
+                    let schema = self.schema.clone();
+                    let metadata = self.metadata.clone();
+                    let parallel = self.parallel;
+                    let projection = self.projection.clone();
+                    let use_statistics = self.use_statistics;
+                    let hive_partition_columns = self.hive_partition_columns.clone();
+                    POOL.spawn(move || {
+                        let dfs = rg_to_dfs(
+                            &store,
+                            &mut rows_read,
+                            row_group_start,
+                            row_group_end,
+                            &mut limit,
+                            &metadata,
+                            &schema,
+                            predicate.as_deref(),
+                            row_index,
+                            parallel,
+                            &projection,
+                            use_statistics,
+                            hive_partition_columns.as_deref(),
+                        );
+                        tx.send((dfs, rows_read, limit)).unwrap();
+                    });
+                    let (dfs, rows_read, limit) = rx.await.unwrap();
+                    self.rows_read = rows_read;
+                    self.limit = limit;
+                    dfs
+                },
+            }?;
 
             self.row_group_offset += n;
 
@@ -681,8 +727,8 @@ impl BatchedParquetReader {
         if self.chunks_fifo.is_empty() {
             if skipped_all_rgs {
                 Ok(Some(vec![materialize_empty_df(
-                    Some(self.projection.as_slice()),
-                    self.schema(),
+                    Some(self.projection.as_ref()),
+                    &self.schema,
                     self.hive_partition_columns.as_deref(),
                     self.row_index.as_ref(),
                 )]))

From d21b256cad5bebb952127cc223b52736535c0ca8 Mon Sep 17 00:00:00 2001
From: ritchie <ritchie46@gmail.com>
Date: Fri, 15 Mar 2024 09:44:32 +0100
Subject: [PATCH 2/2] features

---
 crates/polars-io/src/parquet/read_impl.rs | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/crates/polars-io/src/parquet/read_impl.rs b/crates/polars-io/src/parquet/read_impl.rs
index 2093514fe371..096fa9708c7a 100644
--- a/crates/polars-io/src/parquet/read_impl.rs
+++ b/crates/polars-io/src/parquet/read_impl.rs
@@ -650,11 +650,12 @@ impl BatchedParquetReader {
                     self.use_statistics,
                     self.hive_partition_columns.as_deref(),
                 ),
+                #[cfg(feature = "async")]
                 ColumnStore::Fetched(b) => {
-                    // This branch we spawn th decoding and decompression of the bytes on a rayon task.
-                    // This will ensure we don't block the aysnc thread.
+                    // This branch we spawn the decoding and decompression of the bytes on a rayon task.
+                    // This will ensure we don't block the async thread.
 
-                    // Reconstruct as that makes it a static.
+                    // Reconstruct as that makes it a 'static.
                     let store = ColumnStore::Fetched(b);
                     let (tx, rx) = tokio::sync::oneshot::channel();
 