diff --git a/src/daft-scan/src/scan_task_iters/mod.rs b/src/daft-scan/src/scan_task_iters/mod.rs index 4201a2d9a5..226d4c3ee2 100644 --- a/src/daft-scan/src/scan_task_iters/mod.rs +++ b/src/daft-scan/src/scan_task_iters/mod.rs @@ -341,7 +341,7 @@ fn split_and_merge_pass( Ok(Arc::new(scan_tasks)) } else if cfg.scantask_splitting_level == 2 { let split_tasks = { - let splitter = split_parquet::SplitParquetScanTasks::new(iter, cfg); + let splitter = split_parquet::SplitParquetScanTasksIterator::new(iter, cfg); Box::new(splitter.into_iter()) as BoxScanTaskIter }; let merged_tasks = merge_by_sizes(split_tasks, pushdowns, cfg); diff --git a/src/daft-scan/src/scan_task_iters/split_parquet/mod.rs b/src/daft-scan/src/scan_task_iters/split_parquet/mod.rs index 19c6ccc6a9..85fba82a42 100644 --- a/src/daft-scan/src/scan_task_iters/split_parquet/mod.rs +++ b/src/daft-scan/src/scan_task_iters/split_parquet/mod.rs @@ -19,39 +19,17 @@ mod split_parquet_file; /// /// Note that this may be expensive if the incoming stream has many large ScanTasks, incurring a /// higher cost at planning-time. 
-/// -/// # Examples -/// -/// ``` -/// # use daft_scan::scan_task_iters::BoxScanTaskIter; -/// # use common_daft_config::DaftExecutionConfig; -/// # let input_tasks: BoxScanTaskIter = unimplemented!(); -/// # let config = DaftExecutionConfig::default(); -/// let splitter = SplitParquetScanTasks::new(input_tasks, &config); -/// let split_tasks = splitter.into_iter(); -/// ``` -pub struct SplitParquetScanTasks<'cfg> { - retriever: fetch_parquet_metadata::RetrieveParquetMetadataIterator<'cfg>, +pub struct SplitParquetScanTasksIterator<'cfg> { + split_result_iter: Flatten<fetch_parquet_metadata::RetrieveParquetMetadataIterator<'cfg>>, } -impl<'cfg> SplitParquetScanTasks<'cfg> { +impl<'cfg> SplitParquetScanTasksIterator<'cfg> { pub fn new(inputs: BoxScanTaskIter<'cfg>, cfg: &'cfg DaftExecutionConfig) -> Self { let decider = split_parquet_decision::DecideSplitIterator::new(inputs, cfg); let retriever = fetch_parquet_metadata::RetrieveParquetMetadataIterator::new(decider, cfg); - SplitParquetScanTasks { retriever } - } -} - -pub struct SplitParquetScanTasksIterator<'cfg>( - Flatten<fetch_parquet_metadata::RetrieveParquetMetadataIterator<'cfg>>, -); - -impl<'cfg> IntoIterator for SplitParquetScanTasks<'cfg> { - type IntoIter = SplitParquetScanTasksIterator<'cfg>; - type Item = DaftResult<ScanTaskRef>; - - fn into_iter(self) -> Self::IntoIter { - SplitParquetScanTasksIterator(self.retriever.flatten()) + SplitParquetScanTasksIterator { + split_result_iter: retriever.flatten(), + } } } @@ -59,6 +37,6 @@ impl<'cfg> Iterator for SplitParquetScanTasksIterator<'cfg> { type Item = DaftResult<ScanTaskRef>; fn next(&mut self) -> Option<Self::Item> { - self.0.next() + self.split_result_iter.next() } }