
Understanding parallelism with Dask within Iris #3919

Closed
thomascrocker opened this issue Nov 12, 2020 · 8 comments

@thomascrocker

Hi @zklaus ,

Following up the discussion in #3218 (comment)

As I mentioned, in the past I've always found that even if an iris operation supports lazy evaluation, executing the code with multiple cores doesn't invoke the calculations in parallel. As a result I've got into the habit of setting up my parallel calculations manually. It could well be, though, that newer versions of iris are capable of doing this; if so, I'd appreciate some guidance on how to enable it.

As an example, the analysis I'm working on at the moment involves calculating climate indices on a cube with 656 x 310 grid boxes and 30 years of daily data. The netCDF file itself is almost 9 GB.

For calculating the yearly indices I use the following dask.bag-based approach:

import dask.bag as db
import iris
import iris.analysis
import iris.cube

yrs_bg = db.from_sequence(years)
yrs_con_bg = yrs_bg.map(lambda y: iris.Constraint(year=y))
# idx_calc is a function that calculates the 'idx' on the cube, usually involving iris.collapsed
idx_bg = yrs_con_bg.map(idx_calc, cube, idx)
idx_cubes = iris.cube.CubeList(idx_bg.compute())

idx_cube = idx_cubes.merge_cube().collapsed('time', iris.analysis.MEAN)

I'm running on the SPICE system at the Met Office, so I submit that as a slurm job with 30 CPUs and get the result back.

What I'm struggling with at the moment is slightly more complex. I need to calculate the baseline percentiles for some indices. This involves calculating a percentile value for each day of year, using a 5 day window centred on that day; see for example number 10 at http://etccdi.pacificclimate.org/list_27_indices.shtml
To do that I have added a day of year co-ordinate, then I loop over the days, creating a constraint of +/- 2 from that day of year. This gives 150 time points (5 days per year across the 30 years) from which the percentiles are calculated at each grid point (using iris.collapsed and the PERCENTILE aggregator).
If I submit that as is as a slurm job with multiple CPUs I don't see any speed up vs submitting with only 1, i.e. the extra cores are not being used to do the calculations in parallel.
In order to get speedups, I need to again follow the dask bag approach and create a dask bag of the days of year, to which I map functions and then call compute to calculate in parallel.
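A minimal sketch of that pattern (illustrative rather than my exact code; it uses the same cube as above, assumes a day_of_year coordinate added with iris.coord_categorisation.add_day_of_year, and ignores wrap-around at the year boundary):

import dask.bag as db
import iris
import iris.analysis
import iris.cube

def doy_percentile(doy, cube, percent):
    # 5 day window centred on this day of year
    window = iris.Constraint(day_of_year=lambda cell: abs(cell.point - doy) <= 2)
    window_cube = cube.extract(window)
    # percentile over the ~150 time points at each grid point
    return window_cube.collapsed("time", iris.analysis.PERCENTILE, percent=percent)

doy_bag = db.from_sequence(range(1, 366))
pctl_cubes = iris.cube.CubeList(doy_bag.map(doy_percentile, cube, 90).compute())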


zklaus commented Nov 13, 2020

Hi @thomascrocker,

I am not as familiar with dask.bag (henceforth db) as I am with dask.array (da). Indeed, I think da is most likely more appropriate here, since the data itself is very much in that form.

Having said that, I must ask for a bit of clarification:

  1. How do you run the slurm job? You must somehow create a dask.distributed cluster; otherwise, the computation would just be run on every node independently, with one node winning the prize of writing the final version of the file. Perhaps you are using dask-jobqueue or dask-mpi?

  2. With dask, as a rule of thumb, whenever you call compute(), the result gets pulled into the memory of just one process. Maybe that's what you want in the above snippet, but since you perform another aggregation after that, I rather suspect you would fare better with an approach that leaves the data on the cluster. Perhaps client.persist() would work better here for you (see the sketch after this list).

  3. Comment 2 crucially depends on what idx_calc does. Would you mind sharing the simplest version that you have, perhaps only for one index?
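To illustrate point 2, here is just a sketch with made-up array shapes and a stand-in calculation, not your actual workflow:

import dask.array as da
from dask.distributed import Client

client = Client()  # local cluster on this machine
data = da.random.random((3650, 656, 310), chunks=(90, 656, 310))
yearly_max = data.max(axis=0)            # stand-in for an index calculation
yearly_max = client.persist(yearly_max)  # the result stays on the workers
final = yearly_max.mean().compute()      # only the small final reduction is pulled locally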

Lastly, maybe you could consider saving yourself the trouble of developing this software by using one of the packages from the following list

@thomascrocker

Thanks for getting back to me,

1.) I'm not using the distributed scheduler at all. The dask bag by default uses the multiprocessing scheduler, which requires no setup. Our slurm system has multiple nodes; each node has 42 CPUs and ~242 GB of memory. It's not possible to submit multi-node jobs in our setup (although I understand one can get around this by creating a dask.distributed scheduler to do so). The multiprocessing scheduler allows me to launch parallel jobs on a single node and can create new processes on each CPU requested. So if I launch my job requesting 24 CPUs, I'll get 24 different processes running on a single node. The multiprocessing scheduler handles launching these jobs and collecting the results automatically.
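For reference, spelled out explicitly it amounts to something like this (a sketch only; the worker count is just whatever I requested from slurm, and the multiprocessing scheduler is the default for bags anyway):

import dask.bag as db

years_bag = db.from_sequence(range(1981, 2011))
squares = years_bag.map(lambda y: y * y)
# scheduler="processes" is the default for bags; shown here for clarity
results = squares.compute(scheduler="processes", num_workers=24)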

2 and 3.)
Because I've taken the parallelisation approach of breaking up my data by time period, the idx_calc function is a wrapper to extract the time period and then compute the index:

def idx_calc(con, cube, name):
    cube_p = cube.extract(con)
    cube_p = change_units(cube_p)
    idx_p = IDX_FUN_DICT[name](cube_p)
    idx_p.rename(name)

    return idx_p

change_units is a function that checks cube units and converts them if necessary.
IDX_FUN_DICT is a dictionary mapping index names to lambda functions that calculate the index in question.

For example: 'Rx1day': lambda c: calc_Rx1day(c)

and

def calc_Rx1day(cube):
    # get the max precip value for each grid point.
    max_cube = cube.collapsed('time', iris.analysis.MAX)
    return max_cube

I have been using ESMValTool and did look into the extreme events recipe; however, the data I'm using is from an RCM and I think the extra rotated latitude and longitude dimensions caused the diagnostic to fail. My R isn't good enough to fix it, unfortunately. (Incidentally, the data is from the same source as ESMValGroup/ESMValCore#772.)
I've not tried the other 2 packages, but I wouldn't be surprised if they also assume the data is on a standard grid and have issues with data on rotated grids.
For now, my approach calculates what I need in a reasonable time (even for the percentile calculations I've been able to get 4 hours of CPU time down to around 15 minutes of real time, which is fine for what I need to do).

For the future, though, I'd be very interested in making proper use of dask.array. If you could share an example of how you invoke a parallel calculation on iris cubes using the dask.array approach, I'd be interested to see it. Thanks.


zklaus commented Nov 19, 2020

On 1), I see, so it is a single node job, it's just that the node has many cores. I am afraid the terminology in this area (nodes, sockets, cpus, cores, hyperthreads, ...) is always a bit confusing. But it seems like what you are doing is working. I would still suggest giving distributed a try; it works very well (and is preferred by some) on a single machine (see here), and gives you some nice things like the dashboard and certain profiling support. Plus it offers a straightforward path to multi-node jobs later on.
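For completeness, the single-machine setup can be as small as this (the worker count is just an example to match a slurm allocation):

from dask.distributed import Client

# all workers live on this one node; pick the numbers to match the slurm request
client = Client(n_workers=24, threads_per_worker=1)
print(client.dashboard_link)  # link to the profiling dashboard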

On 2) and 3), a couple of points, though I am afraid more detail and a discussion of percentile-based indices are a bit too much for me right now. As a side note, if possible, try to change the units of your thresholds to the units of your cube; that is much less work for the computer.

As for using dask.array and iris, I think a simpler approach would be a (custom) aggregator based on iris.analysis.Aggregator combined with cube.aggregated_by.
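For example, something along these lines (only a sketch; the index, threshold and coordinate names are made up for illustration):

import numpy as np
import iris.analysis

def count_over(data, axis, threshold):
    # number of values at or above the threshold along the collapsed axis
    return np.sum(data >= threshold, axis=axis)

COUNT_OVER = iris.analysis.Aggregator("count_over", count_over)

# e.g. a yearly count of days with precipitation >= 1 mm:
# wet_days = cube.aggregated_by("year", COUNT_OVER, threshold=1.0)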

In the simple case of Rx1day, you don't even need a custom aggregator and can take it almost straight from the user guide with something like

rx1day = cube.aggregated_by('year', iris.analysis.MAX)

@thomascrocker

Thanks for the tips! I've begun experimenting with distributed to see if I can avoid manually breaking up my data for dask bag.

And it... kind of works

So, if I do this:

import iris
import iris.coord_categorisation as iccat

from dask.distributed import Client

print("Creating cluster")
client = Client(processes=False)

print("loading data")
c = iris.load_cube(
    "/scratch/tcrocker/esmvaltool_output/recipe_indices_20201118_151601/preproc/extreme_events/tas/tas_mi-ba868_historical_day_1981-2010.nc"
)
iccat.add_year(c, "time")

print("Calculating")
rx1d = c.aggregated_by("year", iris.analysis.MAX)

print("saving")
iris.save(rx1d, "rx1d.nc")

I get this error when it tries to save...

Traceback (most recent call last):
  File "/home/h02/tcrocker/miniconda3/envs/esmvaltool/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 49, in dumps
    result = pickle.dumps(x, **dump_kwargs)
  File "netCDF4/_netCDF4.pyx", line 5477, in netCDF4._netCDF4.Variable.__reduce__
NotImplementedError: Variable is not picklable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "dask_distributed_test.py", line 20, in <module>
    iris.save(rx1d, "rx1d.nc")
  File "/home/h02/tcrocker/miniconda3/envs/esmvaltool/lib/python3.8/site-packages/iris/io/__init__.py", line 408, in save
    saver(source, target, **kwargs)
  File "/home/h02/tcrocker/miniconda3/envs/esmvaltool/lib/python3.8/site-packages/iris/fileformats/netcdf.py", line 2388, in save
    sman.write(cube, local_keys, unlimited_dimensions, zlib, complevel,
  File "/home/h02/tcrocker/miniconda3/envs/esmvaltool/lib/python3.8/site-packages/iris/fileformats/netcdf.py", line 983, in write
    cf_var_cube = self._create_cf_data_variable(
  File "/home/h02/tcrocker/miniconda3/envs/esmvaltool/lib/python3.8/site-packages/iris/fileformats/netcdf.py", line 2060, in _create_cf_data_variable
    is_masked, contains_fill_value = store(data, cf_var,
  File "/home/h02/tcrocker/miniconda3/envs/esmvaltool/lib/python3.8/site-packages/iris/fileformats/netcdf.py", line 2038, in store
    da.store([data], [target])
  File "/home/h02/tcrocker/miniconda3/envs/esmvaltool/lib/python3.8/site-packages/dask/array/core.py", line 945, in store
    result.compute(**kwargs)
  File "/home/h02/tcrocker/miniconda3/envs/esmvaltool/lib/python3.8/site-packages/dask/base.py", line 167, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/h02/tcrocker/miniconda3/envs/esmvaltool/lib/python3.8/site-packages/dask/base.py", line 447, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/h02/tcrocker/miniconda3/envs/esmvaltool/lib/python3.8/site-packages/distributed/client.py", line 2673, in get
    futures = self._graph_to_futures(
  File "/home/h02/tcrocker/miniconda3/envs/esmvaltool/lib/python3.8/site-packages/distributed/client.py", line 2609, in _graph_to_futures
    "tasks": valmap(dumps_task, dsk3),
  File "cytoolz/dicttoolz.pyx", line 179, in cytoolz.dicttoolz.valmap
  File "cytoolz/dicttoolz.pyx", line 204, in cytoolz.dicttoolz.valmap
  File "/home/h02/tcrocker/miniconda3/envs/esmvaltool/lib/python3.8/site-packages/distributed/worker.py", line 3361, in dumps_task
    return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
  File "/home/h02/tcrocker/miniconda3/envs/esmvaltool/lib/python3.8/site-packages/distributed/worker.py", line 3370, in warn_dumps
    b = dumps(obj, protocol=4)
  File "/home/h02/tcrocker/miniconda3/envs/esmvaltool/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 60, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/home/h02/tcrocker/miniconda3/envs/esmvaltool/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 72, in dumps
    cp.dump(obj)
  File "/home/h02/tcrocker/miniconda3/envs/esmvaltool/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 540, in dump
    return Pickler.dump(self, obj)
  File "netCDF4/_netCDF4.pyx", line 5477, in netCDF4._netCDF4.Variable.__reduce__
NotImplementedError: Variable is not picklable

My understanding is that the actual compute isn't called until it's needed, i.e. on the .save call, but it seems the object being returned is not something that the saver knows how to handle.

I managed to work around it by doing...

rx_copy = rx1d.copy()
rx_copy.data = rx1d.data
iris.save(rx_copy, "rx1d.nc")

Accessing the .data attribute obviously caused the compute to be kicked off, and then rx_copy was a proper iris cube, so the save worked.


zklaus commented Nov 20, 2020

Glad it worked! The save thing is a longstanding known issue, unfortunately.
It is, however, enough to just "touch" the data; there is no need to copy. Try replacing those three lines with just:

rx1d.data
iris.save(rx1d, "rx1d.nc")

Oh, and I would be very curious if this makes any difference in terms of memory and runtime compared to your bags approach.

@thomascrocker

Thanks, the "touch" approach worked.

Weirdly, I tried to run a test dask bag version shortly afterward and ended up with a bunch of runtime errors from the multiprocessing library. I wonder if the cluster started by my previous script was not shut down properly.

I suspect you are right, though, and the dask bag approach would likely take longer to run and use more memory.

Unfortunately I am constrained to using it for the time being to handle the calculation of, for example, cumulative wet days, since I have defined those using custom aggregators built with numpy functionality. Looking at the dask documentation, though, I think I may be able to rewrite these to use dask arrays rather than numpy, in which case I hope the distributed approach would work (see the sketch below).
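Building on the aggregator sketched above, something along these lines is what I have in mind (only a sketch; the names are illustrative, and it assumes the Iris version in use honours lazy_func for the aggregation in question):

import numpy as np
import dask.array as da
import iris.analysis

def count_over(data, axis, threshold):
    return np.sum(data >= threshold, axis=axis)

def lazy_count_over(data, axis, threshold):
    # the same calculation expressed on a dask array, so it can stay lazy
    return da.sum(data >= threshold, axis=axis)

COUNT_OVER = iris.analysis.Aggregator(
    "count_over", count_over, lazy_func=lazy_count_over
)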

A lot of these habits of mine, I think, stem from having to work around iris features that didn't support lazy evaluation in the past. The newer versions definitely seem to have made big improvements in this area, though.


zklaus commented Nov 23, 2020

@thomascrocker, sounds good! In fact, moving from numpy to dask often involves only minimal overhead, frequently even none at all, thanks to numpy's dispatching mechanisms. A notable exception is when dask consciously refuses to implement a numpy method, in which case some more thought is necessary. Perhaps the biggest example of that is sorting, where a true sort is not well suited to the data-local parallelization done in dask; the better-suited topk approach often works well instead.

Anyway, do you think some of the lessons you learned are missing from the user guide right now? Would it be worth adding a Section or two?

@thomascrocker

Anyway, do you think some of the lessons you learned are missing from the user guide right now? Would it be worth adding a Section or two?

Possibly. Maybe the sections on lazy data and lazy evaluation could be expanded a little to make clear that these will take advantage of parallel processing, provided this has been initialised earlier in the script in some way.

I'd be happy to help out further, but it will have to be in the new year as I'm currently very busy writing a report that is due at the end of this year.
