From 39d309445749e4e3b994cb13d57833550521c85e Mon Sep 17 00:00:00 2001
From: Patrick Peglar
Date: Fri, 21 Oct 2022 01:56:33 +0100
Subject: [PATCH 1/4] Basic functional lazy saving.

---
 lib/iris/fileformats/netcdf/saver.py | 116 ++++++++++++++++++++++++---
 lib/iris/io/__init__.py             |  10 ++-
 2 files changed, 113 insertions(+), 13 deletions(-)

diff --git a/lib/iris/fileformats/netcdf/saver.py b/lib/iris/fileformats/netcdf/saver.py
index 650c5e3338..327db3d4be 100644
--- a/lib/iris/fileformats/netcdf/saver.py
+++ b/lib/iris/fileformats/netcdf/saver.py
@@ -23,6 +23,7 @@
 import warnings
 
 import cf_units
+import dask
 import dask.array as da
 import netCDF4
 import numpy as np
@@ -490,10 +491,51 @@ def __setitem__(self, keys, arr):
 MESH_ELEMENTS = ("node", "edge", "face")
 
 
+class DeferredSaveWrapper:
+    """
+    An object which mimics the data access of a netCDF4.Variable, and can be written to.
+    It encapsulates the netcdf file and variable which are actually to be written to.
+    This opens the file each time, to enable writing the data chunk, then closes it.
+    TODO: could be improved with a caching scheme, but this just about works.
+    """
+
+    def __init__(self, filepath: str, cf_var: netCDF4.Variable):
+        # Grab useful properties of the variable, including the identifying 'name'.
+        self.path = filepath
+        for key in ("shape", "dtype", "ndim", "name"):
+            setattr(self, key, getattr(cf_var, key))
+
+    def __setitem__(self, keys, array_data):
+        # Write to the variable.
+        # Re-open the file for writing.
+        dataset = netCDF4.Dataset(self.path, "r+")
+        try:
+            var = dataset.variables[self.name]
+            var[keys] = array_data
+        finally:
+            dataset.close()
+
+    def __repr__(self):
+        fmt = (
+            "<{self.__class__.__name__} shape={self.shape}"
+            " dtype={self.dtype!r} path={self.path!r}"
+            " name={self.name!r}>"
+        )
+        return fmt.format(self=self)
+
+
+@dask.delayed
+def combined_delayeds(*args):
+    """A delayed function which simply computes all its arguments."""
+    # Dask computes the lazy args before passing them here.
+    # So job done -- we don't need to do anything with them.
+    pass
+
+
 class Saver:
     """A manager for saving netcdf files."""
 
-    def __init__(self, filename, netcdf_format):
+    def __init__(self, filename, netcdf_format, compute=True):
         """
         A manager for saving netcdf files.
 
@@ -506,6 +548,15 @@ def __init__(self, filename, netcdf_format):
             Underlying netCDF file format, one of 'NETCDF4', 'NETCDF4_CLASSIC',
             'NETCDF3_CLASSIC' or 'NETCDF3_64BIT'. Default is 'NETCDF4' format.
 
+        * compute (bool):
+            If True, the Saver performs normal 'synchronous' data writes, where data
+            is streamed directly into file variables during the save operation.
+            If False, the file is created as normal, but computation and streaming of
+            any lazy array content is instead deferred to :class:`dask.delayed` objects,
+            which are held in a list in the saver 'deferred_writes' property.
+            The relevant file variables are created empty, and the write can
+            subsequently be completed by computing the saver's 'deferred_writes'.
+
         Returns:
             None.
 
@@ -542,7 +593,14 @@ def __init__(self, filename, netcdf_format):
         self._mesh_dims = {}
         #: A dictionary, mapping formula terms to owner cf variable name
         self._formula_terms_cache = {}
+        #: Whether lazy saving.
+        self.lazy_saves = not compute
+        #: A list of deferred writes (if lazy saving)
+        self.deferred_writes = []
+        #: Target filepath
+        self.filepath = filename
         #: NetCDF dataset
+        self._dataset = None
         try:
             self._dataset = netCDF4.Dataset(
                 filename, mode="w", format=netcdf_format
             )
@@ -2442,8 +2500,7 @@ def _increment_name(self, varname):
 
         return "{}_{}".format(varname, num)
 
-    @staticmethod
-    def _lazy_stream_data(data, fill_value, fill_warn, cf_var):
+    def _lazy_stream_data(self, data, fill_value, fill_warn, cf_var):
         if hasattr(data, "shape") and data.shape == (1,) + cf_var.shape:
             # (Don't do this check for string data).
             # Reduce dimensionality where the data array has an extra dimension
@@ -2453,13 +2510,36 @@ def _lazy_stream_data(data, fill_value, fill_warn, cf_var):
             data = data.squeeze(axis=0)
 
         if is_lazy_data(data):
+            if self.lazy_saves:
+                # deferred lazy streaming
+                def store(data, cf_var, fill_value):
+                    # Create a data-writeable object that we can stream into, which
+                    # encapsulates the file to be opened + variable to be written.
+                    writeable_var_wrapper = DeferredSaveWrapper(
+                        self.filepath, cf_var
+                    )
+                    # Add a delayed save to our 'deferred_writes' list.
+                    self.deferred_writes.append(
+                        da.store(
+                            [data], [writeable_var_wrapper], compute=False
+                        )
+                    )
+                    # NOTE: in this case, no checking of fill-value violations, so just
+                    # return dummy values for this.
+                    # TODO: just for now -- can probably make this work later
+                    is_masked, contains_value = False, False
+                    return is_masked, contains_value
 
-            def store(data, cf_var, fill_value):
-                # Store lazy data and check whether it is masked and contains
-                # the fill value
-                target = _FillValueMaskCheckAndStoreTarget(cf_var, fill_value)
-                da.store([data], [target])
-                return target.is_masked, target.contains_value
+            else:
+                # Immediate streaming store: check mask+fill as we go.
+                def store(data, cf_var, fill_value):
+                    # Store lazy data and check whether it is masked and contains
+                    # the fill value
+                    target = _FillValueMaskCheckAndStoreTarget(
+                        cf_var, fill_value
+                    )
+                    da.store([data], [target])
+                    return target.is_masked, target.contains_value
 
         else:
 
@@ -2526,6 +2606,7 @@ def save(
     least_significant_digit=None,
    packing=None,
    fill_value=None,
+    compute=True,
 ):
     """
     Save cube(s) to a netCDF file, given the cube and the filename.
@@ -2648,6 +2729,14 @@ def save(
         `:class:`iris.cube.CubeList`, or a single element, and each element of
         this argument will be applied to each cube separately.
 
+    * compute (bool):
+        When False, create the output file but defer writing any lazy array content to
+        its variables, such as (lazy) data and aux-coord points and bounds.
+        Instead return a :class:`dask.delayed` which, when computed, will compute all
+        the lazy content and stream it to complete the file.
+        Several such data saves can be performed in parallel, by passing a list of them
+        into a :func:`dask.compute` call.
+
     Returns:
         None.
 
@@ -2748,7 +2837,7 @@ def is_valid_packspec(p):
             raise ValueError(msg)
 
     # Initialise Manager for saving
-    with Saver(filename, netcdf_format) as sman:
+    with Saver(filename, netcdf_format, compute=compute) as sman:
         # Iterate through the cubelist.
         for cube, packspec, fill_value in zip(cubes, packspecs, fill_values):
             sman.write(
@@ -2793,3 +2882,10 @@ def is_valid_packspec(p):
 
     # Add conventions attribute.
     sman.update_global_attributes(Conventions=conventions)
+
+    if compute:
+        result = None
+    else:
+        # For lazy save, return a single 'delayed' representing all lazy writes.
+        result = combined_delayeds(sman.deferred_writes)
+    return result

diff --git a/lib/iris/io/__init__.py b/lib/iris/io/__init__.py
index 4659f70ae3..ccda860437 100644
--- a/lib/iris/io/__init__.py
+++ b/lib/iris/io/__init__.py
@@ -444,7 +444,7 @@ def save(source, target, saver=None, **kwargs):
 
     # Single cube?
     if isinstance(source, Cube):
-        saver(source, target, **kwargs)
+        result = saver(source, target, **kwargs)
 
     # CubeList or sequence of cubes?
     elif isinstance(source, CubeList) or (
@@ -463,13 +463,17 @@ def save(source, target, saver=None, **kwargs):
             # Force append=True for the tail cubes. Don't modify the incoming
             # kwargs.
             kwargs = kwargs.copy()
+            result = []
             for i, cube in enumerate(source):
                 if i != 0:
                     kwargs["append"] = True
-                saver(cube, target, **kwargs)
+                result.append(saver(cube, target, **kwargs))
+
         # Netcdf saver.
         else:
-            saver(source, target, **kwargs)
+            result = saver(source, target, **kwargs)
 
     else:
         raise ValueError("Cannot save; non Cube found in source")
+
+    return result

From cac380ab56a1b179d68f8152b705822e15b51b0d Mon Sep 17 00:00:00 2001
From: Patrick Peglar
Date: Fri, 21 Oct 2022 02:16:14 +0100
Subject: [PATCH 2/4] Simplify function signature which upsets Sphinx.

---
 lib/iris/fileformats/netcdf/saver.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/iris/fileformats/netcdf/saver.py b/lib/iris/fileformats/netcdf/saver.py
index 327db3d4be..7e3ad5d6a2 100644
--- a/lib/iris/fileformats/netcdf/saver.py
+++ b/lib/iris/fileformats/netcdf/saver.py
@@ -499,7 +499,7 @@ class DeferredSaveWrapper:
     TODO: could be improved with a caching scheme, but this just about works.
     """
 
-    def __init__(self, filepath: str, cf_var: netCDF4.Variable):
+    def __init__(self, filepath, cf_var):
         # Grab useful properties of the variable, including the identifying 'name'.
         self.path = filepath
         for key in ("shape", "dtype", "ndim", "name"):

From 8f25a20e9c94d8447be840b975fd71f4cde8d6f2 Mon Sep 17 00:00:00 2001
From: Patrick Peglar
Date: Fri, 21 Oct 2022 02:28:17 +0100
Subject: [PATCH 3/4] Non-lazy saves return nothing.

---
 lib/iris/io/__init__.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/lib/iris/io/__init__.py b/lib/iris/io/__init__.py
index ccda860437..f79252ebc7 100644
--- a/lib/iris/io/__init__.py
+++ b/lib/iris/io/__init__.py
@@ -467,8 +467,9 @@ def save(source, target, saver=None, **kwargs):
             for i, cube in enumerate(source):
                 if i != 0:
                     kwargs["append"] = True
-                result.append(saver(cube, target, **kwargs))
+                saver(cube, target, **kwargs)
 
+            result = None
         # Netcdf saver.
         else:
             result = saver(source, target, **kwargs)

From ed654db19a32d596dd5996cb7ca7e7e9ba51cf5b Mon Sep 17 00:00:00 2001
From: Patrick Peglar
Date: Sun, 23 Oct 2022 15:44:58 +0100
Subject: [PATCH 4/4] Now fixed to enable use with process/distributed scheduling.

---
 lib/iris/fileformats/netcdf/saver.py | 99 ++++++++++++++++++----------
 1 file changed, 63 insertions(+), 36 deletions(-)

diff --git a/lib/iris/fileformats/netcdf/saver.py b/lib/iris/fileformats/netcdf/saver.py
index 7e3ad5d6a2..01b095a6d1 100644
--- a/lib/iris/fileformats/netcdf/saver.py
+++ b/lib/iris/fileformats/netcdf/saver.py
@@ -25,6 +25,8 @@
 import cf_units
 import dask
 import dask.array as da
+from dask.utils import SerializableLock
+import filelock
 import netCDF4
 import numpy as np
 import numpy.ma as ma
@@ -499,37 +501,31 @@ class DeferredSaveWrapper:
     TODO: could be improved with a caching scheme, but this just about works.
""" - def __init__(self, filepath, cf_var): - # Grab useful properties of the variable, including the identifying 'name'. + def __init__(self, filepath, cf_var, lockfile_path): self.path = filepath - for key in ("shape", "dtype", "ndim", "name"): - setattr(self, key, getattr(cf_var, key)) + self.varname = cf_var.name + self.lockfile_path = lockfile_path def __setitem__(self, keys, array_data): # Write to the variable. - # Re-open the file for writing. - dataset = netCDF4.Dataset(self.path, "r+") + # First acquire a file-specific lock + # Importantly, in working via the file-system, this is common to all workers, + # even when using processes or distributed. + lock = filelock.FileLock(self.lockfile_path) + lock.acquire() + # Now re-open the file for writing + write to the specific file variable. + dataset = None try: - var = dataset.variables[self.name] + dataset = netCDF4.Dataset(self.path, "r+") + var = dataset.variables[self.varname] var[keys] = array_data finally: - dataset.close() + if dataset: + dataset.close() + lock.release() def __repr__(self): - fmt = ( - "<{self.__class__.__name__} shape={self.shape}" - " dtype={self.dtype!r} path={self.path!r}" - " name={self.name!r}>" - ) - return fmt.format(self=self) - - -@dask.delayed -def combined_delayeds(*args): - """A delayed function which simply computes all its arguments.""" - # Dask computes the lazy args before passing them here. - # So job done -- we don't need to do anything with them. - pass + return f"<{self.__class__.__name__} path={self.path!r} var={self.varname!r}>" class Saver: @@ -595,23 +591,25 @@ def __init__(self, filename, netcdf_format, compute=True): self._formula_terms_cache = {} #: Whether lazy saving. self.lazy_saves = not compute - #: A list of deferred writes (if lazy saving) + #: A list of deferred writes for lazy saving : each is a (source, target) pair self.deferred_writes = [] #: Target filepath - self.filepath = filename + self.filepath = os.path.abspath(filename) + #: Target lockfile path + self._lockfile_path = self.filepath + ".lock" #: NetCDF dataset self._dataset = None try: self._dataset = netCDF4.Dataset( - filename, mode="w", format=netcdf_format + self.filepath, mode="w", format=netcdf_format ) except RuntimeError: - dir_name = os.path.dirname(filename) + dir_name = os.path.dirname(self.filepath) if not os.path.isdir(dir_name): msg = "No such file or directory: {}".format(dir_name) raise IOError(msg) if not os.access(dir_name, os.R_OK | os.W_OK): - msg = "Permission denied: {}".format(filename) + msg = "Permission denied: {}".format(self.filepath) raise IOError(msg) else: raise @@ -2516,14 +2514,10 @@ def store(data, cf_var, fill_value): # Create a data-writeable object that we can stream into, which # encapsulates the file to be opened + variable to be written. writeable_var_wrapper = DeferredSaveWrapper( - self.filepath, cf_var - ) - # Add a delayed save to our 'deferred_writes' list. - self.deferred_writes.append( - da.store( - [data], [writeable_var_wrapper], compute=False - ) + self.filepath, cf_var, self._lockfile_path ) + # Add to the list of deferred writes, used in _deferred_save(). + self.deferred_writes.append((data, writeable_var_wrapper)) # NOTE: in this case, no checking of fill-value violations so just # return dummy values for this. 
                     # TODO: just for now -- can probably make this work later
@@ -2589,6 +2583,39 @@ def store(data, cf_var, fill_value):
                 )
                 warnings.warn(msg.format(cf_var.name, fill_value))
 
+    def _deferred_save(self):
+        """
+        Create a 'delayed' to trigger file completion for lazy saves.
+
+        This contains all the deferred writes, which complete the file by filling out
+        the data of variables initially created empty.
+
+        """
+        # Create a lock to satisfy the da.store call.
+        # We need a serialisable lock for scheduling with processes or distributed.
+        # See: https://github.com/dask/distributed/issues/780
+        # However, this does *not* imply safe access for file writing in parallel.
+        # For that, DeferredSaveWrapper uses a filelock as well.
+        lock = SerializableLock()
+
+        # Create a single delayed da.store operation to complete the file.
+        sources, targets = zip(*self.deferred_writes)
+        result = da.store(sources, targets, compute=False, lock=lock)
+
+        # Wrap that in an extra operation that follows it by deleting the lockfile.
+        @dask.delayed
+        def postsave_remove_lockfile(store_op, lock_path):
+            if os.path.exists(lock_path):
+                try:
+                    os.unlink(lock_path)
+                except Exception as e:
+                    msg = f'Could not remove lockfile "{lock_path}". Error:\n{e}'
+                    raise Exception(msg) from e
+
+        result = postsave_remove_lockfile(result, self._lockfile_path)
+
+        return result
+
 
 def save(
     cube,
@@ -2886,6 +2913,6 @@ def is_valid_packspec(p):
     if compute:
         result = None
     else:
-        # For lazy save, return a single 'delayed' representing all lazy writes.
-        result = combined_delayeds(sman.deferred_writes)
+        result = sman._deferred_save()
+
     return result
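
Usage sketch (illustrative only, not part of the patch series): with the four
patches applied, a lazy save returns a single dask 'delayed' object which can
be computed later, on its own or alongside other deferred saves. The filenames
and cubes below are placeholders.

    import dask
    import iris

    cubes = iris.load("input.nc")  # placeholder input; any cubes with lazy data

    # Creates "output.nc" with all its variables defined, but lazy content
    # unwritten; returns a single 'delayed' representing the deferred writes.
    delayed_save = iris.save(cubes, "output.nc", compute=False)

    # Complete the file. Several such deferred saves can be computed in
    # parallel by passing them to one dask.compute call.
    dask.compute(delayed_save)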