From 5c49eddaa87b3edde26ec6b08f314909b12338a7 Mon Sep 17 00:00:00 2001 From: Ajai Date: Tue, 17 Dec 2024 22:06:03 +0530 Subject: [PATCH] Utils for statvar processor for string lookup (#1133) * Utils for statvar processor for string lookup * move statvar files to tools/statvar_importer * fix comments --- .../statvar_importer}/__init__.py | 0 .../statvar_importer}/mcf_diff.py | 0 .../statvar_importer}/mcf_diff_test.py | 0 .../statvar_importer}/mcf_file_util.py | 335 ++++++++----- .../statvar_importer}/mcf_file_util_test.py | 0 .../statvar_importer}/mcf_filter.py | 0 .../statvar_importer}/mcf_filter_test.py | 0 tools/statvar_importer/ngram_matcher.py | 225 +++++++++ tools/statvar_importer/ngram_matcher_test.py | 42 ++ .../statvar_importer/property_value_cache.py | 447 ++++++++++++++++++ .../property_value_cache_test.py | 127 +++++ .../india_census_sample_output_stat_vars.mcf | 0 .../test_data/sample_filtered.mcf | 0 .../test_data/sample_output_stat_vars.mcf | 0 .../us_census_B01001_output_stat_vars.mcf | 0 15 files changed, 1050 insertions(+), 126 deletions(-) rename {scripts/statvar => tools/statvar_importer}/__init__.py (100%) rename {scripts/statvar => tools/statvar_importer}/mcf_diff.py (100%) rename {scripts/statvar => tools/statvar_importer}/mcf_diff_test.py (100%) rename {scripts/statvar => tools/statvar_importer}/mcf_file_util.py (77%) rename {scripts/statvar => tools/statvar_importer}/mcf_file_util_test.py (100%) rename {scripts/statvar => tools/statvar_importer}/mcf_filter.py (100%) rename {scripts/statvar => tools/statvar_importer}/mcf_filter_test.py (100%) create mode 100644 tools/statvar_importer/ngram_matcher.py create mode 100644 tools/statvar_importer/ngram_matcher_test.py create mode 100644 tools/statvar_importer/property_value_cache.py create mode 100644 tools/statvar_importer/property_value_cache_test.py rename {scripts/statvar => tools/statvar_importer}/test_data/india_census_sample_output_stat_vars.mcf (100%) rename {scripts/statvar => tools/statvar_importer}/test_data/sample_filtered.mcf (100%) rename {scripts/statvar => tools/statvar_importer}/test_data/sample_output_stat_vars.mcf (100%) rename {scripts/statvar => tools/statvar_importer}/test_data/us_census_B01001_output_stat_vars.mcf (100%)
diff --git a/scripts/statvar/__init__.py b/tools/statvar_importer/__init__.py similarity index 100% rename from scripts/statvar/__init__.py rename to tools/statvar_importer/__init__.py
diff --git a/scripts/statvar/mcf_diff.py b/tools/statvar_importer/mcf_diff.py similarity index 100% rename from scripts/statvar/mcf_diff.py rename to tools/statvar_importer/mcf_diff.py
diff --git a/scripts/statvar/mcf_diff_test.py b/tools/statvar_importer/mcf_diff_test.py similarity index 100% rename from scripts/statvar/mcf_diff_test.py rename to tools/statvar_importer/mcf_diff_test.py
diff --git a/scripts/statvar/mcf_file_util.py b/tools/statvar_importer/mcf_file_util.py similarity index 77% rename from scripts/statvar/mcf_file_util.py rename to tools/statvar_importer/mcf_file_util.py index 14dabe0685..970e7c3496 100644 --- a/scripts/statvar/mcf_file_util.py +++ b/tools/statvar_importer/mcf_file_util.py
@@ -145,17 +145,18 @@ def strip_namespace(value: str) -> str: def strip_value(value: str) -> str: - """Returns the string value with spacesding/trailing space stripped. + """Returns the string value with leading/trailing space stripped + even if the value is enclosed in double quotes. - Args: - value: string to be cleaned. + Args: + value: string to be cleaned, as plain text or within double quotes.
- Returns: - string without extra leading and trailing spaces. - """ + Returns: + string without extra leading and trailing spaces. + """ if value and isinstance(value, str): value = value.strip() - if value[0] == '"' and value[-1] == '"': + if value and value[0] == '"' and value[-1] == '"': value_str = value[1:-1] value_str = value_str.strip() value = '"' + value_str + '"'
@@ -213,21 +214,31 @@ def add_pv_to_node( if value and ',' in value: # Split the comma separated value into a list. value = normalize_list(value, False) - if not value: - return node + # Allow empty value + # if not value: + # return node + value_list = [] if isinstance(value, list): - # Add each value recursively. - for v in value: + value_list = value + elif isinstance(value, str) and ',' in value: + value_list = get_value_list(value) + if value_list: + if len(value_list) == 1: + value = value_list[0] + else: + # Add each value recursively. + for v in value_list: add_pv_to_node(prop, v, node, append_value, strip_namespaces, normalize) - return node - if not value: - return node + return node + # allow empty values + # if not value: + # return node existing_value = node.get(prop) - if existing_value and prop != 'Node' and prop != 'dcid': + if existing_value is not None and prop != 'Node' and prop != 'dcid': # Property already exists. Add value to a list if not present. - if value and value != existing_value and value not in existing_value.split( - ','): + if (value is not None and value != existing_value and + value not in existing_value.split(',')): if append_value: # Append value to a list of existing values node[prop] = f'{node[prop]},{value}'
@@ -254,8 +265,15 @@ def add_comment_to_node(comment: str, node: dict) -> dict: for example, '# comment1', '# comment2'. """ # Count the existing comments in the node. - comments = [c for c in node.keys() if c and c[0] == '#'] - next_comment_index = len(comments) + 1 + num_comments = 0 + for c, v in node.items(): + if not c or c[0] != '#': + continue + if v == comment: + # skip existing comment + return node + num_comments += 1 + next_comment_index = num_comments + 1 # Add the new comment with the next index. node[f'# comment{next_comment_index}'] = comment return node
@@ -289,14 +307,16 @@ def add_mcf_node( normalize: bool = True, ) -> dict: """Add a node with property values into the nodes dict. + If the node exists, the PVs are added to the existing node. Args: pvs: dictionary of property: values of the new node to be added. nodes: dictionary of existing nodes with property:value dict for each node. strip_namespaces: if True, strip namespace from the dcid key and values. - append_values: if True, append new value for an exsting property, else replace - with new value. + append_values: if True, append new value for an existing property, else + replace with new value. + Returns nodes dictionary to which the new node is added. """
@@ -315,11 +335,47 @@ def add_mcf_node( for prop, value in pvs.items(): add_pv_to_node(prop, value, node, append_values, strip_namespaces, normalize) - logging.level_debug() and logging.debug( - f'Added node {dcid} with properties: {pvs.keys()}') + logging.level_debug() and logging.log( + 2, f'Added node {dcid} with properties: {pvs.keys()}') return nodes +def update_mcf_nodes( + nodes: dict, + output_nodes: dict, + strip_namespaces: bool = False, + append_values: bool = True, + normalize: bool = True, +) -> dict: + """Returns output_nodes with property:values from nodes added. + + Args: + nodes: dictionary of MCF nodes in the form: + { <dcid>: { <prop>: <value>, ... },
+ ... } + output_nodes: Nodes to be updated. + strip_namespaces: if True, strip namespace from the dcid key and values. + append_values: if True, append new value for an existing property, else + replace with new value. + normalize: if True, values are normalized. + + Returns: + dictionary of output_nodes updated with property:values from nodes. + """ + index = 0 + for key, node in nodes.items(): + # Set the node dcid if not present. + dcid = get_node_dcid(node) + if not dcid: + dcid = key + if not key: + dcid = str(index) + index += 1 + node['Node'] = add_namespace(dcid) + # Add PVs from node to output_nodes + add_mcf_node(node, output_nodes, strip_namespaces, append_values, + normalize) + return output_nodes + + def load_mcf_nodes( filenames: Union[str, list], nodes: dict = None, @@ -328,16 +384,17 @@ def load_mcf_nodes( normalize: bool = True, ) -> dict: """Return a dict of nodes from the MCF file with the key as the dcid + and a dict of property:value for each node. Args: filenames: comma separated string or a list of MCF filenames - nodes: dictonary to which new nodes are added. - If a node with dcid exists, the new properties are added to the existing node. - strip_namespace: if True, strips namespace from the value for node properties - as well as the dcid key for the nodes dict. - append_values: if True, appends new values for existing properties - into a comma seperated list, else replaces existing value. + nodes: dictionary to which new nodes are added. If a node with dcid exists, + the new properties are added to the existing node. + strip_namespace: if True, strips namespace from the value for node + properties as well as the dcid key for the nodes dict. + append_values: if True, appends new values for existing properties into a + comma separated list, else replaces existing value. Returns: dictionary with dcid as the key and values as a dict of property:values, for example: { '<dcid>': { '<prop>': '<value>', ... }, ... } """ if not filenames: - return {} + return nodes # Load files in order of input files = [] if isinstance(filenames, str): if nodes is None: nodes = _get_new_node(normalize) for file in files: - if file: - num_nodes = 0 - num_props = 0 + if not file: + continue + num_nodes = 0 + num_props = 0 + if file.endswith('.csv'): + # Load nodes from CSV + file_nodes = file_util.file_load_csv_dict(file) + for key, pvs in file_nodes.items(): + if 'Node' not in pvs: + pvs['Node'] = key + num_props += len(pvs) + add_mcf_node(pvs, nodes, strip_namespaces, append_values, + normalize) + num_nodes = len(file_nodes) + else: + # Load nodes from MCF file. with file_util.FileIO(file, 'r', errors='ignore') as input_f: pvs = _get_new_node(normalize) for line in input_f: @@ -399,9 +469,9 @@ add_mcf_node(pvs, nodes, strip_namespaces, append_values, normalize) num_nodes += 1 - logging.info( - f'Loaded {num_nodes} nodes with {num_props} properties from file' - f' {file}') + logging.info( + f'Loaded {num_nodes} nodes with {num_props} properties from file {file}' + ) return nodes @@ -413,17 +483,17 @@ def filter_mcf_nodes( ) -> dict: """Filter dictionary of Nodes to a subset of allowed dcids. - Args: - nodes: dictionary of nodes keyed by dcid. - allow_dcids: list of dcids to be returned. - allow_nodes_with_pv: list of properties - nodes with any of the properties in the list are returned. - ignore_nodes_with_pv: list of properties to be ignored. - nodes with any of the properties in the list are dropped. - - Returns: - dictionary with the filtered nodes.
- """ + Args: + nodes: dictionary of nodes keyed by dcid. + allow_dcids: list of dcids to be returned. + allow_nodes_with_pv: list of properties nodes with any of the properties in + the list are returned. + ignore_nodes_with_pv: list of properties to be ignored. nodes with any of + the properties in the list are dropped. + + Returns: + dictionary with the filtered nodes. + """ # Normalize ignored PVs. ignored_pvs = set() ignored_pvs = _pv_list_to_dict(ignore_nodes_with_pv) @@ -432,22 +502,24 @@ def filter_mcf_nodes( for k, v in nodes.items(): # Drop nodes with dcid not in allowed list. if allow_dcids and strip_namespace(k) in allow_dcids: - logging.debug(f'Dropping dcid not in compare_dcid: {k}, {v}') + logging.log(2, f'Dropping dcid not in compare_dcid: {k}, {v}') continue # Drop nodes containing any ignored property value. drop_node = False for prop, value in v.items(): if prop and prop[0] != '#': if _is_pv_in_dict(prop, value, ignored_pvs): - logging.debug( + logging.log( + 2, f'Dropping dcid with ignored pv {prop}:{value}: {k}, {v}' ) drop_node = True break if compared_pvs and not _is_pv_in_dict(prop, value, compared_pvs): - logging.debug( - f'Dropping dcid without any compared pv {prop}:{value}: {k}, {v}' + logging.log( + 2, + f'Dropping dcid without any compared pv {prop}:{value}: {k}, {v}', ) drop_node = True break @@ -461,16 +533,16 @@ def get_numeric_value(value: str, separator_chars: str = ' ,$%') -> Union[int, float, None]: """Returns the float value from string or None. - Args: - value: string to be converted into a number. - It can have comma separted digits with decimal points, for eg: NN,NNN.NNN - decimal_char: character used for decimal place seperator, default: '.' - seperator_char: seperator characters for 1000s or 100s - for example: NNN,NNN,NNN + Args: + value: string to be converted into a number. It can have comma separted + digits with decimal points, for eg: NN,NNN.NNN + decimal_char: character used for decimal place seperator, default: '.' + seperator_char: seperator characters for 1000s or 100s for example: + NNN,NNN,NNN - Returns: - number as a float or int if the value is a number, None otherwise - """ + Returns: + number as a float or int if the value is a number, None otherwise + """ if isinstance(value, int) or isinstance(value, float): return value if value and isinstance(value, str): @@ -501,13 +573,13 @@ def get_numeric_value(value: str, def get_quoted_value(value: str, is_quoted: bool = None) -> str: """Returns a quoted string if there are spaces and special characters. - Args: - value: string value to be quoted if necessary. - is_quoted: if True, returns values as quotes strings. + Args: + value: string value to be quoted if necessary. + is_quoted: if True, returns values as quotes strings. - Returns: - value with optional double quotes. - """ + Returns: + value with optional double quotes. + """ if not value or not isinstance(value, str): return value @@ -526,6 +598,7 @@ def get_value_list(value: str) -> list: Args: value: string with a single value or comma seperated list of values + Returns: value as a list. """ @@ -545,14 +618,14 @@ def get_value_list(value: str) -> list: def normalize_list(value: str, sort: bool = True) -> str: """Normalize a comma separated list of sorting strings. - Args: - value: string value to be normalized. - Can be a comma separated list or a sequence of characters. - sort: if True, lists are sorted alphabetically. + Args: + value: string value to be normalized. Can be a comma separated list or a + sequence of characters. 
+ sort: if True, lists are sorted alphabetically. - Returns: - string that is a normalized version of value with duplicates removed. - """ + + Returns: + string that is a normalized version of value with duplicates removed. + """ if ',' in value: has_quotes = False if '"' in value: value_list = sorted(value_list) for v in value_list: if v not in values: - normalized_v = normalize_value(v, - quantity_range_to_dcid=False, - maybe_list=False, - is_quoted=has_quotes) + normalized_v = normalize_value( + v, + quantity_range_to_dcid=False, + maybe_list=False, + is_quoted=has_quotes, + ) normalized_v = str(normalized_v) values.append(normalized_v) return ','.join(values)
@@ -584,16 +659,16 @@ def normalize_range(value: str, quantity_range_to_dcid: bool = False) -> str: """Normalize a quantity range into [<start> <end> <Unit>]. - Args: - value: quantity or quantity range as a string. - quantity_range_to_dcid: if True, converts quantity range to a dcid - [<unit> <start> <end>] is converted to dcid:Unit<start>To<end> - if False, the quantity range is returned with unit at the end. - - Retruns: - string with quantity range of the form '[<start> <end> <unit>]' - or dcid:Unit<start>To<end> if quantity_range_to_dcid is True. - """ + Args: + value: quantity or quantity range as a string. + quantity_range_to_dcid: if True, converts the quantity range to a dcid: + [<unit> <start> <end>] is converted to dcid:Unit<start>To<end>; if False, + the quantity range is returned with the unit at the end. + + Returns: + string with quantity range of the form '[<start> <end> <unit>]' + or dcid:Unit<start>To<end> if quantity_range_to_dcid is True. + """ # Check if value is a quantity range quantity_pat = ( r'\[ *(?P<unit>[A-Z][A-Za-z0-9_/]*)? *(?P<start>[0-9\.]+|-)?' if not match_dict: return value - logging.debug(f'Matched range: {match_dict}') + logging.log(2, f'Matched range: {match_dict}') # Value is a quantity range. Get the start, end and unit. start = match_dict.get('start', '') return normalized_range
-def normalize_value(value, - quantity_range_to_dcid: bool = False, - maybe_list: bool = True, - is_quoted: bool = False) -> str: +def normalize_value( + value, + quantity_range_to_dcid: bool = False, + maybe_list: bool = True, + is_quoted: bool = False, +) -> str: """Normalize a property value adding a standard namespace prefix 'dcid:'. - Args: - value: string as a value of a property to be normalized. - quantity_range_to_dcid: if True, convert quantity range to a dcid. - maybe_list: if True, values with ',' are converted to a normalized list. + Args: + value: string as a value of a property to be normalized. + quantity_range_to_dcid: if True, convert quantity range to a dcid. + maybe_list: if True, values with ',' are converted to a normalized list. + is_quoted: if True, the value is treated as a quoted string. - Returns: - normalized value with namespace 'dcid' for dcid values - sorted list for comma separated values. + Returns: + normalized value with namespace 'dcid' for dcid values, + sorted list for comma separated values. + """ if value: if isinstance(value, str): value = value.strip() + if not value: + return '' if value[0] == '"' and value[-1] == '"' and len(value) > 100: # Retain very long strings, such as geoJsonCoordinates, as is.
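# (The 100-character threshold is a heuristic: a quoted value that long is
# assumed to be opaque text, so the list and namespace normalization below
# is skipped for it.)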
return value if ' ' in value or ',' in value or is_quoted: return get_quoted_value(value, is_quoted) # Normalize string with a standardized namespace prefix. + if '__' in value: + # For concatenated sequence of dcids, keep them sorted. + values = strip_namespace(value).split('__') + value = '__'.join(sorted(values)) return add_namespace(strip_namespace(value)) elif isinstance(value, float): # Return a fixed precision float string.
@@ -691,13 +774,13 @@ def normalize_pv(prop: str, value: str) -> str: """Returns a normalized property:value string. - Args: - prop: property name as a string - value: property value as a string + Args: + prop: property name as a string + value: property value as a string - Returns: - string of the form '<prop>:<value>' where value is normalized. + Returns: + string of the form '<prop>:<value>' where value is normalized. + """ return ':'.join([prop.strip(), normalize_value(value)])
@@ -707,6 +790,7 @@ def normalize_mcf_node( quantity_range_to_dcid: bool = False, ) -> dict: """Returns a normalized MCF node with all PVs in alphabetical order, + a common namespace of 'dcid' and comma separated lists also sorted. Args: @@ -743,14 +827,14 @@ def node_dict_to_text(node: dict, default_pvs: dict = _DEFAULT_NODE_PVS) -> str: """Convert a dictionary node of PVs into text. - Args: - node: dictionary of property: values. - default_pvs: dictionary with default property:values. - These properties are added to the node if not present. + Args: + node: dictionary of property: values. + default_pvs: dictionary with default property:values. These properties are + added to the node if not present. - Returns: - node as a text string with a property:value per line + Returns: + node as a text string with a property:value per line + """ props = list(node.keys()) pvs = [] # Add any initial comments
@@ -793,19 +877,18 @@ def write_mcf_nodes( ): """Write the nodes to an MCF file. - Args: - node_dicts: dictionary of nodes keyed by dcid and - each node as a dictionary of property:value. - filename: output MCF file to be written - mode: if 'a', nodes are appended to existing file. - else file is overwritten with the nodes. - default_pvs: dictionary of default property:value to be - added to all nodes. - header: string written as a comment at the begining of the file. - ignore_comments: if True, drop comments that begin with '#' in the property. - sort: if True, nodes in the output file are sorted by dcid. - the properties in the node are also sorted. + Args: + node_dicts: dictionary of nodes keyed by dcid and each node as a dictionary + of property:value. + filename: output MCF file to be written + mode: if 'a', nodes are appended to the existing file, else the file is + overwritten with the nodes. + default_pvs: dictionary of default property:value to be added to all nodes. + header: string written as a comment at the beginning of the file. + ignore_comments: if True, drop comments that begin with '#' in the property. + sort: if True, nodes in the output file are sorted by dcid; the properties + in the node are also sorted.
+ """ if not node_dicts: return if isinstance(node_dicts, dict): diff --git a/scripts/statvar/mcf_file_util_test.py b/tools/statvar_importer/mcf_file_util_test.py similarity index 100% rename from scripts/statvar/mcf_file_util_test.py rename to tools/statvar_importer/mcf_file_util_test.py diff --git a/scripts/statvar/mcf_filter.py b/tools/statvar_importer/mcf_filter.py similarity index 100% rename from scripts/statvar/mcf_filter.py rename to tools/statvar_importer/mcf_filter.py diff --git a/scripts/statvar/mcf_filter_test.py b/tools/statvar_importer/mcf_filter_test.py similarity index 100% rename from scripts/statvar/mcf_filter_test.py rename to tools/statvar_importer/mcf_filter_test.py diff --git a/tools/statvar_importer/ngram_matcher.py b/tools/statvar_importer/ngram_matcher.py new file mode 100644 index 0000000000..4f375aea33 --- /dev/null +++ b/tools/statvar_importer/ngram_matcher.py @@ -0,0 +1,225 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Class to match sub-strings using ngrams. + +Example: + # Load the matcher with search-key: values + matcher = NgramMatcher({'ngram-size': 4}) + matcher.add_key_value('California', 'dcid:geoId/06') + matcher.add_key_value('San Jose California', 'dcid:geoId/0668000') + matcher.add_key_value('San Jose Costa Rica', 'dcid:wikidataId/Q647808') + + # Look for matching keys + results = matcher.lookup('SanJose') + # returns a ranked list of (key, value) tuples: + # [('San Jose California', 'dcid:geoId/0668000'), ('San Jose Costa Rica', + 'dcid:wikidataId/Q647808')] + + # To get top 10 results with match details: + results = matcher.lookup('SanJose', 10, True) + # Returns a list of tuples with (key,
): + # [(, { 'value': , 'info': {'score': 1.2, 'ngram_matches': 3} }), + # ...] +""" + +import unicodedata + +from absl import logging + +# Default configuration settings for NgramMatcher +_DEFAULT_CONFIG = { + 'ngram_size': 4, + 'ignore_non_alphanum': True, + 'min_match_fraction': 0.8, +} + + +class NgramMatcher: + + def __init__(self, config: dict = {}): + self._config = dict(_DEFAULT_CONFIG) + if config: + self._config.update(config) + self._ngram_size = self._config.get('ngram_size', 4) + # List of (key, value) tuples. + self._key_values = list() + # Dictionary of ngram to set of string ids that contain the ngram. + # { '': { (id1, pos1), (id2, pos2), ...}, ...} + self._ngram_dict = {} + + def get_tuples_count(self): + return len(self._key_values) + + def get_key_values(self): + return dict(self._key_values) + + def add_keys_values(self, kvs: dict[str, any]) -> None: + for key, value in kvs.items(): + self.add_key_value(key, value) + + def add_key_value(self, key: str, value): + """Add a key and value. + + When the key matches a lookup string, the key and corresponding value is + returned. + + Args: + key: string to be looked up + value: value to be returned on key match. + """ + self._key_values.append((key, value)) + key_index = len(self._key_values) - 1 + self._add_key_index(key, key_index) + + def get_ngrams_count(self) -> int: + """Returns the number of ngrams in the index.""" + return len(self._ngram_dict) + + def lookup( + self, + key: str, + num_results: int = None, + return_score: bool = False, + config: dict = None, + ) -> list: + """Lookup a key string. + + Returns an ordered list of (key, value) tuples matching the key. + """ + normalized_key = self._normalize_string(key) + ngrams = self._get_ngrams(normalized_key) + logging.level_debug() and logging.log(2, + f'looking up ngrams {ngrams} for {key}') + lookup_config = self._config + if config: + # Use the match config passed in. + lookup_config = dict(self._config) + lookup_config.update(config) + # Get the matching key indices for all ngrams. + matches = dict() + for ngram in ngrams: + ngram_matches = self._ngram_dict.get(ngram, {}) + if ngram_matches: + # Use IDF score for each ngram + ngram_score = 1 / len(ngram_matches) + for key_index, ngram_pos in ngram_matches: + # Collect matches and update score for each ngram + if key_index not in matches: + matches[key_index] = { + 'score': ngram_score, + 'ngram_matches': 1, + 'ngram_pos': ngram_pos, + } + else: + key_match = matches[key_index] + key_match['score'] = key_match['score'] + ngram_score + key_match[ + 'ngram_matches'] = key_match['ngram_matches'] + 1 + key_match['ngram_pos'] = min(key_match['ngram_pos'], + ngram_pos) + + logging.level_debug() and logging.log(2, f'Matches for {key}: {matches}') + # Collect all key indices that matches with counts. + match_indices = list() + min_matches = max( + 1, + len(ngrams) * lookup_config.get('min_match_fraction', 0.8)) + for key_index, result in matches.items(): + if result['ngram_matches'] >= min_matches: + match_indices.append((key_index, result)) + + # Order key_index by decreasing number of matches. 
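+ # Note: _get_ngram_match_score weights the components so that matches
+ # nearer the start of the key rank first, then the count of matching
+ # ngrams, then the summed inverse-frequency (rarity) score.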
+ key_len = len(normalized_key) + match_indices.sort( + key=lambda x: self._get_ngram_match_score(x[1], key_len), + reverse=True) + logging.level_debug() and logging.log(2, + f'Sorted matches for {key}: {match_indices}') + + # Collect results in sorted order + results = list() + for match in match_indices: + result_key, result_value = self._key_values[match[0]] + if return_score: + results.append((result_key, { + 'value': result_value, + 'info': match[1] + })) + else: + results.append((result_key, result_value)) + if num_results and len(results) >= num_results: + # There are enough results. Return these. + break + return results + + def _get_ngrams(self, key: str) -> list: + """Returns a list of ngrams for the key.""" + normalized_key = self._normalize_string(key) + ngrams = normalized_key.split(' ') + max_index = max(len(normalized_key) - self._ngram_size, 0) + 1 + for pos in range(max_index): + ngram = normalized_key[pos:pos + self._ngram_size] + if ngram not in ngrams: + ngrams.append(ngram) + return ngrams + + def _add_key_index(self, key: str, key_index: int): + """Adds the key into the ngrams index.""" + # Remove extra characters and convert to lower case. + normalized_key = self._normalize_string(key) + # index by all unique ngrams in the key + ngrams = self._get_ngrams(normalized_key) + for ngram in ngrams: + if ngram not in self._ngram_dict: + self._ngram_dict[ngram] = set() + ngram_pos = normalized_key.find(ngram) + self._ngram_dict[ngram].add((key_index, ngram_pos)) + logging.level_debug() and logging.log( + 3, f'Added ngram "{ngram}" for {key}:{key_index}') + + def _normalize_string(self, key: str) -> str: + """Returns a normalized string removing special characters""" + return normalized_string(key, + self._config.get('ignore_non_alphanum', True)) + + def _get_ngram_match_score(self, match: dict, key_len: int) -> float: + """Returns a score for the ngram match components.""" + # IDF score + score = match['score'] + # Boost for match at the beginning of the key. + score += (key_len - match['ngram_pos']) * 10000 + # DF score + score += match['ngram_matches'] * 100 + return score + + +def normalized_string(key: str, ignore_non_alnum: bool = True) -> str: + """Returns a normalized string for match. + + Args: + key: string to be normalized. + ignore_non_alnum: if True, non alpha numeric characters are removed. + + Returns: + normalized string + """ + normalized_key = unicodedata.normalize('NFKD', key) + normalized_key = normalized_key.lower() + # Remove extra spaces + normalized_key = ' '.join([w for w in normalized_key.split(' ') if w]) + # Remove extra punctuation. + if ignore_non_alnum: + normalized_key = ''.join( + [c for c in normalized_key if c.isalnum() or c == ' ']) + return normalized_key diff --git a/tools/statvar_importer/ngram_matcher_test.py b/tools/statvar_importer/ngram_matcher_test.py new file mode 100644 index 0000000000..557624a4c9 --- /dev/null +++ b/tools/statvar_importer/ngram_matcher_test.py @@ -0,0 +1,42 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +"""Unit tests for NgramMatcher.""" + +import unittest + +from absl import app +from absl import logging +import ngram_matcher + + +class NgramMatcherTest(unittest.TestCase): + + def setUp(self): + # logging.set_verbosity(2) + return + + def test_lookup_string(self): + matcher = ngram_matcher.NgramMatcher(config={'ngram_size': 4}) + matcher.add_key_value('Test Key 1', 1) + matcher.add_key_value('TESTKey Two', 'two') + matches = matcher.lookup('Test') + self.assertEqual([('TESTKey Two', 'two'), ('Test Key 1', 1)], matches) + self.assertTrue( + matcher.lookup('Tester', config={'min_match_fraction': 0.1})) + self.assertFalse(matcher.lookup('ABCDEF')) + + +if __name__ == '__main__': + app.run() + unittest.main() diff --git a/tools/statvar_importer/property_value_cache.py b/tools/statvar_importer/property_value_cache.py new file mode 100644 index 0000000000..8c1d9592f6 --- /dev/null +++ b/tools/statvar_importer/property_value_cache.py @@ -0,0 +1,447 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Class to store set of property:value for multiple keys. + +The values are stored as a dict with any selected property such as dcid as the +key. The cache is persisted in a file. +""" + +import csv +import os +import sys +import unicodedata + +from absl import app +from absl import flags +from absl import logging + +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +sys.path.append(_SCRIPT_DIR) +sys.path.append(os.path.dirname(_SCRIPT_DIR)) +sys.path.append( + os.path.join(os.path.dirname(os.path.dirname(_SCRIPT_DIR)), 'util')) + +import file_util +from mcf_file_util import add_pv_to_node +from counters import Counters + +# Indexed properties in order for lookup. +_DEFAULT_KEY_PROPS = [ + 'key', + 'dcid', + 'placeId', + 'wikidataId', + 'name', + 'place_name', +] + + +class PropertyValueCache: + """Class to store property:values for a key. + + It allows lookup for an entry by any of the values for a set of key + properties. The entries are loaded from a file and persisted in the file after + updates. + + Example usage: + pv_cache = PropertyValueCache('/tmp/pv-cache.csv', + key_props=['name', 'dcid', 'isoCode'], + normalize_key=True) + + # Add an entry to cache + pv_cache.add( { + 'dcid': 'country/IND', + 'typeOf': 'Country', + 'name': 'India', + 'isoCode': 'IND' + }) + + # Lookup above entry by any value of a property + india_entry = pv_cache.get_entry(prop='isoCode', value='IND') + # Lookup by value of any key property + india_entry = pv_cache.get_entry('india') + """ + + def __init__( + self, + filename: str = '', + key_props: list = _DEFAULT_KEY_PROPS, + props: list = [], + normalize_key: bool = True, + counters: Counters = None, + ): + """Initialize the PropertyValueCache. + + Args: + filename: CSV file with one row per cache entry with + properties as columns. + The entries in the file are loaded on init and + saved periodically and on exit. 
key_props: list of properties that can be used for lookup. The values of these properties are assumed to be unique and values are stored in an index per property for lookup by value. props: List of properties across entries. normalize_key: if True, values are normalized (lower case) before lookup in the per-property index. counters: Counters object for cache hits and misses. """ self._filename = filename self._normalize_key = normalize_key

# List of properties that can be used as keys. # Values for the key properties are assumed to be unique across entries. self._key_props = []

# Index per key property. # Mapping from a key property to an entry in _entries: # { '<prop1>': { '<value1>': { <entry1> }, # '<value2>': { <entry2> }, # ... }, # '<prop2>': { '<value>': { <entry> }, ... } # } self._prop_index = {}

if not self._key_props: self._key_props = [] self._counters = counters if counters is None: self._counters = Counters()

# Dict of cache entries, each a dict of property:values. # The property indexes hold references to these entries. self._entries = {}

# List of properties across all entries self._props = [] self._add_props(key_props=key_props, props=props)

# Load entries from file. self.load_cache_file(filename) # Flag to indicate cache has been updated and has changed from file. self._is_modified = False

def __del__(self): self.save_cache_file()

def load_cache_file(self, filename: str): """Load entries of property:value dicts from files. Args: filename: CSV file(s) from which property:values are loaded with one row per entry. """ for file in file_util.file_get_matching(filename): with file_util.FileIO(file) as csv_file: csv_reader = csv.DictReader(csv_file) # Add columns as properties in order of input. self._add_props(props=csv_reader.fieldnames)

# Add an entry for each row in the file. num_rows = 0 for row in csv_reader: num_rows += 1 self.add(row) logging.info( f'Loaded {num_rows} rows with columns: {self._props} from {filename} into' ' cache')

def get_entry(self, value: str, prop: str = '') -> dict: """Returns a dict entry that contains the prop:value. Args: value: value to be looked up in the property index. If normalize_key was set in init(), the value is converted to a lower case string. prop: One of the key-properties in which the value is looked up. If not set, value is looked up in all key properties in order. Returns: dict entry that contains the prop:value if it exists. """ if isinstance(value, list): logging.error(f'Cannot lookup {value} for {prop}') return {} key = self.get_lookup_key(prop=prop, value=value) if not prop or prop not in self._key_props: # Property is not a key property. # Lookup value in the map for all key properties. for prop in self._key_props: entry = self._get_prop_key_entry(prop, key) if entry: return entry return self._get_prop_key_entry(prop, key)

def get_entry_for_dict(self, pvs: dict) -> dict: """Return the entry for the pvs in the dict. Args: pvs: dictionary with a partial set of property:values. The values of any of the key properties are used for lookup. Returns: dict of cache entry that matches the first prop:value in pvs. """ for prop in self._key_props: value = pvs.get(prop, None) if value is not None: cached_entry = self.get_entry(prop=prop, value=value) if cached_entry: return cached_entry return {}

def add(self, entry: dict) -> dict: """Add a dict of property:values into the cache.
+ If the entry already exists for an existing key, + the entry is merged with the new values. + + Args: + entry: dict of property:values. + The entry is cached and values and entry is also indexed + by value of each key-property. + + Returns: + dict that was added or merged into. + """ + # Add any new properties + self._add_props(props=entry.keys()) + + # Check if an entry exists, matching any of the key prop:value. + cached_entry = self.get_entry_for_dict(entry) + if cached_entry: + # Merge new PVs into the existing entry + self.update_entry(entry, cached_entry) + entry = cached_entry + else: + # Add a new entry + cached_entry = dict(entry) + self._entries[len(self._entries)] = cached_entry + self._counters.add_counter('pv-cache-entries', 1) + + # Add entry to the lookup index for all key properties. + for prop in self._key_props: + values = entry.get(prop, None) + if values is not None: + # Add the entry to the lookup index with each of the values + # for key property. + if not isinstance(values, list): + values = [values] + for value in values: + self._add_prop_key_entry(prop, value, entry) + self._is_modified = True + logging.level_debug() and logging.log( + 2, f'Added cache entry {cached_entry}') + return cached_entry + + def update_entry(self, src: dict, dst: dict): + """Add PVs from src to dst. + + If a property exists with a value, add new values to a list. + """ + #for prop, values in src.items(): + # add_pv_to_node(prop, + # values, + # dst, + # append_value=True, + # normalize=self._normalize_key) + #return dst + for prop, values in src.items(): + # Add new values to list of existing values. + dst_value = dst.get(prop, None) + if dst_value: + value_added = False + dst_value = _get_value_list(dst_value) + values = _get_value_list(values) + for value in values: + if value not in dst_value: + dst_value.append(value) + value_added = True + if value_added: + # New values were added. + dst[prop] = dst_value + else: + # Add the new prop:value to dst dict + dst[prop] = values + self._is_modified = True + logging.level_debug() and logging.debug(f'Merged {src} into {dst}') + return dst + + def save_cache_file(self): + """Save the cache entries into the CSV file. + + File is only written into if cache has been modified + by adding a new entry since the last write. + """ + if not self.is_dirty(): + # No change in cache. Skip writing to file. + return + # Get the cache filename. + # Save cache to the last file loaded in case of multiple files. + filename = file_util.file_get_matching(self._filename) + if filename: + filename = filename[-1] + else: + filename = self._filename + + if not filename: + return + + logging.info(f'Writing {len(self._entries)} cache entries with columns' + f' {self._props} into file {filename}') + logging.debug(f'Writing cache entries: {self._entries}') + with file_util.FileIO(filename, mode='w') as cache_file: + csv_writer = csv.DictWriter( + cache_file, + fieldnames=self._props, + escapechar='\\', + quotechar='"', + quoting=csv.QUOTE_NONNUMERIC, + extrasaction='ignore', + ) + csv_writer.writeheader() + for entry in self._entries.values(): + # Flatten key properties with multiple values to + # rows with one value per property. 
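+ # e.g. {'name': ['CA', 'California'], 'dcid': 'geoId/06'} is written
+ # as two rows, one per name, each repeating dcid 'geoId/06', so every
+ # key value can be looked up again when the file is reloaded.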
+ for pvs in flatten_dict(entry, self._key_props): + logging.debug(f'Saving cache entry: {pvs}') + csv_writer.writerow(pvs) + self._is_modified = False + + def is_dirty(self): + """Returns True if the cache has been modified since the last write.""" + return self._is_modified + + def normalize_string(self, key: str) -> str: + """Returns a normalized string for lookup. + The key has special characters removed and is converted to lower case. + + Args: + key: string to be normalized for lookup. + Returns: + normalized key + """ + if not isinstance(key, str): + key = str(key) + normalized_key = unicodedata.normalize('NFKD', key) + normalized_key = normalized_key.lower() + # Remove extra spaces + normalized_key = ' '.join([w for w in normalized_key.split(' ') if w]) + # Remove extra punctuation. + normalized_key = ''.join( + [c for c in normalized_key if c.isalnum() or c == ' ']) + return normalized_key + + def get_lookup_key(self, value: str, prop: str = '') -> str: + """Returns the key for lookup, normalizing if needed. + + Args: + value: string value to be looked up in the index. + The value is normalized if needed. + prop: (optional) property for the value. + + Returns: + string to be looked up in the property index, + which is the value normalized if needed. + """ + if isinstance(value, list): + value = value[0] + if self._normalize_key: + return self.normalize_string(value) + return value + + def _add_props(self, key_props: list = [], props: list = []): + # Add any new key property. + if key_props: + for prop in key_props: + if prop not in self._key_props: + self._key_props.append(prop) + self._prop_index[prop] = dict() + if prop not in self._props: + self._props.append(prop) + + # Add remaining properties across entries. + if props: + for prop in props: + if prop not in self._props: + self._props.append(prop) + if not self._key_props and self._props: + # No key properties set. Use the first property as key.
+ self._key_props.append(self._props[0]) + + def _add_prop_key_entry(self, prop: str, value: str, entry: dict) -> bool: + """Adds the entry to the lookup map for the property with the key.""" + if not value: + return False + key = self.get_lookup_key(prop=prop, value=value) + prop_index = self._prop_index.get(prop) + if prop_index is None: + logging.error(f'Invalid key prop {prop}:{key} for {entry}') + return False + existing_entry = prop_index.get(key) + if existing_entry and existing_entry.get(prop) != value: + logging.error( + f'Conflicting {prop}:{key} old:{existing_entry} new:{entry}') + prop_index[key] = entry + return True + + def _get_prop_key_entry(self, prop: str, key: str) -> dict: + """Returns the entry for the key in the lookup map for prop.""" + entry = self._prop_index.get(prop, {}).get(key, {}) + if entry: + self._counters.add_counter(f'pv-cache-hits-{prop}', 1) + else: + self._counters.add_counter(f'pv-cache-misses-{prop}', 1) + return entry + + +def flatten_dict(pvs: dict, props: list) -> list: + """Returns a list of dicts, flattening out props with multiple values.""" + # Get dictionary with prop:value not to be flattened + base_pvs = {} + for prop, value in pvs.items(): + if prop not in props: + if isinstance(value, list) or isinstance(value, set): + base_pvs[prop] = ','.join([str(v) for v in value]) + else: + base_pvs[prop] = value + + # List of dicts with expanded prop:values + pvs_list = [base_pvs] + for prop in props: + values = pvs.get(prop, '') + if not values: + continue + if not isinstance(values, list) and not isinstance(values, set): + values = [values] + list_with_prop = [] + for value in values: + for item in pvs_list: + pvs_with_prop = {prop: value} + pvs_with_prop.update(item) + list_with_prop.append(pvs_with_prop) + pvs_list = list_with_prop + return pvs_list + + +def _get_value_list(values: str) -> list: + """Returns a list of unique values from a comma separated string.""" + if not values: + return [] + values_list = [] + if isinstance(values, str): + values = values.split(',') + if not isinstance(values, list) and not isinstance(values, set): + values = [values] + for value in values: + if value not in values_list: + values_list.append(value) + return values_list
diff --git a/tools/statvar_importer/property_value_cache_test.py b/tools/statvar_importer/property_value_cache_test.py new file mode 100644 index 0000000000..c2e54f824e --- /dev/null +++ b/tools/statvar_importer/property_value_cache_test.py
@@ -0,0 +1,127 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+"""Unit tests for property_value_cache.py.""" + +import unittest + +from absl import app +from absl import logging +from property_value_cache import PropertyValueCache, flatten_dict + + +class PropertyValueCacheTest(unittest.TestCase): + + def test_add_entry(self): + pv_cache = PropertyValueCache() + + # Add an entry with name and dcid + pv_cache.add({'name': 'California', 'dcid': 'geoId/06'}) + pv_cache.add({'name': 'India', 'dcid': 'country/IND'}) + + # Add entry with additional properties + pv_cache.add({'dcid': 'geoId/06', 'typeOf': 'AdministrativeArea1'}) + pv_cache.add({'dcid': 'geoId/06', 'typeOf': 'State', 'name': 'CA'}) + pv_cache.add({ + 'dcid': 'country/IND', + 'placeId': 'ChIJkbeSa_BfYzARphNChaFPjNc' + }) + + expected_entry1 = { + 'name': ['California', 'CA'], + 'dcid': 'geoId/06', + 'typeOf': ['AdministrativeArea1', 'State'], + } + self.assertEqual(expected_entry1, + pv_cache.get_entry(prop='name', value='California')) + self.assertEqual(expected_entry1, + pv_cache.get_entry('geoId/06', 'dcid')) + + expected_entry2 = { + 'name': 'India', + 'dcid': 'country/IND', + 'placeId': 'ChIJkbeSa_BfYzARphNChaFPjNc', + } + self.assertEqual(expected_entry2, pv_cache.get_entry('India', 'name')) + self.assertEqual(expected_entry2, + pv_cache.get_entry('country/IND', 'dcid')) + self.assertEqual(expected_entry2, pv_cache.get_entry('India')) + + # Lookup by dict with placeId + # Match of one property, placeId is sufficient. + self.assertEqual( + expected_entry2, + pv_cache.get_entry_for_dict({ + # Matching key + 'placeId': 'ChIJkbeSa_BfYzARphNChaFPjNc', + # Key not matching + 'name': 'IND', + }), + ) + self.assertFalse({}, pv_cache.get_entry_for_dict({'name': 'IND'})) + + def test_flatten_dict(self): + pvs = { + 'name': ['California', 'CA'], + 'dcid': 'geoId/06', + 'typeOf': ['AdministrativeArea1', 'State'], + } + flattened_pvs = flatten_dict(pvs, ['name']) + self.assertEqual( + [ + { + 'name': 'California', + 'dcid': 'geoId/06', + 'typeOf': 'AdministrativeArea1,State', + }, + { + 'name': 'CA', + 'dcid': 'geoId/06', + 'typeOf': 'AdministrativeArea1,State', + }, + ], + flattened_pvs, + ) + # expected pvs have lists joined with ',' + merged_pvs = {} + for p, v in pvs.items(): + if isinstance(v, list): + v = ','.join(v) + merged_pvs[p] = v + self.assertEqual([merged_pvs], flatten_dict(pvs, ['dcid'])) + name_type_pvs = flatten_dict(pvs, ['name', 'typeOf']) + self.assertEqual( + [ + { + 'name': 'California', + 'dcid': 'geoId/06', + 'typeOf': 'AdministrativeArea1', + }, + { + 'name': 'CA', + 'dcid': 'geoId/06', + 'typeOf': 'AdministrativeArea1' + }, + { + 'name': 'California', + 'dcid': 'geoId/06', + 'typeOf': 'State' + }, + { + 'name': 'CA', + 'dcid': 'geoId/06', + 'typeOf': 'State' + }, + ], + name_type_pvs, + ) diff --git a/scripts/statvar/test_data/india_census_sample_output_stat_vars.mcf b/tools/statvar_importer/test_data/india_census_sample_output_stat_vars.mcf similarity index 100% rename from scripts/statvar/test_data/india_census_sample_output_stat_vars.mcf rename to tools/statvar_importer/test_data/india_census_sample_output_stat_vars.mcf diff --git a/scripts/statvar/test_data/sample_filtered.mcf b/tools/statvar_importer/test_data/sample_filtered.mcf similarity index 100% rename from scripts/statvar/test_data/sample_filtered.mcf rename to tools/statvar_importer/test_data/sample_filtered.mcf diff --git a/scripts/statvar/test_data/sample_output_stat_vars.mcf b/tools/statvar_importer/test_data/sample_output_stat_vars.mcf similarity index 100% rename from 
scripts/statvar/test_data/sample_output_stat_vars.mcf rename to tools/statvar_importer/test_data/sample_output_stat_vars.mcf diff --git a/scripts/statvar/test_data/us_census_B01001_output_stat_vars.mcf b/tools/statvar_importer/test_data/us_census_B01001_output_stat_vars.mcf similarity index 100% rename from scripts/statvar/test_data/us_census_B01001_output_stat_vars.mcf rename to tools/statvar_importer/test_data/us_census_B01001_output_stat_vars.mcf
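A minimal usage sketch combining the two new modules (illustrative only: the cache file path, place names, and dcids below are made-up examples, and it assumes the script runs from tools/statvar_importer so the imports above resolve):

# Resolve free-text place names to dcids: exact hits come from the
# persistent PropertyValueCache; misses fall back to fuzzy NgramMatcher
# lookup and are then memoized in the cache.
from ngram_matcher import NgramMatcher
from property_value_cache import PropertyValueCache

matcher = NgramMatcher()
matcher.add_keys_values({
    'California': 'geoId/06',
    'San Jose California': 'geoId/0668000',
})
cache = PropertyValueCache('/tmp/place_cache.csv', key_props=['name', 'dcid'])

def resolve(name: str) -> dict:
    entry = cache.get_entry(name)  # lookup across all key properties
    if entry:
        return entry
    results = matcher.lookup(name, num_results=1)
    if results:
        _, dcid = results[0]
        # add() indexes the new entry by each key property for reuse.
        return cache.add({'name': name, 'dcid': dcid})
    return {}

print(resolve('San Jose'))  # fuzzy ngram match -> 'geoId/0668000'
print(resolve('San Jose'))  # second call is served from the cache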