Support lazy saving #4190
Comments
I did not know such functionality existed, but I can already think of an example in our operational post-processing where this might help: we load a large cube from many files. If I've understood correctly, lazy saving would make my context manager redundant - is that right? Assuming we also get #3901 in.
I also have code that calculates some statistic and both saves it to file and creates a plot. It took longer than it should have because the statistic got calculated twice: once for the file and once for the plot.
Yes, I think so.
This would only improve if there was some way to save the plot lazily in addition to lazy saving of the NetCDF file. However, if the data is small by the time you're plotting it (after all, how much information fits in a single plot? No more than the number of pixels, right?), then you could realize the data before saving to avoid the double calculation.
Thanks, this is what I am currently doing (since I realised why it was taking so long).
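For readers following along, here is a minimal sketch of that workaround: realise the statistic once, then reuse it for both the file and the plot. The input file name, the collapsed coordinate, and the use of `iris.quickplot` are illustrative assumptions, not taken from the thread.

```python
import iris
import iris.quickplot as qplt
import matplotlib.pyplot as plt

cube = iris.load_cube("input.nc")  # hypothetical input file
statistic = cube.collapsed("time", iris.analysis.MEAN)  # some lazy statistic

# Touching .data realises the lazy array in place, so the computation
# happens exactly once ...
_ = statistic.data

# ... and both the save and the plot then reuse the realised values.
iris.save(statistic, "statistic.nc")
qplt.pcolormesh(statistic)
plt.savefig("statistic.png")
```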
I'm looking into this, but I'm rather struggling to see how the lazy solutions would work in practice.

In @rcomer's examples, the calculations require the data, which is available in a lazy form, but the only way to avoid reading the data multiple times is to perform all the calculations in parallel. Which you can do with 'co_realize_cubes', in case you didn't know?

The case for "lazy saves" sounds a bit different, but I guess the same is true: you can only avoid re-calculation if you get Dask to do it all at once. In my experience, this doesn't actually always work as you might expect/hope!

The 'save to individual files' option sounds easier to achieve, but again, if the sources are lazy with a common computation factor (the regridding, in this case), you can only improve if it is all done in parallel. So that would also mean "lazy saves" (using ...).
I did not, thanks!
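For context, the helper mentioned above is an Iris-internal function, but the underlying idea (computing several lazy cubes in one dask graph so any shared work is only evaluated once) can be sketched with public dask and Iris calls; the file pattern below is an illustrative assumption.

```python
import dask
import iris

cubes = iris.load("model_*.nc")  # hypothetical input files

# Realise the lazy data of all cubes in a single dask compute, so any shared
# inputs or intermediate results are only evaluated once.
arrays = dask.compute(*[cube.core_data() for cube in cubes])
for cube, array in zip(cubes, arrays):
    cube.data = array
```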
P.S. I looked into supporting a 'deferred save' within Iris save operations once before. If we support this, there is also a question of how we control open/close of the datasets.
Hi @bouweandela. If so, we really need to discuss what the interface might look like. I would expect iris.save to return a set of 'dask.delayed' objects, analogous to the results of ... What seems most natural to me is ...
The "simple" part of that is that it returns only one 'delayed' per saved cube, which is easy to interpret. The alternative would be to return a delayed per target variable, but these would then need to be identified somehow, i.e. referred back to the Iris objects. We should probably also consider possible (future) use with parallel-netcdf implementations, @bouweandela I'd be interested to hear your ideas on what is useful in such an API, and how you anticipate using it. |
(*) ref from previous: this is basically a problem caused by Iris' obsession with copying things! See #3172
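For reference, the delayed-store behaviour in dask that this interface discussion builds on looks roughly like the sketch below; a NumPy array stands in for a real netCDF variable target.

```python
import dask
import dask.array as da
import numpy as np

source = da.random.random((1000, 1000), chunks=(250, 250))
target = np.empty((1000, 1000))  # stand-in for e.g. a netCDF4 variable

# With compute=False the store is only described, not executed; dask returns
# a Delayed object that can be combined with others and computed later.
delayed_save = da.store(source, target, compute=False)
dask.compute(delayed_save)
```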
Thank you for getting back to this issue @pp-mo! My apologies it took me so long to respond. I did not have time to work on this, but I found some funding and will have more time from now on.
Here is a (heavily simplified) example of how I would like to use this:

```python
import sys
from pathlib import Path

import dask.distributed
import dask_jobqueue
import dask
import iris
import xarray


def compute_example(in_file, out_file):
    cube = iris.load_cube(in_file)
    # Simplistic example computation
    cube = cube.collapsed(['latitude', 'longitude'], iris.analysis.MEAN)
    result = iris.save(cube, out_file, compute=False)
    # This already works:
    # ds = xarray.DataArray.from_iris(cube)
    # result = ds.to_netcdf(out_file, compute=False)
    return result


def main(in_files):
    # start a dask cluster, e.g.
    # cluster = dask.distributed.LocalCluster(processes=False)
    # or when on Jasmin/Levante/etc:
    cluster = dask_jobqueue.SLURMCluster(
        cores=8,
        memory='24GiB',
    )
    cluster.scale(10)
    # or a cluster in the cloud once we are able to use zarr
    print(cluster.dashboard_link)
    client = dask.distributed.Client(cluster)
    results = []
    for i, in_file in enumerate(in_files):
        out_file = Path(f'result{i}.nc')
        result = compute_example(in_file, out_file)
        results.append(result)
    dask.compute(results)


if __name__ == '__main__':
    main(sys.argv[1:])
```

The script above takes a list of NetCDF files as input, loads the data, computes some statistics, and then writes the results to NetCDF files again. A lazy saving feature like this is already available in the xarray.Dataset.to_netcdf method. Unfortunately, all the coordinate bounds are lost when using this, but the data looks OK.

Our real use case would be ESMValCore, where we typically have hundreds of input NetCDF files that we run some computations on and then write the results to NetCDF files again. A simplified example of our current workflow is:
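For reference, the delayed xarray save mentioned above looks roughly like the sketch below. The input file name is illustrative, and whether coordinate bounds survive the round trip may depend on the xarray version, as noted.

```python
import dask
import iris
import xarray

cube = iris.load_cube("input.nc")  # hypothetical input file
cube = cube.collapsed(["latitude", "longitude"], iris.analysis.MEAN)

# Convert to xarray and request a delayed write instead of an immediate one.
data_array = xarray.DataArray.from_iris(cube)
delayed_save = data_array.to_netcdf("result.nc", compute=False)
dask.compute(delayed_save)
```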
```python
import sys
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

import iris


def compute_example(in_file, out_file):
    cube = iris.load_cube(in_file)
    cube = cube.collapsed(["latitude", "longitude"], iris.analysis.MEAN)
    iris.save(cube, out_file)


def main(in_files):
    with ProcessPoolExecutor() as executor:
        futures = {}
        for i, in_file in enumerate(in_files):
            out_file = Path(f"result{i}.nc")
            future = executor.submit(compute_example, in_file, out_file)
            futures[future] = in_file
        for future in as_completed(futures):
            print("Saved ", futures[future])
            future.result()


if __name__ == "__main__":
    main(sys.argv[1:])
```

This is not the correct way to parallelize using dask, but at the moment I see no other option with iris.
We mostly work with CMIP-like data, so the NetCDF files we work with are typically relatively simple, and the complicated cases you mention are not applicable to our use case.
Thanks @bouweandela, this is really useful progress. I can see what is done here, but I'm still a bit vague as to how this actually helps with control/performance. I believe we have a remote discussion meeting this Friday (July 1st)? BTW, I'm also currently thinking that the "netcdf-xarray-wrapper" approach I've been investigating might have value here.
There are two issues that we run into with the current approach: the time it takes to save the results to NetCDF files, and running out of memory.

Regarding point 1): I'm not sure exactly why this is, but I have some ideas. For example, most file systems support parallel writes, and I suspect the current save does not take advantage of that.

Regarding point 2): this may actually get worse when we build one big complicated graph and then try to compute it. However, there is some hope that using a distributed scheduler would help, because it makes it possible to access more memory than there is in a single node. Distributed schedulers also seem to be recommended, so they might be smarter about how they manage memory use, but I would need to try it to see if that is actually the case.
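A minimal local sketch of that experiment is shown below; the worker count and per-worker memory limit are illustrative assumptions.

```python
import dask.distributed

# A local distributed cluster with an explicit per-worker memory budget; the
# distributed workers spill to disk and pause work as they approach the limit.
cluster = dask.distributed.LocalCluster(
    n_workers=4,
    threads_per_worker=2,
    memory_limit="4GiB",
)
client = dask.distributed.Client(cluster)
print(client.dashboard_link)
```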
Indeed, sharing between the results and computing in parallel are things we hope to achieve. Do you have a suggestion for how this could be implemented with dask bags?
That certainly looks interesting, I will have a look.
Thanks for the informative meeting and see you today for another one!
✨ Feature Request
The `iris.save` function only supports saving a single file at a time and is not lazy. However, the `dask.array.store` function that backs the NetCDF saver supports delayed saving. For our use case, it would be computationally more efficient to have this supported by `iris.save`. Would it be possible to allow providing the `compute=False` option to the current `iris.save` function, so instead of saving directly it returns a `dask.delayed.Delayed` object that can be computed at a later time?

Alternatively, having a save function in iris that allows saving a list of cubes to a list of files, one cube per file (also similar to `da.store` but then working for cubes), would also work for us.
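A sketch of how the two requested interfaces might look from user code follows; the `compute` keyword and the multi-file form are the proposed additions, not existing Iris API at the time of writing, and the file names are illustrative.

```python
import dask
import iris

cubes = iris.load("input_*.nc")  # hypothetical input files
filenames = [f"output{i}.nc" for i in range(len(cubes))]

# Requested option 1: per-cube delayed saves that are computed together later.
delayed_saves = [
    iris.save(cube, filename, compute=False)  # proposed keyword
    for cube, filename in zip(cubes, filenames)
]
dask.compute(*delayed_saves)

# Requested alternative: one call saving a list of cubes to a matching list
# of files, analogous to da.store(sources, targets):
# iris.save(cubes, filenames)
```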
Motivation

In our case, multi-model statistics in ESMValTool, we are interested in computing statistics (e.g. mean, median) over a number of climate models (cubes). Before we can compute those statistics, we need to load the data from disk and regrid the cubes to the same horizontal grid (and optionally to the same vertical levels). Then we merge all cubes into a single cube with a 'model' dimension and collapse along that dimension using e.g. `iris.analysis.MEAN` to compute the mean.

We want to store both the regridded input cubes and the cube(s) containing the statistics, each cube in its own NetCDF file according to the CMIP/CMOR conventions. Because `iris.save` only allows saving a single cube to a single file and is executed immediately, the load and regrid needs to be executed (1 + the number of statistics) times. Having support for delayed saving (or saving a list of cubes to a matching list of files) would save computational time, because the regridded chunks can be re-used for computing each statistic (as well as for storing the regridded cube), so we only need to load and regrid each chunk once.
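A condensed sketch of that workflow, assuming the requested `compute=False` keyword, is given below; the target grid file, the Linear regridding scheme, and the scalar 'model' coordinate on each input are illustrative assumptions.

```python
import dask
import iris
from iris.cube import CubeList

target_grid = iris.load_cube("target_grid.nc")  # hypothetical common grid
cubes = iris.load("model_*.nc")  # hypothetical per-model inputs

# Regrid every model onto the common grid (lazily where supported).
regridded = [cube.regrid(target_grid, iris.analysis.Linear()) for cube in cubes]

# Each input is assumed to carry a scalar 'model' coordinate, so merging
# creates a new 'model' dimension that the statistic collapses over.
merged = CubeList(regridded).merge_cube()
mean = merged.collapsed("model", iris.analysis.MEAN)

# Delayed saves for the regridded inputs and the statistic; computing them
# together lets dask reuse each regridded chunk for every output.
delayed = [
    iris.save(cube, f"regridded{i}.nc", compute=False)  # proposed keyword
    for i, cube in enumerate(regridded)
]
delayed.append(iris.save(mean, "mean.nc", compute=False))
dask.compute(*delayed)
```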
Additional context

Example script that shows the use case
This is an example script that demonstrates our workflow and how we could use the requested save function to speed up the multi-model statistics computation. Note that the script uses lazy multi-model statistics, which are still in development in ESMValGroup/ESMValCore#968.