Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve error messages when data is missing #917

Merged
merged 10 commits into from
Feb 1, 2021
143 changes: 86 additions & 57 deletions esmvalcore/_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,21 +432,18 @@ def _update_fx_files(step_name, settings, variable, config_user, fx_vars):
if not fx_vars:
return

fx_vars = [
_get_fx_file(variable, fxvar, config_user)
for fxvar in fx_vars
]
fx_vars = [_get_fx_file(variable, fxvar, config_user) for fxvar in fx_vars]

fx_dict = {fx_var[1]['short_name']: fx_var[0] for fx_var in fx_vars}
settings['fx_variables'] = fx_dict
logger.info('Using fx_files: %s for variable %s during step %s',
pformat(settings['fx_variables']),
variable['short_name'],
pformat(settings['fx_variables']), variable['short_name'],
step_name)


def _update_fx_settings(settings, variable, config_user):
"""Update fx settings depending on the needed method."""

# get fx variables either from user defined attribute or fixed
def _get_fx_vars_from_attribute(step_settings, step_name):
user_fx_vars = step_settings.get('fx_variables')
Expand All @@ -457,8 +454,8 @@ def _get_fx_vars_from_attribute(step_settings, step_name):
user_fx_vars.append('sftof')
elif step_name == 'mask_landseaice':
user_fx_vars = ['sftgif']
elif step_name in ('area_statistics',
'volume_statistics', 'zonal_statistics'):
elif step_name in ('area_statistics', 'volume_statistics',
'zonal_statistics'):
user_fx_vars = []
return user_fx_vars

Expand All @@ -470,8 +467,8 @@ def _get_fx_vars_from_attribute(step_settings, step_name):
for step_name, step_settings in settings.items():
if step_name in fx_steps:
fx_vars = _get_fx_vars_from_attribute(step_settings, step_name)
_update_fx_files(step_name, step_settings,
variable, config_user, fx_vars)
_update_fx_files(step_name, step_settings, variable, config_user,
fx_vars)


def _read_attributes(filename):
Expand Down Expand Up @@ -653,16 +650,12 @@ def get_matching(attributes):
return grouped_products


def _get_preprocessor_products(variables,
profile,
order,
ancestor_products,
config_user):
"""
Get preprocessor product definitions for a set of datasets.
def _get_preprocessor_products(variables, profile, order, ancestor_products,
config_user, name):
"""Get preprocessor product definitions for a set of datasets.

It updates recipe settings as needed by various preprocessors
and sets the correct ancestry.
It updates recipe settings as needed by various preprocessors and
sets the correct ancestry.
"""
products = set()
for variable in variables:
Expand All @@ -673,7 +666,7 @@ def _get_preprocessor_products(variables,
grouped_ancestors = _match_products(ancestor_products, variables)
else:
grouped_ancestors = {}

missing_vars = set()
for variable in variables:
settings = _get_default_settings(
variable,
Expand All @@ -682,29 +675,26 @@ def _get_preprocessor_products(variables,
)
_apply_preprocessor_profile(settings, profile)
_update_multi_dataset_settings(variable, settings)
_update_target_levels(
variable=variable,
variables=variables,
settings=settings,
config_user=config_user,
)
_update_extract_shape(settings, config_user)
_update_weighting_settings(settings, variable)
_update_fx_settings(settings=settings,
variable=variable,
config_user=config_user)
_update_target_grid(
variable=variable,
variables=variables,
settings=settings,
config_user=config_user,
)
_update_regrid_time(variable, settings)
try:
_update_target_levels(
variable=variable,
variables=variables,
settings=settings,
config_user=config_user,
)
except RecipeError as ex:
missing_vars.add(ex.message)
_update_preproc_functions(settings, config_user, variable, variables,
missing_vars)
ancestors = grouped_ancestors.get(variable['filename'])
if not ancestors:
ancestors = _get_ancestors(variable, config_user)
if config_user.get('skip-nonexistent') and not ancestors:
logger.info("Skipping: no data found for %s", variable)
try:
ancestors = _get_ancestors(variable, config_user)
except RecipeError as ex:
if config_user.get('skip-nonexistent') and not ancestors:
logger.info("Skipping: %s", ex.message)
else:
missing_vars.add(ex.message)
continue
product = PreprocessorFile(
attributes=variable,
Expand All @@ -713,6 +703,11 @@ def _get_preprocessor_products(variables,
)
products.add(product)

if missing_vars:
separator = "\n- "
raise RecipeError(f'Missing data for preprocessor {name}:{separator}'
f'{separator.join(sorted(missing_vars))}')

_update_statistic_settings(products, order, config_user['preproc_dir'])

for product in products:
Expand All @@ -721,6 +716,25 @@ def _get_preprocessor_products(variables,
return products


def _update_preproc_functions(settings, config_user, variable, variables,
                              missing_vars):
    """Update preprocessor settings for a single variable in place.

    Applies the per-variable settings updates (shape extraction,
    weighting, fx files, target grid, regrid time) to ``settings``.
    If determining the target grid fails with a ``RecipeError``, the
    error message is collected into ``missing_vars`` instead of being
    raised, so the caller can report all missing data at once.

    Parameters
    ----------
    settings : dict
        Preprocessor settings for this variable; modified in place.
    config_user : dict
        User configuration.
    variable : dict
        The variable whose settings are being updated.
    variables : list
        All variables in the same group (needed to resolve the
        target grid relative to other datasets).
    missing_vars : set
        Accumulator for error messages about missing data; modified
        in place.
    """
    _update_extract_shape(settings, config_user)
    _update_weighting_settings(settings, variable)
    _update_fx_settings(settings=settings,
                        variable=variable,
                        config_user=config_user)
    # Missing data for the target grid is not fatal here: record the
    # message and keep going so every missing dataset is reported in
    # a single combined RecipeError by the caller.
    try:
        _update_target_grid(
            variable=variable,
            variables=variables,
            settings=settings,
            config_user=config_user,
        )
    except RecipeError as ex:
        missing_vars.add(ex.message)
    _update_regrid_time(variable, settings)


def _get_single_preprocessor_task(variables,
profile,
config_user,
Expand All @@ -741,7 +755,9 @@ def _get_single_preprocessor_task(variables,
profile=profile,
order=order,
ancestor_products=ancestor_products,
config_user=config_user)
config_user=config_user,
name=name,
)

if not products:
raise RecipeError(
Expand Down Expand Up @@ -911,7 +927,13 @@ def __init__(self,
raw_recipe['diagnostics'], raw_recipe.get('datasets', []))
self.entity = self._initialize_provenance(
raw_recipe.get('documentation', {}))
self.tasks = self.initialize_tasks() if initialize_tasks else None
try:
self.tasks = self.initialize_tasks() if initialize_tasks else None
except RecipeError as ex:
logger.error(ex.message)
for task in ex.failed_tasks:
logger.error(task.message)
raise

@staticmethod
def _need_ncl(raw_diagnostics):
Expand Down Expand Up @@ -975,8 +997,7 @@ def _initialize_datasets(raw_datasets):

@staticmethod
def _expand_ensemble(variables):
"""
Expand ensemble members to multiple datasets.
"""Expand ensemble members to multiple datasets.

Expansion only supports ensembles defined as strings, not lists.
"""
Expand Down Expand Up @@ -1267,24 +1288,29 @@ def initialize_tasks(self):
tasknames_to_run = self._cfg.get('diagnostics')

priority = 0
failed_tasks = []
for diagnostic_name, diagnostic in self.diagnostics.items():
logger.info("Creating tasks for diagnostic %s", diagnostic_name)

# Create preprocessor tasks
for variable_group in diagnostic['preprocessor_output']:
task_name = diagnostic_name + TASKSEP + variable_group
logger.info("Creating preprocessor task %s", task_name)
task = _get_preprocessor_task(
variables=diagnostic['preprocessor_output']
[variable_group],
profiles=self._preprocessors,
config_user=self._cfg,
task_name=task_name,
)
for task0 in task.flatten():
task0.priority = priority
tasks.add(task)
priority += 1
try:
task = _get_preprocessor_task(
variables=diagnostic['preprocessor_output']
[variable_group],
profiles=self._preprocessors,
config_user=self._cfg,
task_name=task_name,
)
except RecipeError as ex:
failed_tasks.append(ex)
else:
for task0 in task.flatten():
task0.priority = priority
tasks.add(task)
priority += 1

# Create diagnostic tasks
for script_name, script_cfg in diagnostic['scripts'].items():
Expand All @@ -1299,7 +1325,10 @@ def initialize_tasks(self):
task.priority = priority
tasks.add(task)
priority += 1

if failed_tasks:
ex = RecipeError('Could not create all tasks')
ex.failed_tasks.extend(failed_tasks)
raise ex
check.tasks_valid(tasks)

# Resolve diagnostic ancestors
Expand Down
20 changes: 14 additions & 6 deletions esmvalcore/_recipe_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import itertools
import logging
import os
import re
import subprocess
from shutil import which
import re

import yamale

Expand All @@ -16,6 +16,14 @@

class RecipeError(Exception):
    """Recipe contains an error."""

    def __init__(self, msg):
        """Initialize the error with a message.

        Parameters
        ----------
        msg : str
            Human-readable description of the recipe problem.
        """
        # Pass the message (not ``self``) to Exception.__init__ so that
        # ``args`` holds the message; ``super().__init__(self)`` would
        # store a self-reference, breaking pickling and repr.
        super().__init__(msg)
        self.message = msg
        # Populated by callers that aggregate several failed tasks
        # before re-raising a single combined error.
        self.failed_tasks = []

    def __str__(self):
        """Return message string."""
        return self.message


def ncl_version():
Expand Down Expand Up @@ -115,7 +123,8 @@ def data_availability(input_files, var, dirnames, filenames):
"Looked for files matching %s, but did not find any existing "
"input directory", filenames)
logger.error("Set 'log_level' to 'debug' to get more information")
raise RecipeError("Missing data")
raise RecipeError(
f"Missing data for {var['alias']}: {var['short_name']}")

# check time avail only for non-fx variables
if var['frequency'] == 'fx':
Expand Down Expand Up @@ -184,10 +193,9 @@ def valid_multimodel_statistic(statistic):
"""Check that `statistic` is a valid argument for multimodel stats."""
valid_names = ["mean", "median", "std", "min", "max"]
valid_patterns = [r"^(p\d{1,2})(\.\d*)?$"]
if not (statistic in valid_names or
re.match(r'|'.join(valid_patterns), statistic)):
if not (statistic in valid_names
or re.match(r'|'.join(valid_patterns), statistic)):
raise RecipeError(
"Invalid value encountered for `statistic` in preprocessor "
f"`multi_model_statistics`. Valid values are {valid_names} "
f"or patterns matching {valid_patterns}. Got '{statistic}.'"
)
f"or patterns matching {valid_patterns}. Got '{statistic}.'")
Loading