Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Consolidation and vacuuming are now platform configuration options
Browse files Browse the repository at this point in the history
Commit and fragment_metadata consolidation and vacuuming can improve the
opening and query performance of SOMA experiments. Vacuuming requires
slight coordination though and should not happen by default. Instead a
platform config allows the user to control these operations based. This
will be expanded to defaults for top-level `io` packages where its more
likely a user is doing a one-shot ingestion and will want automatic
handling.

A new platform config, `consolidate_and_vacuum` has been added which is
a boolean to handle this behavior.
Shelnutt2 committed Sep 16, 2023
1 parent 7541a31 commit 806d305
Showing 5 changed files with 50 additions and 17 deletions.
9 changes: 5 additions & 4 deletions apis/python/src/tiledbsoma/_dataframe.py
Original file line number Diff line number Diff line change
@@ -401,7 +401,6 @@ def write(
"""
_util.check_type("values", values, (pa.Table,))

del platform_config # unused
dim_cols_map: Dict[str, pd.DataFrame] = {}
attr_cols_map: Dict[str, pd.DataFrame] = {}
dim_names_set = self.index_column_names
@@ -427,14 +426,17 @@ def write(
dim_cols_list = [dim_cols_map[name] for name in self.index_column_names]
dim_cols_tuple = tuple(dim_cols_list)
self._handle.writer[dim_cols_tuple] = attr_cols_map
self._consolidate_and_vacuum_fragment_metadata()
tiledb_create_options = TileDBCreateOptions.from_platform_config(
platform_config
)
if tiledb_create_options.consolidate_and_vacuum:
self._consolidate_and_vacuum()

return self

def _set_reader_coord(
self, sr: clib.SOMAArray, dim_idx: int, dim: tiledb.Dim, coord: object
) -> bool:

if coord is None:
return True # No constraint; select all in this dimension

@@ -572,7 +574,6 @@ def _set_reader_coord_by_py_seq_or_np_array(
def _set_reader_coord_by_numeric_slice(
self, sr: clib.SOMAArray, dim_idx: int, dim: tiledb.Dim, coord: Slice[Any]
) -> bool:

try:
lo_hi = _util.slice_to_numeric_range(coord, dim.domain)
except _util.NonNumericDimensionError:
7 changes: 5 additions & 2 deletions apis/python/src/tiledbsoma/_dense_nd_array.py
Original file line number Diff line number Diff line change
@@ -172,9 +172,12 @@ def write(
"""
_util.check_type("values", values, (pa.Tensor,))

del platform_config # Currently unused.
self._handle.writer[coords] = values.to_numpy()
self._consolidate_and_vacuum_fragment_metadata()
tiledb_create_options = TileDBCreateOptions.from_platform_config(
platform_config
)
if tiledb_create_options.consolidate_and_vacuum:
self._consolidate_and_vacuum()
return self

@classmethod
19 changes: 12 additions & 7 deletions apis/python/src/tiledbsoma/_sparse_nd_array.py
Original file line number Diff line number Diff line change
@@ -183,9 +183,11 @@ def write(
Lifecycle:
Experimental.
"""
del platform_config # Currently unused.

arr = self._handle.writer
tiledb_create_options = TileDBCreateOptions.from_platform_config(
platform_config
)

if isinstance(values, pa.SparseCOOTensor):
# Write bulk data
@@ -197,8 +199,9 @@ def write(
bounding_box = self._compute_bounding_box_metadata(maxes)
self._set_bounding_box_metadata(bounding_box)

# Consolidate non-bulk data
self._consolidate_and_vacuum_fragment_metadata()
if tiledb_create_options.consolidate_and_vacuum:
# Consolidate non-bulk data
self._consolidate_and_vacuum()
return self

if isinstance(values, (pa.SparseCSCMatrix, pa.SparseCSRMatrix)):
@@ -216,8 +219,9 @@ def write(
bounding_box = self._compute_bounding_box_metadata([nr - 1, nc - 1])
self._set_bounding_box_metadata(bounding_box)

# Consolidate non-bulk data
self._consolidate_and_vacuum_fragment_metadata()
if tiledb_create_options.consolidate_and_vacuum:
# Consolidate non-bulk data
self._consolidate_and_vacuum()
return self

if isinstance(values, pa.Table):
@@ -241,8 +245,9 @@ def write(
bounding_box = self._compute_bounding_box_metadata(maxes)
self._set_bounding_box_metadata(bounding_box)

# Consolidate non-bulk data
self._consolidate_and_vacuum_fragment_metadata()
if tiledb_create_options.consolidate_and_vacuum:
# Consolidate non-bulk data
self._consolidate_and_vacuum()
return self

raise TypeError(
29 changes: 25 additions & 4 deletions apis/python/src/tiledbsoma/_tiledb_array.py
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
import ctypes
import os
import sys
from typing import Any, Dict, Optional, Sequence, Tuple
from typing import Any, Dict, List, Optional, Sequence, Tuple

import pyarrow as pa
import tiledb
@@ -194,20 +194,41 @@ def _create_internal(
cls._set_create_metadata(handle)
return handle

def _consolidate_and_vacuum_fragment_metadata(self) -> None:
def _consolidate_and_vacuum(
self, modes: List[str] = ["fragment_meta", "commits"]
) -> None:
"""
This post-ingestion helper consolidates and vacuums fragment metadata and commit files --
this is quick to do, and positively impacts query performance. It does _not_ consolidate
bulk array data, which is more time-consuming and should be done at the user's opt-in
discretion.
"""

for mode in ["fragment_meta", "commits"]:
for mode in modes:
self._consolidate(modes=[mode])
self._vacuum(modes=[mode])

def _consolidate(self, modes: List[str] = ["fragment_meta", "commits"]) -> None:
"""
This post-ingestion helper consolidates by default fragment metadata and commit files --
this is quick to do, and positively impacts query performance.
"""

for mode in modes:
cfg = self._ctx.config()
cfg["sm.consolidation.mode"] = mode
cfg["sm.vacuum.mode"] = mode
ctx = tiledb.Ctx(cfg)

tiledb.consolidate(self.uri, ctx=ctx)

def _vacuum(self, modes: List[str] = ["fragment_meta", "commits"]) -> None:
"""
This post-ingestion helper vacuums by default fragment metadata and commit files. Vacuuming is not multi-process safe and requires coordination that nothing is currently reading the files that will be vacuumed.
"""

for mode in modes:
cfg = self._ctx.config()
cfg["sm.vacuum.mode"] = mode
ctx = tiledb.Ctx(cfg)

tiledb.vacuum(self.uri, ctx=ctx)
3 changes: 3 additions & 0 deletions apis/python/src/tiledbsoma/options/_tiledb_create_options.py
Original file line number Diff line number Diff line change
@@ -143,6 +143,9 @@ class TileDBCreateOptions:
attrs: Mapping[str, _ColumnConfig] = attrs_.field(
factory=dict, converter=_normalize_columns
)
consolidate_and_vacuum: bool = attrs_.field(
validator=vld.instance_of(bool), default=False
)

@classmethod
def from_platform_config(

0 comments on commit 806d305

Please sign in to comment.