From ac6d1bf687526489f382f062b8dbc7712efb2bb9 Mon Sep 17 00:00:00 2001
From: Patrick Peglar
Date: Thu, 9 Jun 2022 17:03:16 +0100
Subject: [PATCH] Split iris.fileformats.netcdf into separate load+save submodules.
---
 lib/iris/fileformats/netcdf/__init__.py |  41 ++
 lib/iris/fileformats/netcdf/loader.py   | 596 ++++++++++++++++++
 .../{netcdf.py => netcdf/saver.py}      | 578 +----------------
 3 files changed, 645 insertions(+), 570 deletions(-)
 create mode 100644 lib/iris/fileformats/netcdf/__init__.py
 create mode 100644 lib/iris/fileformats/netcdf/loader.py
 rename lib/iris/fileformats/{netcdf.py => netcdf/saver.py} (84%)

diff --git a/lib/iris/fileformats/netcdf/__init__.py b/lib/iris/fileformats/netcdf/__init__.py
new file mode 100644
index 00000000000..19a8b37149a
--- /dev/null
+++ b/lib/iris/fileformats/netcdf/__init__.py
@@ -0,0 +1,41 @@
+#
+# This file is part of Iris and is released under the LGPL license.
+# See COPYING and COPYING.LESSER in the root of the repository for full
+# licensing details.
+"""
+Module to support the loading and saving of NetCDF files, to and from Iris cubes.
+
+See also: `netCDF4 python <https://unidata.github.io/netcdf4-python/>`_
+
+Also refer to document 'NetCDF Climate and Forecast (CF) Metadata Conventions'.
+
+"""
+from .loader import DEBUG, NetCDFDataProxy, load_cubes, logger
+from .saver import (
+    CF_CONVENTIONS_VERSION,
+    MESH_ELEMENTS,
+    SPATIO_TEMPORAL_AXES,
+    CFNameCoordMap,
+    Saver,
+    UnknownCellMethodWarning,
+    parse_cell_methods,
+    save,
+)
+
+# Export all public elements from the loader and saver submodules.
+# NOTE: the separation is purely for neatness and developer convenience; from
+# the user point of view, it is still all one module.
+__all__ = (
+    "CF_CONVENTIONS_VERSION",
+    "CFNameCoordMap",
+    "DEBUG",
+    "MESH_ELEMENTS",
+    "Saver",
+    "SPATIO_TEMPORAL_AXES",
+    "NetCDFDataProxy",
+    "UnknownCellMethodWarning",
+    "load_cubes",
+    "logger",
+    "parse_cell_methods",
+    "save",
+)
diff --git a/lib/iris/fileformats/netcdf/loader.py b/lib/iris/fileformats/netcdf/loader.py
new file mode 100644
index 00000000000..fa3aef0c4eb
--- /dev/null
+++ b/lib/iris/fileformats/netcdf/loader.py
@@ -0,0 +1,596 @@
+# Copyright Iris contributors
+#
+# This file is part of Iris and is released under the LGPL license.
+# See COPYING and COPYING.LESSER in the root of the repository for full
+# licensing details.
+"""
+Module to support the loading of a NetCDF file into an Iris cube.
+The "load" parts of the module :mod:`iris.fileformats.netcdf`.
+
+See also: `netCDF4 python <https://unidata.github.io/netcdf4-python/>`_
+
+Also refer to document 'NetCDF Climate and Forecast (CF) Metadata Conventions'.
+
+"""
+import warnings
+
+import netCDF4
+import numpy as np
+
+from iris._lazy_data import as_lazy_data
+from iris.aux_factory import (
+    AtmosphereSigmaFactory,
+    HybridHeightFactory,
+    HybridPressureFactory,
+    OceanSFactory,
+    OceanSg1Factory,
+    OceanSg2Factory,
+    OceanSigmaFactory,
+    OceanSigmaZFactory,
+)
+import iris.config
+import iris.coord_systems
+import iris.coords
+import iris.exceptions
+import iris.fileformats.cf
+from iris.fileformats.netcdf.saver import _CF_ATTRS
+import iris.io
+import iris.util
+
+# Show actions activation statistics.
+DEBUG = False
+
+# Configure the logger: a single logger shared by all of 'iris.fileformats.netcdf'.
+from . import __name__ as _parent_module_name
+
+logger = iris.config.get_logger(_parent_module_name)
+del _parent_module_name
+
+
+def _actions_engine():
+    # Return an 'actions engine', which provides a pyke-rules-like interface to
+    # the core cf translation code.
+    # Deferred import to avoid circularity.
+    import iris.fileformats._nc_load_rules.engine as nc_actions_engine
+
+    engine = nc_actions_engine.Engine()
+    return engine
+
+
+class NetCDFDataProxy:
+    """A reference to the data payload of a single NetCDF file variable."""
+
+    __slots__ = ("shape", "dtype", "path", "variable_name", "fill_value")
+
+    def __init__(self, shape, dtype, path, variable_name, fill_value):
+        self.shape = shape
+        self.dtype = dtype
+        self.path = path
+        self.variable_name = variable_name
+        self.fill_value = fill_value
+
+    @property
+    def ndim(self):
+        return len(self.shape)
+
+    def __getitem__(self, keys):
+        dataset = netCDF4.Dataset(self.path)
+        try:
+            variable = dataset.variables[self.variable_name]
+            # Get the NetCDF variable data and slice.
+            var = variable[keys]
+        finally:
+            dataset.close()
+        return np.asanyarray(var)
+
+    def __repr__(self):
+        fmt = (
+            "<{self.__class__.__name__} shape={self.shape}"
+            " dtype={self.dtype!r} path={self.path!r}"
+            " variable_name={self.variable_name!r}>"
+        )
+        return fmt.format(self=self)
+
+    def __getstate__(self):
+        return {attr: getattr(self, attr) for attr in self.__slots__}
+
+    def __setstate__(self, state):
+        for key, value in state.items():
+            setattr(self, key, value)
+
+
+def _assert_case_specific_facts(engine, cf, cf_group):
+    # Initialise a data store for built cube elements.
+    # This is used to patch element attributes *not* set up by the actions
+    # process, after the actions code has run.
+    engine.cube_parts["coordinates"] = []
+    engine.cube_parts["cell_measures"] = []
+    engine.cube_parts["ancillary_variables"] = []
+
+    # Assert facts for CF coordinates.
+    for cf_name in cf_group.coordinates.keys():
+        engine.add_case_specific_fact("coordinate", (cf_name,))
+
+    # Assert facts for CF auxiliary coordinates.
+    for cf_name in cf_group.auxiliary_coordinates.keys():
+        engine.add_case_specific_fact("auxiliary_coordinate", (cf_name,))
+
+    # Assert facts for CF cell measures.
+    for cf_name in cf_group.cell_measures.keys():
+        engine.add_case_specific_fact("cell_measure", (cf_name,))
+
+    # Assert facts for CF ancillary variables.
+    for cf_name in cf_group.ancillary_variables.keys():
+        engine.add_case_specific_fact("ancillary_variable", (cf_name,))
+
+    # Assert facts for CF grid_mappings.
+    for cf_name in cf_group.grid_mappings.keys():
+        engine.add_case_specific_fact("grid_mapping", (cf_name,))
+
+    # Assert facts for CF labels.
+    for cf_name in cf_group.labels.keys():
+        engine.add_case_specific_fact("label", (cf_name,))
+
+    # Assert facts for CF formula terms associated with the cf_group
+    # of the CF data variable.
+
+    # Collect varnames of formula-root variables as we go.
+    # NOTE: use dictionary keys as an 'OrderedSet'
+    # - see: https://stackoverflow.com/a/53657523/2615050
+    # This is to ensure that we can handle the resulting facts in a definite
+    # order, as using a 'set' led to indeterminate results.
+    formula_root = {}
+    for cf_var in cf.cf_group.formula_terms.values():
+        for cf_root, cf_term in cf_var.cf_terms_by_root.items():
+            # Only assert this fact if the formula root variable is
+            # defined in the CF group of the CF data variable.
+            if cf_root in cf_group:
+                formula_root[cf_root] = True
+                engine.add_case_specific_fact(
+                    "formula_term",
+                    (cf_var.cf_name, cf_root, cf_term),
+                )
+
+    for cf_root in formula_root.keys():
+        engine.add_case_specific_fact("formula_root", (cf_root,))
+
+
+def _actions_activation_stats(engine, cf_name):
+    print("-" * 80)
+    print("CF Data Variable: %r" % cf_name)
+
+    engine.print_stats()
+
+    print("Rules Triggered:")
+
+    for rule in sorted(list(engine.rules_triggered)):
+        print("\t%s" % rule)
+
+    print("Case Specific Facts:")
+    kb_facts = engine.get_kb()
+
+    for key in kb_facts.entity_lists.keys():
+        for arg in kb_facts.entity_lists[key].case_specific_facts:
+            print("\t%s%s" % (key, arg))
+
+
+def _set_attributes(attributes, key, value):
+    """Set attributes dictionary, converting unicode strings appropriately."""
+
+    if isinstance(value, str):
+        try:
+            attributes[str(key)] = str(value)
+        except UnicodeEncodeError:
+            attributes[str(key)] = value
+    else:
+        attributes[str(key)] = value
+
+
+def _add_unused_attributes(iris_object, cf_var):
+    """
+    Populate the attributes of a cf element with the "unused" attributes
+    from the associated CF-netCDF variable. That is, all those that aren't CF
+    reserved terms.
+
+    """
+
+    def attribute_predicate(item):
+        return item[0] not in _CF_ATTRS
+
+    tmpvar = filter(attribute_predicate, cf_var.cf_attrs_unused())
+    for attr_name, attr_value in tmpvar:
+        _set_attributes(iris_object.attributes, attr_name, attr_value)
+
+
+def _get_actual_dtype(cf_var):
+    # Figure out what the eventual data type will be after any scale/offset
+    # transforms.
+    dummy_data = np.zeros(1, dtype=cf_var.dtype)
+    if hasattr(cf_var, "scale_factor"):
+        dummy_data = cf_var.scale_factor * dummy_data
+    if hasattr(cf_var, "add_offset"):
+        dummy_data = cf_var.add_offset + dummy_data
+    return dummy_data.dtype
+
+
+def _get_cf_var_data(cf_var, filename):
+    # Get lazy chunked data out of a cf variable.
+    dtype = _get_actual_dtype(cf_var)
+
+    # Create a data proxy for deferred loading; there is no metadata yet.
+    fill_value = getattr(
+        cf_var.cf_data,
+        "_FillValue",
+        netCDF4.default_fillvals[cf_var.dtype.str[1:]],
+    )
+    proxy = NetCDFDataProxy(
+        cf_var.shape, dtype, filename, cf_var.cf_name, fill_value
+    )
+    # Get the chunking specified for the variable: this is either a shape, or
+    # the string "contiguous".
+    chunks = cf_var.cf_data.chunking()
+    # In the "contiguous" case, pass chunks=None to 'as_lazy_data'.
+    if chunks == "contiguous":
+        chunks = None
+    return as_lazy_data(proxy, chunks=chunks)
+
+
+class _OrderedAddableList(list):
+    """
+    A custom container object for actions recording.
+
+    Used purely in actions debugging, to accumulate a record of which actions
+    were activated.
+
+    It replaces a set, so as to preserve the ordering of operations, with
+    possible repeats, and it also numbers the entries.
+
+    The actions routines invoke an 'add' method, so this effectively replaces
+    a set.add with a list.append.
+
+    """
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._n_add = 0
+
+    def add(self, msg):
+        self._n_add += 1
+        n_add = self._n_add
+        self.append(f"#{n_add:03d} : {msg}")
+
+
+def _load_cube(engine, cf, cf_var, filename):
+    """Create the cube associated with the CF-netCDF data variable."""
+    from iris.cube import Cube
+
+    data = _get_cf_var_data(cf_var, filename)
+    cube = Cube(data)
+
+    # Reset the actions engine.
+    engine.reset()
+
+    # Initialise engine rule processing hooks.
+ engine.cf_var = cf_var + engine.cube = cube + engine.cube_parts = {} + engine.requires = {} + engine.rules_triggered = _OrderedAddableList() + engine.filename = filename + + # Assert all the case-specific facts. + # This extracts 'facts' specific to this data-variable (aka cube), from + # the info supplied in the CFGroup object. + _assert_case_specific_facts(engine, cf, cf_var.cf_group) + + # Run the actions engine. + # This creates various cube elements and attaches them to the cube. + # It also records various other info on the engine, to be processed later. + engine.activate() + + # Having run the rules, now add the "unused" attributes to each cf element. + def fix_attributes_all_elements(role_name): + elements_and_names = engine.cube_parts.get(role_name, []) + + for iris_object, cf_var_name in elements_and_names: + _add_unused_attributes(iris_object, cf.cf_group[cf_var_name]) + + # Populate the attributes of all coordinates, cell-measures and ancillary-vars. + fix_attributes_all_elements("coordinates") + fix_attributes_all_elements("ancillary_variables") + fix_attributes_all_elements("cell_measures") + + # Also populate attributes of the top-level cube itself. + _add_unused_attributes(cube, cf_var) + + # Work out reference names for all the coords. + names = { + coord.var_name: coord.standard_name or coord.var_name or "unknown" + for coord in cube.coords() + } + + # Add all the cube cell methods. + cube.cell_methods = [ + iris.coords.CellMethod( + method=method.method, + intervals=method.intervals, + comments=method.comments, + coords=[ + names[coord_name] if coord_name in names else coord_name + for coord_name in method.coord_names + ], + ) + for method in cube.cell_methods + ] + + if DEBUG: + # Show activation statistics for this data-var (i.e. cube). + _actions_activation_stats(engine, cf_var.cf_name) + + return cube + + +def _load_aux_factory(engine, cube): + """ + Convert any CF-netCDF dimensionless coordinate to an AuxCoordFactory. + + """ + formula_type = engine.requires.get("formula_type") + if formula_type in [ + "atmosphere_sigma_coordinate", + "atmosphere_hybrid_height_coordinate", + "atmosphere_hybrid_sigma_pressure_coordinate", + "ocean_sigma_z_coordinate", + "ocean_sigma_coordinate", + "ocean_s_coordinate", + "ocean_s_coordinate_g1", + "ocean_s_coordinate_g2", + ]: + + def coord_from_term(term): + # Convert term names to coordinates (via netCDF variable names). + name = engine.requires["formula_terms"].get(term, None) + if name is not None: + for coord, cf_var_name in engine.cube_parts["coordinates"]: + if cf_var_name == name: + return coord + warnings.warn( + "Unable to find coordinate for variable " + "{!r}".format(name) + ) + + if formula_type == "atmosphere_sigma_coordinate": + pressure_at_top = coord_from_term("ptop") + sigma = coord_from_term("sigma") + surface_air_pressure = coord_from_term("ps") + factory = AtmosphereSigmaFactory( + pressure_at_top, sigma, surface_air_pressure + ) + elif formula_type == "atmosphere_hybrid_height_coordinate": + delta = coord_from_term("a") + sigma = coord_from_term("b") + orography = coord_from_term("orog") + factory = HybridHeightFactory(delta, sigma, orography) + elif formula_type == "atmosphere_hybrid_sigma_pressure_coordinate": + # Hybrid pressure has two valid versions of its formula terms: + # "p0: var1 a: var2 b: var3 ps: var4" or + # "ap: var1 b: var2 ps: var3" where "ap = p0 * a" + # Attempt to get the "ap" term. 
+        delta = coord_from_term("ap")
+        if delta is None:
+            # The "ap" term is unavailable, so try getting the "p0"
+            # and "a" terms in order to derive an "ap" equivalent term.
+            coord_p0 = coord_from_term("p0")
+            if coord_p0 is not None:
+                if coord_p0.shape != (1,):
+                    msg = (
+                        "Expecting {!r} to be a scalar reference "
+                        "pressure coordinate, got shape {!r}".format(
+                            coord_p0.var_name, coord_p0.shape
+                        )
+                    )
+                    raise ValueError(msg)
+                if coord_p0.has_bounds():
+                    msg = (
+                        "Ignoring atmosphere hybrid sigma pressure "
+                        "scalar coordinate {!r} bounds.".format(
+                            coord_p0.name()
+                        )
+                    )
+                    warnings.warn(msg)
+                coord_a = coord_from_term("a")
+                if coord_a is not None:
+                    if coord_a.units.is_unknown():
+                        # Be graceful, and promote unknown to dimensionless units.
+                        coord_a.units = "1"
+                    delta = coord_a * coord_p0.points[0]
+                    delta.units = coord_a.units * coord_p0.units
+                    delta.rename("vertical pressure")
+                    delta.var_name = "ap"
+                    cube.add_aux_coord(delta, cube.coord_dims(coord_a))
+
+        sigma = coord_from_term("b")
+        surface_air_pressure = coord_from_term("ps")
+        factory = HybridPressureFactory(delta, sigma, surface_air_pressure)
+    elif formula_type == "ocean_sigma_z_coordinate":
+        sigma = coord_from_term("sigma")
+        eta = coord_from_term("eta")
+        depth = coord_from_term("depth")
+        depth_c = coord_from_term("depth_c")
+        nsigma = coord_from_term("nsigma")
+        zlev = coord_from_term("zlev")
+        factory = OceanSigmaZFactory(
+            sigma, eta, depth, depth_c, nsigma, zlev
+        )
+    elif formula_type == "ocean_sigma_coordinate":
+        sigma = coord_from_term("sigma")
+        eta = coord_from_term("eta")
+        depth = coord_from_term("depth")
+        factory = OceanSigmaFactory(sigma, eta, depth)
+    elif formula_type == "ocean_s_coordinate":
+        s = coord_from_term("s")
+        eta = coord_from_term("eta")
+        depth = coord_from_term("depth")
+        a = coord_from_term("a")
+        depth_c = coord_from_term("depth_c")
+        b = coord_from_term("b")
+        factory = OceanSFactory(s, eta, depth, a, b, depth_c)
+    elif formula_type == "ocean_s_coordinate_g1":
+        s = coord_from_term("s")
+        c = coord_from_term("c")
+        eta = coord_from_term("eta")
+        depth = coord_from_term("depth")
+        depth_c = coord_from_term("depth_c")
+        factory = OceanSg1Factory(s, c, eta, depth, depth_c)
+    elif formula_type == "ocean_s_coordinate_g2":
+        s = coord_from_term("s")
+        c = coord_from_term("c")
+        eta = coord_from_term("eta")
+        depth = coord_from_term("depth")
+        depth_c = coord_from_term("depth_c")
+        factory = OceanSg2Factory(s, c, eta, depth, depth_c)
+    cube.add_aux_factory(factory)
+
+
+def _translate_constraints_to_var_callback(constraints):
+    """
+    Translate load constraints into a simple data-var filter function, if possible.
+
+    Returns:
+        * function(cf_var:CFDataVariable): --> bool,
+        or None.
+
+    For now, ONLY handles a single NameConstraint with no 'STASH' component.
+
+    """
+    import iris._constraints
+
+    constraints = iris._constraints.list_of_constraints(constraints)
+    result = None
+    if len(constraints) == 1:
+        (constraint,) = constraints
+        if (
+            isinstance(constraint, iris._constraints.NameConstraint)
+            and constraint.STASH == "none"
+        ):
+            # As long as it doesn't use a STASH match, we can treat it as
+            # a test against the name properties of cf_var.
+            # That's just like testing against name properties of a cube, except that they may not all exist.
+            def inner(cf_datavar):
+                match = True
+                for name in constraint._names:
+                    expected = getattr(constraint, name)
+                    if name != "STASH" and expected != "none":
+                        attr_name = "cf_name" if name == "var_name" else name
+                        # Fetch property: N.B. CFVariable caches the property values.
+                        # The use of a default here is the only difference from the code in NameConstraint.
+                        if not hasattr(cf_datavar, attr_name):
+                            continue
+                        actual = getattr(cf_datavar, attr_name, "")
+                        if actual != expected:
+                            match = False
+                            break
+                return match
+
+            result = inner
+    return result
+
+
+def load_cubes(filenames, callback=None, constraints=None):
+    """
+    Loads cubes from a list of NetCDF filenames/OPeNDAP URLs.
+
+    Args:
+
+    * filenames (string/list):
+        One or more NetCDF filenames/OPeNDAP URLs to load from.
+
+    Kwargs:
+
+    * callback (callable function):
+        Function which can be passed on to :func:`iris.io.run_callback`.
+
+    Returns:
+        Generator of loaded NetCDF :class:`iris.cube.Cube`.
+
+    """
+    # TODO: rationalise UGRID/mesh handling once experimental.ugrid is folded
+    # into standard behaviour.
+    # Deferred import to avoid circular imports.
+    from iris.experimental.ugrid.cf import CFUGridReader
+    from iris.experimental.ugrid.load import (
+        PARSE_UGRID_ON_LOAD,
+        _build_mesh_coords,
+        _meshes_from_cf,
+    )
+    from iris.io import run_callback
+
+    # Create a low-level data-var filter from the original load constraints, if they are suitable.
+    var_callback = _translate_constraints_to_var_callback(constraints)
+
+    # Create an actions engine.
+    engine = _actions_engine()
+
+    if isinstance(filenames, str):
+        filenames = [filenames]
+
+    for filename in filenames:
+        # Ingest the netCDF file.
+        meshes = {}
+        if PARSE_UGRID_ON_LOAD:
+            cf = CFUGridReader(filename)
+            meshes = _meshes_from_cf(cf)
+        else:
+            cf = iris.fileformats.cf.CFReader(filename)
+
+        # Process each CF data variable.
+        data_variables = list(cf.cf_group.data_variables.values()) + list(
+            cf.cf_group.promoted.values()
+        )
+        for cf_var in data_variables:
+            if var_callback and not var_callback(cf_var):
+                # Deliver only selected results.
+                continue
+
+            # cf_var-specific mesh handling, if a mesh is present.
+            # Build the mesh_coords *before* loading the cube - avoids
+            # mesh-related attributes being picked up by
+            # _add_unused_attributes().
+            mesh_name = None
+            mesh = None
+            mesh_coords, mesh_dim = [], None
+            if PARSE_UGRID_ON_LOAD:
+                mesh_name = getattr(cf_var, "mesh", None)
+                if mesh_name is not None:
+                    try:
+                        mesh = meshes[mesh_name]
+                    except KeyError:
+                        message = (
+                            f"File does not contain mesh: '{mesh_name}' - "
+                            f"referenced by variable: '{cf_var.cf_name}'."
+                        )
+                        logger.debug(message)
+                if mesh is not None:
+                    mesh_coords, mesh_dim = _build_mesh_coords(mesh, cf_var)
+
+            cube = _load_cube(engine, cf, cf_var, filename)
+
+            # Attach the mesh (if present) to the cube.
+            for mesh_coord in mesh_coords:
+                cube.add_aux_coord(mesh_coord, mesh_dim)
+
+            # Process any associated formula terms and attach
+            # the corresponding AuxCoordFactory.
+            try:
+                _load_aux_factory(engine, cube)
+            except ValueError as e:
+                warnings.warn("{}".format(e))
+
+            # Perform any user registered callback function.
+            cube = run_callback(callback, cube, cf_var, filename)
+
+            # Callback mechanism may return None, which must not be yielded.
+            if cube is None:
+                continue
+
+            yield cube
diff --git a/lib/iris/fileformats/netcdf.py b/lib/iris/fileformats/netcdf/saver.py
similarity index 84%
rename from lib/iris/fileformats/netcdf.py
rename to lib/iris/fileformats/netcdf/saver.py
index 6a7b37a1cc7..559596df458 100644
--- a/lib/iris/fileformats/netcdf.py
+++ b/lib/iris/fileformats/netcdf/saver.py
@@ -4,16 +4,14 @@
 # See COPYING and COPYING.LESSER in the root of the repository for full
 # licensing details.
""" -Module to support the loading of a NetCDF file into an Iris cube. +"Save" parts of the module :mod:`iris.fileformats.netcdf`. See also: `netCDF4 python `_ Also refer to document 'NetCDF Climate and Forecast (CF) Metadata Conventions'. """ - import collections -import collections.abc from itertools import repeat, zip_longest import os import os.path @@ -28,7 +26,7 @@ import numpy as np import numpy.ma as ma -from iris._lazy_data import _co_realise_lazy_arrays, as_lazy_data, is_lazy_data +from iris._lazy_data import _co_realise_lazy_arrays, is_lazy_data from iris.aux_factory import ( AtmosphereSigmaFactory, HybridHeightFactory, @@ -48,28 +46,17 @@ import iris.io import iris.util -# Show actions activation statistics. -DEBUG = False +# Configure the logger : shared logger for all in 'iris.fileformats.netcdf'. +from .. import __name__ as _parent_module_name + +logger = iris.config.get_logger(_parent_module_name) +del _parent_module_name -# Configure the logger. -logger = iris.config.get_logger(__name__) # Standard CML spatio-temporal axis names. SPATIO_TEMPORAL_AXES = ["t", "z", "y", "x"] -# Pass through CF attributes: -# - comment -# - Conventions -# - flag_masks -# - flag_meanings -# - flag_values -# - history -# - institution -# - reference -# - source -# - title -# - positive -# +# The CF-meaningful attributes which may appear on a data variable. _CF_ATTRS = [ "add_offset", "ancillary_variables", @@ -447,555 +434,6 @@ def coord(self, name): return result -def _actions_engine(): - # Return an 'actions engine', which provides a pyke-rules-like interface to - # the core cf translation code. - # Deferred import to avoid circularity. - import iris.fileformats._nc_load_rules.engine as nc_actions_engine - - engine = nc_actions_engine.Engine() - return engine - - -class NetCDFDataProxy: - """A reference to the data payload of a single NetCDF file variable.""" - - __slots__ = ("shape", "dtype", "path", "variable_name", "fill_value") - - def __init__(self, shape, dtype, path, variable_name, fill_value): - self.shape = shape - self.dtype = dtype - self.path = path - self.variable_name = variable_name - self.fill_value = fill_value - - @property - def ndim(self): - return len(self.shape) - - def __getitem__(self, keys): - dataset = netCDF4.Dataset(self.path) - try: - variable = dataset.variables[self.variable_name] - # Get the NetCDF variable data and slice. - var = variable[keys] - finally: - dataset.close() - return np.asanyarray(var) - - def __repr__(self): - fmt = ( - "<{self.__class__.__name__} shape={self.shape}" - " dtype={self.dtype!r} path={self.path!r}" - " variable_name={self.variable_name!r}>" - ) - return fmt.format(self=self) - - def __getstate__(self): - return {attr: getattr(self, attr) for attr in self.__slots__} - - def __setstate__(self, state): - for key, value in state.items(): - setattr(self, key, value) - - -def _assert_case_specific_facts(engine, cf, cf_group): - # Initialise a data store for built cube elements. - # This is used to patch element attributes *not* setup by the actions - # process, after the actions code has run. - engine.cube_parts["coordinates"] = [] - engine.cube_parts["cell_measures"] = [] - engine.cube_parts["ancillary_variables"] = [] - - # Assert facts for CF coordinates. - for cf_name in cf_group.coordinates.keys(): - engine.add_case_specific_fact("coordinate", (cf_name,)) - - # Assert facts for CF auxiliary coordinates. 
- for cf_name in cf_group.auxiliary_coordinates.keys(): - engine.add_case_specific_fact("auxiliary_coordinate", (cf_name,)) - - # Assert facts for CF cell measures. - for cf_name in cf_group.cell_measures.keys(): - engine.add_case_specific_fact("cell_measure", (cf_name,)) - - # Assert facts for CF ancillary variables. - for cf_name in cf_group.ancillary_variables.keys(): - engine.add_case_specific_fact("ancillary_variable", (cf_name,)) - - # Assert facts for CF grid_mappings. - for cf_name in cf_group.grid_mappings.keys(): - engine.add_case_specific_fact("grid_mapping", (cf_name,)) - - # Assert facts for CF labels. - for cf_name in cf_group.labels.keys(): - engine.add_case_specific_fact("label", (cf_name,)) - - # Assert facts for CF formula terms associated with the cf_group - # of the CF data variable. - - # Collect varnames of formula-root variables as we go. - # NOTE: use dictionary keys as an 'OrderedSet' - # - see: https://stackoverflow.com/a/53657523/2615050 - # This is to ensure that we can handle the resulting facts in a definite - # order, as using a 'set' led to indeterminate results. - formula_root = {} - for cf_var in cf.cf_group.formula_terms.values(): - for cf_root, cf_term in cf_var.cf_terms_by_root.items(): - # Only assert this fact if the formula root variable is - # defined in the CF group of the CF data variable. - if cf_root in cf_group: - formula_root[cf_root] = True - engine.add_case_specific_fact( - "formula_term", - (cf_var.cf_name, cf_root, cf_term), - ) - - for cf_root in formula_root.keys(): - engine.add_case_specific_fact("formula_root", (cf_root,)) - - -def _actions_activation_stats(engine, cf_name): - print("-" * 80) - print("CF Data Variable: %r" % cf_name) - - engine.print_stats() - - print("Rules Triggered:") - - for rule in sorted(list(engine.rules_triggered)): - print("\t%s" % rule) - - print("Case Specific Facts:") - kb_facts = engine.get_kb() - - for key in kb_facts.entity_lists.keys(): - for arg in kb_facts.entity_lists[key].case_specific_facts: - print("\t%s%s" % (key, arg)) - - -def _set_attributes(attributes, key, value): - """Set attributes dictionary, converting unicode strings appropriately.""" - - if isinstance(value, str): - try: - attributes[str(key)] = str(value) - except UnicodeEncodeError: - attributes[str(key)] = value - else: - attributes[str(key)] = value - - -def _add_unused_attributes(iris_object, cf_var): - """ - Populate the attributes of a cf element with the "unused" attributes - from the associated CF-netCDF variable. That is, all those that aren't CF - reserved terms. - - """ - - def attribute_predicate(item): - return item[0] not in _CF_ATTRS - - tmpvar = filter(attribute_predicate, cf_var.cf_attrs_unused()) - for attr_name, attr_value in tmpvar: - _set_attributes(iris_object.attributes, attr_name, attr_value) - - -def _get_actual_dtype(cf_var): - # Figure out what the eventual data type will be after any scale/offset - # transforms. - dummy_data = np.zeros(1, dtype=cf_var.dtype) - if hasattr(cf_var, "scale_factor"): - dummy_data = cf_var.scale_factor * dummy_data - if hasattr(cf_var, "add_offset"): - dummy_data = cf_var.add_offset + dummy_data - return dummy_data.dtype - - -def _get_cf_var_data(cf_var, filename): - # Get lazy chunked data out of a cf variable. 
- dtype = _get_actual_dtype(cf_var) - - # Create cube with deferred data, but no metadata - fill_value = getattr( - cf_var.cf_data, - "_FillValue", - netCDF4.default_fillvals[cf_var.dtype.str[1:]], - ) - proxy = NetCDFDataProxy( - cf_var.shape, dtype, filename, cf_var.cf_name, fill_value - ) - # Get the chunking specified for the variable : this is either a shape, or - # maybe the string "contiguous". - chunks = cf_var.cf_data.chunking() - # In the "contiguous" case, pass chunks=None to 'as_lazy_data'. - if chunks == "contiguous": - chunks = None - return as_lazy_data(proxy, chunks=chunks) - - -class _OrderedAddableList(list): - """ - A custom container object for actions recording. - - Used purely in actions debugging, to accumulate a record of which actions - were activated. - - It replaces a set, so as to preserve the ordering of operations, with - possible repeats, and it also numbers the entries. - - The actions routines invoke an 'add' method, so this effectively replaces - a set.add with a list.append. - - """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._n_add = 0 - - def add(self, msg): - self._n_add += 1 - n_add = self._n_add - self.append(f"#{n_add:03d} : {msg}") - - -def _load_cube(engine, cf, cf_var, filename): - from iris.cube import Cube - - """Create the cube associated with the CF-netCDF data variable.""" - data = _get_cf_var_data(cf_var, filename) - cube = Cube(data) - - # Reset the actions engine. - engine.reset() - - # Initialise engine rule processing hooks. - engine.cf_var = cf_var - engine.cube = cube - engine.cube_parts = {} - engine.requires = {} - engine.rules_triggered = _OrderedAddableList() - engine.filename = filename - - # Assert all the case-specific facts. - # This extracts 'facts' specific to this data-variable (aka cube), from - # the info supplied in the CFGroup object. - _assert_case_specific_facts(engine, cf, cf_var.cf_group) - - # Run the actions engine. - # This creates various cube elements and attaches them to the cube. - # It also records various other info on the engine, to be processed later. - engine.activate() - - # Having run the rules, now add the "unused" attributes to each cf element. - def fix_attributes_all_elements(role_name): - elements_and_names = engine.cube_parts.get(role_name, []) - - for iris_object, cf_var_name in elements_and_names: - _add_unused_attributes(iris_object, cf.cf_group[cf_var_name]) - - # Populate the attributes of all coordinates, cell-measures and ancillary-vars. - fix_attributes_all_elements("coordinates") - fix_attributes_all_elements("ancillary_variables") - fix_attributes_all_elements("cell_measures") - - # Also populate attributes of the top-level cube itself. - _add_unused_attributes(cube, cf_var) - - # Work out reference names for all the coords. - names = { - coord.var_name: coord.standard_name or coord.var_name or "unknown" - for coord in cube.coords() - } - - # Add all the cube cell methods. - cube.cell_methods = [ - iris.coords.CellMethod( - method=method.method, - intervals=method.intervals, - comments=method.comments, - coords=[ - names[coord_name] if coord_name in names else coord_name - for coord_name in method.coord_names - ], - ) - for method in cube.cell_methods - ] - - if DEBUG: - # Show activation statistics for this data-var (i.e. cube). - _actions_activation_stats(engine, cf_var.cf_name) - - return cube - - -def _load_aux_factory(engine, cube): - """ - Convert any CF-netCDF dimensionless coordinate to an AuxCoordFactory. 
- - """ - formula_type = engine.requires.get("formula_type") - if formula_type in [ - "atmosphere_sigma_coordinate", - "atmosphere_hybrid_height_coordinate", - "atmosphere_hybrid_sigma_pressure_coordinate", - "ocean_sigma_z_coordinate", - "ocean_sigma_coordinate", - "ocean_s_coordinate", - "ocean_s_coordinate_g1", - "ocean_s_coordinate_g2", - ]: - - def coord_from_term(term): - # Convert term names to coordinates (via netCDF variable names). - name = engine.requires["formula_terms"].get(term, None) - if name is not None: - for coord, cf_var_name in engine.cube_parts["coordinates"]: - if cf_var_name == name: - return coord - warnings.warn( - "Unable to find coordinate for variable " - "{!r}".format(name) - ) - - if formula_type == "atmosphere_sigma_coordinate": - pressure_at_top = coord_from_term("ptop") - sigma = coord_from_term("sigma") - surface_air_pressure = coord_from_term("ps") - factory = AtmosphereSigmaFactory( - pressure_at_top, sigma, surface_air_pressure - ) - elif formula_type == "atmosphere_hybrid_height_coordinate": - delta = coord_from_term("a") - sigma = coord_from_term("b") - orography = coord_from_term("orog") - factory = HybridHeightFactory(delta, sigma, orography) - elif formula_type == "atmosphere_hybrid_sigma_pressure_coordinate": - # Hybrid pressure has two valid versions of its formula terms: - # "p0: var1 a: var2 b: var3 ps: var4" or - # "ap: var1 b: var2 ps: var3" where "ap = p0 * a" - # Attempt to get the "ap" term. - delta = coord_from_term("ap") - if delta is None: - # The "ap" term is unavailable, so try getting terms "p0" - # and "a" terms in order to derive an "ap" equivalent term. - coord_p0 = coord_from_term("p0") - if coord_p0 is not None: - if coord_p0.shape != (1,): - msg = ( - "Expecting {!r} to be a scalar reference " - "pressure coordinate, got shape {!r}".format( - coord_p0.var_name, coord_p0.shape - ) - ) - raise ValueError(msg) - if coord_p0.has_bounds(): - msg = ( - "Ignoring atmosphere hybrid sigma pressure " - "scalar coordinate {!r} bounds.".format( - coord_p0.name() - ) - ) - warnings.warn(msg) - coord_a = coord_from_term("a") - if coord_a is not None: - if coord_a.units.is_unknown(): - # Be graceful, and promote unknown to dimensionless units. 
- coord_a.units = "1" - delta = coord_a * coord_p0.points[0] - delta.units = coord_a.units * coord_p0.units - delta.rename("vertical pressure") - delta.var_name = "ap" - cube.add_aux_coord(delta, cube.coord_dims(coord_a)) - - sigma = coord_from_term("b") - surface_air_pressure = coord_from_term("ps") - factory = HybridPressureFactory(delta, sigma, surface_air_pressure) - elif formula_type == "ocean_sigma_z_coordinate": - sigma = coord_from_term("sigma") - eta = coord_from_term("eta") - depth = coord_from_term("depth") - depth_c = coord_from_term("depth_c") - nsigma = coord_from_term("nsigma") - zlev = coord_from_term("zlev") - factory = OceanSigmaZFactory( - sigma, eta, depth, depth_c, nsigma, zlev - ) - elif formula_type == "ocean_sigma_coordinate": - sigma = coord_from_term("sigma") - eta = coord_from_term("eta") - depth = coord_from_term("depth") - factory = OceanSigmaFactory(sigma, eta, depth) - elif formula_type == "ocean_s_coordinate": - s = coord_from_term("s") - eta = coord_from_term("eta") - depth = coord_from_term("depth") - a = coord_from_term("a") - depth_c = coord_from_term("depth_c") - b = coord_from_term("b") - factory = OceanSFactory(s, eta, depth, a, b, depth_c) - elif formula_type == "ocean_s_coordinate_g1": - s = coord_from_term("s") - c = coord_from_term("c") - eta = coord_from_term("eta") - depth = coord_from_term("depth") - depth_c = coord_from_term("depth_c") - factory = OceanSg1Factory(s, c, eta, depth, depth_c) - elif formula_type == "ocean_s_coordinate_g2": - s = coord_from_term("s") - c = coord_from_term("c") - eta = coord_from_term("eta") - depth = coord_from_term("depth") - depth_c = coord_from_term("depth_c") - factory = OceanSg2Factory(s, c, eta, depth, depth_c) - cube.add_aux_factory(factory) - - -def _translate_constraints_to_var_callback(constraints): - """ - Translate load constraints into a simple data-var filter function, if possible. - - Returns: - * function(cf_var:CFDataVariable): --> bool, - or None. - - For now, ONLY handles a single NameConstraint with no 'STASH' component. - - """ - import iris._constraints - - constraints = iris._constraints.list_of_constraints(constraints) - result = None - if len(constraints) == 1: - (constraint,) = constraints - if ( - isinstance(constraint, iris._constraints.NameConstraint) - and constraint.STASH == "none" - ): - # As long as it doesn't use a STASH match, then we can treat it as - # a testing against name properties of cf_var. - # That's just like testing against name properties of a cube, except that they may not all exist. - def inner(cf_datavar): - match = True - for name in constraint._names: - expected = getattr(constraint, name) - if name != "STASH" and expected != "none": - attr_name = "cf_name" if name == "var_name" else name - # Fetch property : N.B. CFVariable caches the property values - # The use of a default here is the only difference from the code in NameConstraint. - if not hasattr(cf_datavar, attr_name): - continue - actual = getattr(cf_datavar, attr_name, "") - if actual != expected: - match = False - break - return match - - result = inner - return result - - -def load_cubes(filenames, callback=None, constraints=None): - """ - Loads cubes from a list of NetCDF filenames/OPeNDAP URLs. - - Args: - - * filenames (string/list): - One or more NetCDF filenames/OPeNDAP URLs to load from. - - Kwargs: - - * callback (callable function): - Function which can be passed on to :func:`iris.io.run_callback`. - - Returns: - Generator of loaded NetCDF :class:`iris.cube.Cube`. 
- - """ - # TODO: rationalise UGRID/mesh handling once experimental.ugrid is folded - # into standard behaviour. - # Deferred import to avoid circular imports. - from iris.experimental.ugrid.cf import CFUGridReader - from iris.experimental.ugrid.load import ( - PARSE_UGRID_ON_LOAD, - _build_mesh_coords, - _meshes_from_cf, - ) - from iris.io import run_callback - - # Create a low-level data-var filter from the original load constraints, if they are suitable. - var_callback = _translate_constraints_to_var_callback(constraints) - - # Create an actions engine. - engine = _actions_engine() - - if isinstance(filenames, str): - filenames = [filenames] - - for filename in filenames: - # Ingest the netCDF file. - meshes = {} - if PARSE_UGRID_ON_LOAD: - cf = CFUGridReader(filename) - meshes = _meshes_from_cf(cf) - else: - cf = iris.fileformats.cf.CFReader(filename) - - # Process each CF data variable. - data_variables = list(cf.cf_group.data_variables.values()) + list( - cf.cf_group.promoted.values() - ) - for cf_var in data_variables: - if var_callback and not var_callback(cf_var): - # Deliver only selected results. - continue - - # cf_var-specific mesh handling, if a mesh is present. - # Build the mesh_coords *before* loading the cube - avoids - # mesh-related attributes being picked up by - # _add_unused_attributes(). - mesh_name = None - mesh = None - mesh_coords, mesh_dim = [], None - if PARSE_UGRID_ON_LOAD: - mesh_name = getattr(cf_var, "mesh", None) - if mesh_name is not None: - try: - mesh = meshes[mesh_name] - except KeyError: - message = ( - f"File does not contain mesh: '{mesh_name}' - " - f"referenced by variable: '{cf_var.cf_name}' ." - ) - logger.debug(message) - if mesh is not None: - mesh_coords, mesh_dim = _build_mesh_coords(mesh, cf_var) - - cube = _load_cube(engine, cf, cf_var, filename) - - # Attach the mesh (if present) to the cube. - for mesh_coord in mesh_coords: - cube.add_aux_coord(mesh_coord, mesh_dim) - - # Process any associated formula terms and attach - # the corresponding AuxCoordFactory. - try: - _load_aux_factory(engine, cube) - except ValueError as e: - warnings.warn("{}".format(e)) - - # Perform any user registered callback function. - cube = run_callback(callback, cube, cf_var, filename) - - # Callback mechanism may return None, which must not be yielded - if cube is None: - continue - - yield cube - - def _bytes_if_ascii(string): """ Convert the given string to a byte string (str in py2k, bytes in py3k)
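A minimal usage sketch (not part of the patch) of the compatibility claim in `__init__.py`: the re-exports mean existing user imports are unchanged, while developers may now import from the submodules directly.

    # Public names are still available from the package, exactly as before...
    from iris.fileformats.netcdf import load_cubes, save

    # ...and are the same objects as those defined in the new submodules.
    from iris.fileformats.netcdf import loader, saver

    assert load_cubes is loader.load_cubes
    assert save is saver.save

One caveat of re-exporting by value: a module-level constant such as DEBUG is copied at import time, so rebinding it on the package (iris.fileformats.netcdf.DEBUG = True) does not affect the loader's own global; enabling the actions-activation statistics requires iris.fileformats.netcdf.loader.DEBUG = True.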
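The deferred-access design of NetCDFDataProxy can be illustrated with a short sketch (the path "sample.nc" and variable "tas" are assumed placeholders): the proxy stores only metadata, opens the file afresh for each indexing operation, and pickles cleanly via its __getstate__/__setstate__ slot handling, which is what lets _get_cf_var_data wrap it with as_lazy_data and ship it to dask workers.

    import pickle

    import numpy as np

    from iris.fileformats.netcdf import NetCDFDataProxy

    # The proxy records only metadata - no file handle, no array payload.
    proxy = NetCDFDataProxy(
        shape=(12, 180, 360),
        dtype=np.dtype("float32"),
        path="sample.nc",  # assumed placeholder path
        variable_name="tas",  # assumed placeholder variable
        fill_value=None,
    )

    # The __slots__ round-trip makes the proxy picklable.
    clone = pickle.loads(pickle.dumps(proxy))
    assert (clone.shape, clone.variable_name) == (proxy.shape, proxy.variable_name)

    # Only indexing touches the file: __getitem__ opens the dataset, reads the
    # requested slice, and closes it again.
    # first_field = proxy[0]  # would read from "sample.nc", if it existed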
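Likewise, the user-visible effect of _translate_constraints_to_var_callback is easy to sketch (file and variable names again assumed placeholders): a lone NameConstraint with no STASH term is translated into a cheap per-variable filter, so load_cubes skips non-matching data variables before doing any cube-building work.

    import iris

    # The name constraint is applied to each CF data variable up front;
    # variables other than "tas" are never turned into cubes at all.
    cubes = iris.load("sample.nc", iris.NameConstraint(var_name="tas"))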