From 1931376a38d6e4b6b73a5f155b299c316108ebf8 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Tue, 13 Feb 2024 17:41:23 -0800 Subject: [PATCH 1/4] working merge task bug --- src/daft-scan/src/scan_task_iters.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/daft-scan/src/scan_task_iters.rs b/src/daft-scan/src/scan_task_iters.rs index f8410eb595..833bf4736e 100644 --- a/src/daft-scan/src/scan_task_iters.rs +++ b/src/daft-scan/src/scan_task_iters.rs @@ -69,11 +69,20 @@ impl Iterator for MergeByFileSize { && child_item.schema == accumulator.schema && child_item.storage_config == accumulator.storage_config && child_item.pushdowns == accumulator.pushdowns; - let smaller_than_max_size_bytes = matches!( - (child_item.size_bytes(), accumulator.size_bytes()), - (Some(child_item_size), Some(buffered_item_size)) if child_item_size + buffered_item_size <= self.max_size_bytes - ); - child_matches_accumulator && smaller_than_max_size_bytes + + let smaller_than_min_size_bytes = + if let Some(accumulator_bytes) = accumulator.size_bytes() { + accumulator_bytes <= self.min_size_bytes + } else { + false + }; + + let sum_smaller_than_max_size_bytes = if let Some(child_bytes) = child_item.size_bytes() + && let Some(accumulator_bytes) = accumulator.size_bytes() {child_bytes + accumulator_bytes < self.max_size_bytes} else {false}; + + child_matches_accumulator + && smaller_than_min_size_bytes + && sum_smaller_than_max_size_bytes }; if should_merge { From dbe9fd7227c66ff96b87dc16695beeceac375efd Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Tue, 13 Feb 2024 18:54:18 -0800 Subject: [PATCH 2/4] refactor of scan tasks --- src/daft-scan/src/scan_task_iters.rs | 132 +++++++++++++-------------- tests/io/test_merge_scan_tasks.py | 8 +- 2 files changed, 65 insertions(+), 75 deletions(-) diff --git a/src/daft-scan/src/scan_task_iters.rs b/src/daft-scan/src/scan_task_iters.rs index 833bf4736e..586d8f5870 100644 --- a/src/daft-scan/src/scan_task_iters.rs +++ b/src/daft-scan/src/scan_task_iters.rs @@ -45,83 +45,73 @@ struct MergeByFileSize { accumulator: Option, } +impl MergeByFileSize { + fn accumulator_ready(&self) -> bool { + if self.accumulator.is_none() { + true + } else if let Some(acc) = &self.accumulator && let Some(acc_bytes) = acc.size_bytes() && acc_bytes >= self.min_size_bytes { + true + } else { + false + } + } + + fn can_merge(&self, other: &ScanTask) -> bool { + let accumulator = self + .accumulator + .as_ref() + .expect("accumulator should be populated"); + let child_matches_accumulator = other.partition_spec() == accumulator.partition_spec() + && other.file_format_config == accumulator.file_format_config + && other.schema == accumulator.schema + && other.storage_config == accumulator.storage_config + && other.pushdowns == accumulator.pushdowns; + + let sum_smaller_than_max_size_bytes = if let Some(child_bytes) = other.size_bytes() + && let Some(accumulator_bytes) = accumulator.size_bytes() {child_bytes + accumulator_bytes <= self.max_size_bytes} else {false}; + + child_matches_accumulator && sum_smaller_than_max_size_bytes + } +} + impl Iterator for MergeByFileSize { type Item = DaftResult; fn next(&mut self) -> Option { loop { - // Grabs the accumulator, leaving a `None` in its place - let accumulator = self.accumulator.take(); - - match (self.iter.next(), accumulator) { - // When no accumulator exists, trivially place the ScanTask into the accumulator - (Some(Ok(child_item)), None) => { - self.accumulator = Some(child_item); - continue; - } - // When an accumulator exists, attempt a merge and yield the result - (Some(Ok(child_item)), Some(accumulator)) => { - // Whether or not the accumulator and the current item should be merged - let should_merge = { - let child_matches_accumulator = child_item.partition_spec() - == accumulator.partition_spec() - && child_item.file_format_config == accumulator.file_format_config - && child_item.schema == accumulator.schema - && child_item.storage_config == accumulator.storage_config - && child_item.pushdowns == accumulator.pushdowns; - - let smaller_than_min_size_bytes = - if let Some(accumulator_bytes) = accumulator.size_bytes() { - accumulator_bytes <= self.min_size_bytes - } else { - false - }; - - let sum_smaller_than_max_size_bytes = if let Some(child_bytes) = child_item.size_bytes() - && let Some(accumulator_bytes) = accumulator.size_bytes() {child_bytes + accumulator_bytes < self.max_size_bytes} else {false}; - - child_matches_accumulator - && smaller_than_min_size_bytes - && sum_smaller_than_max_size_bytes - }; - - if should_merge { - let merged_result = Some(Arc::new( - ScanTask::merge(accumulator.as_ref(), child_item.as_ref()) - .expect("ScanTasks should be mergeable in MergeByFileSize"), - )); - - // Whether or not we should immediately yield the merged result, or keep accumulating - let should_yield = matches!( - (child_item.size_bytes(), accumulator.size_bytes()), - (Some(child_item_size), Some(buffered_item_size)) if child_item_size + buffered_item_size >= self.min_size_bytes - ); - - // Either yield eagerly, or keep looping with a merged accumulator - if should_yield { - return Ok(merged_result).transpose(); - } else { - self.accumulator = merged_result; - continue; - } - } else { - self.accumulator = Some(child_item); - return Some(Ok(accumulator)); - } - } - // Bubble up errors from child iterator, making sure to replace the accumulator which we moved - (Some(Err(e)), acc) => { - self.accumulator = acc; - return Some(Err(e)); - } - // Iterator ran out of elements: ensure that we flush the last buffered ScanTask - (None, Some(last_scan_task)) => { - return Some(Ok(last_scan_task)); - } - (None, None) => { - return None; - } + if self.accumulator.is_none() { + self.accumulator = match self.iter.next() { + Some(Ok(item)) => Some(item), + e @ Some(Err(_)) => return e, + None => return None, + }; + } + + if self.accumulator_ready() { + return self.accumulator.take().map(Ok); } + + let next_item = match self.iter.next() { + Some(Ok(item)) => item, + e @ Some(Err(_)) => return e, + None => return self.accumulator.take().map(Ok), + }; + + if next_item.size_bytes().is_none() || !self.can_merge(&next_item) { + let curr_acc = self.accumulator.take().map(Ok); + self.accumulator = Some(next_item); + return curr_acc; + } + + self.accumulator = Some(Arc::new( + ScanTask::merge( + self.accumulator + .as_ref() + .expect("accumulator should be populated"), + next_item.as_ref(), + ) + .expect("ScanTasks should be mergeable in MergeByFileSize"), + )); } } } diff --git a/tests/io/test_merge_scan_tasks.py b/tests/io/test_merge_scan_tasks.py index c3fbd96dc3..75c426874b 100644 --- a/tests/io/test_merge_scan_tasks.py +++ b/tests/io/test_merge_scan_tasks.py @@ -44,20 +44,20 @@ def test_merge_scan_task_exceed_max(csv_files): @pytest.mark.skipif(os.getenv("DAFT_MICROPARTITIONS", "1") == "0", reason="Test can only run on micropartitions") def test_merge_scan_task_below_max(csv_files): - with override_merge_scan_tasks_configs(1, 20): + with override_merge_scan_tasks_configs(21, 22): df = daft.read_csv(str(csv_files)) assert ( df.num_partitions() == 2 - ), "Should have 2 partitions [(CSV1, CSV2), (CSV3)] since the second merge is too large (>20 bytes)" + ), "Should have 2 partitions [(CSV1, CSV2), (CSV3)] since the second merge is too large (>22 bytes)" @pytest.mark.skipif(os.getenv("DAFT_MICROPARTITIONS", "1") == "0", reason="Test can only run on micropartitions") def test_merge_scan_task_above_min(csv_files): - with override_merge_scan_tasks_configs(0, 40): + with override_merge_scan_tasks_configs(19, 40): df = daft.read_csv(str(csv_files)) assert ( df.num_partitions() == 2 - ), "Should have 2 partitions [(CSV1, CSV2), (CSV3)] since the first merge is above the minimum (>0 bytes)" + ), "Should have 2 partitions [(CSV1, CSV2), (CSV3)] since the first merge is above the minimum (>19 bytes)" @pytest.mark.skipif(os.getenv("DAFT_MICROPARTITIONS", "1") == "0", reason="Test can only run on micropartitions") From e1106d60cc466078bb59f04ae9422ea54b0b92f5 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Tue, 13 Feb 2024 21:04:10 -0800 Subject: [PATCH 3/4] favor replace --- src/daft-scan/src/scan_task_iters.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/daft-scan/src/scan_task_iters.rs b/src/daft-scan/src/scan_task_iters.rs index 586d8f5870..081aa7a292 100644 --- a/src/daft-scan/src/scan_task_iters.rs +++ b/src/daft-scan/src/scan_task_iters.rs @@ -98,9 +98,7 @@ impl Iterator for MergeByFileSize { }; if next_item.size_bytes().is_none() || !self.can_merge(&next_item) { - let curr_acc = self.accumulator.take().map(Ok); - self.accumulator = Some(next_item); - return curr_acc; + return self.accumulator.replace(next_item).map(Ok); } self.accumulator = Some(Arc::new( From fc9b5be7684612ef52b5d35f0c1804c74307a334 Mon Sep 17 00:00:00 2001 From: Sammy Sidhu Date: Wed, 14 Feb 2024 11:21:07 -0800 Subject: [PATCH 4/4] drop non case is acc ready --- src/daft-scan/src/scan_task_iters.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/daft-scan/src/scan_task_iters.rs b/src/daft-scan/src/scan_task_iters.rs index 081aa7a292..fe3d470a2f 100644 --- a/src/daft-scan/src/scan_task_iters.rs +++ b/src/daft-scan/src/scan_task_iters.rs @@ -47,9 +47,7 @@ struct MergeByFileSize { impl MergeByFileSize { fn accumulator_ready(&self) -> bool { - if self.accumulator.is_none() { - true - } else if let Some(acc) = &self.accumulator && let Some(acc_bytes) = acc.size_bytes() && acc_bytes >= self.min_size_bytes { + if let Some(acc) = &self.accumulator && let Some(acc_bytes) = acc.size_bytes() && acc_bytes >= self.min_size_bytes { true } else { false