[FEAT] Native Runner #3178
Conversation
CodSpeed Performance Report
Merging #3178 will degrade performance by 60.07%.
Benchmarks breakdown
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #3178 +/- ##
==========================================
- Coverage 79.13% 78.37% -0.77%
==========================================
Files 640 641 +1
Lines 77983 78938 +955
==========================================
+ Hits 61715 61868 +153
- Misses 16268 17070 +802
@colin-ho looks like this introduces a regression for the benchmark
+ output_file = self.fs.open_output_stream(self.full_path)
  return pacsv.CSVWriter(
-     self.full_path,
+     output_file,
The pacsv writer doesn't have a `filesystem` argument for us to pass a fs to; instead we have to open a stream with the fs's `open_output_stream` function and pass that stream into the writer.
io_runtime.spawn(async move {
    stream_scan_task(
        scan_task.clone(),
        io_stats.clone(),
        delete_map.clone(),
    ));
}
while let Some(result) = task_set.join_next().await {
    result.context(JoinSnafu)??;
}
Ok(())
},
self.name(),
);
delete_map.map(Arc::new),
maintain_order,
)
.await
})
}))
.buffered(self.num_parallel_tasks)
.map(|s| s?)
.try_flatten();
This is actually a pretty important fix because previously all the scan tasks were spawned on the execution_runtime, which is single threaded. I tested TPCH with num_files = 2 x num_cpus, and this change makes it much faster. On that note, we should make our benchmarks test not just 1 or 2 files, but also > num_cpu files.
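The fan-out plus `.buffered(num_parallel_tasks)` pattern can be sketched in Python with asyncio (a rough analogy only; the actual implementation is the Rust stream code above): at most `num_parallel_tasks` scan tasks run concurrently, no matter how many files there are.

```python
import asyncio

async def stream_scan_task(task_id: int) -> int:
    # Stand-in for reading one file; hypothetical work.
    await asyncio.sleep(0.01)
    return task_id

async def run_all(num_tasks: int, num_parallel_tasks: int) -> list:
    sem = asyncio.Semaphore(num_parallel_tasks)

    async def bounded(i: int) -> int:
        async with sem:  # at most num_parallel_tasks in flight at once
            return await stream_scan_task(i)

    return await asyncio.gather(*(bounded(i) for i in range(num_tasks)))

# e.g. 8 "files" but only 3 scanned concurrently
results = asyncio.run(run_all(num_tasks=8, num_parallel_tasks=3))
```

This also illustrates why benchmarks should include the `num_files > num_cpus` case: the scheduling behavior only shows up once tasks outnumber available parallelism.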
.await
})
}))
.buffered(self.num_parallel_tasks)
The CodSpeed test is still slower :(, still figuring out why.
@@ -27,16 +27,15 @@ def test_read_multi_parquet_from_s3_with_include_file_path_column(minio_io_confi
    with minio_create_bucket(minio_io_config, bucket_name=bucket_name):
        file_paths = (
            daft.from_pydict(data)
-            .into_partitions(3)
-            .write_parquet(f"s3://{bucket_name}", io_config=minio_io_config)
+            .write_parquet(f"s3://{bucket_name}", partition_cols=["a"], io_config=minio_io_config)
Using `partition_cols` instead of `into_partitions`, for swordfish.
@@ -810,6 +802,8 @@ jobs:
        source activate
        maturin develop
        pytest --doctest-modules --continue-on-collection-errors daft/dataframe/dataframe.py daft/expressions/expressions.py daft/convert.py daft/udf.py
+      env:
+        DAFT_RUNNER: py
Doc tests don't pass on swordfish because some of them use `into_partitions`.
@@ -129,12 +129,18 @@ impl StreamingSinkNode {
    match result {
        StreamingSinkOutput::NeedMoreInput(mp) => {
            if let Some(mp) = mp {
-                let _ = output_sender.send(mp).await;
+                if output_sender.send(mp).await.is_err() {
We should instead match on the exact error, like:
`if let Err(ErrorType) = output_sender.send(mp).await`
It's an error struct, so there's no pattern to match. And it always signifies channel closure. https://docs.rs/tokio/latest/tokio/sync/mpsc/error/struct.SendError.html
Makes swordfish a top level runner, via `set_runner_native()`. Additionally sets swordfish to be the default runner for development. This PR also contains some bug fixes and test changes, which I have left comments on.

Additionally, this PR refactors swordfish in two ways:
- Scan tasks are buffered via a `num_parallel_tasks` parameter that takes into account any pushed down limits.
- Adds an `is_err` check on the sender in parts of the code where we have a `while receiver.recv().await -> sender.send()` pattern, so that the loop breaks if the send fails. This is needed in cases when the consumer is done receiving data, such as in a Limit, or if the user is doing `iter(df)` and breaks out of the iteration, which will cause receivers to be dropped. As such, the senders should recognize this and drop as well.
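The receiver-drop behavior has a rough Python analogue (an analogy only; swordfish's channels are tokio mpsc channels in Rust): when the consumer breaks out of iteration early, the producer gets notified and stops producing, rather than running to completion for no one.

```python
def producer(n: int, produced: list):
    try:
        for i in range(n):
            produced.append(i)
            yield i  # "send" to the consumer
    except GeneratorExit:
        # The consumer stopped pulling (like a dropped receiver);
        # stop producing instead of looping to n.
        raise

produced = []
gen = producer(1_000_000, produced)
consumed = []
for v in gen:
    consumed.append(v)
    if len(consumed) == 3:
        break  # like iter(df) with an early break
gen.close()  # raises GeneratorExit inside producer at the suspended yield
```

In swordfish the equivalent signal is `sender.send(...).await` returning `Err`, which the worker now treats as "consumer is gone, shut down".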