map_blocks should dispatch to ChunkManager #8545

Open
TomNicholas opened this issue Dec 12, 2023 · 5 comments
Labels
topic-chunked-arrays Managing different chunked backends, e.g. dask

Comments

@TomNicholas
Member

Is your feature request related to a problem?

#7019 generalized most of xarray's internals to be able to use any chunked array type for which we can create a ChunkManagerEntrypoint. Most functions now go through this (e.g. apply_ufunc), but I did not redirect xarray.map_blocks to go through ChunkManagerEntrypoint.

This redirection works by dispatching to high-level dask.array primitives such as dask.array.apply_gufunc, dask.array.blockwise, and dask.array.map_blocks. However, the current implementation of xarray.map_blocks is much lower-level, building a custom HLG (HighLevelGraph), so it was not obvious how to swap it out.
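
For context, the dispatch pattern for the functions that already go through the entry point looks roughly like this (a simplified sketch, not xarray's actual code; the import path and exact signatures vary between xarray versions):

```python
# Simplified sketch of the post-#7019 dispatch pattern: look up the
# ChunkManagerEntrypoint registered for the chunked array type in hand, then
# call its high-level primitive instead of calling dask directly.
import operator

from xarray.core.parallelcompat import get_chunked_array_type  # path varies by version

def add_elementwise(x, y):
    # Whichever chunk manager (dask, cubed, ...) claims these chunked arrays.
    chunkmanager = get_chunked_array_type(x, y)
    # For the dask manager this ultimately calls dask.array.apply_gufunc;
    # other managers provide their own equivalent.
    return chunkmanager.apply_gufunc(operator.add, "(),()->()", x, y)
```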

Describe the solution you'd like

I would like to either:

  1. Replace the current internals of xarray.map_blocks with a simple call to ChunkManagerEntrypoint.map_blocks (see the sketch after this list). This would be the cleanest separation of concerns we could achieve here. Presumably there is some obvious reason why this cannot or should not be done, but I have yet to understand what that reason is (@dcherian or @tomwhite, can you enlighten me perhaps? 🙏).

  2. (More likely) Refactor so that the existing guts of xarray.map_blocks are only called from the ChunkManagerEntrypoint, and a non-dask chunked array type (e.g. cubed, but in theory other types too) can specify how it wants to perform the map_blocks.
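
To make option 1 concrete, the delegation I have in mind would look something like the following. This is a hypothetical shape only, not working code; in particular, ChunkManagerEntrypoint.map_blocks currently operates on arrays rather than Dataset objects, which may be exactly the obstacle.

```python
# Hypothetical shape of option 1: xarray.map_blocks as a thin wrapper that hands
# the whole blockwise application over to whichever chunk manager backs `obj`.
def map_blocks(func, obj, args=(), kwargs=None, template=None):
    from xarray.core.parallelcompat import get_chunked_array_type  # path varies by version

    # Find the chunk manager backing the chunked variables of `obj`.
    chunkmanager = get_chunked_array_type(
        *(var.data for var in obj.variables.values() if hasattr(var.data, "chunks"))
    )
    # Delegate entirely; each backend (dask, cubed, ...) decides how to execute it.
    return chunkmanager.map_blocks(func, obj, *args, **(kwargs or {}))
```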

Describe alternatives you've considered

Leaving it as the status quo, which breaks the nice abstraction and separation of concerns that #7019 introduced.

Additional context

Split off from #8414

@dcherian
Contributor

I think at present, map_blocks is fundamentally a different model for computation.

map_blocks splits the Dataset into blocks, applies a function, and stitches the blocks back together. It uses dask as a task scheduler, not as an array type. This is an implementation detail. There is some confusion here because the definition of blocks lines up with dask.array chunks, but it need not. It's also possible to implement a streaming serial version of this (using #1093 (comment)). Xarray-beam also implements this model, IIUC.

To go through the ChunkManager, we would have to conceptualize map_blocks as creating an object array of Dataset objects and mapping a function on every element. This could be done but would be substantially different from what's done today.
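
For concreteness, a deliberately serial, in-memory sketch of that model (not xarray's implementation, and assuming the Dataset has dimension coordinates so the blocks can be recombined) would be:

```python
# A deliberately serial sketch of the model described above: split the Dataset
# into blocks along its chunked dimensions, call the user function on each block
# as a Dataset, then stitch the results back together. In the real map_blocks,
# dask's only job is to run the per-block calls as tasks.
import itertools
import xarray as xr

def map_blocks_serial(func, ds: xr.Dataset) -> xr.Dataset:
    dims = list(ds.dims)
    # Chunk sizes per dimension; unchunked dimensions become a single block.
    chunks = {dim: ds.chunks.get(dim, (ds.sizes[dim],)) for dim in dims}
    # Start/end offsets of each block along each dimension.
    offsets = {dim: [0, *itertools.accumulate(sizes)] for dim, sizes in chunks.items()}

    results = []
    for block_index in itertools.product(*(range(len(chunks[d])) for d in dims)):
        # Slice out one block as a small, concrete Dataset ...
        indexers = {
            d: slice(offsets[d][i], offsets[d][i + 1]) for d, i in zip(dims, block_index)
        }
        # ... and hand it to the user's function, which sees a Dataset, not arrays.
        results.append(func(ds.isel(indexers)))
    # Reassemble the per-block results (relies on dimension coordinates existing).
    return xr.combine_by_coords(results)
```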

@TomNicholas
Member Author

That makes a lot more sense, thanks @dcherian. I see now how this is more similar to xarray-beam's model.

To go through the ChunkManager, we would have to conceptualize map_blocks as creating an object array of Dataset objects and mapping a function on every element.

I suspect we could imagine an alternative implementation of xarray.map_blocks that takes advantage of dask.array.map_blocks accepting multiple arrays and works more like xarray.apply_ufunc. But that would probably require changing the signature of xarray.map_blocks to accept a func that accepts arrays, not a func that accepts Dataset objects.
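
For reference, dask.array.map_blocks already accepts multiple chunked arrays and hands the function the corresponding block of each input, e.g.:

```python
# dask.array.map_blocks with multiple inputs: the function receives the
# corresponding NumPy block from each array.
import dask.array as da

x = da.random.random((8, 8), chunks=4)
y = da.random.random((8, 8), chunks=4)

z = da.map_blocks(lambda a, b: a + b, x, y)
z.compute()  # evaluates block-by-block
```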

I think at present, xarray.map_blocks is fundamentally a different model for computation.

What's the advantage to this over a dask.array.map_blocks implementation? It sounds like we're treating separate blocks of the Dataset as essentially unrelated inputs to a general task-parallelization framework. My understanding was that xarray-beam was implemented using this model only because it would have been hard to use beam to parallelize xarray any other way (it would have required an intermediate array layer like cubed before you could dispatch to beam). In the dask case it seems kind of weird, because xarray objects normally wrap dask (array) objects, but xarray.map_blocks is implemented by having dask wrap xarray objects... Given that difference in wrapping order, I almost want to suggest that this current implementation of xarray.map_blocks should live outside of xarray itself, like xarray-beam does (it also seems to mostly use public xarray API).

FWIW I don't have particularly strong opinions on anything above, I'm just trying to give some food for thought 🙂

@dcherian
Contributor

What's the advantage to this over a dask.array.map_blocks implementation?

One, a dask.array.map_blocks implementation requires that the function return arrays. It's nice to get away from the array abstraction for a lot of what map_blocks is used for. We are at present ~8 lines away from an as_delayed implementation, which would cover about 80% of map_blocks workloads IMO and be a lot easier to use.

having dask wrap xarray objects

dask isn't wrapping anything (it would be if you were to implement this as an object array of tuples of Datasets). It's executing tasks that create Xarray objects, passing them to the UDF, and then taking the return value.

The final step is when we extract dask arrays from the return value and construct the single Dataset output.

require changing the signature of xarray.map_blocks to accept a func that accepts arrays, not a func that accepts Dataset objects.

Accepting Datasets (or DataArrays) is the whole point of map_blocks! If not, how would it be different from apply_ufunc(..., dask="parallelized")? But I agree, there does seem to be an apply_over_blocks that generalizes this model and could be executed in a variety of ways.

@TomNicholas
Member Author

One, a dask.array.map_blocks implementation requires that the function return arrays.

Accepting Datasets (or DataArrays) is the whole point of map_blocks! If not, how would it be different from apply_ufunc(..., dask="parallelized")?

Fair point! That is much nicer from a user perspective.

dask isn't wrapping anything

Right, yeah, I should have said executing instead of wrapping. My point is just that dask is being used in xarray.map_blocks in a pretty different way from how we use it in the rest of xarray (i.e. as high-level task orchestration rather than as a parallelized array type, as you said), and that incongruity makes me wonder what the right design decisions are here.

But I agree, there does seem to be an apply_over_blocks that generalizes this model and could be executed in a variety of ways.

Yeah, it feels like this is missing from our API. But are there operations that could be expressed using map_over_blocks which couldn't be expressed using xarray.map_blocks, or vice versa?

@TomNicholas
Member Author

We are at present ~8 lines away from an as_delayed implementation, which would cover about 80% of map_blocks workloads IMO and be a lot easier to use.

Is there a PR for this? Rewriting xarray.map_blocks in terms of delayed could be another way to complete the separation of concerns that ChunkManager aims for.
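
To spell out what I mean by that (a sketch only, not a claim about the implementation you have in mind), where split_into_blocks below is a hypothetical helper that yields one small Dataset per block, like the serial sketch above:

```python
# Sketch only: apply a function over Dataset blocks via dask.delayed rather than
# a hand-built HighLevelGraph.
import dask
import xarray as xr

def map_blocks_delayed(func, ds: xr.Dataset) -> xr.Dataset:
    delayed_blocks = [dask.delayed(func)(block) for block in split_into_blocks(ds)]
    # Compute the per-block results in parallel and stitch them back together.
    # A fully lazy version would instead rebuild dask arrays from the outputs.
    return xr.combine_by_coords(dask.compute(*delayed_blocks))
```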
