Skip to content

Commit

Permalink
Support return_arrow for various queries (#256)
Browse files Browse the repository at this point in the history
* Support return_arrow for various queries

* Unit-test cases
  • Loading branch information
johnkerl authored Aug 26, 2022
1 parent 23d61a9 commit 879e341
Show file tree
Hide file tree
Showing 7 changed files with 275 additions and 93 deletions.
107 changes: 88 additions & 19 deletions apis/python/src/tiledbsc/annotation_dataframe.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, Sequence, Set, Tuple
from typing import Optional, Sequence, Set, Tuple, Union

import numpy as np
import pandas as pd
import pyarrow as pa
import tiledb

import tiledbsc.util as util
Expand Down Expand Up @@ -109,15 +110,19 @@ def keyset(self) -> Set[str]:

# ----------------------------------------------------------------
def dim_select(
self, ids: Optional[Ids], attrs: Optional[Sequence[str]] = None
) -> pd.DataFrame:
self,
ids: Optional[Ids],
attrs: Optional[Sequence[str]] = None,
*,
return_arrow: bool = False,
) -> Union[pd.DataFrame, pa.Table]:
"""
Selects a slice out of the dataframe with specified `obs_ids` (for `obs`) or `var_ids` (for
`var`). If `ids` is `None`, the entire dataframe is returned. Similarly, if `attrs` are
provided, they're used for the query; else, all attributes are returned.
"""
with self._open("r") as A:
query = A.query(attrs=attrs)
query = A.query(return_arrow=return_arrow, attrs=attrs)
if ids is None:
df = query.df[:]
else:
Expand All @@ -132,79 +137,97 @@ def dim_select(
# so the set_index is already done for us.
#
# However if the data was written somehow else (e.g. by tiledbscr-r) then we do.
if isinstance(df.index, pd.RangeIndex) and self.dim_name in df.columns:
df.set_index(self.dim_name, inplace=True)
if not return_arrow:
if isinstance(df.index, pd.RangeIndex) and self.dim_name in df.columns:
df.set_index(self.dim_name, inplace=True)

# TODO: when UTF-8 attributes are queryable using TileDB-Py's QueryCondition API we can remove this.
# This is the 'decode on read' part of our logic; in from_dataframe we have the 'encode on write' part.
# Context: https://github.com/single-cell-data/TileDB-SingleCell/issues/99.
return self._ascii_to_unicode_dataframe_readback(df)
if return_arrow:
return self._ascii_to_unicode_arrow_readback(df)
else:
return self._ascii_to_unicode_pandas_readback(df)

# ----------------------------------------------------------------
def df(
self, ids: Optional[Ids] = None, attrs: Optional[Sequence[str]] = None
) -> pd.DataFrame:
self,
ids: Optional[Ids] = None,
attrs: Optional[Sequence[str]] = None,
*,
return_arrow: bool = False,
) -> Union[pd.DataFrame, pa.Table]:
"""
Keystroke-saving alias for `.dim_select()`. If `ids` are provided, they're used
to subselect; if not, the entire dataframe is returned. If `attrs` are provided,
they're used for the query; else, all attributes are returned.
"""
return self.dim_select(ids, attrs)
return self.dim_select(ids, attrs, return_arrow=return_arrow)

# ----------------------------------------------------------------
def query(
self,
query_string: Optional[str],
ids: Optional[Ids] = None,
attrs: Optional[Sequence[str]] = None,
) -> pd.DataFrame:
*,
return_arrow: bool = False,
) -> Union[pd.DataFrame, pa.Table]:
"""
Selects from obs/var using a TileDB-Py `QueryCondition` string such as `cell_type ==
"blood"`. If `attrs` is `None`, returns all column names in the dataframe; use `[]` for
`attrs` to select none of them. Any column names specified in the `query_string` must be
included in `attrs` if `attrs` is not `None`. Returns `None` if the slice is empty.
"""
if query_string is None:
return self.dim_select(ids)
return self.dim_select(ids, return_arrow=return_arrow)

with self._open() as A:
qc = tiledb.QueryCondition(query_string)
if attrs is None:
slice_query = A.query(attr_cond=qc)
slice_query = A.query(attr_cond=qc, return_arrow=return_arrow)
if ids is None:
slice_df = slice_query.df[:]
else:
slice_df = slice_query.df[ids]
else:
slice_query = A.query(attr_cond=qc, attrs=attrs)
slice_query = A.query(
attr_cond=qc, attrs=attrs, return_arrow=return_arrow
)
if ids is None:
slice_df = slice_query.df[:]
else:
slice_df = slice_query.df[ids]
# This is the 'decode on read' part of our logic; in dim_select we have the 'encode on write' part.
# Context: https://github.com/single-cell-data/TileDB-SingleCell/issues/99.
return self._ascii_to_unicode_dataframe_readback(slice_df)
if return_arrow:
return self._ascii_to_unicode_arrow_readback(slice_df)
else:
return self._ascii_to_unicode_pandas_readback(slice_df)

# ----------------------------------------------------------------
def _ascii_to_unicode_series_readback(
def _ascii_to_unicode_pandas_series_readback(
self, field_name: str, series: pd.Series
) -> Tuple[str, bool, Optional[pd.Series]]:
"""
Helper method for `_ascii_to_unicode_pandas_readback`
"""
if len(series) > 0 and type(series[0]) == bytes:
return (field_name, True, series.map(lambda e: e.decode()))
else:
return (field_name, False, None)

def _ascii_to_unicode_dataframe_readback(self, df: pd.DataFrame) -> pd.DataFrame:
def _ascii_to_unicode_pandas_readback(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Implements the 'decode on read' partof our logic as noted in `dim_select()`.
Implements the 'decode on read' part of our logic as noted in `dim_select()`.
"""
futures = []
# Empirically we find this has a bit of a speed-up. Presumably that's because of some NumPy
# C++ code releasing the GIL.
with ThreadPoolExecutor() as executor:
for k in df:
future = executor.submit(
self._ascii_to_unicode_series_readback, k, df[k]
self._ascii_to_unicode_pandas_series_readback, k, df[k]
)
futures.append(future)

Expand All @@ -215,6 +238,52 @@ def _ascii_to_unicode_dataframe_readback(self, df: pd.DataFrame) -> pd.DataFrame

return df

# ----------------------------------------------------------------
def _ascii_to_unicode_arrow_series_readback(
self, array_number: int, series: Union[pa.Array, pa.ChunkedArray]
) -> Tuple[int, bool, Optional[Union[pa.Array, pa.ChunkedArray]]]:
"""
Helper method for `_ascii_to_unicode_arrow_readback`
"""
# pyarrow's way of handling 'bytes'
if len(series) > 0 and (
type(series[0]) == pa.LargeBinaryArray
or type(series[0]) == pa.LargeStringScalar
):
return (array_number, True, series.cast(pa.string()))
else:
return (array_number, False, None)

def _ascii_to_unicode_arrow_readback(self, df: pa.Table) -> pa.Table:
"""
Implements the 'decode on read' part of our logic as noted in `dim_select()`.
"""

array_names = df.column_names
futures = []
# Empirically we find this doesn't have much of a speed-up. Presumably that's because of
# PyArrow Python code holding the GIL. Nonetheless, experiments show it isn't slower so
# we'll keep the ThreadPoolExecutor logic, which will only get faster pending (hypothetical)
# future PyArrow C++ work.

with ThreadPoolExecutor() as executor:
for array_number in range(df.num_columns):
future = executor.submit(
self._ascii_to_unicode_arrow_series_readback,
array_number,
df[array_number],
)
futures.append(future)

new_arrays = [None] * df.num_columns
for future in futures:
array_number, modified, new_array = future.result()
if modified:
new_arrays[array_number] = new_array
else:
new_arrays[array_number] = df[array_number]
return pa.Table.from_arrays(new_arrays, names=array_names)

# ----------------------------------------------------------------
def from_dataframe(self, dataframe: pd.DataFrame, extent: int = 2048) -> None:
"""
Expand Down
26 changes: 20 additions & 6 deletions apis/python/src/tiledbsc/annotation_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import numpy as np
import pandas as pd
import pyarrow as pa
import tiledb

import tiledbsc.util as util
Expand Down Expand Up @@ -66,27 +67,40 @@ def shape(self) -> Tuple[int, int]:
return (num_rows, num_cols)

# ----------------------------------------------------------------
def dim_select(self, ids: Optional[Ids]) -> pd.DataFrame:
def dim_select(
self,
ids: Optional[Ids] = None,
*,
return_arrow: bool = False,
) -> Union[pd.DataFrame, pa.Table]:
"""
Selects a slice out of the array with specified `obs_ids` (for `obsm` elements) or
`var_ids` (for `varm` elements). If `ids` is `None`, the entire array is returned.
"""
if ids is None:
with self._open() as A:
df = A.df[:]
query = A.query(return_arrow=return_arrow)
df = query.df[:]
else:
with self._open() as A:
df = A.df[ids]
df.set_index(self.dim_name, inplace=True)
query = A.query(return_arrow=return_arrow)
df = query.df[ids]
if not return_arrow:
df.set_index(self.dim_name, inplace=True)
return df

# ----------------------------------------------------------------
def df(self, ids: Optional[Ids] = None) -> pd.DataFrame:
def df(
self,
ids: Optional[Ids] = None,
*,
return_arrow: bool = False,
) -> Union[pd.DataFrame, pa.Table]:
"""
Keystroke-saving alias for `.dim_select()`. If `ids` are provided, they're used
to subselect; if not, the entire dataframe is returned.
"""
return self.dim_select(ids)
return self.dim_select(ids, return_arrow=return_arrow)

# ----------------------------------------------------------------
def from_matrix_and_dim_values(
Expand Down
31 changes: 21 additions & 10 deletions apis/python/src/tiledbsc/assay_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import numpy as np
import pandas as pd
import pyarrow as pa
import scipy.sparse as sp
import tiledb

Expand Down Expand Up @@ -77,36 +78,46 @@ def shape(self) -> Tuple[int, int]:

# ----------------------------------------------------------------
def dim_select(
self, obs_ids: Optional[Ids], var_ids: Optional[Ids]
) -> pd.DataFrame:
self,
obs_ids: Optional[Ids],
var_ids: Optional[Ids],
*,
return_arrow: bool = False,
) -> Union[pd.DataFrame, pa.Table]:
"""
Selects a slice out of the matrix with specified `obs_ids` and/or `var_ids`.
Either or both of the ID lists may be `None`, meaning, do not subselect along
that dimension. If both ID lists are `None`, the entire matrix is returned.
"""
with tiledb.open(self.uri, ctx=self._ctx) as A:
query = A.query(return_arrow=return_arrow)
if obs_ids is None:
if var_ids is None:
df = A.df[:, :]
df = query.df[:, :]
else:
df = A.df[:, var_ids]
df = query.df[:, var_ids]
else:
if var_ids is None:
df = A.df[obs_ids, :]
df = query.df[obs_ids, :]
else:
df = A.df[obs_ids, var_ids]
df.set_index([self.row_dim_name, self.col_dim_name], inplace=True)
df = query.df[obs_ids, var_ids]
if not return_arrow:
df.set_index([self.row_dim_name, self.col_dim_name], inplace=True)
return df

# ----------------------------------------------------------------
def df(
self, obs_ids: Optional[Ids] = None, var_ids: Optional[Ids] = None
) -> pd.DataFrame:
self,
obs_ids: Optional[Ids] = None,
var_ids: Optional[Ids] = None,
*,
return_arrow: bool = False,
) -> Union[pd.DataFrame, pa.Table]:
"""
Keystroke-saving alias for `.dim_select()`. If either of `obs_ids` or `var_ids`
are provided, they're used to subselect; if not, the entire dataframe is returned.
"""
return self.dim_select(obs_ids, var_ids)
return self.dim_select(obs_ids, var_ids, return_arrow=return_arrow)

# ----------------------------------------------------------------
def csr(
Expand Down
Loading

0 comments on commit 879e341

Please sign in to comment.