Skip to content

Commit

Permalink
more features for non-svobs and pv-map key used in value
Browse files Browse the repository at this point in the history
  • Loading branch information
ajaits committed Nov 16, 2023
1 parent fbaf134 commit f55b6eb
Show file tree
Hide file tree
Showing 5 changed files with 1,197 additions and 1,099 deletions.
9 changes: 8 additions & 1 deletion scripts/statvar/config_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
os.path.join(os.path.dirname(os.path.dirname(_SCRIPT_DIR)), 'util')
)

import file_util
from config_map import ConfigMap
from mcf_file_util import get_numeric_value

Expand Down Expand Up @@ -131,6 +132,7 @@
'',
'CSV file with existing DCIDs for generated statvars.',
)
flags.DEFINE_string('output_counters_file', '', 'CSV file with counters.')

flags.DEFINE_bool(
'resume',
Expand Down Expand Up @@ -273,6 +275,7 @@
'header_rows': _FLAGS.header_rows,
'header_columns': _FLAGS.header_columns,
'parallelism': _FLAGS.parallelism,
'output_counters_file': _FLAGS.output_counters_file,
'debug': _FLAGS.debug,
'log_level': _FLAGS.log_level,
}
Expand All @@ -295,7 +298,11 @@ def get_config_from_flags(filename: str = None) -> ConfigMap:
if isinstance(filename, dict):
config_dict.update(filename)
filename = None
return ConfigMap(config_dict=config_dict, filename=filename)
else:
# Load config from file.
file_config = file_util.file_load_py_dict(filename)
update_config(file_config, config_dict)
return ConfigMap(config_dict=config_dict)


def set_config_value(param: str, value: str, config: dict):
Expand Down
6 changes: 6 additions & 0 deletions scripts/statvar/process_spreadsheet.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,12 @@ def process_spreadsheets(
if not ws:
logging.fatal(f'Unable to add worksheet: schema_mcf{index}')
data_config['output_schema_mcf'] = ws.url
if 'counters' not in data_sets:
ws = _add_worksheet(gs, title=f'counters{index}')
if not ws:
logging.fatal(f'Unable to add worksheet: counters{index}')
data_config['output_counters_file'] = ws.url


# Process the sheet
logging.info(f'Processing sheet: {data_config}')
Expand Down
146 changes: 96 additions & 50 deletions scripts/statvar/stat_var_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,15 +569,15 @@ def get_pvs_for_key_variants(
if not key:
return None
pvs = self.get_pvs_for_key(key, namespace)
if not pvs:
# Check if GLOBAL map has key namespace:column-key
pvs = self.get_pvs_for_key(f'{namespace}:{key}')
if not pvs:
pvs = self.get_pvs_for_key(key.lower(), namespace)
if pvs:
return [pvs]
# Check if GLOBAL map has key namespace:column-key
pvs = self.get_pvs_for_key(f'{namespace}:{key}')
if pvs:
return [pvs]
pvs = self.get_pvs_for_key(key.lower(), namespace)
if pvs:
return [pvs]
pvs_list = [pvs]
pvs_list.append({self._config.get('pv_lookup_key', 'Key'): key})
return pvs_list
# Check for keys with extra characters removed.
key_filtered = re.sub('[^A-Za-z0-9_%$-]+', ' ', key).strip()
if key_filtered != key:
Expand Down Expand Up @@ -1415,12 +1415,12 @@ def set_statvar_dup_svobs(self, svobs_key: str, svobs: dict):
f' {statvar[dup_svobs_key]}'
)

def add_statvar_obs(self, pvs: dict):
def add_statvar_obs(self, pvs: dict, has_output_column: bool = False):
# Check if the required properties are present.
missing_props = set(
self._config.get('required_statvarobs_properties', [])
).difference(set(pvs.keys()))
if missing_props:
if missing_props and not has_output_column:
logging.error(f'Missing SVObs properties {missing_props} in {pvs}')
self._counters.add_counter(
f'error-svobs-missing-property', 1, f'{missing_props}'
Expand Down Expand Up @@ -1534,7 +1534,9 @@ def is_valid_svobs(self, pvs: dict) -> bool:
return False
# Check if the StatVar exists.
statvar_dcid = strip_namespace(pvs.get('variableMeasured', ''))
if not statvar_dcid:
if not statvar_dcid and not _pvs_has_any_prop(
pvs, self._config.get('output_columns')
):
logging.error(f'Missing statvar_dcid for SVObs {pvs}')
return False
if statvar_dcid not in self._statvars_map:
Expand Down Expand Up @@ -1937,6 +1939,7 @@ def __init__(
self._internal_reference_keys = [
self._config.get('data_key', 'Data'),
self._config.get('numeric_data_key', 'Number'),
self._config.get('pv_lookup_key', 'Key'),
]

# Functions that can be overridden by child classes.
Expand Down Expand Up @@ -2178,14 +2181,14 @@ def resolve_value_references(
return {}
pvs = dict()
resolved_props = set()
unresolved_refs = set()
unresolved_refs = dict()
for d in reversed(pvs_list):
for prop, value_list in d.items():
if not isinstance(value_list, list):
value_list = [value_list]
for value in value_list:
# Check if the value has any references with @
value_unresolved_refs = []
value_unresolved_refs = dict()
refs = self.get_reference_names(value)
# Replace each reference with its value.
for ref in refs:
Expand All @@ -2206,7 +2209,7 @@ def resolve_value_references(
.replace('@' + ref, replacement)
)
else:
value_unresolved_refs.append(ref)
value_unresolved_refs[ref] = {prop: value}
if value_unresolved_refs:
unresolved_refs.update(value_unresolved_refs)
logging.level_debug() and logging.debug(
Expand All @@ -2216,7 +2219,7 @@ def resolve_value_references(
self._counters.add_counter(
'warning-unresolved-value-ref',
1,
','.join(value_unresolved_refs),
','.join(value_unresolved_refs.keys()),
)
else:
resolved_props.add(prop)
Expand All @@ -2235,13 +2238,18 @@ def resolve_value_references(
f'Resolved references in {pvs_list} into {pvs} with unresolved:'
f' {unresolved_refs}'
)
resolvable_refs = resolved_props.intersection(unresolved_refs)
resolvable_refs = resolved_props.intersection(unresolved_refs.keys())
if resolvable_refs:
# Additional unresolved props can be resolved.
logging.level_debug() and logging.debug(
f'Re-resolving references {resolvable_refs} in {pvs}'
)
pvs = self.resolve_value_references([pvs], process_pvs=False)
f'Re-resolving references {resolvable_refs} in {pvs} for unresolved'
f' pvs: {unresolved_refs}'
)
resolve_pvs_list = []
for ref in resolvable_refs:
resolve_pvs_list.append(unresolved_refs[ref])
resolve_pvs_list.append(pvs)
pvs = self.resolve_value_references(resolve_pvs_list, process_pvs=False)
if process_pvs:
if self._pv_mapper.process_pvs_for_data(key=None, pvs=pvs):
# PVs were processed. Resolve any references again.
Expand Down Expand Up @@ -2547,13 +2555,13 @@ def process_row(self, row: list, row_index: int):
self._file_context
)
resolved_col_pvs[col_index] = merged_col_pvs
if self.process_stat_var_obs_pvs(merged_col_pvs, row_index, col_index):
if not self.is_header_index(
row_index, col_index + 1
) and self.process_stat_var_obs_pvs(merged_col_pvs, row_index, col_index):
row_svobs += 1
# If row has no SVObs but has PVs, it must be a header.
if (
not row_svobs
and cols_with_pvs > 0
and self.is_header_index(row_index, col_index + 1)
if self.is_header_index(row_index, col_index + 1) or (
not row_svobs and cols_with_pvs > 0
):
# Any column with PVs must be a header applicable to entire column.
logging.level_debug() and logging.debug(
Expand Down Expand Up @@ -2590,6 +2598,20 @@ def process_stat_var_obs_value(self, pvs: dict) -> bool:
pvs.pop(multiply_prop)
return True

def pvs_has_output_columns(self, pvs: dict) -> bool:
    """Returns True if the pvs have any of the output columns as keys.

    Args:
      pvs: dictionary of property:value mappings for a data cell.

    Returns:
      True if any key in pvs is one of the configured 'output_columns'.
      When 'value' is itself listed as an output column, it is treated as
      mandatory: pvs without a truthy 'value' entry are ignored (False).
    """
    output_columns = self._config.get('output_columns')
    if pvs and output_columns:
        # value is a mandatory column for SVObs
        if 'value' in output_columns:
            # Bug fix: original referenced the undefined name `value`;
            # the intended lookup is the string key 'value'.
            if not pvs.get('value'):
                # Output columns are SVObs but no value present. Ignore it.
                return False
        for prop in pvs.keys():
            if prop in output_columns:
                return True
    return False

def should_ignore_stat_var_obs_pvs(self, pvs: dict) -> bool:
"""Returns True if the pvs should be ignored."""
# TODO(ajaits): add a config to filter pvs.
Expand Down Expand Up @@ -2648,12 +2670,15 @@ def process_stat_var_obs(self, pvs: dict) -> bool:
return status

def process_single_stat_var_obs(self, pvs: dict) -> bool:
has_output_column = False
if not self.process_stat_var_obs_value(pvs):
# No values in this data cell. May be a header.
logging.level_debug() and logging.debug(
f'No SVObs value in dict {pvs} in {self._file_context}'
)
return False
has_output_column = self.pvs_has_output_columns(pvs)
if not has_output_column:
# No values in this data cell. May be a header.
logging.level_debug() and logging.debug(
f'No SVObs value in dict {pvs} in {self._file_context}'
)
return False

if self.should_ignore_stat_var_obs_pvs(pvs):
# Ignore these PVs,
Expand All @@ -2670,15 +2695,17 @@ def process_single_stat_var_obs(self, pvs: dict) -> bool:
# Separate out PVs for StatVar and StatvarObs
statvar_pvs = {}
svobs_pvs = {}
output_columns = self._config.get('output_columns', [])
for prop, value in pvs.items():
if prop == self._config.get('aggregate_key', '#Aggregate'):
svobs_pvs[prop] = value
elif _is_valid_property(
prop, self._config.get('schemaless', False)
) and _is_valid_value(value):
if prop in self._config.get(
'default_svobs_pvs'
) or prop in self._config.get('output_columns', []):
if (
prop in self._config.get('default_svobs_pvs')
or prop in output_columns
):
svobs_pvs[prop] = value
else:
statvar_pvs[prop] = value
Expand All @@ -2689,36 +2716,39 @@ def process_single_stat_var_obs(self, pvs: dict) -> bool:
for p in [
self._config.get('data_key', 'Data'),
self._config.get('numeric_data_key', 'Number'),
self._config.get('pv_lookup_key', 'Key'),
]:
if p in statvar_pvs:
statvar_pvs.pop(p)

self.generate_dependant_stat_vars(statvar_pvs, svobs_pvs)
variable_measured = strip_namespace(svobs_pvs.get('variableMeasured'))
statvar_dcid = self.process_stat_var_pvs(statvar_pvs, variable_measured)
if not statvar_dcid:
if not variable_measured:
# No statvar or variable measured in obs, drop it.
logging.error(
f'Dropping SVObs {svobs_pvs} for invalid statvar {statvar_pvs} in'
f' {self._file_context}'
)
self._counters.add_counter(
f'dropped-svobs-with-invalid-statvar', 1, statvar_dcid
)
return False
statvar_dcid = variable_measured
svobs_pvs['variableMeasured'] = add_namespace(statvar_dcid)
statvar_dcid = ''
if statvar_pvs:
self.generate_dependant_stat_vars(statvar_pvs, svobs_pvs)
variable_measured = strip_namespace(svobs_pvs.get('variableMeasured'))
statvar_dcid = self.process_stat_var_pvs(statvar_pvs, variable_measured)
if not statvar_dcid:
if not variable_measured:
# No statvar or variable measured in obs, drop it.
logging.error(
f'Dropping SVObs {svobs_pvs} for invalid statvar {statvar_pvs} in'
f' {self._file_context}'
)
self._counters.add_counter(
f'dropped-svobs-with-invalid-statvar', 1, statvar_dcid
)
return False
statvar_dcid = variable_measured
svobs_pvs['variableMeasured'] = add_namespace(statvar_dcid)
svobs_pvs[self._config.get('input_reference_column')] = self._file_context

# Create and add SVObs.
self._statvars_map.add_default_pvs(
self._config.get('default_svobs_pvs', {}), svobs_pvs
)
if not self.resolve_svobs_place(svobs_pvs):
if not self.resolve_svobs_place(svobs_pvs) and not has_output_column:
logging.error(f'Unable to resolve SVObs place in {pvs}')
return False
if not self._statvars_map.add_statvar_obs(svobs_pvs):
if not self._statvars_map.add_statvar_obs(svobs_pvs, has_output_column):
logging.error(
f'Dropping invalid SVObs {svobs_pvs} for statvar {statvar_pvs} in'
f' {self._file_context}'
Expand Down Expand Up @@ -2915,6 +2945,14 @@ def write_outputs(self, output_path: str):
output_tmcf_file=output_tmcf_file,
)
self._counters.print_counters()
counters_filename = self._config.get(
'output_counters_file', output_path + '_counters.txt'
)
logging.info(f'Writing counters to {counters_filename}')
file_util.file_write_csv_dict(
OrderedDict(sorted(self._counters.get_counters().items())),
counters_filename,
)

def get_output_files(self, output_path: str) -> list:
"""Returns the list of output file names."""
Expand Down Expand Up @@ -2962,6 +3000,14 @@ def _str_from_number(
return f'{number}'


def _pvs_has_any_prop(pvs: dict, columns: list = None) -> bool:
if pvs and columns:
for prop, value in pvs.items():
if value and prop in columns:
return True
return False


def download_csv_from_url(urls: list, data_path: str) -> list:
"""Download data from the URL into the given file.
Expand Down
Loading

0 comments on commit f55b6eb

Please sign in to comment.