Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy implementation of multi_model_statistics and ensemble_statistics preprocessors #968

Merged
merged 81 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
faaaa38
First attempt at replacing multimodel stats with a native iris altern…
Peter9192 Jan 28, 2021
4e5a675
temporary test draft to derive new tests from
Peter9192 Jan 28, 2021
d5b75b5
Support percentiles
Peter9192 Jan 28, 2021
98580d1
Docstrings and check for no overlap
Peter9192 Jan 28, 2021
a0336cc
Resolve span gets its own function
Peter9192 Jan 29, 2021
a5322a2
fix merge conflict
Jan 29, 2021
8bbd878
first set unit tests for multimodel stats and span
Jan 29, 2021
8e24ed9
Also accept std as valid statistics
Peter9192 Jan 29, 2021
5e63398
Another attempt at supporting std
Peter9192 Jan 29, 2021
f93d42f
deal better with kwargs
Peter9192 Jan 29, 2021
2bff8ef
corrected expected test results
bvreede Jan 29, 2021
b0f93e6
masterplan for overhaul of multimodel tests
Peter9192 Feb 1, 2021
973979d
git stash dump - wip
Peter9192 Feb 18, 2021
40d98ab
Merge branch 'master' into mmstats_lazy
stefsmeets Feb 19, 2021
930bec9
Fix bugs to ensure that tests can run
stefsmeets Feb 22, 2021
e645812
Clean up var mapping to `iris.analysis`
stefsmeets Feb 22, 2021
d3b31c9
Update tests
stefsmeets Feb 22, 2021
a108445
Fix test cube metadata so that iris will merge them
stefsmeets Feb 23, 2021
24f2270
Update multimodel tests
stefsmeets Feb 23, 2021
ff6ade8
Implement edge cases for multimodel tests
stefsmeets Feb 23, 2021
11232c5
Refactor multimodel tests
stefsmeets Feb 24, 2021
e0577b2
Add test cases for _resolve_span
stefsmeets Feb 24, 2021
c3a9a0a
Generate cube using existing function
stefsmeets Feb 24, 2021
2aa566a
Fix Codacy issue
stefsmeets Feb 24, 2021
fb33c19
Remove redundant test file
stefsmeets Feb 25, 2021
f45438c
Merge branch 'master' into mmstats_lazy
stefsmeets Feb 25, 2021
8bd9858
Mark tests failing because of inconsistent plev data with xfail
stefsmeets Feb 25, 2021
649b8f3
Make cache key more readable
stefsmeets Feb 25, 2021
554edea
Compare coord / metadata attributes directly
stefsmeets Feb 25, 2021
ade5f35
Use `allclose` instead of `almost_equal` for array comparison
stefsmeets Feb 25, 2021
04dbaf6
Group iris.analysis functions
stefsmeets Feb 25, 2021
59f2564
Add tests to make sure returned cubes are still lazy
stefsmeets Mar 1, 2021
24bcea0
Fix bug with check if cubes are already aligned
stefsmeets Mar 1, 2021
873ddd0
Merge branch 'master' into mmstats_lazy
stefsmeets Mar 1, 2021
1a6695b
Remove aux coord after cubes are merged
stefsmeets Mar 1, 2021
9b3708f
Implement lazy alignment for span='overlap'
stefsmeets Mar 1, 2021
b07c704
Fix validation data
stefsmeets Mar 1, 2021
a43ecb9
Use extrapolate extend time points for span='full'
stefsmeets Mar 1, 2021
2ffcb9a
Implement lazy cube extension scheme for span='full'
stefsmeets Mar 2, 2021
999d514
Fix pep257: D417
stefsmeets Mar 2, 2021
bdfc125
Update documentation
stefsmeets Mar 2, 2021
cea76e4
Metadata tweaks to make it work with sample data / test recipe
stefsmeets Mar 2, 2021
5b7f833
Merge branch 'master' into mmstats_lazy
stefsmeets Mar 3, 2021
c19335f
Update docstring and add note
stefsmeets Mar 4, 2021
34994cc
Apply suggestions from code review
stefsmeets Mar 5, 2021
9921b33
Update documentation
stefsmeets Mar 5, 2021
e560bcd
Remove unused test code
stefsmeets Mar 5, 2021
0bd079b
Copy cubes to avoid updating them inplace
stefsmeets Mar 5, 2021
709d849
Add option to rechunk data before computing statistics
Peter9192 Mar 11, 2021
f9f5600
Expand options for statistics
stefsmeets Mar 11, 2021
cf6bc9c
Convert input to lazy arrays for memory efficiency
stefsmeets Mar 12, 2021
9f53512
Outline of workaround for non-lazy iris funcs
Peter9192 Mar 15, 2021
1f1bfcd
undo accidental changes to tests
Peter9192 Mar 15, 2021
62987b6
Fix work-around for iris non-lazy aggregators
stefsmeets Mar 16, 2021
7687457
Use `slice_over` to generate time slices
stefsmeets Mar 16, 2021
3ff0879
Remove dask config
stefsmeets Mar 16, 2021
bb8b3cd
Remove temporary coordinate
stefsmeets Mar 16, 2021
ee7af3d
Fix func/dim name and docstring
stefsmeets Mar 16, 2021
3cad48f
Raise error when a single model is passed to multicube statistics
stefsmeets Mar 16, 2021
2b5c943
Merge remote-tracking branch 'origin/master' into mmstats_lazy
Peter9192 Mar 17, 2021
9c8eb67
Realize data beforehand if aggregator is not lazy
Peter9192 Mar 17, 2021
d76298e
Stronger separate lazy from non-lazy path
Peter9192 Mar 18, 2021
c4f0ab2
Merge branch 'master' into mmstats_lazy
stefsmeets Mar 19, 2021
db3aa93
Make concat dim a global and fix tests
stefsmeets Mar 19, 2021
99159ac
Simplify one-line if statement as per codacy suggestion
Peter9192 Mar 30, 2021
3ac43b5
Merge branch origin/main into this branch and resolve conflicts
Peter9192 Jun 9, 2021
73840dc
Revert "Merge branch origin/main into this branch and resolve conflicts"
Peter9192 Jun 9, 2021
f47aceb
Merge remote-tracking branch 'origin/main' into mmstats_lazy - second…
Peter9192 Jun 9, 2021
9d15cea
Various improvements, move rechunking to before regridding
bouweandela Jun 11, 2021
e839dbe
Fix typo
bouweandela Jun 12, 2021
efbcb34
Add unit tests for _rechunk
bouweandela Jun 13, 2021
78d73b3
Log whether or not data is lazy
bouweandela Jun 13, 2021
6f6e61f
Fix sample data tests
bouweandela Jun 13, 2021
721e255
Merge branch 'main' of github.com:esmvalgroup/esmvalcore into mmstats…
bouweandela Jun 14, 2021
aab318e
Merge branch 'main' of github.com:esmvalgroup/esmvalcore into mmstats…
bouweandela Jun 22, 2021
0658219
Fix some types
bouweandela Jun 22, 2021
17dd7fd
Merge branch 'main' of github.com:esmvalgroup/esmvalcore into mmstats…
bouweandela Jun 28, 2022
3247d9c
Merge branch 'main' of github.com:ESMValGroup/ESMValCore into mmstats…
bouweandela Sep 23, 2022
5b4fecd
Merge branch 'main' of github.com:esmvalgroup/esmvalcore into mmstats…
bouweandela Jun 1, 2023
9039242
Undo needless changes and fix test
bouweandela Jun 1, 2023
a364c84
Merge branch 'main' into mmstats_lazy
bouweandela Jun 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion esmvalcore/preprocessor/_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,9 @@ def save(cubes,
"The cube is probably unchanged.", cubes, filename)
return filename

logger.debug("Saving cubes %s to %s", cubes, filename)
for cube in cubes:
logger.debug("Saving cube:\n%s\nwith %s data to %s", cube,
"lazy" if cube.has_lazy_data() else "realized", filename)
if optimize_access:
cube = cubes[0]
if optimize_access == 'map':
Expand Down
110 changes: 68 additions & 42 deletions esmvalcore/preprocessor/_multimodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,18 @@ def _map_to_new_time(cube, time_points):
Missing data inside original bounds is filled with nearest neighbour
Missing data outside original bounds is masked.
"""
time_points = cube.coord('time').units.num2date(time_points)
time_coord = cube.coord('time')

# Try if the required time points can be obtained by slicing the cube.
time_slice = np.isin(time_coord.points, time_points)
if np.any(time_slice) and np.array_equal(time_coord.points[time_slice],
time_points):
time_idx, = cube.coord_dims('time')
indices = tuple(time_slice if i == time_idx else slice(None)
for i in range(cube.ndim))
return cube[indices]

time_points = time_coord.units.num2date(time_points)
sample_points = [('time', time_points)]
scheme = iris.analysis.Nearest(extrapolation_mode='mask')

Expand Down Expand Up @@ -505,43 +516,17 @@ def _compute_eager(
"""Compute statistics one slice at a time."""
_ = [cube.data for cube in cubes] # make sure the cubes' data are realized

result_slices = []
result_slices = iris.cube.CubeList()
for chunk in _compute_slices(cubes):
if chunk is None:
single_model_slices = cubes # scalar cubes
input_slices = cubes # scalar cubes
else:
single_model_slices = [cube[chunk] for cube in cubes]
combined_slice = _combine(single_model_slices)
with warnings.catch_warnings():
warnings.filterwarnings(
'ignore',
message=(
"Collapsing a non-contiguous coordinate. "
f"Metadata may not be fully descriptive for '{CONCAT_DIM}."
),
category=UserWarning,
module='iris',
)
warnings.filterwarnings(
'ignore',
message=(
f"Cannot check if coordinate is contiguous: Invalid "
f"operation for '{CONCAT_DIM}'"
),
category=UserWarning,
module='iris',
)
collapsed_slice = combined_slice.collapsed(CONCAT_DIM, operator,
**kwargs)

# some iris aggregators modify dtype, see e.g.
# https://numpy.org/doc/stable/reference/generated/numpy.ma.average.html
collapsed_slice.data = collapsed_slice.data.astype(np.float32)

result_slices.append(collapsed_slice)
input_slices = [cube[chunk] for cube in cubes]
result_slice = _compute(input_slices, operator=operator, **kwargs)
result_slices.append(result_slice)

try:
result_cube = CubeList(result_slices).concatenate_cube()
result_cube = result_slices.concatenate_cube()
except Exception as excinfo:
raise ValueError(
f"Multi-model statistics failed to concatenate results into a "
Expand All @@ -551,7 +536,45 @@ def _compute_eager(
f"dtypes") from excinfo

result_cube.data = np.ma.array(result_cube.data)

return result_cube


def _compute(cubes: list, *, operator: iris.analysis.Aggregator, **kwargs):
"""Compute statistic."""
cube = _combine(cubes)

with warnings.catch_warnings():
warnings.filterwarnings(
'ignore',
message=(
"Collapsing a non-contiguous coordinate. "
f"Metadata may not be fully descriptive for '{CONCAT_DIM}."
),
category=UserWarning,
module='iris',
)
warnings.filterwarnings(
'ignore',
message=(
f"Cannot check if coordinate is contiguous: Invalid "
f"operation for '{CONCAT_DIM}'"
),
category=UserWarning,
module='iris',
)
# This will always return a masked array
result_cube = cube.collapsed(CONCAT_DIM, operator, **kwargs)

# Remove concatenation dimension added by _combine
result_cube.remove_coord(CONCAT_DIM)
for cube in cubes:
cube.remove_coord(CONCAT_DIM)

# some iris aggregators modify dtype, see e.g.
# https://numpy.org/doc/stable/reference/generated/numpy.ma.average.html
result_cube.data = result_cube.core_data().astype(np.float32)

if result_cube.cell_methods:
cell_method = result_cube.cell_methods[0]
result_cube.cell_methods = None
Expand All @@ -578,12 +601,12 @@ def _multicube_statistics(cubes, statistics, span, ignore_scalar_coords=False):
)

# Avoid modifying inputs
copied_cubes = [cube.copy() for cube in cubes]
cubes = [cube.copy() for cube in cubes]
valeriupredoi marked this conversation as resolved.
Show resolved Hide resolved

# Remove scalar coordinates in input cubes if desired to ignore them when
# merging
if ignore_scalar_coords:
for cube in copied_cubes:
for cube in cubes:
for scalar_coord in cube.coords(dimensions=()):
cube.remove_coord(scalar_coord)
logger.debug(
Expand All @@ -595,11 +618,11 @@ def _multicube_statistics(cubes, statistics, span, ignore_scalar_coords=False):

# If all cubes contain a time coordinate, align them. If no cube contains a
# time coordinate, do nothing. Else, raise an exception.
time_coords = [cube.coords('time') for cube in copied_cubes]
time_coords = [cube.coords('time') for cube in cubes]
if all(time_coords):
aligned_cubes = _align_time_coord(copied_cubes, span=span)
cubes = _align_time_coord(cubes, span=span)
elif not any(time_coords):
aligned_cubes = copied_cubes
pass
else:
raise ValueError(
"Multi-model statistics failed to merge input cubes into a single "
Expand All @@ -609,13 +632,14 @@ def _multicube_statistics(cubes, statistics, span, ignore_scalar_coords=False):

# Calculate statistics
statistics_cubes = {}
lazy_input = any(cube.has_lazy_data() for cube in cubes)
valeriupredoi marked this conversation as resolved.
Show resolved Hide resolved
for statistic in statistics:
logger.debug('Multicube statistics: computing: %s', statistic)
operator, kwargs = _resolve_operator(statistic)

result_cube = _compute_eager(aligned_cubes,
operator=operator,
**kwargs)
if lazy_input and operator.lazy_func is not None:
result_cube = _compute(cubes, operator=operator, **kwargs)
else:
result_cube = _compute_eager(cubes, operator=operator, **kwargs)
statistics_cubes[statistic] = result_cube

return statistics_cubes
Expand Down Expand Up @@ -720,6 +744,8 @@ def multi_model_statistics(products,
arguments. Except for percentiles, these operators are currently not
supported.

Lazy operation is supported for all statistics, except ``median``.

Parameters
----------
products: list
Expand Down
28 changes: 28 additions & 0 deletions esmvalcore/preprocessor/_regrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pathlib import Path
from typing import Dict

import dask.array as da
import iris
import numpy as np
import stratify
Expand Down Expand Up @@ -650,11 +651,38 @@ def regrid(cube, target_grid, scheme, lat_offset=True, lon_offset=True):
# scheme is a function f(src_cube, grid_cube) -> Cube
cube = loaded_scheme
else:
cube = _rechunk(cube, target_grid)
cube = cube.regrid(target_grid, loaded_scheme)

return cube


def _rechunk(cube, target_grid):
"""Re-chunk cube with optimal chunk sizes for target grid."""
if not cube.has_lazy_data() or cube.ndim < 3:
# Only rechunk lazy multidimensional data
return cube

if 2 * np.prod(cube.shape[-2:]) > np.prod(target_grid.shape):
# Only rechunk if target grid is more than a factor of 2 larger,
# because rechunking will keep the original chunk in memory.
return cube

data = cube.lazy_data()

# Compute a good chunk size for the target array
tgt_shape = data.shape[:-2] + target_grid.shape
tgt_chunks = data.chunks[:-2] + target_grid.shape
tgt_data = da.empty(tgt_shape, dtype=data.dtype, chunks=tgt_chunks)
tgt_data = tgt_data.rechunk({i: "auto" for i in range(cube.ndim - 2)})
valeriupredoi marked this conversation as resolved.
Show resolved Hide resolved

# Adjust chunks to source array and rechunk
chunks = tgt_data.chunks[:-2] + data.shape[-2:]
cube.data = data.rechunk(chunks)

return cube


def _horizontal_grid_is_close(cube1, cube2):
"""Check if two cubes have the same horizontal grid definition.

Expand Down
6 changes: 3 additions & 3 deletions tests/sample_data/multimodel_statistics/test_multimodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
SPAN_PARAMS = ('overlap', 'full')


def assert_array_almost_equal(this, other):
def assert_array_almost_equal(this, other, rtol=1e-7):
"""Assert that array `this` almost equals array `other`."""
if np.ma.isMaskedArray(this) or np.ma.isMaskedArray(other):
np.testing.assert_array_equal(this.mask, other.mask)

np.testing.assert_allclose(this, other)
np.testing.assert_allclose(this, other, rtol=rtol)


def assert_coords_equal(this: list, other: list):
Expand Down Expand Up @@ -188,7 +188,7 @@ def multimodel_regression_test(cubes, span, name):
if filename.exists():
reference_cube = iris.load_cube(str(filename))

assert_array_almost_equal(result_cube.data, reference_cube.data)
assert_array_almost_equal(result_cube.data, reference_cube.data, 5e-7)
assert_metadata_equal(result_cube.metadata, reference_cube.metadata)
assert_coords_equal(result_cube.coords(), reference_cube.coords())

Expand Down
2 changes: 0 additions & 2 deletions tests/unit/preprocessor/_multimodel/test_multimodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,6 @@ def test_multimodel_statistics(frequency, span, statistics, expected):
assert_array_allclose(result_cube.data, expected_data)


@pytest.mark.xfail(reason='Lazy data not (yet) supported.')
@pytest.mark.parametrize('span', SPAN_OPTIONS)
def test_lazy_data_consistent_times(span):
"""Test laziness of multimodel statistics with consistent time axis."""
Expand All @@ -362,7 +361,6 @@ def test_lazy_data_consistent_times(span):
assert result_cube.has_lazy_data()


@pytest.mark.xfail(reason='Lazy data not (yet) supported.')
@pytest.mark.parametrize('span', SPAN_OPTIONS)
def test_lazy_data_inconsistent_times(span):
"""Test laziness of multimodel statistics with inconsistent time axis.
Expand Down
59 changes: 59 additions & 0 deletions tests/unit/preprocessor/_regrid/test_regrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import unittest
from unittest import mock

import dask
import dask.array as da
import iris
import numpy as np
import pytest
Expand All @@ -14,6 +16,7 @@
_CACHE,
HORIZONTAL_SCHEMES,
_horizontal_grid_is_close,
_rechunk,
)


Expand Down Expand Up @@ -65,6 +68,7 @@ def setUp(self):
regrid=self.regrid,
dtype=float,
)
self.src_cube.ndim = 1
self.tgt_grid_coord = mock.Mock()
self.tgt_grid = mock.Mock(
spec=iris.cube.Cube, coord=self.tgt_grid_coord)
Expand Down Expand Up @@ -262,5 +266,60 @@ def test_regrid_is_skipped_if_grids_are_the_same():
assert expected_different_cube is not cube


def test_rechunk_on_increased_grid():
"""Test that an increase in grid size rechunks."""
with dask.config.set({'array.chunk-size': '128 M'}):

time_dim = 246
src_grid_dims = (91, 180)
data = da.empty((time_dim, ) + src_grid_dims, dtype=np.float32)

tgt_grid_dims = (361, 720)
tgt_grid = da.empty(tgt_grid_dims, dtype=np.float32)

result = _rechunk(iris.cube.Cube(data), iris.cube.Cube(tgt_grid))

assert result.core_data().chunks == ((123, 123), (91, ), (180, ))


def test_no_rechunk_on_decreased_grid():
"""Test that a decrease in grid size does not rechunk."""
with dask.config.set({'array.chunk-size': '128 M'}):

time_dim = 200
src_grid_dims = (361, 720)
data = da.empty((time_dim, ) + src_grid_dims, dtype=np.float32)

tgt_grid_dims = (91, 180)
tgt_grid = da.empty(tgt_grid_dims, dtype=np.float32)

result = _rechunk(iris.cube.Cube(data), iris.cube.Cube(tgt_grid))

assert result.core_data().chunks == data.chunks


def test_no_rechunk_2d():
"""Test that a 2D cube is not rechunked."""
with dask.config.set({'array.chunk-size': '64 MiB'}):

src_grid_dims = (361, 720)
data = da.empty(src_grid_dims, dtype=np.float32)

tgt_grid_dims = (3601, 7200)
tgt_grid = da.empty(tgt_grid_dims, dtype=np.float32)

result = _rechunk(iris.cube.Cube(data), iris.cube.Cube(tgt_grid))

assert result.core_data().chunks == data.chunks


def test_no_rechunk_non_lazy():
"""Test that a cube with non-lazy data does not crash."""
cube = iris.cube.Cube(np.arange(2 * 4).reshape([1, 2, 4]))
tgt_cube = iris.cube.Cube(np.arange(4 * 8).reshape([4, 8]))
result = _rechunk(cube, tgt_cube)
assert result.data is cube.data


if __name__ == '__main__':
unittest.main()