
Fixes broadcast spill serialization/deserialization [databricks] #8617

Merged · 4 commits · Jun 28, 2023

Conversation

@abellina (Collaborator):

Closes #8602
Closes #8603

This PR fixes a couple of broadcast issues related to Spark's MemoryStore spill and to our marking SerializeConcatHostBuffersDeserializeBatch as AutoCloseable (which is no longer the case as of this PR).

We change the input of SerializeConcatHostBuffersDeserializeBatch to be: HostConcatResult, numRows, and dataLen. HostConcatResult is null when the broadcast is rows-only. Once the batch is materialized on the GPU, we defer to the materialized spillable as the source of truth for the various methods, including hostBatch. I've added tests to cover the various spill scenarios (before GPU materialization and after GPU materialization), plus obtaining a hostBatch from the driver or the executor.
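
As a minimal sketch of the reshaped class (the constructor shape follows the description above and the scaladoc quoted later in this review; the custom writeObject/readObject and materialization logic are omitted, so this is illustrative rather than the exact plugin code):

import ai.rapids.cudf.JCudfSerialization.HostConcatResult
import org.apache.spark.sql.catalyst.expressions.Attribute
import com.nvidia.spark.rapids.SpillableColumnarBatch

// Sketch only: serialization hooks and GPU materialization are elided.
@SerialVersionUID(100L)
class SerializeConcatHostBuffersDeserializeBatch(
    var hostConcatResult: HostConcatResult, // null for a rows-only broadcast
    output: Seq[Attribute],                 // used to find the schema
    var numRows: Int,
    var dataLen: Long) extends Serializable {
  // Once the batch is materialized on the GPU, this spillable batch becomes
  // the source of truth for numRows, dataLen, and hostBatch.
  @transient private var materialized: SpillableColumnarBatch = _
}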

@abellina (Collaborator, Author):

build

@abellina (Collaborator, Author):

build

@abellina (Collaborator, Author):

I see dpp_test failed (I was running other suites locally) and it is related. I will debug and patch up.

@abellina (Collaborator, Author):

> I see dpp_test failed (I was running other suites locally) and it is related. I will debug and patch up.

Should be fixed by 48e8c05.

I was closing a hostBatch result too soon, since we use a row iterator to project out in GpuSubqueryBroadcastExec. I moved the .toArray inside the withResource block and added a comment.
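
The bug class, in a self-contained form (illustrative names only, not the plugin code): a lazy Iterator escaping a withResource block ends up reading the batch after it has been closed, and forcing it with .toArray inside the block fixes the lifetime.

object LazyIteratorLifetime extends App {
  // Mirrors the plugin's Arm.withResource helper pattern.
  def withResource[T <: AutoCloseable, V](r: T)(block: T => V): V =
    try block(r) finally r.close()

  class HostBatch extends AutoCloseable {
    private var closed = false
    // Lazy: each element checks at read time that the batch is still open.
    def rowIterator: Iterator[Int] = Iterator.tabulate(3) { i =>
      require(!closed, "batch used after close"); i
    }
    override def close(): Unit = closed = true
  }

  // Buggy: the lazy iterator escapes the block, so rows are produced after
  // close() and the require above fires.
  //   val leaked = withResource(new HostBatch)(_.rowIterator)
  //   leaked.toArray // throws

  // Fixed: force the projection with .toArray while the batch is still open.
  val rows = withResource(new HostBatch)(_.rowIterator.toArray)
  println(rows.mkString(","))
}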

@abellina (Collaborator, Author):

build

@abellina (Collaborator, Author):

@jlowe let me know if you have other comments.

@revans2 (Collaborator) left a comment:

The code changes look fine to me for the most part.

}
}
// scalastyle:on no.finalize

// scalastyle:off no.finalize
Collaborator:

nit: Why turn it on to turn it off right afterwards???

Collaborator (Author):

I can make that one scalastyle:off block; it was mostly so that it was clear that it was needed in both classes.
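
For example, a single directive region covering both finalize-overriding classes might look like this (a sketch; the real classes take real parameters and the finalize bodies are placeholders):

// scalastyle:off no.finalize
class SerializeConcatHostBuffersDeserializeBatch(/* ... */) extends Serializable {
  override def finalize(): Unit = { /* warn and clean up leaked state */ }
}

class SerializeBatchDeserializeHostBuffer(/* ... */) extends Serializable {
  override def finalize(): Unit = { /* warn and clean up leaked state */ }
}
// scalastyle:on no.finalize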

* @param output used to find the schema for this broadcast batch
* @param numRows number of rows for this broadcast batch
* @param dataLen size in bytes for this broadcast batch
*/
// scalastyle:off no.finalize
@SerialVersionUID(100L)
class SerializeConcatHostBuffersDeserializeBatch(
Collaborator:

Just curious: why are we not doing what @jlowe suggested earlier and doing the concat on the driver before we put it into the buffer? And similarly, why don't we create a HostBuffer before we send the batch back, so the same thing goes in as comes out? I really think I made this overly complicated.

Collaborator (Author):

I think the SerializeConcatHostBuffersDeserializeBatch class name could be changed, as it is now confusing. The concat that @jlowe suggested is happening in the driver here: https://github.com/NVIDIA/spark-rapids/pull/8617/files#diff-7ca82be5615724f3ba4376b82be1c8a34e3fb5df34a3dedfa9a21eb3e6e54b97R637.

The "DeserializeBatch" class (which is what is actually broadcasted out) deals with a HostConcatResult exclusively, not the headers and buffers it used to deal with.

Collaborator:

It is not about the name, but that is part of it. It is the fact that we have multiple different Serializable classes that take one type of input and produce a different type of output. Why? The original reason was that we needed to broadcast an object from the host and put it on the device, where it would be deduped. But is that really saving us anything? Would it be better to just send/receive a HostBuffer and the headers, with just one Serializable class that does that? Then everything else is helper functions that transform it in different ways.

Not that this blocks the current patch from going in. I just thought that was the direction we were going to try, but I guess I was wrong.

Collaborator (Author):

I started to file a follow-on, but I am not entirely sure how I'd merge the two serializables into one; well, I can think of one way. Here's what happens today:

  1. Every partition produces a SerializeBatchDeserializeHostBuffer on the executors. This class has to be serializable: it takes a ColumnarBatch and is able to write to and read from the streams.
  2. The driver collects Array[SerializeBatchDeserializeHostBuffer], where each element is a partition.
  3. The driver concats this array and produces HostConcatResult.
  4. The driver wraps the host concatenated result using SerializeConcatHostBuffersDeserializeBatch and broadcasts this out.
  5. Executors deal with the HostConcatResult only.

To reduce the number of serializable classes, I think we could broadcast the Array that we collected in (2), since Array is Serializable, and then the executors would concatenate these results when materializing the batch (see the sketch after this list):

  1. Every partition produces a SerializeBatchDeserializeHostBuffer on the executors. This class has to be serializable: it takes a ColumnarBatch and is able to write to and read from the streams.
  2. The driver collects Array[SerializeBatchDeserializeHostBuffer], where each element is a partition.
  3. The driver broadcasts the Array directly.
  4. Executors deal with serializing and deserializing this Array up to materialization.
  5. At materialization, the executors create a HostConcatResult and turn it into a spillable GPU batch.
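
A hedged sketch of that alternative flow (the function shape, the RDD[ColumnarBatch] input, and the executor-side steps are assumptions about where this would live, not merged code):

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.vectorized.ColumnarBatch

// Driver side: broadcast the collected Array itself, since Array is
// Serializable; one SerializeBatchDeserializeHostBuffer per partition.
def broadcastPerPartition(
    sc: SparkContext,
    childOutput: RDD[ColumnarBatch]): Broadcast[Array[SerializeBatchDeserializeHostBuffer]] = {
  val perPartition = childOutput
    .map(cb => new SerializeBatchDeserializeHostBuffer(cb))
    .collect()
  sc.broadcast(perPartition)
}

// Executor side, only at materialization time (steps, not code):
//   1. read the headers and host buffers back out of the broadcast value
//   2. concatenate them, e.g. with JCudfSerialization.concatToHostBuffer
//   3. wrap the resulting HostConcatResult as a spillable GPU batch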

Collaborator (Author):

ok, I synced with @revans2, and the main comment was that the driver could still concatenate but then broadcast out essentially the SerializeBatchDeserializeHostBuffer again (the result of the concatenation is a header and a host buffer, so this object could be reused). This would help simplify the code and make it easier to take the next step in improving Broadcast: allowing these broadcast GPU RapidsBuffers to be freed instead of actually spilled, since they are already in host memory at the source. We can work on this cleanup as part of #7709.
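
A rough sketch of that direction, assuming HostConcatResult exposes its header and host buffer and that SerializeBatchDeserializeHostBuffer grows a matching constructor (both are assumptions here, and headers/buffers/sparkContext stand in for the exchange node's context):

import ai.rapids.cudf.JCudfSerialization
import ai.rapids.cudf.JCudfSerialization.HostConcatResult

// Driver side: concatenate once, then re-wrap the single (header, buffer)
// pair in the same serializable class the executors produced, so one class
// round-trips the broadcast in both directions.
val concat: HostConcatResult =
  JCudfSerialization.concatToHostBuffer(headers, buffers)
val payload = new SerializeBatchDeserializeHostBuffer(
  concat.getTableHeader, concat.getHostBuffer) // assumed constructor overload
val broadcasted = sparkContext.broadcast(payload)
// Since the data already lives in host memory at the source, the GPU copy
// could later be freed rather than spilled.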

@jlowe (Member) left a comment:

lgtm. We can address the cleanup for driver concat and the simplification/symmetry in a follow-up.

@abellina (Collaborator, Author):

Thanks for the reviews @jlowe and @revans2!

@abellina merged commit 2cad130 into NVIDIA:branch-23.08 on Jun 28, 2023
@sameerz added the reliability label (Features to improve reliability or bugs that severely impact the reliability of the plugin) on Jun 30, 2023