From 7490e2b8a277e5f196b2687f29a6758759079414 Mon Sep 17 00:00:00 2001 From: John Kerl Date: Wed, 20 Nov 2024 14:35:00 -0500 Subject: [PATCH 1/6] [python] Protect against huge enum-of-strings input --- apis/python/src/tiledbsoma/io/conversions.py | 48 ++++++++------------ apis/python/src/tiledbsoma/io/ingest.py | 12 +++-- 2 files changed, 25 insertions(+), 35 deletions(-) diff --git a/apis/python/src/tiledbsoma/io/conversions.py b/apis/python/src/tiledbsoma/io/conversions.py index 107a83855b..5b3318b318 100644 --- a/apis/python/src/tiledbsoma/io/conversions.py +++ b/apis/python/src/tiledbsoma/io/conversions.py @@ -13,6 +13,7 @@ import numpy as np import pandas as pd import pandas._typing as pdt +import pandas.api.types import pyarrow as pa import scipy.sparse as sp @@ -26,18 +27,18 @@ _str_to_type = {"boolean": bool, "string": str, "bytes": bytes} -def decategoricalize_obs_or_var(obs_or_var: pd.DataFrame) -> pd.DataFrame: +def obs_or_var_to_tiledb_supported_array_type(obs_or_var: pd.DataFrame) -> pd.DataFrame: """Performs a typecast into types that TileDB can persist.""" - if len(obs_or_var.columns) > 0: - return pd.DataFrame.from_dict( - { - str(k): to_tiledb_supported_array_type(str(k), v) - for k, v in obs_or_var.items() - }, - ) - else: + if len(obs_or_var.columns) == 0: return obs_or_var.copy() + return pd.DataFrame.from_dict( + { + str(k): to_tiledb_supported_array_type(str(k), v) + for k, v in obs_or_var.items() + }, + ) + @typeguard_ignore def _to_tiledb_supported_dtype(dtype: _DT) -> _DT: @@ -56,27 +57,14 @@ def to_tiledb_supported_array_type(name: str, x: _MT) -> _MT: target_dtype = _to_tiledb_supported_dtype(x.dtype) return x if target_dtype == x.dtype else x.astype(target_dtype) - # categories = x.cat.categories - # cat_dtype = categories.dtype - # if cat_dtype.kind in ("f", "u", "i"): - # if x.hasnans and cat_dtype.kind == "i": - # raise ValueError( - # f"Categorical column {name!r} contains NaN -- unable to convert to TileDB array." 
- # ) - # # More mysterious spurious mypy errors. - # target_dtype = _to_tiledb_supported_dtype(cat_dtype) # type: ignore[arg-type] - # else: - # # Into the weirdness. See if Pandas can help with edge cases. - # inferred = infer_dtype(categories) - # if x.hasnans and inferred in ("boolean", "bytes"): - # raise ValueError( - # "Categorical array contains NaN -- unable to convert to TileDB array." - # ) - # target_dtype = np.dtype( # type: ignore[assignment] - # _str_to_type.get(inferred, object) - # ) - - # return x.astype(target_dtype) + # If the column is categorical-of-string of high cardinality, we declare + # this is likely a mistake, and it will definitely lead to performance + # issues in subsequent processing. + if isinstance(x, pd.Series) and isinstance(x.dtype, pd.CategoricalDtype): + # Heuristic number + if pandas.api.types.is_string_dtype(x) and len(x.cat.categories) > 4096: + return x.astype(str) + return x diff --git a/apis/python/src/tiledbsoma/io/ingest.py b/apis/python/src/tiledbsoma/io/ingest.py index ac970eb94a..5a6b320d4b 100644 --- a/apis/python/src/tiledbsoma/io/ingest.py +++ b/apis/python/src/tiledbsoma/io/ingest.py @@ -516,7 +516,7 @@ def from_anndata( df_uri = _util.uri_joinpath(experiment_uri, "obs") with _write_dataframe( df_uri, - conversions.decategoricalize_obs_or_var(anndata.obs), + conversions.obs_or_var_to_tiledb_supported_array_type(anndata.obs), id_column_name=obs_id_name, axis_mapping=jidmaps.obs_axis, **ingest_platform_ctx, @@ -566,7 +566,7 @@ def from_anndata( # MS/meas/VAR with _write_dataframe( _util.uri_joinpath(measurement_uri, "var"), - conversions.decategoricalize_obs_or_var(anndata.var), + conversions.obs_or_var_to_tiledb_supported_array_type(anndata.var), id_column_name=var_id_name, # Layer existence is pre-checked in the registration phase axis_mapping=jidmaps.var_axes[measurement_name], @@ -701,7 +701,9 @@ def _ingest_obs_var_m_p( with _write_dataframe( _util.uri_joinpath(raw_uri, "var"), - 
conversions.decategoricalize_obs_or_var(anndata.raw.var), + conversions.obs_or_var_to_tiledb_supported_array_type( + anndata.raw.var + ), id_column_name=var_id_name, axis_mapping=jidmaps.var_axes["raw"], **ingest_platform_ctx, @@ -794,7 +796,7 @@ def append_obs( with _write_dataframe( exp.obs.uri, - conversions.decategoricalize_obs_or_var(new_obs), + conversions.obs_or_var_to_tiledb_supported_array_type(new_obs), id_column_name=obs_id_name, platform_config=platform_config, context=context, @@ -859,7 +861,7 @@ def append_var( with _write_dataframe( sdf.uri, - conversions.decategoricalize_obs_or_var(new_var), + conversions.obs_or_var_to_tiledb_supported_array_type(new_var), id_column_name=var_id_name, platform_config=platform_config, context=context, From 7a82682e30edc9469a317eb1b36c1fac80d1522a Mon Sep 17 00:00:00 2001 From: John Kerl Date: Thu, 5 Dec 2024 10:46:19 -0500 Subject: [PATCH 2/6] unit testing --- apis/python/src/tiledbsoma/io/conversions.py | 13 +- apis/python/src/tiledbsoma/io/ingest.py | 135 +++++++++++++++++-- apis/python/tests/test_basic_anndata_io.py | 132 ++++++++++++++++++ 3 files changed, 269 insertions(+), 11 deletions(-) diff --git a/apis/python/src/tiledbsoma/io/conversions.py b/apis/python/src/tiledbsoma/io/conversions.py index 5b3318b318..2d8771d66d 100644 --- a/apis/python/src/tiledbsoma/io/conversions.py +++ b/apis/python/src/tiledbsoma/io/conversions.py @@ -26,6 +26,14 @@ _MT = TypeVar("_MT", NPNDArray, sp.spmatrix, PDSeries) _str_to_type = {"boolean": bool, "string": str, "bytes": bytes} +STRING_DECAT_THRESHOLD = 4096 +""" +For enum-of-string columns with a cardinality higher than this, we convert from +enum-of-string in the AnnData ``obs``/``var``, to plain string in TileDB-SOMA +``obs``/``var``. However, if we're appending to existing storage, we follow the +schema there. 
+""" + def obs_or_var_to_tiledb_supported_array_type(obs_or_var: pd.DataFrame) -> pd.DataFrame: """Performs a typecast into types that TileDB can persist.""" @@ -62,7 +70,10 @@ def to_tiledb_supported_array_type(name: str, x: _MT) -> _MT: # issues in subsequent processing. if isinstance(x, pd.Series) and isinstance(x.dtype, pd.CategoricalDtype): # Heuristic number - if pandas.api.types.is_string_dtype(x) and len(x.cat.categories) > 4096: + if ( + pandas.api.types.is_string_dtype(x) + and len(x.cat.categories) > STRING_DECAT_THRESHOLD + ): return x.astype(str) return x diff --git a/apis/python/src/tiledbsoma/io/ingest.py b/apis/python/src/tiledbsoma/io/ingest.py index 5a6b320d4b..2150c82060 100644 --- a/apis/python/src/tiledbsoma/io/ingest.py +++ b/apis/python/src/tiledbsoma/io/ingest.py @@ -1070,6 +1070,127 @@ def _create_or_open_coll( ) +def _extract_new_values_for_append_aux( + previous_soma_dataframe: DataFrame, + arrow_table: pa.Table, +) -> pa.Table: + """ + Helper function for _extract_new_values_for_append. + + This does two things: + + * Retains only the 'new' rows compared to existing storage. + Example is append-mode updates to the var dataframe for + which it's likely that most/all gene IDs have already been seen. + + * String vs categorical of string: + + o If we're appending a plain-string column to existing + categorical-of-string storage, convert the about-to-be-written data + to categorical of string, to match. + + o If we're appending a categorical-of-string column to existing + plain-string storage, convert the about-to-be-written data + to plain string, to match. + + Context: https://github.com/single-cell-data/TileDB-SOMA/issues/3353. + Namely, we find that AnnData's to_h5ad/from_h5ad can categoricalize (without + the user's knowledge or intention) string columns. For example, even + cell_id/barcode, for which there may be millions of distinct values, with no + gain to be had from dictionary encoding, will be converted to categorical. 
+ We find that converting these high-cardinality enums to plain string is a + significant performance win for subsequent accesses. When we do an initial + ingest from AnnData to TileDB-SOMA, we convert from categorical-of-string to + plain string if the cardinality exceeds some threshold. + + All well and good -- except for one more complication which is append mode. + Namely, if the new column has high enough cardinality that we would + downgrade to plain string, but the existing storage has + categorical-of-string, we must write the new data as categorical-of-string. + Likewise, if the new column has low enough cardinality that we would keep it + as categorical-of-string, but the existing storage has plain string, we must + write the new data as plain strings. + """ + + # Retain only the new rows. + previous_sjids_table = previous_soma_dataframe.read( + column_names=["soma_joinid"] + ).concat() + previous_join_ids = set( + int(e) + for e in get_dataframe_values(previous_sjids_table.to_pandas(), SOMA_JOINID) + ) + mask = [e.as_py() not in previous_join_ids for e in arrow_table[SOMA_JOINID]] + arrow_table = arrow_table.filter(mask) + + # This is a redundant, failsafe check. The appebd-mode registrar already + # ensure schema homogeneity before we get here. 
+ old_schema = previous_soma_dataframe.schema + new_schema = arrow_table.schema + + old_names = sorted(old_schema.names) + new_names = sorted(new_schema.names) + if old_names != new_names: + raise SOMAError(f"old column names {old_names} != {new_names}") + names = new_names + + # Helper functions for + def is_str_type(typ: pa.DataType) -> bool: + return cast(bool, typ == pa.string() or typ == pa.large_string()) + + def is_str_col(field: pa.Field) -> bool: + return is_str_type(field.type) + + def is_str_cat_col(field: pa.Field) -> bool: + if not pa.types.is_dictionary(field.type): + return False + return is_str_type(field.type.value_type) + + # Make a quick check of the old and new schemas to see if any columns need + # changing between plain string and categorical-of-string. We're about to + # duplicate the new data -- and we must, since pyarrow.Table is immutable -- + # so let's only do that if we need to. + any_to_change = False + for name in names: + if is_str_col(old_schema.field(name)) and is_str_cat_col( + new_schema.field(name) + ): + any_to_change = True + break + if is_str_cat_col(old_schema.field(name)) and is_str_col( + new_schema.field(name) + ): + any_to_change = True + break + + if any_to_change: + fields_dict = {} + for name in names: + column = arrow_table.column(name) + old_info = old_schema.field(name) + new_info = new_schema.field(name) + if is_str_col(old_info) and is_str_cat_col(new_info): + # Convert from categorical-of-string to plain string. + column = column.to_pylist() + elif is_str_cat_col(old_info) and is_str_col(new_info): + # Convert from plain string to categorical-of-string. Note: + # libtiledbsoma already merges the enum mappings, e.g if the + # storage has red, yellow, & green, but our new data has some + # yellow, green, and orange. 
+ column = pa.array( + column.to_pylist(), + pa.dictionary( + index_type=old_info.type.index_type, + value_type=old_info.type.value_type, + ordered=old_info.type.ordered, + ), + ) + fields_dict[name] = column + arrow_table = pa.Table.from_pydict(fields_dict) + + return arrow_table + + def _extract_new_values_for_append( df_uri: str, arrow_table: pa.Table, @@ -1100,17 +1221,10 @@ def _extract_new_values_for_append( with _factory.open( df_uri, "r", soma_type=DataFrame, context=context ) as previous_soma_dataframe: - previous_sjids_table = previous_soma_dataframe.read().concat() - previous_join_ids = set( - int(e) - for e in get_dataframe_values( - previous_sjids_table.to_pandas(), SOMA_JOINID - ) + return _extract_new_values_for_append_aux( + previous_soma_dataframe, arrow_table ) - mask = [ - e.as_py() not in previous_join_ids for e in arrow_table[SOMA_JOINID] - ] - return arrow_table.filter(mask) + except DoesNotExistError: return arrow_table @@ -1214,6 +1328,7 @@ def _write_dataframe_impl( # Nominally, nil id_column_name only happens for uns append and we do not append uns, # which is a concern for our caller. This is a second-level check. 
raise ValueError("internal coding error: id_column_name unspecified") + # XXX arrow_table = _extract_new_values_for_append(df_uri, arrow_table, context) try: diff --git a/apis/python/tests/test_basic_anndata_io.py b/apis/python/tests/test_basic_anndata_io.py index 505394bb06..7805cbe0f6 100644 --- a/apis/python/tests/test_basic_anndata_io.py +++ b/apis/python/tests/test_basic_anndata_io.py @@ -7,6 +7,7 @@ import anndata import numpy as np import pandas as pd +import pyarrow as pa import pytest import scipy import somacore @@ -1347,3 +1348,134 @@ def test_nan_append(conftest_pbmc_small, dtype, nans, new_obs_ids): measurement_name="RNA", registration_mapping=rd, ) + + +# This tests https://github.com/single-cell-data/TileDB-SOMA/pull/3354: +# * High-cardinality categorical-of-string gets converted to plain string, for +# a significant performance benefit in subsequent processing +# * Two particular cases need testing: +# o When a high-cardinality enum is appended to existing storage with categorical of string; +# o When a low-cardinality enum is appended to existing storage with plain string. 
+def test_decat_append(tmp_path): + + # Prepare the AnnData inputs + nobs_under = tiledbsoma.io.conversions.STRING_DECAT_THRESHOLD - 2 + nobs_over = tiledbsoma.io.conversions.STRING_DECAT_THRESHOLD + 2 + nvar = 100 + + obs_ids_under = [f"under_{e:08}" for e in range(nobs_under)] + obs_ids_over = [f"over_{e:08}" for e in range(nobs_over)] + var_ids = [f"gene_{e:08}" for e in range(nvar)] + + enum_values_under = [f"enum_u_{e:06}" for e in range(nobs_under)] + enum_values_over = [f"enum_o_{e:06}" for e in range(nobs_over)] + + obs_under = pd.DataFrame( + data={ + "obs_id": np.asarray(obs_ids_under), + "is_primary_data": np.asarray([True] * nobs_under), + "myenum": pd.Series(np.asarray(enum_values_under), dtype="category"), + } + ) + obs_under.set_index("obs_id", inplace=True) + + obs_over = pd.DataFrame( + data={ + "obs_id": np.asarray(obs_ids_over), + "is_primary_data": np.asarray([True] * nobs_over), + "myenum": pd.Series(np.asarray(enum_values_over), dtype="category"), + } + ) + obs_over.set_index("obs_id", inplace=True) + + var = pd.DataFrame( + data={ + "var_id": np.asarray(var_ids), + "mybool": np.asarray([True] * nvar), + } + ) + var.set_index("var_id", inplace=True) + + X_under = scipy.sparse.random( + nobs_under, nvar, density=0.1, dtype=np.float64 + ).tocsr() + X_over = scipy.sparse.random(nobs_over, nvar, density=0.1, dtype=np.float64).tocsr() + + adata_under = anndata.AnnData( + X=X_under, obs=obs_under, var=var, dtype=X_under.dtype + ) + adata_over = anndata.AnnData(X=X_over, obs=obs_over, var=var, dtype=X_over.dtype) + + # Do the initial ingests from AnnData format to TileDB-SOMA format + path_under = (tmp_path / "under").as_posix() + path_over = (tmp_path / "over").as_posix() + + tiledbsoma.io.from_anndata(path_under, adata_under, "RNA") + tiledbsoma.io.from_anndata(path_over, adata_over, "RNA") + + # Check that the low-cardinality categorical-of-string in the AnnData has + # been ingested to TileDB-SOMA enum-of-string. 
+ with tiledbsoma.Experiment.open(path_under) as exp_under: + assert pa.types.is_dictionary(exp_under.obs.schema.field("myenum").type) + obs_table = exp_under.obs.read().concat() + assert obs_table.column("myenum").to_pylist() == enum_values_under + + # Check that the high-cardinality categorical-of-string in the AnnData has + # been ingested to TileDB-SOMA plain string. + with tiledbsoma.Experiment.open(path_over) as exp_over: + assert not pa.types.is_dictionary(exp_over.obs.schema.field("myenum").type) + obs_table = exp_over.obs.read().concat() + assert obs_table.column("myenum").to_pylist() == enum_values_over + + # Append over-the-threshold AnnData to under-the-threshold TileDB-SOMA + # storage, and vice versa. + rd_under_over = tiledbsoma.io.register_anndatas( + experiment_uri=path_under, + adatas=[adata_over], + measurement_name="RNA", + obs_field_name="obs_id", + var_field_name="var_id", + ) + + rd_over_under = tiledbsoma.io.register_anndatas( + experiment_uri=path_over, + adatas=[adata_under], + measurement_name="RNA", + obs_field_name="obs_id", + var_field_name="var_id", + ) + + tiledbsoma.io.resize_experiment( + path_under, + nobs=rd_under_over.get_obs_shape(), + nvars=rd_under_over.get_var_shapes(), + ) + tiledbsoma.io.resize_experiment( + path_over, + nobs=rd_over_under.get_obs_shape(), + nvars=rd_over_under.get_var_shapes(), + ) + + tiledbsoma.io.from_anndata( + path_under, adata_over, "RNA", registration_mapping=rd_under_over + ) + tiledbsoma.io.from_anndata( + path_over, adata_under, "RNA", registration_mapping=rd_over_under + ) + + # Check that the appends happened successfully + with tiledbsoma.Experiment.open(path_under) as exp_under: + assert pa.types.is_dictionary(exp_under.obs.schema.field("myenum").type) + obs_table = exp_under.obs.read().concat() + assert ( + obs_table.column("myenum").to_pylist() + == enum_values_under + enum_values_over + ) + + with tiledbsoma.Experiment.open(path_over) as exp_over: + assert not 
pa.types.is_dictionary(exp_over.obs.schema.field("myenum").type) + obs_table = exp_over.obs.read().concat() + assert ( + obs_table.column("myenum").to_pylist() + == enum_values_over + enum_values_under + ) From e1c2488c3ab1fff7f57f38887537433e5ff25153 Mon Sep 17 00:00:00 2001 From: John Kerl Date: Fri, 6 Dec 2024 15:02:52 -0500 Subject: [PATCH 3/6] fix a failing unit-test case --- apis/python/src/tiledbsoma/io/ingest.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/apis/python/src/tiledbsoma/io/ingest.py b/apis/python/src/tiledbsoma/io/ingest.py index 2150c82060..28137846cb 100644 --- a/apis/python/src/tiledbsoma/io/ingest.py +++ b/apis/python/src/tiledbsoma/io/ingest.py @@ -1128,11 +1128,10 @@ def _extract_new_values_for_append_aux( old_schema = previous_soma_dataframe.schema new_schema = arrow_table.schema - old_names = sorted(old_schema.names) - new_names = sorted(new_schema.names) - if old_names != new_names: - raise SOMAError(f"old column names {old_names} != {new_names}") - names = new_names + # Note: we may be doing an add-column that doesn't exist in tiledbsoma + # storage but is present in the new AnnData. We don't need to change + # anything in that case. Regardless, we can't assume that the old + # and new schema have the same column names. # Helper functions for def is_str_type(typ: pa.DataType) -> bool: @@ -1151,7 +1150,9 @@ def is_str_cat_col(field: pa.Field) -> bool: # duplicate the new data -- and we must, since pyarrow.Table is immutable -- # so let's only do that if we need to. 
any_to_change = False - for name in names: + for name in new_schema.names: + if name not in old_schema.names: + continue if is_str_col(old_schema.field(name)) and is_str_cat_col( new_schema.field(name) ): @@ -1165,7 +1166,9 @@ def is_str_cat_col(field: pa.Field) -> bool: if any_to_change: fields_dict = {} - for name in names: + for name in new_schema.names: + if name not in old_schema.names: + continue column = arrow_table.column(name) old_info = old_schema.field(name) new_info = new_schema.field(name) From 6e755aaa4aac15f7654d5290d3d6f87a490319b1 Mon Sep 17 00:00:00 2001 From: John Kerl Date: Mon, 9 Dec 2024 13:02:10 -0500 Subject: [PATCH 4/6] code-review feedback [skip ci] Co-authored-by: nguyenv --- apis/python/src/tiledbsoma/io/ingest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apis/python/src/tiledbsoma/io/ingest.py b/apis/python/src/tiledbsoma/io/ingest.py index 28137846cb..92df9241ba 100644 --- a/apis/python/src/tiledbsoma/io/ingest.py +++ b/apis/python/src/tiledbsoma/io/ingest.py @@ -1123,7 +1123,7 @@ def _extract_new_values_for_append_aux( mask = [e.as_py() not in previous_join_ids for e in arrow_table[SOMA_JOINID]] arrow_table = arrow_table.filter(mask) - # This is a redundant, failsafe check. The appebd-mode registrar already + # This is a redundant, failsafe check. The append-mode registrar already # ensure schema homogeneity before we get here. 
old_schema = previous_soma_dataframe.schema new_schema = arrow_table.schema From d4baf75e401889d7eaa2d49df580da7de3162564 Mon Sep 17 00:00:00 2001 From: John Kerl Date: Mon, 9 Dec 2024 13:03:17 -0500 Subject: [PATCH 5/6] neaten --- apis/python/src/tiledbsoma/io/ingest.py | 1 - 1 file changed, 1 deletion(-) diff --git a/apis/python/src/tiledbsoma/io/ingest.py b/apis/python/src/tiledbsoma/io/ingest.py index 92df9241ba..cbf239f157 100644 --- a/apis/python/src/tiledbsoma/io/ingest.py +++ b/apis/python/src/tiledbsoma/io/ingest.py @@ -1331,7 +1331,6 @@ def _write_dataframe_impl( # Nominally, nil id_column_name only happens for uns append and we do not append uns, # which is a concern for our caller. This is a second-level check. raise ValueError("internal coding error: id_column_name unspecified") - # XXX arrow_table = _extract_new_values_for_append(df_uri, arrow_table, context) try: From 8435095e68e36329f0e374b66c73c711f8071fad Mon Sep 17 00:00:00 2001 From: John Kerl Date: Mon, 9 Dec 2024 14:19:26 -0500 Subject: [PATCH 6/6] more comments --- apis/python/src/tiledbsoma/io/conversions.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/apis/python/src/tiledbsoma/io/conversions.py b/apis/python/src/tiledbsoma/io/conversions.py index 2d8771d66d..0f7380f3a4 100644 --- a/apis/python/src/tiledbsoma/io/conversions.py +++ b/apis/python/src/tiledbsoma/io/conversions.py @@ -36,7 +36,11 @@ def obs_or_var_to_tiledb_supported_array_type(obs_or_var: pd.DataFrame) -> pd.DataFrame: - """Performs a typecast into types that TileDB can persist.""" + """ + Performs a typecast into types that TileDB can persist. This includes, as a + performance improvement, converting high-cardinality categorical-of-string + columns (cardinality > 4096) to plain string. 
+ """ if len(obs_or_var.columns) == 0: return obs_or_var.copy() @@ -56,8 +60,8 @@ def _to_tiledb_supported_dtype(dtype: _DT) -> _DT: def to_tiledb_supported_array_type(name: str, x: _MT) -> _MT: - """Converts datatypes unrepresentable by TileDB into datatypes it can represent. - E.g., float16 -> float32 + """Converts datatypes unrepresentable by TileDB into datatypes it can represent, + e.g., float16 -> float32. """ if isinstance(x, (np.ndarray, sp.spmatrix)) or not isinstance( x.dtype, pd.CategoricalDtype