feat: allow concurrent file compaction #1383

Merged · 11 commits · Jun 3, 2023
Changes from all commits
2 changes: 1 addition & 1 deletion .github/workflows/python_build.yml
@@ -160,7 +160,7 @@ jobs:
- name: Run benchmark
run: |
source venv/bin/activate
pytest tests/test_benchmark.py --benchmark-json output.json
pytest tests/test_benchmark.py -m benchmark --benchmark-json output.json

- name: Store benchmark result
uses: benchmark-action/github-action-benchmark@v1
1 change: 1 addition & 0 deletions python/Cargo.toml
@@ -28,6 +28,7 @@ serde = "1"
serde_json = "1"
thiserror = "1"
tokio = { version = "1", features = ["rt-multi-thread"] }
num_cpus = "1"

# reqwest is pulled in by azure sdk, but not used by python binding itself
# for binary wheel best practice, statically link openssl
19 changes: 17 additions & 2 deletions python/deltalake/table.py
@@ -413,25 +413,35 @@ def vacuum(
retention_hours: Optional[int] = None,
dry_run: bool = True,
enforce_retention_duration: bool = True,
max_concurrent_requests: int = 10,
) -> List[str]:
"""
Run the Vacuum command on the Delta Table: list and delete files no longer referenced by the Delta table and are older than the retention threshold.

:param retention_hours: the retention threshold in hours, if none then the value from `configuration.deletedFileRetentionDuration` is used or default of 1 week otherwise.
:param dry_run: when activated, list only the files, delete otherwise
:param enforce_retention_duration: when disabled, accepts retention hours smaller than the value from `configuration.deletedFileRetentionDuration`.
:param max_concurrent_requests: the maximum number of concurrent requests to send to the backend.
Increasing this number may improve performance of vacuuming large tables, however it might also
increase the risk of hitting rate limits.
:return: the list of files no longer referenced by the Delta Table and are older than the retention threshold.
"""
if retention_hours:
if retention_hours < 0:
raise ValueError("The retention periods should be positive.")

return self._table.vacuum(dry_run, retention_hours, enforce_retention_duration)
return self._table.vacuum(
dry_run,
retention_hours,
enforce_retention_duration,
max_concurrent_requests,
)

def optimize(
self,
partition_filters: Optional[List[Tuple[str, str, Any]]] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
) -> Dict[str, Any]:
"""
Compacts small files to reduce the total number of files in the table.
@@ -446,9 +456,14 @@ def optimize(
:param target_size: desired file size after bin-packing files, in bytes. If not
provided, will attempt to read the table configuration value ``delta.targetFileSize``.
If that value isn't set, will use default value of 256MB.
:param max_concurrent_tasks: the maximum number of concurrent tasks to use for
file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
faster, but will also use more memory.
:return: the metrics from optimize
"""
metrics = self._table.optimize(partition_filters, target_size)
metrics = self._table.optimize(
partition_filters, target_size, max_concurrent_tasks
)
self.update_incremental()
return json.loads(metrics)
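
For context, a minimal sketch of what the new keyword arguments look like from the Python API; the table path and the particular values are illustrative, not part of this change:

from deltalake import DeltaTable

dt = DeltaTable("path/to/table")  # hypothetical local table

# Compact small files with an explicit cap on parallel rewrite tasks
# (omitting max_concurrent_tasks falls back to the number of CPUs).
metrics = dt.optimize(max_concurrent_tasks=4)

# Vacuum with more concurrent storage requests than the default of 10;
# dry_run=False actually deletes the unreferenced files.
deleted = dt.vacuum(
    retention_hours=168,
    dry_run=False,
    max_concurrent_requests=32,
)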

2 changes: 1 addition & 1 deletion python/pyproject.toml
@@ -86,7 +86,7 @@ ignore = ["E501"]
known-first-party = ["deltalake"]

[tool.pytest.ini_options]
addopts = "--cov=deltalake -v -m 'not integration'"
addopts = "--cov=deltalake -v -m 'not integration and not benchmark'"
testpaths = [
"tests",
]
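
Together with the workflow change above, this marker expression keeps the benchmarks out of the default test run; the expected local usage is roughly the following (assuming the commands are run from the python/ directory):

pytest                            # default run: skips integration and benchmark tests
pytest tests/test_benchmark.py -m benchmark --benchmark-json output.json   # opt in to the benchmarks, as CI does
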
12 changes: 8 additions & 4 deletions python/src/lib.rs
@@ -243,16 +243,18 @@ impl RawDeltaTable {

/// Run the Vacuum command on the Delta Table: list and delete files no longer referenced
/// by the Delta table and are older than the retention threshold.
#[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true))]
#[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true, max_concurrent_requests = 10))]
pub fn vacuum(
&mut self,
dry_run: bool,
retention_hours: Option<u64>,
enforce_retention_duration: bool,
max_concurrent_requests: usize,
) -> PyResult<Vec<String>> {
let mut cmd = VacuumBuilder::new(self._table.object_store(), self._table.state.clone())
.with_enforce_retention_duration(enforce_retention_duration)
.with_dry_run(dry_run);
.with_dry_run(dry_run)
.with_max_concurrent_requests(max_concurrent_requests);
if let Some(retention_period) = retention_hours {
cmd = cmd.with_retention_period(Duration::hours(retention_period as i64));
}
@@ -264,13 +266,15 @@ impl RawDeltaTable {
}

/// Run the optimize command on the Delta Table: merge small files into a large file by bin-packing.
#[pyo3(signature = (partition_filters = None, target_size = None))]
#[pyo3(signature = (partition_filters = None, target_size = None, max_concurrent_tasks = None))]
pub fn optimize(
&mut self,
partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
target_size: Option<i64>,
max_concurrent_tasks: Option<usize>,
) -> PyResult<String> {
let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone());
let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone())
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));
if let Some(size) = target_size {
cmd = cmd.with_target_size(size);
}
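
Since the Rust side falls back to num_cpus::get() when no value is passed, a caller that wants to trade compaction speed for lower memory use can derive a smaller cap from the visible CPUs. A rough sketch, with the table path again hypothetical:

import os

from deltalake import DeltaTable

dt = DeltaTable("path/to/table")  # hypothetical path

# Fewer concurrent rewrite tasks means lower peak memory but slower compaction.
tasks = max(1, (os.cpu_count() or 2) // 2)
dt.optimize(max_concurrent_tasks=tasks)
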
59 changes: 57 additions & 2 deletions python/tests/test_benchmark.py
@@ -1,3 +1,5 @@
import os

import pyarrow as pa
import pyarrow.fs as pa_fs
import pytest
@@ -11,9 +13,9 @@

@pytest.fixture()
def sample_table() -> pa.Table:
max_size_bytes = 1024 * 1024 * 1024
max_size_bytes = 128 * 1024 * 1024
ncols = 20
nrows = max_size_bytes // 20 // 64
nrows = max_size_bytes // 20 // 8
tab = pa.table({f"x{i}": standard_normal(nrows) for i in range(ncols)})
# Add index column for sorting
tab = tab.append_column("i", pa.array(range(nrows), type=pa.int64()))
@@ -58,3 +60,56 @@ def test_benchmark_read_pyarrow(benchmark, sample_table, tmp_path):
fs = pa_fs.SubTreeFileSystem(str(tmp_path), pa_fs.LocalFileSystem())
result = benchmark(dt.to_pyarrow_table, filesystem=fs)
assert result.sort_by("i") == sample_table


@pytest.mark.benchmark(group="optimize")
@pytest.mark.parametrize("max_tasks", [1, 5])
def test_benchmark_optimize(benchmark, sample_table, tmp_path, max_tasks):
# Create 5 partitions, each with 10 files.
# Each file is roughly 13MB, so the table totals around 650MB.
files_per_part = 10
parts = ["a", "b", "c", "d", "e"]

nrows = int(sample_table.num_rows / files_per_part)
for part in parts:
tab = sample_table.slice(0, nrows)
tab = tab.append_column("part", pa.array([part] * nrows))
for _ in range(files_per_part):
write_deltalake(tmp_path, tab, mode="append", partition_by=["part"])

dt = DeltaTable(tmp_path)

assert len(dt.files()) == files_per_part * len(parts)
initial_version = dt.version()

def setup():
# Instead of recreating the table for each benchmark run, we just delete
# the optimize log file
optimize_version = initial_version + 1
try:
os.remove(
os.path.join(tmp_path, "_delta_log", f"{optimize_version:020}.json")
)
except FileNotFoundError:
pass

# Reload the table after we have altered the log
dt = DeltaTable(tmp_path)
assert dt.version() == initial_version

return (dt,), dict(max_concurrent_tasks=max_tasks)

def func(dt, max_concurrent_tasks):
return dt.optimize(
max_concurrent_tasks=max_concurrent_tasks, target_size=1024 * 1024 * 1024
)

# Each round needs an unoptimized table, so setup resets it before every run
results = benchmark.pedantic(func, setup=setup, rounds=5)

assert results["numFilesRemoved"] == 50
assert results["numFilesAdded"] == 5
assert results["partitionsOptimized"] == 5
1 change: 1 addition & 0 deletions rust/Cargo.toml
@@ -35,6 +35,7 @@ lazy_static = "1"
log = "0"
libc = ">=0.2.90, <1"
num-bigint = "0.4"
num_cpus = "1"
num-traits = "0.2.15"
object_store = "0.5.6"
once_cell = "1.16.0"