From 766da3480f50d7672fe1a7c1cdf3aa32d8181fcf Mon Sep 17 00:00:00 2001 From: Tom Nicholas Date: Mon, 18 Dec 2023 14:30:18 -0500 Subject: [PATCH] Generalize cumulative reduction (scan) to non-dask types (#8019) * add scan to ChunkManager ABC * implement scan for dask using cumreduction * generalize push to work for non-dask chunked arrays * whatsnew * fix importerror * Allow arbitrary kwargs Co-authored-by: Deepak Cherian * Type hint return value of T_ChunkedArray Co-authored-by: Illviljan <14371165+Illviljan@users.noreply.github.com> * Type hint return value of Dask array * ffill -> bfill in doc/whats-new.rst Co-authored-by: Deepak Cherian * hopefully fix docs warning --------- Co-authored-by: Deepak Cherian Co-authored-by: Illviljan <14371165+Illviljan@users.noreply.github.com> --- doc/whats-new.rst | 4 ++++ xarray/core/daskmanager.py | 22 +++++++++++++++++++++ xarray/core/parallelcompat.py | 37 +++++++++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 4188af98e3f..c0917b7443b 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -589,6 +589,10 @@ Internal Changes - :py:func:`as_variable` now consistently includes the variable name in any exceptions raised. (:pull:`7995`). By `Peter Hill `_ +- Redirect cumulative reduction functions internally through the :py:class:`ChunkManagerEntryPoint`, + potentially allowing :py:meth:`~xarray.DataArray.ffill` and :py:meth:`~xarray.DataArray.bfill` to + use non-dask chunked array types. + (:pull:`8019`) By `Tom Nicholas `_. - :py:func:`encode_dataset_coordinates` now sorts coordinates automatically assigned to `coordinates` attributes during serialization (:issue:`8026`, :pull:`8034`). `By Ian Carroll `_. diff --git a/xarray/core/daskmanager.py b/xarray/core/daskmanager.py index 56d8dc9e23a..efa04bc3df2 100644 --- a/xarray/core/daskmanager.py +++ b/xarray/core/daskmanager.py @@ -97,6 +97,28 @@ def reduction( keepdims=keepdims, ) + def scan( + self, + func: Callable, + binop: Callable, + ident: float, + arr: T_ChunkedArray, + axis: int | None = None, + dtype: np.dtype | None = None, + **kwargs, + ) -> DaskArray: + from dask.array.reductions import cumreduction + + return cumreduction( + func, + binop, + ident, + arr, + axis=axis, + dtype=dtype, + **kwargs, + ) + def apply_gufunc( self, func: Callable, diff --git a/xarray/core/parallelcompat.py b/xarray/core/parallelcompat.py index 333059e00ae..37542925dde 100644 --- a/xarray/core/parallelcompat.py +++ b/xarray/core/parallelcompat.py @@ -403,6 +403,43 @@ def reduction( """ raise NotImplementedError() + def scan( + self, + func: Callable, + binop: Callable, + ident: float, + arr: T_ChunkedArray, + axis: int | None = None, + dtype: np.dtype | None = None, + **kwargs, + ) -> T_ChunkedArray: + """ + General version of a 1D scan, also known as a cumulative array reduction. + + Used in ``ffill`` and ``bfill`` in xarray. + + Parameters + ---------- + func: callable + Cumulative function like np.cumsum or np.cumprod + binop: callable + Associated binary operator like ``np.cumsum->add`` or ``np.cumprod->mul`` + ident: Number + Associated identity like ``np.cumsum->0`` or ``np.cumprod->1`` + arr: dask Array + axis: int, optional + dtype: dtype + + Returns + ------- + Chunked array + + See also + -------- + dask.array.cumreduction + """ + raise NotImplementedError() + @abstractmethod def apply_gufunc( self,