Improve compaction job throughput #3643
The state store backed by a transaction log can handle considerable throughput, but it involves putting all updates to the state store through an SQS FIFO queue into a lambda, and a fully ordered transaction log in DynamoDB for each Sleeper table. Each of these components places a limit on the number of updates we can apply to the state store. We can consider how we might avoid or raise each of those limits.
The current bottleneck this places on the size of the system is the number of compactions we can create and apply to a Sleeper table. As you ingest data, this is likely to produce roughly as many compactions as there are partitions in the table. We'd like to focus on the number of compactions we can run and the rate at which they pass through the system.
We're considering adding a state store backed by PostgreSQL, which should allow for much higher throughput of compaction jobs. That involves a persistent instance of Aurora Serverless v2, which does not scale to zero. This issue is an alternative to that: improving the transaction log based approach instead.
Numbers of updates for compactions
The number of compactions a Sleeper table can process is limited by the number of jobs the compaction job creation lambda can create, the number of commits the state store committer lambda can apply (assuming asynchronous commits), the number of compaction tasks available, and the rate at which each compaction job executes.
By default, the compaction job creation lambda runs once per minute, and can create a maximum of 10,000 jobs per invocation. These 10,000 jobs are submitted to the state store committer lambda in batches of 10 to have their input files assigned to them. This will generate a maximum of 1,000 commits per minute.
By default, when the jobs are picked up by a compaction task and run successfully, they are sent to the state store committer lambda as individual commits. If we assume jobs are completed immediately, this will generate a maximum of 10,000 commits per minute.
If we assume the state store committer lambda can handle 20 commits per second, that comes to 1,200 commits per minute. That's barely enough to handle the input file assignments. If we assume 100 commits per second, that comes to 6,000 commits per minute. That's not enough to handle all the compaction jobs.
But the above assumes that the compaction job creation rate is constant throughout the day. If 10,000 compaction jobs are created per minute, that's 14.4 million jobs per day. It's not clear that any of our scenarios require that many jobs. If they don't then the rates the components need to handle will not be as high as above.
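For reference, the arithmetic behind these figures works out as follows:

$$
\begin{aligned}
\text{input file assignment commits} &= 10{,}000 \text{ jobs/min} \div 10 \text{ jobs/batch} = 1{,}000 \text{ commits/min} \\
\text{job completion commits} &= 10{,}000 \text{ commits/min} \\
\text{committer at 20 commits/s} &= 20 \times 60 = 1{,}200 \text{ commits/min} \\
\text{committer at 100 commits/s} &= 100 \times 60 = 6{,}000 \text{ commits/min} \\
\text{sustained job creation} &= 10{,}000 \text{ jobs/min} \times 1{,}440 \text{ min/day} = 14{,}400{,}000 \text{ jobs/day}
\end{aligned}
$$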
Updates from the compaction job creation lambda
Each invocation of the compaction job creation lambda generates a number of updates: splitting file references down the partition tree, and assigning input files to each compaction job it creates. If these updates aren't committed before the next run of the lambda, the lambda will re-create the same file reference splits and compaction jobs, and they will build up on the state store committer queue. This is a failure case we'd like to avoid.
We can only send compaction jobs to the compaction queue in batches of at most 10 (the SQS batch limit). The updates to assign input files to the compactions need to happen after each job is sent, because we don't want a file to be assigned to a job that never happens. Conversely, if a compaction ever completes before its input file assignment is on the committer queue, it will fail to commit.
The purpose of the file assignments is to avoid ever editing a file reference if it's already due to be compacted, or if it's being split. With the state store committer queue, it only achieves that purpose if the commits are applied before the next run of the compaction creation lambda. We could consider decoupling this concern from the state store entirely, as in the pending operations store section below, or we could focus on reducing the load on the committer queue.
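As a rough sketch of that ordering constraint, assuming the AWS SDK for Java v2, a hypothetical CompactionJob record and placeholder serialisation (none of these are Sleeper's real classes), the send loop might look like this:

```java
import java.util.ArrayList;
import java.util.List;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

/**
 * Sketch only: jobs go to the compaction queue in batches of at most 10 (the
 * SQS batch limit), and the input file assignment is only submitted to the
 * committer queue after each batch has been sent, so a file is never assigned
 * to a job that was never sent. CompactionJob, the serialisation and the queue
 * URLs are illustrative stand-ins, not Sleeper's real API.
 */
public class CompactionJobBatchSender {
    private static final int SQS_BATCH_SIZE = 10;

    private final SqsClient sqs;
    private final String compactionQueueUrl;
    private final String committerQueueUrl; // FIFO queue for the state store committer
    private final String tableId;

    public CompactionJobBatchSender(SqsClient sqs, String compactionQueueUrl,
            String committerQueueUrl, String tableId) {
        this.sqs = sqs;
        this.compactionQueueUrl = compactionQueueUrl;
        this.committerQueueUrl = committerQueueUrl;
        this.tableId = tableId;
    }

    public void sendJobs(List<CompactionJob> jobs) {
        for (int i = 0; i < jobs.size(); i += SQS_BATCH_SIZE) {
            List<CompactionJob> batch = jobs.subList(i, Math.min(i + SQS_BATCH_SIZE, jobs.size()));

            // 1. Send the batch of jobs to the compaction queue.
            List<SendMessageBatchRequestEntry> entries = new ArrayList<>();
            for (int j = 0; j < batch.size(); j++) {
                entries.add(SendMessageBatchRequestEntry.builder()
                        .id(String.valueOf(j))
                        .messageBody(serialise(batch.get(j)))
                        .build());
            }
            sqs.sendMessageBatch(SendMessageBatchRequest.builder()
                    .queueUrl(compactionQueueUrl)
                    .entries(entries)
                    .build());

            // 2. Only once the jobs are on the queue, ask the committer to assign
            //    their input files. Assumes content-based deduplication on the FIFO queue.
            sqs.sendMessage(SendMessageRequest.builder()
                    .queueUrl(committerQueueUrl)
                    .messageGroupId(tableId) // committer queue is ordered per Sleeper table
                    .messageBody(serialiseAssignInputFilesCommit(batch))
                    .build());
        }
    }

    /** Placeholder for a compaction job and its serialisation. */
    public record CompactionJob(String jobId, List<String> inputFiles) {
    }

    private String serialise(CompactionJob job) {
        return "{\"jobId\":\"" + job.jobId() + "\",\"inputFiles\":" + job.inputFiles() + "}";
    }

    private String serialiseAssignInputFilesCommit(List<CompactionJob> batch) {
        return "assign-input-files for jobs " + batch.stream().map(CompactionJob::jobId).toList();
    }
}
```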
Pending jobs queue
Rather than expect the compaction job creation lambda to do everything necessary to create all the compaction jobs, we could have it create larger batches of jobs, write those to S3, then put a message on a pending jobs queue pointing to each batch. This would allow multiple lambda instances to receive those batches and create the jobs.
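A minimal sketch of that idea, assuming the AWS SDK for Java v2 and illustrative bucket, key and queue names (none of which are Sleeper's real configuration):

```java
import java.util.List;
import java.util.UUID;

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

/**
 * Sketch of the pending jobs queue idea: the creation lambda writes a large
 * batch of jobs to S3 and publishes a small pointer message, so any number of
 * worker lambda instances can pick batches up and finish creating the jobs.
 * The bucket name, key layout, queue URL and serialisation are illustrative only.
 */
public class PendingJobBatchPublisher {

    private final S3Client s3;
    private final SqsClient sqs;
    private final String bucketName;
    private final String pendingJobsQueueUrl;

    public PendingJobBatchPublisher(S3Client s3, SqsClient sqs,
            String bucketName, String pendingJobsQueueUrl) {
        this.s3 = s3;
        this.sqs = sqs;
        this.bucketName = bucketName;
        this.pendingJobsQueueUrl = pendingJobsQueueUrl;
    }

    /** Writes one batch of serialised jobs to S3 and points to it from the queue. */
    public void publishBatch(String tableId, List<String> serialisedJobs) {
        String key = "pending-compaction-batches/" + tableId + "/" + UUID.randomUUID() + ".json";

        // Store the full batch in S3, keeping the queue message small.
        s3.putObject(PutObjectRequest.builder()
                        .bucket(bucketName)
                        .key(key)
                        .build(),
                RequestBody.fromString(String.join("\n", serialisedJobs)));

        // The pointer message is all a worker lambda needs to take over creation of the jobs.
        sqs.sendMessage(SendMessageRequest.builder()
                .queueUrl(pendingJobsQueueUrl)
                .messageBody("{\"tableId\":\"" + tableId + "\",\"batchKey\":\"" + key + "\"}")
                .build());
    }
}
```

A separate worker lambda would then read the pointer message, fetch the batch from S3, and finish creating those jobs.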
Synchronous file assignments
We could experiment with larger, synchronous updates to the state store, e.g. send 100 batches of jobs to SQS, where each batch contains 10 jobs, then synchronously apply one transaction to the state store which assigns job IDs for those 1,000 jobs. Normally with the transaction log state store we want to do all commits via the committer lambda to avoid contention, but a small amount of contention caused by the compaction job creation lambda applying its own updates may be acceptable.
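As a rough sketch, using hypothetical StateStore and CompactionJobSender interfaces as stand-ins for Sleeper's real classes (the method names here are illustrative only):

```java
import java.util.List;

/**
 * Sketch of the synchronous file assignment idea: send all the job batches to
 * SQS first, then apply a single transaction directly against the state store
 * that assigns job IDs for every job sent.
 */
public class SynchronousFileAssignment {

    /** Hypothetical view of the state store, applying one transaction synchronously. */
    public interface StateStore {
        void assignJobIds(List<JobIdAssignment> assignments);
    }

    /** Hypothetical sender wrapping the SQS batch send of up to 10 jobs. */
    public interface CompactionJobSender {
        void sendBatch(List<CompactionJob> batch);
    }

    public record CompactionJob(String jobId, String partitionId, List<String> inputFiles) {
    }

    public record JobIdAssignment(String jobId, String partitionId, List<String> inputFiles) {
    }

    private static final int SQS_BATCH_SIZE = 10;

    private final StateStore stateStore;
    private final CompactionJobSender sender;

    public SynchronousFileAssignment(StateStore stateStore, CompactionJobSender sender) {
        this.stateStore = stateStore;
        this.sender = sender;
    }

    /**
     * Sends e.g. 1,000 jobs as 100 SQS batches of 10, then assigns all their
     * input files in one synchronous state store transaction. A small amount of
     * contention with the committer lambda is accepted here.
     */
    public void sendAndAssign(List<CompactionJob> jobs) {
        for (int i = 0; i < jobs.size(); i += SQS_BATCH_SIZE) {
            sender.sendBatch(jobs.subList(i, Math.min(i + SQS_BATCH_SIZE, jobs.size())));
        }
        List<JobIdAssignment> assignments = jobs.stream()
                .map(job -> new JobIdAssignment(job.jobId(), job.partitionId(), job.inputFiles()))
                .toList();
        stateStore.assignJobIds(assignments); // one transaction for the whole set of jobs
    }
}
```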
Pending operations store
We could create a store to track which files have pending operations. As it stands, this would only ever be updated by the compaction job creation lambda, which executes synchronously for each Sleeper table. We may also want to update it from a client.
We would also need to decide whether this should replace the job ID on the file reference in the state store, or coexist with it. It may be cleaner to remove the job ID entirely, but that would also remove the validation the state store currently performs. Removing it might be worthwhile for the sake of decoupling, or we might prefer to keep both. This is also related to the problem that if a compaction job ever completes before its input file assignment is applied, it will fail to commit. If we remove the validation and ensure causal linking with the pending operations store instead, we wouldn't have that problem.
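As a very rough illustration of what claiming a file in a DynamoDB-backed pending operations store could look like (the table name and attributes are hypothetical, not part of Sleeper), a conditional put gives the same guarantee the job ID on the file reference currently provides:

```java
import java.util.Map;

import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;

/**
 * Sketch of a pending operations store in DynamoDB. A conditional put means a
 * file can only be claimed by one pending operation at a time. Table name and
 * attribute names are illustrative only.
 */
public class PendingOperationsStore {

    private final DynamoDbClient dynamo;
    private final String tableName; // e.g. "sleeper-pending-file-operations"

    public PendingOperationsStore(DynamoDbClient dynamo, String tableName) {
        this.dynamo = dynamo;
        this.tableName = tableName;
    }

    /** Returns true if the file was free and is now claimed by the given operation. */
    public boolean claimFile(String sleeperTableId, String filename, String operationId) {
        try {
            dynamo.putItem(PutItemRequest.builder()
                    .tableName(tableName)
                    .item(Map.of(
                            "TableId", AttributeValue.builder().s(sleeperTableId).build(),
                            "Filename", AttributeValue.builder().s(filename).build(),
                            "OperationId", AttributeValue.builder().s(operationId).build()))
                    // Only succeed if no other operation already has this file pending.
                    .conditionExpression("attribute_not_exists(Filename)")
                    .build());
            return true;
        } catch (ConditionalCheckFailedException e) {
            return false; // another compaction or split already has this file pending
        }
    }
}
```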
The flow for the compaction job creation lambda would be something like:
If we use DynamoDB to hold the pending operations, we have a potential problem: DynamoDB cannot produce a consistent view of a large set of data. There may be more items than fit in one page, and if the state changes between page loads, the next page will give you an inconsistent view. Here are some ways we could resolve this:
Updates from compaction tasks completing jobs
We may be able to reduce the number of commits from finished compaction jobs by adding a separate queue and lambda to batch the jobs into larger commit messages. The queue can be a normal, non-FIFO queue, because we don't care about the ordering of compaction job commits: the affected files are already locked.
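A minimal sketch of that batching lambda, assuming it is triggered from a standard queue of job-finished messages and forwards one combined message per invocation to the committer FIFO queue (the handler, environment variables and message format are illustrative, and content-based deduplication is assumed on the FIFO queue):

```java
import java.util.List;
import java.util.stream.Collectors;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

/**
 * Sketch of a lambda that batches individual "compaction job finished" messages
 * from a standard SQS queue into a single commit message on the state store
 * committer FIFO queue. Ordering does not matter on the input queue because the
 * affected files are already locked to their jobs. Assumes one Sleeper table
 * per queue for simplicity.
 */
public class CompactionCommitBatcherLambda implements RequestHandler<SQSEvent, Void> {

    private final SqsClient sqs = SqsClient.create();
    private final String committerQueueUrl = System.getenv("COMMITTER_QUEUE_URL");
    private final String tableId = System.getenv("TABLE_ID");

    @Override
    public Void handleRequest(SQSEvent event, Context context) {
        // Collect the individual job-finished messages delivered in this batch.
        List<String> finishedJobMessages = event.getRecords().stream()
                .map(SQSEvent.SQSMessage::getBody)
                .collect(Collectors.toList());

        // Forward them as one combined commit, so the committer lambda applies
        // a single transaction instead of one per job.
        sqs.sendMessage(SendMessageRequest.builder()
                .queueUrl(committerQueueUrl)
                .messageGroupId(tableId) // committer queue is FIFO, ordered per Sleeper table
                .messageBody("[" + String.join(",", finishedJobMessages) + "]")
                .build());
        return null;
    }
}
```

The SQS event source mapping's batch size and batching window would control how many job-finished messages get combined into each commit.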
Throughput of transactions
The following issue seems necessary to improve the throughput of the state store committer lambda:
This issue will help to future-proof the design if we want to optimise querying the transaction log:
Here are some options which we may choose to pursue, but possibly separately:
We decided not to include this in the epic, because it may be easier to address with a persistent instance: