
feat: allow concurrent file compaction #1383

Merged 11 commits into delta-io:main on Jun 3, 2023

Conversation

@wjones127 (Collaborator) commented May 21, 2023

Description

This PR refactors the optimize command so that it:

  1. Runs compaction tasks in parallel, with parallelism controlled by the user but defaulting to the number of CPUs. (The num_cpus crate is used by tokio, so we already have it transitively.)
  2. Turns on zstd compression by default at level 4; a sketch of both defaults follows below. In a future PR, we can make this configurable for Python and maybe benchmark different levels.
  3. Does initial prep work for supporting other types of optimize commands.

However, the writer isn't very good at hitting a target file size, because the code that checks the size of the written file only knows the size of the row groups that have already been serialized, not the row group currently being built. So if your row groups are 100MB in size and you target 150MB, you will get 200MB files. There is upstream work in apache/arrow-rs#4280 that will allow us to write much more precisely sized files, so this will improve in the near future.
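For reference, here is a minimal sketch of those two defaults, using the parquet crate's WriterProperties builder and num_cpus::get(); the helper function name is illustrative and not part of the delta-rs API:

```rust
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;

// Hypothetical helper showing the defaults described above; not delta-rs API.
fn default_optimize_settings() -> (usize, WriterProperties) {
    // Concurrency defaults to the number of logical CPUs.
    let max_concurrent_tasks = num_cpus::get();

    // Compaction output is compressed with ZSTD at level 4 by default.
    let writer_properties = WriterProperties::builder()
        .set_compression(Compression::ZSTD(ZstdLevel::try_new(4).unwrap()))
        .build();

    (max_concurrent_tasks, writer_properties)
}
```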

Related Issue(s)

closes #1171

Documentation

@github-actions bot added the binding/rust and rust labels May 21, 2023
@github-actions bot added the binding/python label May 22, 2023
@wjones127 wjones127 marked this pull request as ready for review May 28, 2023 19:20
@wjones127 (Collaborator Author)

FYI @Blajda

@wjones127 (Collaborator Author)

TBH, I'm not seeing much performance difference locally between 1 and 5 concurrent tasks. But I can at least confirm from the debug logs that it is working concurrently. My current working theory is that I'm maxing out the write I/O on my SSD.

@roeap (Collaborator) commented May 28, 2023

As you implied, I would assume this has the most benefit when using object stores, presumably even more when deployed. I guess allowing out-of-order writes in particular could help when we are reading on multiple threads as well?

I guess we are about at the point where we maybe need to formalize the benchmarking setup a bit? Not sure if we've reached out to the Databricks folks already; maybe they have some "real" cloud infra that could be used in CI/CD?

@wjones127 changed the title from "feat: allow concurrent and out-of-order file compaction" to "feat: allow concurrent file compaction" May 28, 2023
@Blajda (Collaborator) commented May 29, 2023

I think we would still keep a form of bin packing and use this approach whenever a z-order is specified.

Based on my current understanding, it looks like each worker is responsible for a single partition. Typically in my workflows partitions are dates, and the latest dates are hot (i.e. they have a lot of small files that can be compacted); optimization is executed daily after the date closes. Using bins increases the granularity of the work and keeps all the workers active.

Another trade-off is that files near the target size limit are rewritten, whereas bin packing would prune them when no coalescing is achieved.

I imagine this is also not idempotent. Can you confirm that if optimize is run again on a partition, there are zero new actions?

@wjones127 (Collaborator Author)

> I imagine this is also not idempotent. Can you confirm that if optimize is run again on a partition, there are zero new actions?

It is. It passes the unit test you wrote earlier. :) Every time optimize is run, you will end up with at most one file below the target size. If we detect only one file smaller than the target size, then we skip that partition.
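As a rough sketch of that skip rule (assumed shape, not the PR's actual code):

```rust
// A partition with at most one file below the target size is left untouched,
// so running optimize again over the same partition produces zero new actions.
fn should_skip_partition(file_sizes: &[u64], target_size: u64) -> bool {
    file_sizes.iter().filter(|&&size| size < target_size).count() <= 1
}
```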

> Another trade-off is that files near the target size limit are rewritten, whereas bin packing would prune them when no coalescing is achieved.

Yeah, I think this is a convincing point. If you have two files of 100MB and a target of 110MB, it will turn them into one file of 110MB and one of 90MB, which isn't really an improvement. So it's probably worth creating planners specific to each optimize type.

I'll go ahead and bring back the binning.
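For illustration, here is a minimal sketch of the kind of bin-packing planner being discussed (assumed shape, not the PR's actual implementation): files already at or above the target are pruned, and smaller files are grouped into bins that each stay at or under the target.

```rust
// Hypothetical first-fit-decreasing bin packing for a compaction plan.
// Files at or above the target size are pruned; smaller files are grouped
// into bins that each stay at or under the target size.
fn plan_bins(mut file_sizes: Vec<u64>, target: u64) -> Vec<Vec<u64>> {
    file_sizes.sort_unstable_by(|a, b| b.cmp(a)); // consider the largest files first
    let mut bins: Vec<(u64, Vec<u64>)> = Vec::new(); // (current total, file sizes)
    for size in file_sizes {
        if size >= target {
            continue; // already big enough; rewriting it would gain nothing
        }
        match bins.iter_mut().find(|bin| bin.0 + size <= target) {
            Some(bin) => {
                bin.0 += size;
                bin.1.push(size);
            }
            None => bins.push((size, vec![size])),
        }
    }
    // Only bins holding more than one file actually coalesce anything.
    bins.into_iter()
        .map(|(_, files)| files)
        .filter(|files| files.len() > 1)
        .collect()
}
```

With this shape, two 100MB files and a 110MB target end up in separate single-file bins and are filtered out, so nothing gets pointlessly rewritten.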

@wjones127 wjones127 marked this pull request as draft May 29, 2023 02:05
Comment on lines +448 to +464
.map(|(partition, files)| async {
let rewrite_result = tokio::task::spawn(Self::rewrite_files(
self.task_parameters.clone(),
partition,
files,
object_store.clone(),
))
.await;
match rewrite_result {
Ok(Ok(data)) => Ok(data),
Ok(Err(e)) => Err(e),
Err(e) => Err(DeltaTableError::GenericError {
source: Box::new(e),
}),
}
})
.buffer_unordered(self.max_concurrent_tasks)
wjones127 (Collaborator Author):

I'm finding this to be a good pattern for parallel processing of a stream (see the sketch below):

  1. Map each item into tokio::task::spawn() (which allows the work to be sent to other threads).
  2. Then follow with buffer_unordered to control the number of tasks in flight at a time.
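A minimal, standalone sketch of that pattern (illustrative function and variable names, not the PR's code; run inside a tokio runtime):

```rust
use futures::stream::{self, StreamExt};

// Map each item into a spawned task so the work can run on other worker
// threads, then cap the number of in-flight tasks with buffer_unordered.
async fn process_all(items: Vec<u64>, max_concurrent_tasks: usize) -> Vec<u64> {
    stream::iter(items)
        .map(|item| async move {
            tokio::task::spawn(async move {
                item * 2 // stand-in for the real per-item work
            })
            .await
            .expect("task panicked")
        })
        // At most `max_concurrent_tasks` spawned tasks are awaited at once.
        .buffer_unordered(max_concurrent_tasks)
        .collect::<Vec<u64>>()
        .await
}
```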

@wjones127 (Collaborator Author)

So with 5 max tasks it's now about 3 times faster in the benchmark. I think the reason the single-task case is slower than main is that I added compression by default.

Local benchmark results:

New results:

--------------------------------------------------------------------------------------- benchmark 'optimize': 2 tests ---------------------------------------------------------------------------------------
Name (time in ms)                     Min                   Max                  Mean              StdDev                Median                 IQR            Outliers     OPS            Rounds  Iterations
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_benchmark_optimize[5]       779.3044 (1.0)      1,052.4848 (1.0)        842.6561 (1.0)      117.7213 (1.0)        792.2957 (1.0)       84.6014 (1.0)           1;1  1.1867 (1.0)           5           1
test_benchmark_optimize[1]     2,559.4057 (3.28)     3,065.0166 (2.91)     2,704.3611 (3.21)     207.4415 (1.76)     2,644.0822 (3.34)     203.8526 (2.41)          1;0  0.3698 (0.31)          5           1
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Results from main:

---------------------------------------- benchmark 'optimize': 1 tests -----------------------------------------
Name (time in s)               Min     Max    Mean  StdDev  Median     IQR  Outliers     OPS  Rounds  Iterations
----------------------------------------------------------------------------------------------------------------
test_benchmark_optimize     2.2828  2.6773  2.3809  0.1674  2.3108  0.1399       1;1  0.4200       5           1
----------------------------------------------------------------------------------------------------------------

@wjones127 wjones127 marked this pull request as ready for review June 3, 2023 02:46
.set_compression(Compression::ZSTD(ZstdLevel::try_new(4).unwrap()))
.set_created_by(format!("delta-rs version {}", crate_version()))
.build()
});
Collaborator:

Since the files are rewritten using ZSTD don't we need to update the filenames?

let file_name = format!("part-{}-{}-c000.snappy.parquet", part, self.writer_id);

wjones127 (Collaborator Author):

Oh yeah. TBH I don't know why we write the compression algorithm in the filename. We don't handle this currently if a user passes in a different compression algorithm themselves.

Collaborator:

> TBH I don't know why we write the compression algorithm in the filename

This is again one of those relics from the early days, I think. I do remember comments floating around about figuring out the various parts of the names written by Spark, so I guess the short answer is "b/c Spark did it" 😆.

That said, I do believe this is somewhat of a "soft convention" in many Parquet usages, but I may also be wrong.

Collaborator:

Yeah this can be fixed in a future PR. I believe it should not break anything.

@houqp (Member) commented Jun 3, 2023

Yes, this is the convention used by Spark and Hive. The reason behind it is that it can be used as an optimization: the Parquet reader can be set up with the proper decompression decoder based on the list API response alone, without having to read the first couple of bytes from the object store to detect the compression algorithm.
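As an illustration of the fix being discussed (a sketch, not code from this PR), the suffix could be derived from the codec that is actually configured so the filename hint stays accurate:

```rust
use parquet::basic::Compression;

// Illustrative only: derive the filename hint from the configured codec
// instead of hard-coding "snappy". The suffix carries its trailing dot so an
// uncompressed file simply ends in "c000.parquet".
fn codec_suffix(compression: &Compression) -> &'static str {
    match compression {
        Compression::UNCOMPRESSED => "",
        Compression::SNAPPY => "snappy.",
        Compression::GZIP(_) => "gz.",
        Compression::ZSTD(_) => "zstd.",
        _ => "", // no hint for the remaining codecs in this sketch
    }
}

// e.g. format!("part-{}-{}-c000.{}parquet", part, writer_id, codec_suffix(&compression))
```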

@@ -111,6 +149,10 @@ pub struct OptimizeBuilder<'a> {
writer_properties: Option<WriterProperties>,
/// Additional metadata to be added to commit
app_metadata: Option<HashMap<String, serde_json::Value>>,
/// Whether to preserve insertion order within files (default false)
preserve_insertion_order: bool,
/// Max number of concurrent tasks (default 10)
Collaborator:

Update the doc comment to mention that num_cpus::get() is used to determine the default concurrency.

roeap previously approved these changes Jun 3, 2023

@roeap (Collaborator) left a comment:

LGTM! and great performance improvements!

@roeap (Collaborator) commented Jun 3, 2023

I re-ran the integration tests several times to no avail. Getting the HDFS integration to pass is often quite cumbersome, so I opened an issue for the failing HDFS integration tests: #1428.

@wjones127 wjones127 merged commit 1a6064e into delta-io:main Jun 3, 2023
@wjones127 wjones127 deleted the concurrent-optimize branch June 3, 2023 21:18