feat(source): column pruning for parquet file source #18967
@@ -57,14 +62,15 @@ impl<Src: OpendalSource> SplitReader for OpendalReader<Src> {
splits: Vec<OpendalFsSplit<Src>>,
parser_config: ParserConfig,
source_ctx: SourceContextRef,
_columns: Option<Vec<Column>>,
Previously, the columns variable was unused. Does that mean it is None?
No, it's just not used. Here columns is Some(user_defined_columns, hidden_columns).
I just ran a test to verify:
CREATE Table test_s3(
id int,
age int,
) WITH (
connector = 's3',
match_pattern = '*.parquet',
s3.region_name = 'custom',
s3.bucket_name = 'hummock001',
s3.credentials.access = 'hummockadmin',
s3.credentials.secret = 'hummockadmin',
s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
) FORMAT PLAIN ENCODE PARQUET;
and the columns here is Some([Column { name: "id", data_type: Int32, is_visible: true }, Column { name: "age", data_type: Int32, is_visible: true }, Column { name: "_row_id", data_type: Serial, is_visible: false }, Column { name: "_rw_s3_v2_file", data_type: Varchar, is_visible: false }, Column { name: "_rw_s3_v2_offset", data_type: Varchar, is_visible: false }])
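The list above mixes user-defined columns with hidden ones such as _row_id, which a Parquet file presumably does not contain, so pruning has to keep only the requested names that exist in the file schema. A minimal sketch of that matching step, assuming name-only matching (the PR's actual code may also compare data types); valid_column_indices here is a hypothetical free function, not the PR's implementation:

```rust
/// Hypothetical helper: map requested column names to their positions in the
/// file schema. Names the file does not contain (for example hidden columns)
/// are skipped. Matching is by name only, which is an assumption.
pub fn valid_column_indices(file_schema: &[&str], requested: &[&str]) -> Vec<usize> {
    requested
        .iter()
        // `position` returns None for names missing from the file,
        // and `filter_map` drops those entries.
        .filter_map(|name| file_schema.iter().position(|field| field == name))
        .collect()
}
```

The resulting indices follow the order of the requested names, not the order of the file schema.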
})
})
.collect();
valid_column_indices.sort();
It seems the order of the input columns isn't respected, but the order matters. Suppose the parquet schema is ['a', 'b', 'c'] but the column order we want is [c, b]. The valid_column_indices.sort() here will lead to [b, c], which is unexpected.
The valid_column_indices is used to construct the ProjectionMask, and in ProjectionMask, repeated or out-of-order indices do not affect the final mask (see https://docs.rs/crate/parquet/latest/source/src/arrow/mod.rs#179 for details), i.e. [0, 1, 2] will construct the same mask as [1, 0, 0, 2]. After getting the record batch stream, we still do the schema check and match, so whatever the order, correctness is not affected.
But I don't mind removing the sort here.
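To illustrate why the order of indices does not matter, here is a simplified model of a projection mask as one boolean flag per column. It is only a sketch under that assumption: mask_from_indices is a hypothetical stand-in, not the parquet crate's API.

```rust
/// Hypothetical stand-in for a projection mask: one flag per column,
/// true when the column is selected. Panics if an index is out of range.
pub fn mask_from_indices(num_columns: usize, indices: &[usize]) -> Vec<bool> {
    let mut mask = vec![false; num_columns];
    for &idx in indices {
        // Setting a flag is idempotent, so duplicates and ordering
        // leave the final mask unchanged.
        mask[idx] = true;
    }
    mask
}
```

In this model, mask_from_indices(3, &[0, 1, 2]) and mask_from_indices(3, &[1, 0, 0, 2]) are equal, and so are the masks for [2, 1] and [1, 2], so sorting the indices changes nothing about which columns are read.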
Previously I kept .sort() just to follow what iceberg-rust does; let me also send a PR to remove it in iceberg-rust 😄.
I see. ParquetParser will reorder the chunk returned from ParquetRecordBatchStream.
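The reordering mentioned here can be pictured as a lookup by name: the stream returns columns in file order, and the consumer arranges them in the order the query asked for. The following is a rough sketch of that idea only; reorder_by_name is a hypothetical helper and does not reflect how ParquetParser is actually written.

```rust
/// Hypothetical helper: arrange columns that were read in file order into
/// the requested order, matching by name. A requested name the file did not
/// provide yields None at that position.
pub fn reorder_by_name<'a, T>(
    read: &'a [(&'a str, T)],
    wanted: &[&str],
) -> Vec<Option<&'a T>> {
    wanted
        .iter()
        .map(|name| {
            // Look up each requested name among the columns actually read.
            read.iter().find(|(n, _)| n == name).map(|(_, col)| col)
        })
        .collect()
}
```

With a file read as [b, c] and a requested order of [c, b], the result lists c first and b second, which is why the order of the projection indices does not affect correctness downstream.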
LGTM
…nto wcy/column_clipping_for_parquet_file_source.pr
GitGuardian id | GitGuardian status | Secret | Commit | Filename
---|---|---|---|---
9425213 | Triggered | Generic Password | ad71847 | risedev.yml
9425213 | Triggered | Generic Password | ad71847 | e2e_test/source_inline/tvf/postgres_query.slt
9425213 | Triggered | Generic Password | ad71847 | e2e_test/source_inline/tvf/postgres_query.slt
🛠 Guidelines to remediate hardcoded secrets
- Understand the implications of revoking this secret by investigating where it is used in your code.
- Replace and store your secrets safely. Learn here the best practices.
- Revoke and rotate these secrets.
- If possible, rewrite git history. Rewriting git history is not a trivial act. You might completely break other contributing developers' workflow and you risk accidentally deleting legitimate data.
To avoid such incidents in the future consider
- following these best practices for managing and storing secrets including API keys and other credentials
- install secret detection on pre-commit to catch secret before it leaves your machine and ease remediation.
🦉 GitGuardian detects secrets in your source code to help developers and security teams secure the modern development process. You are seeing this because you or someone else with access to this repository has authorized GitGuardian to scan your pull request.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
For the Parquet encoding, we can read the file's own schema before reading the whole file, so we can prune columns and read only those the user needs by setting a ProjectionMask.
The same optimization could also be applied to the TVF (file_scan(parquet_file)), but since the TVF and batch source will be unified in the backend, this optimization is implemented only on the file source side.
Checklist
./risedev check (or alias, ./risedev c)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.