diff --git a/docs/release.rst b/docs/release.rst index 1fa6526eac..5cdfc96055 100644 --- a/docs/release.rst +++ b/docs/release.rst @@ -30,6 +30,18 @@ This release od Zarr Python is is the first release of Zarr to not supporting Py See `this link ` for the full list of closed and merged PR tagged with the 2.6 milestone. +* Add ability to partially read and decompress arrays, see :issue:`667`. It is + only available to chunks stored using fsspec and using Blosc as a compressor. + + For certain analysis cases when only a small portion of chunks is needed it can + be advantageous to only access and decompress part of the chunks. Doing + partial read and decompression adds high latency to many of the operations, so + it should be used only when the subset of the data is small compared to the full + chunks and is stored contiguously (that is to say either last dimensions for C + layout, first dimensions for F). Pass ``partial_decompress=True`` as argument when + creating an ``Array``, or when using ``open_array``. No option exists yet to + apply partial read and decompress on a per-operation basis. 
+ 2.5.0 ----- diff --git a/requirements_dev_minimal.txt b/requirements_dev_minimal.txt index 61c3fe691f..e372137811 100644 --- a/requirements_dev_minimal.txt +++ b/requirements_dev_minimal.txt @@ -1,7 +1,7 @@ # library requirements asciitree==0.3.3 fasteners==0.15 -numcodecs==0.6.4 +numcodecs==0.7.2 msgpack-python==0.5.6 setuptools-scm==3.3.3 # test requirements diff --git a/zarr/core.py b/zarr/core.py index 69fa296576..f8e5834070 100644 --- a/zarr/core.py +++ b/zarr/core.py @@ -11,22 +11,41 @@ from zarr.attrs import Attributes from zarr.codecs import AsType, get_codec -from zarr.errors import ArrayNotFoundError, ReadOnlyError -from zarr.indexing import (BasicIndexer, CoordinateIndexer, MaskIndexer, - OIndex, OrthogonalIndexer, VIndex, check_fields, - check_no_multi_fields, ensure_tuple, - err_too_many_indices, is_contiguous_selection, - is_scalar, pop_fields) +from zarr.errors import ArrayNotFoundError, ReadOnlyError, ArrayIndexError +from zarr.indexing import ( + BasicIndexer, + CoordinateIndexer, + MaskIndexer, + OIndex, + OrthogonalIndexer, + VIndex, + PartialChunkIterator, + check_fields, + check_no_multi_fields, + ensure_tuple, + err_too_many_indices, + is_contiguous_selection, + is_scalar, + pop_fields, +) from zarr.meta import decode_array_metadata, encode_array_metadata from zarr.storage import array_meta_key, attrs_key, getsize, listdir -from zarr.util import (InfoReporter, check_array_shape, human_readable_size, - is_total_slice, nolock, normalize_chunks, - normalize_resize_args, normalize_shape, - normalize_storage_path) +from zarr.util import ( + InfoReporter, + check_array_shape, + human_readable_size, + is_total_slice, + nolock, + normalize_chunks, + normalize_resize_args, + normalize_shape, + normalize_storage_path, + PartialReadBuffer, +) # noinspection PyUnresolvedReferences -class Array(object): +class Array: """Instantiate an array from an initialized store. 
Parameters @@ -51,6 +70,12 @@ class Array(object): If True (default), user attributes will be cached for attribute read operations. If False, user attributes are reloaded from the store prior to all attribute read operations. + partial_decompress : bool, optional + If True and while the chunk_store is a FSStore and the compresion used + is Blosc, when getting data from the array chunks will be partially + read and decompressed when possible. + + .. versionadded:: 2.7 Attributes ---------- @@ -102,8 +127,17 @@ class Array(object): """ - def __init__(self, store, path=None, read_only=False, chunk_store=None, - synchronizer=None, cache_metadata=True, cache_attrs=True): + def __init__( + self, + store, + path=None, + read_only=False, + chunk_store=None, + synchronizer=None, + cache_metadata=True, + cache_attrs=True, + partial_decompress=False, + ): # N.B., expect at this point store is fully initialized with all # configuration metadata fully specified and normalized @@ -118,6 +152,7 @@ def __init__(self, store, path=None, read_only=False, chunk_store=None, self._synchronizer = synchronizer self._cache_metadata = cache_metadata self._is_view = False + self._partial_decompress = partial_decompress # initialize metadata self._load_metadata() @@ -1580,8 +1615,17 @@ def _set_selection(self, indexer, value, fields=None): self._chunk_setitems(lchunk_coords, lchunk_selection, chunk_values, fields=fields) - def _process_chunk(self, out, cdata, chunk_selection, drop_axes, - out_is_ndarray, fields, out_selection): + def _process_chunk( + self, + out, + cdata, + chunk_selection, + drop_axes, + out_is_ndarray, + fields, + out_selection, + partial_read_decode=False, + ): """Take binary data from storage and fill output array""" if (out_is_ndarray and not fields and @@ -1604,8 +1648,9 @@ def _process_chunk(self, out, cdata, chunk_selection, drop_axes, # optimization: we want the whole chunk, and the destination is # contiguous, so we can decompress directly from the chunk # into the 
destination array - if self._compressor: + if isinstance(cdata, PartialReadBuffer): + cdata = cdata.read_full() self._compressor.decode(cdata, dest) else: chunk = ensure_ndarray(cdata).view(self._dtype) @@ -1614,6 +1659,33 @@ def _process_chunk(self, out, cdata, chunk_selection, drop_axes, return # decode chunk + try: + if partial_read_decode: + cdata.prepare_chunk() + # size of chunk + tmp = np.empty(self._chunks, dtype=self.dtype) + index_selection = PartialChunkIterator(chunk_selection, self.chunks) + for start, nitems, partial_out_selection in index_selection: + expected_shape = [ + len( + range(*partial_out_selection[i].indices(self.chunks[0] + 1)) + ) + if i < len(partial_out_selection) + else dim + for i, dim in enumerate(self.chunks) + ] + cdata.read_part(start, nitems) + chunk_partial = self._decode_chunk( + cdata.buff, + start=start, + nitems=nitems, + expected_shape=expected_shape, + ) + tmp[partial_out_selection] = chunk_partial + out[out_selection] = tmp[chunk_selection] + return + except ArrayIndexError: + cdata = cdata.read_full() chunk = self._decode_chunk(cdata) # select data from chunk @@ -1688,11 +1760,36 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection, out_is_ndarray = False ckeys = [self._chunk_key(ch) for ch in lchunk_coords] - cdatas = self.chunk_store.getitems(ckeys, on_error="omit") + if ( + self._partial_decompress + and self._compressor + and self._compressor.codec_id == "blosc" + and hasattr(self._compressor, "decode_partial") + and not fields + and self.dtype != object + and hasattr(self.chunk_store, "getitems") + ): + partial_read_decode = True + cdatas = { + ckey: PartialReadBuffer(ckey, self.chunk_store) + for ckey in ckeys + if ckey in self.chunk_store + } + else: + partial_read_decode = False + cdatas = self.chunk_store.getitems(ckeys, on_error="omit") for ckey, chunk_select, out_select in zip(ckeys, lchunk_selection, lout_selection): if ckey in cdatas: - self._process_chunk(out, cdatas[ckey], 
chunk_select, drop_axes, - out_is_ndarray, fields, out_select) + self._process_chunk( + out, + cdatas[ckey], + chunk_select, + drop_axes, + out_is_ndarray, + fields, + out_select, + partial_read_decode=partial_read_decode, + ) else: # check exception type if self._fill_value is not None: @@ -1706,7 +1803,8 @@ def _chunk_setitems(self, lchunk_coords, lchunk_selection, values, fields=None): ckeys = [self._chunk_key(co) for co in lchunk_coords] cdatas = [self._process_for_setitem(key, sel, val, fields=fields) for key, sel, val in zip(ckeys, lchunk_selection, values)] - self.chunk_store.setitems({k: v for k, v in zip(ckeys, cdatas)}) + values = {k: v for k, v in zip(ckeys, cdatas)} + self.chunk_store.setitems(values) def _chunk_setitem(self, chunk_coords, chunk_selection, value, fields=None): """Replace part or whole of a chunk. @@ -1800,11 +1898,17 @@ def _process_for_setitem(self, ckey, chunk_selection, value, fields=None): def _chunk_key(self, chunk_coords): return self._key_prefix + '.'.join(map(str, chunk_coords)) - def _decode_chunk(self, cdata): - + def _decode_chunk(self, cdata, start=None, nitems=None, expected_shape=None): # decompress if self._compressor: - chunk = self._compressor.decode(cdata) + # only decode requested items + if ( + all([x is not None for x in [start, nitems]]) + and self._compressor.codec_id == "blosc" + ) and hasattr(self._compressor, "decode_partial"): + chunk = self._compressor.decode_partial(cdata, start, nitems) + else: + chunk = self._compressor.decode(cdata) else: chunk = cdata @@ -1829,7 +1933,7 @@ def _decode_chunk(self, cdata): # ensure correct chunk shape chunk = chunk.reshape(-1, order='A') - chunk = chunk.reshape(self._chunks, order=self._order) + chunk = chunk.reshape(expected_shape or self._chunks, order=self._order) return chunk diff --git a/zarr/creation.py b/zarr/creation.py index 5b4e97bc70..6fbdaf04c1 100644 --- a/zarr/creation.py +++ b/zarr/creation.py @@ -362,11 +362,26 @@ def array(data, **kwargs): return z -def 
open_array(store=None, mode='a', shape=None, chunks=True, dtype=None, - compressor='default', fill_value=0, order='C', synchronizer=None, - filters=None, cache_metadata=True, cache_attrs=True, path=None, - object_codec=None, chunk_store=None, storage_options=None, - **kwargs): +def open_array( + store=None, + mode="a", + shape=None, + chunks=True, + dtype=None, + compressor="default", + fill_value=0, + order="C", + synchronizer=None, + filters=None, + cache_metadata=True, + cache_attrs=True, + path=None, + object_codec=None, + chunk_store=None, + storage_options=None, + partial_decompress=False, + **kwargs ): """Open an array using file-mode-like semantics. Parameters ---------- @@ -415,6 +430,12 @@ def open_array(store=None, mode='a', shape=None, chunks=True, dtype=None, storage_options : dict If using an fsspec URL to create the store, these will be passed to the backend implementation. Ignored otherwise. + partial_decompress : bool, optional + If True and the chunk_store is a FSStore and the compression used + is Blosc, when getting data from the array chunks will be partially + read and decompressed when possible. + + .. 
versionadded:: 2.7 Returns ------- diff --git a/zarr/errors.py b/zarr/errors.py index dd0d6a08ed..85c28ea8b6 100644 --- a/zarr/errors.py +++ b/zarr/errors.py @@ -15,6 +15,10 @@ def __init__(self, *args): super().__init__(self._msg.format(*args)) +class ArrayIndexError(IndexError): + pass + + class _BaseZarrIndexError(IndexError): _msg = "" diff --git a/zarr/indexing.py b/zarr/indexing.py index 356e7165cb..06b2ad4fbe 100644 --- a/zarr/indexing.py +++ b/zarr/indexing.py @@ -5,7 +5,9 @@ import numpy as np + from zarr.errors import ( + ArrayIndexError, NegativeStepError, err_too_many_indices, VindexInvalidSelectionError, @@ -471,7 +473,7 @@ def __iter__(self): yield ChunkDimProjection(dim_chunk_ix, dim_chunk_sel, dim_out_sel) -def slice_to_range(s, l): # noqa: E741 +def slice_to_range(s: slice, l: int): # noqa: E741 return range(*s.indices(l)) @@ -825,3 +827,116 @@ def pop_fields(selection): selection = tuple(s for s in selection if not isinstance(s, str)) selection = selection[0] if len(selection) == 1 else selection return fields, selection + + +def make_slice_selection(selection): + ls = [] + for dim_selection in selection: + if is_integer(dim_selection): + ls.append(slice(dim_selection, dim_selection + 1, 1)) + elif isinstance(dim_selection, np.ndarray): + if len(dim_selection) == 1: + ls.append(slice(dim_selection[0], dim_selection[0] + 1, 1)) + else: + raise ArrayIndexError() + else: + ls.append(dim_selection) + return ls + + +class PartialChunkIterator(object): + """Iterator to retrieve the specific coordinates of requested data + from within a compressed chunk. 
+ + Parameters + ---------- + selection : tuple + tuple of slice objects to take from the chunk + arr_shape : shape of chunk to select data from + + Attributes + ----------- + arr_shape + selection + + Returns + ------- + Tuple with 3 elements: + + start: int + elements offset in the chunk to read from + nitems: int + number of elements to read in the chunk from start + partial_out_selection: list of slices + indices of a temporary empty array of size `Array._chunks` to assign + the decompressed data to after the partial read. + + Notes + ----- + An array is flattened when compressed with blosc, so this iterator takes + the wanted selection of an array and determines the wanted coordinates + of the flattened, compressed data to be read and then decompressed. The + decompressed data is then placed in a temporary empty array of size + `Array._chunks` at the indices yielded as partial_out_selection. + Once all the slices yielded by this iterator have been read, decompressed + and written to the temporary array, the wanted slice of the chunk can be + indexed from the temporary array and written to the out_selection slice + of the out array. 
+ + """ + + def __init__(self, selection, arr_shape): + selection = make_slice_selection(selection) + self.arr_shape = arr_shape + + # number of selection dimensions can't be greater than the number of chunk dimensions + if len(selection) > len(self.arr_shape): + raise ValueError( + "Selection has more dimensions then the array:\n" + f"selection dimensions = {len(selection)}\n" + f"array dimensions = {len(self.arr_shape)}" + ) + + # any selection can not be out of the range of the chunk + selection_shape = np.empty(self.arr_shape)[tuple(selection)].shape + if any( + [ + selection_dim < 0 or selection_dim > arr_dim + for selection_dim, arr_dim in zip(selection_shape, self.arr_shape) + ] + ): + raise IndexError( + "a selection index is out of range for the dimension" + ) # pragma: no cover + + for i, dim_size in enumerate(self.arr_shape[::-1]): + index = len(self.arr_shape) - (i + 1) + if index <= len(selection) - 1: + slice_size = selection_shape[index] + if slice_size == dim_size and index > 0: + selection.pop() + else: + break + + chunk_loc_slices = [] + last_dim_slice = None if selection[-1].step > 1 else selection.pop() + for arr_shape_i, sl in zip(arr_shape, selection): + dim_chunk_loc_slices = [] + assert isinstance(sl, slice) + for x in slice_to_range(sl, arr_shape_i): + dim_chunk_loc_slices.append(slice(x, x + 1, 1)) + chunk_loc_slices.append(dim_chunk_loc_slices) + if last_dim_slice: + chunk_loc_slices.append([last_dim_slice]) + self.chunk_loc_slices = list(itertools.product(*chunk_loc_slices)) + + def __iter__(self): + chunk1 = self.chunk_loc_slices[0] + nitems = (chunk1[-1].stop - chunk1[-1].start) * np.prod( + self.arr_shape[len(chunk1) :], dtype=int + ) + for partial_out_selection in self.chunk_loc_slices: + start = 0 + for i, sl in enumerate(partial_out_selection): + start += sl.start * np.prod(self.arr_shape[i + 1 :], dtype=int) + yield start, nitems, partial_out_selection diff --git a/zarr/tests/test_core.py b/zarr/tests/test_core.py index 
eb11e4a097..229b61aa5b 100644 --- a/zarr/tests/test_core.py +++ b/zarr/tests/test_core.py @@ -18,11 +18,20 @@ from zarr.core import Array from zarr.meta import json_loads from zarr.n5 import N5Store, n5_keywords -from zarr.storage import (ABSStore, DBMStore, DirectoryStore, LMDBStore, - LRUStoreCache, NestedDirectoryStore, SQLiteStore, - FSStore, atexit_rmglob, atexit_rmtree, init_array, - init_group - ) +from zarr.storage import ( + ABSStore, + DBMStore, + DirectoryStore, + LMDBStore, + LRUStoreCache, + NestedDirectoryStore, + SQLiteStore, + FSStore, + atexit_rmglob, + atexit_rmtree, + init_array, + init_group, +) from zarr.util import buffer_size from zarr.tests.util import skip_test_env_var, have_fsspec @@ -2376,7 +2385,6 @@ def test_store_has_bytes_values(self): @pytest.mark.skipif(have_fsspec is False, reason="needs fsspec") class TestArrayWithFSStore(TestArray): - @staticmethod def create_array(read_only=False, **kwargs): path = mkdtemp() @@ -2411,3 +2419,78 @@ def test_hexdigest(self): z = self.create_array(shape=(1050,), chunks=100, dtype=' start_byte].min() + length = stop_byte - start_byte + data_buff = self.fs.read_block(self.key_path, start_byte, length) + self.buff[start_byte:stop_byte] = data_buff + self.read_blocks.add(start_block) + if wanted_decompressed == 0: + wanted_decompressed += ((start_block + 1) * self.n_per_block) - start + else: + wanted_decompressed += self.n_per_block + start_block += 1 + + def read_full(self): + return self.chunk_store[self.store_key]