From 511d996bc958adaa7e71fd3a70073d76cdc18642 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Fri, 20 Dec 2024 15:15:29 +0800 Subject: [PATCH] Implement skeleton logic with proper module visibility --- src/daft-scan/src/scan_task_iters/mod.rs | 29 ++++++++---- .../split_parquet/fetch_parquet_metadata.rs | 46 +++++++++++++++++++ .../src/scan_task_iters/split_parquet/mod.rs | 44 ++++++++++++++++++ .../split_parquet/split_parquet_decision.rs | 27 +++++++++++ .../split_parquet/split_parquet_file.rs | 13 ++++++ 5 files changed, 150 insertions(+), 9 deletions(-) create mode 100644 src/daft-scan/src/scan_task_iters/split_parquet/fetch_parquet_metadata.rs create mode 100644 src/daft-scan/src/scan_task_iters/split_parquet/mod.rs create mode 100644 src/daft-scan/src/scan_task_iters/split_parquet/split_parquet_decision.rs create mode 100644 src/daft-scan/src/scan_task_iters/split_parquet/split_parquet_file.rs diff --git a/src/daft-scan/src/scan_task_iters/mod.rs b/src/daft-scan/src/scan_task_iters/mod.rs index b4e4d6e3c3..4201a2d9a5 100644 --- a/src/daft-scan/src/scan_task_iters/mod.rs +++ b/src/daft-scan/src/scan_task_iters/mod.rs @@ -1,5 +1,7 @@ use std::sync::Arc; +mod split_parquet; + use common_daft_config::DaftExecutionConfig; use common_error::{DaftError, DaftResult}; use common_file_formats::{FileFormatConfig, ParquetSourceConfig}; @@ -316,15 +318,16 @@ fn split_and_merge_pass( .iter() .all(|st| st.as_any().downcast_ref::().is_some()) { + // TODO(desmond): Here we downcast Arc to Arc. ScanTask and DummyScanTask (test only) are + // the only non-test implementer of ScanTaskLike. It might be possible to avoid the downcast by implementing merging + // at the trait level, but today that requires shifting around a non-trivial amount of code to avoid circular dependencies. + let iter: BoxScanTaskIter = Box::new(scan_tasks.as_ref().iter().map(|st| { + st.clone() + .as_any_arc() + .downcast::() + .map_err(|e| DaftError::TypeError(format!("Expected Arc, found {:?}", e))) + })); if cfg.scantask_splitting_level == 1 { - // TODO(desmond): Here we downcast Arc to Arc. ScanTask and DummyScanTask (test only) are - // the only non-test implementer of ScanTaskLike. It might be possible to avoid the downcast by implementing merging - // at the trait level, but today that requires shifting around a non-trivial amount of code to avoid circular dependencies. - let iter: BoxScanTaskIter = Box::new(scan_tasks.as_ref().iter().map(|st| { - st.clone().as_any_arc().downcast::().map_err(|e| { - DaftError::TypeError(format!("Expected Arc, found {:?}", e)) - }) - })); let split_tasks = split_by_row_groups( iter, cfg.parquet_split_row_groups_max_files, @@ -337,7 +340,15 @@ fn split_and_merge_pass( .collect::>>()?; Ok(Arc::new(scan_tasks)) } else if cfg.scantask_splitting_level == 2 { - todo!("Implement aggressive scantask splitting"); + let split_tasks = { + let splitter = split_parquet::SplitParquetScanTasks::new(iter, cfg); + Box::new(splitter.into_iter()) as BoxScanTaskIter + }; + let merged_tasks = merge_by_sizes(split_tasks, pushdowns, cfg); + let scan_tasks: Vec> = merged_tasks + .map(|st| st.map(|task| task as Arc)) + .collect::>>()?; + Ok(Arc::new(scan_tasks)) } else { panic!( "DAFT_SCANTASK_SPLITTING_LEVEL must be either 1 or 2, received: {}", diff --git a/src/daft-scan/src/scan_task_iters/split_parquet/fetch_parquet_metadata.rs b/src/daft-scan/src/scan_task_iters/split_parquet/fetch_parquet_metadata.rs new file mode 100644 index 0000000000..9b5e90feaf --- /dev/null +++ b/src/daft-scan/src/scan_task_iters/split_parquet/fetch_parquet_metadata.rs @@ -0,0 +1,46 @@ +use common_daft_config::DaftExecutionConfig; +use common_error::DaftResult; + +use super::{split_parquet_decision, split_parquet_file}; +use crate::ScanTaskRef; + +pub(super) struct RetrieveParquetMetadataIterator<'cfg> { + decider: split_parquet_decision::DecideSplitIterator<'cfg>, + _cfg: &'cfg DaftExecutionConfig, +} + +impl<'cfg> RetrieveParquetMetadataIterator<'cfg> { + pub(super) fn new( + decider: split_parquet_decision::DecideSplitIterator<'cfg>, + cfg: &'cfg DaftExecutionConfig, + ) -> Self { + Self { decider, _cfg: cfg } + } +} + +pub(super) enum ParquetSplitScanTaskGenerator { + _NoSplit(std::iter::Once>), + _Split(split_parquet_file::ParquetFileSplitter), +} + +impl<'cfg> Iterator for RetrieveParquetMetadataIterator<'cfg> { + type Item = ParquetSplitScanTaskGenerator; + + fn next(&mut self) -> Option { + if let Some(_decision) = self.decider.next() { + todo!("Implement windowed metadata fetching and yielding of ParquetSplitScanTaskGenerator"); + } + None + } +} + +impl Iterator for ParquetSplitScanTaskGenerator { + type Item = DaftResult; + + fn next(&mut self) -> Option { + match self { + Self::_NoSplit(iter) => iter.next(), + Self::_Split(iter) => iter.next(), + } + } +} diff --git a/src/daft-scan/src/scan_task_iters/split_parquet/mod.rs b/src/daft-scan/src/scan_task_iters/split_parquet/mod.rs new file mode 100644 index 0000000000..f7b9922766 --- /dev/null +++ b/src/daft-scan/src/scan_task_iters/split_parquet/mod.rs @@ -0,0 +1,44 @@ +use std::iter::Flatten; + +use common_daft_config::DaftExecutionConfig; +use common_error::DaftResult; + +use super::BoxScanTaskIter; +use crate::ScanTaskRef; + +mod fetch_parquet_metadata; +mod split_parquet_decision; +mod split_parquet_file; + +pub struct SplitParquetScanTasks<'cfg> { + retriever: fetch_parquet_metadata::RetrieveParquetMetadataIterator<'cfg>, +} + +impl<'cfg> SplitParquetScanTasks<'cfg> { + pub fn new(inputs: BoxScanTaskIter<'cfg>, cfg: &'cfg DaftExecutionConfig) -> Self { + let decider = split_parquet_decision::DecideSplitIterator::new(inputs, cfg); + let retriever = fetch_parquet_metadata::RetrieveParquetMetadataIterator::new(decider, cfg); + SplitParquetScanTasks { retriever } + } +} + +pub struct SplitParquetScanTasksIterator<'cfg>( + Flatten>, +); + +impl<'cfg> IntoIterator for SplitParquetScanTasks<'cfg> { + type IntoIter = SplitParquetScanTasksIterator<'cfg>; + type Item = DaftResult; + + fn into_iter(self) -> Self::IntoIter { + SplitParquetScanTasksIterator(self.retriever.flatten()) + } +} + +impl<'cfg> Iterator for SplitParquetScanTasksIterator<'cfg> { + type Item = DaftResult; + + fn next(&mut self) -> Option { + self.0.next() + } +} diff --git a/src/daft-scan/src/scan_task_iters/split_parquet/split_parquet_decision.rs b/src/daft-scan/src/scan_task_iters/split_parquet/split_parquet_decision.rs new file mode 100644 index 0000000000..3d6fd9476f --- /dev/null +++ b/src/daft-scan/src/scan_task_iters/split_parquet/split_parquet_decision.rs @@ -0,0 +1,27 @@ +use common_daft_config::DaftExecutionConfig; + +use crate::scan_task_iters::BoxScanTaskIter; + +pub(super) struct DecideSplitIterator<'cfg> { + inputs: BoxScanTaskIter<'cfg>, + _cfg: &'cfg DaftExecutionConfig, +} + +impl<'cfg> DecideSplitIterator<'cfg> { + pub fn new(inputs: BoxScanTaskIter<'cfg>, cfg: &'cfg DaftExecutionConfig) -> Self { + Self { inputs, _cfg: cfg } + } +} + +pub(super) struct Decision {} + +impl<'cfg> Iterator for DecideSplitIterator<'cfg> { + type Item = Decision; + + fn next(&mut self) -> Option { + if let Some(_scan_task) = self.inputs.next() { + return Some(Decision {}); + } + None + } +} diff --git a/src/daft-scan/src/scan_task_iters/split_parquet/split_parquet_file.rs b/src/daft-scan/src/scan_task_iters/split_parquet/split_parquet_file.rs new file mode 100644 index 0000000000..c4c3e5ae95 --- /dev/null +++ b/src/daft-scan/src/scan_task_iters/split_parquet/split_parquet_file.rs @@ -0,0 +1,13 @@ +use common_error::DaftResult; + +use crate::ScanTaskRef; + +pub(super) struct ParquetFileSplitter {} + +impl Iterator for ParquetFileSplitter { + type Item = DaftResult; + + fn next(&mut self) -> Option { + todo!("Split the parquet file"); + } +}