NetCDF save - stream ALL lazy arrays. #4375

Merged: 9 commits, Oct 20, 2021
9 changes: 9 additions & 0 deletions docs/src/whatsnew/latest.rst
@@ -72,6 +72,15 @@ This document explains the changes made to Iris for this release
 #. `@larsbarring`_ updated :func:`~iris.util.equalise_attributes` to return a list of dictionaries
    containing the attributes removed from each :class:`~iris.cube.Cube`. (:pull:`4357`)
 
+#. `@trexfeathers`_ enabled streaming of **all** lazy arrays when saving to
+   NetCDF files (previously only :class:`~iris.cube.Cube`
+   :attr:`~iris.cube.Cube.data` was streamed). This is important given the
+   much greater size of :class:`~iris.coords.AuxCoord`
+   :attr:`~iris.coords.AuxCoord.points` and
+   :class:`~iris.experimental.ugrid.mesh.Connectivity`
+   :attr:`~iris.experimental.ugrid.mesh.Connectivity.indices` under the
+   `UGRID`_ model. (:pull:`4375`)
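
As a minimal illustration of the behaviour this entry describes (not part of the PR; the file names are hypothetical, and this assumes a NetCDF file whose arrays are loaded lazily):

import iris

cube = iris.load_cube("input.nc")  # hypothetical input file
print(cube.has_lazy_data())  # True: the data payload is not yet realised
# Aux coord points (and bounds, connectivities, ...) may also be lazy:
print(any(c.has_lazy_points() for c in cube.aux_coords))
# This change makes save() stream every such lazy array chunk-by-chunk,
# rather than realising it in memory first.
iris.save(cube, "output.nc")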


🐛 Bugs Fixed
=============
188 changes: 104 additions & 84 deletions lib/iris/fileformats/netcdf.py
@@ -27,7 +27,7 @@
 import numpy as np
 import numpy.ma as ma
 
-from iris._lazy_data import as_lazy_data
+from iris._lazy_data import _co_realise_lazy_arrays, as_lazy_data, is_lazy_data
 from iris.aux_factory import (
     AtmosphereSigmaFactory,
     HybridHeightFactory,
@@ -1475,12 +1475,17 @@ def _add_mesh(self, cube_or_mesh):
             if conn.src_dim == 1:
                 # Has the 'other' dimension order, =reversed
                 conn_dims = conn_dims[::-1]
+            if iris.util.is_masked(conn.core_indices()):
+                # Flexible mesh.
+                fill_value = -1
+            else:
+                fill_value = None
             cf_conn_name = self._create_generic_cf_array_var(
                 cube_or_mesh,
                 [],
                 conn,
                 element_dims=conn_dims,
-                fill_value=-1,
+                fill_value=fill_value,
             )
             # Add essential attributes to the Connectivity variable.
             cf_conn_var = self._dataset.variables[cf_conn_name]
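
For context on the new fill-value logic, an illustrative sketch (not from the PR): masked connectivity indices are how a "flexible" mesh, whose faces have varying vertex counts, is represented, so a sentinel fill value is needed when saving:

import numpy as np

# A triangle plus a quad: the triangle's unused 4th slot is masked.
indices = np.ma.masked_invalid([[0.0, 1.0, 2.0, np.nan], [1.0, 2.0, 3.0, 4.0]])
print(np.ma.is_masked(indices))  # True -> the variable gets _FillValue = -1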
@@ -1994,10 +1999,14 @@ def _ensure_valid_dtype(self, values, src_name, src_object):
             "NETCDF3_64BIT",
             "NETCDF4_CLASSIC",
         ):
+            val_min, val_max = (values.min(), values.max())
+            if is_lazy_data(values):
+                val_min, val_max = _co_realise_lazy_arrays([val_min, val_max])
             # Cast to an integer type supported by netCDF3.
-            if not np.can_cast(values.max(), np.int32) or not np.can_cast(
-                values.min(), np.int32
-            ):
+            can_cast = all(
+                [np.can_cast(m, np.int32) for m in (val_min, val_max)]
+            )
+            if not can_cast:
                 msg = (
                     "The data type of {} {!r} is not supported by {} and"
                     " its values cannot be safely cast to a supported"
@@ -2030,7 +2039,7 @@ def _create_cf_bounds(self, coord, cf_var, cf_name):
         if hasattr(coord, "has_bounds") and coord.has_bounds():
             # Get the values in a form which is valid for the file format.
             bounds = self._ensure_valid_dtype(
-                coord.bounds, "the bounds of coordinate", coord
+                coord.core_bounds(), "the bounds of coordinate", coord
             )
             n_bounds = bounds.shape[-1]
 
@@ -2057,7 +2066,12 @@ def _create_cf_bounds(self, coord, cf_var, cf_name):
                 bounds.dtype.newbyteorder("="),
                 cf_var.dimensions + (bounds_dimension_name,),
             )
-            cf_var_bounds[:] = bounds
+            self._lazy_stream_data(
+                data=bounds,
+                fill_value=None,
+                fill_warn=True,
+                cf_var=cf_var_bounds,
+            )
 
     def _get_cube_variable_name(self, cube):
         """
@@ -2311,7 +2325,7 @@ def _create_generic_cf_array_var(
         # Get the data values, in a way which works for any element type, as
         # all are subclasses of _DimensionalMetadata.
         # (e.g. =points if a coord, =data if an ancillary, etc)
-        data = element._values
+        data = element._core_values()
 
         if np.issubdtype(data.dtype, np.str_):
             # Deal with string-type variables.
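
For background, the `core_*` accessors this change switches to (`core_data`, `core_bounds`, `_core_values`) return the underlying array, lazy or real, without forcing computation, whereas the plain properties realise it. A small sketch:

import dask.array as da
from iris.coords import AuxCoord

coord = AuxCoord(da.zeros(4, chunks=2), long_name="example")
print(coord.has_lazy_points())    # True
print(type(coord.core_points()))  # dask array: still lazy
print(type(coord.points))         # numpy ndarray: realised on access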
@@ -2336,7 +2350,10 @@ def _create_generic_cf_array_var(
             # Convert data from an array of strings into a character array
             # with an extra string-length dimension.
             if len(element_dims) == 1:
-                data = list("%- *s" % (string_dimension_depth, data[0]))
+                data_first = data[0]
+                if is_lazy_data(data_first):
+                    data_first = data_first.compute()
+                data = list("%- *s" % (string_dimension_depth, data_first))
             else:
                 orig_shape = data.shape
                 new_shape = orig_shape + (string_dimension_depth,)
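
A quick illustration of the `"%- *s"` formatting above: `-` left-justifies, `*` takes the field width from the argument tuple, and `list()` then splits the padded string into one character per element of the character array:

padded = "%- *s" % (4, "ab")
print(repr(padded))  # 'ab  '
print(list(padded))  # ['a', 'b', ' ', ' ']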
@@ -2353,22 +2370,14 @@ def _create_generic_cf_array_var(
         element_type = type(element).__name__
         data = self._ensure_valid_dtype(data, element_type, element)
 
-        if fill_value is not None:
-            if np.ma.is_masked(data):
-                # Use a specific fill-value in place of the netcdf default.
-                data = np.ma.filled(data, fill_value)
-            else:
-                # Create variable without a (non-standard) fill_value
-                fill_value = None
-
         # Check if this is a dim-coord.
         is_dimcoord = cube is not None and element in cube.dim_coords
 
         if isinstance(element, iris.coords.CellMeasure):
             # Disallow saving of *masked* cell measures.
             # NOTE: currently, this is the only functional difference in
             # variable creation between an ancillary and a cell measure.
-            if ma.is_masked(data):
+            if iris.util.is_masked(data):
                 # We can't save masked points properly, as we don't maintain
                 # a fill_value. (Load will not record one, either).
                 msg = "Cell measures with missing data are not supported."
@@ -2398,7 +2407,9 @@ def _create_generic_cf_array_var(
             self._create_cf_bounds(element, cf_var, cf_name)
 
         # Add the data to the CF-netCDF variable.
-        cf_var[:] = data  # TODO: support dask streaming
+        self._lazy_stream_data(
+            data=data, fill_value=fill_value, fill_warn=True, cf_var=cf_var
+        )
 
         # Add names + units
         self._set_cf_var_attributes(cf_var, element)
@@ -2696,6 +2707,8 @@ def _create_cf_data_variable(
             The newly created CF-netCDF data variable.
 
         """
+        # Get the values in a form which is valid for the file format.
+        data = self._ensure_valid_dtype(cube.core_data(), "cube", cube)
 
         if packing:
             if isinstance(packing, dict):
@@ -2717,11 +2730,12 @@
                     raise ValueError(msg)
             else:
                 # We compute the scale_factor and add_offset based on the
-                # min/max of the data. This requires the data to be loaded.
-                masked = ma.isMaskedArray(cube.data)
+                # min/max of the data.
+                masked = iris.util.is_masked(data)
                 dtype = np.dtype(packing)
-                cmax = cube.data.max()
-                cmin = cube.data.min()
+                cmin, cmax = (data.min(), data.max())
+                if is_lazy_data(data):
+                    cmin, cmax = _co_realise_lazy_arrays([cmin, cmax])
                 n = dtype.itemsize * 8
                 if masked:
                     scale_factor = (cmax - cmin) / (2 ** n - 2)
@@ -2734,6 +2748,8 @@
                     add_offset = (cmax + cmin) / 2
                 else:
                     add_offset = cmin + 2 ** (n - 1) * scale_factor
+        else:
+            dtype = data.dtype.newbyteorder("=")
 
         def set_packing_ncattrs(cfvar):
             """Set netCDF packing attributes."""
@@ -2747,74 +2763,19 @@ def set_packing_ncattrs(cfvar):
         while cf_name in self._dataset.variables:
             cf_name = self._increment_name(cf_name)
 
-        # if netcdf3 avoid streaming due to dtype handling
-        if not cube.has_lazy_data() or self._dataset.file_format in (
-            "NETCDF3_CLASSIC",
-            "NETCDF3_64BIT",
-        ):
-
-            # Get the values in a form which is valid for the file format.
-            data = self._ensure_valid_dtype(cube.data, "cube", cube)
-
-            def store(data, cf_var, fill_value):
-                cf_var[:] = data
-                is_masked = ma.is_masked(data)
-                contains_value = fill_value is not None and fill_value in data
-                return is_masked, contains_value
-
-        else:
-            data = cube.lazy_data()
-
-            def store(data, cf_var, fill_value):
-                # Store lazy data and check whether it is masked and contains
-                # the fill value
-                target = _FillValueMaskCheckAndStoreTarget(cf_var, fill_value)
-                da.store([data], [target])
-                return target.is_masked, target.contains_value
-
-        if not packing:
-            dtype = data.dtype.newbyteorder("=")
-
         # Create the cube CF-netCDF data variable with data payload.
         cf_var = self._dataset.createVariable(
             cf_name, dtype, dimension_names, fill_value=fill_value, **kwargs
         )
-        set_packing_ncattrs(cf_var)
-
-        # If packing attributes are specified, don't bother checking whether
-        # the fill value is in the data.
-        if packing:
-            fill_value_to_check = None
-        elif fill_value is not None:
-            fill_value_to_check = fill_value
-        else:
-            fill_value_to_check = netCDF4.default_fillvals[dtype.str[1:]]
-
-        # Store the data and check if it is masked and contains the fill value
-        is_masked, contains_fill_value = store(
-            data, cf_var, fill_value_to_check
+        set_packing_ncattrs(cf_var)
+        self._lazy_stream_data(
+            data=data,
+            fill_value=fill_value,
+            fill_warn=(not packing),
+            cf_var=cf_var,
         )
 
-        if dtype.itemsize == 1 and fill_value is None:
-            if is_masked:
-                msg = (
-                    "Cube '{}' contains byte data with masked points, but "
-                    "no fill_value keyword was given. As saved, these "
-                    "points will read back as valid values. To save as "
-                    "masked byte data, please explicitly specify the "
-                    "'fill_value' keyword."
-                )
-                warnings.warn(msg.format(cube.name()))
-            elif contains_fill_value:
-                msg = (
-                    "Cube '{}' contains unmasked data points equal to the "
-                    "fill-value, {}. As saved, these points will read back "
-                    "as missing data. To save these as normal values, please "
-                    "specify a 'fill_value' keyword not equal to any valid "
-                    "data points."
-                )
-                warnings.warn(msg.format(cube.name(), fill_value))
-
         if cube.standard_name:
             _setncattr(cf_var, "standard_name", cube.standard_name)
 
@@ -2902,6 +2863,65 @@ def _increment_name(self, varname):
 
         return "{}_{}".format(varname, num)
 
+    @staticmethod
+    def _lazy_stream_data(data, fill_value, fill_warn, cf_var):
+        if is_lazy_data(data):
+
+            def store(data, cf_var, fill_value):
+                # Store lazy data and check whether it is masked and contains
+                # the fill value
+                target = _FillValueMaskCheckAndStoreTarget(cf_var, fill_value)
+                da.store([data], [target])
+                return target.is_masked, target.contains_value
+
+        else:
+
+            def store(data, cf_var, fill_value):
+                cf_var[:] = data
+                is_masked = np.ma.is_masked(data)
+                contains_value = fill_value is not None and fill_value in data
+                return is_masked, contains_value
+
+        dtype = cf_var.dtype
+
+        # fill_warn allows us to skip warning if packing attributes have been
+        # specified. It would require much more complex operations to work out
+        # what the values and fill_value _would_ be in such a case.
+        if fill_warn:
+            if fill_value is not None:
+                fill_value_to_check = fill_value
+            else:
+                fill_value_to_check = netCDF4.default_fillvals[dtype.str[1:]]
+        else:
+            fill_value_to_check = None
+
+        # Store the data and check if it is masked and contains the fill value.
+        is_masked, contains_fill_value = store(
+            data, cf_var, fill_value_to_check
+        )
+
+        if dtype.itemsize == 1 and fill_value is None:
+            if is_masked:
+                msg = (
+                    "CF var '{}' contains byte data with masked points, but "
+                    "no fill_value keyword was given. As saved, these "
+                    "points will read back as valid values. To save as "
+                    "masked byte data, `_FillValue` needs to be explicitly "
+                    "set. For Cube data this can be done via the 'fill_value' "
+                    "keyword during saving, otherwise use ncedit/equivalent."
+                )
+                warnings.warn(msg.format(cf_var.name))
+            elif contains_fill_value:
+                msg = (
+                    "CF var '{}' contains unmasked data points equal to the "
+                    "fill-value, {}. As saved, these points will read back "
+                    "as missing data. To save these as normal values, "
+                    "`_FillValue` needs to be set to not equal any valid data "
+                    "points. For Cube data this can be done via the 'fill_value' "
+                    "keyword during saving, otherwise use ncedit/equivalent."
+                )
+                warnings.warn(msg.format(cf_var.name, fill_value))
+
 
 def save(
     cube,
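
`_lazy_stream_data` hinges on `da.store` calling the target's `__setitem__` once per computed chunk; that is what lets `_FillValueMaskCheckAndStoreTarget` (defined outside this diff) watch for masks and fill-value collisions while the data is written. A stripped-down sketch of the same pattern (illustrative only, not the Iris class):

import dask.array as da
import numpy as np

class CheckingTarget:
    """Write chunks through to a real array, recording what passed by."""

    def __init__(self, var, fill_value=None):
        self.var = var
        self.fill_value = fill_value
        self.is_masked = False
        self.contains_value = False

    def __setitem__(self, keys, chunk):
        # da.store hands each computed chunk here, with its slice keys.
        self.is_masked = self.is_masked or np.ma.is_masked(chunk)
        if self.fill_value is not None:
            self.contains_value = self.contains_value or bool(
                (chunk == self.fill_value).any()
            )
        self.var[keys] = chunk

target = CheckingTarget(np.empty(10), fill_value=-1.0)
da.store([da.arange(10, chunks=5) - 5.0], [target])
print(target.contains_value)  # True: -1.0 occurs in the streamed data

When no explicit fill_value is given, the value checked against is the netCDF default, looked up as netCDF4.default_fillvals[dtype.str[1:]]: dtype.str includes a byte-order prefix (e.g. "<i4" on little-endian), which the [1:] strips to give the "i4"-style key.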