Skip to content

Commit

Permalink
Implement skeleton logic with proper module visibility
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Dec 20, 2024
1 parent 9b0c1df commit 511d996
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 9 deletions.
29 changes: 20 additions & 9 deletions src/daft-scan/src/scan_task_iters/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::sync::Arc;

mod split_parquet;

use common_daft_config::DaftExecutionConfig;
use common_error::{DaftError, DaftResult};
use common_file_formats::{FileFormatConfig, ParquetSourceConfig};
Expand Down Expand Up @@ -316,15 +318,16 @@ fn split_and_merge_pass(
.iter()
.all(|st| st.as_any().downcast_ref::<ScanTask>().is_some())
{
// TODO(desmond): Here we downcast Arc<dyn ScanTaskLike> to Arc<ScanTask>. ScanTask and DummyScanTask (test only) are
// the only non-test implementer of ScanTaskLike. It might be possible to avoid the downcast by implementing merging
// at the trait level, but today that requires shifting around a non-trivial amount of code to avoid circular dependencies.
let iter: BoxScanTaskIter = Box::new(scan_tasks.as_ref().iter().map(|st| {
st.clone()
.as_any_arc()
.downcast::<ScanTask>()
.map_err(|e| DaftError::TypeError(format!("Expected Arc<ScanTask>, found {:?}", e)))
}));
if cfg.scantask_splitting_level == 1 {
// TODO(desmond): Here we downcast Arc<dyn ScanTaskLike> to Arc<ScanTask>. ScanTask is the only non-test implementer
// of ScanTaskLike (DummyScanTask is test-only). It might be possible to avoid the downcast by implementing merging
// at the trait level, but today that requires shifting around a non-trivial amount of code to avoid circular dependencies.
let iter: BoxScanTaskIter = Box::new(scan_tasks.as_ref().iter().map(|st| {
st.clone().as_any_arc().downcast::<ScanTask>().map_err(|e| {
DaftError::TypeError(format!("Expected Arc<ScanTask>, found {:?}", e))
})
}));
let split_tasks = split_by_row_groups(
iter,
cfg.parquet_split_row_groups_max_files,
Expand All @@ -337,7 +340,15 @@ fn split_and_merge_pass(
.collect::<DaftResult<Vec<_>>>()?;
Ok(Arc::new(scan_tasks))
} else if cfg.scantask_splitting_level == 2 {
todo!("Implement aggressive scantask splitting");
let split_tasks = {
let splitter = split_parquet::SplitParquetScanTasks::new(iter, cfg);
Box::new(splitter.into_iter()) as BoxScanTaskIter
};
let merged_tasks = merge_by_sizes(split_tasks, pushdowns, cfg);
let scan_tasks: Vec<Arc<dyn ScanTaskLike>> = merged_tasks
.map(|st| st.map(|task| task as Arc<dyn ScanTaskLike>))
.collect::<DaftResult<Vec<_>>>()?;
Ok(Arc::new(scan_tasks))
} else {
panic!(
"DAFT_SCANTASK_SPLITTING_LEVEL must be either 1 or 2, received: {}",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use common_daft_config::DaftExecutionConfig;
use common_error::DaftResult;

use super::{split_parquet_decision, split_parquet_file};
use crate::ScanTaskRef;

/// Iterator stage that sits on top of a [`split_parquet_decision::DecideSplitIterator`].
///
/// Its `Iterator` impl has `Item = ParquetSplitScanTaskGenerator`.
/// NOTE(review): presumably this is where Parquet metadata will be fetched; the
/// fetching logic is still a `todo!` — confirm the intended design when implementing.
pub(super) struct RetrieveParquetMetadataIterator<'cfg> {
/// Upstream iterator of split decisions, pulled from in `next`.
decider: split_parquet_decision::DecideSplitIterator<'cfg>,
/// Execution config; stored but not read by any code visible here
/// (the leading underscore presumably silences the unused-field lint).
_cfg: &'cfg DaftExecutionConfig,
}

impl<'cfg> RetrieveParquetMetadataIterator<'cfg> {
pub(super) fn new(
decider: split_parquet_decision::DecideSplitIterator<'cfg>,
cfg: &'cfg DaftExecutionConfig,
) -> Self {
Self { decider, _cfg: cfg }
}
}

/// Per-input generator of scan tasks; its `Iterator` impl forwards `next` to the active variant.
///
/// NOTE(review): the leading underscores on the variant names presumably silence
/// dead-code warnings while nothing constructs them yet — confirm and rename once used.
pub(super) enum ParquetSplitScanTaskGenerator {
/// Wraps a `std::iter::Once`, so it yields a single `DaftResult<ScanTaskRef>`.
_NoSplit(std::iter::Once<DaftResult<ScanTaskRef>>),
/// Wraps a [`split_parquet_file::ParquetFileSplitter`], whose items are forwarded unchanged.
_Split(split_parquet_file::ParquetFileSplitter),
}

impl<'cfg> Iterator for RetrieveParquetMetadataIterator<'cfg> {
    type Item = ParquetSplitScanTaskGenerator;

    /// Pulls the next decision from the upstream decider.
    ///
    /// Returns `None` once the decider is exhausted.
    ///
    /// # Panics
    ///
    /// Panics via `todo!` whenever the decider yields a decision, because turning a
    /// decision into a `ParquetSplitScanTaskGenerator` is not implemented yet.
    fn next(&mut self) -> Option<Self::Item> {
        // `?` propagates exhaustion of the upstream iterator as `None`.
        let _decision = self.decider.next()?;
        todo!("Implement windowed metadata fetching and yielding of ParquetSplitScanTaskGenerator")
    }
}

impl Iterator for ParquetSplitScanTaskGenerator {
    type Item = DaftResult<ScanTaskRef>;

    /// Forwards to the wrapped iterator of whichever variant `self` currently is.
    fn next(&mut self) -> Option<Self::Item> {
        match *self {
            // Single pass-through item.
            Self::_NoSplit(ref mut single) => single.next(),
            // Items produced by the file splitter.
            Self::_Split(ref mut splitter) => splitter.next(),
        }
    }
}
44 changes: 44 additions & 0 deletions src/daft-scan/src/scan_task_iters/split_parquet/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use std::iter::Flatten;

use common_daft_config::DaftExecutionConfig;
use common_error::DaftResult;

use super::BoxScanTaskIter;
use crate::ScanTaskRef;

mod fetch_parquet_metadata;
mod split_parquet_decision;
mod split_parquet_file;

/// Entry point of the Parquet scan-task splitting pipeline in this module.
///
/// Built with [`SplitParquetScanTasks::new`]; converting it with `into_iter()` yields
/// `DaftResult<ScanTaskRef>` items.
pub struct SplitParquetScanTasks<'cfg> {
/// Final stage of the pipeline; flattened when this value is turned into an iterator.
retriever: fetch_parquet_metadata::RetrieveParquetMetadataIterator<'cfg>,
}

impl<'cfg> SplitParquetScanTasks<'cfg> {
    /// Wires up the pipeline: `inputs` feed a `DecideSplitIterator`, which in turn feeds a
    /// `RetrieveParquetMetadataIterator`. The same `cfg` reference is handed to both stages.
    pub fn new(inputs: BoxScanTaskIter<'cfg>, cfg: &'cfg DaftExecutionConfig) -> Self {
        Self {
            retriever: fetch_parquet_metadata::RetrieveParquetMetadataIterator::new(
                split_parquet_decision::DecideSplitIterator::new(inputs, cfg),
                cfg,
            ),
        }
    }
}

/// Iterator produced by `SplitParquetScanTasks::into_iter`.
///
/// Newtype over the flattened `RetrieveParquetMetadataIterator`, so the concrete
/// `Flatten<…>` type is not spelled out by users of this iterator.
pub struct SplitParquetScanTasksIterator<'cfg>(
// Flattens each yielded `ParquetSplitScanTaskGenerator` into its individual items.
Flatten<fetch_parquet_metadata::RetrieveParquetMetadataIterator<'cfg>>,
);

impl<'cfg> IntoIterator for SplitParquetScanTasks<'cfg> {
    type Item = DaftResult<ScanTaskRef>;
    type IntoIter = SplitParquetScanTasksIterator<'cfg>;

    /// Consumes the pipeline and returns an iterator over the items of every generator
    /// the retriever yields, in order.
    fn into_iter(self) -> Self::IntoIter {
        let Self { retriever } = self;
        let flattened = retriever.flatten();
        SplitParquetScanTasksIterator(flattened)
    }
}

impl<'cfg> Iterator for SplitParquetScanTasksIterator<'cfg> {
type Item = DaftResult<ScanTaskRef>;

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use common_daft_config::DaftExecutionConfig;

use crate::scan_task_iters::BoxScanTaskIter;

/// Iterator adapter that turns each input scan task into a [`Decision`].
///
/// NOTE(review): presumably the decision will eventually say whether a task should be
/// split; today `Decision` carries no data — confirm the intended design.
pub(super) struct DecideSplitIterator<'cfg> {
/// Upstream scan-task iterator consumed one item per `next` call.
inputs: BoxScanTaskIter<'cfg>,
/// Execution config; stored but not read by any code visible here.
_cfg: &'cfg DaftExecutionConfig,
}

impl<'cfg> DecideSplitIterator<'cfg> {
pub fn new(inputs: BoxScanTaskIter<'cfg>, cfg: &'cfg DaftExecutionConfig) -> Self {
Self { inputs, _cfg: cfg }
}
}

/// Item yielded by `DecideSplitIterator`; currently an empty placeholder with no fields.
pub(super) struct Decision {}

impl<'cfg> Iterator for DecideSplitIterator<'cfg> {
    type Item = Decision;

    /// Yields one empty `Decision` per upstream item and `None` once the input is exhausted.
    ///
    /// NOTE(review): the upstream item is discarded without being inspected — if the
    /// inputs are `DaftResult`s, an `Err` is dropped here as well; confirm when the real
    /// decision logic is implemented.
    fn next(&mut self) -> Option<Self::Item> {
        self.inputs.next().map(|_scan_task| Decision {})
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use common_error::DaftResult;

use crate::ScanTaskRef;

/// Placeholder iterator type for splitting one Parquet file into scan tasks.
/// It has no fields yet, and its `Iterator::next` is an unimplemented `todo!`.
pub(super) struct ParquetFileSplitter {}

impl Iterator for ParquetFileSplitter {
    type Item = DaftResult<ScanTaskRef>;

    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!`; the splitting logic has not been written.
    fn next(&mut self) -> Option<Self::Item> {
        todo!("Split the parquet file")
    }
}

0 comments on commit 511d996

Please sign in to comment.