Skip to content

Commit

Permalink
Slight refactoring of diag `galytska23/select_variables_for_tigramite…
Browse files Browse the repository at this point in the history
….py` for generality and portability (for Changelog v2.10: authors: @valeriupredoi and @egalytska) (#3298)

Co-authored-by: Evgenia Galytska <[email protected]>
  • Loading branch information
valeriupredoi and egalytska authored Aug 11, 2023
1 parent 6747e63 commit a8186dd
Showing 1 changed file with 132 additions and 71 deletions.
203 changes: 132 additions & 71 deletions esmvaltool/diag_scripts/galytska23/select_variables_for_tigramite.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,27 @@
save_data,
)
from esmvaltool.diag_scripts.shared._base import (
get_plot_filename,
)
get_plot_filename, )

logger = logging.getLogger(Path(__file__).stem)

# Fixed parameters
# list of variables to be ignored per model
ignored_variables = {"HadISST": ["heat_flux"]}

# list of variables per dataset that will be processed
proc_vars = {
"ERA5": [
'PV', 'Arctic_temperature', 'Psl_Ural', 'Psl_Sib', 'Psl_Aleut',
'heat_flux'
],
"HadISST": ['BK_sic', 'Ok_sic'],
"all_other_datasets": [
'PV', 'Arctic_temperature', 'Psl_Ural', 'Psl_Sib', 'Psl_Aleut',
'heat_flux', 'BK_sic', 'Ok_sic'
],
}


def get_provenance_record(ancestor_files):
"""Create a provenance record describing the diagnostic data and plot."""
Expand Down Expand Up @@ -124,55 +140,68 @@ def calculate_heat_flux(list_va_ta):
return hf_anom_zm


def variable_cases(var, item):
def variable_cases(var_name, var):
"""Match preprocessor name and corresponding calculations."""
if var == 'pv':
out_var = calculate_polar_vortex(item)
elif var == 'pre_tas':
out_var = calculate_arctic_tas(item)
elif var == 'pressure_ural':
out_var = calculate_slp(item)
if var_name == 'pv':
out_var = calculate_polar_vortex(var)
elif var_name == 'pre_tas':
out_var = calculate_arctic_tas(var)
elif var_name == 'pressure_ural':
out_var = calculate_slp(var)
out_var.var_name = 'Psl_Ural'
elif var == 'pressure_sib':
out_var = calculate_slp(item)
elif var_name == 'pressure_sib':
out_var = calculate_slp(var)
out_var.var_name = 'Psl_Sib'
elif var == 'pressure_aleut':
out_var = calculate_slp(item)
elif var_name == 'pressure_aleut':
out_var = calculate_slp(var)
out_var.var_name = 'Psl_Aleut'
elif var == 'bk_ice':
out_var = finalize_bk_ice(item)
elif var == 'ok_ice':
out_var = finalize_ok_ice(item)
elif var == 'heat_flux':
out_var = prepare_heat_flux(item)
elif var_name == 'bk_ice':
out_var = finalize_bk_ice(var)
elif var_name == 'ok_ice':
out_var = finalize_ok_ice(var)
elif var_name == 'heat_flux':
out_var = prepare_heat_flux(var)
else:
raise NotImplementedError(f"Variable '{var}' not supported")
raise NotImplementedError(f"Variable '{var_name}' not yet supported.")
return out_var


def calculate_variables(input_dict):
def calculate_variables(dataset_dict):
"""Calculate all necessary variables."""
logger.debug("Variables are calculated for the following datasources:%s",
input_dict.keys())
dictionary = {}
for key, value in input_dict.items():
logger.debug("Calculating final variables for %s dataset", key)
dictionary.setdefault(key, {})
tmp_list = []
for item in value:
if item['preprocessor'] == "heat_flux":
tmp_list.append(variable_cases(item['preprocessor'], item))
else:
dictionary[key].setdefault(
variable_cases(item['preprocessor'], item).var_name,
variable_cases(item['preprocessor'], item)
)

if key != "HadISST":
# Calculate heat flux for all data sources except HadISST
dataset_dict.keys())
processed_vars = {}
for dataset, variables in dataset_dict.items():
processed_vars[dataset] = {}

logger.debug("Calculating final variables %s for %s dataset",
variables, dataset)

if dataset in ignored_variables:
to_ignore_vars = ignored_variables.get(dataset, None)
for var in variables:
var_name = var['preprocessor']
if var_name not in to_ignore_vars:
new_var = variable_cases(var_name, var)
new_var_name = new_var.var_name
processed_vars[dataset][new_var_name] = new_var
else:
tmp_list = []
for var in variables:
var_name = var['preprocessor']
if var_name == "heat_flux":
tmp_list.append(variable_cases(var_name, var))
else:
new_var = variable_cases(var_name, var)
new_var_name = new_var.var_name
processed_vars[dataset][new_var_name] = new_var
if len(tmp_list) != 2:
raise IndexError("The preprocessor heat flux requests two \
variables in the recipe: va and ta")
heat_flux = calculate_heat_flux(tmp_list)
dictionary[key].setdefault(heat_flux.var_name, heat_flux)
return dictionary
processed_vars[dataset][heat_flux.var_name] = heat_flux

return processed_vars


def plotting_support(cube, key, **kwargs):
Expand All @@ -193,52 +222,84 @@ def plot_timeseries(dictionary, var, cfg):
colors = plt.cm.viridis(np.linspace(0, 1, len(dictionary.keys())))
baseplotname = f"Timeseries_{var}_anomalies"
filename = get_plot_filename(baseplotname, cfg)
for i, key in enumerate(dictionary.keys()):
if var not in ('BK_sic', 'Ok_sic'):
if key == "HadISST":
for idx, dataset in enumerate(dictionary.keys()):
if var not in proc_vars["HadISST"]:
if dataset == "HadISST":
continue
if key != 'ERA5':
plotting_support(dictionary[key][var], key,
color=colors[i])
if dataset != 'ERA5':
plotting_support(dictionary[dataset][var],
dataset, color=colors[idx])
else:
plotting_support(dictionary[key][var], key,
color='k', linewidth=2)
plotting_support(dictionary[dataset][var],
dataset,
color='k',
linewidth=2)
else:
if key == "ERA5":
if dataset == "ERA5":
continue
if key != 'HadISST':
plotting_support(dictionary[key][var], key, color=colors[i])
if dataset != 'HadISST':
plotting_support(dictionary[dataset][var],
dataset, color=colors[idx])
else:
plotting_support(dictionary[key][var], key, color='blue',
plotting_support(dictionary[dataset][var],
dataset,
color='blue',
linewidth=2)
fig.savefig(filename, bbox_inches='tight')


def assemble_cube_list(dataset, var, special_datasets):
"""
Assemble a list of processed vars cubes.
Depending on what vars are needed per dataset,
variables list differs per analyzed dataset. Dict holding the
needed variables per dataset needs updating everytime a new dataset
or variable gets included.
Parameters
----------
dataset: str
dataset name.
var: dict
variable dictionary.
special_datasets: list
list of datasets to be treated separately,
with restricted variables.
type: list of datasets (list of strings).
Returns
-------
iris.cube.CubeList
list of cubes.
"""
if dataset not in special_datasets:
cube_list = iris.cube.CubeList(
[var[proc_var] for proc_var in proc_vars["all_other_datasets"]])
else:
cube_list = iris.cube.CubeList(
[var[proc_var] for proc_var in proc_vars[dataset]])

return cube_list


def main(cfg):
"""Calculate and save final variables into .nc files."""
special_datasets = ["ERA5", "HadISST"]

my_files_dict = group_metadata(cfg['input_data'].values(), 'dataset')
all_variables = calculate_variables(my_files_dict)

# Check is timeseries should be plotted
if cfg['plot_timeseries'] is True:
if cfg['plot_timeseries']:
plot_timeseries(all_variables, cfg['variable_to_plot'], cfg)
for key in my_files_dict:
logger.info("Processing final calculations in dataset %s", key)
prov_record = get_provenance_record([key])
var = all_variables[key]
if key == "ERA5":
cube_list = iris.cube.CubeList([
var['PV'], var['Arctic_temperature'], var['Psl_Ural'],
var['Psl_Sib'], var['Psl_Aleut'], var['heat_flux']])
elif key == "HadISST":
cube_list = iris.cube.CubeList([
var['BK_sic'], var['Ok_sic']])
else:
cube_list = iris.cube.CubeList([
var['PV'], var['Arctic_temperature'], var['Psl_Ural'],
var['Psl_Sib'], var['Psl_Aleut'], var['heat_flux'],
var['BK_sic'], var['Ok_sic']])
save_data(key, prov_record, cfg, cube_list)
logger.info("%s data is saved in .nc", key)
for dataset in my_files_dict:
logger.info("Processing final calculations in dataset %s", dataset)
prov_record = get_provenance_record([dataset])
var = all_variables[dataset]
cube_list = assemble_cube_list(dataset, var, special_datasets)
save_data(dataset, prov_record, cfg, cube_list)
logger.info("%s data is saved in .nc", dataset)
logger.info("Done.")


Expand Down

0 comments on commit a8186dd

Please sign in to comment.