Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(scantask-2): Implement new module for splitting Parquet ScanTask #3628

Merged
merged 3 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions src/daft-scan/src/scan_task_iters/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::sync::Arc;

mod split_parquet;

use common_daft_config::DaftExecutionConfig;
use common_error::{DaftError, DaftResult};
use common_file_formats::{FileFormatConfig, ParquetSourceConfig};
Expand Down Expand Up @@ -316,15 +318,16 @@ fn split_and_merge_pass(
.iter()
.all(|st| st.as_any().downcast_ref::<ScanTask>().is_some())
{
// TODO(desmond): Here we downcast Arc<dyn ScanTaskLike> to Arc<ScanTask>. ScanTask and DummyScanTask (test only) are
// the only non-test implementer of ScanTaskLike. It might be possible to avoid the downcast by implementing merging
// at the trait level, but today that requires shifting around a non-trivial amount of code to avoid circular dependencies.
let iter: BoxScanTaskIter = Box::new(scan_tasks.as_ref().iter().map(|st| {
st.clone()
.as_any_arc()
.downcast::<ScanTask>()
.map_err(|e| DaftError::TypeError(format!("Expected Arc<ScanTask>, found {:?}", e)))
}));
if cfg.scantask_splitting_level == 1 {
// TODO(desmond): Here we downcast Arc<dyn ScanTaskLike> to Arc<ScanTask>. ScanTask and DummyScanTask (test only) are
// the only non-test implementer of ScanTaskLike. It might be possible to avoid the downcast by implementing merging
// at the trait level, but today that requires shifting around a non-trivial amount of code to avoid circular dependencies.
let iter: BoxScanTaskIter = Box::new(scan_tasks.as_ref().iter().map(|st| {
st.clone().as_any_arc().downcast::<ScanTask>().map_err(|e| {
DaftError::TypeError(format!("Expected Arc<ScanTask>, found {:?}", e))
})
}));
let split_tasks = split_by_row_groups(
iter,
cfg.parquet_split_row_groups_max_files,
Expand All @@ -337,7 +340,15 @@ fn split_and_merge_pass(
.collect::<DaftResult<Vec<_>>>()?;
Ok(Arc::new(scan_tasks))
} else if cfg.scantask_splitting_level == 2 {
todo!("Implement aggressive scantask splitting");
let split_tasks = {
let splitter = split_parquet::SplitParquetScanTasksIterator::new(iter, cfg);
Box::new(splitter.into_iter()) as BoxScanTaskIter
};
let merged_tasks = merge_by_sizes(split_tasks, pushdowns, cfg);
let scan_tasks: Vec<Arc<dyn ScanTaskLike>> = merged_tasks
.map(|st| st.map(|task| task as Arc<dyn ScanTaskLike>))
.collect::<DaftResult<Vec<_>>>()?;
Ok(Arc::new(scan_tasks))
} else {
panic!(
"DAFT_SCANTASK_SPLITTING_LEVEL must be either 1 or 2, received: {}",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use common_daft_config::DaftExecutionConfig;
use common_error::DaftResult;

use super::{split_parquet_decision, split_parquet_file};
use crate::ScanTaskRef;

/// Retrieves Parquet metadata for the incoming "Decisions".
///
/// # Returns
///
/// Returns [`ParquetSplitScanTaskGenerator`] instances which are themselves iterators that can yield:
/// - A single [`ScanTaskRef`] if no split was needed
/// - Multiple [`ScanTaskRef`]s if the task was split
///
/// # Implementation Details
///
/// Retrieval of Parquet metadata is performed in batches using a windowed approach for efficiency.
pub(super) struct RetrieveParquetMetadataIterator<'cfg> {
decider: split_parquet_decision::DecideSplitIterator<'cfg>,
_cfg: &'cfg DaftExecutionConfig,
}

impl<'cfg> RetrieveParquetMetadataIterator<'cfg> {
pub(super) fn new(
decider: split_parquet_decision::DecideSplitIterator<'cfg>,
cfg: &'cfg DaftExecutionConfig,
) -> Self {
Self { decider, _cfg: cfg }
}
}

pub(super) enum ParquetSplitScanTaskGenerator {
_NoSplit(std::iter::Once<DaftResult<ScanTaskRef>>),
_Split(split_parquet_file::ParquetFileSplitter),
}

impl<'cfg> Iterator for RetrieveParquetMetadataIterator<'cfg> {
type Item = ParquetSplitScanTaskGenerator;

fn next(&mut self) -> Option<Self::Item> {
if let Some(_decision) = self.decider.next() {
todo!("Implement windowed metadata fetching and yielding of ParquetSplitScanTaskGenerator");
}
None
}
}

impl Iterator for ParquetSplitScanTaskGenerator {
type Item = DaftResult<ScanTaskRef>;

fn next(&mut self) -> Option<Self::Item> {
match self {
Self::_NoSplit(iter) => iter.next(),
Self::_Split(iter) => iter.next(),
}
}
}
42 changes: 42 additions & 0 deletions src/daft-scan/src/scan_task_iters/split_parquet/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::iter::Flatten;

use common_daft_config::DaftExecutionConfig;
use common_error::DaftResult;

use super::BoxScanTaskIter;
use crate::ScanTaskRef;

mod fetch_parquet_metadata;
mod split_parquet_decision;
mod split_parquet_file;

/// Splits input scan tasks into smaller scan tasks based on Parquet file metadata.
///
/// This struct provides functionality to split scan tasks by:
/// 1. Deciding whether or not to split each input ScanTask
/// 2. Fetching the Parquet metadata of the ScanTask (if deemed necessary to split)
/// 3. Performing the splitting of the ScanTask into smaller ScanTasks
///
/// Note that this may be expensive if the incoming stream has many large ScanTasks, incurring a
/// higher cost at planning-time.
pub struct SplitParquetScanTasksIterator<'cfg> {
split_result_iter: Flatten<fetch_parquet_metadata::RetrieveParquetMetadataIterator<'cfg>>,
}

impl<'cfg> SplitParquetScanTasksIterator<'cfg> {
pub fn new(inputs: BoxScanTaskIter<'cfg>, cfg: &'cfg DaftExecutionConfig) -> Self {
let decider = split_parquet_decision::DecideSplitIterator::new(inputs, cfg);
let retriever = fetch_parquet_metadata::RetrieveParquetMetadataIterator::new(decider, cfg);
SplitParquetScanTasksIterator {
split_result_iter: retriever.flatten(),
}
}
}

impl<'cfg> Iterator for SplitParquetScanTasksIterator<'cfg> {
type Item = DaftResult<ScanTaskRef>;

fn next(&mut self) -> Option<Self::Item> {
self.split_result_iter.next()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use common_daft_config::DaftExecutionConfig;

use crate::scan_task_iters::BoxScanTaskIter;

/// An iterator that determines whether incoming ScanTasks should be split by Parquet rowgroups.
///
/// # Returns
///
/// Returns an iterator of [`Decision`] objects indicating whether and how to split each task.
pub(super) struct DecideSplitIterator<'cfg> {
inputs: BoxScanTaskIter<'cfg>,
_cfg: &'cfg DaftExecutionConfig,
}

impl<'cfg> DecideSplitIterator<'cfg> {
pub fn new(inputs: BoxScanTaskIter<'cfg>, cfg: &'cfg DaftExecutionConfig) -> Self {
Self { inputs, _cfg: cfg }
}
}

pub(super) struct Decision {}

impl<'cfg> Iterator for DecideSplitIterator<'cfg> {
type Item = Decision;

fn next(&mut self) -> Option<Self::Item> {
if let Some(_scan_task) = self.inputs.next() {
return Some(Decision {});
}
None
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use common_error::DaftResult;

use crate::ScanTaskRef;

/// Splits its internal ScanTask into smaller ScanTasks based on certain criteria, including
/// the size of the Parquet file and available rowgroups.
///
/// # Implementation Details
///
/// This type implements [`Iterator`] to produce [`ScanTaskRef`]s representing the split tasks.
pub(super) struct ParquetFileSplitter {}

impl Iterator for ParquetFileSplitter {
type Item = DaftResult<ScanTaskRef>;

fn next(&mut self) -> Option<Self::Item> {
todo!("Split the parquet file");
}
}
Loading