Skip to content

Commit

Permalink
[python] Consolidation and vacuuming are now platform configuration o…
Browse files Browse the repository at this point in the history
…ptions (#1690)

* Consolidation and vacuuming are now platform configuration options

Commit and fragment_metadata consolidation and vacuuming can improve the
opening and query performance of SOMA experiments. Vacuuming requires
slight coordination, though, and should not happen by default. Instead, a
platform config allows the user to control these operations. This
will be expanded to defaults for top-level `io` packages, where it's more
likely a user is doing a one-shot ingestion and will want automatic
handling.

A new boolean platform config option, `consolidate_and_vacuum`, has been
added to control this behavior.

* set is_mac for ci-minimal workflow
  • Loading branch information
Shelnutt2 authored and github-actions[bot] committed Sep 18, 2023
1 parent 3105be7 commit 139cb09
Showing 6 changed files with 51 additions and 17 deletions.
1 change: 1 addition & 0 deletions .github/workflows/python-ci-minimal.yml
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@ jobs:
python_version: ${{ matrix.python-version }}
cc: ${{ matrix.cc }}
cxx: ${{ matrix.cxx }}
is_mac: ${{ contains(matrix.os, 'macos') }}
report_codecov: ${{ matrix.python-version == '3.10' }}
run_lint: ${{ matrix.python-version == '3.10' }}
secrets: inherit
9 changes: 5 additions & 4 deletions apis/python/src/tiledbsoma/_dataframe.py
Original file line number Diff line number Diff line change
@@ -401,7 +401,6 @@ def write(
"""
_util.check_type("values", values, (pa.Table,))

del platform_config # unused
dim_cols_map: Dict[str, pd.DataFrame] = {}
attr_cols_map: Dict[str, pd.DataFrame] = {}
dim_names_set = self.index_column_names
@@ -437,14 +436,17 @@ def write(
dim_cols_list = [dim_cols_map[name] for name in self.index_column_names]
dim_cols_tuple = tuple(dim_cols_list)
self._handle.writer[dim_cols_tuple] = attr_cols_map
self._consolidate_and_vacuum_fragment_metadata()
tiledb_create_options = TileDBCreateOptions.from_platform_config(
platform_config
)
if tiledb_create_options.consolidate_and_vacuum:
self._consolidate_and_vacuum()

return self

def _set_reader_coord(
self, sr: clib.SOMAArray, dim_idx: int, dim: tiledb.Dim, coord: object
) -> bool:

if coord is None:
return True # No constraint; select all in this dimension

@@ -582,7 +584,6 @@ def _set_reader_coord_by_py_seq_or_np_array(
def _set_reader_coord_by_numeric_slice(
self, sr: clib.SOMAArray, dim_idx: int, dim: tiledb.Dim, coord: Slice[Any]
) -> bool:

try:
lo_hi = _util.slice_to_numeric_range(coord, dim.domain)
except _util.NonNumericDimensionError:
7 changes: 5 additions & 2 deletions apis/python/src/tiledbsoma/_dense_nd_array.py
Original file line number Diff line number Diff line change
@@ -172,9 +172,12 @@ def write(
"""
_util.check_type("values", values, (pa.Tensor,))

del platform_config # Currently unused.
self._handle.writer[coords] = values.to_numpy()
self._consolidate_and_vacuum_fragment_metadata()
tiledb_create_options = TileDBCreateOptions.from_platform_config(
platform_config
)
if tiledb_create_options.consolidate_and_vacuum:
self._consolidate_and_vacuum()
return self

@classmethod
19 changes: 12 additions & 7 deletions apis/python/src/tiledbsoma/_sparse_nd_array.py
Original file line number Diff line number Diff line change
@@ -183,9 +183,11 @@ def write(
Lifecycle:
Experimental.
"""
del platform_config # Currently unused.

arr = self._handle.writer
tiledb_create_options = TileDBCreateOptions.from_platform_config(
platform_config
)

if isinstance(values, pa.SparseCOOTensor):
# Write bulk data
@@ -197,8 +199,9 @@ def write(
bounding_box = self._compute_bounding_box_metadata(maxes)
self._set_bounding_box_metadata(bounding_box)

# Consolidate non-bulk data
self._consolidate_and_vacuum_fragment_metadata()
if tiledb_create_options.consolidate_and_vacuum:
# Consolidate non-bulk data
self._consolidate_and_vacuum()
return self

if isinstance(values, (pa.SparseCSCMatrix, pa.SparseCSRMatrix)):
@@ -216,8 +219,9 @@ def write(
bounding_box = self._compute_bounding_box_metadata([nr - 1, nc - 1])
self._set_bounding_box_metadata(bounding_box)

# Consolidate non-bulk data
self._consolidate_and_vacuum_fragment_metadata()
if tiledb_create_options.consolidate_and_vacuum:
# Consolidate non-bulk data
self._consolidate_and_vacuum()
return self

if isinstance(values, pa.Table):
@@ -241,8 +245,9 @@ def write(
bounding_box = self._compute_bounding_box_metadata(maxes)
self._set_bounding_box_metadata(bounding_box)

# Consolidate non-bulk data
self._consolidate_and_vacuum_fragment_metadata()
if tiledb_create_options.consolidate_and_vacuum:
# Consolidate non-bulk data
self._consolidate_and_vacuum()
return self

raise TypeError(
29 changes: 25 additions & 4 deletions apis/python/src/tiledbsoma/_tiledb_array.py
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
import ctypes
import os
import sys
from typing import Any, Dict, Optional, Sequence, Tuple
from typing import Any, Dict, List, Optional, Sequence, Tuple

import pyarrow as pa
import tiledb
@@ -194,20 +194,41 @@ def _create_internal(
cls._set_create_metadata(handle)
return handle

def _consolidate_and_vacuum_fragment_metadata(self) -> None:
def _consolidate_and_vacuum(
self, modes: List[str] = ["fragment_meta", "commits"]
) -> None:
"""
This post-ingestion helper consolidates and vacuums fragment metadata and commit files --
this is quick to do, and positively impacts query performance. It does _not_ consolidate
bulk array data, which is more time-consuming and should be done at the user's opt-in
discretion.
"""

for mode in ["fragment_meta", "commits"]:
for mode in modes:
self._consolidate(modes=[mode])
self._vacuum(modes=[mode])

def _consolidate(self, modes: Sequence[str] = ("fragment_meta", "commits")) -> None:
    """
    Consolidates this array once per requested mode.

    By default this post-ingestion helper consolidates fragment metadata and commit
    files -- this is quick to do, and positively impacts query performance.

    Args:
        modes: Values assigned, one at a time, to the ``sm.consolidation.mode``
            config key before each ``tiledb.consolidate`` call. The default is an
            immutable tuple so that it cannot be altered between calls.
    """

    for mode in modes:
        # Build a dedicated context per mode from this array's context config.
        cfg = self._ctx.config()
        cfg["sm.consolidation.mode"] = mode
        # NOTE(review): the vacuum mode is set here too although only
        # consolidation is invoked below -- kept as-is; confirm whether needed.
        cfg["sm.vacuum.mode"] = mode
        ctx = tiledb.Ctx(cfg)

        tiledb.consolidate(self.uri, ctx=ctx)

def _vacuum(self, modes: Sequence[str] = ("fragment_meta", "commits")) -> None:
    """
    Vacuums this array once per requested mode.

    By default this post-ingestion helper vacuums fragment metadata and commit
    files. Vacuuming is not multi-process safe and requires coordination that
    nothing is currently reading the files that will be vacuumed.

    Args:
        modes: Values assigned, one at a time, to the ``sm.vacuum.mode`` config
            key before each ``tiledb.vacuum`` call. The default is an immutable
            tuple so that it cannot be altered between calls.
    """

    for mode in modes:
        # Build a dedicated context per mode from this array's context config.
        cfg = self._ctx.config()
        cfg["sm.vacuum.mode"] = mode
        ctx = tiledb.Ctx(cfg)

        tiledb.vacuum(self.uri, ctx=ctx)
3 changes: 3 additions & 0 deletions apis/python/src/tiledbsoma/options/_tiledb_create_options.py
Original file line number Diff line number Diff line change
@@ -143,6 +143,9 @@ class TileDBCreateOptions:
attrs: Mapping[str, _ColumnConfig] = attrs_.field(
factory=dict, converter=_normalize_columns
)
consolidate_and_vacuum: bool = attrs_.field(
validator=vld.instance_of(bool), default=False
)

@classmethod
def from_platform_config(

0 comments on commit 139cb09

Please sign in to comment.