
Help debugging workflow with numba + xarray + dask #6

Closed · rabernat opened this issue Apr 14, 2021 · 17 comments

@rabernat

Here is a notebook that I think reproduces a problem that @stb2145 is having in her workflow.
https://gist.github.com/rabernat/a794cc1f58aa1c515e9c1c2c385c24cb

We have a package called fastjmd95 that implements the ocean equation of state in numba:
https://github.com/xgcm/fastjmd95/

We would like to be able to call it on numpy, dask, xarray, and xarray+dask data and have it "just work". Our attempts to do that are in this module: https://github.com/xgcm/fastjmd95/blob/master/fastjmd95/jmd95wrapper.py (thanks @cspencerjones!)
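
For readers who don't want to open the module: the pattern is roughly a decorator that checks the argument types and routes the call to the appropriate backend. The sketch below is only an illustration of that idea, not the actual jmd95wrapper.py code (the function name, dtype handling, and branching are made up):

```python
import dask.array as dsa
import numpy as np
import xarray as xr


def dispatch_on_input_type(numba_ufunc):
    # Sketch only -- not the real fastjmd95 wrapper.
    def wrapper(*args):
        if any(isinstance(a, xr.DataArray) for a in args):
            # xarray path: let apply_ufunc handle metadata; dask="parallelized"
            # keeps chunked inputs lazy.
            return xr.apply_ufunc(
                numba_ufunc, *args,
                dask="parallelized", output_dtypes=[np.float64],
            )
        if any(isinstance(a, dsa.Array) for a in args):
            # bare dask path: map the compiled kernel over each block.
            return dsa.map_blocks(numba_ufunc, *args, dtype=np.float64)
        # numpy arrays and scalars: call the compiled ufunc directly.
        return numba_ufunc(*args)

    return wrapper
```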

In the notebook, we are calling this function on zarr-backed data in the cloud via a Dask Gateway cluster. It runs through various scenarios that do and don't work well. I won't repeat them here; hopefully the notebook is self-explanatory. (It can be run from Pangeo Cloud or anywhere else with Google Cloud requester-pays credentials.)

Unfortunately, it doesn't work as we expect. In particular, we are having problems with xarray's apply_ufunc. In some settings we get PicklingError: Can't pickle <ufunc 'rho'>: attribute lookup rho on __main__ failed. In others, the computation launches but runs very slowly compared to the equivalent dask.map_blocks version.

Aspects of this problem have been raised elsewhere on GitHub, but we have never gotten to the bottom of it.

IMO this is a very useful issue to explore. In theory, the combination of numba + dask + xarray + zarr should be a killer stack for scientific computing in the cloud. But this issue reveals the challenges we have had in integrating these tools.

Any help you and your colleagues could provide, @jrbourbeau, would be much appreciated.

@jrbourbeau

Thanks for the detailed write-up and notebook @rabernat -- I'll try it out on Pangeo Cloud. Briefly looking through the linked numba and distributed issues, it looks like Tom has a PR open against numba (xref numba/numba#6234) which would close dask/distributed#3450 and might lay the groundwork for closing numba/numba#4314 (see this comment).

cc @gjoseph92

@mrocklin

Looking briefly at some of the associated issues, it looks like Tom may have a solution. I think the question is whether we want to wait for a numpy and numba release, or do a workaround. Guido seems to have a workaround that we could implement in Dask if necessary. @rabernat, what is the time pressure here? Are you looking for something fast, or for something that might take a few months but will have universal effect? If the latter, it looks like Tom may already have done all of the work here and we just need to wait for the gears of OSS to move. If the former, we probably need to add a special case inside dask.array.Array.__array_ufunc__.

numba/numba#6234 (comment)

(also, I see that James beat me to the punch on this one)

@rabernat

Thanks for the quick replies!

My hunch is that there are really two distinct issues here:

  • The upstream issues related to serializability of numba functions, involving numpy, numba, dask, etc. I am fine with waiting for progress on those, as long as we continue to track the issue and don't lose sight of it as the process unfolds.
  • The workaround we have already deployed revealed a vast performance difference between dask.map_blocks and xarray.apply_ufunc(..., dask="parallelized") (see the sketch below). I believe this is worth investigating now, independently of the numba issue; I think it points to a performance problem in xarray. If you have some general function that you want to apply in parallel, I often recommend xarray.apply_ufunc, but this example shows the limitations of that approach.
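
To make that comparison concrete, the two code paths look roughly like this (variable names and the output dtype are my guesses, not copied from the notebook):

```python
import numpy as np
import xarray as xr
from fastjmd95 import rho  # the numba-compiled equation of state

# ds is an xarray.Dataset with dask-backed SALT and THETA variables.

# Path 1: xarray.apply_ufunc with dask="parallelized"
sigma0_apply_ufunc = xr.apply_ufunc(
    rho, ds.SALT, ds.THETA, 0,
    dask="parallelized", output_dtypes=[np.float64],
)

# Path 2: dask.array map_blocks on the underlying dask arrays
sigma0_map_blocks = ds.SALT.data.map_blocks(
    rho, ds.THETA.data, 0, dtype=np.float64
)
```

Both build a lazy result; the surprise is how differently they perform when computed on the cluster.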

@rabernat

More generally, it would be nice to know whether the code we have in https://github.com/xgcm/fastjmd95/blob/master/fastjmd95/jmd95wrapper.py is considered "good practice" or not.

@gjoseph92

If the former, we probably need to add a special case inside dask.array.Array.__array_ufunc__.

FWIW I don't think this is possible to do on the Dask end. dask/distributed#3450 (comment) is a good explanation: by the time dask sees the ufunc, type(ufunc) is a compiled numpy.ufunc, with no reference back to the numba.DUFunc where all the state that could be used for pickling lives.

Additionally, Guido's workaround in numba/numba#4314 (comment) will only work for client.submit, not for passing a Dask array (or DataArray, etc.) to the numba ufunc. Tom kind of explains this in dask/distributed#3450 (comment), but it's the same problem: even if you use Guido's wrapper, when you __call__ it on an array, it first drops down into __call__ on the compiled numpy.ufunc, which then dispatches on the inputs to reach dask.array.Array.__array_ufunc__. So the ufunc that Dask sees (and sticks in the graph) is again the compiled numpy.ufunc, not Guido's wrapper that implements __reduce__.
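
Here's a self-contained way to see the distinction, with a toy ufunc standing in for fastjmd95.rho (the .ufunc attribute below is my assumption about where the compiled numpy ufunc lives on the DUFunc):

```python
import pickle

import numba
import numpy as np


@numba.vectorize  # lazy compilation: numba returns a DUFunc wrapper
def rho(s, t):
    return 999.8 + 0.8 * s - 0.2 * t  # toy formula, not the real JMD95 one


rho(np.float64(35.0), np.float64(10.0))  # trigger compilation of one loop

pickle.dumps(rho)  # the DUFunc knows its Python source, so this works

# What __array_ufunc__ dispatch hands to dask is the compiled numpy ufunc
# underneath (rho.ufunc here; attribute name is an assumption). pickle can
# only serialize a numpy ufunc by looking up its name in __main__, which
# fails for this dynamically created ufunc -- hence
# "Can't pickle <ufunc 'rho'>: attribute lookup rho on __main__ failed".
pickle.dumps(rho.ufunc)  # raises pickle.PicklingError
```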

Interestingly, I would have expected your maybe_wrap_arrays to bypass all this. Because you call dask.array.apply_gufunc directly (or via xarray), rather than through __array_ufunc__ dispatching, the DUFunc object you pass in (which is pickleable) should be what's inserted into the graph.

I'm going to dig into this to see how the compiled ufunc is getting into the graph, which may somehow be related to

a vast performance difference between dask.map_blocks and xarray.apply_ufunc(..., dask="parallelized")

(but if it isn't, then I'll look into apply_ufunc vs map_blocks as well)

@mrocklin

The upstream issues related to serializability of numba functions, involving numpy, numba, dask, etc. I am fine with waiting for progress on those, as long as we continue to track the issue and not lose sight of the ball as the process unfolds.

It looks like @rabernat isn't too concerned about the serializability issue. We can probably just wait on that (although I encourage @gjoseph92 and @jrbourbeau to subscribe to the various upstream issues).

I'm going to dig into this to see how the compiled ufunc is getting into the graph, which may somehow be related to

a vast performance difference between dask.map_blocks and xarray.apply_ufunc(..., dask="parallelized")

(but if it isn't, then I'll look into apply_ufunc vs map_blocks as well)

Yeah, it seems like broader investigation is called for here. Thanks @gjoseph92

@rabernat

Just to clarify, I am concerned about the serializability issue to the extent that it relates to the following central question:

What is the best practice for integrating numba code seamlessly with xarray + dask.distributed processing?

Our goal in several packages we maintain is to provide functions (possibly implemented in numba) that "just work" with:

  • Numpy inputs
  • Xarray inputs
  • Dask inputs (including via distributed)
  • Xarray-Dask inputs (including via distributed)

With python lacking a robust mechanism for multiple dispatch, this may be unrealistic. I don't understand the deeper issues well enough to know for sure.
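
Concretely, the goal is that all of the following return the natural container type without any wrapping on the user's side (ds here is a Dataset with dask-backed SALT and THETA, as in the notebook; whether each case currently works is exactly what's in question):

```python
import numpy as np
from fastjmd95 import rho

rho(35.0, 10.0, 0.0)                             # scalars -> scalar
rho(np.full(10, 35.0), np.full(10, 10.0), 0.0)   # numpy -> numpy, computed eagerly
rho(ds.SALT.data, ds.THETA.data, 0)              # dask arrays -> lazy dask array
rho(ds.SALT, ds.THETA, 0)                        # xarray + dask -> lazy DataArray
```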

@mrocklin

With python lacking a robust mechanism for multiple dispatch, this may be unrealistic

The __array_ufunc__ protocol is a robust mechanism for multiple dispatch; it's just complicated.

What is the best practice for integrating numba code seamlessly with xarray + dask.distributed processing?

I support this overarching focus. My guess is that the problem comes down to serialization, and that things will get better when this is resolved, but the fact that you're seeing differences based on the API path that you use is curious and warrants investigation. I suspect that if @gjoseph92 spends a bit of time running things and generating performance reports then we should be able to find some insight here.

@rabernat

Great! Sounds like a good plan that will be useful for everyone involved. Thanks a lot.

@jrbourbeau

Totally agree that it makes sense for @gjoseph92 to dive deeper here. As a seed for future exploration, yesterday I ran through the notebook Ryan provided on Pangeo Cloud and captured a couple of performance reports (see below) so we can compare the Dask map_blocks approach vs. Xarray's apply_ufunc. (I should have moved the code snippets that generate the computations into the performance_report context but didn't, so for now they're missing from the reports.)

The primary thing that jumps out at me is that we spend a lot more time transferring dependencies in the apply_ufunc case. There's also some initial deserialization happening in the map_blocks case which I don't see in the apply_ufunc case. Perhaps there's some caching that apply_ufunc isn't able to take advantage of? We could also investigate the differences in task-graph structure between the map_blocks and apply_ufunc cases.
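
For reference, the pattern being described (so the relevant compute call lands inside the report) is roughly the following, where sigma0_map_blocks and sigma0_apply_ufunc stand for the two lazy results from the notebook and the filenames are arbitrary:

```python
from dask.distributed import performance_report

with performance_report(filename="rho-map-blocks.html"):
    sigma0_map_blocks.mean().compute()

with performance_report(filename="rho-apply-ufunc.html"):
    sigma0_apply_ufunc.mean().compute()
```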

@mrocklin

It would be useful to see a visualization of the high level graph that's involved in each case. I wonder if there is some rechunking going on with ufuncs. This would happen if we used gufunc and there were core dimensions, but I'm not sure that that's what's happening here.
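
Short of a full visualization, printing the HighLevelGraph layers of each result gives a quick structural comparison (sigma0_* names assumed from the sketch above; the exact layer types you see depend on the dask version):

```python
# Rough comparison of graph structure before optimization. Differences in
# layer count and layer type are a starting point for spotting where
# rechunking or blockwise fusion might behave differently.
for label, result in [
    ("map_blocks", sigma0_map_blocks),
    ("apply_ufunc", sigma0_apply_ufunc),
]:
    hlg = result.__dask_graph__()
    print(label, "->", len(hlg.layers), "layers")
    for name, layer in hlg.layers.items():
        print("   ", name, type(layer).__name__)
```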

@mrocklin

Oh, another possible cause for excess communication would be that high level blockwise fusion is working in one case but not the other. This again seems like a good thing for @gjoseph92 to think about.

@gjoseph92

Good news: it turns out the serialization error you're seeing is actually already fixed—by you! Pangeo is still running fastjmd95 v0.1, which didn't have the maybe_wrap_arrays decorator. Since that decorator calls map_blocks/apply_ufunc directly, it gets to put the DUFunc object into the graph, which is serializable. If you can upgrade fastjmd95 on Pangeo, I think the notebook should just work without errors (though apply_ufunc may still be really slow).

It seems like the last Pangeo auto-update might have failed (xref pangeo-data/pangeo-cloud-federation#947), so Pangeo is still running versions of packages from a few months ago.

I'll look into the performance discrepancy next (and see whether it also has anything to do with older versions).

@rabernat

Well that's just embarrassing. 🤪 Very sorry for diverting so much effort before checking on this ourselves.

@gjoseph92

Not a problem at all. It's just an unintentional side effect of your code that it happens to sidestep the problem—as you said this should be seamless; ideally you shouldn't even have to write maybe_wrap_arrays. So this was a good opportunity to start understanding those issues.

@rabernat

So we have the updated package on https://staging.us-central1-b.gcp.pangeo.io/ (thanks @jrbourbeau!). It looks like things are pretty much working! I can do

```python
sigma0 = fastjmd95.rho(ds.SALT, ds.THETA, 0).mean()
```

and have it "just work" reasonably well. There are still some small differences between the xarray vs. dask graphs, but not significant differences in execution time.

@stb2145 - to close the loop, can you try your workflow on https://staging.us-central1-b.gcp.pangeo.io/ and see whether things are working more smoothly? Don't use any apply_ufunc or map_blocks; just call fastjmd95.rho directly on the xarray DataArrays.

@shanicetbailey

I successfully ran my workflow on staging Pangeo by just calling fastjmd95.rho. Thank you everyone for the help!
