diff --git a/improver/cli/gradient_between_vertical_levels.py b/improver/cli/gradient_between_vertical_levels.py new file mode 100644 index 0000000000..96d655cdcc --- /dev/null +++ b/improver/cli/gradient_between_vertical_levels.py @@ -0,0 +1,43 @@ +# (C) Crown copyright, Met Office. All rights reserved. +# +# This file is part of IMPROVER and is released under a BSD 3-Clause license. +# See LICENSE in the root of the repository for full licensing details. +"""CLI to calculate a gradient between two vertical levels.""" + +from improver import cli + + +@cli.clizefy +@cli.with_output +def process(*cubes: cli.inputcubelist): + """ + Calculate the gradient between two vertical levels. The gradient is calculated as the + difference between the input cubes divided by the difference in height. + + Input cubes can be provided at height or pressure levels. If the cubes are provided + at a pressure level, the height above sea level is extracted from height_of_pressure_levels + cube. If exactly one of the cubes are provided at height levels this is assumed to be a + height above ground level and the height above sea level is calculated by adding the height + of the orography to the "height" coordinate of the cube. If both cubes have height coordinates + no additional cubes are required. + It is possible for one cube to be defined at height levels and the other at pressure levels. + + Args: + cubes (iris.cube.CubeList): + Contains two cubes of a diagnostic at two different vertical levels. The cubes must + either have a height coordinate or a pressure coordinate. If only one cube is defined at + height levels, an orography cube must also be provided. If either cube is defined at + pressure levels, a geopotential_height cube must also be provided. + + Returns: + iris.cube.Cube: + A single cube containing the gradient between the two height levels. This cube will be + renamed to "gradient_of" the cube name and will have a units attribute of the input + cube units per metre. + + """ + from improver.utilities.gradient_between_vertical_levels import ( + GradientBetweenVerticalLevels, + ) + + return GradientBetweenVerticalLevels()(*cubes) diff --git a/improver/utilities/gradient_between_vertical_levels.py b/improver/utilities/gradient_between_vertical_levels.py new file mode 100644 index 0000000000..010d551804 --- /dev/null +++ b/improver/utilities/gradient_between_vertical_levels.py @@ -0,0 +1,164 @@ +# (C) Crown copyright, Met Office. All rights reserved. +# +# This file is part of IMPROVER and is released under a BSD 3-Clause license. +# See LICENSE in the root of the repository for full licensing details. +"""Calculate the gradient between two vertical levels.""" + +from typing import Optional + +import iris +import numpy as np +from iris.cube import Cube, CubeList +from iris.exceptions import CoordinateNotFoundError + +from improver import BasePlugin +from improver.cube_combiner import Combine +from improver.utilities.common_input_handle import as_cubelist +from improver.utilities.cube_checker import assert_time_coords_valid + + +class GradientBetweenVerticalLevels(BasePlugin): + """Calculate the gradient between two vertical levels. The gradient is calculated as the + difference between the input cubes divided by the difference in height.""" + + @staticmethod + def extract_cube_from_list(cubes: CubeList, name: str) -> Cube: + """Extract a cube from a cubelist based on the name if it exists. If the cube is found + it is removed from the cubelist. + + Args: + cubes: + A cube list containing cubes. + name: + The name of the cube to be extracted. + Returns: + The extracted cube or None if there is no cube with the specified name. Also returns + the input cubelist with the extracted cube removed. + """ + try: + extracted_cube = cubes.extract_cube(iris.Constraint(name)) + except iris.exceptions.ConstraintMismatchError: + extracted_cube = None + else: + cubes.remove(extracted_cube) + + return extracted_cube, cubes + + def gradient_over_vertical_levels( + self, + cubes: CubeList, + geopotential_height: Optional[Cube], + orography: Optional[Cube], + ) -> Cube: + """Calculate the gradient between two vertical levels. The gradient is calculated as the + difference between the two cubes in cubes divided by the difference in height. + + If the cubes are provided at height levels this is assumed to be a height above ground level + and the height above sea level is calculated by adding the height of the orography to the + height coordinate. If the cubes are provided at pressure levels, the height above sea level + is extracted from a geopotential_height cube. If both cubes have a + height coordinate then no additional cubes are required. + + Args: + cubes: + Two cubes containing a diagnostic at two different vertical levels. The cubes must + contain either a height or pressure scalar coordinate. If the cubes contain a height + scalar coordinate this is assumed to be a height above ground level. + geopotential_height: + Optional cube that contains the height above sea level of pressure levels. This cube + is required if any input cube is defined at pressure level. This is used to extract + the height above sea level at the pressure level of the input cubes. + orography: + Optional cube containing the orography height above sea level. This cube is required + if exactly one input cube is defined at height levels and is used to convert the + height above ground level to height above sea level. + + Returns: + A cube containing the gradient between the cubes between two vertical levels. + + Raises: + ValueError: If exactly one input cube is defined at height levels and no orography cube + is provided. + ValueError: If either input cube is defined at pressure levels and no + geopotential_height cube is provided. + """ + + cube_heights = [] + coord_used = [] + for cube in cubes: + try: + cube_height = np.array(cube.coord("height").points) + except CoordinateNotFoundError: + if geopotential_height: + height_ASL = geopotential_height.extract( + iris.Constraint(pressure=cube.coord("pressure").points) + ) + coord_used.append("pressure") + else: + raise ValueError( + """No geopotential height cube provided but one of the inputs cubes has a + pressure coordinate""" + ) + else: + if orography: + height_ASL = orography + cube_height + coord_used.append("height") + elif not (orography or geopotential_height): + height_ASL = cube_height + coord_used.append("height") + else: + raise ValueError( + """No orography cube provided but one of the input cubes has height + coordinate""" + ) + cube_heights.append(height_ASL) + + height_diff = cube_heights[0] - cube_heights[1] + height_diff.data = np.ma.masked_where(height_diff.data == 0, height_diff.data) + + if "height" in coord_used and "pressure" in coord_used: + + try: + missing_coord = cubes[1].coord("pressure") + except CoordinateNotFoundError: + missing_coord = cubes[1].coord("height") + cubes[0].add_aux_coord(missing_coord) + + diff = Combine(operation="-", expand_bound=True)([cubes[0], cubes[1]]) + + gradient = diff / height_diff + + return gradient + + def process(self, *cubes: CubeList) -> Cube: + """ + Process the input cubes to calculate the gradient between two vertical levels. + + Args: + cubes: + A cubelist of two cubes containing a diagnostic at two vertical levels. + The cubes must contain either a height or pressure scalar coordinate. + If exactly one of the cubes contain a height scalar coordinate this is assumed + to be a height above ground level and a cube with the name "surface_altitude" + must also be provided. If either cube contains a pressure scalar coordinate + a cube with the name "geopotential_height" must be provided.If both cubes are + on height levels then no additional cubes are required. + + Returns: + A cube containing the gradient between two vertical levels. The cube will be + named gradient of followed by the name of the input cubes. + """ + cubes = as_cubelist(cubes) + orography, cubes = self.extract_cube_from_list(cubes, "surface_altitude") + geopotential_height, cubes = self.extract_cube_from_list( + cubes, "geopotential_height" + ) + assert_time_coords_valid(cubes, time_bounds=False) + + gradient = self.gradient_over_vertical_levels( + cubes, geopotential_height, orography + ) + + gradient.rename(f"gradient_of_{cubes[0].name()}") + gradient.units = f"{cubes[0].units} m-1" + return gradient diff --git a/improver_tests/acceptance/SHA256SUMS b/improver_tests/acceptance/SHA256SUMS index 28f0132b04..09a4f84f37 100644 --- a/improver_tests/acceptance/SHA256SUMS +++ b/improver_tests/acceptance/SHA256SUMS @@ -532,6 +532,11 @@ f21b03402171eaeb6ad2eb2e0f204db1b17d22f337e93f3790ce69c18f56d253 ./generate-top ba7a8a4dab8552656cfc38962d8cc6dce7e32d0965823c77008906d0c6afe7b5 ./generate-topography-bands-weights/multi_realization/input_land.nc 06911b6df4f8195a9349ff12f352bd1a6251dfcdbcd9f573d652c242f4fed596 ./generate-topography-bands-weights/multi_realization/input_orog.nc 0c52d393be6e53eaa8b9d88a38ceff5ca07772e3385f776ac85df971afaa532b ./generate-topography-bands-weights/multi_realization/kgo.nc +19e48705dd34036cf8e974dacdfeb510a55720f387daf64044d12b73d8d18ca7 ./gradient-between-vertical-levels/height_of_pressure_levels.nc +fafd9c642ef7a63b1c453dae66cb55eab3d7816b02122d12da9320763d2d8db2 ./gradient-between-vertical-levels/kgo.nc +a8eb2c78a65acb58d9535c3390fbf22249e98929002c21b5a4b5c534640d18e1 ./gradient-between-vertical-levels/orography.nc +2048e3a2dc7ea8f9e990dceb1108d69223c52fad5a1b5e21bc6206c60e2115a0 ./gradient-between-vertical-levels/temperature_at_850hpa.nc +0f61a96ffc719c656a6461b4f5562170e12dd1a4fb5096f5ada6ed89cc41ba7b ./gradient-between-vertical-levels/temperature_at_screen_level.nc 129a2405d8460804e577a1dbff2c0ab39aab709bd12f7d92b096af5ea0981380 ./hail-fraction/cloud_condensation_level.nc 6be57a39af9927f742d6805bf7196d2667c94452dfb94b9ac9f8e65faa69471a ./hail-fraction/convective_cloud_top_temperature.nc a01f1ab686e1570b89a91e2eb4d9c0fa1bb71bd1abf26d1e58b5b5610ae953dd ./hail-fraction/hail_melting_level.nc diff --git a/improver_tests/acceptance/test_gradient_between_vertical_levels.py b/improver_tests/acceptance/test_gradient_between_vertical_levels.py new file mode 100644 index 0000000000..e6e3977682 --- /dev/null +++ b/improver_tests/acceptance/test_gradient_between_vertical_levels.py @@ -0,0 +1,37 @@ +# (C) Crown copyright, Met Office. All rights reserved. +# +# This file is part of IMPROVER and is released under a BSD 3-Clause license. +# See LICENSE in the root of the repository for full licensing details. +"""Tests for the gradient-between-vertical-levels CLI""" + +import pytest + +from . import acceptance as acc + +pytestmark = [pytest.mark.acc, acc.skip_if_kgo_missing] +CLI = acc.cli_name_with_dashes(__file__) +run_cli = acc.run_cli(CLI) + + +def test_gradient_vertical_levels(tmp_path): + """Test gradient between vertical levels calculation returns expected result using + temperature at screen level and 850hPa, in combination with orography and height of + pressure levels.""" + kgo_dir = acc.kgo_root() / CLI + kgo_path = kgo_dir / "kgo.nc" + temp_at_screen = kgo_dir / "temperature_at_screen_level.nc" + temp_at_850 = kgo_dir / "temperature_at_850hpa.nc" + orography = kgo_dir / "orography.nc" + height_of_pressure_levels = kgo_dir / "height_of_pressure_levels.nc" + + output_path = tmp_path / "output.nc" + args = [ + temp_at_screen, + temp_at_850, + orography, + height_of_pressure_levels, + "--output", + f"{output_path}", + ] + run_cli(args) + acc.compare(output_path, kgo_path) diff --git a/improver_tests/utilities/test_GradientBetweenVerticalLevels.py b/improver_tests/utilities/test_GradientBetweenVerticalLevels.py new file mode 100644 index 0000000000..4d11152696 --- /dev/null +++ b/improver_tests/utilities/test_GradientBetweenVerticalLevels.py @@ -0,0 +1,169 @@ +# (C) Crown copyright, Met Office. All rights reserved. +# +# This file is part of IMPROVER and is released under a BSD 3-Clause license. +# See LICENSE in the root of the repository for full licensing details. +"""Unit tests for the GradientBetweenVerticalLevels plugin.""" + + +import iris +import numpy as np +import pytest + +from improver.synthetic_data.set_up_test_cubes import set_up_variable_cube +from improver.utilities.gradient_between_vertical_levels import ( + GradientBetweenVerticalLevels, +) + + +@pytest.fixture +def temperature_at_screen_level(): + """Set up a temperature at screen level cube""" + data = np.linspace(0, 3, 4).reshape(2, 2).astype(np.float32) + cube = set_up_variable_cube(data) + cube.add_aux_coord(iris.coords.AuxCoord(1.5, long_name="height", units="m")) + return cube + + +@pytest.fixture +def temperature_at_850hPa(): + """Set up a temperature at 850hPa cube""" + data = np.linspace(3, 0, 4).reshape(2, 2).astype(np.float32) + cube = set_up_variable_cube(data) + cube.add_aux_coord(iris.coords.AuxCoord(85000, long_name="pressure", units="Pa")) + return cube + + +@pytest.fixture +def orography(): + """Set up an orography cube""" + data = np.full((2, 2), 8.5, dtype=np.float32) + cube = set_up_variable_cube(data, name="surface_altitude", units="m") + return cube + + +@pytest.fixture +def height_of_pressure_levels(): + """Set up a height of pressure levels cube""" + data = np.array([np.full((2, 2), 10), np.full((2, 2), 110)]) + cube = set_up_variable_cube( + data, + height_levels=[100000, 85000], + pressure=True, + name="geopotential_height", + units="m", + ) + return cube + + +@pytest.mark.parametrize( + "height_or_pressure", ["height_both", "pressure_both", "mixed"] +) +def test_height_and_pressure( + temperature_at_screen_level, + temperature_at_850hPa, + orography, + height_of_pressure_levels, + height_or_pressure, +): + """Test that the plugin produces the expected result with cubes defined either + both on height or pressure levels. Also check the plugin produces the expected + result with one cube defined on height levels and the other on pressure levels.""" + cubes = [orography, height_of_pressure_levels] + expected_coord = {"height": 1.5, "pressure": 85000} + expected_bounds = None + if height_or_pressure == "height_both": + temperature_at_850hPa.add_aux_coord( + iris.coords.AuxCoord(101.5, long_name="height", units="m") + ) + temperature_at_850hPa.remove_coord("pressure") + cubes = [] + expected_bounds = {"height": [[1.5, 101.5]]} + expected_coord = {"height": 101.5} + elif height_or_pressure == "pressure_both": + temperature_at_screen_level.add_aux_coord( + iris.coords.AuxCoord(100000, long_name="pressure", units="Pa") + ) + temperature_at_screen_level.remove_coord("height") + cubes = [height_of_pressure_levels] + expected_bounds = {"pressure": [[85000, 100000]]} + expected_coord = {"pressure": 100000} + + cubes.append([temperature_at_850hPa, temperature_at_screen_level]) + + expected = [[0.03, 0.01], [-0.01, -0.03]] + result = GradientBetweenVerticalLevels()(iris.cube.CubeList(cubes)) + + for coord in expected_coord.keys(): + np.testing.assert_array_almost_equal( + result.coord(coord).points, expected_coord[coord] + ) + if expected_bounds: + np.testing.assert_array_almost_equal( + result.coord(coord).bounds, expected_bounds[coord] + ) + np.testing.assert_array_almost_equal(result.data, expected) + assert result.name() == "gradient_of_air_temperature" + assert result.units == "K/m" + + +def test_height_diff_0( + temperature_at_screen_level, + temperature_at_850hPa, + orography, + height_of_pressure_levels, +): + """Test that if the height difference a some grid square is 0 the point is masked.""" + height_of_pressure_levels.data[1, 0, 0] = 10 + expected = [[np.inf, 0.01], [-0.01, -0.03]] + expected_mask = [[True, False], [False, False]] + + result = GradientBetweenVerticalLevels()( + iris.cube.CubeList( + [ + temperature_at_850hPa, + temperature_at_screen_level, + height_of_pressure_levels, + orography, + ] + ) + ) + np.testing.assert_array_almost_equal(result.data, expected) + np.testing.assert_array_almost_equal(result.data.mask, expected_mask) + assert result.name() == "gradient_of_air_temperature" + assert result.units == "K/m" + + +def test_height_and_no_orography( + temperature_at_screen_level, temperature_at_850hPa, height_of_pressure_levels +): + """Test an error is raised if a height coordinate is present but no orography cube + is present.""" + + with pytest.raises( + ValueError, match="No orography cube provided", + ): + GradientBetweenVerticalLevels()( + iris.cube.CubeList( + [ + temperature_at_screen_level, + temperature_at_850hPa, + height_of_pressure_levels, + ] + ) + ) + + +def test_pressure_coord_and_no_pressure_levels( + temperature_at_screen_level, temperature_at_850hPa, orography +): + """Test an error is raised if a pressure coordinate is present but no geopotential_height + cube is present.""" + + with pytest.raises( + ValueError, match="No geopotential height", + ): + GradientBetweenVerticalLevels()( + iris.cube.CubeList( + [temperature_at_screen_level, temperature_at_850hPa, orography] + ) + )