Make state spillable in partitioned writer [databricks] #8667
Conversation
Signed-off-by: Alessandro Bellina <[email protected]>
build
build
Another follow-on issue: #8736
@jlowe @revans2 this should be ready for another look. I added unit tests that cover the interaction between batches to write, spillables, and splits, but that do not validate the outputs. I wanted to get this reviewed and follow up with more testing, either via integration tests or unit tests, as follow-on work. It would also be nice to get runtime numbers for this patch in general. Filed #8738 for the follow-on test work.
build
I am working on the failure. It looks like a failure to shut down RMM is causing failures in other tests. It seems related to mocking exceptions in the tests, so it looks to be a real issue. Will post once I have a fix.
build
build
I am having lots of issues with …
Ok the reason why …
build
@revans2 this is ready for another look |
Just a few nits
protected def getOutputStream: FSDataOutputStream = {
  val hadoopPath = new Path(path)
  val fs = hadoopPath.getFileSystem(conf)
  fs.create(hadoopPath, false)
}

protected val outputStream: FSDataOutputStream = getOutputStream
nit: Could you add that as a comment to getOutputStream so we know why it is there?
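A minimal sketch of the requested change. The wording of the comment is hypothetical, since the earlier discussion it refers to is not shown in this excerpt; the one behavior that is certain is that Hadoop's fs.create with overwrite=false fails if the file already exists:

protected def getOutputStream: FSDataOutputStream = {
  val hadoopPath = new Path(path)
  val fs = hadoopPath.getFileSystem(conf)
  // Hypothetical comment per the review nit: create the stream eagerly with
  // overwrite=false so a pre-existing file fails fast at writer construction
  // instead of being silently clobbered.
  fs.create(hadoopPath, false)
}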
    spillableBatch: SpillableColumnarBatch,
    statsTrackers: Seq[ColumnarWriteTaskStatsTracker]): Long = {
  val writeStartTime = System.nanoTime
  val cb = closeOnExcept(spillableBatch) { _ =>
nit: I think it would be a little cleaner to move cb to be inside of the closeOnExcept and not recreate it.
closeOnExcept(spillableBatch) { _ =>
  val cb = withRetryNoSplit[ColumnarBatch] {
    spillableBatch.getColumnarBatch()
  }
  withResource(cb) { _ =>
    throwIfRebase...
  }
}
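For context on why this shape is cleaner: closeOnExcept closes spillableBatch only if an exception escapes the block, withRetryNoSplit re-attempts materializing the batch when the GPU hits a retryable OOM (spilling can free memory between attempts), and withResource guarantees the materialized batch is closed after use. Nesting them this way leaves each resource owned by exactly one scope, with no cb declared outside that needs to be recreated.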
build
Closes #6980
This change ensures that the batches queued by the partitioned writer are made spillable before the GPU semaphore is released.
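A minimal sketch of the idea, not the PR's actual code. The queue name, the SpillableColumnarBatch factory signature, and the Arm import path are assumptions here:

import scala.collection.mutable.ArrayBuffer

import com.nvidia.spark.rapids.{GpuSemaphore, SpillableColumnarBatch, SpillPriorities}
import com.nvidia.spark.rapids.Arm.closeOnExcept
import org.apache.spark.TaskContext
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical queue of batches waiting to be written by the partitioned writer.
val queuedBatches = new ArrayBuffer[SpillableColumnarBatch]()

def queueForWrite(batch: ColumnarBatch): Unit = {
  // Wrap the batch so the framework can spill it to host memory under GPU
  // memory pressure; closeOnExcept closes `batch` if wrapping fails.
  val spillable = closeOnExcept(batch) { _ =>
    SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)
  }
  queuedBatches.append(spillable)
  // Once the queued state is spillable, the task can safely release the GPU
  // semaphore and let other tasks onto the GPU while it waits or does I/O.
  GpuSemaphore.releaseIfNecessary(TaskContext.get())
}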
I did change ParquetWriterSuite so that the exceptions checked during writer failures look at the cause and find a SparkUpgradeException instead of a SparkException, reflecting the current state of some checks done in Parquet specifically for date/time rebasing. I didn't include fixes for this; I just made the tests assert the exception that is actually thrown.
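A sketch of that assertion style, assuming a ScalaTest suite; frame and tempFile are hypothetical names, and the suite's actual code may differ:

import org.apache.spark.{SparkException, SparkUpgradeException}

// The write still fails with a top-level SparkException, so walk to the cause
// and check for the SparkUpgradeException raised by the date/time rebase checks.
val e = intercept[SparkException] {
  frame.write.mode("overwrite").parquet(tempFile.getAbsolutePath)
}
assert(e.getCause.isInstanceOf[SparkUpgradeException])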
I verified that this change allows the store_sales nds_transcode at 50GB to work with only 3GB of GPU memory, whereas I needed 12GB of memory before the change (it failed with OOM at various places with less than 12GB due to all of the state kept around without the semaphore held). I used 16 shuffle partitions to make the write tasks process around 500MiB each. I used the following command and template to test, changing the JAR location (and the allocSize to test when the baseline would stop OOMing):