.. currentmodule:: xarray

.. _dask:

Parallel computing with Dask
============================
Xarray integrates with `Dask <https://dask.org/>`__ to support parallel
computations and streaming computation on datasets that don't fit into memory.
Currently, Dask is an entirely optional feature for xarray. However, the
benefits of using Dask are sufficiently strong that Dask may become a required
dependency in a future version of xarray.
For a full example of how to use xarray's Dask integration, read the
`blog post introducing xarray and Dask`_. More up-to-date examples
may be found at the `Pangeo project's gallery <http://gallery.pangeo.io/>`_
and at the `Dask examples website <https://examples.dask.org/xarray.html>`_.
.. _blog post introducing xarray and Dask: https://stephanhoyer.com/2015/06/11/xray-dask-out-of-core-labeled-arrays/
What is a Dask array?
---------------------
.. image:: ../_static/dask_array.png
    :width: 40 %
    :align: right
    :alt: A Dask array
Dask divides arrays into many small pieces, called *chunks*, each of which is
presumed to be small enough to fit into memory.
Unlike NumPy, which has eager evaluation, operations on Dask arrays are lazy.
Operations queue up a series of tasks mapped over blocks, and no computation is
performed until you actually ask values to be computed (e.g., to print results
to your screen or write to disk). At that point, data is loaded into memory
and computation proceeds in a streaming fashion, block-by-block.
The actual computation is controlled by a multi-processing or thread pool,
which allows Dask to take full advantage of multiple processors available on
most modern computers.
For more details, read the `Dask documentation <https://docs.dask.org/>`__.
Note that xarray only makes use of ``dask.array`` and ``dask.delayed``.
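To make this concrete, here is a minimal sketch (using ``dask.array``
directly, outside of xarray) of how computation only runs when values are
actually requested:

.. code-block:: python

    import dask.array as da

    x = da.ones((4000, 4000), chunks=(1000, 1000))  # no data in memory yet
    y = (x + x.T).sum()  # queues up tasks; still nothing computed

    # computation happens only now, streaming block-by-block
    y.compute()  # -> 32000000.0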
.. _dask.io:

Reading and writing data
------------------------
The usual way to create a ``Dataset`` filled with Dask arrays is to load the
data from a netCDF file or files. You can do this by supplying a ``chunks``
argument to :py:func:`~xarray.open_dataset` or using the
:py:func:`~xarray.open_mfdataset` function.
.. ipython:: python
    :suppress:

    import os

    import numpy as np
    import pandas as pd

    import xarray as xr

    np.random.seed(123456)
    np.set_printoptions(precision=3, linewidth=100, threshold=100, edgeitems=3)

    ds = xr.Dataset(
        {
            "temperature": (
                ("time", "latitude", "longitude"),
                np.random.randn(30, 180, 180),
            ),
            "time": pd.date_range("2015-01-01", periods=30),
            "longitude": np.arange(180),
            "latitude": np.arange(89.5, -90.5, -1),
        }
    )
    ds.to_netcdf("example-data.nc")
.. ipython:: python

    ds = xr.open_dataset("example-data.nc", chunks={"time": 10})
    ds
In this example ``latitude`` and ``longitude`` do not appear in the ``chunks``
dict, so only one chunk will be used along those dimensions. It is also
entirely equivalent to opening a dataset using :py:func:`~xarray.open_dataset`
and then chunking the data using the ``chunk`` method, e.g.,
``xr.open_dataset('example-data.nc').chunk({'time': 10})``.
To open multiple files simultaneously in parallel using Dask delayed,
use :py:func:`~xarray.open_mfdataset`::
    xr.open_mfdataset('my/files/*.nc', parallel=True)
This function will automatically concatenate and merge datasets into one in
the simple cases that it understands (see :py:func:`~xarray.combine_by_coords`
for the full disclaimer). By default, :py:func:`~xarray.open_mfdataset` will chunk each
netCDF file into a single Dask array; again, supply the ``chunks`` argument to
control the size of the resulting Dask arrays. In more complex cases, you can
open each file individually using :py:func:`~xarray.open_dataset` and merge the result, as
described in :ref:`combining data`. Passing the keyword argument ``parallel=True`` to
:py:func:`~xarray.open_mfdataset` will speed up the reading of large multi-file datasets by
executing those read tasks in parallel using ``dask.delayed``.
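For illustration, a minimal sketch of the manual open-and-combine alternative
(the glob pattern and chunk sizes here are hypothetical placeholders):

.. code-block:: python

    import glob

    import xarray as xr

    paths = sorted(glob.glob("my/files/*.nc"))
    datasets = [xr.open_dataset(p, chunks={"time": 10}) for p in paths]
    combined = xr.combine_by_coords(datasets)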
.. warning::

    :py:func:`~xarray.open_mfdataset` called without a ``chunks`` argument will
    return Dask arrays with chunk sizes equal to the individual files. Re-chunking
    the dataset after creation with ``ds.chunk()`` will lead to an ineffective use
    of memory and is not recommended.
You'll notice that printing a dataset still shows a preview of array values,
even if they are actually Dask arrays. We can do this quickly with Dask because
we only need to compute the first few values (typically from the first block).
To reveal the true nature of an array, print a DataArray:
.. ipython:: python

    ds.temperature
Once you've manipulated a Dask array, you can still write a dataset too big to
fit into memory back to disk by using :py:meth:`~xarray.Dataset.to_netcdf` in the
usual way.
.. ipython:: python

    ds.to_netcdf("manipulated-example-data.nc")
By setting the ``compute`` argument to ``False``, :py:meth:`~xarray.Dataset.to_netcdf`
will return a ``dask.delayed`` object that can be computed later.
.. ipython:: python

    from dask.diagnostics import ProgressBar

    # or distributed.progress when using the distributed scheduler
    delayed_obj = ds.to_netcdf("manipulated-example-data.nc", compute=False)
    with ProgressBar():
        results = delayed_obj.compute()
.. ipython:: python
    :suppress:

    os.remove("manipulated-example-data.nc")  # Was not opened.
.. note::

    When using Dask's distributed scheduler to write NETCDF4 files,
    it may be necessary to set the environment variable ``HDF5_USE_FILE_LOCKING=FALSE``
    to avoid competing locks within the HDF5 SWMR file locking scheme. Note that
    writing netCDF files with Dask's distributed scheduler is only supported for
    the ``netcdf4`` backend.
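One way to do this is to set the variable from Python before any library opens
an HDF5 file (a sketch; it can equally be set in the shell environment):

.. code-block:: python

    import os

    os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"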
A dataset can also be converted to a Dask DataFrame using :py:meth:`~xarray.Dataset.to_dask_dataframe`.
.. ipython:: python
    :okwarning:

    df = ds.to_dask_dataframe()
    df
Dask DataFrames do not support multi-indexes, so the coordinate variables from the dataset are included as columns in the Dask DataFrame.
Using Dask with xarray
----------------------
Nearly all existing xarray methods (including those for indexing, computation,
concatenating and grouped operations) have been extended to work automatically
with Dask arrays. When you load data as a Dask array in an xarray data
structure, almost all xarray operations will keep it as a Dask array; when this
is not possible, they will raise an exception rather than unexpectedly loading
data into memory. Converting a Dask array into memory generally requires an
explicit conversion step. One notable exception is indexing operations: to
enable label based indexing, xarray will automatically load coordinate labels
into memory.
.. tip::

    By default, Dask uses its multi-threaded scheduler, which distributes work across
    multiple cores and allows for processing some datasets that do not fit into memory.
    For running across a cluster, `set up the distributed scheduler <https://docs.dask.org/en/latest/setup.html>`_.
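For example, a minimal sketch of connecting to a local distributed cluster;
once created, the ``Client`` becomes the default scheduler for subsequent
computations:

.. code-block:: python

    from dask.distributed import Client

    client = Client()  # starts a local cluster of workers
    print(client.dashboard_link)  # URL of the diagnostic dashboard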
The easiest way to convert an xarray data structure from lazy Dask arrays into
*eager*, in-memory NumPy arrays is to use the :py:meth:`~xarray.Dataset.load` method:
.. ipython:: python

    ds.load()
You can also access :py:attr:`~xarray.DataArray.values`, which will always be a
NumPy array:
.. ipython::
    :verbatim:

    In [5]: ds.temperature.values
    Out[5]:
    array([[[ 4.691e-01, -2.829e-01, ..., -5.577e-01,  3.814e-01],
            [ 1.337e+00, -1.531e+00, ...,  8.726e-01, -1.538e+00],
            ...
    # truncated for brevity
Explicit conversion by wrapping a DataArray with ``np.asarray`` also works:
.. ipython::
    :verbatim:

    In [5]: np.asarray(ds.temperature)
    Out[5]:
    array([[[ 4.691e-01, -2.829e-01, ..., -5.577e-01,  3.814e-01],
            [ 1.337e+00, -1.531e+00, ...,  8.726e-01, -1.538e+00],
            ...
Alternatively you can load the data into memory but keep the arrays as
Dask arrays using the :py:meth:`~xarray.Dataset.persist` method:
.. ipython:: python

    persisted = ds.persist()
:py:meth:`~xarray.Dataset.persist` is particularly useful when using a
distributed cluster because the data will be loaded into distributed memory
across your machines and be much faster to use than reading repeatedly from
disk.
.. warning::

    On a single machine :py:meth:`~xarray.Dataset.persist` will try to load all of
    your data into memory. You should make sure that your dataset is not larger than
    available memory.
.. note::

    For more on the differences between :py:meth:`~xarray.Dataset.persist` and
    :py:meth:`~xarray.Dataset.compute` see this `Stack Overflow answer on the differences between client persist and client compute <https://stackoverflow.com/questions/41806850/dask-difference-between-client-persist-and-client-compute>`_ and the `Dask documentation <https://distributed.dask.org/en/latest/manage-computation.html#dask-collections-to-futures>`_.
For performance you may wish to consider chunk sizes. The correct choice of
chunk size depends both on your data and on the operations you want to perform.
With xarray, both converting data to Dask arrays and changing the chunk
sizes of Dask arrays are done with the :py:meth:`~xarray.Dataset.chunk` method:
.. ipython:: python

    rechunked = ds.chunk({"latitude": 100, "longitude": 100})
.. warning::

    Rechunking an existing Dask array created with :py:func:`~xarray.open_mfdataset`
    is not recommended (see above).
You can view the size of existing chunks on an array with the
:py:attr:`~xarray.Dataset.chunks` attribute:
.. ipython:: python

    rechunked.chunks
If the chunk sizes are not consistent across all arrays in a dataset along a
particular dimension, an exception is raised when you try to access
``.chunks``.
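One remedy is :py:meth:`~xarray.Dataset.unify_chunks`, which rechunks all
variables to compatible chunk sizes; a minimal sketch:

.. code-block:: python

    unified = ds.unify_chunks()  # returns a new dataset with consistent chunks
    unified.chunks  # no longer raises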
.. note::

    In the future, we would like to enable automatic alignment of Dask
    chunksizes (but not the other way around). We might also require that all
    arrays in a dataset share the same chunking alignment. Neither of these
    is currently done.
NumPy ufuncs like ``np.sin`` transparently work on all xarray objects, including those
that store lazy Dask arrays:
.. ipython:: python

    import numpy as np

    np.sin(rechunked)
To access Dask arrays directly, use the
:py:attr:`DataArray.data <xarray.DataArray.data>` attribute. This attribute exposes
array data either as a Dask array or as a NumPy array, depending on whether it has been
loaded into Dask or not:
.. ipython:: python

    ds.temperature.data
.. note::

    ``.data`` is also used to expose other "computable" array backends beyond Dask and
    NumPy (e.g. sparse and pint arrays).
.. _dask.automatic-parallelization:

Automatic parallelization with ``apply_ufunc`` and ``map_blocks``
-----------------------------------------------------------------
Almost all of xarray's built-in operations work on Dask arrays. If you want to
use a function that isn't wrapped by xarray, and have it applied in parallel on
each block of your xarray object, you have three options:
1. Extract Dask arrays from xarray objects (``.data``) and use Dask directly (a short sketch follows this list).
2. Use :py:func:`~xarray.apply_ufunc` to apply functions that consume and return NumPy arrays.
3. Use :py:func:`~xarray.map_blocks`, :py:meth:`Dataset.map_blocks` or :py:meth:`DataArray.map_blocks`
to apply functions that consume and return xarray objects.
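As a sketch of the first option, you can pull the Dask array out of a
DataArray, operate on it with ``dask.array``, and wrap the result back up.
This reuses the ``ds`` dataset from the examples above; the reduction chosen
here is arbitrary:

.. code-block:: python

    import dask.array as da
    import xarray as xr

    darr = ds.temperature.data  # the underlying dask.array.Array
    std = da.nanstd(darr, axis=0)  # reduce over time with plain dask.array
    result = xr.DataArray(
        std,
        dims=("latitude", "longitude"),
        coords={"latitude": ds.latitude, "longitude": ds.longitude},
    )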
``apply_ufunc``
~~~~~~~~~~~~~~~
:py:func:`~xarray.apply_ufunc` automates `embarrassingly parallel
<https://en.wikipedia.org/wiki/Embarrassingly_parallel>`__ "map" type operations
where a function written for processing NumPy arrays should be repeatedly
applied to xarray objects containing Dask arrays. It works similarly to
:py:func:`dask.array.map_blocks` and :py:func:`dask.array.blockwise`, but without
requiring an intermediate layer of abstraction.
For the best performance when using Dask's multi-threaded scheduler, wrap a
function that already releases the global interpreter lock; fortunately, this
already includes most NumPy and SciPy functions. Here we show an example
using NumPy operations and a fast function from
`bottleneck <https://github.com/pydata/bottleneck>`__, which
we use to calculate `Spearman's rank-correlation coefficient <https://en.wikipedia.org/wiki/Spearman%27s_rank_correlation_coefficient>`__:
.. code-block:: python

    import numpy as np
    import xarray as xr
    import bottleneck


    def covariance_gufunc(x, y):
        return (
            (x - x.mean(axis=-1, keepdims=True)) * (y - y.mean(axis=-1, keepdims=True))
        ).mean(axis=-1)


    def pearson_correlation_gufunc(x, y):
        return covariance_gufunc(x, y) / (x.std(axis=-1) * y.std(axis=-1))


    def spearman_correlation_gufunc(x, y):
        x_ranks = bottleneck.rankdata(x, axis=-1)
        y_ranks = bottleneck.rankdata(y, axis=-1)
        return pearson_correlation_gufunc(x_ranks, y_ranks)


    def spearman_correlation(x, y, dim):
        return xr.apply_ufunc(
            spearman_correlation_gufunc,
            x,
            y,
            input_core_dims=[[dim], [dim]],
            dask="parallelized",
            output_dtypes=[float],
        )
The only aspect of this example that is different from standard usage of
``apply_ufunc()`` is that we needed to supply the ``output_dtypes`` argument.
(Read up on :ref:`comput.wrapping-custom` for an explanation of the
"core dimensions" listed in ``input_core_dims``.)
Our new ``spearman_correlation()`` function achieves near linear speedup
when run on large arrays across the four cores on my laptop. It would also
work as a streaming operation, when run on arrays loaded from disk:
.. ipython::
    :verbatim:

    In [56]: rs = np.random.RandomState(0)

    In [57]: array1 = xr.DataArray(rs.randn(1000, 100000), dims=["place", "time"])  # 800MB

    In [58]: array2 = array1 + 0.5 * rs.randn(1000, 100000)

    # using one core, on NumPy arrays
    In [61]: %time _ = spearman_correlation(array1, array2, 'time')
    CPU times: user 21.6 s, sys: 2.84 s, total: 24.5 s
    Wall time: 24.9 s

    In [8]: chunked1 = array1.chunk({"place": 10})

    In [9]: chunked2 = array2.chunk({"place": 10})

    # using all my laptop's cores, with Dask
    In [63]: r = spearman_correlation(chunked1, chunked2, "time")

    In [64]: %time _ = r.compute()
    CPU times: user 30.9 s, sys: 1.74 s, total: 32.6 s
    Wall time: 4.59 s
One limitation of ``apply_ufunc()`` is that it cannot be applied to arrays with
multiple chunks along a core dimension:
.. ipython::
    :verbatim:

    In [63]: spearman_correlation(chunked1, chunked2, "place")
    ValueError: dimension 'place' on 0th function argument to apply_ufunc with
    dask='parallelized' consists of multiple chunks, but is also a core
    dimension. To fix, rechunk into a single Dask array chunk along this
    dimension, i.e., ``.rechunk({'place': -1})``, but beware that this may
    significantly increase memory usage.
This reflects the nature of core dimensions, in contrast to broadcast (non-core)
dimensions that allow operations to be split into arbitrary chunks for
application.
.. tip::

    For the majority of NumPy functions that are already wrapped by Dask, it's
    usually a better idea to use the pre-existing ``dask.array`` function, either
    via pre-existing xarray methods or via
    :py:func:`~xarray.apply_ufunc` with ``dask='allowed'``. Dask can often
    have a more efficient implementation that makes use of the specialized
    structure of a problem, unlike the generic speedups offered by
    ``dask='parallelized'``.
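As a sketch of ``dask='allowed'``, here the wrapped function receives the
underlying Dask arrays directly and so must itself be Dask-aware (this reuses
``chunked1`` from the timing example above):

.. code-block:: python

    import dask.array as da
    import xarray as xr


    def dask_mean(arr):
        # ``arr`` arrives as a dask array; dask.array implements mean natively
        return da.mean(arr, axis=-1)


    result = xr.apply_ufunc(
        dask_mean,
        chunked1,
        input_core_dims=[["time"]],
        dask="allowed",
    )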
``map_blocks``
~~~~~~~~~~~~~~
Functions that consume and return xarray objects can be easily applied in parallel using :py:func:`map_blocks`.
Your function will receive an xarray Dataset or DataArray subset to one chunk
along each chunked dimension.
.. ipython:: python

    ds.temperature
This DataArray has 3 chunks, each with length 10 along the time dimension.
At compute time, a function applied with :py:func:`map_blocks` will receive a DataArray corresponding to a single block of shape 10x180x180
(time x latitude x longitude) with values loaded. The following snippet illustrates how to check the shape of the object
received by the applied function.
.. ipython:: python

    def func(da):
        print(da.sizes)
        return da.time


    mapped = xr.map_blocks(func, ds.temperature)
    mapped
Notice that the :py:meth:`map_blocks` call printed
``Frozen({'time': 0, 'latitude': 0, 'longitude': 0})`` to screen.
``func`` received 0-sized blocks! :py:meth:`map_blocks` needs to know what the final result
looks like in terms of dimensions, shapes etc. It does so by running the provided function on 0-shaped
inputs (*automated inference*). This works in many cases, but not all. If automatic inference does not
work for your function, provide the ``template`` kwarg (see below).
In this case, automatic inference has worked, so let's check that the result is as expected.
.. ipython:: python

    mapped.load(scheduler="single-threaded")
    mapped.identical(ds.time)
Note that we use ``.load(scheduler="single-threaded")`` to execute the computation.
This executes the Dask graph in serial using a for loop, but allows for printing to screen and other
debugging techniques. We can easily see that our function is receiving blocks of shape 10x180x180 and
that the returned result is identical to ``ds.time`` as expected.
Here is a common example where automated inference will not work.
.. ipython:: python
    :okexcept:

    def func(da):
        print(da.sizes)
        return da.isel(time=[1])


    mapped = xr.map_blocks(func, ds.temperature)
``func`` cannot be run on 0-shaped inputs because it is not possible to extract element 1 along a
dimension of size 0. In this case we need to tell :py:func:`map_blocks` what the returned result looks
like using the ``template`` kwarg. ``template`` must be an xarray Dataset or DataArray (depending on
what the function returns) with dimensions, shapes, chunk sizes, attributes, coordinate variables *and* data
variables that look exactly like the expected result. The variables should be dask-backed and hence not
incur much memory cost.
.. note::

    When ``template`` is provided, ``attrs`` from ``template`` are copied over to the result. Any
    ``attrs`` set in ``func`` will be ignored.
.. ipython:: python

    template = ds.temperature.isel(time=[1, 11, 21])
    mapped = xr.map_blocks(func, ds.temperature, template=template)
Notice that the 0-shaped sizes were not printed to screen. Since ``template`` has been provided,
:py:func:`map_blocks` does not need to infer it by running ``func`` on 0-shaped inputs.
.. ipython:: python

    mapped.identical(template)
:py:func:`map_blocks` also allows passing ``args`` and ``kwargs`` down to the user function ``func``.
``func`` will be executed as ``func(block_xarray, *args, **kwargs)`` so ``args`` must be a list and ``kwargs`` must be a dictionary.
.. ipython:: python

    def func(obj, a, b=0):
        return obj + a + b


    mapped = ds.map_blocks(func, args=[10], kwargs={"b": 10})
    expected = ds + 10 + 10
    mapped.identical(expected)
.. ipython:: python
    :suppress:

    ds.close()  # Closes "example-data.nc".
    os.remove("example-data.nc")
.. tip::

    As :py:func:`map_blocks` loads each block into memory, keep the objects
    consumed by your function as small as possible. For example, drop unneeded
    variables before calling ``func`` with :py:func:`map_blocks`; a sketch follows.
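A minimal sketch of this pattern, keeping only the variable the function
actually uses before mapping (this toy dataset has a single data variable,
but the pattern carries over to wider datasets):

.. code-block:: python

    def anomaly(block):
        return block["temperature"] - block["temperature"].mean()


    trimmed = ds[["temperature"]]  # keep only what ``anomaly`` needs
    mapped = trimmed.map_blocks(anomaly)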
Chunking and performance
------------------------
The ``chunks`` parameter has critical performance implications when using Dask
arrays. If your chunks are too small, queueing up operations will be extremely
slow, because Dask will translate each operation into a huge number of
operations mapped across chunks. Computation on Dask arrays with small chunks
can also be slow, because each operation on a chunk has some fixed overhead from
the Python interpreter and the Dask task executor.
Conversely, if your chunks are too big, some of your computation may be wasted,
because Dask only computes results one chunk at a time.
A good rule of thumb is to create arrays with a chunk size of at least
one million elements (e.g., a 1000x1000 matrix). With large arrays (10+ GB), the
cost of queueing up Dask operations can be noticeable, and you may need even
larger chunksizes.
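A quick sketch for checking how large your chunks actually are, using the
``ds`` dataset from the examples above:

.. code-block:: python

    import numpy as np

    darr = ds.temperature.data
    elements = np.prod(darr.chunksize)  # elements in a typical chunk
    nbytes = elements * darr.dtype.itemsize  # approximate bytes per chunk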
.. tip::

    Check out the `dask documentation on chunks <https://docs.dask.org/en/latest/array-chunks.html>`_.
Optimization Tips
-----------------
With analysis pipelines involving both spatial subsetting and temporal resampling, Dask performance
can become very slow or memory hungry in certain cases. Here are some optimization tips we have found
through experience:
1. Do your spatial and temporal indexing (e.g. ``.sel()`` or ``.isel()``) early in the pipeline, especially before calling ``resample()`` or ``groupby()``. Grouping and resampling triggers some computation on all the blocks, which in theory should commute with indexing, but this optimization hasn't been implemented in Dask yet. (See `Dask issue #746 <https://github.com/dask/dask/issues/746>`_).
2. More generally, ``groupby()`` is a costly operation and will perform a lot better if the ``flox`` package is installed.
See the `flox documentation <https://flox.readthedocs.io>`_ for more. By default Xarray will use ``flox`` if installed.
3. Save intermediate results to disk as netCDF files (using ``to_netcdf()``) and then load them again with ``open_dataset()`` for further computations. For example, if subtracting the temporal mean from a dataset, save the temporal mean to disk before subtracting (a sketch follows this list). Again, in theory, Dask should be able to do the computation in a streaming fashion, but in practice this is a fail case for the Dask scheduler, because it tries to keep every chunk of an array that it computes in memory. (See `Dask issue #874 <https://github.com/dask/dask/issues/874>`_)
4. Specify smaller chunks across space when using :py:func:`~xarray.open_mfdataset` (e.g., ``chunks={'latitude': 10, 'longitude': 10}``). This makes spatial subsetting easier, because there's no risk you will load subsets of data which span multiple chunks. On individual files, prefer to subset before chunking (suggestion 1).
5. Chunk as early as possible, and avoid rechunking as much as possible. Always pass the ``chunks={}`` argument to :py:func:`~xarray.open_mfdataset` to avoid redundant file reads.
6. Using the h5netcdf package by passing ``engine='h5netcdf'`` to :py:func:`~xarray.open_mfdataset` can be quicker than the default ``engine='netcdf4'`` that uses the netCDF4 package.
7. Find `best practices specific to Dask arrays in the documentation <https://docs.dask.org/en/latest/array-best-practices.html>`_.
8. The `dask diagnostics <https://docs.dask.org/en/latest/understanding-performance.html>`_ can be useful in identifying performance bottlenecks.
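As a sketch of tip 3, materialize an intermediate result to disk and then
reopen it lazily for the next step (the file name here is a hypothetical
placeholder):

.. code-block:: python

    # compute the temporal mean once and write it out
    mean = ds.temperature.mean("time")
    mean.to_netcdf("temperature-mean.nc")

    # reopen lazily and subtract it in a follow-up computation
    mean = xr.open_dataarray("temperature-mean.nc", chunks={})
    anomaly = ds.temperature - mean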