Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consider using a dask distributed cluster when computing many-cubes statistics or metrics #1335

Closed
valeriupredoi opened this issue Sep 28, 2021 · 16 comments
Labels
enhancement New feature or request iris Related to the Iris package preprocessor Related to the preprocessor

Comments

@valeriupredoi
Copy link
Contributor

valeriupredoi commented Sep 28, 2021

A Dask distributed cluster is a powerful parallel computing tool to get done all sorts of bigger-tah-memory computations. We should consider using such a facility, or even better, persuade the iris folk @bjlittle to implement a callable version of it. Here's a very concrete example: I am using two methods to compute a global maximum: using fully-lazy iris and a dask cluster. Each time I am loading some 30x cubes (of respectable dimensions 2000x64x128). The code for using just the lazy iris stat os simple:

import iris
import dask.array as da
import numpy as np

cube = iris.load_cube("/home/valeriu/climate_data/CMIP6/CMIP/BCC/BCC-ESM1/historical/r1i1p1f1/Amon/tas/gn/v20181214/tas_Amon_BCC-ESM1_historical_r1i1p1f1_gn_185001-201412.nc")
cubes = []
for i in range(30):
    cubes.append(cube)

print(np.max([da.max(c.core_data()).compute() for c in cubes]))

this takes 4.20user 1.30system 0:05.49elapsed 100%CPU (0avgtext+0avgdata 341776maxresident)k on a 4-core 8GB local laptop; the code for using a dask cluster takes in the same data but schedules the computation of 30 local maxima before getting the global maximum:

import dask.array as da
import numpy as np
import iris
from dask.distributed import Client


# have some netCDF data in path
data_file = "/home/valeriu/climate_data/CMIP6/CMIP/BCC/BCC-ESM1/historical/r1i1p1f1/Amon/tas/gn/v20181214/tas_Amon_BCC-ESM1_historical_r1i1p1f1_gn_185001-201412.nc"
data_dir = "/home/valeriu/climate_data/CMIP6/CMIP/BCC/BCC-ESM1/historical/r1i1p1f1/Amon/tas/gn/v20181214/"


# set up a (local) dask cluster
def _get_client():
    client = Client()
    return client


# get a maximum of a dask array
def dask_max_func(x):
    return da.max(x.core_data()).compute()


# assemble a dask array from a netCDF4 file
def _assemble_dask_array():
    # lazy parallel load of a number of netcdf files and concatenation into one `coll`
    cube = iris.load_cube("/home/valeriu/climate_data/CMIP6/CMIP/BCC/BCC-ESM1/historical/r1i1p1f1/Amon/tas/gn/v20181214/tas_Amon_BCC-ESM1_historical_r1i1p1f1_gn_185001-201412.nc")

    # let's replicate the data 20x
    coll_replicates = []
    for i in range(30):
        coll_replicates.append(cube)

    return coll_replicates


# this function returns the max lazy array
def _get_local_max():
    client = _get_client()
    print("We use a Dask cluster: ", client)
    dask_array = _assemble_dask_array()
    # print("Input netcdf4 dask array: ", dask_array)
    max_call = client.map(dask_max_func, client.scatter(dask_array))
    max_call = client.gather(max_call)

    return max_call


# dummy dummer caller function
def main():
    final_res = np.max(_get_local_max())
    print("Final result: ", final_res)


if __name__ == '__main__':
    main()

and this takes 14.67user 2.08system 0:08.35elapsed 200%CPU (0avgtext+0avgdata 269720maxresident)k on the same machine - it's slower, but needing 20-30% less memory (of course, the cluster can be configured in all sorts of ways to optimize the power of the machine). Incidentally, @bjlittle can sleep well tonight knowing the xarray performs about the same way in exactly the same scenario, actually needing a bit more memory when doing a simple max (by about 50% more memory 😁 ), very similar performance when running in a dask cluster though. Attn: @bouweandela @zklaus @jvegasbsc @Peter9192

@valeriupredoi valeriupredoi added enhancement New feature or request iris Related to the Iris package preprocessor Related to the preprocessor labels Sep 28, 2021
@zklaus
Copy link

zklaus commented Sep 29, 2021

Obviously we need better dask handling and that almost certainly includes using dask.distributed (which is where the cluster and client and so on lives). But your code is far, far too complicated. If all you want to do is the same thing as your first program using dask distributed, try:

#!/usr/bin/env python

from dask.distributed import Client
import iris
import numpy as np


def main():
    cube = iris.load_cube(
        "/badc/cmip6/data/CMIP6/CMIP/BCC/BCC-ESM1/historical/"
        "r1i1p1f1/Amon/tas/gn/v20181214/"
        "tas_Amon_BCC-ESM1_historical_r1i1p1f1_gn_185001-201412.nc")
    cubes = []
    for i in range(30):
        cubes.append(cube)

    dask_result = np.max(np.stack([c.core_data() for c in cubes]))
    print(type(dask_result))
    print(dask_result)
    print(dask_result.compute())


if __name__ == "__main__":
    client = Client()
    main()

Yep, that's only adding the line client = Client() (plus the necessary import). The rest happens automatically.

I also added a bit of fun at the end.

  • We should (perhaps almost?) never use .compute().
  • Thanks to Numpy's dispatching (NEP-18), np.max will automatically be turned into da.max where appropriate.

@valeriupredoi
Copy link
Contributor Author

@zklaus - love it! Using the same file as above and same machine (my grubby laptop) I get 14.48user 2.06system 0:08.31elapsed 199%CPU (0avgtext+0avgdata 265380maxresident)k - exactly the same performance as with the code above (which is indeed complex because 1. I didn't know you can call the function without explicitly calling the Client and its instances and 2. I wanted to use client.scatter() that they recommend to use when inputting large datas). Yes, I completely agree about not using compute() - but that's there so I can see what we can gain when we actually must get the numerical/floaty-floats out, intermediary steps should never run compute, but we have to be careful not to make hugely complicated graphs before we eventually call compute(), there's a trade-off there. Very interesting stuff about NEP-18, that's new to me 🍺

@valeriupredoi
Copy link
Contributor Author

also, here's an interesting one - without compute, in your script it's easy to comment out the print statement, I am consuming about 190M memory, so even if it's a lot of data, compute here is only about 25% of the memory - probably since the actual graph is very easy to compute a max - get item->amax local->amax global

@bouweandela
Copy link
Member

Using the distributed scheduler is indeed what I wanted to try to see if I can get #968 unstuck before the release.

@zklaus
Copy link

zklaus commented Sep 30, 2021

Since that is planned for later (2.5.0), let's try to make some progress on https://github.com/ESMValGroup/ESMValCore/milestone/7 first.

@valeriupredoi
Copy link
Contributor Author

totally with you on that @zklaus - am playing around with Dask for another project and thought this would be a good point where we start the discussion (or continue it further as it were) for ESMValCore. I will also be very willing to start dasking Core in the very near future, hopefully for 2.5 🍺 Regarding the Milestone, I'll be doing some serious PR reviewing work next week 👍

@zklaus
Copy link

zklaus commented Sep 30, 2021

Great, @valeriupredoi! Strictly speaking, that is after the feature freeze, but I'll take what I can get.

@bouweandela
Copy link
Member

Note that you need to use

from dask.distributed import Client
client = Client(processes=False)

to make the distributed scheduler work with iris.save, see also here.

@zklaus
Copy link

zklaus commented Oct 1, 2021

There are other ways. But as I said, let's take this at a later time.

@valeriupredoi
Copy link
Contributor Author

valeriupredoi commented Oct 13, 2021

as Celine Dion says, My (Dask) heart will go on - so here is me looking at running a Dask cluster remotely, over SSH:

Performance difference between running a local cluster vs SSH cluster

Local cluster and SSH cluster examples

  • Local cluster:
from dask.distributed import Client
import iris
import numpy as np


def main():
    cube = iris.load_cube(
        "/home/valeriu/tas_Amon_BCC-ESM1_historical_r1i1p1f1_gn_185001-201412.nc")
    cubes = []
    for i in range(30):
        cubes.append(cube.core_data())

    dask_result = np.max(np.stack([c for c in cubes]))
    print(type(dask_result))
    print(dask_result)
    print(dask_result.compute())


if __name__ == "__main__":
    client = Client()
    main()
  • SSH cluster:
import dask.array as da
import iris
import numpy as np
from dask.distributed import SSHCluster, Client


# running 4 Pythons: 1 remote, 3 local
cluster = SSHCluster(
    ["localhost", "192.171.139.62"],  # localhost is the scheduler, IP is the nanny = 1 remote Python, 1 local Python
    connect_options={"known_hosts": None, "username": "yyy", "password": "xxx"},  # copy ssh keys so no need for login info
    worker_options={"nthreads": 2, "nprocs": 2},  # 2 threads = 2 Pythons
    scheduler_options={"port": 8786}
)


def main():
    cube = iris.load_cube(
        "/home/valeriu/tas_Amon_BCC-ESM1_historical_r1i1p1f1_gn_185001-201412.nc")
    cubes = []
    for i in range(30):
        cubes.append(cube.core_data())

    dask_result = np.max(np.stack([c for c in cubes]))
    print("Result type: ", type(dask_result))
    print("Result before computing: ", dask_result)
    print("Result after computing: ", dask_result.compute())


if __name__ == '__main__':
    client = Client(cluster)
    print(client)
    main()
    client.loop.add_callback(client.scheduler.retire_workers, close_workers=True)
    client.loop.add_callback(client.scheduler.terminate)
    cluster.close()

They both return the same thing: a float: Result after computing: 314.9707!

Performance numbers

  • Local cluster:
	Command being timed: "python run_local_cluster.py"
	User time (seconds): 7.61
	System time (seconds): 1.36
	Percent of CPU this job got: 188%
        Maximum resident set size (kbytes): 222180
  • SSH cluster:
	Command being timed: "python run_ssh_cluster.py"
	User time (seconds): 1.71
	System time (seconds): 0.35
	Percent of CPU this job got: 29%
	Maximum resident set size (kbytes): 125208

Caveats

For the ssh cluster: the scheduler machine ("localhost" here) needs to have exactly the same
environment as the executor machine (nanny+workers, "192.171.139.62" in our case - there could be multiple machines);
no firewalls or different login settings for each of the scheduler and nanny+workers machines (ie I couldn't
make it run with my laptop as scheduler machine and remote (Puma2) as executor machine coz some crazy TCP blocker (no idea).

@bouweandela
Copy link
Member

There are other ways.

@zklaus If you got iris.save to work in other ways, I'd be interested to hear about those.

@zklaus zklaus added this to the v2.5.0 milestone Oct 18, 2021
@zklaus
Copy link

zklaus commented Oct 18, 2021

Not too much time now, but some pointers:

If you set up the cluster from the get-go like this, all calculations are limited to one process (and thus one machine). The simplest thing to do is to keep it distributed until the moment of saving and only then pull it down into one machine's memory, e.g. by realizing the data, possibly block by block.

The best way to resolve this would be to modify iris to use xarray as its backend. It seems the main hurdle to that is mask support in xarray, but a bit of investigation might lay out a clear path on how to get there.

The intermediate solution is to adopt ideas from xarrays io framework into iris. I have/had a working prototype for that in another project, but haven't followed through. Iirc, basically, you cache the file descriptor per process and use distributed.Lock to prevent conflicts. The rest is done with the standard dask.array io approach.

@valeriupredoi
Copy link
Contributor Author

very interesting stuff, cheers, Klaus! I'll dig deeper - not now, but imminently 👍

@schlunma
Copy link
Contributor

schlunma commented Feb 4, 2022

Moving this to v2.6 since there is not open PR yet.

@schlunma schlunma modified the milestones: v2.5.0, v2.6.0 Feb 4, 2022
@valeriupredoi
Copy link
Contributor Author

totally! This smells like 2.7 actually if you asked me, or even 3.0 if the daskification warrants a major release 👍

@sloosvel sloosvel removed this from the v2.6.0 milestone Jun 7, 2022
@bouweandela
Copy link
Member

Support for Distributed was implemented some time ago.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request iris Related to the Iris package preprocessor Related to the preprocessor
Projects
None yet
Development

No branches or pull requests

5 participants