Skip to content

Commit

Permalink
Fix iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Dec 20, 2024
1 parent 28f2083 commit 326ff1d
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 30 deletions.
2 changes: 1 addition & 1 deletion src/daft-scan/src/scan_task_iters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ fn split_and_merge_pass(
Ok(Arc::new(scan_tasks))
} else if cfg.scantask_splitting_level == 2 {
let split_tasks = {
let splitter = split_parquet::SplitParquetScanTasks::new(iter, cfg);
let splitter = split_parquet::SplitParquetScanTasksIterator::new(iter, cfg);
Box::new(splitter.into_iter()) as BoxScanTaskIter
};
let merged_tasks = merge_by_sizes(split_tasks, pushdowns, cfg);
Expand Down
36 changes: 7 additions & 29 deletions src/daft-scan/src/scan_task_iters/split_parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,24 @@ mod split_parquet_file;
///
/// Note that this may be expensive if the incoming stream has many large ScanTasks, incurring a
/// higher cost at planning-time.
///
/// # Examples
///
/// ```
/// # use daft_scan::scan_task_iters::BoxScanTaskIter;
/// # use common_daft_config::DaftExecutionConfig;
/// # let input_tasks: BoxScanTaskIter = unimplemented!();
/// # let config = DaftExecutionConfig::default();
/// let splitter = SplitParquetScanTasks::new(input_tasks, &config);
/// let split_tasks = splitter.into_iter();
/// ```
pub struct SplitParquetScanTasks<'cfg> {
retriever: fetch_parquet_metadata::RetrieveParquetMetadataIterator<'cfg>,
pub struct SplitParquetScanTasksIterator<'cfg> {
split_result_iter: Flatten<fetch_parquet_metadata::RetrieveParquetMetadataIterator<'cfg>>,
}

impl<'cfg> SplitParquetScanTasks<'cfg> {
impl<'cfg> SplitParquetScanTasksIterator<'cfg> {
pub fn new(inputs: BoxScanTaskIter<'cfg>, cfg: &'cfg DaftExecutionConfig) -> Self {
let decider = split_parquet_decision::DecideSplitIterator::new(inputs, cfg);
let retriever = fetch_parquet_metadata::RetrieveParquetMetadataIterator::new(decider, cfg);
SplitParquetScanTasks { retriever }
}
}

pub struct SplitParquetScanTasksIterator<'cfg>(
Flatten<fetch_parquet_metadata::RetrieveParquetMetadataIterator<'cfg>>,
);

impl<'cfg> IntoIterator for SplitParquetScanTasks<'cfg> {
type IntoIter = SplitParquetScanTasksIterator<'cfg>;
type Item = DaftResult<ScanTaskRef>;

fn into_iter(self) -> Self::IntoIter {
SplitParquetScanTasksIterator(self.retriever.flatten())
SplitParquetScanTasksIterator {
split_result_iter: retriever.flatten(),
}
}
}

impl<'cfg> Iterator for SplitParquetScanTasksIterator<'cfg> {
type Item = DaftResult<ScanTaskRef>;

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
self.split_result_iter.next()
}
}

0 comments on commit 326ff1d

Please sign in to comment.