expose in Python and start benchmarking
wjones127 committed May 22, 2023
1 parent 94a6182 commit a0fda92
Showing 5 changed files with 80 additions and 12 deletions.
19 changes: 17 additions & 2 deletions python/deltalake/table.py
@@ -417,25 +417,35 @@ def vacuum(
retention_hours: Optional[int] = None,
dry_run: bool = True,
enforce_retention_duration: bool = True,
max_concurrent_requests: int = 10,
) -> List[str]:
"""
Run the Vacuum command on the Delta Table: list and delete files that are no longer referenced by the Delta table and are older than the retention threshold.
:param retention_hours: the retention threshold in hours; if None, the value from `configuration.deletedFileRetentionDuration` is used, or a default of 1 week otherwise.
:param dry_run: when enabled, only list the files that would be deleted; otherwise, delete them
:param enforce_retention_duration: when disabled, accepts retention hours smaller than the value from `configuration.deletedFileRetentionDuration`.
:param max_concurrent_requests: the maximum number of concurrent requests to send to the backend.
Increasing this number may improve performance of vacuuming large tables; however, it might also
increase the risk of hitting rate limits.
:return: the list of files that are no longer referenced by the Delta Table and are older than the retention threshold.
"""
if retention_hours:
if retention_hours < 0:
raise ValueError("The retention periods should be positive.")

return self._table.vacuum(dry_run, retention_hours, enforce_retention_duration)
return self._table.vacuum(
dry_run,
retention_hours,
enforce_retention_duration,
max_concurrent_requests,
)
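For context, the new keyword reaches users through the existing DeltaTable.vacuum API. A minimal usage sketch, assuming a table at a hypothetical local path:

    from deltalake import DeltaTable

    dt = DeltaTable("path/to/table")  # hypothetical table location

    # Dry run: list the files that would be removed, but delete nothing.
    candidates = dt.vacuum(retention_hours=168, dry_run=True)

    # Actually delete, issuing up to 32 delete requests to the backend at once.
    deleted = dt.vacuum(
        retention_hours=168,
        dry_run=False,
        max_concurrent_requests=32,
    )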

def optimize(
self,
partition_filters: Optional[List[Tuple[str, str, Any]]] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: int = 10,
) -> Dict[str, Any]:
"""
Compacts small files to reduce the total number of files in the table.
@@ -450,9 +460,14 @@ def optimize(
:param target_size: desired file size after bin-packing files, in bytes. If not
provided, will attempt to read the table configuration value ``delta.targetFileSize``.
If that value isn't set, will use a default value of 256MB.
:param max_concurrent_tasks: the maximum number of concurrent tasks to use for
file compaction. Defaults to 10. More concurrent tasks can make compaction
faster, but will also use more memory.
:return: the metrics from optimize
"""
metrics = self._table.optimize(partition_filters, target_size)
metrics = self._table.optimize(
partition_filters, target_size, max_concurrent_tasks
)
self.update_incremental()
return json.loads(metrics)
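Similarly, a sketch of the updated optimize call; the partition filter and size values here are illustrative, not defaults from the commit:

    from deltalake import DeltaTable

    dt = DeltaTable("path/to/table")  # hypothetical table location

    metrics = dt.optimize(
        partition_filters=[("part", "=", "a")],  # optionally restrict compaction to one partition
        target_size=100 * 1024 * 1024,           # bin-pack toward ~100 MB output files
        max_concurrent_tasks=5,                   # more tasks: faster compaction, more memory
    )
    print(metrics["numFilesAdded"], metrics["numFilesRemoved"])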

2 changes: 1 addition & 1 deletion python/pyproject.toml
@@ -90,7 +90,7 @@ ignore = ["E501"]
known-first-party = ["deltalake"]

[tool.pytest.ini_options]
addopts = "--cov=deltalake -v -m 'not integration'"
addopts = "--cov=deltalake -v -m 'not integration and not benchmark'"
testpaths = [
"tests",
]
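With the extra marker in addopts, the benchmark tests are excluded from the default test run; the marker filter can be overridden on the command line, for example (assuming pytest-benchmark is installed):

    pytest -m benchmark tests/test_benchmark.py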
12 changes: 8 additions & 4 deletions python/src/lib.rs
@@ -292,16 +292,18 @@ impl RawDeltaTable {

/// Run the Vacuum command on the Delta Table: list and delete files that are no longer referenced
/// by the Delta table and are older than the retention threshold.
#[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true))]
#[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true, max_concurrent_requests = 10))]
pub fn vacuum(
&mut self,
dry_run: bool,
retention_hours: Option<u64>,
enforce_retention_duration: bool,
max_concurrent_requests: usize,
) -> PyResult<Vec<String>> {
let mut cmd = VacuumBuilder::new(self._table.object_store(), self._table.state.clone())
.with_enforce_retention_duration(enforce_retention_duration)
.with_dry_run(dry_run);
.with_dry_run(dry_run)
.with_max_concurrent_requests(max_concurrent_requests);
if let Some(retention_period) = retention_hours {
cmd = cmd.with_retention_period(Duration::hours(retention_period as i64));
}
@@ -313,13 +315,15 @@ impl RawDeltaTable {
}

// Run the optimize command on the Delta Table: merge small files into a large file by bin-packing.
#[pyo3(signature = (partition_filters = None, target_size = None))]
#[pyo3(signature = (partition_filters = None, target_size = None, max_concurrent_tasks = 10))]
pub fn optimize(
&mut self,
partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
target_size: Option<DeltaDataTypeLong>,
max_concurrent_tasks: usize,
) -> PyResult<String> {
let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone());
let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone())
.with_max_concurrent_tasks(max_concurrent_tasks);
if let Some(size) = target_size {
cmd = cmd.with_target_size(size);
}
49 changes: 47 additions & 2 deletions python/tests/test_benchmark.py
@@ -1,4 +1,8 @@
import os
from itertools import product

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.fs as pa_fs
import pytest
from numpy.random import standard_normal
@@ -11,9 +15,9 @@

@pytest.fixture()
def sample_table() -> pa.Table:
max_size_bytes = 1024 * 1024 * 1024
max_size_bytes = 100 * 1024 * 1024
ncols = 20
nrows = max_size_bytes // 20 // 64
nrows = max_size_bytes // 20 // 8
tab = pa.table({f"x{i}": standard_normal(nrows) for i in range(ncols)})
# Add index column for sorting
tab = tab.append_column("i", pa.array(range(nrows), type=pa.int64()))
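For reference, the resized fixture works out to 100 * 1024 * 1024 // 20 // 8 = 655,360 rows; with 20 float64 columns at 8 bytes each, that is roughly 100 MB of data in memory, which keeps the benchmark setup quick.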
@@ -58,3 +62,44 @@ def test_benchmark_read_pyarrow(benchmark, sample_table, tmp_path):
fs = pa_fs.SubTreeFileSystem(str(tmp_path), pa_fs.LocalFileSystem())
result = benchmark(dt.to_pyarrow_table, filesystem=fs)
assert result.sort_by("i") == sample_table


@pytest.mark.benchmark(group="optimize")
@pytest.mark.parametrize("max_tasks", [1, 5, 10])
def test_benchmark_optimize(benchmark, sample_table, tmp_path, max_tasks):
def setup():
# Create 2 partitions, each partition with 10 files.
# Each file is roughly 15 MB, so each partition holds about 150 MB.
files_per_part = 10
parts = ["a", "b"]
for part, _ in product(parts, range(files_per_part)):
# Default row group size is 1 million rows. In order to get close to
# the target size, we need a tall dataset. Wide datasets will have
# huge row groups.
nrows = sample_table.num_rows
tab = sample_table.select(["x0", "x1", "i"])
tab = tab.append_column("part", pa.array([part] * nrows))
write_deltalake(tmp_path, tab, mode="append", partition_by=["part"])

dt = DeltaTable(tmp_path)

assert len(dt.files()) == files_per_part * len(parts)

target_size = 100 * 1024 * 1024
sizes = dt.get_add_actions().column("size_bytes")
assert all(size.as_py() <= target_size for size in sizes)

return (dt,), dict(max_concurrent_tasks=max_tasks, target_size=target_size)

def func(dt, max_concurrent_tasks, target_size):
return dt.optimize(
max_concurrent_tasks=max_concurrent_tasks, target_size=target_size
)

# We need to recreate the table for each benchmark run
results = benchmark.pedantic(func, setup=setup)
print(results)

assert results["numFilesRemoved"] == 20
assert results["numFilesAdded"] == 4
assert results["partitionsOptimized"] == 2
10 changes: 7 additions & 3 deletions rust/src/operations/optimize.rs
@@ -293,6 +293,10 @@ pub struct MergePlan {
}

impl MergePlan {
/// Rewrites files in a single partition.
///
/// Returns a vector of add and remove actions, as well as the partial metrics
/// collected during the operation.
async fn rewrite_files(
&self,
partition: PartitionTuples,
@@ -345,9 +349,10 @@ impl MergePlan {
let file_reader = ParquetObjectReader::new(object_store.clone(), file);
ParquetRecordBatchStreamBuilder::new(file_reader)
.await?
.with_batch_size(1024) // TODO: should this be larger?
.build()
});

// If we don't care about preserving insertion order, we can scan files in parallel.
let mut batch_stream = if self.preserve_insertion_order {
batch_stream.try_flatten().boxed()
} else {
@@ -394,7 +399,6 @@ impl MergePlan {

let mut operations = HashMap::new();
std::mem::swap(&mut self.operations, &mut operations);
// let operations = self.operations.take();

futures::stream::iter(operations)
.map(|(partition, files)| self.rewrite_files(partition, files, object_store.clone()))
@@ -406,7 +410,7 @@
})
.await?;

metrics.preserve_insertion_order = true;
metrics.preserve_insertion_order = self.preserve_insertion_order;
if metrics.num_files_added == 0 {
metrics.files_added.min = 0;
}
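On the Rust side, the per-partition rewrites are driven as a stream of futures whose concurrency is capped by max_concurrent_tasks. For readers more comfortable in Python, the same bounded-concurrency pattern can be sketched with asyncio; this is illustrative only, and the function names below are stand-ins, not part of the commit:

    import asyncio

    async def rewrite_partition(partition: str) -> str:
        # Stand-in for MergePlan::rewrite_files: read, bin-pack, and rewrite one partition.
        await asyncio.sleep(0.1)
        return f"rewrote {partition}"

    async def optimize(partitions: list[str], max_concurrent_tasks: int) -> list[str]:
        sem = asyncio.Semaphore(max_concurrent_tasks)

        async def bounded(partition: str) -> str:
            async with sem:  # at most max_concurrent_tasks rewrites in flight at once
                return await rewrite_partition(partition)

        return await asyncio.gather(*(bounded(p) for p in partitions))

    print(asyncio.run(optimize(["part=a", "part=b"], max_concurrent_tasks=2)))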
