
parallelize window function evaluations #546

Closed
wants to merge 1 commit into master from refactor-window-aggregate

Conversation

jimexist
Member

@jimexist jimexist commented Jun 12, 2021

Which issue does this PR close?

Closes #.

Rationale for this change

parallelize window function evaluations using tokio
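For illustration only, a minimal sketch of the general shape (not the actual diff): each window expression is evaluated on its own tokio task and the results are awaited in order. `Column` and `evaluate_expr` are placeholders, not DataFusion types.

```rust
use tokio::task::JoinHandle;

// stand-in for an Arrow array produced by one window expression
type Column = Vec<u64>;

fn evaluate_expr(expr_id: usize, input: &[u64]) -> Column {
    // placeholder for a CPU-bound window computation
    input.iter().map(|v| v + expr_id as u64).collect()
}

async fn evaluate_all(num_exprs: usize, input: Vec<u64>) -> Vec<Column> {
    // one tokio task per window expression
    let handles: Vec<JoinHandle<Column>> = (0..num_exprs)
        .map(|i| {
            let input = input.clone();
            tokio::spawn(async move { evaluate_expr(i, &input) })
        })
        .collect();

    // await the tasks in order so output columns keep the expression order
    let mut results = Vec::with_capacity(num_exprs);
    for handle in handles {
        results.push(handle.await.expect("window task panicked"));
    }
    results
}

#[tokio::main]
async fn main() {
    let columns = evaluate_all(3, (0..1_000u64).collect()).await;
    assert_eq!(columns.len(), 3);
}
```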

What changes are included in this PR?

Are there any user-facing changes?

@jimexist jimexist changed the title parallelize window function calls parallelize window function evaluations Jun 12, 2021
@codecov-commenter

Codecov Report

Merging #546 (ddd3418) into master (ad70a1e) will decrease coverage by 0.00%.
The diff coverage is 81.48%.


@@            Coverage Diff             @@
##           master     #546      +/-   ##
==========================================
- Coverage   76.13%   76.13%   -0.01%     
==========================================
  Files         156      156              
  Lines       27032    27024       -8     
==========================================
- Hits        20582    20574       -8     
  Misses       6450     6450              
Impacted Files Coverage Δ
datafusion/src/physical_plan/windows.rs 85.97% <81.48%> (-0.50%) ⬇️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@jimexist jimexist marked this pull request as draft June 13, 2021 09:32
@jimexist
Member Author

maybe a better way is to use rayon

@Dandandan
Contributor

Dandandan commented Jun 13, 2021

maybe a better way is to use rayon

In my opinion we should try to stay away from Rayon, and probably also from introducing parallelism at a low level (per expression), as it will likely add quite some overhead (in terms of extra threads, memory allocations held on to, and Rayon scheduling overhead).
I added some more thoughts in this discussion:
https://the-asf.slack.com/archives/C01QUFS30TD/p1623571458296000?thread_ts=1623523766.291500&cid=C01QUFS30TD

Otherwise, we should have some convincing benchmarks / reasoning for why it is reasonable to introduce parallelism with tool X in a certain part of the code.

I think for other parts of the code we could use Tokio's spawn_blocking function at the moment, while we design a better way to do parallelism.

At a general level, I think we should mostly check whether we perform parallelism at a partition / file level and introduce enough parallelism in the query plan (e.g. by using partitioning: https://medium.com/@danilheres/increasing-the-level-of-parallelism-in-datafusion-4-0-d2a15b5a2093).
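
(For illustration, a minimal sketch of what offloading a CPU-bound step via Tokio's spawn_blocking could look like; the summation is just a stand-in for real work such as evaluating an expression over a record batch.)

```rust
// Sketch only: offload a CPU-bound computation to tokio's blocking pool so it
// does not stall the async worker threads.
async fn evaluate_cpu_bound(input: Vec<u64>) -> u64 {
    tokio::task::spawn_blocking(move || input.iter().sum())
        .await
        .expect("blocking task panicked")
}

#[tokio::main]
async fn main() {
    let total = evaluate_cpu_bound((0..1_000_000u64).collect()).await;
    println!("sum = {}", total);
}
```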

Contributor

@alamb alamb left a comment


I think this approach is fine for now -- using something like rayon would just add more thread contention (as it has its own thread pool that is separate from the tokio thread pool), so I think this approach is 👍

As mentioned elsewhere, as we get more sophisticated in DataFusion, adding a task scheduler to manage these tasks (e.g. so we aren't thrashing) is probably a good thing.

@jimexist
Member Author

Thanks for the comment. I thought about thread contention, but one can never know until we have a benchmark.

I plan to keep this as a draft until I fully implement #360 and #299, after which I'll try to add a benchmark.
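
(For reference, a minimal sketch of what such a Criterion benchmark could look like; `run_window_query`, the table, and the SQL strings are hypothetical placeholders rather than the actual benchmark code.)

```rust
// Sketch only: Criterion harness shape for window-function benchmarks.
// `run_window_query` is a hypothetical helper that would plan and execute the
// query (e.g. via DataFusion) and drain all result batches.
use criterion::{criterion_group, criterion_main, Criterion};

fn run_window_query(sql: &str) {
    // placeholder for planning + executing `sql` against a test table
    let _ = sql;
}

fn window_benches(c: &mut Criterion) {
    c.bench_function("window empty over, aggregate functions", |b| {
        b.iter(|| run_window_query("SELECT MAX(v) OVER () FROM t"))
    });
    c.bench_function("window partition by, u64_wide, aggregate functions", |b| {
        b.iter(|| run_window_query("SELECT MAX(v) OVER (PARTITION BY u64_wide) FROM t"))
    });
}

criterion_group!(benches, window_benches);
criterion_main!(benches);
```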

@alamb
Contributor

alamb commented Jun 14, 2021

Just to be super clear, I am not suggesting we add a task scheduler as part of adding window functions -- I was trying to say that I felt following the existing pattern of using tokio::tasks in this code was a good idea, and we can address the larger question of "better task scheduling" as part of a different project.

@jimexist jimexist force-pushed the refactor-window-aggregate branch 4 times, most recently from 1c4938a to 7b5445f Compare June 24, 2021 07:02
@github-actions github-actions bot added the datafusion Changes in the datafusion crate label Jun 24, 2021
@alamb
Contributor

alamb commented Jun 24, 2021

@jimexist FYI just give the word when you think this one is ready to review / merge (it is still marked as Draft so I am not sure)

@jimexist
Member Author

Benchmark run:

window empty over, aggregate functions
                        time:   [33.347 ms 34.588 ms 35.904 ms]
                        change: [-3.6454% +0.0126% +3.6391%] (p = 0.98 > 0.05)
                        No change in performance detected.
Found 9 outliers among 100 measurements (9.00%)
  6 (6.00%) high mild
  3 (3.00%) high severe

window empty over, built-in functions
                        time:   [31.568 ms 32.173 ms 32.795 ms]
                        change: [+18.532% +21.139% +23.772%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 6 outliers among 100 measurements (6.00%)
  6 (6.00%) high mild

Benchmarking window order by, aggregate functions: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 19.1s, or reduce sample count to 20.
window order by, aggregate functions
                        time:   [204.92 ms 209.64 ms 214.85 ms]
                        change: [+16.691% +19.909% +22.780%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 8 outliers among 100 measurements (8.00%)
  6 (6.00%) high mild
  2 (2.00%) high severe

Benchmarking window order by, built-in functions: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 18.4s, or reduce sample count to 20.
window order by, built-in functions
                        time:   [176.37 ms 180.63 ms 185.20 ms]
                        change: [+10.734% +13.444% +16.828%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

Benchmarking window partition by, u64_wide, aggregate functions: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 21.1s, or reduce sample count to 20.
window partition by, u64_wide, aggregate functions
                        time:   [206.03 ms 208.36 ms 210.84 ms]
                        change: [-18.160% -16.929% -15.689%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) high mild
  1 (1.00%) high severe

Benchmarking window partition by, u64_narrow, aggregate functions: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 13.1s, or reduce sample count to 30.
window partition by, u64_narrow, aggregate functions
                        time:   [130.24 ms 133.09 ms 136.08 ms]
                        change: [-5.3256% -3.1363% -0.8213%] (p = 0.01 < 0.05)
                        Change within noise threshold.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

Benchmarking window partition by, u64_wide, built-in functions: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 17.5s, or reduce sample count to 20.
window partition by, u64_wide, built-in functions
                        time:   [179.48 ms 180.61 ms 181.76 ms]
                        change: [-18.346% -17.609% -16.863%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

Benchmarking window partition by, u64_narrow, built-in functions: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 13.4s, or reduce sample count to 30.
window partition by, u64_narrow, built-in functions
                        time:   [118.65 ms 119.75 ms 120.88 ms]
                        change: [-1.6034% -0.1766% +1.1790%] (p = 0.81 > 0.05)
                        No change in performance detected.
Found 5 outliers among 100 measurements (5.00%)
  5 (5.00%) high mild

Benchmarking window partition and order by, u64_wide, aggregate functions: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 38.6s, or reduce sample count to 10.
window partition and order by, u64_wide, aggregate functions
                        time:   [379.78 ms 382.19 ms 384.75 ms]
                        change: [-15.144% -14.176% -13.207%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high severe

Benchmarking window partition and order by, u64_narrow, aggregate functions: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 54.0s, or reduce sample count to 10.
window partition and order by, u64_narrow, aggregate functions
                        time:   [546.14 ms 555.42 ms 565.09 ms]
                        change: [-14.362% -12.767% -11.029%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild

Benchmarking window partition and order by, u64_wide, built-in functions: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 39.6s, or reduce sample count to 10.
window partition and order by, u64_wide, built-in functions
                        time:   [392.68 ms 399.86 ms 407.68 ms]
                        change: [-3.2891% -1.3995% +0.5607%] (p = 0.16 > 0.05)
                        No change in performance detected.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe

Benchmarking window partition and order by, u64_narrow, built-in functions: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 40.5s, or reduce sample count to 10.
window partition and order by, u64_narrow, built-in functions
                        time:   [416.35 ms 422.31 ms 428.51 ms]
                        change: [+0.1759% +1.7816% +3.3761%] (p = 0.04 < 0.05)
                        Change within noise threshold.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild

I suspect that this requires more samples where multiple window functions are planned in the same phase.

@jimexist
Member Author

@Dandandan and @alamb what do you think of the above results? I think we can either merge or close this and wait for that customized scheduler.

@Dandandan
Contributor

Dandandan commented Jun 24, 2021

In my view, creating tasks this small is unlikely to be of benefit for query execution. Note that spawn_blocking will create new threads on demand (512 by default), which will lead to higher memory use and additional CPU usage / context switching (which can hurt the performance of other parts of the query execution). spawn_blocking is only meant for longer-running tasks.

I am not sure the customized scheduler will handle tasks this small.

I think much larger gains at this moment can be achieved with more efficient implementations and parallelization at a much higher level.
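
(For reference, a sketch of where that 512-thread default lives and how the blocking pool can be capped when the runtime is built explicitly; the specific numbers are illustrative.)

```rust
// Sketch only: tokio's blocking pool (used by spawn_blocking) defaults to up
// to 512 threads; it can be capped when the runtime is built explicitly.
use tokio::runtime::Builder;

fn main() {
    let runtime = Builder::new_multi_thread()
        .worker_threads(4)        // async executor threads
        .max_blocking_threads(64) // cap threads created by spawn_blocking
        .enable_all()
        .build()
        .expect("failed to build tokio runtime");

    let sum: u64 = runtime.block_on(async {
        tokio::task::spawn_blocking(|| (0..1_000_000u64).sum())
            .await
            .expect("blocking task panicked")
    });
    println!("sum = {}", sum);
}
```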

@alamb
Contributor

alamb commented Jun 24, 2021

I agree with @Dandandan 's conclusion

@jimexist
Member Author

In my view, creating tasks this small is unlikely to be of benefit for query execution. Note that spawn_blocking will create new threads on demand (512 by default), which will lead to higher memory use and additional CPU usage / context switching (which can hurt the performance of other parts of the query execution). spawn_blocking is only meant for longer-running tasks.

I am not sure the customized scheduler will handle tasks this small.

I think much larger gains at this moment can be achieved with more efficient implementations and parallelization at a much higher level.

Thanks for the comment. I agree with the assessment above; plus, in most cases the number of window functions within a logically planned phase should be 1 or 2, so there's little point in parallelizing.

I think #569 is more promising.
