Skip to content

Commit

Permalink
[python] Ingestion performance (#2434) (#2439)
Browse files Browse the repository at this point in the history
* AlreadyExistsError; use for DataFrame in tiledbsoma.io

* apply in `_create_or_open_collection`

* apply in _create_from_matrix

* apply in _ingest_uns_ndarray

* lint

* Run SO copying workflow on macos-13 to avoid SIP (#2435)

* AlreadyExistsError; use for DataFrame in tiledbsoma.io

* apply in `_create_or_open_collection`

* apply in _create_from_matrix

* apply in _ingest_uns_ndarray

* lint

* neaten

* neaten

* Update raises-notes

* code-review feedback



---------

Co-authored-by: John Kerl <[email protected]>
Co-authored-by: John Blischak <[email protected]>
Co-authored-by: nguyenv <[email protected]>
  • Loading branch information
4 people authored Apr 12, 2024
1 parent 3ed7453 commit 50e5d64
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 39 deletions.
3 changes: 2 additions & 1 deletion apis/python/src/tiledbsoma/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@
from ._constants import SOMA_JOINID
from ._dataframe import DataFrame
from ._dense_nd_array import DenseNDArray
from ._exception import DoesNotExistError, SOMAError
from ._exception import AlreadyExistsError, DoesNotExistError, SOMAError
from ._experiment import Experiment
from ._factory import open
from ._general_utilities import (
Expand All @@ -171,6 +171,7 @@
__version__ = get_implementation_version()

__all__ = [
"AlreadyExistsError",
"AxisColumnNames",
"AxisQuery",
"Collection",
Expand Down
28 changes: 20 additions & 8 deletions apis/python/src/tiledbsoma/_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@
from ._common_nd_array import NDArray
from ._dataframe import DataFrame
from ._dense_nd_array import DenseNDArray
from ._exception import SOMAError, is_does_not_exist_error
from ._exception import (
AlreadyExistsError,
SOMAError,
is_already_exists_error,
is_does_not_exist_error,
)
from ._funcs import typeguard_ignore
from ._sparse_nd_array import SparseNDArray
from ._tiledb_object import AnyTileDBObject, TileDBObject
Expand Down Expand Up @@ -112,20 +117,27 @@ def create(
the context.
Raises:
tiledbsoma.AlreadyExistsError:
If the underlying object already exists at the given URI.
TileDBError:
If unable to create the underlying object.
Lifecycle:
Experimental.
"""
context = _validate_soma_tiledb_context(context)
tiledb.group_create(uri=uri, ctx=context.tiledb_ctx)
handle = cls._wrapper_type.open(uri, "w", context, tiledb_timestamp)
cls._set_create_metadata(handle)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
try:
tiledb.group_create(uri=uri, ctx=context.tiledb_ctx)
handle = cls._wrapper_type.open(uri, "w", context, tiledb_timestamp)
cls._set_create_metadata(handle)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
except tiledb.TileDBError as tdbe:
if is_already_exists_error(tdbe):
raise AlreadyExistsError(f"{uri!r} already exists")
raise

@classmethod
def open(
Expand Down
18 changes: 13 additions & 5 deletions apis/python/src/tiledbsoma/_common_nd_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import tiledb

from . import _arrow_types, _util
from ._exception import AlreadyExistsError, is_already_exists_error
from ._tiledb_array import TileDBArray
from ._types import OpenTimestamp
from .options._soma_tiledb_context import (
Expand Down Expand Up @@ -77,6 +78,8 @@ def create(
If the ``type`` is unsupported.
ValueError:
If the ``shape`` is unsupported.
tiledbsoma.AlreadyExistsError:
If the underlying object already exists at the given URI.
TileDBError:
If unable to create the underlying object.
Expand All @@ -91,11 +94,16 @@ def create(
context,
is_sparse=cls.is_sparse,
)
handle = cls._create_internal(uri, schema, context, tiledb_timestamp)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
try:
handle = cls._create_internal(uri, schema, context, tiledb_timestamp)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
except tiledb.TileDBError as tdbe:
if is_already_exists_error(tdbe):
raise AlreadyExistsError(f"{uri!r} already exists")
raise

@property
def shape(self) -> Tuple[int, ...]:
Expand Down
18 changes: 13 additions & 5 deletions apis/python/src/tiledbsoma/_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from . import _arrow_types, _util
from . import pytiledbsoma as clib
from ._constants import SOMA_JOINID
from ._exception import AlreadyExistsError, is_already_exists_error
from ._query_condition import QueryCondition
from ._read_iters import TableReadIter
from ._tdb_handles import DataFrameWrapper
Expand Down Expand Up @@ -187,6 +188,8 @@ def create(
an undefined column name.
ValueError:
If the ``schema`` specifies illegal column names.
tiledbsoma.AlreadyExistsError:
If the underlying object already exists at the given URI.
TileDBError:
If unable to create the underlying object.
Expand Down Expand Up @@ -217,11 +220,16 @@ def create(
TileDBCreateOptions.from_platform_config(platform_config),
context,
)
handle = cls._create_internal(uri, tdb_schema, context, tiledb_timestamp)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
try:
handle = cls._create_internal(uri, tdb_schema, context, tiledb_timestamp)
return cls(
handle,
_dont_call_this_use_create_or_open_instead="tiledbsoma-internal-code",
)
except tiledb.TileDBError as tdbe:
if is_already_exists_error(tdbe):
raise AlreadyExistsError(f"{uri!r} already exists")
raise

def keys(self) -> Tuple[str, ...]:
"""Returns the names of the columns when read back as a dataframe.
Expand Down
30 changes: 30 additions & 0 deletions apis/python/src/tiledbsoma/_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,36 @@ def is_does_not_exist_error(e: tiledb.TileDBError) -> bool:
return False


class AlreadyExistsError(SOMAError):
    """Error raised when creation of a SOMA object is attempted but that
    object already exists.

    Lifecycle: experimental
    """


def is_already_exists_error(e: tiledb.TileDBError) -> bool:
    """Report whether a TileDBError signals that the object already exists.

    The decision is made by a case-insensitive search of the error's
    message text for the phrase "already exists".

    Lifecycle: experimental

    Example:
        try:
            tiledb.Array.create(uri, schema, ctx=ctx)
            ...
        except tiledb.TileDBError as e:
            if is_already_exists_error(e):
                ...
            raise e
    """
    # Errors from local disk, S3, and TileDB Cloud all carry the phrase
    # "already exists". The message is lower-cased first so the match still
    # works should the other end ever change its capitalisation.
    message = str(e).lower()
    return "already exists" in message


def is_duplicate_group_key_error(e: tiledb.TileDBError) -> bool:
"""Given a TileDBError, return true if it indicates a duplicate member
add request in a tiledb.Group.
Expand Down
40 changes: 20 additions & 20 deletions apis/python/src/tiledbsoma/io/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@
from .._collection import AnyTileDBCollection, CollectionBase
from .._common_nd_array import NDArray
from .._constants import SOMA_JOINID
from .._exception import DoesNotExistError, SOMAError
from .._exception import (
AlreadyExistsError,
DoesNotExistError,
SOMAError,
)
from .._tdb_handles import RawHandle
from .._tiledb_array import TileDBArray
from .._tiledb_object import AnyTileDBObject, TileDBObject
Expand Down Expand Up @@ -984,17 +988,13 @@ def _create_or_open_collection(
additional_metadata: AdditionalMetadata = None,
) -> CollectionBase[_TDBO]:
try:
thing = cls.open(uri, "w", context=context)
except DoesNotExistError:
pass # This is always OK; make a new one.
else:
coll = cls.create(uri, context=context)
except AlreadyExistsError:
# It already exists. Are we resuming?
if ingestion_params.error_if_already_exists:
raise SOMAError(f"{uri} already exists")
add_metadata(thing, additional_metadata)
return thing
coll = cls.open(uri, "w", context=context)

coll = cls.create(uri, context=context)
add_metadata(coll, additional_metadata)
return coll

Expand Down Expand Up @@ -1194,15 +1194,18 @@ def _write_dataframe_impl(
)

try:
soma_df = _factory.open(df_uri, "w", soma_type=DataFrame, context=context)
except DoesNotExistError:
soma_df = DataFrame.create(
df_uri,
schema=arrow_table.schema,
platform_config=platform_config,
context=context,
)
else:
except AlreadyExistsError:
if ingestion_params.error_if_already_exists:
raise SOMAError(f"{soma_df.uri} already exists")

soma_df = _factory.open(df_uri, "w", soma_type=DataFrame, context=context)

if ingestion_params.skip_existing_nonempty_domain:
storage_ned = _read_nonempty_domain(soma_df)
dim_range = ((int(df.index.min()), int(df.index.max())),)
Expand All @@ -1212,8 +1215,6 @@ def _write_dataframe_impl(
_util.format_elapsed(s, f"SKIPPED {soma_df.uri}"),
)
return soma_df
elif ingestion_params.error_if_already_exists:
raise SOMAError(f"{soma_df.uri} already exists")

if ingestion_params.write_schema_no_data:
logging.log_io(
Expand Down Expand Up @@ -1291,10 +1292,6 @@ def _create_from_matrix(
logging.log_io(None, f"START WRITING {uri}")

try:
soma_ndarray = cls.open(
uri, "w", platform_config=platform_config, context=context
)
except DoesNotExistError:
# A SparseNDArray must be appendable in soma.io.
shape = [None for _ in matrix.shape] if cls.is_sparse else matrix.shape
soma_ndarray = cls.create(
Expand All @@ -1304,9 +1301,12 @@ def _create_from_matrix(
platform_config=platform_config,
context=context,
)
else:
except AlreadyExistsError:
if ingestion_params.error_if_already_exists:
raise SOMAError(f"{soma_ndarray.uri} already exists")
soma_ndarray = cls.open(
uri, "w", platform_config=platform_config, context=context
)

if ingestion_params.write_schema_no_data:
logging.log_io(
Expand Down Expand Up @@ -2749,15 +2749,15 @@ def _ingest_uns_ndarray(
logging.log_io(msg, msg)
return
try:
soma_arr = _factory.open(arr_uri, "w", soma_type=DenseNDArray, context=context)
except DoesNotExistError:
soma_arr = DenseNDArray.create(
arr_uri,
type=pa_dtype,
shape=value.shape,
platform_config=platform_config,
context=context,
)
except AlreadyExistsError:
soma_arr = _factory.open(arr_uri, "w", soma_type=DenseNDArray, context=context)

# If resume mode: don't re-write existing data. This is the user's explicit request
# that we not re-write things that have already been written.
Expand Down

0 comments on commit 50e5d64

Please sign in to comment.