Merge pull request #654 from astrofrog/rechunk-api
Added rechunk method to DaskSpectralCube
keflavich authored Aug 12, 2020
2 parents fddeba3 + dd1f17d commit 1497c07
Showing 3 changed files with 101 additions and 2 deletions.
50 changes: 48 additions & 2 deletions docs/dask.rst
@@ -17,7 +17,7 @@ To read in a FITS cube using the dask-enabled classes, you can do::
>>> fn = data.get_pkg_data_filename('tests/data/example_cube.fits', 'spectral_cube')
>>> cube = SpectralCube.read(fn, use_dask=True)
>>> cube
-DaskSpectralCube with shape=(7, 4, 3) and unit=Jy / beam:
+DaskSpectralCube with shape=(7, 4, 3) and unit=Jy / beam and chunk size (7, 4, 3):
n_x: 3 type_x: RA---ARC unit_x: deg range: 52.231466 deg: 52.231544 deg
n_y: 4 type_y: DEC--ARC unit_y: deg range: 31.243639 deg: 31.243739 deg
n_s: 7 type_s: VRAD unit_s: m / s range: 14322.821 m / s: 14944.909 m / s
@@ -81,7 +81,7 @@ it is stored as a `zarr <https://zarr.readthedocs.io/en/stable/>`_ dataset)::
>>> cube_new = cube.sigma_clip_spectrally(3, save_to_tmp_dir=True) # doctest: +IGNORE_OUTPUT
[########################################] | 100% Completed | 0.1s
>>> cube_new
-DaskSpectralCube with shape=(7, 4, 3) and unit=Jy / beam:
+DaskSpectralCube with shape=(7, 4, 3) and unit=Jy / beam and chunk size (7, 4, 3):
n_x: 3 type_x: RA---ARC unit_x: deg range: 52.231466 deg: 52.231544 deg
n_y: 4 type_y: DEC--ARC unit_y: deg range: 31.243639 deg: 31.243739 deg
n_s: 7 type_s: VRAD unit_s: m / s range: 14322.821 m / s: 14944.909 m / s
@@ -92,6 +92,52 @@ installed.
This can also be beneficial if you are using multiprocessing or multithreading to carry out calculations,
because zarr works nicely with disk access from different threads and processes.
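
As a minimal sketch of this (using only general dask configuration rather than
any spectral-cube-specific API), you could select the scheduler used for such
a calculation with dask's own configuration mechanism::

>>> import dask
>>> with dask.config.set(scheduler='threads'):  # doctest: +SKIP
...     cube_new = cube.sigma_clip_spectrally(3, save_to_tmp_dir=True)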

Rechunking data
---------------

In some cases, the way the data is chunked on disk may be inefficient (for
example, large CASA datasets may be chunked into tens of thousands of blocks),
which can make dask operations slow because the tree of operations becomes very
large. To get around this, you can use the
:meth:`~spectral_cube.DaskSpectralCube.rechunk` method with the
``save_to_tmp_dir`` option mentioned above, which will rechunk the data to disk
and make subsequent operations more efficient, either by letting dask choose
the new chunk size::

>>> cube_new = cube.rechunk(save_to_tmp_dir=True) # doctest: +IGNORE_OUTPUT
[########################################] | 100% Completed | 0.1s
>>> cube_new
DaskSpectralCube with shape=(7, 4, 3) and unit=Jy / beam and chunk size (7, 4, 3):
n_x: 3 type_x: RA---ARC unit_x: deg range: 52.231466 deg: 52.231544 deg
n_y: 4 type_y: DEC--ARC unit_y: deg range: 31.243639 deg: 31.243739 deg
n_s: 7 type_s: VRAD unit_s: m / s range: 14322.821 m / s: 14944.909 m / s

or by specifying it explicitly::

>>> cube_new = cube.rechunk(chunks=(2, 2, 2), save_to_tmp_dir=True) # doctest: +IGNORE_OUTPUT
[########################################] | 100% Completed | 0.1s
>>> cube_new
DaskSpectralCube with shape=(7, 4, 3) and unit=Jy / beam and chunk size (2, 2, 2):
n_x: 3 type_x: RA---ARC unit_x: deg range: 52.231466 deg: 52.231544 deg
n_y: 4 type_y: DEC--ARC unit_y: deg range: 31.243639 deg: 31.243739 deg
n_s: 7 type_s: VRAD unit_s: m / s range: 14322.821 m / s: 14944.909 m / s

The :meth:`~spectral_cube.DaskSpectralCube.rechunk` method can also be used
without the ``save_to_tmp_dir=True`` option, in which case the rechunking is
simply added as a step in the dask tree; on its own, this is unlikely to lead
to performance gains.
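
For illustration, such a lazy rechunk (a sketch based on the behavior just
described) would look like::

>>> cube_lazy = cube.rechunk(chunks=(2, 2, 2))  # doctest: +SKIP

No data is read or written at this point; the rechunking only happens once a
subsequent operation triggers a computation.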

A common scenario for rechunking is when you plan to carry out mostly
operations that collapse along the spectral axis, for example computing moment
maps. In this case you can use::

>>> cube_new = cube.rechunk(chunks=(-1, 'auto', 'auto'), save_to_tmp_dir=True) # doctest: +IGNORE_OUTPUT
[########################################] | 100% Completed | 0.1s

which will rechunk the data into blocks that span the full spectral axis but
are divided up in the image plane. The complementary case is when you plan to
apply operations to each image plane, such as spatial convolution, in which
case you can divide the data into spectral chunks that each span the whole of
the image dimensions::

>>> cube_new = cube.rechunk(chunks=('auto', -1, -1), save_to_tmp_dir=True) # doctest: +IGNORE_OUTPUT
[########################################] | 100% Completed | 0.1s
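
As a sketch of how these two layouts would typically be used (``moment0`` and
``spatial_smooth`` are existing SpectralCube methods; the kernel width here is
an arbitrary choice for illustration)::

>>> from astropy.convolution import Gaussian2DKernel
>>> cube_spectral = cube.rechunk(chunks=(-1, 'auto', 'auto'), save_to_tmp_dir=True)  # doctest: +SKIP
>>> mom0 = cube_spectral.moment0()  # doctest: +SKIP
>>> cube_spatial = cube.rechunk(chunks=('auto', -1, -1), save_to_tmp_dir=True)  # doctest: +SKIP
>>> cube_smooth = cube_spatial.spatial_smooth(Gaussian2DKernel(1.5))  # doctest: +SKIP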

Performance benefits of dask classes
------------------------------------

44 changes: 44 additions & 0 deletions spectral_cube/dask_spectral_cube.py
@@ -310,6 +310,50 @@ def _get_filled_data(self, view=(), fill=np.nan, check_endian=None, use_memmap=N
        else:
            return da.from_array(FilledArrayHandler(self, fill=fill), name='FilledArrayHandler ' + str(uuid.uuid4()), chunks=data.chunksize)[view]

    def __repr__(self):
        # Append the dask chunk size to the first line of the default repr,
        # re-attaching the trailing colon.
        default_repr = super().__repr__()
        lines = default_repr.splitlines()
        lines[0] = lines[0][:-1] + ' and chunk size {0}:'.format(self._data.chunksize)
        return '\n'.join(lines)

    @add_save_to_tmp_dir_option
    def rechunk(self, chunks='auto', threshold=None, block_size_limit=None,
                **kwargs):
        """
        Rechunk the underlying dask array and return a new cube.

        For more details about the parameters below, see the dask documentation
        about `rechunking <https://docs.dask.org/en/latest/array-chunks.html>`_.

        Parameters
        ----------
        chunks : int, tuple, dict or str, optional
            The new block dimensions to create. -1 indicates the full size of
            the corresponding dimension. Default is "auto", which automatically
            determines chunk sizes. This can also be a tuple with a different
            value along each dimension; for example, if computing moment maps,
            you could use ``chunks=(-1, 'auto', 'auto')``.
        threshold : int, optional
            The graph growth factor under which we don't bother introducing an
            intermediate step.
        block_size_limit : int, optional
            The maximum block size (in bytes) we want to produce. Defaults to
            the dask configuration value ``array.chunk-size``.
        save_to_tmp_dir : bool
            If `True`, the rechunking will be carried out straight away and
            saved to a temporary directory. This can improve performance,
            especially if carrying out several operations sequentially. If
            `False`, the rechunking is added as a step in the dask tree.
        kwargs
            Additional keyword arguments are passed to the dask rechunk method.
        """

        # Pass any extra keyword arguments through to dask's rechunk, as
        # promised in the docstring above.
        newdata = self._data.rechunk(chunks=chunks,
                                     threshold=threshold,
                                     block_size_limit=block_size_limit,
                                     **kwargs)

        return self._new_cube_with(data=newdata)

    @add_save_to_tmp_dir_option
    @projection_if_needed
    def apply_function(self, function, axis=None, unit=None,
9 changes: 9 additions & 0 deletions spectral_cube/tests/test_dask.py
@@ -49,3 +49,12 @@ def test_save_to_tmp_dir(data_adv):
    # The following test won't necessarily always work in future since the name
    # is not really guaranteed, but this is pragmatic enough for now
    assert cube_new._data.name.startswith('from-zarr')


def test_rechunk(data_adv):
    cube = DaskSpectralCube.read(data_adv)
    assert cube._data.chunksize == (4, 3, 2)
    cube_new = cube.rechunk(chunks=(1, 2, 3))
    # Note that the last element is 2 because the chunk size we asked for
    # is larger than the cube; this is fine and deliberate in this test
    assert cube_new._data.chunksize == (1, 2, 2)
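
A companion check along the same lines (a sketch, not part of this commit; it
assumes the same ``data_adv`` fixture and the (4, 3, 2) cube shape asserted
above) could exercise the spectral-axis layout recommended in the docs:

def test_rechunk_spectral(data_adv):
    cube = DaskSpectralCube.read(data_adv)
    # Rechunk so that each block spans the full spectral (first) axis
    cube_new = cube.rechunk(chunks=(-1, 'auto', 'auto'))
    # The spectral axis has length 4, so a full-axis chunk has size 4
    assert cube_new._data.chunksize[0] == 4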
