[FEA] Better Memory Management for BroadcastNestedLoopJoin #302

Closed
revans2 opened this issue Jun 29, 2020 · 7 comments
Labels
feature request New feature or request P1 Nice to have for release performance A performance related task/issue reliability Features to improve reliability or bugs that severely impact the reliability of the plugin wontfix This will not be worked on

Comments

@revans2
Collaborator

revans2 commented Jun 29, 2020

For #296, BroadcastNestedLoopJoin is going to be disabled by default because of the potential for a memory explosion. Before we can enable it by default, we need a much better way to manage memory for it.

Some of the ideas that I had are:

  1. Have CoalesceBatches take a function instead of a hard value, and use the schema + broadcast size to avoid combining data that we are just going to have to split apart again later anyway.
  2. Implement batch splitting so we can take the size of both input batches and estimate the size of the output they would produce. If it is too large, we can split the streamed batch up into smaller batches to try to avoid using too much memory (a rough sketch follows this list). This one might get a little hard, because we have to manage the partial batch along with the new batch that we are trying to produce. We need to do some experiments to see whether we want to actually split the incoming batch into smaller pieces, which probably requires a kernel launch and extra memory buffers, or use a zero-copy slice, which would require some changes to the cuDF APIs to support it. Some of this code can probably be reused for [BUG] very large shuffles can fail #45
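A very rough Scala sketch of what the estimate in idea 2 could look like at runtime. This is purely illustrative; the object and method names below are made up and do not exist in the plugin, and the formula is the same back-of-the-envelope estimate discussed later in this issue.

object NestedLoopJoinSizing {
  // Estimate the bytes a cross join of the two batches would produce:
  // output_size ~= (stream_bytes * build_rows) + (build_bytes * stream_rows)
  def estimatedOutputBytes(
      streamRows: Long, streamBytes: Long,
      buildRows: Long, buildBytes: Long): Long =
    streamBytes * buildRows + buildBytes * streamRows

  // How many pieces to cut the streamed batch into so that each piece should
  // produce at most roughly targetBatchBytes of joined output.
  def numStreamSplits(
      streamRows: Long, streamBytes: Long,
      buildRows: Long, buildBytes: Long,
      targetBatchBytes: Long): Int = {
    val estimate = estimatedOutputBytes(streamRows, streamBytes, buildRows, buildBytes)
    math.max(1, math.ceil(estimate.toDouble / targetBatchBytes).toInt)
  }
}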

There are only two join types where we could split the broadcast table, and those are covered by CartesianProductExec. In all other cases, if we wanted to split the broadcast table we would need some help from cudf to support it.

@revans2 revans2 added feature request New feature or request ? - Needs Triage Need team to review and classify labels Jun 29, 2020
@kuhushukla
Collaborator

My preference would be approach #1 since it gives us more control.

@revans2
Collaborator Author

revans2 commented Jun 29, 2020

I think we need both. If we only do the first approach, we risk some processing upstream producing a large batch. In theory we can push the coalescing further upstream, but at some point we lose by processing really small batches. We might be able to get away with just option 1 for some things, but eventually I think we need both.

@sameerz sameerz added P1 Nice to have for release performance A performance related task/issue and removed ? - Needs Triage Need team to review and classify labels Aug 25, 2020
@sperlingxx
Collaborator

Hi @revans2, I have some questions about the first approach. Is it applied to the broadcast table? And what kind of function should CoalesceBatches take? Thank you!

@revans2
Collaborator Author

revans2 commented Feb 4, 2021

Currently we only support inner joins for this. That means each join is going to do a full cross join, optionally followed by a filter. In this case, inner joins, Spark will insert a BroadcastNestedLoopJoin iff it estimates that the build table is small enough to fit in the auto-broadcast threshold. Otherwise it will fall back to a CartesianProductExec. So even though we could split up the build table, Spark and the user think it is small enough that we should not need to. Also, if we did split up the build table, we would need to change the algorithm so it matches more closely what CartesianProductExec does.

A cross join produces rows where everything on the left is combined with everything on the right. 100 rows on the left and 5 rows on the right produce 500 rows of output. So we can guess at the output size with:

output_rows = left_side_rows * right_side_rows
approximate_row_size = left_side_bytes/left_side_rows + right_side_bytes/right_side_rows
approximate_output_size = output_rows * approximate_row_size
approximate_output_size = left_side_rows * right_side_rows * (left_side_bytes/left_side_rows + right_side_bytes/right_side_rows)
approximate_output_size = (left_side_bytes * right_side_rows) + (right_side_bytes * left_side_rows)
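To put some illustrative numbers on that (chosen only for scale): a 100 MB streamed batch with 1,000,000 rows against a 10 MB broadcast with 100,000 rows gives

approximate_output_size = (100 MB * 100,000) + (10 MB * 1,000,000) ≈ 10 TB + 10 TB = 20 TB

which is why the output estimate, not the input batch sizes, has to drive the batching.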

In query planning we can estimate the size of each row based on the schema. We can estimate the size of the broadcast based on the autoBroadcastThreshold config. We can also set our target output size based on gpuTargetBatchSizeBytes.

So for CoalesceBatches we can start out with a default value of something like:

approximate_output_size = (left_side_bytes * right_side_rows) + (right_side_bytes * left_side_rows)
gpuTargetBatchSizeBytes = (left_side_rows * estimated_left_row_size * (autoBroadcastThreshold/estimated_right_row_size)) + (autoBroadcastThreshold * left_side_rows)
gpuTargetBatchSizeBytes = left_side_rows * ((estimated_left_row_size * autoBroadcastThreshold/estimated_right_row_size) + autoBroadcastThreshold)
left_side_rows = gpuTargetBatchSizeBytes/((estimated_left_row_size * autoBroadcastThreshold/estimated_right_row_size) + autoBroadcastThreshold)
target_left_side_bytes = left_side_rows * estimated_left_row_size
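A minimal Scala sketch of that calculation, assuming per-row sizes estimated from the schemas, the spark.sql.autoBroadcastJoinThreshold value, and the plugin's gpuTargetBatchSizeBytes setting. The object and parameter names are illustrative, not existing plugin APIs.

object CoalesceTargetEstimate {
  // Solve the equations above for the left (stream) side: how many stream-side
  // bytes can we coalesce before the estimated join output exceeds the target?
  def targetLeftSideBytes(
      estimatedLeftRowBytes: Long,    // from the stream-side schema
      estimatedRightRowBytes: Long,   // from the build-side schema
      autoBroadcastThreshold: Long,   // spark.sql.autoBroadcastJoinThreshold
      gpuTargetBatchSizeBytes: Long): Long = {
    val estimatedRightRows = math.max(1L, autoBroadcastThreshold / estimatedRightRowBytes)
    // Output bytes contributed by each stream-side row:
    // left_row_bytes * right_rows + right_bytes
    val perLeftRowOutputBytes = estimatedLeftRowBytes * estimatedRightRows + autoBroadcastThreshold
    val leftSideRows = math.max(1L, gpuTargetBatchSizeBytes / perLeftRowOutputBytes)
    leftSideRows * estimatedLeftRowBytes
  }
}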

Once we have real numbers for the build table and the batch that showed up, we can decide whether we need to split up the input table or not.

@sperlingxx
Collaborator

Thanks for the detailed explanation. I am trying to understand it.
So, we decide whether to split up the stream table (and the split size) according to the estimation formula above?
If I am not wrong, the formula above is meant to ensure that the batch size of the cross join's output doesn't exceed gpuTargetBatchSizeBytes?
I also found that BroadcastNestedLoopJoin can run without CoalesceBatches, as in the plan below:
And I also found that BroadcastNestedLoopJoin can be run without CoalesceBatches, like below plan:

GpuColumnarToRow false
+- GpuBroadcastNestedLoopJoin BroadcastNestedLoopJoin BuildRight, Cross, Cross, 2147483647
   :- GpuRowToColumnar TargetSize(2147483647)
   :  +- LocalTableScan [longs#7L, more_longs#8L]
   +- GpuBroadcastExchange IdentityBroadcastMode, [id=#28]
      +- GpuRowToColumnar TargetSize(2147483647)
         +- LocalTableScan [longs#18L, more_longs#19L]

@revans2
Collaborator Author

revans2 commented Feb 5, 2021

The math I did was just quick back-of-the-envelope math. If you can come up with a better estimate for splitting the data, that is fine. Also, please check my math; I am not 100% sure it is correct.

The plan you showed is the typical plan. The build table comes from a GpuBroadcastExchange, and the data for that table is already concatenated together. In theory we could split it up into smaller pieces and join each of them with each of the stream batches (cross product), but since the broadcast is likely to be small (10 MB by default), we should instead be able to split the stream data into similarly small pieces.
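One way that stream-side split could be expressed with the cudf Java API is Table.contiguousSplit. The sketch below is hedged: it is not what the plugin currently does, the object name is made up, and contiguousSplit copies data rather than giving a zero-copy slice.

import ai.rapids.cudf.{ContiguousTable, Table}

object StreamBatchSplitter {
  // Cut a stream-side table into roughly equal contiguous row ranges so each
  // piece can be cross-joined against the broadcast table separately. The
  // caller owns closing the input table and the returned ContiguousTables.
  def split(streamTable: Table, numSplits: Int): Array[ContiguousTable] = {
    val rows = streamTable.getRowCount
    if (numSplits <= 1 || rows == 0) {
      // No cut points: returns the whole table as a single contiguous piece.
      streamTable.contiguousSplit()
    } else {
      val rowsPerSplit = math.ceil(rows.toDouble / numSplits).toLong
      // Cut points are the starting row of each piece after the first.
      val cutPoints = (1 until numSplits)
        .map(i => math.min(i * rowsPerSplit, rows).toInt)
        .distinct
        .toArray
      streamTable.contiguousSplit(cutPoints: _*)
    }
  }
}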

@revans2 revans2 mentioned this issue Apr 8, 2022
@revans2 revans2 added the reliability Features to improve reliability or bugs that severely impact the reliability of the plugin label Apr 12, 2022
@mattahrens
Collaborator

Closing as won't fix for now

pxLi pushed a commit to pxLi/spark-rapids that referenced this issue May 12, 2022
@sameerz sameerz added the wontfix This will not be worked on label May 26, 2022
tgravescs pushed a commit to tgravescs/spark-rapids that referenced this issue Nov 30, 2023