update statvar processor
ajaits committed Sep 27, 2023
1 parent 1bf8c2e commit a80fb7e
Showing 14 changed files with 4,243 additions and 3,072 deletions.
25 changes: 22 additions & 3 deletions scripts/statvar/config_flags.py
@@ -47,11 +47,14 @@
    'Comma separated list of namespace:file with property values.')
flags.DEFINE_list('input_data', [],
                  'Comma separated list of data files to be processed.')
+flags.DEFINE_string('input_encoding', 'utf-8', 'Encoding for input_data files.')
flags.DEFINE_list(
    'input_xls', [],
    'Comma separated list of sheets within xls files to be processed.')
flags.DEFINE_integer('input_rows', sys.maxsize,
                     'Number of rows per input file to process.')
+flags.DEFINE_integer('input_columns', sys.maxsize,
+                     'Number of columns in input file to process.')
flags.DEFINE_integer(
    'skip_rows', 0, 'Number of rows to skip at the beginning of the input file.')
flags.DEFINE_integer(
@@ -70,7 +73,9 @@
flags.DEFINE_string('output_path', '',
                    'File prefix for output mcf, csv and tmcf.')
flags.DEFINE_string('existing_statvar_mcf', '',
-                    'StatVar MCF files for any existing nodes to be reused.')
+                    'StatVar MCF files for any existing stat var nodes to be reused.')
+flags.DEFINE_string('existing_schema_mcf', '',
+                    'MCF files with any existing schema nodes to be reused.')
flags.DEFINE_integer('parallelism', 0, 'Number of parallel processes to use.')
flags.DEFINE_integer('pprof_port', 0, 'HTTP port for pprof server.')
flags.DEFINE_bool('debug', False, 'Enable debug messages.')
@@ -89,6 +94,8 @@
flags.DEFINE_list('place_type', [], 'List of place types for name resolution.')
flags.DEFINE_list('places_within', [],
                  'List of parent places for name resolution.')
+flags.DEFINE_string('statvar_dcid_remap_csv', '',
+                    'CSV file with existing DCIDs for generated statvars.')

flags.DEFINE_bool(
    'resume', False,
@@ -106,6 +113,8 @@
        2,
    'input_data':
        _FLAGS.input_data,
+    'input_encoding':
+        _FLAGS.input_encoding,
    'input_xls':
        _FLAGS.input_xls,
    'pv_map_drop_undefined_nodes':
@@ -145,7 +154,10 @@
            'measurementQualifier': '',
            'statType': 'dcs:measuredValue',
            'measuredProperty': 'dcs:count',
-            'populationType': ''
+            'populationType': '',
+            'memberOf': '',
+            'name': '',
+            'description': '',
        }),
    'statvar_dcid_ignore_values': ['measuredValue', 'StatisticalVariable'],
    'default_svobs_pvs':
@@ -173,6 +185,8 @@
    # Settings to compare StatVars with existing statvars to reuse dcids.
    'existing_statvar_mcf':
        _FLAGS.existing_statvar_mcf,
+    'existing_schema_mcf':
+        _FLAGS.existing_schema_mcf,
    'statvar_fingerprint_ignore_props': [
        'dcid', 'name', 'description', 'provenance', 'memberOf'
    ],
@@ -181,7 +195,7 @@
    # This is used for schemaless statvars that can't be matched with
    # existing statvars using property:value
    'statvar_dcid_remap_csv':
-        '',
+        _FLAGS.statvar_dcid_remap_csv,

    # Use numeric data in any column as a value.
    # It may still be dropped if no SVObs can be constructed out of it.
@@ -239,6 +253,9 @@
        True,  # Skip emitting columns with constant values in the csv
    'output_only_new_statvars':
        True,  # Drop existing statvars from output
+    'output_precision_digits': 5,  # Round floating values to 5 decimal digits.
+    'generate_schema_mcf': True,
+    'generate_provisional_schema': True,

    # Settings for DC API.
    'dc_api_root':
@@ -253,6 +270,8 @@
        _FLAGS.pv_map,
    'input_rows':
        _FLAGS.input_rows,
+    'input_columns':
+        _FLAGS.input_columns,
    'skip_rows':
        _FLAGS.skip_rows,
    'header_rows':
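As context for how these flags reach the processor: the hunks above wire each _FLAGS value into a config dict. A minimal, runnable sketch of that flag-to-config pattern, reusing two of the new flags; the get_config helper and main scaffolding here are illustrative, not code from this commit:

    import sys

    from absl import app
    from absl import flags

    flags.DEFINE_string('input_encoding', 'utf-8', 'Encoding for input_data files.')
    flags.DEFINE_integer('input_columns', sys.maxsize,
                         'Number of columns in input file to process.')

    _FLAGS = flags.FLAGS


    def get_config() -> dict:
      # Flag values are only readable after app.run() has parsed argv.
      return {
          'input_encoding': _FLAGS.input_encoding,
          'input_columns': _FLAGS.input_columns,
      }


    def main(_):
      print(get_config())


    if __name__ == '__main__':
      app.run(main)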
179 changes: 113 additions & 66 deletions scripts/statvar/json_to_csv.py
@@ -13,100 +13,147 @@
# limitations under the License.
"""Script to process data sets from OpenDataAfrica."""

+import json
import os
import sys
-import json
+from typing import Union

from absl import app
from absl import flags
from absl import logging
-from typing import Union

_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(_SCRIPT_DIR)
sys.path.append(os.path.dirname(_SCRIPT_DIR))
sys.path.append(os.path.dirname(os.path.dirname(_SCRIPT_DIR)))
-sys.path.append(os.path.join(os.path.dirname(os.path.dirname(_SCRIPT_DIR)), 'util'))
+sys.path.append(
+    os.path.join(os.path.dirname(os.path.dirname(_SCRIPT_DIR)), 'util')
+)

import file_util

+from counters import Counters
+
flags.DEFINE_string('input_json', '', 'JSON file to be converted into CSV')
flags.DEFINE_string('csv_output', '', 'Output CSV file')
flags.DEFINE_list('csv_columns', [], 'List of columns to keep in the output CSV.')
+flags.DEFINE_list(
+    'exclude_columns', [], 'Drop columns whose name contains any token in this list.'
+)

_FLAGS = flags.FLAGS


def flatten_dict(nested_dict: dict, key_prefix: str = '') -> dict:
-    '''Returns a flattened dict with key:values from the nested dict
+  """Returns a flattened dict with key:values from the nested dict
    { prop1: { prop2: value }}, concatenating keys: { prop1.prop2: <value> }
-    '''
-    output_dict = {}
-    if nested_dict is None:
-        return {}
-    if isinstance(nested_dict, str):
-        return nested_dict.replace('\n', ' ')
-    if isinstance(nested_dict, int) or isinstance(nested_dict, float):
-        # value is a basic type. Keep it as is.
-        return nested_dict
-    if isinstance(nested_dict, list):
-        nested_dict = {str(i): nested_dict[i] for i in range(len(nested_dict))}
-    # Expand any nested values
-    for key, value in nested_dict.items():
-        key = flatten_dict(key)
-        value = flatten_dict(value)
-        if isinstance(value, dict):
-            for nkey, nvalue in value.items():
-                nvalue = flatten_dict(nvalue)
-                output_dict[f'{key_prefix}{key}.{nkey}'] = nvalue
-        else:
-            output_dict[f'{key_prefix}{key}'] = value
-    return output_dict
+  """
+  output_dict = {}
+  if nested_dict is None:
+    return {}
+  if isinstance(nested_dict, str):
+    return nested_dict.replace('\n', ' ')
+  if isinstance(nested_dict, int) or isinstance(nested_dict, float):
+    # value is a basic type. Keep it as is.
+    return nested_dict
+  if isinstance(nested_dict, list):
+    nested_dict = {str(i): nested_dict[i] for i in range(len(nested_dict))}
+  # Expand any nested values
+  for key, value in nested_dict.items():
+    key = flatten_dict(key)
+    value = flatten_dict(value)
+    if isinstance(value, dict):
+      for nkey, nvalue in value.items():
+        nvalue = flatten_dict(nvalue)
+        output_dict[f'{key_prefix}{key}.{nkey}'] = nvalue
+    else:
+      output_dict[f'{key_prefix}{key}'] = value
+  return output_dict


def list_to_dict(input_list: list, output_dict: dict = None) -> dict:
-    '''Returns a dict with each element in the list as a value of the index.'''
-    if output_dict is None:
-        output_dict = dict()
-    if isinstance(input_list, dict):
-        output_dict.update(flatten_dict(input_list))
-    elif isinstance(input_list, list):
-        for item in input_list:
-            output_dict[len(output_dict)] = flatten_dict(item)
-    return output_dict
-
-
-def file_json_to_csv(json_file: str, output_csv: str = '') -> str:
-    '''Returns the CSV file generated from the json file.'''
-    input_files = file_util.file_get_matching(json_file)
-    if not input_files:
-        return ''
-    input_list = []
-    for filename in input_files:
-        file_dict = file_util.file_load_py_dict(filename)
-        if isinstance(file_dict, list):
-            input_list.extend(file_dict)
-            logging.info(f'Loaded {len(file_dict)} items from {filename}')
-        elif isinstance(file_dict, dict):
-            input_list.append(file_dict)
-            logging.info(f'Loaded row from {filename}')
-
-    csv_rows = list_to_dict(input_list)
-    if not output_csv:
-        output_csv = file_util.file_get_name(input_files[-1], file_ext='.csv')
-    logging.info(
-        f'Writing {len(csv_rows)} rows from {input_files} into {output_csv}')
-    file_util.file_write_csv_dict(csv_rows, output_csv)
-    return output_csv
+  """Returns a dict with each element in the list as a value of the index."""
+  if output_dict is None:
+    output_dict = dict()
+  if isinstance(input_list, dict):
+    output_dict.update(flatten_dict(input_list))
+  elif isinstance(input_list, list):
+    for item in input_list:
+      output_dict[len(output_dict)] = flatten_dict(item)
+  return output_dict
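list_to_dict turns a list of (possibly nested) dicts into rows keyed by position; a sample run (illustrative data):

    list_to_dict([{'a': 1}, {'b': {'c': 2}}])
    # -> {0: {'a': 1}, 1: {'b.c': 2}}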


+def filter_columns(
+    data: dict, output_columns: list, exclude_columns: list
+) -> dict:
+  """Returns dict with only the keys in output_columns, dropping any key
+  that contains a token from exclude_columns."""
+  if output_columns:
+    keys = set(data.keys())
+    keys = keys.difference(output_columns)
+    for k in keys:
+      data.pop(k)
+  if exclude_columns:
+    keys = list(data.keys())
+    for k in keys:
+      for c in exclude_columns:
+        if c in k:
+          data.pop(k, None)  # tolerate a key matching multiple tokens
+  return data

+def file_json_to_csv(
+    json_file: str,
+    csv_output: str = '',
+    output_columns: list = None,
+    exclude_columns: list = None,
+) -> str:
+  """Returns the CSV file generated from the json file."""
+  input_files = file_util.file_get_matching(json_file)
+  if not input_files:
+    return ''
+  counters = Counters()
+  counters.add_counter('total', len(input_files))
+  input_list = []
+  for filename in input_files:
+    file_dict = file_util.file_load_py_dict(filename)
+    if isinstance(file_dict, list):
+      input_list.extend(file_dict)
+      logging.info(f'Loaded {len(file_dict)} items from {filename}')
+      counters.add_counter('input-rows', len(file_dict))
+    elif isinstance(file_dict, dict):
+      counters.max_counter('input-columns', len(file_dict))
+      filtered_dict = filter_columns(file_dict, output_columns, exclude_columns)
+      input_list.append(filtered_dict)
+      counters.set_counter('output-columns', len(filtered_dict))
+      logging.info(f'Loaded row from {filename}')
+      counters.add_counter('input-rows', 1)
+    counters.add_counter('processed', 1)
+
+  csv_rows = list_to_dict(input_list)
+  if not csv_output:
+    csv_output = file_util.file_get_name(input_files[-1], file_ext='.csv')
+  logging.info(
+      f'Writing {len(csv_rows)} rows from {input_files} into {csv_output}'
+  )
+  columns = file_util.file_write_csv_dict(csv_rows, csv_output, output_columns)
+  counters.set_counter('output-columns', len(columns))
+  return csv_output
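A hedged usage sketch of the new signature, converting all matching JSON files into one CSV while dropping metadata columns (the path and token are hypothetical):

    csv_file = file_json_to_csv('data/*.json', 'out.csv',
                                exclude_columns=['metadata'])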


def main(_):
-    if not _FLAGS.input_json:
-        logging.error(
-            f'Please provide a JSON file to be converted to CSV with --input_json'
-        )
-        return 1
-    file_json_to_csv(_FLAGS.input_json, _FLAGS.csv_output)
+  if not _FLAGS.input_json:
+    logging.error(
+        'Please provide a JSON file to be converted to CSV with --input_json'
+    )
+    return 1
+  file_json_to_csv(
+      _FLAGS.input_json,
+      _FLAGS.csv_output,
+      _FLAGS.csv_columns,
+      _FLAGS.exclude_columns,
+  )


if __name__ == '__main__':
-    app.run(main)
+  app.run(main)
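An illustrative invocation with the new column filters (file names and tokens are hypothetical):

    python json_to_csv.py --input_json=data.json --csv_output=data.csv \
        --exclude_columns=metadata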