From be991f8741b2515f8d55960ba5b4aecca1c0f5db Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Mon, 21 Nov 2022 16:35:35 +0100 Subject: [PATCH] Use iris (https://github.com/SciTools/iris/pull/5031) for saving --- esmvalcore/_task.py | 2 +- esmvalcore/preprocessor/__init__.py | 6 ++++-- esmvalcore/preprocessor/_io.py | 28 ++++++++++++---------------- setup.py | 1 - 4 files changed, 17 insertions(+), 20 deletions(-) diff --git a/esmvalcore/_task.py b/esmvalcore/_task.py index fa3c9627d0..fe8a2859cb 100644 --- a/esmvalcore/_task.py +++ b/esmvalcore/_task.py @@ -769,7 +769,7 @@ def _run_dask(self, cfg) -> None: logger.info(f"Skipping task {task.name}") for future in dask.distributed.as_completed(futures_to_files): filename = futures_to_files[future] - logger.info(f"Wrote {filename}") + logger.info(f"Wrote (delayed) {filename}") def _run_sequential(self) -> None: """Run tasks sequentially.""" diff --git a/esmvalcore/preprocessor/__init__.py b/esmvalcore/preprocessor/__init__.py index 10c286c18c..d3e9280426 100644 --- a/esmvalcore/preprocessor/__init__.py +++ b/esmvalcore/preprocessor/__init__.py @@ -4,6 +4,7 @@ import logging from pprint import pformat +from dask.delayed import Delayed from iris.cube import Cube from .._provenance import TrackedFile @@ -458,7 +459,7 @@ def save(self): self._cubes, **self.settings['save'], ) - if not self.settings['save'].get('compute', True): + if isinstance(result, Delayed): self.delayed = result self.files = [self.settings['save']['filename']] self.files = preprocess(self.files, @@ -621,7 +622,8 @@ def _run(self, _): for product in self.products: product.close() - self.delayeds[product.filename] = product.delayed + if product.delayed is not None: + self.delayeds[product.filename] = product.delayed metadata_files = write_metadata(self.products, self.write_ncl_interface) return metadata_files diff --git a/esmvalcore/preprocessor/_io.py b/esmvalcore/preprocessor/_io.py index d624c1ae4c..058eaa54c8 100644 --- a/esmvalcore/preprocessor/_io.py +++ b/esmvalcore/preprocessor/_io.py @@ -10,7 +10,6 @@ import iris.aux_factory import iris.exceptions import numpy as np -import xarray import yaml from cf_units import suppress_errors @@ -285,8 +284,8 @@ def save(cubes, Returns ------- - str - filename + str or dask.delayed.Delayed + filename or delayed Raises ------ @@ -341,20 +340,17 @@ def save(cubes, cube.var_name = alias cube = cubes[0] - if compute is True: - iris.save(cube, **kwargs) + if not compute and not cube.has_lazy_data(): + # What should happen if the data is not lazy and we're asked for a + # lazy save? + # https://github.com/SciTools/iris/pull/5031#issuecomment-1322166230 + compute = True + + result = iris.save(cube, compute=compute, **kwargs) + if compute: + logger.info("Wrote (immediate) %s", filename) return filename - - data_array = xarray.DataArray.from_iris(cube) - kwargs.pop('target') - kwargs['_FillValue'] = kwargs.pop('fill_value') - encoding = {cube.var_name: kwargs} - delayed = data_array.to_netcdf( - filename, - encoding=encoding, - compute=False, - ) - return delayed + return result def _get_debug_filename(filename, step): diff --git a/setup.py b/setup.py index 62f338c8c4..24011cf256 100755 --- a/setup.py +++ b/setup.py @@ -54,7 +54,6 @@ 'scitools-iris>=3.2.1', 'shapely[vectorized]', 'stratify', - 'xarray', 'yamale', ], # Test dependencies