From 4c8f94ff4e3c8c8d06f8379e11fe9b0414d2733d Mon Sep 17 00:00:00 2001
From: Thomas Robitaille
Date: Sat, 8 Aug 2020 10:13:58 +0100
Subject: [PATCH 1/6] Added rechunk method to DaskSpectralCube

---
 spectral_cube/dask_spectral_cube.py | 33 +++++++++++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/spectral_cube/dask_spectral_cube.py b/spectral_cube/dask_spectral_cube.py
index 6af1b4ad8..132f15b7b 100644
--- a/spectral_cube/dask_spectral_cube.py
+++ b/spectral_cube/dask_spectral_cube.py
@@ -310,6 +310,39 @@ def _get_filled_data(self, view=(), fill=np.nan, check_endian=None, use_memmap=N
         else:
             return da.from_array(FilledArrayHandler(self, fill=fill),
                                  name='FilledArrayHandler ' + str(uuid.uuid4()),
                                  chunks=data.chunksize)[view]
 
+    @add_save_to_tmp_dir_option
+    def rechunk(self, chunks='auto', threshold=None, block_size_limit=None,
+                **kwargs):
+        """
+        Rechunk the underlying dask array and return a new cube.
+
+        Parameters
+        ----------
+        chunks: int, tuple, dict or str, optional
+            The new block dimensions to create. -1 indicates the full size of the
+            corresponding dimension. Default is "auto" which automatically
+            determines chunk sizes.
+        threshold: int, optional
+            The graph growth factor under which we don't bother introducing an
+            intermediate step.
+        block_size_limit: int, optional
+            The maximum block size (in bytes) we want to produce.
+            Defaults to the dask configuration value ``array.chunk-size``.
+        save_to_tmp_dir : bool
+            If `True`, the rechunking will be carried out straight away and
+            saved to a temporary directory. This can improve performance,
+            especially if carrying out several operations sequentially. If
+            `False`, the rechunking is added as a step in the dask tree.
+        kwargs
+            Additional keyword arguments are passed to the dask rechunk method.
+ """ + + newdata = data.rechunk(chunks=chunks, + threshold=threshold, + block_size_limit=block_size_limit) + + return self._new_cube_with(data=newdata) + @add_save_to_tmp_dir_option @projection_if_needed def apply_function(self, function, axis=None, unit=None, From 71af2aaec9310ec999879cffa4a42270413799b0 Mon Sep 17 00:00:00 2001 From: Thomas Robitaille Date: Sat, 8 Aug 2020 10:33:08 +0100 Subject: [PATCH 2/6] Fix implementation of rechunk and added docs --- docs/dask.rst | 31 +++++++++++++++++++++++++++-- spectral_cube/dask_spectral_cube.py | 14 +++++++++---- 2 files changed, 39 insertions(+), 6 deletions(-) diff --git a/docs/dask.rst b/docs/dask.rst index f7f15d173..4511d824a 100644 --- a/docs/dask.rst +++ b/docs/dask.rst @@ -17,7 +17,7 @@ To read in a FITS cube using the dask-enabled classes, you can do:: >>> fn = data.get_pkg_data_filename('tests/data/example_cube.fits', 'spectral_cube') >>> cube = SpectralCube.read(fn, use_dask=True) >>> cube - DaskSpectralCube with shape=(7, 4, 3) and unit=Jy / beam: + DaskSpectralCube with shape=(7, 4, 3) and unit=Jy / beam and chunk size (7, 4, 3): n_x: 3 type_x: RA---ARC unit_x: deg range: 52.231466 deg: 52.231544 deg n_y: 4 type_y: DEC--ARC unit_y: deg range: 31.243639 deg: 31.243739 deg n_s: 7 type_s: VRAD unit_s: m / s range: 14322.821 m / s: 14944.909 m / s @@ -81,7 +81,7 @@ it is stored as a `zarr `_ dataset):: >>> cube_new = cube.sigma_clip_spectrally(3, save_to_tmp_dir=True) # doctest: +IGNORE_OUTPUT [########################################] | 100% Completed | 0.1s >>> cube_new - DaskSpectralCube with shape=(7, 4, 3) and unit=Jy / beam: + DaskSpectralCube with shape=(7, 4, 3) and unit=Jy / beam and chunk size (7, 4, 3): n_x: 3 type_x: RA---ARC unit_x: deg range: 52.231466 deg: 52.231544 deg n_y: 4 type_y: DEC--ARC unit_y: deg range: 31.243639 deg: 31.243739 deg n_s: 7 type_s: VRAD unit_s: m / s range: 14322.821 m / s: 14944.909 m / s @@ -92,6 +92,33 @@ installed. This can also be beneficial if you are using multiprocessing or multithreading to carry out calculations, because zarr works nicely with disk access from different threads and processes. +Rechunking data +--------------- + +In some cases, the way the data is chunked on disk may be inefficient (for example large CASA +datasets may be chunked into tens of thousands of blocks, which may make dask operations slow due to +the size of the tree. 
+metho with the ``save_to_tmp_dir`` option mentioned above, which will rechunk the data to disk and
+make subsequent operations more efficient - either by letting dask choose the new chunk size::
+
+    >>> cube_new = cube.rechunk(save_to_tmp_dir=True)  # doctest: +IGNORE_OUTPUT
+    [########################################] | 100% Completed | 0.1s
+    >>> cube_new
+    DaskSpectralCube with shape=(7, 4, 3) and unit=Jy / beam and chunk size (7, 4, 3):
+    n_x: 3 type_x: RA---ARC unit_x: deg range: 52.231466 deg: 52.231544 deg
+    n_y: 4 type_y: DEC--ARC unit_y: deg range: 31.243639 deg: 31.243739 deg
+    n_s: 7 type_s: VRAD unit_s: m / s range: 14322.821 m / s: 14944.909 m / s
+
+or by specifying it explicitly::
+
+    >>> cube_new = cube.rechunk(chunks=(2, 2, 2), save_to_tmp_dir=True)  # doctest: +IGNORE_OUTPUT
+    [########################################] | 100% Completed | 0.1s
+    >>> cube_new
+    DaskSpectralCube with shape=(7, 4, 3) and unit=Jy / beam and chunk size (2, 2, 2):
+    n_x: 3 type_x: RA---ARC unit_x: deg range: 52.231466 deg: 52.231544 deg
+    n_y: 4 type_y: DEC--ARC unit_y: deg range: 31.243639 deg: 31.243739 deg
+    n_s: 7 type_s: VRAD unit_s: m / s range: 14322.821 m / s: 14944.909 m / s
+
 Performance benefits of dask classes
 ------------------------------------

diff --git a/spectral_cube/dask_spectral_cube.py b/spectral_cube/dask_spectral_cube.py
index 132f15b7b..1556f5584 100644
--- a/spectral_cube/dask_spectral_cube.py
+++ b/spectral_cube/dask_spectral_cube.py
@@ -310,7 +310,13 @@ def _get_filled_data(self, view=(), fill=np.nan, check_endian=None, use_memmap=N
         else:
             return da.from_array(FilledArrayHandler(self, fill=fill),
                                  name='FilledArrayHandler ' + str(uuid.uuid4()),
                                  chunks=data.chunksize)[view]
 
-    @add_save_to_tmp_dir_option
+    def __repr__(self):
+        default_repr = super().__repr__()
+        lines = default_repr.splitlines()
+        lines[0] = lines[0][:-1] + ' and chunk size {0}:'.format(self._data.chunksize)
+        return '\n'.join(lines)
+
+    @add_save_to_tmp_dir_option
     def rechunk(self, chunks='auto', threshold=None, block_size_limit=None,
                 **kwargs):
         """
@@ -337,9 +343,9 @@ def rechunk(self, chunks='auto', threshold=None, block_size_limit=None,
             Additional keyword arguments are passed to the dask rechunk method.
""" - newdata = data.rechunk(chunks=chunks, - threshold=threshold, - block_size_limit=block_size_limit) + newdata = self._data.rechunk(chunks=chunks, + threshold=threshold, + block_size_limit=block_size_limit) return self._new_cube_with(data=newdata) From 3cf73193c38296a611363279937602b52ae02595 Mon Sep 17 00:00:00 2001 From: Thomas Robitaille Date: Sat, 8 Aug 2020 10:37:28 +0100 Subject: [PATCH 3/6] Added unit test --- spectral_cube/tests/test_dask.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/spectral_cube/tests/test_dask.py b/spectral_cube/tests/test_dask.py index a7a9d001f..45d96e8a3 100644 --- a/spectral_cube/tests/test_dask.py +++ b/spectral_cube/tests/test_dask.py @@ -49,3 +49,12 @@ def test_save_to_tmp_dir(data_adv): # The following test won't necessarily always work in future since the name # is not really guaranteed, but this is pragmatic enough for now assert cube_new._data.name.startswith('from-zarr') + + +def test_rechunk(data_adv): + cube = DaskSpectralCube.read(data_adv) + assert cube._data.chunksize == (4, 3, 2) + cube_new = cube.rechunk(chunks=(1, 2, 3)) + # note last element is 2 because the chunk size we asked for + # is larger than cube - this is fine and deliberate in this test + assert cube_new._data.chunksize == (1, 2, 2) From 3ab1a77137f599858e44edea3dd535ff83abc854 Mon Sep 17 00:00:00 2001 From: Thomas Robitaille Date: Sat, 8 Aug 2020 10:38:53 +0100 Subject: [PATCH 4/6] Expand docs --- docs/dask.rst | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/docs/dask.rst b/docs/dask.rst index 4511d824a..65c82b907 100644 --- a/docs/dask.rst +++ b/docs/dask.rst @@ -98,7 +98,7 @@ Rechunking data In some cases, the way the data is chunked on disk may be inefficient (for example large CASA datasets may be chunked into tens of thousands of blocks, which may make dask operations slow due to the size of the tree. To get around this, you can use the :meth:`~spectral_cube.DaskSpectralCube.rechunk` -metho with the ``save_to_tmp_dir`` option mentioned above, which will rechunk the data to disk and +method with the ``save_to_tmp_dir`` option mentioned above, which will rechunk the data to disk and make subsequent operations more efficient - either by letting dask choose the new chunk size:: >>> cube_new = cube.rechunk(save_to_tmp_dir=True) # doctest: +IGNORE_OUTPUT @@ -119,6 +119,10 @@ or by specifying it explicitly:: n_y: 4 type_y: DEC--ARC unit_y: deg range: 31.243639 deg: 31.243739 deg n_s: 7 type_s: VRAD unit_s: m / s range: 14322.821 m / s: 14944.909 m / s +While the :meth:`~spectral_cube.DaskSpectralCube.rechunk` method can be used without +the ``save_to_tmp_dir=True`` option, which then just adds the rechunking to the dask tree, +doing so is unlikely to lead in performance gains. 
From 460568c1f7f27aef9639af4526a440c07e2a6697 Mon Sep 17 00:00:00 2001
From: Thomas Robitaille
Date: Tue, 11 Aug 2020 08:11:22 +0100
Subject: [PATCH 5/6] Fix typo

Co-authored-by: Adam Ginsburg
---
 docs/dask.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/dask.rst b/docs/dask.rst
index 65c82b907..d8a56a8ab 100644
--- a/docs/dask.rst
+++ b/docs/dask.rst
@@ -96,7 +96,7 @@ Rechunking data
 ---------------
 
 In some cases, the way the data is chunked on disk may be inefficient (for example large CASA
-datasets may be chunked into tens of thousands of blocks, which may make dask operations slow due to
+datasets may be chunked into tens of thousands of blocks), which may make dask operations slow due to
 the size of the tree. To get around this, you can use the :meth:`~spectral_cube.DaskSpectralCube.rechunk`
 method with the ``save_to_tmp_dir`` option mentioned above, which will rechunk the data to disk and
 make subsequent operations more efficient - either by letting dask choose the new chunk size::

From dd1f17db296340b0e6e7b90a9a3fdf8a85d079d7 Mon Sep 17 00:00:00 2001
From: Thomas Robitaille
Date: Tue, 11 Aug 2020 08:19:26 +0100
Subject: [PATCH 6/6] Expanded documentation

---
 docs/dask.rst                       | 15 +++++++++++++++
 spectral_cube/dask_spectral_cube.py | 11 ++++++++---
 2 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/docs/dask.rst b/docs/dask.rst
index d8a56a8ab..b0fb39d4d 100644
--- a/docs/dask.rst
+++ b/docs/dask.rst
@@ -123,6 +123,21 @@ While the :meth:`~spectral_cube.DaskSpectralCube.rechunk` method can be used wit
 the ``save_to_tmp_dir=True`` option, which then just adds the rechunking to the dask tree,
 doing so is unlikely to lead to performance gains.
 
+A common scenario for rechunking is when you plan to mostly carry out operations that
+collapse along the spectral axis, for example computing moment maps. In this
+case you can use::
+
+    >>> cube_new = cube.rechunk(chunks=(-1, 'auto', 'auto'), save_to_tmp_dir=True)  # doctest: +IGNORE_OUTPUT
+    [########################################] | 100% Completed | 0.1s
+
+which will rechunk the data into cubes that span the full spectral axis but will be
+chunked in the image plane. Conversely, if you plan to apply operations to each
+image plane, such as spatial convolution, you can divide the data into spectral
+chunks that span the whole of the image dimensions::
+
+    >>> cube_new = cube.rechunk(chunks=('auto', -1, -1), save_to_tmp_dir=True)  # doctest: +IGNORE_OUTPUT
+    [########################################] | 100% Completed | 0.1s
+
 Performance benefits of dask classes
 ------------------------------------

diff --git a/spectral_cube/dask_spectral_cube.py b/spectral_cube/dask_spectral_cube.py
index 1556f5584..0f133abd4 100644
--- a/spectral_cube/dask_spectral_cube.py
+++ b/spectral_cube/dask_spectral_cube.py
@@ -322,12 +322,17 @@ def rechunk(self, chunks='auto', threshold=None, block_size_limit=None,
         """
         Rechunk the underlying dask array and return a new cube.
 
+        For more details about the parameters below, see the dask documentation
+        about `rechunking `_.
+
         Parameters
         ----------
         chunks: int, tuple, dict or str, optional
-            The new block dimensions to create. -1 indicates the full size of the
-            corresponding dimension. Default is "auto" which automatically
-            determines chunk sizes.
+            The new block dimensions to create. -1 indicates the full size of
Default is "auto" which automatically + determines chunk sizes. This can also be a tuple with a different + value along each dimension - for example if computing moment maps, + you could use e.g. ``chunks=(-1, 'auto', 'auto')`` threshold: int, optional The graph growth factor under which we don't bother introducing an intermediate step.