GH-43660: [C++] Add a CastingGenerator to Parquet Reader that applies required casts before slicing #43661
base: main
Conversation
(force-pushed from 226d2d1 to f1c0d6e)
@@ -555,6 +562,57 @@ Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
  });
}

struct CastingGenerator {
I'm a bit curious why a casting generator is required, since when reading Parquet into Arrow, the Parquet reader already applies a round of casting.
You're probably right and it likely does perform some casts during the read. However, it doesn't seem to be doing it for certain cases like String to LargeString.
Could you point me to where the Parquet Reader should be performing this cast? I'm happy to add it there.
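For context, here is a minimal sketch of the kind of cast being discussed. This is not the PR's code; the helper name and the assumption that the reader materialized the column as utf8 while the scan expects large_utf8 are mine.

```cpp
#include <arrow/api.h>
#include <arrow/compute/api.h>

// Hypothetical helper: the Parquet reader returned a utf8 (String) column,
// but the dataset_schema expects large_utf8 (LargeString), so the scan has
// to cast it explicitly.
arrow::Result<std::shared_ptr<arrow::Array>> CastStringToLargeString(
    const std::shared_ptr<arrow::Array>& column) {
  ARROW_ASSIGN_OR_RAISE(arrow::Datum casted,
                        arrow::compute::Cast(column, arrow::large_utf8()));
  return casted.make_array();
}
```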
Thank you!
Based on what I see, that is only responsible for casting the data to the logical type specified in the parquet metadata and not the Arrow type we want to convert to (the one in the dataset_schema). For strings, that seems to always map to a String type (based on FromByteArray, which is called by GetArrowType, which is called by GetTypeForNode, which is called by NodeToSchemaField, which is called in SchemaManifest::Make during the creation of the LeafReader). Am I missing something?
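To make the consequence of that call chain concrete, here is a rough simplification of my own (not the actual schema_internal code): the leaf type is derived only from the Parquet logical type, so an expected type from the dataset_schema is never consulted.

```cpp
#include <arrow/api.h>

// Simplified illustration: a BYTE_ARRAY leaf with a String logical type is
// always mapped to utf8(); nothing on this path knows that the caller would
// prefer large_utf8() or utf8_view().
std::shared_ptr<arrow::DataType> ArrowTypeForByteArrayLeaf(bool is_string) {
  return is_string ? arrow::utf8() : arrow::binary();
}
```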
> Based on what I see, that is only responsible for casting the data to the logical type specified in the parquet metadata and not the Arrow type we want to convert to (the one in the dataset_schema)

The Parquet logical type doesn't have an Arrow schema, does it? The binary reader reads into an ::arrow::BinaryBuilder and then casts to the user-specified binary type.
> For strings, that seems to always map to a String type (based on FromByteArray, which is called by GetArrowType, which is called by GetTypeForNode, which is called by NodeToSchemaField, which is called in SchemaManifest::Make during the creation of the LeafReader).

Yeah, you're right, the read "casts" to the file schema rather than to an expected schema. I think a native cast would be better here, but that doesn't solve your problem; perhaps I can try to add a naive SchemaManifest with hint resolution here, but it would take some time.
::arrow::Result<std::shared_ptr<ArrowType>> GetTypeForNode(
    int column_index, const schema::PrimitiveNode& primitive_node,
    SchemaTreeContext* ctx)
Maybe we should rethink the GetTypeForNode handling for string/large_string/string_view, or use some kind of type hint here. A casting generator is also fine with me for the cases where the reader cannot provide the right cast.
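Purely as an illustration of the "type hint" idea (nothing like this exists in the reader today; the function and parameter names below are hypothetical):

```cpp
#include <arrow/api.h>

// Hypothetical: GetTypeForNode could accept an expected type from the caller
// (e.g. the dataset_schema) and prefer it when it is a compatible widening of
// the type inferred from the Parquet logical type.
::arrow::Result<std::shared_ptr<::arrow::DataType>> ResolveLeafTypeWithHint(
    std::shared_ptr<::arrow::DataType> inferred,    // from the Parquet logical type
    std::shared_ptr<::arrow::DataType> expected) {  // hint, may be nullptr
  if (expected == nullptr) return inferred;
  const bool string_widening =
      inferred->id() == ::arrow::Type::STRING &&
      (expected->id() == ::arrow::Type::LARGE_STRING ||
       expected->id() == ::arrow::Type::STRING_VIEW);
  return string_widening ? expected : inferred;
}
```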
Providing a hint would look like apache/arrow-rs#5939. Maybe I can add a separate issue for that.
> The Parquet logical type doesn't have an Arrow schema, does it?

As far as I understand, the parquet metadata may or may not contain the Arrow schema; I believe it depends on the writer. It looks like it tries to get that using GetOriginSchema in SchemaManifest::Make. However, the schema at write time might not be the same as the schema the reader expects.
> The binary reader reads into an ::arrow::BinaryBuilder and then casts to the user-specified binary type.

Sorry, I didn't quite follow. Are you saying that we should use this to do the cast at read time somehow?
> I think a native cast would be better here, but that doesn't solve your problem; perhaps I can try to add a naive SchemaManifest with hint resolution here, but it would take some time.
> Maybe we should rethink the GetTypeForNode handling for string/large_string/string_view, or use some kind of type hint here.

That makes sense to me.
> Maybe I can add a separate issue for that

That would be great, thanks!
I just checked this. We can first add a CastingGenerator, because sometimes we have schema evolution here and need to cast to a "final type". The Parquet reader path can be regarded as an optimization.
Sounds good, thanks!
(force-pushed from bdac54a to e832a0d)
I'm not sure why the "Java JNI / AMD64 manylinux2014 Java JNI (pull_request)" and "continuous-integration/appveyor/pr" checks are failing. The latter in particular passed on the previous commit, which is identical to this one (my last commit was an empty commit to trigger the CI again). Any insights would be appreciated. I would also appreciate feedback on whether and which unit tests should be added for this change. The existing unit tests seem to provide good coverage of this code (they helped me find some of the bugs), but I'm happy to add more if it is deemed useful.
The interface generally LGTM. I'm a little busy on working days and will take a careful look over the weekend. You can also mark it as ready for review here.
Thanks, I appreciate it!
@mapleFU If you get a chance to review this PR this weekend, that'd be great. Thanks!
Sorry for the delay, I'll take a pass today. I was out today so I didn't finish the review; I'll continue after I wake up.
The idea looks great to me, but I don't know whether setting the batch size to the maximum matters for memory usage.
if (this->cols_to_skip_.count(field->name())) {
  // Maintain the original input type.
  out_schema_fields.emplace_back(field->WithType(column->type()));
  out_cols.emplace_back(std::move(column));
So cols_to_skip_ just means "don't cast this field", rather than "this field isn't needed"?
Yes, to skip the cast and leave them as they are.
@@ -617,6 +684,9 @@ Result<RecordBatchGenerator> ParquetFileFormat::ScanBatchesAsync(
  [this, options, parquet_fragment, pre_filtered,
   row_groups](const std::shared_ptr<parquet::arrow::FileReader>& reader) mutable
      -> Result<RecordBatchGenerator> {
    // Since we already do the batching through the SlicingGenerator, we don't need the
    // reader to batch its output.
    reader->set_batch_size(std::numeric_limits<int64_t>::max());
    // Ensure that parquet_fragment has FileMetaData
Assuming a large file, would memory usage grow high in this case?
I think we have that problem regardless of the batch size of the reader.
We pass this reader to reader->GetRecordBatchGenerator (a few lines down). This eventually creates a RowGroupGenerator (arrow/cpp/src/parquet/arrow/reader.cc, line 1204 in 6a2e19a). When FetchNext is called on the RowGroupGenerator, it essentially calls ReadOneRowGroup, which reads the entire row group into a table and then creates an output stream over it using TableBatchReader with table_reader.set_chunksize(batch_size) (arrow/cpp/src/parquet/arrow/reader.cc, line 1170 in 6a2e19a). So the batch_size just creates a reader on top of that table which generates zero-copy slices. This is the same as what the SlicingGenerator does and is redundant. In my opinion, it's better to set the batch size to INT64_MAX so that the reader returns one table per row group, and then we can perform the batching through the SlicingGenerator.
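To illustrate the redundancy, here is a small sketch (my own, assuming the row group has already been read into a table): TableBatchReader only produces zero-copy slices of that table, which is the same work the SlicingGenerator performs downstream.

```cpp
#include <arrow/api.h>

arrow::Status SliceIntoBatches(const std::shared_ptr<arrow::Table>& row_group_table,
                               int64_t batch_size) {
  arrow::TableBatchReader batch_reader(*row_group_table);
  batch_reader.set_chunksize(batch_size);  // each ReadNext yields a zero-copy slice
  std::shared_ptr<arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(batch_reader.ReadNext(&batch));
    if (batch == nullptr) break;  // end of the table
    // ... hand the slice downstream ...
  }
  return arrow::Status::OK();
}
```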
// We need to skip casting the dictionary columns since the dataset_schema doesn't
// have the dictionary-encoding information. Parquet reader will return them with the
// dictionary type, which is what we eventually want.
I forgot something here: would the combination of Dict(String) and Cast(xxx -> LargeString) matter?
I wasn't sure, so I left that case untouched. There are more casts done further up the chain (e.g. in MakeExecBatch, potentially) that seem to handle those cases. I couldn't figure out how the dictionary case gets handled either, so I left it as is. I'm happy to implement it here if you could point me in the right direction.
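For what it's worth, a minimal illustration of the skip condition being discussed (my own sketch, not the PR code): since the dataset_schema has lost the dictionary encoding, a column the reader returns as dictionary<values=utf8> is simply left alone rather than cast.

```cpp
#include <arrow/api.h>

// Skip the cast for dictionary-encoded columns; the reader already returns
// them with the dictionary type, which is what we eventually want.
bool ShouldSkipCast(const std::shared_ptr<arrow::DataType>& read_type) {
  return read_type->id() == arrow::Type::DICTIONARY;
}
```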
(force-pushed from 912db12 to f1a4955)
Also cc @bkietz as the expert here
cc @srilman
Rationale for this change
(See #43660)
What changes are included in this PR?
- In ParquetFileFormat::ScanBatchesAsync, set the batch_size for the reader to INT64_MAX. Since we already have a SlicingGenerator that is responsible for converting the reader's output into batch_size-sized batches, we don't need the reader to batch its output.
- Add a CastingGenerator that applies any required casts to the output of the reader before the output is passed to the SlicingGenerator (see the sketch below). The logic for the cast closely follows the logic in MakeExecBatch (arrow/cpp/src/arrow/compute/expression.cc, lines 644 and 672 in 6a2e19a).
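A rough sketch of how such a casting step can be composed onto the reader's batch generator. This is not the PR's actual CastingGenerator: it assumes the hypothetical CastToFinalSchema helper sketched earlier in this thread and uses MakeMappedGenerator from arrow/util/async_generator.h.

```cpp
#include <arrow/api.h>
#include <arrow/util/async_generator.h>

// Hypothetical per-batch helper (sketched earlier in this thread).
arrow::Result<std::shared_ptr<arrow::RecordBatch>> CastToFinalSchema(
    const std::shared_ptr<arrow::RecordBatch>& batch,
    const std::shared_ptr<arrow::Schema>& dataset_schema);

using RecordBatchGen =
    arrow::AsyncGenerator<std::shared_ptr<arrow::RecordBatch>>;

// Wrap the source generator so every batch is cast to the dataset schema
// before it reaches the slicing step.
RecordBatchGen WrapWithCast(RecordBatchGen source,
                            std::shared_ptr<arrow::Schema> dataset_schema) {
  return arrow::MakeMappedGenerator(
      std::move(source),
      [dataset_schema](const std::shared_ptr<arrow::RecordBatch>& batch)
          -> arrow::Result<std::shared_ptr<arrow::RecordBatch>> {
        if (batch == nullptr) return batch;  // end-of-stream marker passes through
        return CastToFinalSchema(batch, dataset_schema);
      });
}
```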
Are these changes tested?
I would like some feedback on the best way to test this. The changes passed our internal test cases; however, those may not account for all possible use cases.
Are there any user-facing changes?
No.