EPIC: Rust Based Compaction #624

Open
Xuanwo opened this issue Sep 9, 2024 · 11 comments

Comments

@Xuanwo
Member

Xuanwo commented Sep 9, 2024

This is an EPIC issue that serves as a direction worth our community's attention. We can use this issue to track the features we want to offer and how close we are to achieving them.


This issue concerns compaction: specifically native compaction or, to be precise, Rust-based compaction.

We all know that compaction is a resource-intensive task that involves heavy calculations, significant I/O, substantial memory consumption, and large-scale resources. I believe compaction is the killer feature that iceberg-rust can provide for the whole community. I expect iceberg-rust can implement compaction more efficiently in terms of both performance and cost.

In this EPIC, I want iceberg-rust to deliver:

Compaction API for a table.

  • It should have a simple API that is easy to use for small tables, such as table.compact().
  • It should have a well-designed planner and scheduler that functions efficiently in a distributed system, processing large tables quickly.
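
As a rough illustration of what the simple entry point could feel like, here is a toy in-memory sketch. Everything in it is an assumption made for illustration: `Table`, `CompactionOptions`, `CompactionReport`, `compact` and `compact_with` are hypothetical names and not iceberg-rust APIs, and a data file is modelled only by its size in bytes.

```rust
/// Hypothetical options; not part of iceberg-rust.
#[derive(Debug, Clone)]
pub struct CompactionOptions {
    /// Files smaller than this are candidates for rewriting.
    pub small_file_threshold_bytes: u64,
}

impl Default for CompactionOptions {
    fn default() -> Self {
        // Arbitrary placeholder value for the sketch.
        Self { small_file_threshold_bytes: 64 * 1024 * 1024 }
    }
}

/// Hypothetical summary of what one compaction run changed.
#[derive(Debug, PartialEq, Eq)]
pub struct CompactionReport {
    pub files_removed: usize,
    pub files_added: usize,
}

/// Toy table: each data file is represented only by its size in bytes.
#[derive(Debug, Default)]
pub struct Table {
    pub data_file_sizes: Vec<u64>,
}

impl Table {
    /// Simple entry point for small tables: run with default options.
    pub fn compact(&mut self) -> CompactionReport {
        self.compact_with(&CompactionOptions::default())
    }

    /// Merges every file below the threshold into a single new file.
    pub fn compact_with(&mut self, opts: &CompactionOptions) -> CompactionReport {
        let (small, large): (Vec<u64>, Vec<u64>) = self
            .data_file_sizes
            .iter()
            .copied()
            .partition(|&size| size < opts.small_file_threshold_bytes);

        // Rewriting zero or one file would not reduce the file count.
        if small.len() < 2 {
            return CompactionReport { files_removed: 0, files_added: 0 };
        }

        let merged: u64 = small.iter().sum();
        self.data_file_sizes = large;
        self.data_file_sizes.push(merged);
        CompactionReport { files_removed: small.len(), files_added: 1 }
    }
}
```

A real implementation would go through a transaction and commit a new snapshot; the sketch only shows the shape of a one-call API with optional tuning.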

Bindings for Python and Java.

  • This API should be available in Python so that PyIceberg can benefit from our implementation.
  • This API should be available in Java, allowing users to enhance their Spark jobs.

Tests (E2E tests, behavior tests, fuzz tests, ...)

Compaction is more complex than just reading. Mistakes we make could break the user's entire table.

We will need various tests, including end-to-end tests, behavior tests, and fuzz tests, to ensure we have done it correctly.

@flaneur2020

flaneur2020 commented Sep 9, 2024

it makes a lot of sense to use rust on these computation heavy workloads.

i'm looking forward to saving up to 90% of memory usage after switching to the rust implementation.

keep a close watch on this issue!

@kevinjqliu

Thanks for starting this! Linking the relevant issue from the pyiceberg side, iceberg-python/#1092
Would be great to collaborate on this!

@kevinjqliu

Data compaction is something the BDT at Amazon is heavily invested in and blogged about in https://aws.amazon.com/blogs/opensource/amazons-exabyte-scale-migration-from-apache-spark-to-ray-on-amazon-ec2/.

Related to the ray-project/deltacat project.

@manuzhang
Contributor

Does this depend on a distributed compute engine?

@Xuanwo
Member Author

Xuanwo commented Sep 10, 2024

Does this depend on a distributed compute engine?

I think this can work on a distributed query engine, but one is not required.

@hzxa21

hzxa21 commented Sep 10, 2024

I am super excited that the community is considering offering compaction capability in iceberg-rust. Thanks for bringing this up!

Several things come to mind that we need to consider in terms of compaction:

  1. Compaction API
    i. High-level API: trigger compaction for a table with some configuration, similar to Spark's rewrite_data_files. This is essentially the table.compact() mentioned in the issue description and is useful for small tables.
    ii. Low-level API: given a collection of files of a table (including data, eq-delete, and pos-delete files), output a collection of compacted files (including both data and eq-delete files). This is useful as a building block for more sophisticated compaction strategies.
  2. Compaction strategy:
    i. When to trigger a compaction
    ii. Which files to pick to minimize read/write amplification, space amplification, improve sorting, etc...
  3. Compaction runtime:
    i. Where to schedule a compaction task
    ii. How to determine the parallelism of a compaction
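
A minimal sketch of the low-level building block from item 1.ii, under loud assumptions: all type and function names here (`DataFile`, `PositionDelete`, `EqualityDelete`, `compact_files`) are hypothetical, rows are toy `(id, payload)` tuples instead of Arrow batches, equality deletes are keyed on a single `id` column, and the sequence-number rules that the Iceberg spec uses to scope which deletes apply to which data files are ignored. It only emits surviving rows and does not carry over eq-delete files.

```rust
use std::collections::HashSet;

/// Toy row: (id, payload). Real rows would be Arrow record batches.
pub type Row = (i64, String);

/// Toy data file: a path plus its rows held in memory.
pub struct DataFile {
    pub path: String,
    pub rows: Vec<Row>,
}

/// Position delete: removes the row at `pos` in the data file at `path`.
pub struct PositionDelete {
    pub path: String,
    pub pos: usize,
}

/// Equality delete on the `id` column: removes every row with this id.
pub struct EqualityDelete {
    pub id: i64,
}

/// Merges the input data files and drops rows hit by either delete kind.
/// Assumption: every delete applies to every data file passed in.
pub fn compact_files(
    data_files: &[DataFile],
    pos_deletes: &[PositionDelete],
    eq_deletes: &[EqualityDelete],
) -> Vec<Row> {
    let deleted_positions: HashSet<(&str, usize)> = pos_deletes
        .iter()
        .map(|d| (d.path.as_str(), d.pos))
        .collect();
    let deleted_ids: HashSet<i64> = eq_deletes.iter().map(|d| d.id).collect();

    let mut out = Vec::new();
    for file in data_files {
        for (pos, row) in file.rows.iter().enumerate() {
            let by_pos = deleted_positions.contains(&(file.path.as_str(), pos));
            let by_eq = deleted_ids.contains(&row.0);
            if !by_pos && !by_eq {
                out.push(row.clone());
            }
        }
    }
    out
}
```

Keeping this step free of any planning or scheduling logic is what would let callers plug it into their own strategy and runtime.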

I think we need to be clear about what should be done in iceberg-rust and what the non-goals are. IMO, given that Iceberg is a spec rather than a service, the Compaction API (1), especially the low-level API, is more essential than the others (2 & 3). With that, users of iceberg-rust have the flexibility to implement 2 and 3 themselves based on their workloads, use cases, and environments.
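
To illustrate how small a user-side strategy (item 2) can be when it lives outside the library, here is a sketch of a first-fit-decreasing bin-packing planner. `plan_bin_pack` and its parameters are hypothetical, files are identified only by their index and size, and a real planner would also weigh delete files, partitions, and sort order.

```rust
/// Hypothetical planner using first-fit-decreasing bin packing.
/// Returns groups of file indices to rewrite together; each group's
/// total size is at most `target_bytes`.
pub fn plan_bin_pack(file_sizes: &[u64], target_bytes: u64) -> Vec<Vec<usize>> {
    // Only files below the target size are worth rewriting.
    let mut candidates: Vec<usize> = (0..file_sizes.len())
        .filter(|&i| file_sizes[i] < target_bytes)
        .collect();
    // Largest first (stable sort keeps index order among equal sizes).
    candidates.sort_by(|&a, &b| file_sizes[b].cmp(&file_sizes[a]));

    // Each bin tracks (bytes used, member file indices).
    let mut bins: Vec<(u64, Vec<usize>)> = Vec::new();
    for i in candidates {
        let size = file_sizes[i];
        // First fit: reuse the first bin that still has room.
        match bins.iter_mut().find(|bin| bin.0 + size <= target_bytes) {
            Some(bin) => {
                bin.0 += size;
                bin.1.push(i);
            }
            None => bins.push((size, vec![i])),
        }
    }

    // A group with a single file would not reduce the file count.
    bins.into_iter()
        .map(|(_, members)| members)
        .filter(|members| members.len() > 1)
        .collect()
}
```

Each returned group could then be handed to a low-level compaction call as an independent task, which is also a natural unit of parallelism for item 3.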

@sdd
Contributor

sdd commented Sep 11, 2024

I will be working on adding support for positional and equality deletes to the table scans next. Hopefully this will help towards this goal.

@liurenjie1024
Collaborator

Thanks @Xuanwo for raising this. I think we have a long way to go: for example, we still need support for deletion files and a transaction API. Also, I'm not sure we can really do without a distributed compute engine for this, since an Iceberg table is typically quite large and a single machine will not be able to serve the workload.

@vakarisbk

Thanks @Xuanwo for raising this. I think we have a long way to go: for example, we still need support for deletion files and a transaction API. Also, I'm not sure we can really do without a distributed compute engine for this, since an Iceberg table is typically quite large and a single machine will not be able to serve the workload.

DataFusion now has a distributed-mode project, DataFusion Ray. By the time we finish the preparations for compaction, DataFusion Ray may be mature enough, at least for the compaction use case.

@camuel

camuel commented Oct 14, 2024

Does anyone have any insight into how computation-heavy the compaction workload really is? On a beefy machine, what compaction rate would be possible? 1GB/sec? 10GB/sec? A ballpark figure? Separately, what compaction rate is possible from first principles, and what is the typical compaction rate per node with today's Iceberg implementation?

I think in the overwhelming majority of use cases, data is primarily partitioned by time, and most of the data in a huge table is old, low-value data that should absolutely not be touched routinely on every compaction job. Am I right, or is my experience not representative? If so, it is not the size of the table that matters but the rate at which new data arrives between compaction jobs, since it is mostly that new data that gets compacted and rewritten, and only once in its lifetime (unless DML touches old data, which should be extremely rare compared to routine compaction).

Separately, what exactly compaction means is a bit confusing to me. Should data be sorted across the resulting (post-compaction) Parquet files in a partition, or is it enough that each resulting Parquet file is sorted internally, with the files in a partition allowed to overlap anywhere in the partition's range? As per the specification, it is enough to sort each resulting Parquet file internally; moreover, the sort order can differ per file, which hints that there is no global sort within a partition. I looked into the Java compaction code, and it does sort across files: not across all files in a partition, but across a subset of them that it calls a "group". A "group" is not a concept that exists in the specification, which leaves me puzzled about how the Rust implementation should approach it. It is quite a big open question.
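
To make the distinction concrete, here is a minimal sketch of a group-scoped sort. It is only one reading of the behaviour described above, not the Java implementation: `rewrite_group` is a hypothetical name, rows are reduced to a single `i64` sort key, and output files are cut by row count rather than by bytes.

```rust
/// Rewrites one file group: merges all rows of the group, sorts them,
/// and splits the result into output files of at most `rows_per_file` rows.
/// Files produced from the same group never overlap in key range.
pub fn rewrite_group(input_files: &[Vec<i64>], rows_per_file: usize) -> Vec<Vec<i64>> {
    assert!(rows_per_file > 0, "rows_per_file must be positive");
    // Gather every row of the group into one buffer.
    let mut rows: Vec<i64> = input_files.iter().flatten().copied().collect();
    rows.sort_unstable();
    // Cut the sorted run into consecutive output files.
    rows.chunks(rows_per_file).map(|chunk| chunk.to_vec()).collect()
}
```

Calling it once per group gives files that are sorted internally and non-overlapping within their group, while files from two different groups of the same partition can still overlap, so there is no partition-wide order.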

Assume only a local sort within each resulting Parquet file (as per the current spec), a file size of around 100MB (which is currently the default), and time-based partitioning with data arriving more or less chronologically, so that compaction is needed only for data that arrived between compaction jobs (as has always been the case in my experience). Under those assumptions, a single beefy node running compaction code implemented in Rust should find it feasible to serve quite a huge table.
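
The back-of-envelope reasoning above can be written down as two small functions. Both are hypothetical helpers, and any numbers fed into them are placeholders to be replaced with measured ingest and node throughput, not benchmark results. The model assumes every newly arrived byte is rewritten exactly once.

```rust
/// Fraction of wall-clock time one node spends compacting, assuming each
/// newly arrived byte is read and rewritten exactly once.
pub fn compaction_duty_cycle(ingest_bytes_per_sec: f64, node_bytes_per_sec: f64) -> f64 {
    ingest_bytes_per_sec / node_bytes_per_sec
}

/// Number of output files one compaction run produces for the data that
/// arrived since the previous run (rounded up).
pub fn output_files_per_run(
    ingest_bytes_per_sec: u64,
    interval_secs: u64,
    target_file_bytes: u64,
) -> u64 {
    assert!(target_file_bytes > 0, "target_file_bytes must be positive");
    let new_bytes = ingest_bytes_per_sec * interval_secs;
    // Ceiling division without relying on newer std helpers.
    (new_bytes + target_file_bytes - 1) / target_file_bytes
}
```

Under this model the table's total size never appears: a duty cycle well below 1.0 means a single node keeps up, whatever the amount of old data sitting untouched in older partitions.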

Of course there are corner cases, such as the partition scheme being changed and the next compaction job being asked to repartition the entire table, but I don't think this will ever happen in practice. If the entire table is to be repartitioned, that won't just be scheduled for the next compaction; it will probably be a custom job on a separate cluster and won't even be called "compaction".

Will appreciate any feedback. Thanks!

@liurenjie1024
Collaborator

Hi, @camuel

Does anyone have any insight into how computation-heavy the compaction workload really is? On a beefy machine, what compaction rate would be possible? 1GB/sec? 10GB/sec? A ballpark figure? Separately, what compaction rate is possible from first principles, and what is the typical compaction rate per node with today's Iceberg implementation?

I think it's hard to state a compaction rate. Compaction is a mixed workload: it involves I/O (reading and writing files) and computation (reclustering, bin-packing), so the rate is difficult to estimate. Things get more complicated when we need to handle deletion files.

As for partition spec changes, they are typically lazy in Iceberg: a partition spec change is a metadata-only operation, and it is typically not part of compaction.


9 participants