
Basic functional lazy saving. #5031

Closed: pp-mo wants to merge 4 commits
Conversation

pp-mo (Member) commented Oct 21, 2022

@bjlittle @bouweandela following discussion yesterday, I couldn't resist having a bash at lazy saving.
So here it is!

This is a slightly crude first attempt, but seems to basically work.
In principle, addresses #4190
@bouweandela can you see if this actually functions for parallelising + whether it shows a benefit?
Sorry again that I can't be there this morning.

In yesterday's discussion, I got a much clearer idea of what is needed.
I think also @zklaus explained to me what the xarray implementation actually does.
So I've tried to copy that idea.

My simple test code:
import iris
from iris.tests.stock import realistic_4d
import dask.array as da

# Just avoid a warning
iris.FUTURE.datum_support = True

def make_lazy_datacube():
    cube = realistic_4d()
    # Replace data + aux-coords with lazy content.
    def lazy_like(array):
        if array is not None:
            dmin, dmax =  array.min(), array.max()
            chunks = list(array.shape)
            chunks[0] = 1
            array = da.random.uniform(dmin, dmax, size=array.shape, chunks=chunks)
        return array
    cube.data = lazy_like(cube.data)
    auxcoord = cube.coord('surface_altitude')
    auxcoord.points = lazy_like(auxcoord.points)
    return cube


if __name__ == '__main__':
    cube = make_lazy_datacube()
    print(cube)
    print('')
    print('Cube details ...')
    print('Core data:', cube.core_data())
    print('Alt coord points:', cube.coord('surface_altitude').core_points())
    def surface_sample(cube):
        return cube.coord('surface_altitude').core_points().flatten()[:10].compute()

    samp = surface_sample(cube)
    print('Alt coord points actual sample values', samp)
    print('')
    print('Lazy save..')
    lazy_save = iris.save(cube, 'tmp.nc', compute=False)
    print('.. done.')
    print('result = ', lazy_save)
    readback = iris.load_cube('tmp.nc', 'air_potential_temperature')
    print('Readback altitude:', surface_sample(readback))
    print('')
    print('Now complete the lazy save..')
    lazy_save.compute()
    print('..done.')
    print('Readback altitude:', surface_sample(readback))
..and sample resulting output:
air_potential_temperature / (K)     (time: 6; model_level_number: 70; grid_latitude: 100; grid_longitude: 100)
    Dimension coordinates:
        time                             x                      -                  -                    -
        model_level_number               -                      x                  -                    -
        grid_latitude                    -                      -                  x                    -
        grid_longitude                   -                      -                  -                    x
    Auxiliary coordinates:
        level_height                     -                      x                  -                    -
        sigma                            -                      x                  -                    -
        surface_altitude                 -                      -                  x                    x
    Derived coordinates:
        altitude                         -                      x                  x                    x
    Scalar coordinates:
        forecast_period             0.0 hours
    Attributes:
        source                      'Iris test case'

Cube details ...
Core data: dask.array<uniform, shape=(6, 70, 100, 100), dtype=float64, chunksize=(1, 70, 100, 100), chunktype=numpy.ndarray>
Alt coord points: dask.array<uniform, shape=(100, 100), dtype=float64, chunksize=(1, 100), chunktype=numpy.ndarray>
Alt coord points actual sample values [435.58198889 475.71775563 421.56953978 393.62885393 317.8254836
 464.83843177 317.20213419 246.2240947  367.07881807 224.52397621]

Lazy save..
.. done.
result =  Delayed('combined_delayeds-abbd0f8e-5712-4d49-af4e-6e4d760858f0')
Readback altitude: [-- -- -- -- -- -- -- -- -- --]

Now complete the lazy save..
..done.
Readback altitude: [435.58198889 475.71775563 421.56953978 393.62885393 317.8254836
 464.83843177 317.20213419 246.2240947  367.07881807 224.52397621]

bjlittle (Member) commented Oct 21, 2022

@pp-mo We could also offer a similar pattern for parallel loading, akin to xarray.open_mfdataset(parallel=True), returning delayed futures 🤔

An object which mimics the data access of a netCDF4.Variable, and can be written to.
It encapsulates the netcdf file and variable which are actually to be written to.
This opens the file each time, to enable writing the data chunk, then closes it.
TODO: could be improved with a caching scheme, but this just about works.
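For orientation, a rough sketch of the kind of wrapper this describes (hypothetical class and names, not the PR's actual code; the PR's class is the DeferredSaveWrapper discussed further below):

import netCDF4
from filelock import FileLock

class VariableWriteProxy:
    """Hypothetical sketch of a writable stand-in for a netCDF4.Variable."""

    def __init__(self, filepath, varname):
        self.filepath = filepath
        self.varname = varname
        # One lockfile per target file: any worker that can see this path
        # shares the same lock.
        self.lock = FileLock(filepath + ".lock")

    def __setitem__(self, keys, data):
        # Open the file for each chunk write, then close it again.
        with self.lock:
            dataset = netCDF4.Dataset(self.filepath, "r+")
            try:
                dataset.variables[self.varname][keys] = data
            finally:
                dataset.close()

da.store can then write into such an object chunk-by-chunk via ordinary __setitem__ indexing.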
A reviewer (Member) commented:

@pp-mo Apart from caching, locking should be considered, right?

pp-mo (Member Author) replied Oct 23, 2022:

Dead right. I had not understood much, which is why it did not work with 'distributed'.
Hopefully that is now fixed @ed654d ...

bjlittle (Member) commented:

@pp-mo Kicking the tyres with this using distributed... and running into issues

pp-mo (Member Author) commented Oct 23, 2022

@pp-mo Kicking the tyres with this using distributed... and running into issues

Updated version should now work with 'distributed', I think.
@bjlittle @bouweandela can you try this again + report back ?

My testcode, running via distributed/slurm:
from contextlib import contextmanager
import dask
import dask.distributed
import dask_jobqueue

import iris
from iris.tests.stock import realistic_4d
import dask.array as da

# Just avoid a warning
iris.FUTURE.datum_support = True

def make_lazy_datacube():
    cube = realistic_4d()
    # Replace data + aux-coords with lazy content.
    def lazy_like(array):
        if array is not None:
            dmin, dmax =  array.min(), array.max()
            chunks = list(array.shape)
            chunks[0] = 1
            array = da.random.uniform(dmin, dmax, size=array.shape, chunks=chunks)
        return array
    cube.data = lazy_like(cube.data)
    auxcoord = cube.coord('surface_altitude')
    auxcoord.points = lazy_like(auxcoord.points)
    return cube


@contextmanager
def cluster_client_context():
    cluster = dask_jobqueue.SLURMCluster(
        # Original code setup...
        # ntasks=10,
        cores=8,
        memory="24GiB",
        # header_skip=["--mem"],
        local_directory="/scratch/itpp/esmvaltool_parallel_save_files/temp",
        # # Local SPICE-relevant options
        # mem_per_cpu='24',
        # ntasks=10,
        # ntasks_per_core=1,
    )
    # cluster = dask.distributed.LocalCluster(processes=False)
    print(cluster.job_script())
    cluster.scale(1)
    print(cluster.dashboard_link)
    yield dask.distributed.Client(cluster)
    print('Exit cluster context?')


if __name__ == '__main__':
    cube = make_lazy_datacube()
    print(cube)
    print('')
    print('Cube details ...')
    print('Core data:', cube.core_data())
    print('Alt coord points:', cube.coord('surface_altitude').core_points())
    def surface_sample(cube):
        return cube.coord('surface_altitude').core_points().flatten()[:10].compute()

    samp = surface_sample(cube)
    print('Alt coord points actual sample values', samp)
    print('')
    print('Lazy save..')
    lazy_save = iris.save(cube, 'tmp.nc', compute=False)
    print('.. done.')
    print('result = ', lazy_save)
    readback = iris.load_cube('tmp.nc', 'air_potential_temperature')
    print('Readback altitude:', surface_sample(readback))
    print('')
    print('Now complete the lazy save..')
    with cluster_client_context():
        lazy_save.compute()
    print('..done.')
    print('Readback altitude:', surface_sample(readback))

.. sample of resulting output:
$ python lazy_save_exercise_distributed.py 
air_potential_temperature / (K)     (time: 6; model_level_number: 70; grid_latitude: 100; grid_longitude: 100)
    Dimension coordinates:
        time                             x                      -                  -                    -
        model_level_number               -                      x                  -                    -
        grid_latitude                    -                      -                  x                    -
        grid_longitude                   -                      -                  -                    x
    Auxiliary coordinates:
        level_height                     -                      x                  -                    -
        sigma                            -                      x                  -                    -
        surface_altitude                 -                      -                  x                    x
    Derived coordinates:
        altitude                         -                      x                  x                    x
    Scalar coordinates:
        forecast_period             0.0 hours
    Attributes:
        source                      'Iris test case'

Cube details ...
Core data: dask.array<uniform, shape=(6, 70, 100, 100), dtype=float64, chunksize=(1, 70, 100, 100), chunktype=numpy.ndarray>
Alt coord points: dask.array<uniform, shape=(100, 100), dtype=float64, chunksize=(1, 100), chunktype=numpy.ndarray>
Alt coord points actual sample values [371.0184907  439.52463164 466.46212351 307.15823929 345.207617
 324.24650883 278.69095522 259.06367203 270.0846977  438.60792611]

Lazy save..
.. done.
result =  Delayed('postsave_remove_lockfile-4854d43b-9370-4f9b-a87e-061b2c33c1ca')
Readback altitude: [-- -- -- -- -- -- -- -- -- --]

Now complete the lazy save..
#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -n 1
#SBATCH --cpus-per-task=8
#SBATCH --mem=24G
#SBATCH -t 00:30:00

/scratch/itpp/esmvaltool_parallel_save_files/envs/iris_djq/bin/python -m distributed.cli.dask_worker tcp://10.152.48.140:35915 --nthreads 2 --nworkers 4 --memory-limit 6.00GiB --name dummy-name --nanny --death-timeout 60 --local-directory /scratch/itpp/esmvaltool_parallel_save_files/temp

http://10.152.48.140:8787/status
Exit cluster context?
..done.
Readback altitude: [371.0184907  439.52463164 466.46212351 307.15823929 345.207617
 324.24650883 278.69095522 259.06367203 270.0846977  438.60792611]

pp-mo (Member Author) commented Oct 23, 2022

Status summary

My experience has been that the problems with 'distributed' mostly behaved just the same with a local 'processes' scheduler.
So that could be useful for testing.

The existing failing tests here appear to be due to some mock-ist testing, whereby a Mock is passed as a datapath, which can't be converted to an absolute filepath. So the new code breaks that, but that should be easily fixable.

This whole idea is still somewhat experimental -- work to do, not least adding some kind of testing.
I would like some feedback on whether this approach will work in practice, before going further.

pp-mo (Member Author) commented Oct 23, 2022

@pp-mo We could also offer a similar pattern for parallel loading, akin to xarray.open_mfdataset(parallel=True), returning delayed futures 🤔

Some thoughts from offline discussion with @bjlittle ...

This is certainly possible, but it's not nearly so crucial to have specialist support IMHO, because ...
Since loading does not involve dask "compute", loading operations can be parallelised simply by "@delayed" on a loading routine like "def get_data(filepath): ...".
This would certainly fit well into "Add user advice on Dask best practices", though. We already have an example like this in our internal "Dask Best Practices" documentation.

It's also unfortunate, but I think unavoidable, that such a "delayed load" cannot return a delayed per-input-cube, since that would involve most of the loading process at that point (to know how many cubes will result). Instead, it can really only return a CubeList, whose content wouldn't generally be guaranteed in advance.
However, a slightly more structured approach that might often work would be to have a set of delayed load calls, where each calls load_cube + returns a single cube, e.g. from different files or result-names: then you have advance knowledge of the results + can 'compute' a specific result cube on demand (see the sketch below).

Also, of course, the large "cube.data" and aux-coords-like arrays would still remain lazy on initial load, so the whole thing doesn't necessarily give any more efficient loading than the normal way.
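For illustration, a minimal sketch of that per-file delayed-load pattern (the file names and cube name here are just placeholders):

import dask
import iris

@dask.delayed
def get_cube(filepath, name):
    # Each delayed call loads one known cube; its large arrays stay lazy anyway.
    return iris.load_cube(filepath, name)

filepaths = ["file_1.nc", "file_2.nc"]   # placeholder inputs
delayed_cubes = [get_cube(fp, "air_potential_temperature") for fp in filepaths]

# Either load them all in parallel ...
cubes = dask.compute(*delayed_cubes)
# ... or compute one specific result cube on demand.
first_cube = delayed_cubes[0].compute()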

is streamed directly into file variables during the save operation.
If False, the file is created as normal, but computation and streaming of
any lazy array content is instead deferred to :class:`dask.delayed` objects,
which are held in a list in the saver 'delayed_writes' property.
pp-mo (Member Author) commented Oct 23, 2022:

This bit can probably be treated as internal/private detail now.
Probably better to make .deferred_writes private, and ._deferred_save() public.

@@ -2509,6 +2583,39 @@ def store(data, cf_var, fill_value):
)
warnings.warn(msg.format(cf_var.name, fill_value))

def _deferred_save(self):
pp-mo (Member Author) commented:

Worth making public?
Almost certainly better named 'delayed' than 'deferred' (!)

bouweandela (Member) commented:

@bjlittle @bouweandela can you try this again + report back ?

Thanks for all the good work @pp-mo! I tried this using a dask_jobqueue.SLURMCluster today and it seems to work now. Are the .lock files needed though? I don't see those appearing when doing the same thing using xarray's lazy save function.

pp-mo (Member Author) commented Oct 26, 2022

@bouweandela I tried this using a dask_jobqueue.SLURMCluster today and it seems to work now.

That is great news, thanks for testing !


@bouweandela Are the .lock files needed though? I don't see those appearing when doing the same thing using xarray's lazy save function.

Well, it's just a way of solving the problem. And I think a simple one 👍.
I wasn't quite sure how to implement a lock that was genuinely shared by all workers in a cluster, but I knew that doing it this way, with the file-system, should be a fairly general approach.
I did try to work out what xarray is using, but there are a lot of unfamiliar code layers + it's slightly beyond me (!).
I think it is probably using different approaches for different dask schedulers, and in this case a distributed.DistributedLock, but I might still not be looking in quite the right place for this.

N.B. the ".lock" files should be removing themselves.
Is that not working in your usage?

bouweandela (Member) commented Oct 27, 2022

The .lock files are indeed removing themselves, but I have some doubts about this approach:

  • I suspect it can lead to race conditions if not all machines on the cluster have the same view of the (distributed) filesystem at the same time
  • What happens if your script is killed halfway through and you try to rerun it with some of those lock files still around from the previous run? Will it hang indefinitely, and/or do you need to clean them up manually beforehand? I'm not sure this will give a good user experience.

bouweandela (Member) commented:

I did try to work out what xarray is using, but there are a lot of unfamiliar code layers + it's slightly beyond me (!).
I think it is probably using different approaches for different dask schedulers, and in this case a distributed.DistributedLock, but I might still not be looking in quite the right place for this.

I would be happy to have a look at the code myself and see if we can figure it out together. Would that help? I will probably have time for that next week.

pp-mo (Member Author) commented Oct 27, 2022

The .lock files are indeed removing themselves, but I have some doubts about this approach:

  • I suspect it can lead to race conditions if not all machines on the cluster have the same view of the (distributed) filesystem at the same time
  • What happens if your script is killed halfway through and you try to rerun it with some of those lock files still around from the previous run? Will it hang indefinitely and/or do you need to clean them up manually before? I'm not sure this will give a good user experience

Re: "if your script is killed halfway through"
In Linux terms, if triggered as a normal SIGINT, this ought to exit in a controlled fashion, removing locks via the "finally" actions.
I guess SIGKILL would defeat that, but that is usually reserved as an emergency action.

Re: race conditions
I'm not convinced this is a real problem: I would hope that using a common file-system for control would be the most reliable cross-platform way to achieve co-ordination between processes on a cluster -- that's the whole point of 'filelock'.

I'm not saying there is not a better way, but I was more concerned about the performance/efficiency of the solution.
As far as I can see though, if we need dask workers in a cluster to share the same lock, the only easy way to do that --apart from the file-system-- is to make the locks used specific to the dask scheduler type, as xarray is doing.

So from a simplicity + generality PoV, 'filelock' is still winning for me !
An alternative could be a non-dask solution to locking, using message passing -- since distributed workers are in different memory spaces.
So far, I don't know what we'd use for that, or what other dependencies it might require.
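For concreteness, a rough sketch of what xarray-style, scheduler-specific lock selection might look like (a hypothetical helper, not something in this PR):

import threading

def choose_save_lock():
    # Hypothetical helper: pick a lock type suited to the active dask scheduler.
    try:
        from dask.distributed import Lock as DistributedLock, get_client

        get_client()  # raises ValueError if no distributed client is active
        return DistributedLock("iris-netcdf-save")
    except (ImportError, ValueError):
        # Threaded (default) scheduler: an ordinary in-process lock is enough.
        # Note this does not cover the local "processes" scheduler.
        return threading.Lock()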

pp-mo (Member Author) commented Oct 27, 2022

An alternative could be, a non-dask solution to locks using message passing -- since distributed workers are in different memory spaces. So far, I don't know what we'd use for that, or what other dependencies it might require.

P.S. a little searching turns up 'sherlock', which implements a generic multiprocessing-like interface to various solutions, including 'Redis' and 'filelock'. Well, I have at least heard of those ones.

To be clear: I don't really want to include distributed-specific code, but maybe there are some good benefits?

bouweandela (Member) commented Nov 21, 2022

Hi @pp-mo,

My apologies for being slow to get back to this. I promise I will look into the locking issue in more detail together with my colleague @fnattino and get back to you on the topic.

Meanwhile, I was trying to integrate this into the ESMValCore and ran into an issue:

>>> import iris.cube
>>> import numpy as np
>>> cube = iris.cube.Cube(np.array([1]))
>>> iris.save(cube, target='tmp.nc', compute=False)
[]
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/bandela/src/scitools/iris/lib/iris/io/__init__.py", line 447, in save
    result = saver(source, target, **kwargs)
  File "/home/bandela/src/scitools/iris/lib/iris/fileformats/netcdf/saver.py", line 2917, in save
    result = sman._deferred_save()
  File "/home/bandela/src/scitools/iris/lib/iris/fileformats/netcdf/saver.py", line 2603, in _deferred_save
    sources, targets = zip(*self.deferred_writes)
ValueError: not enough values to unpack (expected 2, got 0)

It looks like the code in this branch does not work if the data is not lazy. I'm not sure what it is supposed to do if the data is not lazy, but a clear error message would be nice. Do you have ideas on this?

bouweandela added a commit to ESMValGroup/ESMValCore that referenced this pull request Nov 21, 2022
pp-mo (Member Author) commented Nov 21, 2022

It looks like the code in this branch does not work if the data is not lazy. I'm not sure what it is supposed to do if the data is not lazy, but a clear error message would be nice. Do you have ideas on this?

I think if we are requested to do a deferred save, then we must do one even if the data is already in-memory.

By my reading, these 2 lines are making the choice in the wrong order.
So, the saving choice should be ..

if lazy-save enabled:
  # lazy save is *always* deferred
  store = "lazy-store"
elif data is lazy:
  store = "streamed-store"
else:  # non-lazy, and data is real
  store = "direct-store"

(i.e. the lazy-store, streamed-store and direct-store code paths.)
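A minimal Python sketch of that corrected ordering (the three store helpers here are hypothetical stand-ins for the saver's lazy-store / streamed-store / direct-store paths):

import dask.array as da

# Hypothetical placeholder store functions, standing in for the saver's
# three internal write paths.
def lazy_store(source, cf_var): ...
def streamed_store(source, cf_var): ...
def direct_store(source, cf_var): ...

def choose_store(data, cf_var, compute):
    # Hedged sketch only, not the PR's actual code.
    if not compute:
        # A deferred save is *always* deferred, even for real in-memory data,
        # so wrap real arrays as dask arrays before deferring the write.
        lazy_store(da.asarray(data), cf_var)
    elif isinstance(data, da.Array):
        streamed_store(data, cf_var)  # stream lazy data into the file now
    else:
        direct_store(data, cf_var)    # plain direct write of real data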

pp-mo (Member Author) commented Nov 21, 2022

WARNING:
it looks like the solutions being investigated to fix the netcdf 1.6.1 thread-safety problems will also affect this code, e.g. here.

So, expect more changes to come here.

pp-mo (Member Author) commented Nov 22, 2022

@bouweandela apologies for being slow to get back to this

P.S. please don't worry about the timeliness.
We are currently busy doing other things, and progress here on longterm dev issues is pretty slow !! 🐌

fnattino (Contributor) commented Dec 9, 2022

Hello @pp-mo, as @bouweandela mentioned I have been running some tests with the Iris lazy saving together with ESMValCore. Thanks for all the nice work, it indeed seems to work well with ESMValCore@dask-distributed.

Just a couple of comments:

  • The synchronisation of the parallel writing via filelock seems to rely on the target .lock file being locked at the file-system level. This feature might not be supported on all file-systems (for instance, in Lustre this needs to be specifically enabled when mounting client nodes). Also, considering that threading is the default Dask scheduler for Dask arrays, would it make sense to have threading.Lock as the default locking scheme here?
  • One could leave open the possibility of passing a different type of lock to Iris, to customize it on the basis of e.g. the scheduler employed. One could do:

    lazy_save = iris.save(cube, 'tmp.nc', compute=False)
    lazy_save.compute()  # use the default multithreading scheduler and threading.Lock()

    or:

    from dask.distributed import Client, Lock
    with Client() as client:
        lazy_save = iris.save(cube, 'tmp.nc', compute=False, lock=Lock())
        lazy_save.compute()  # use the distributed scheduler and the user-provided lock

    This would be similar to how this is handled in rioxarray. ESMValCore could set up the distributed lock outside Iris, so Iris would remain distributed-free.
  • I think that the additional lock passed on to dask.array.store is not needed, since the synchronization of the parallel writing is already taken care of by the DeferredSaveWrapper (see comment). I think this is also how Xarray handles it, with the lock for parallel writing being set at the datastore level: https://github.com/pydata/xarray/blob/6e77f5e8942206b3e0ab08c3621ade1499d8235b/xarray/backends/netCDF4_.py#L68
    while the ArrayWriter, which ultimately calls dask.array.store, does not have a lock set:
    https://github.com/pydata/xarray/blob/6e77f5e8942206b3e0ab08c3621ade1499d8235b/xarray/backends/common.py#L171
  • I have bumped into the same issue as @bouweandela with real (not lazy) data. In addition to your fix in the if-statement, one would also need to make sure that the sources are Dask arrays when calling dask.array.store (see comment).

Comment on lines +2594 to +2599
# Create a lock to satisfy the da.store call.
# We need a serialisable lock for scheduling with processes or distributed.
# See : https://github.com/dask/distributed/issues/780
# However, this does *not* imply safe access for file writing in parallel.
# For that, DeferredSaveWrapper uses a filelock as well.
lock = SerializableLock()
fnattino (Contributor) commented:

As you mention, the synchronisation of the parallel writing should be taken care of by the DeferredSaveWrapper, so one could set lock=False here.

pp-mo (Member Author) replied Dec 12, 2022:

Thanks for the hint. The truth is, we are (I am) still learning here.
And also, it looks like the #5016 -> #5095 issues will affect the eventual implementation of this -- this code is definitely only a draft "Proof of Concept" solution.

fnattino (Contributor) replied:

Sure! Just thought I would write this down, since it also took me a while to figure out from Xarray..


# Create a single delayed da.store operation to complete the file.
sources, targets = zip(*self.deferred_writes)
result = da.store(sources, targets, compute=False, lock=lock)
fnattino (Contributor) commented:

Some of the sources can be numpy arrays here, so one could do something like:

Suggested change
result = da.store(sources, targets, compute=False, lock=lock)
result = da.store([da.asarray(s) for s in sources], targets, compute=False, lock=lock)

pp-mo (Member Author) replied:

See comment above
I think this is the same issue ?

fnattino (Contributor) replied:

Yes, it's also related to having real data. One needs to make sure these are converted to Dask arrays.

pp-mo (Member Author) commented Dec 12, 2022

  • The synchronisation of the parallel writing via filelock seems to rely on the target .lock file being locked at the file-system level.

IIUC, "filelock" is simply a way of ensuring a mutex that is shared across possibly multiple systems or CPUs. So it uses the common filesystem as a communication mechanism between them, and connects any that use that same file path.
Which is exactly what the dask "distributed lock" does.
So IIUC, filelock uses the files to support atomic operations, e.g. checking whether someone is assigned access / acquiring access for yourself. And what this requires is simply that only one client can have a file open for writing, which I think is enforced by standard unix filesystem access.
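As a concrete illustration of that mechanism (not the PR code itself), basic filelock usage looks like this; the paths are placeholders:

from filelock import FileLock

# Any process, on any node, that can see this path shares the same lock;
# the ".lock" file is purely the coordination point between them.
lock = FileLock("output.nc.lock")

with lock:
    # Exclusive section: at most one holder of this lock path at a time,
    # so writes to the real target file cannot interleave.
    with open("output.log", "a") as handle:   # placeholder for the real write
        handle.write("chunk written\n")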

pp-mo (Member Author) commented Dec 12, 2022

  • One could leave the possibility to pass to Iris a different type of lock to customize it on the basis e.g. of the scheduler employed. One could do:

Yes, I think this would be useful.

  • I think that the additional lock passed on to dask.array.store is not needed

Yes, I agree. Still learning here !

  • I have bumped into the same issue as @bouweandela with real (not lazy) data. In addition to your fix in the if-statement, one would also need to make sure that the sources are Dask arrays when calling dask.array.store (see comment).

Yes also agreed !
We will also address this when we get to a "real proposed implementation".

pp-mo (Member Author) commented Dec 13, 2022

BTW my comment is a bit hidden above, but when I said ..

it looks like the #5016 -> #5095 issues will affect the eventual implementation of this -- this code is definitely only a draft "Proof of Concept" solution.

I really meant that, since it seems we will be adding file-system locking throughout, IMHO we will almost certainly follow xarray in using different locks for different schedulers. This will introduce more coupling and complexity, as I was complaining above, but it will also replace the use of 'filelock' which @bouweandela was concerned about.
I'm also hopeful that Iris will then work with local process-based or distributed Dask schedulers, which it currently does not (which surprised me).

fnattino (Contributor) commented:

Just wanted to mention that while Xarray seems to support threading, multiprocessing and distributed as schedulers for writing NetCDFs, the multiprocessing implementation based on the multiprocessing.Lock seems not to work: pydata/xarray#3781 (it's an old issue, but I still get the same error). Maybe just having the multithreading implementation (default) and distributed supported would already cover many (most?) use cases?

bouweandela (Member) commented Dec 13, 2022

Maybe just having the multithreading implementation (default) and distributed supported would already cover many (most?) use cases?

That sounds good to me. If someone really wanted to use multiple processes on a single machine, they could just use the distributed LocalCluster to set that up.
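For reference, a minimal sketch of that workaround (file names and worker counts are arbitrary):

from dask.distributed import Client, LocalCluster

import iris

# Process-based workers on a single machine, via distributed rather than
# the plain multiprocessing scheduler, so a shared distributed lock is possible.
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)

cube = iris.load_cube("input.nc")                 # placeholder input file
delayed_save = iris.save(cube, "output.nc", compute=False)
delayed_save.compute()                            # runs on the local cluster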

trexfeathers linked an issue Feb 20, 2023 that may be closed by this pull request
trexfeathers self-assigned this Feb 22, 2023
pp-mo mentioned this pull request Mar 10, 2023
pp-mo (Member Author) commented Mar 15, 2023

@fnattino Maybe just having the multithreading implementation (default) and distributed supported would already cover many (most?) use cases?

@bouweandela That sounds good to me. If someone really wanted to use multiple processes on a single machine, they could just use the distributed LocalCluster to set that up.

Just FYI I have looked into this within work for #5191, and it is not that simple.
I could find no good way of fixing it using Dask features: dask seems not to have an equivalent of the distributed.Lock for a local-processes scheduler. I did try using filelock for this, but the worker processes hang for reasons that weren't clear (it might be worth just trying that again when the code is more mature).
According to https://stackoverflow.com/a/25558333/2615050, it may be possible to use a multiprocessing.Manager for this. I think that can also work for processes that were already in existence when the Manager was created, but I'm still a little unsure.

So I'm still considering this an "optional extra" for purposes of #5191
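On the multiprocessing.Manager idea above, a rough (and untested here) sketch of the mechanism:

from multiprocessing import Manager, Process

def write_chunk(lock, path, text):
    # The Manager lock is a picklable proxy, so it can be handed to worker
    # processes (including, perhaps, ones that already existed when the
    # Manager was created, which is the open question above).
    with lock:
        with open(path, "a") as handle:
            handle.write(text + "\n")

if __name__ == "__main__":
    manager = Manager()
    lock = manager.Lock()
    workers = [
        Process(target=write_chunk, args=(lock, "out.log", f"chunk {i}"))
        for i in range(4)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()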

pp-mo (Member Author) commented Mar 15, 2023

Closing this now, in favour of #5191
The other outstanding issues from here I have copied forward to there.
Which were (I think):

  • support for a local-process scheduler (see above comment): shelved for now, on the list of things to consider
  • problems with saving real data (see @fnattino comment): already fixed in the new implementation, I believe
  • option for the user to pass in a lock (also @fnattino comment): forwarded to the checklist of work to do

pp-mo closed this Mar 15, 2023
pp-mo deleted the lazy_save branch April 28, 2023 13:47
Linked issue: Support lazy saving