Iceberg split generation rate limit #18214
base: master
Conversation
Thank you for your pull request and welcome to our community. We could not parse the GitHub identity of the following contributors: Marton Bod.
Force-pushed 21f41d4 to c237f9f
@alexjo2144 @findinpath Just a friendly bump, I would really appreciate it if you could review this PR when you get the chance. Thank you!
Resolved review threads on:
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitManager.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java
- plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergModule.java
Force-pushed 11e29b8 to c322bad
Force-pushed ae26fb3 to 6b08d49
Force-pushed 6b08d49 to aa4fca3
Force-pushed 3f3674a to ab82595
```java
.addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 1)
.addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), numberOfFiles + min(icebergManifestPrefetching, numberOfFiles))
.build());
assertFileSystemDataFileAccesses("SELECT * FROM test_select_with_limit LIMIT " + limit, min(limit, numberOfFiles));
```
@findepi the number of manifest accesses is no longer deterministic, because the splits are enqueued asynchronously. As soon as it's identified during a `getNextBatch()` call that we have enough splits, the enqueueing will stop, but we can't make any guarantees about the number of manifest accesses.
On the other hand, the number of data file accesses is deterministic, and no more data files will be read than necessary. I've changed this test to reflect this.
Thinking about this:
- we still want to test manifest accesses; this is an important metric
- there should probably be a kill-switch for this functionality

So maybe the test should just use that kill-switch?
I've been thinking about this too, but I think adding a kill switch would complicate the code greatly. The iterative split generation approach is fundamentally different from the background async split loading approach, and a kill switch would require maintaining the code for both approaches in parallel and duplicating the logic in some places (and, more painfully, porting fixes to both places). Because of this, I think it's impractical to introduce this flag.
I've made some enhancements to the test though, since even with this new approach we can make some guarantees about the number of manifest accesses. I've changed the test to compare the manifest access count against a lower bound and an upper bound, and added code comments to explain the reasoning behind the bounds. Regardless of the number of manifests in the scan, if there's a limit, the number of actually-accessed manifests should stay within a constant, narrow range.
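For illustration, a minimal sketch of such a bounded assertion (the helpers `manifestAccessCount`, `minRequiredManifests` and `prefetchWindow` are hypothetical names, not the actual test API):

```java
// Hedged sketch: helper names are hypothetical, not the real test harness API.
long accesses = manifestAccessCount("SELECT * FROM test_select_with_limit LIMIT " + limit);

// Lower bound: at least the manifests needed to produce `limit` rows must be read.
assertThat(accesses).isGreaterThanOrEqualTo(minRequiredManifests);

// Upper bound: async enqueueing stops shortly after getNextBatch() sees enough
// splits, so at most a small, constant prefetch window of extra manifests is read.
assertThat(accesses).isLessThanOrEqualTo(minRequiredManifests + prefetchWindow);
```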
> The iterative split generation approach is fundamentally different from the background async split loading approach

Kill switches are useful in case a bug is uncovered. If the new code is fundamentally different from the old one, it sounds like it's not safe to proceed, is it?
I phrased it wrong, I think. They're not fundamentally different, since most of the split generation and filtering logic has remained unchanged. It's different only in the sense that hiding it behind a feature flag is not as trivial as putting some code in an if-else statement. But it's doable. For example, I could refactor it so that `IcebergAsyncSplitSource` extends `IcebergSplitSource`, and then `IcebergSplitManager` can toggle between the two implementations based on the flag - what do you think?
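As a rough sketch of that idea (the session property and constructors shown are hypothetical, mirroring the proposal rather than actual Trino code):

```java
// Inside IcebergSplitManager - a hypothetical kill-switch toggle.
public ConnectorSplitSource getSplits(ConnectorSession session, IcebergTableHandle table)
{
    if (IcebergSessionProperties.isAsyncSplitLoadingEnabled(session)) {
        // New path: splits are streamed into a throttled queue in the background
        return new IcebergAsyncSplitSource(session, table);
    }
    // Old path: splits are generated iteratively inside getNextBatch()
    return new IcebergSplitSource(session, table);
}
```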
Can we make the new implementation produce the same behaviour as before if `max-outstanding-splits` was configurable and set to 1?
@findepi Thanks so much for taking the time to review this. I've tried answering your questions as best I could. Please let me know if you have any other questions or improvement ideas. Thanks!
Force-pushed e408de7 to 2fc86ae
@raunaqmorarka @findepi @findinpath @electrum Would really appreciate reviews here; it's been a long-pending one. Hope to close this out soon.
@raunaqmorarka @findepi @findinpath @electrum @mosabua Gentle ping again on this.
@vgankidi I'm just gonna ping and ping until someone can get to this lol. Sorry this has taken so long. Also, please attend the contributor meeting next week: https://github.com/trinodb/trino/wiki/Contributor-meetings Bring this up there.
I don't think we should be doing this, and it should be discussed (Slack, contributors meeting, etc.) before merging.
Turns out this is in every other Hive-style connector for some reason, so I guess this ship has sailed.
@electrum pointed out this is in the other Hive-style connectors already. I have no idea why this was added, because it is a poor stand-in for real concurrency control... anyway, I suggest we do not document or recommend this feature.
I suggest we fix this up and merge it, so there is parity across the implementations. Then I suggest we add a real throttling wrapper to the new native file systems to actually deal with real errors (we can start with a simple per-machine uncoordinated limit). Once that is in, we should drop this from all of the connectors.
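Sketched very loosely, such a wrapper might look like the following (the `FileSystemOps` interface and `open()` method are placeholders standing in for the actual native file system API, and Guava's `RateLimiter` is one possible mechanism):

```java
import com.google.common.util.concurrent.RateLimiter;

import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch of a per-machine throttling wrapper around a file system.
interface FileSystemOps
{
    InputStream open(String path)
            throws IOException;
}

class ThrottledFileSystem
        implements FileSystemOps
{
    private final FileSystemOps delegate;
    // Uncoordinated: each node limits only itself, with no cluster-wide state
    private final RateLimiter rateLimiter;

    ThrottledFileSystem(FileSystemOps delegate, double maxCallsPerSecond)
    {
        this.delegate = delegate;
        this.rateLimiter = RateLimiter.create(maxCallsPerSecond);
    }

    @Override
    public InputStream open(String path)
            throws IOException
    {
        rateLimiter.acquire(); // block until the local rate budget allows the call
        return delegate.open(path);
    }
}
```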
Awesome @dain .. do you want me to file an issue so we can track this and move it along? @marton-bod, can you rebase and ping us when it's ready for review again? Then we can figure out how @findepi, @raunaqmorarka, @ebyhr and @dain can collaborate efficiently to get this merged.
Force-pushed 09a6c52 to 1bff24d
This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua
Hi @dain, I was wondering if you could please take a look at this to close it out? Thanks so much!
Please rebase, @marton-bod. Either @findepi or @dain can then proceed with final review and merge.
Force-pushed b913b34 to dedf6a0
@mosabua Shall we just close this PR instead?
@marton-bod @dain .. we kinda agreed to get this merged to get parity across all the object storage connectors on this feature. I still think we should do that .. and also work on a proper solution, maybe at the file system level. @electrum, thoughts? And also .. sorry for the delays and back and forth.
Description
Hive, Hudi and Delta Lake connectors already use the same approach implemented in this PR, whereby they first asynchronously load (stream) splits into an `AsyncQueue` (which can be configured with throttling settings), and then, when `getNextBatch()` is periodically called on the SplitSource, splits are borrowed from this queue. The primary aim is to control the rate at which splits are fetched for scheduling, thereby protecting the storage layer from getting overwhelmed by requests (e.g. namenodes in HDFS). This is achieved by a newly introduced config flag (already present in Hive, Hudi and Delta): `iceberg.max-splits-per-second`.
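To make the mechanism concrete, here is a minimal, self-contained sketch of that producer/consumer pattern (class and method names are illustrative, and Guava's `RateLimiter` stands in for the throttling settings of the actual `AsyncQueue`):

```java
import com.google.common.util.concurrent.RateLimiter;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch of a throttled split queue, not the actual AsyncQueue.
class ThrottledSplitQueue<T>
{
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final RateLimiter rateLimiter;

    ThrottledSplitQueue(double maxSplitsPerSecond)
    {
        // e.g. the value of iceberg.max-splits-per-second
        this.rateLimiter = RateLimiter.create(maxSplitsPerSecond);
    }

    // Producer side: the background split loader blocks when over the rate limit
    void offer(T split)
            throws InterruptedException
    {
        rateLimiter.acquire();
        queue.put(split);
    }

    // Consumer side: getNextBatch() borrows up to maxSize splits from the queue
    List<T> getNextBatch(int maxSize)
    {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch, maxSize);
        return batch;
    }
}
```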
.Additional context and related issues
Solves issue #17974.
The logic for filtering out splits should be unchanged: the pruning logic related to the dynamic filter stays in `getNextBatch()`, since the dynamic filter needs to be recomputed periodically. The rest of the pruning logic can be applied to a split even before enqueueing it, so we perform those checks within `acceptScanTask()` when we create the split stream.
Note: `FileAwareScanTask` is a new class introduced to keep metadata for `FileScanTask`s regarding their parent files. This is needed to replicate the current logic, which stops the iteration once we have already hit enough records for a LIMIT clause.
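A rough sketch of what such a wrapper might look like (field and accessor names are guesses for illustration; only the stated purpose - carrying parent-file record counts so iteration can stop once a LIMIT is satisfied - comes from this PR):

```java
import org.apache.iceberg.FileScanTask;

// Illustrative sketch only: field and accessor names are hypothetical.
class FileAwareScanTask
{
    private final FileScanTask task;
    private final String parentFilePath;      // the data file this task was derived from
    private final long parentFileRecordCount; // record count of that parent file

    FileAwareScanTask(FileScanTask task, String parentFilePath, long parentFileRecordCount)
    {
        this.task = task;
        this.parentFilePath = parentFilePath;
        this.parentFileRecordCount = parentFileRecordCount;
    }

    FileScanTask task()
    {
        return task;
    }

    // Used by the producer loop: once the running sum of parent-file record
    // counts reaches the LIMIT, no further tasks need to be enqueued.
    long parentFileRecordCount()
    {
        return parentFileRecordCount;
    }
}
```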
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(X) Release notes are required, with the following suggested text: