[Backport release-1.7] [python] Chunked writes for DataFrame; byte-caps for Arrow-table writes #2064

Merged 1 commit on Jan 25, 2024
99 changes: 87 additions & 12 deletions apis/python/src/tiledbsoma/io/ingest.py
@@ -1165,6 +1165,30 @@ def _extract_new_values_for_append(
return arrow_table


def _write_arrow_table(
arrow_table: pa.Table,
handle: Union[DataFrame, SparseNDArray],
tiledb_create_options: TileDBCreateOptions,
) -> None:
"""Handles num-bytes capacity for remote object stores."""
cap = tiledb_create_options.remote_cap_nbytes
if arrow_table.nbytes > cap:
n = len(arrow_table)
if n < 2:
raise SOMAError(
"single table row nbytes {arrow_table.nbytes} exceeds cap nbytes {cap}"
)
m = n // 2
_write_arrow_table(arrow_table[:m], handle, tiledb_create_options)
_write_arrow_table(arrow_table[m:], handle, tiledb_create_options)
else:
logging.log_io(
None,
f"Write Arrow table num_rows={len(arrow_table)} num_bytes={arrow_table.nbytes} cap={cap}",
)
handle.write(arrow_table)
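The new helper above splits a table in half by row count, recursing until every piece fits under remote_cap_nbytes, and raising once a single row alone exceeds the cap. Below is a minimal, self-contained sketch of that halving strategy, not part of this diff (the helper name split_by_nbytes is made up here), which collects the pieces instead of writing them:

import pyarrow as pa

def split_by_nbytes(table, cap):
    """Recursively halve `table` by rows until each piece is at most `cap` bytes."""
    if table.nbytes <= cap:
        return [table]
    if len(table) < 2:
        raise ValueError(f"single row nbytes {table.nbytes} exceeds cap {cap}")
    m = len(table) // 2
    # table.slice(offset, length) is a zero-copy row slice, like table[:m] / table[m:] above
    return split_by_nbytes(table.slice(0, m), cap) + split_by_nbytes(table.slice(m), cap)

t = pa.table({"soma_joinid": list(range(1_000)), "x": [float(i) for i in range(1_000)]})
pieces = split_by_nbytes(t, cap=4_000)
assert sum(len(p) for p in pieces) == len(t)  # the pieces cover all original rows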


def _write_dataframe(
df_uri: str,
df: pd.DataFrame,
@@ -1252,7 +1276,10 @@ def _write_dataframe_impl(
)
return soma_df

soma_df.write(arrow_table)
tiledb_create_options = TileDBCreateOptions.from_platform_config(platform_config)

_write_arrow_table(arrow_table, soma_df, tiledb_create_options)

logging.log_io(
f"Wrote {soma_df.uri}",
_util.format_elapsed(s, f"FINISH WRITING {soma_df.uri}"),
@@ -1719,10 +1746,40 @@ def _write_matrix_to_denseNDArray(

# OR, write in chunks
eta_tracker = eta.Tracker()
nrow, ncol = matrix.shape
if matrix.ndim == 2:
nrow, ncol = matrix.shape
elif matrix.ndim == 1:
nrow = matrix.shape[0]
ncol = 1
else:
raise ValueError(
f"only 1D or 2D dense arrays are supported here; got {matrix.ndim}"
)

# Number of rows to chunk by. These are dense writes, so this is loop-invariant.
# * The goal_chunk_nnz is an older parameter. It's still important, as for backed AnnData,
# it controls how much is read into client RAM from the backing store on each chunk.
# * The remote_cap_nbytes is a newer parameter. It caps the number of bytes in any single write, which matters for remote object stores.
# * Compute chunk sizes for both and take the minimum.
chunk_size_using_nnz = int(math.ceil(tiledb_create_options.goal_chunk_nnz / ncol))

try:
# not scipy csr/csc
itemsize = matrix.itemsize
except AttributeError:
# scipy csr/csc
itemsize = matrix.data.itemsize

total_nbytes = matrix.size * itemsize
nbytes_num_chunks = math.ceil(
total_nbytes / tiledb_create_options.remote_cap_nbytes
)
nbytes_num_chunks = max(1, nbytes_num_chunks)
chunk_size_using_nbytes = math.floor(nrow / nbytes_num_chunks)

chunk_size = min(chunk_size_using_nnz, chunk_size_using_nbytes)
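As a worked example of this row-chunking arithmetic, with hypothetical numbers (a dense 100,000 x 2,000 float64 matrix and the default option values), not taken from this diff:

import math

nrow, ncol, itemsize = 100_000, 2_000, 8  # hypothetical dense float64 matrix
goal_chunk_nnz = 100_000_000              # TileDBCreateOptions default
remote_cap_nbytes = 2_400_000_000         # TileDBCreateOptions default

chunk_size_using_nnz = int(math.ceil(goal_chunk_nnz / ncol))             # 50_000 rows
total_nbytes = nrow * ncol * itemsize                                    # 1.6e9 bytes
nbytes_num_chunks = max(1, math.ceil(total_nbytes / remote_cap_nbytes))  # 1 chunk
chunk_size_using_nbytes = math.floor(nrow / nbytes_num_chunks)           # 100_000 rows
chunk_size = min(chunk_size_using_nnz, chunk_size_using_nbytes)          # 50_000 rows per write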

i = 0
# Number of rows to chunk by. Dense writes, so this is a constant.
chunk_size = int(math.ceil(tiledb_create_options.goal_chunk_nnz / ncol))
while i < nrow:
t1 = time.time()
i2 = i + chunk_size
@@ -1735,7 +1792,10 @@
% (i, i2 - 1, nrow, chunk_percent),
)

chunk = matrix[i:i2, :]
if matrix.ndim == 2:
chunk = matrix[i:i2, :]
else:
chunk = matrix[i:i2]

if ingestion_params.skip_existing_nonempty_domain and storage_ned is not None:
chunk_bounds = matrix_bounds
@@ -1757,7 +1817,10 @@
tensor = pa.Tensor.from_numpy(chunk)
else:
tensor = pa.Tensor.from_numpy(chunk.toarray())
soma_ndarray.write((slice(i, i2), slice(None)), tensor)
if matrix.ndim == 2:
soma_ndarray.write((slice(i, i2), slice(None)), tensor)
else:
soma_ndarray.write((slice(i, i2),), tensor)

t2 = time.time()
chunk_seconds = t2 - t1
@@ -1971,6 +2034,11 @@ def _find_sparse_chunk_size_backed(
extent = int(matrix.shape[axis])

# Initial guess of chunk size.
#
# The goal_chunk_nnz is important, as for backed AnnData, it controls how much is read into
# client RAM from the backing store on each chunk. We also subdivide chunks by
# remote_cap_nbytes, if necessary, within _write_arrow_table in order to accommodate remote
# object stores, which is a different ceiling.
chunk_size = max(1, int(math.floor(goal_chunk_nnz / mean_nnz)))
if chunk_size > extent:
chunk_size = extent
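A hypothetical worked example of this initial guess (numbers not from the diff):

import math

goal_chunk_nnz = 100_000_000  # TileDBCreateOptions default
mean_nnz = 2_500              # hypothetical mean non-zeros per row
extent = 30_000               # hypothetical number of rows on the chunked axis

chunk_size = max(1, int(math.floor(goal_chunk_nnz / mean_nnz)))  # 40_000
chunk_size = min(chunk_size, extent)                             # clamped to the 30_000-row extent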
@@ -2231,9 +2299,10 @@ def _coo_to_table(
),
)

soma_ndarray.write(
_coo_to_table(chunk_coo, axis_0_mapping, axis_1_mapping, stride_axis, i)
arrow_table = _coo_to_table(
chunk_coo, axis_0_mapping, axis_1_mapping, stride_axis, i
)
_write_arrow_table(arrow_table, soma_ndarray, tiledb_create_options)

t2 = time.time()
chunk_seconds = t2 - t1
@@ -2634,11 +2703,17 @@ def _ingest_uns_ndarray(

with soma_arr:
_maybe_set(coll, key, soma_arr, use_relative_uri=use_relative_uri)
soma_arr.write(
(),
pa.Tensor.from_numpy(value),
platform_config=platform_config,

_write_matrix_to_denseNDArray(
soma_arr,
value,
tiledb_create_options=TileDBCreateOptions.from_platform_config(
platform_config
),
context=context,
ingestion_params=ingestion_params,
)

msg = f"Wrote {soma_arr.uri} (uns ndarray)"
logging.log_io(msg, msg)

6 changes: 6 additions & 0 deletions apis/python/src/tiledbsoma/options/_tiledb_create_options.py
@@ -115,6 +115,12 @@ class TileDBCreateOptions:
goal_chunk_nnz: int = attrs_.field(
validator=vld.instance_of(int), default=100_000_000
)
# We would prefer _remote_cap_nbytes as this is a server-side parameter
# people should not be changing. However, leading underscores are not
# accepted by the attrs framework.
remote_cap_nbytes: int = attrs_.field(
validator=vld.instance_of(int), default=2_400_000_000
)
capacity: int = attrs_.field(validator=vld.instance_of(int), default=100_000)
offsets_filters: Tuple[_DictFilterSpec, ...] = attrs_.field(
converter=_normalize_filters,
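The new option can be set directly on TileDBCreateOptions, as the test below does, or through a platform_config mapping. A brief sketch, assuming the {"tiledb": {"create": ...}} nesting that TileDBCreateOptions.from_platform_config reads:

import tiledbsoma as soma

# Direct construction, as in the new test.
opts = soma.TileDBCreateOptions(remote_cap_nbytes=1_000_000_000)

# Round-trip through a platform_config mapping (nesting assumed, not shown in this diff).
platform_config = {"tiledb": {"create": {"remote_cap_nbytes": 1_000_000_000}}}
opts = soma.TileDBCreateOptions.from_platform_config(platform_config)
assert opts.remote_cap_nbytes == 1_000_000_000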
44 changes: 44 additions & 0 deletions apis/python/tests/test_io.py
@@ -29,6 +29,7 @@ def src_matrix(request):
TileDBCreateOptions(write_X_chunked=False, goal_chunk_nnz=100000),
TileDBCreateOptions(write_X_chunked=True, goal_chunk_nnz=10000),
TileDBCreateOptions(write_X_chunked=True, goal_chunk_nnz=100000),
TileDBCreateOptions(write_X_chunked=True, remote_cap_nbytes=100000),
],
)
@pytest.mark.parametrize(
@@ -127,3 +128,46 @@ def test_io_create_from_matrix_Sparse_nd_array(

# fast equality check using __ne__
assert (sp.csr_matrix(src_matrix) != read_back).nnz == 0


@pytest.mark.parametrize(
"num_rows",
[0, 1, 2, 3, 4, 10, 100, 1_000, 10_000],
)
@pytest.mark.parametrize(
"cap_nbytes",
[1, 100, 1_000, 10_000],
)
def test_write_arrow_table(tmp_path, num_rows, cap_nbytes):
"""
Additional focus-testing for tiledbsoma.io._write_arrow_table
"""

schema = pa.schema(
[
("foo", pa.int32()),
("bar", pa.float64()),
]
)

pydict = {}
pydict["soma_joinid"] = list(range(num_rows))
pydict["foo"] = [(e + 1) * 10 for e in range(num_rows)]
pydict["bar"] = [(e + 1) / 25 for e in range(num_rows)]

opt = soma.TileDBCreateOptions(remote_cap_nbytes=cap_nbytes)
uri = tmp_path.as_posix()
expect_error = cap_nbytes == 1 and num_rows > 0 # Not enough room for even one row

with soma.DataFrame.create(uri, schema=schema) as sdf:
table = pa.Table.from_pydict(pydict)
if expect_error:
with pytest.raises(soma.SOMAError):
somaio.ingest._write_arrow_table(table, sdf, opt)
else:
somaio.ingest._write_arrow_table(table, sdf, opt)

if not expect_error:
with soma.DataFrame.open(uri) as sdf:
pdf = sdf.read().concat().to_pandas()
assert list(pdf["foo"]) == pydict["foo"]