Skip to content

Commit

Permalink
[python] Consolidation and vacuuming are now platform configuration o…
Browse files Browse the repository at this point in the history
…ptions (#1690) (#1696)

* Consolidation and vacuuming are now platform configuration options

Commit and fragment_metadata consolidation and vacuuming can improve the
opening and query performance of SOMA experiments. Vacuuming requires
slight coordination though and should not happen by default. Instead a
platform config allows the user to control these operations based. This
will be expanded to defaults for top-level `io` packages where its more
likely a user is doing a one-shot ingestion and will want automatic
handling.

A new platform config, `consolidate_and_vacuum` has been added which is
a boolean to handle this behavior.

* set is_mac for ci-minimal workflow

Co-authored-by: Seth Shelnutt <[email protected]>
  • Loading branch information
github-actions[bot] and Shelnutt2 authored Sep 18, 2023
1 parent 3105be7 commit e90d7db
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 17 deletions.
1 change: 1 addition & 0 deletions .github/workflows/python-ci-minimal.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jobs:
python_version: ${{ matrix.python-version }}
cc: ${{ matrix.cc }}
cxx: ${{ matrix.cxx }}
is_mac: ${{ contains(matrix.os, 'macos') }}
report_codecov: ${{ matrix.python-version == '3.10' }}
run_lint: ${{ matrix.python-version == '3.10' }}
secrets: inherit
9 changes: 5 additions & 4 deletions apis/python/src/tiledbsoma/_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,6 @@ def write(
"""
_util.check_type("values", values, (pa.Table,))

del platform_config # unused
dim_cols_map: Dict[str, pd.DataFrame] = {}
attr_cols_map: Dict[str, pd.DataFrame] = {}
dim_names_set = self.index_column_names
Expand Down Expand Up @@ -437,14 +436,17 @@ def write(
dim_cols_list = [dim_cols_map[name] for name in self.index_column_names]
dim_cols_tuple = tuple(dim_cols_list)
self._handle.writer[dim_cols_tuple] = attr_cols_map
self._consolidate_and_vacuum_fragment_metadata()
tiledb_create_options = TileDBCreateOptions.from_platform_config(
platform_config
)
if tiledb_create_options.consolidate_and_vacuum:
self._consolidate_and_vacuum()

return self

def _set_reader_coord(
self, sr: clib.SOMAArray, dim_idx: int, dim: tiledb.Dim, coord: object
) -> bool:

if coord is None:
return True # No constraint; select all in this dimension

Expand Down Expand Up @@ -582,7 +584,6 @@ def _set_reader_coord_by_py_seq_or_np_array(
def _set_reader_coord_by_numeric_slice(
self, sr: clib.SOMAArray, dim_idx: int, dim: tiledb.Dim, coord: Slice[Any]
) -> bool:

try:
lo_hi = _util.slice_to_numeric_range(coord, dim.domain)
except _util.NonNumericDimensionError:
Expand Down
7 changes: 5 additions & 2 deletions apis/python/src/tiledbsoma/_dense_nd_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,12 @@ def write(
"""
_util.check_type("values", values, (pa.Tensor,))

del platform_config # Currently unused.
self._handle.writer[coords] = values.to_numpy()
self._consolidate_and_vacuum_fragment_metadata()
tiledb_create_options = TileDBCreateOptions.from_platform_config(
platform_config
)
if tiledb_create_options.consolidate_and_vacuum:
self._consolidate_and_vacuum()
return self

@classmethod
Expand Down
19 changes: 12 additions & 7 deletions apis/python/src/tiledbsoma/_sparse_nd_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,11 @@ def write(
Lifecycle:
Experimental.
"""
del platform_config # Currently unused.

arr = self._handle.writer
tiledb_create_options = TileDBCreateOptions.from_platform_config(
platform_config
)

if isinstance(values, pa.SparseCOOTensor):
# Write bulk data
Expand All @@ -197,8 +199,9 @@ def write(
bounding_box = self._compute_bounding_box_metadata(maxes)
self._set_bounding_box_metadata(bounding_box)

# Consolidate non-bulk data
self._consolidate_and_vacuum_fragment_metadata()
if tiledb_create_options.consolidate_and_vacuum:
# Consolidate non-bulk data
self._consolidate_and_vacuum()
return self

if isinstance(values, (pa.SparseCSCMatrix, pa.SparseCSRMatrix)):
Expand All @@ -216,8 +219,9 @@ def write(
bounding_box = self._compute_bounding_box_metadata([nr - 1, nc - 1])
self._set_bounding_box_metadata(bounding_box)

# Consolidate non-bulk data
self._consolidate_and_vacuum_fragment_metadata()
if tiledb_create_options.consolidate_and_vacuum:
# Consolidate non-bulk data
self._consolidate_and_vacuum()
return self

if isinstance(values, pa.Table):
Expand All @@ -241,8 +245,9 @@ def write(
bounding_box = self._compute_bounding_box_metadata(maxes)
self._set_bounding_box_metadata(bounding_box)

# Consolidate non-bulk data
self._consolidate_and_vacuum_fragment_metadata()
if tiledb_create_options.consolidate_and_vacuum:
# Consolidate non-bulk data
self._consolidate_and_vacuum()
return self

raise TypeError(
Expand Down
29 changes: 25 additions & 4 deletions apis/python/src/tiledbsoma/_tiledb_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import ctypes
import os
import sys
from typing import Any, Dict, Optional, Sequence, Tuple
from typing import Any, Dict, List, Optional, Sequence, Tuple

import pyarrow as pa
import tiledb
Expand Down Expand Up @@ -194,20 +194,41 @@ def _create_internal(
cls._set_create_metadata(handle)
return handle

def _consolidate_and_vacuum_fragment_metadata(self) -> None:
def _consolidate_and_vacuum(
self, modes: List[str] = ["fragment_meta", "commits"]
) -> None:
"""
This post-ingestion helper consolidates and vacuums fragment metadata and commit files --
this is quick to do, and positively impacts query performance. It does _not_ consolidate
bulk array data, which is more time-consuming and should be done at the user's opt-in
discretion.
"""

for mode in ["fragment_meta", "commits"]:
for mode in modes:
self._consolidate(modes=[mode])
self._vacuum(modes=[mode])

def _consolidate(self, modes: List[str] = ["fragment_meta", "commits"]) -> None:
"""
This post-ingestion helper consolidates by default fragment metadata and commit files --
this is quick to do, and positively impacts query performance.
"""

for mode in modes:
cfg = self._ctx.config()
cfg["sm.consolidation.mode"] = mode
cfg["sm.vacuum.mode"] = mode
ctx = tiledb.Ctx(cfg)

tiledb.consolidate(self.uri, ctx=ctx)

def _vacuum(self, modes: List[str] = ["fragment_meta", "commits"]) -> None:
"""
This post-ingestion helper vacuums by default fragment metadata and commit files. Vacuuming is not multi-process safe and requires coordination that nothing is currently reading the files that will be vacuumed.
"""

for mode in modes:
cfg = self._ctx.config()
cfg["sm.vacuum.mode"] = mode
ctx = tiledb.Ctx(cfg)

tiledb.vacuum(self.uri, ctx=ctx)
3 changes: 3 additions & 0 deletions apis/python/src/tiledbsoma/options/_tiledb_create_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ class TileDBCreateOptions:
attrs: Mapping[str, _ColumnConfig] = attrs_.field(
factory=dict, converter=_normalize_columns
)
consolidate_and_vacuum: bool = attrs_.field(
validator=vld.instance_of(bool), default=False
)

@classmethod
def from_platform_config(
Expand Down

0 comments on commit e90d7db

Please sign in to comment.