Merge pull request #667 from andrewfulton9/partial_read
Carreau authored Jan 25, 2021
2 parents a8ee708 + 45e8c25 commit 33ec64a
Showing 10 changed files with 546 additions and 43 deletions.
12 changes: 12 additions & 0 deletions docs/release.rst
@@ -30,6 +30,18 @@ This release of Zarr Python is the first release of Zarr to not support Py
See `this link <https://github.com/zarr-developers/zarr-python/milestone/11?closed=1>`_ for the full list of closed and
merged PRs tagged with the 2.6 milestone.

* Add ability to partially read and decompress arrays, see :issue:`667`. It is
  only available for chunks stored using fsspec and compressed with Blosc.

  For certain analysis cases, when only a small portion of a chunk is needed,
  it can be advantageous to access and decompress only part of the chunk. Doing
  partial reads and decompression adds high latency to many of the operations,
  so it should be used only when the subset of the data is small compared to
  the full chunk and is stored contiguously (that is to say, the last
  dimensions for C layout, the first for F). Pass ``partial_decompress=True``
  as an argument when creating an ``Array``, or when using ``open_array`` (see
  the sketch below). No option exists yet to apply partial read and
  decompression on a per-operation basis.
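
  For example (a minimal sketch; the store URL and the selection below are
  hypothetical)::

      import zarr

      # assumes an fsspec-backed store and Blosc-compressed chunks
      z = zarr.open_array("s3://bucket/data.zarr", mode="r",
                          partial_decompress=True)
      # a small contiguous selection may now be read and decompressed
      # without fetching the full chunks it touches
      subset = z[0, 0, :100]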

2.5.0
-----

2 changes: 1 addition & 1 deletion requirements_dev_minimal.txt
@@ -1,7 +1,7 @@
# library requirements
asciitree==0.3.3
fasteners==0.15
numcodecs==0.6.4
numcodecs==0.7.2
msgpack-python==0.5.6
setuptools-scm==3.3.3
# test requirements
152 changes: 128 additions & 24 deletions zarr/core.py
@@ -11,22 +11,41 @@

from zarr.attrs import Attributes
from zarr.codecs import AsType, get_codec
from zarr.errors import ArrayNotFoundError, ReadOnlyError
from zarr.indexing import (BasicIndexer, CoordinateIndexer, MaskIndexer,
OIndex, OrthogonalIndexer, VIndex, check_fields,
check_no_multi_fields, ensure_tuple,
err_too_many_indices, is_contiguous_selection,
is_scalar, pop_fields)
from zarr.errors import ArrayNotFoundError, ReadOnlyError, ArrayIndexError
from zarr.indexing import (
BasicIndexer,
CoordinateIndexer,
MaskIndexer,
OIndex,
OrthogonalIndexer,
VIndex,
PartialChunkIterator,
check_fields,
check_no_multi_fields,
ensure_tuple,
err_too_many_indices,
is_contiguous_selection,
is_scalar,
pop_fields,
)
from zarr.meta import decode_array_metadata, encode_array_metadata
from zarr.storage import array_meta_key, attrs_key, getsize, listdir
from zarr.util import (InfoReporter, check_array_shape, human_readable_size,
is_total_slice, nolock, normalize_chunks,
normalize_resize_args, normalize_shape,
normalize_storage_path)
from zarr.util import (
InfoReporter,
check_array_shape,
human_readable_size,
is_total_slice,
nolock,
normalize_chunks,
normalize_resize_args,
normalize_shape,
normalize_storage_path,
PartialReadBuffer,
)


# noinspection PyUnresolvedReferences
class Array(object):
class Array:
"""Instantiate an array from an initialized store.
Parameters
@@ -51,6 +70,12 @@ class Array(object):
If True (default), user attributes will be cached for attribute read
operations. If False, user attributes are reloaded from the store prior
to all attribute read operations.
partial_decompress : bool, optional
If True, and the chunk_store is an FSStore and the compression used
is Blosc, chunks will be partially read and decompressed, when
possible, while getting data from the array.
.. versionadded:: 2.7
Attributes
----------
@@ -102,8 +127,17 @@ class Array(object):
"""

def __init__(self, store, path=None, read_only=False, chunk_store=None,
synchronizer=None, cache_metadata=True, cache_attrs=True):
def __init__(
self,
store,
path=None,
read_only=False,
chunk_store=None,
synchronizer=None,
cache_metadata=True,
cache_attrs=True,
partial_decompress=False,
):
# N.B., expect at this point store is fully initialized with all
# configuration metadata fully specified and normalized

@@ -118,6 +152,7 @@ def __init__(self, store, path=None, read_only=False, chunk_store=None,
self._synchronizer = synchronizer
self._cache_metadata = cache_metadata
self._is_view = False
self._partial_decompress = partial_decompress

# initialize metadata
self._load_metadata()
@@ -1580,8 +1615,17 @@ def _set_selection(self, indexer, value, fields=None):
self._chunk_setitems(lchunk_coords, lchunk_selection, chunk_values,
fields=fields)

def _process_chunk(self, out, cdata, chunk_selection, drop_axes,
out_is_ndarray, fields, out_selection):
def _process_chunk(
self,
out,
cdata,
chunk_selection,
drop_axes,
out_is_ndarray,
fields,
out_selection,
partial_read_decode=False,
):
"""Take binary data from storage and fill output array"""
if (out_is_ndarray and
not fields and
@@ -1604,8 +1648,9 @@ def _process_chunk(self, out, cdata, chunk_selection, drop_axes,
# optimization: we want the whole chunk, and the destination is
# contiguous, so we can decompress directly from the chunk
# into the destination array

if self._compressor:
if isinstance(cdata, PartialReadBuffer):
cdata = cdata.read_full()
self._compressor.decode(cdata, dest)
else:
chunk = ensure_ndarray(cdata).view(self._dtype)
@@ -1614,6 +1659,33 @@ def _process_chunk(self, out, cdata, chunk_selection, drop_axes,
return

# decode chunk
try:
if partial_read_decode:
cdata.prepare_chunk()
# temporary buffer the size of a full chunk
tmp = np.empty(self._chunks, dtype=self.dtype)
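# iterate over the contiguous (start, nitems) runs that the
# selection maps to within this chunk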
index_selection = PartialChunkIterator(chunk_selection, self.chunks)
for start, nitems, partial_out_selection in index_selection:
expected_shape = [
len(
range(*partial_out_selection[i].indices(self.chunks[0] + 1))
)
if i < len(partial_out_selection)
else dim
for i, dim in enumerate(self.chunks)
]
cdata.read_part(start, nitems)
chunk_partial = self._decode_chunk(
cdata.buff,
start=start,
nitems=nitems,
expected_shape=expected_shape,
)
tmp[partial_out_selection] = chunk_partial
out[out_selection] = tmp[chunk_selection]
return
except ArrayIndexError:
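# the selection cannot be decoded partially;
# fall back to reading and decoding the full chunk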
cdata = cdata.read_full()
chunk = self._decode_chunk(cdata)

# select data from chunk
@@ -1688,11 +1760,36 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
out_is_ndarray = False

ckeys = [self._chunk_key(ch) for ch in lchunk_coords]
cdatas = self.chunk_store.getitems(ckeys, on_error="omit")
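# attempt partial read/decode only for Blosc-compressed, non-object
# arrays, without fields, on stores that support getitems (e.g. FSStore)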
if (
self._partial_decompress
and self._compressor
and self._compressor.codec_id == "blosc"
and hasattr(self._compressor, "decode_partial")
and not fields
and self.dtype != object
and hasattr(self.chunk_store, "getitems")
):
partial_read_decode = True
cdatas = {
ckey: PartialReadBuffer(ckey, self.chunk_store)
for ckey in ckeys
if ckey in self.chunk_store
}
else:
partial_read_decode = False
cdatas = self.chunk_store.getitems(ckeys, on_error="omit")
for ckey, chunk_select, out_select in zip(ckeys, lchunk_selection, lout_selection):
if ckey in cdatas:
self._process_chunk(out, cdatas[ckey], chunk_select, drop_axes,
out_is_ndarray, fields, out_select)
self._process_chunk(
out,
cdatas[ckey],
chunk_select,
drop_axes,
out_is_ndarray,
fields,
out_select,
partial_read_decode=partial_read_decode,
)
else:
# check exception type
if self._fill_value is not None:
@@ -1706,7 +1803,8 @@ def _chunk_setitems(self, lchunk_coords, lchunk_selection, values, fields=None):
ckeys = [self._chunk_key(co) for co in lchunk_coords]
cdatas = [self._process_for_setitem(key, sel, val, fields=fields)
for key, sel, val in zip(ckeys, lchunk_selection, values)]
self.chunk_store.setitems({k: v for k, v in zip(ckeys, cdatas)})
values = {k: v for k, v in zip(ckeys, cdatas)}
self.chunk_store.setitems(values)

def _chunk_setitem(self, chunk_coords, chunk_selection, value, fields=None):
"""Replace part or whole of a chunk.
@@ -1800,11 +1898,17 @@ def _process_for_setitem(self, ckey, chunk_selection, value, fields=None):
def _chunk_key(self, chunk_coords):
return self._key_prefix + '.'.join(map(str, chunk_coords))

def _decode_chunk(self, cdata):

def _decode_chunk(self, cdata, start=None, nitems=None, expected_shape=None):
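# start and nitems, when given, select a contiguous run of items to
# decode; expected_shape then overrides the full chunk shape for the
# final reshape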
# decompress
if self._compressor:
chunk = self._compressor.decode(cdata)
# only decode requested items
if (
all([x is not None for x in [start, nitems]])
and self._compressor.codec_id == "blosc"
) and hasattr(self._compressor, "decode_partial"):
chunk = self._compressor.decode_partial(cdata, start, nitems)
else:
chunk = self._compressor.decode(cdata)
else:
chunk = cdata

Expand All @@ -1829,7 +1933,7 @@ def _decode_chunk(self, cdata):

# ensure correct chunk shape
chunk = chunk.reshape(-1, order='A')
chunk = chunk.reshape(self._chunks, order=self._order)
chunk = chunk.reshape(expected_shape or self._chunks, order=self._order)

return chunk

31 changes: 26 additions & 5 deletions zarr/creation.py
@@ -362,11 +362,26 @@ def array(data, **kwargs):
return z


def open_array(store=None, mode='a', shape=None, chunks=True, dtype=None,
compressor='default', fill_value=0, order='C', synchronizer=None,
filters=None, cache_metadata=True, cache_attrs=True, path=None,
object_codec=None, chunk_store=None, storage_options=None,
**kwargs):
def open_array(
store=None,
mode="a",
shape=None,
chunks=True,
dtype=None,
compressor="default",
fill_value=0,
order="C",
synchronizer=None,
filters=None,
cache_metadata=True,
cache_attrs=True,
path=None,
object_codec=None,
chunk_store=None,
storage_options=None,
partial_decompress=False,
**kwargs
):
"""Open an array using file-mode-like semantics.
Parameters
@@ -415,6 +430,12 @@ def open_array(store=None, mode='a', shape=None, chunks=True, dtype=None,
storage_options : dict
If using an fsspec URL to create the store, these will be passed to
the backend implementation. Ignored otherwise.
partial_decompress : bool, optional
If True, and the chunk_store is an FSStore and the compression used
is Blosc, chunks will be partially read and decompressed, when
possible, while getting data from the array.
.. versionadded:: 2.7
Returns
-------
4 changes: 4 additions & 0 deletions zarr/errors.py
@@ -15,6 +15,10 @@ def __init__(self, *args):
super().__init__(self._msg.format(*args))


class ArrayIndexError(IndexError):
pass


class _BaseZarrIndexError(IndexError):
_msg = ""
