Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance issues using map_blocks with cftime indexes. #5701

Closed
aulemahal opened this issue Aug 12, 2021 · 5 comments
Closed

Performance issues using map_blocks with cftime indexes. #5701

aulemahal opened this issue Aug 12, 2021 · 5 comments

Comments

@aulemahal
Copy link
Contributor

What happened:
When using map_blocks on an object that is dask-backed and has a CFTimeIndex coordinate, the construction step (not computation done) is very slow. I've seen up to 100x slower than an equivalent object with a numpy datetime index.

What you expected to happen:
I would understand a performance difference since numpy/pandas objects are usually more optimized than cftime/xarray objects, but the difference is quite large here.

Minimal Complete Verifiable Example:

Here is a MCVE that I ran in a jupyter notebook. Performance is basically measured by execution time (wall time). I included the current workaround I have for my usecase.

import numpy as np
import pandas as pd
import xarray as xr
import dask.array as da
from dask.distributed import Client

c = Client(n_workers=1, threads_per_worker=8)

# Test Data
Nt = 10_000
Nx = Ny = 100
chks = (Nt, 10, 10)

A = xr.DataArray(
    da.zeros((Nt, Ny, Nx), chunks=chks),
    dims=('time', 'y', 'x'),
    coords={'time': pd.date_range('1900-01-01', freq='D', periods=Nt),
            'x': np.arange(Nx),
            'y': np.arange(Ny)
           },
    name='data'
)

# Copy of a, but with a cftime coordinate
B = A.copy()
B['time'] = xr.cftime_range('1900-01-01', freq='D', periods=Nt, calendar='noleap')

#  A dumb function to apply
def func(data):
    return data + data

# Test 1 : numpy-backed time coordinate
%time outA = A.map_blocks(func, template=A)  # 
%time outA.load();
# Res on my machine:
# CPU times: user 130 ms, sys: 6.87 ms, total: 136 ms
# Wall time: 127 ms
# CPU times: user 3.01 s, sys: 8.09 s, total: 11.1 s
# Wall time: 13.4 s


# Test 2 : cftime-backed time coordinate
%time outB = B.map_blocks(func, template=B)
%time outB.load();
# Res on my machine
# CPU times: user 4.42 s, sys: 219 ms, total: 4.64 s
# Wall time: 4.48 s
# CPU times: user 13.2 s, sys: 3.1 s, total: 16.3 s
# Wall time: 26 s


# Workaround in my code
def func_cf(data):
    data['time'] = xr.decode_cf(data.coords.to_dataset()).time
    return data + data

def map_blocks_cf(func, data):
    data2 = data.copy()
    data2['time'] = xr.conventions.encode_cf_variable(data.time)
    return data2.map_blocks(func, template=data)

# Test 3 : cftime time coordinate with encoding-decoding
%time outB2 = map_blocks_cf(func_cf, B)
%time outB2.load();
# Res
# CPU times: user 536 ms, sys: 10.5 ms, total: 546 ms
# Wall time: 528 ms
# CPU times: user 9.57 s, sys: 2.23 s, total: 11.8 s
# Wall time: 21.7 s

Anything else we need to know?:
After exploration I found 2 culprits for this slowness. I used %%prun to profile the construction phase of map_blocks and found that in the second case (cftime time coordinate):

  1. In map_blocks calls to dask.base.tokenize take the most time. Precisely, tokenizing a numpy ndarray of O dtype goes through the pickling process of the array. This is already quite slow and cftime objects take even more time to pickle. See Performance : speeding up pickling of cftime arrays Unidata/cftime#253 for the corresponding issue. Most of the construction phase execution time is spent pickling the same datetime array at least once per chunk.
  2. Second, but only significant when the time coordinate is very large (55000 in my use case). CFTimeIndex.__new__ is called more than twice as many times as there are chunks. And within the object creation there is this line :
    if not all(isinstance(value, date_type) for value in data):

    The larger the array, the more time is spent in this iteration. Changing the example above to use Nt = 50_000, the code spent a total of 25 s in dask.base.tokenize calls and 5 s in CFTimeIndex.__new__ calls.

My workaround is not the best, but it was easy to code without touching xarray. The encoding of the time coordinate changes it to an integer array, which is super fast to tokenize. And the speed up of the construction phase is because there is only one call to encode_cf_variable compared to N_chunks calls to the pickling, As shown above, I have not seen a slowdown in the computation phase. I think this is mostly because the added decode_cf calls are done in parallel, but there might be other reason I do not understand.

I do not know for sure how/why this tokenization works, but I guess the best improvment in xarray could be to:

  • Look into the inputs of map_blocks and spot cftime-backed coordinates
  • Convert those coordinates to a ndarray of a basic dtype.
  • At the moment of tokenization of the time coordinates, do a switheroo and pass the converted arrays instead.

I have no idea if that would work, but if it does that would be the best speed-up I think.

Environment:

Output of xr.show_versions()

INSTALLED VERSIONS

commit: None
python: 3.9.6 | packaged by conda-forge | (default, Jul 11 2021, 03:39:48)
[GCC 9.3.0]
python-bits: 64
OS: Linux
OS-release: 3.10.0-514.2.2.el7.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: None
LANG: en_CA.UTF-8
LOCALE: ('en_CA', 'UTF-8')
libhdf5: 1.10.6
libnetcdf: 4.7.4

xarray: 0.19.1.dev18+g4bb9d9c.d20210810
pandas: 1.3.1
numpy: 1.21.1
scipy: 1.7.0
netCDF4: 1.5.6
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: 2.8.3
cftime: 1.5.0
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: 1.3.2
dask: 2021.07.1
distributed: 2021.07.1
matplotlib: 3.4.2
cartopy: 0.19.0
seaborn: None
numbagg: None
pint: 0.17
setuptools: 49.6.0.post20210108
pip: 21.2.1
conda: None
pytest: None
IPython: 7.25.0
sphinx: None

@dcherian
Copy link
Contributor

dcherian commented Apr 19, 2022

I think this could be fixed by adding __dask_tokenize__ to CFTimeIndex.

For IndexVariables we pass self._data.array to normalize_token

xarray/xarray/core/variable.py

Lines 2688 to 2692 in ea99445

def __dask_tokenize__(self):
from dask.base import normalize_token
# Don't waste time converting pd.Index to np.ndarray
return normalize_token((type(self), self._dims, self._data.array, self._attrs))

_data.array turns out to be CFTimeIndex on main (so that comment above is wrong). Dask knows how to handle pandas indexes so this isn't usually a problem.

@phofl
Copy link
Contributor

phofl commented Aug 20, 2024

The overhead seems to be gone now, the map_blocks call builds in a few hundred milliseconds for me instead of 5 seconds

@aulemahal
Copy link
Contributor Author

I confirm the same! I don't know what was changed, but with xarray 2024.07.0 and 2024.08.0, the CFTime and Pandas cases yield the same performance.

@dcherian
Copy link
Contributor

presumably dask is now better at tokenizing object arrays?

@phofl
Copy link
Contributor

phofl commented Aug 20, 2024

Yeah we changed a bunch of things there, I haven't dug into the release that actually made it faster though

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants