Add support for chunked parquet reading [databricks] #6934
Conversation
Signed-off-by: Robert (Bobby) Evans <[email protected]>
Resolved review threads:
sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/parquet/GpuParquet.java
sql-plugin/src/main/java/com/nvidia/spark/rapids/iceberg/spark/source/GpuSparkScan.java
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala (4 outdated threads)
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala (outdated)
@jlowe I think I have finished all of the rework you requested. Please take another look.
Resolved review threads:
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchIterator.scala (2 outdated threads)
 *
 * @tparam T what it is that we are wrapping
 */
abstract class GpuDataProducer[T] extends AutoCloseable {
Suggested change:
-abstract class GpuDataProducer[T] extends AutoCloseable {
+trait GpuDataProducer[T] extends AutoCloseable {
Should we also make it sealed?
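For illustration, a minimal sketch of what the suggested change might look like if the producer became both a trait and sealed. This is a hypothetical reading of this review thread, not the code that was actually merged; the hasNext/next members are inferred from how the producer is consumed later in this conversation:

    // Sealed so every implementation must live in the same file, which makes it easy to
    // see all the producers the plugin can hand out.
    sealed trait GpuDataProducer[T] extends AutoCloseable {
      def hasNext: Boolean
      def next: T
    }

The trade-off is that sealing blocks implementations in other files, which may or may not be acceptable for this interface.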
Resolved review threads:
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchIterator.scala (outdated)
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataProducer.scala (outdated)
  override def next(): ColumnarBatch = throw new NoSuchElementException()
}

class SingleGpuColumnarBatchIterator(var batch: ColumnarBatch)
Suggested change:
-class SingleGpuColumnarBatchIterator(var batch: ColumnarBatch)
+class SingleGpuColumnarBatchIterator(private var batch: ColumnarBatch)
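A hedged sketch of why the private modifier matters here: the iterator owns the batch and should be the only thing able to null it out once it has been handed to the caller. This is an illustration of the idea, not the exact class in the PR:

    import org.apache.spark.sql.vectorized.ColumnarBatch

    // Keeping the var private means nothing outside the iterator can see or replace the
    // batch, so ownership is transferred exactly once through next().
    class SingleGpuColumnarBatchIterator(private var batch: ColumnarBatch)
        extends Iterator[ColumnarBatch] {
      override def hasNext: Boolean = batch != null
      override def next(): ColumnarBatch = {
        if (batch == null) {
          throw new NoSuchElementException()
        }
        val ret = batch
        batch = null // hand ownership to the caller
        ret
      }
    }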
@@ -70,19 +70,29 @@ def read_parquet_sql(data_path):
original_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'PERFILE'}
multithreaded_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'MULTITHREADED'}
coalesce_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'COALESCING'}
coalesce_parquet_file_reader_multithread_filter_chunked_conf = {'spark.rapids.sql.format.parquet.reader.type': 'COALESCING',
I don't see any tests with different size limits here. Limit values ranging from small to large would be better than the default max 2GB value.
The default limit in these tests is 512 MiB. We don't have any files being read that would produce more than a single batch anyway. I don't think we have any files that would be larger than a page per row, so even if we tried to turn it on the output would be the same. I have done manual tests, because the size needed does not really lend itself to this type of test.
@@ -67,6 +67,8 @@ public static class ReadBuilder {
private Configuration conf = null;
private int maxBatchSizeRows = Integer.MAX_VALUE;
private long maxBatchSizeBytes = Integer.MAX_VALUE;
private long targetBatchSizeBytes = Integer.MAX_VALUE;
For me, it is very confusing to differentiate between maxBatchSizeBytes and targetBatchSizeBytes.
Do you want a comment in here? The long-term plan is to eventually get rid of maxBatchSizeBytes and maxBatchSizeRows, but we can only do that once chunked reads are the only option for Parquet and ORC, and, depending on how things go, for Avro, CSV, and JSON too.
Yes, please add a comment here to clarify how they are different.
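For illustration only, here is a hedged sketch of the distinction as described in this thread, written in Scala for consistency with the other snippets. The ReadLimits name and the wording of the comments are assumptions based on the discussion above, not the comment that was actually added to ReadBuilder:

    // maxBatchSizeRows / maxBatchSizeBytes: hard caps used by the existing, non-chunked read
    // path; per the plan above they go away once chunked reads are the only option for
    // Parquet and ORC (and possibly Avro, CSV, and JSON).
    // targetBatchSizeBytes: the size the chunked reader aims for when splitting output into
    // batches.
    case class ReadLimits(
        maxBatchSizeRows: Int = Int.MaxValue,
        maxBatchSizeBytes: Long = Int.MaxValue,
        targetBatchSizeBytes: Long = Int.MaxValue)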
while (hasNext) {
  func(next)
}
In cudf we had an issue when not calling next before hasNext for an empty input file. In such cases, the valid output should be a table having 0 rows but with all of the columns (each with 0 rows) in the file schema. We can only get such an output table if we call next before hasNext:

do {
  func(next)
} while (hasNext)

I'm not sure if such an output table is also desired in Spark.
Wow, that is totally not the order that I would expect the APIs to need to be called in. It is the opposite of all Java iterators, but OK. I will need to change it in a very different place than this, because GpuDataProducer is abstract and says that it should operate like an iterator.
Please make sure that the JNI API is clearly documented, because, like I said, this is totally the opposite of Java iterator semantics.
Is documenting this weird behavior the right answer? This seems like behavior that should be fixed in cudf. The entire point of hasNext is to be a predicate function to let the calling code know that it is valid to call next, as the method name implies. Needing to special-case that for the first call is bizarre and error-prone, IMO.
This is not just a specific issue with cudf, but a general logic issue. In the case of an empty file, what should hasNext return? It should always return false, as there is no data in the file (except metadata) to read. But what you expect is a table with empty columns, not a table with nothing at all, right? So if you check hasNext and see a false, then next will never be called and you never get a table with empty columns. On the other hand, if hasNext returns true for an empty file, then it will always return true, and you can't know when to stop.
Talked with Bobby and decided to work around this in cudf: hasNext (in the cudf Java JNI) will return true at least once (it always returns true the first time it is called).
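A hedged sketch of that workaround pattern, written as a plain Scala iterator wrapper. The ChunkedReaderLike trait and its method names are hypothetical stand-ins for the cudf JNI chunked reader, used only to illustrate the at-least-once behavior:

    import org.apache.spark.sql.vectorized.ColumnarBatch

    // Hypothetical stand-in for the cudf chunked reader JNI API.
    trait ChunkedReaderLike {
      def hasMoreData: Boolean        // reports false for an empty file
      def readChunk(): ColumnarBatch  // for an empty file, a 0-row batch with the full schema
    }

    // Reports hasNext == true at least once, so the caller still pulls a 0-row batch with
    // the file schema even when the underlying reader says there is no data.
    class AtLeastOnceIterator(reader: ChunkedReaderLike) extends Iterator[ColumnarBatch] {
      private var firstCall = true
      override def hasNext: Boolean = firstCall || reader.hasMoreData
      override def next(): ColumnarBatch = {
        if (!hasNext) {
          throw new NoSuchElementException()
        }
        firstCall = false
        reader.readChunk()
      }
    }

With this wrapper the standard Java-style loop (check hasNext, then call next) produces the desired empty-schema batch for an empty file without callers having to special-case the first call.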
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me, just a minor question on the single iterators.
Resolved review thread:
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchIterator.scala
lgtm, assuming iterator issues get worked out in the cudf Java bindings.
Yes, the latest patch has the iterator issues fixed on the Java side. Hopefully it gets merged in soon.
build
build
This depends on rapidsai/cudf#11961
This fixes the Parquet part of #4968. I need to file a separate follow-on issue for the ORC part of this.
In my tests with non-nested values there was no performance difference, and in general the chunked reader was able to avoid memory problems.
I still need to do some performance testing for nested types.