diff --git a/scripts/world_bank/datasets/README.md b/scripts/world_bank/datasets/README.md
new file mode 100644
index 0000000000..430b17f2d6
--- /dev/null
+++ b/scripts/world_bank/datasets/README.md
@@ -0,0 +1,58 @@
+# World Bank Datasets
+- World Bank Datasets covers multiple databases such as World Development Indicators, Jobs, and Education Statistics.
+- Source: https://data.worldbank.org
+- How to download data: the data is downloaded automatically by the Python script (datasets.py).
+- Type of place: Country.
+- StatVars: all types.
+- Years: 1960 to 2050.
+- Copyright year: 2024.
+
+## Processing WB datasets
+
+Update September 2024:
+To run all processing steps in sequence, do not pass the `--mode` flag.
+
+Run: `python3 datasets.py`
+
+To debug an individual step, run it in the corresponding mode as described below.
+The script supports the following tasks:
+
+## Fetching datasets
+- fetch_datasets: Fetches WB dataset lists and resources and writes them to 'output/wb-datasets.csv'.
+
+Run: `python3 datasets.py --mode=fetch_datasets`
+
+## Downloading the datasets
+- download_datasets: Downloads the datasets listed in 'output/wb-datasets.csv' to the 'output/downloads' folder.
+
+Run: `python3 datasets.py --mode=download_datasets`
+
+## Writing WB codes
+- write_wb_codes: Extracts World Bank indicator codes (and related information) from files downloaded in the 'output/downloads' folder to 'output/wb-codes.csv'.
+
+It only operates on files that are named '*_CSV.zip'.
+
+Run: `python3 datasets.py --mode=write_wb_codes`
+
+## Loading the stat vars
+- load_stat_vars: Loads stat vars from a mapping file specified via the `stat_vars_file` flag.
+- Use this for debugging to ensure that the mappings load correctly, and fix any errors logged by this operation.
+
+Run: `python3 datasets.py --mode=load_stat_vars --stat_vars_file=/path/to/statvars.csv`
+
+- See `sample-svs.csv` for a sample mappings file, and the example formats section below.
+
+## Writing output files
+- write_observations: Extracts observations from files downloaded in the 'output/downloads' folder and saves them to CSVs in the 'output/observations' folder.
+
+- The stat vars file to be used for mappings should be specified using the `stat_vars_file` flag (see the example formats below).
+
+- It only operates on files that are named '*_CSV.zip'.
+
+Run: `python3 datasets.py --mode=write_observations --stat_vars_file=/path/to/statvars.csv`
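+
+## Example file formats
+
+As an illustrative sketch, based on the test fixtures under `test_data/` in this change (see `sample-svs.csv` for the authoritative format), a stat vars mapping file maps a World Bank series code to a StatVar and an optional unit:
+
+```csv
+seriescode,unit,statvar
+account.t.d,,worldBank/account_t_d
+account.t.d.1,,worldBank/account_t_d_1
+```
+
+The observation CSVs written by write_observations then contain one row per indicator, place, and year, as in `test_data/expected_files/expected_output.csv`:
+
+```csv
+indicatorcode,statvar,measurementmethod,observationabout,observationdate,observationvalue,unit
+account.t.d,worldBank/account_t_d,WorldBank_FINDEX_CSV,dcid:country/ARB,2011,22.48,
+```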
diff --git a/scripts/world_bank/datasets/datasets.py b/scripts/world_bank/datasets/datasets.py
index fe9d33563d..0793fb4cbb 100644
--- a/scripts/world_bank/datasets/datasets.py
+++ b/scripts/world_bank/datasets/datasets.py
@@ -13,6 +13,12 @@
 # limitations under the License.
 """Processes WB datasets.
 
+Update September 2024:
+To run all processing steps in sequence, do not pass the mode.
+Run: python3 datasets.py
+
+To debug an individual step, run it in the corresponding mode as described below.
+
 Supports the following tasks:
 
 ============================
@@ -41,7 +47,7 @@
 Use this for debugging to ensure that the mappings load correctly and fix any
 errors logged by this operation.
 
-Run: python3 datasets.py --mode=load_stat_vars --stat_vars_file=/path/to/sv_mappings.csv
+Run: python3 datasets.py --mode=load_stat_vars --stat_vars_file=/path/to/statvars.csv
 
 See `sample-svs.csv` for a sample mappings file.
 
@@ -53,7 +59,7 @@
 
 It only operates on files that are named '*_CSV.zip'.
 
-Run: python3 datasets.py --mode=write_observations --stat_vars_file=/path/to/sv_mappings.csv
+Run: python3 datasets.py --mode=write_observations --stat_vars_file=/path/to/statvars.csv
 """
 
 import requests
@@ -66,11 +72,13 @@
 import re
 import urllib3
 from urllib3.util.ssl_ import create_urllib3_context
 from absl import flags
 import zipfile
 import codecs
 from itertools import repeat
 from datetime import datetime
+from retry import retry
 
 FLAGS = flags.FLAGS
 
@@ -84,7 +92,7 @@ class Mode:
 
 flags.DEFINE_string(
-    'mode', Mode.WRITE_OBSERVATIONS,
-    f"Specify one of the following modes: {Mode.FETCH_DATASETS}, {Mode.DOWNLOAD_DATASETS}, {Mode.WRITE_WB_CODES}, {Mode.LOAD_STAT_VARS}, {Mode.WRITE_OBSERVATIONS}"
+    'mode', None,
+    f"Specify one of the following modes: {Mode.FETCH_DATASETS}, {Mode.DOWNLOAD_DATASETS}, {Mode.WRITE_WB_CODES}, {Mode.LOAD_STAT_VARS}, {Mode.WRITE_OBSERVATIONS}. If unset, all steps run in sequence."
 )
 
@@ -111,7 +119,7 @@ class Mode:
 os.makedirs(DOWNLOADS_DIR, exist_ok=True)
 os.makedirs(OBSERVATIONS_DIR, exist_ok=True)
 
-POOL_SIZE = max(2, multiprocessing.cpu_count() - 1)
+POOL_SIZE = 3  # was: max(2, multiprocessing.cpu_count() - 1)
 
 DOWNLOADABLE_RESOURCE_TYPES = set(["Download", "Dataset"])
 
@@ -131,7 +139,7 @@ class Mode:
 def download_datasets():
     '''Downloads dataset files. This is a very expensive operation so run it with care. It assumes that the datasets CSV is already available.'''
-
+    logging.info('Starting download_datasets')
     with open(DATASETS_CSV_FILE_PATH, 'r') as f:
         csv_rows = list(csv.DictReader(f))
         download_urls = []
@@ -139,11 +147,15 @@ def download_datasets():
             download_url = csv_row.get(DATASET_DOWNLOAD_URL_COLUMN_NAME)
             if download_url:
                 download_urls.append(download_url)
-
+    try:
         with multiprocessing.Pool(POOL_SIZE) as pool:
             pool.starmap(download, zip(download_urls))
 
         logging.info('# files downloaded: %s', len(download_urls))
+    # The source includes several files that are not required, so errors here
+    # can be tolerated; whether all required files were generated is verified
+    # after the output files are written.
+    except Exception as e:
+        logging.error("Error downloading datasets", exc_info=e)
 
 
 def download(url):
@@ -152,20 +164,28 @@ def download(url):
     if os.path.exists(file_path):
         logging.info('Already downloaded %s to file %s', url, file_path)
         return
-    logging.info('Downloading %s to file %s', url, file_path)
+
+    # response = requests.get(url)
+    # Using urllib3 for downloading content to avoid an SSL issue.
+    # See: https://github.com/urllib3/urllib3/issues/2653#issuecomment-1165418616
     try:
-        # response = requests.get(url)
-        # Using urllib3 for downloading content to avoid SSL issue.
-        # See: https://github.com/urllib3/urllib3/issues/2653#issuecomment-1165418616
-        with urllib3.PoolManager(ssl_context=ctx) as http:
-            response = http.request("GET", url)
+        response = download_retry(url)
         with open(file_path, 'wb') as f:
             f.write(response.data)
+    # After retrying multiple times, move on to the next download; failing
+    # fatally is not required here.
     except Exception as e:
         logging.error("Error downloading %s", url, exc_info=e)
 
 
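+# Retry wrapper around the plain urllib3 GET. With tries=3, delay=2 and
+# backoff=2, the `retry` package's decorator makes up to 3 attempts,
+# waiting 2s and then 4s between them, before the exception propagates.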
+@retry(tries=3, delay=2, backoff=2)
+def download_retry(url):
+    with urllib3.PoolManager(ssl_context=ctx, timeout=90) as http:
+        logging.info('Downloading url: %s', url)
+        response = http.request("GET", url)
+        return response
+
+
 def fetch_and_write_datasets_csv():
     fetch_dataset_lists()
     fetch_dataset_views()
@@ -201,7 +221,7 @@ def get_datasets_csv_rows():
     return csv_rows
 
-DATASET_URL_FIELDS = ['harvet_source', 'url', 'website_url']
+DATASET_URL_FIELDS = ['website_url', 'harvest_source', 'url']
 
 # URLs with this pattern are downloadable only if the URL is trunctated until it. Probably a bug in WB APIs.
 VERSION_ID_PATTERN = '?versionId='
 
@@ -277,11 +297,15 @@ def load_json(url, params, response_file):
             return json.load(f)
 
     logging.info("Fetching url %s, params %s", url, params)
-    response = requests.get(url, params=params).json()
-    with open(response_file, 'w') as f:
-        logging.info('Writing response to file %s', response_file)
-        json.dump(response, f, indent=2)
-    return response
+    try:
+        response_json = requests.get(url, params=params).json()
+        with open(response_file, 'w') as f:
+            logging.info('Writing response to file %s', response_file)
+            json.dump(response_json, f, indent=2)
+        return response_json
+    except Exception as e:
+        logging.error("HTTP error fetching %s: %s", url, e)
+        return None
 
 
 def load_json_file(json_file):
@@ -351,7 +375,8 @@ def write_wb_codes():
 def get_all_codes():
     all_codes = {}
     for file_name in os.listdir(DOWNLOADS_DIR):
-        if file_name.endswith(CSV_ZIP_FILE_SUFFIX):
+        if file_name.endswith(CSV_ZIP_FILE_SUFFIX) or file_name.endswith(
+                '_csv.zip'):
             zip_file = f"{DOWNLOADS_DIR}/{file_name}"
             codes = get_codes_from_zip(zip_file)
             if codes:
@@ -366,47 +391,56 @@ def get_all_codes():
 
 
 def get_codes_from_zip(zip_file):
-    with zipfile.ZipFile(zip_file, 'r') as zip:
-        (_, series_file) = get_data_and_series_file_names(zip)
-        if series_file is None:
-            logging.warning('No series file found in ZIP file: %s', zip_file)
-        else:
-            with zip.open(series_file, 'r') as csv_file:
-                series_rows = sanitize_csv_rows(
-                    list(csv.DictReader(codecs.iterdecode(csv_file, 'utf-8'))))
-                num_codes = len(series_rows)
-                logging.info('# code(s) in %s: %s', zip_file, num_codes)
-                if num_codes == 0:
-                    return {}
-
-                if series_rows[0].get(SERIES_CODE_KEY) is None:
-                    logging.error('No series code found in %s, sample row: %s',
-                                  zip_file, series_rows[0])
-                    return {}
-
-                codes = {}
-                for series_row in series_rows:
-                    code = series_row.get(SERIES_CODE_KEY)
-                    codes[code] = {
-                        SERIES_CODE_KEY:
-                            code,
-                        INDICATOR_NAME_KEY:
-                            series_row.get(INDICATOR_NAME_KEY),
-                        NUM_DATASETS_KEY:
-                            1,
-                        TOPIC_KEY:
-                            series_row.get(TOPIC_KEY),
-                        UNIT_OF_MEASURE_KEY:
-                            series_row.get(UNIT_OF_MEASURE_KEY),
-                        SHORT_DEFINITION_KEY:
-                            series_row.get(SHORT_DEFINITION_KEY),
-                        LONG_DEFINITION_KEY:
-                            series_row.get(LONG_DEFINITION_KEY),
-                        LICENSE_TYPE_KEY:
-                            series_row.get(LICENSE_TYPE_KEY),
-                    }
-                return codes
-    return {}
+    try:
+        with zipfile.ZipFile(zip_file, 'r') as zip:
+            (_, series_file) = get_data_and_series_file_names(zip)
+            if series_file is None:
+                logging.warning('No series file found in ZIP file: %s',
+                                zip_file)
+            else:
+                with zip.open(series_file, 'r') as csv_file:
+                    series_rows = sanitize_csv_rows(
+                        list(
+                            csv.DictReader(
+                                codecs.iterdecode(csv_file, 'utf-8'))))
+                    num_codes = len(series_rows)
+                    logging.info('# code(s) in %s: %s', zip_file, num_codes)
+                    if num_codes == 0:
+                        return {}
+
+                    if series_rows[0].get(SERIES_CODE_KEY) is None:
+                        logging.error(
+                            'No series code found in %s, sample row: %s',
+                            zip_file, series_rows[0])
+                        return {}
+
+                    codes = {}
+                    for series_row in series_rows:
+                        code = series_row.get(SERIES_CODE_KEY)
+                        codes[code] = {
+                            SERIES_CODE_KEY: code,
+                            INDICATOR_NAME_KEY: series_row.get(INDICATOR_NAME_KEY),
+                            NUM_DATASETS_KEY: 1,
+                            TOPIC_KEY: series_row.get(TOPIC_KEY),
+                            UNIT_OF_MEASURE_KEY: series_row.get(UNIT_OF_MEASURE_KEY),
+                            SHORT_DEFINITION_KEY: series_row.get(SHORT_DEFINITION_KEY),
+                            LONG_DEFINITION_KEY: series_row.get(LONG_DEFINITION_KEY),
+                            LICENSE_TYPE_KEY: series_row.get(LICENSE_TYPE_KEY),
+                        }
+                    return codes
+        return {}
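+    # Tolerate unreadable archives: some ZIP files from the source can be
+    # corrupted, so log the error and move on to the next file.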
is: %s", + e, zipfile) def write_csv(csv_file_path, csv_columns, csv_rows): @@ -426,17 +460,38 @@ def write_all_observations(stat_vars_file): zip_files = [] for file_name in os.listdir(DOWNLOADS_DIR): - if file_name.endswith(CSV_ZIP_FILE_SUFFIX): + if file_name.endswith(CSV_ZIP_FILE_SUFFIX) or file_name.endswith( + '_csv.zip'): zip_files.append(f"{DOWNLOADS_DIR}/{file_name}") with multiprocessing.Pool(POOL_SIZE) as pool: pool.starmap(write_observations_from_zip, zip(zip_files, repeat(svs))) + check_allFiles_processed() + end = datetime.now() logging.info('End: %s', end) logging.info('Duration: %s', str(end - start)) +def check_allFiles_processed(): + # Verify below observation csv are getting generated or not + expected_files = [ + 'ASPIRE_CSV_obs.csv', 'DB_CSV_obs.csv', 'Economic_Fitness_CSV_obs.csv', + 'EdStats_CSV_obs.csv', 'FINDEX_CSV_obs.csv', 'GFDD_CSV_obs.csv', + 'GPFI_CSV_obs.csv', 'HCI_CSV_obs.csv', 'HEFPI_CSV_obs.csv', + 'IDA_CSV_obs.csv', 'MDG_CSV_obs.csv', 'PovStats_CSV_obs.csv', + 'SDG_CSV_obs.csv', 'SE4ALL_CSV_obs.csv', + 'Subnational-Population_CSV_obs.csv', 'Subnational-Poverty_CSV_obs.csv', + 'WWBI_CSV_obs.csv' + ] + expected_files = sorted(set(expected_files)) + actual_output_files = sorted(set(os.listdir(OBSERVATIONS_DIR))) + # If actual processed files are not equal to expected files, raising fatal + if actual_output_files != expected_files: + logging.fatal('actual output files are not equal to expected') + + def write_observations_from_zip(zip_file, svs): csv_rows = get_observations_from_zip(zip_file, svs) if len(csv_rows) == 0: @@ -453,27 +508,34 @@ def write_observations_from_zip(zip_file, svs): def get_observations_from_zip(zip_file, svs): - with zipfile.ZipFile(zip_file, 'r') as zip: - (data_file, _) = get_data_and_series_file_names(zip) - if data_file is None: - logging.warning('No data file found in ZIP file: %s', zip_file) - return [] - else: - # Use name of file (excluding the extension) as the measurement method - measurement_method = f"{WORLD_BANK_MEASUREMENT_METHOD_PREFIX}_{zip_file.split('/')[-1].split('.')[0]}" - with zip.open(data_file, 'r') as csv_file: - data_rows = sanitize_csv_rows( - list(csv.DictReader(codecs.iterdecode(csv_file, 'utf-8')))) - num_rows = len(data_rows) - logging.info('# data rows in %s: %s', zip_file, num_rows) - - obs_csv_rows = [] - for data_row in data_rows: - obs_csv_rows.extend( - get_observations_from_data_row(data_row, svs, - measurement_method)) - - return obs_csv_rows + try: + with zipfile.ZipFile(zip_file, 'r') as zip: + (data_file, _) = get_data_and_series_file_names(zip) + if data_file is None: + logging.warning('No data file found in ZIP file: %s', zip_file) + return [] + else: + # Use name of file (excluding the extension) as the measurement method + measurement_method = f"{WORLD_BANK_MEASUREMENT_METHOD_PREFIX}_{zip_file.split('/')[-1].split('.')[0]}" + with zip.open(data_file, 'r') as csv_file: + data_rows = sanitize_csv_rows( + list( + csv.DictReader(codecs.iterdecode(csv_file, + 'utf-8')))) + num_rows = len(data_rows) + logging.info('# data rows in %s: %s', zip_file, num_rows) + + obs_csv_rows = [] + for data_row in data_rows: + obs_csv_rows.extend( + get_observations_from_data_row( + data_row, svs, measurement_method)) + + return obs_csv_rows + # Exception can be ignored as there might be some corrupted zip files from source + except Exception as e: + logging.info("There is problem while processing the zip file: %s", e) + return [] def get_observations_from_data_row(data_row, svs, measurement_method): @@ -571,19 
@@ -571,19 +633,27 @@ def get_data_and_series_file_names(zip):
 
 
 def main(_):
-    match FLAGS.mode:
-        case Mode.FETCH_DATASETS:
-            download_datasets()
-        case Mode.DOWNLOAD_DATASETS:
-            fetch_and_write_datasets_csv()
-        case Mode.WRITE_WB_CODES:
-            write_wb_codes()
-        case Mode.LOAD_STAT_VARS:
-            load_stat_vars(FLAGS.stat_vars_file)
-        case Mode.WRITE_OBSERVATIONS:
-            write_all_observations(FLAGS.stat_vars_file)
-        case _:
-            logging.error('No mode specified.')
+    logging.info('Mode: %s', FLAGS.mode)
+    # If no mode is specified, run all processing steps in sequence.
+    if not FLAGS.mode:
+        fetch_and_write_datasets_csv()
+        download_datasets()
+        write_wb_codes()
+        load_stat_vars(FLAGS.stat_vars_file)
+        write_all_observations(FLAGS.stat_vars_file)
+    else:
+        match FLAGS.mode:
+            case Mode.FETCH_DATASETS:
+                fetch_and_write_datasets_csv()
+            case Mode.DOWNLOAD_DATASETS:
+                download_datasets()
+            case Mode.WRITE_WB_CODES:
+                write_wb_codes()
+            case Mode.LOAD_STAT_VARS:
+                load_stat_vars(FLAGS.stat_vars_file)
+            case Mode.WRITE_OBSERVATIONS:
+                write_all_observations(FLAGS.stat_vars_file)
+            case _:
+                logging.error('Unsupported mode: %s', FLAGS.mode)
 
 
 if __name__ == '__main__':
diff --git a/scripts/world_bank/datasets/datasets_test.py b/scripts/world_bank/datasets/datasets_test.py
new file mode 100644
index 0000000000..02fb378984
--- /dev/null
+++ b/scripts/world_bank/datasets/datasets_test.py
@@ -0,0 +1,97 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#       https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
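+
+"""Tests for the observation extraction in datasets.py.
+
+The tests feed rows from the FINDEX fixture under test_data/input_data
+into datasets.get_observations_from_data_row() and compare the output
+against test_data/expected_files/expected_output.csv.
+"""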
+
+import datasets
+import unittest
+import os
+import csv
+import re
+
+MODULE_DIR = os.path.dirname(__file__)
+
+TEST_DATASET_DIR = os.path.join(MODULE_DIR, "test_data", "input_data")
+EXPECTED_FILES_DIR = os.path.join(MODULE_DIR, "test_data", "expected_files")
+
+
+def get_datarow_measurement_method(filename):
+    data_rows = []
+    with open(os.path.join(TEST_DATASET_DIR, filename), 'r') as file_content:
+        csv_reader = csv.DictReader(file_content)
+        for row in csv_reader:
+            data_rows.append(row)
+    # The measurement method is the file name without its extension.
+    measurement_method = re.match(r"(.*)\.csv", filename).group(1)
+    return data_rows, measurement_method
+
+
+def get_sv_mapping(indicator_code):
+    for file in os.listdir(TEST_DATASET_DIR):
+        if file.endswith('_mapping.csv'):
+            with open(os.path.join(TEST_DATASET_DIR, file),
+                      'r') as file_content:
+                csv_reader = csv.DictReader(file_content)
+                for row in csv_reader:
+                    series_code = row['seriescode']
+                    if series_code == indicator_code:
+                        return {series_code: row}
+    return None
+
+
+def fetch_expected_output(indicator_code):
+    for file in os.listdir(EXPECTED_FILES_DIR):
+        if file.endswith('.csv'):
+            with open(os.path.join(EXPECTED_FILES_DIR, file),
+                      'r') as file_content:
+                csv_reader = csv.DictReader(file_content)
+                for row in csv_reader:
+                    if row['indicatorcode'] == indicator_code:
+                        return row
+    return None
+
+
+class TestGetObservations(unittest.TestCase):
+
+    def get_required_data(self, filename, indicator_code):
+        data_rows, measurement_method = get_datarow_measurement_method(
+            filename)
+        svs = get_sv_mapping(indicator_code)
+        expected_output = fetch_expected_output(indicator_code)
+        return data_rows, measurement_method, svs, expected_output
+
+    def test_input1(self):
+        data_rows, measurement_method, svs, expected_output = self.get_required_data(
+            'WorldBank_FINDEX_CSV.csv', 'account.t.d')
+        self.assertEqual(
+            datasets.get_observations_from_data_row(data_rows[0], svs,
+                                                    measurement_method)[0],
+            expected_output)
+
+    def test_input2(self):
+        data_rows, measurement_method, svs, expected_output = self.get_required_data(
+            'WorldBank_FINDEX_CSV.csv', 'account.t.d.1')
+        self.assertEqual(
+            datasets.get_observations_from_data_row(data_rows[1], svs,
+                                                    measurement_method)[0],
+            expected_output)
+
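+# Assuming datasets.py and its dependencies are importable from this
+# directory, the tests can be run with: python3 datasets_test.py
+# (or: python3 -m unittest datasets_test)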
+if __name__ == '__main__':
+    unittest.main()
diff --git a/scripts/world_bank/datasets/test_data/expected_files/expected_output.csv b/scripts/world_bank/datasets/test_data/expected_files/expected_output.csv
new file mode 100644
index 0000000000..665d299d59
--- /dev/null
+++ b/scripts/world_bank/datasets/test_data/expected_files/expected_output.csv
@@ -0,0 +1,3 @@
+indicatorcode,statvar,measurementmethod,observationabout,observationdate,observationvalue,unit
+account.t.d,worldBank/account_t_d,WorldBank_FINDEX_CSV,dcid:country/ARB,2011,22.48,
+account.t.d.1,worldBank/account_t_d_1,WorldBank_FINDEX_CSV,dcid:country/EAS,2011,57.5,
\ No newline at end of file
diff --git a/scripts/world_bank/datasets/test_data/input_data/WorldBank_FINDEX_CSV.csv b/scripts/world_bank/datasets/test_data/input_data/WorldBank_FINDEX_CSV.csv
new file mode 100644
index 0000000000..e7df731d90
--- /dev/null
+++ b/scripts/world_bank/datasets/test_data/input_data/WorldBank_FINDEX_CSV.csv
@@ -0,0 +1,3 @@
+countryname,countrycode,indicatorname,indicatorcode,2011,2014,2017,2021,2022,,
+Arab World,ARB,Account (% age 15+),account.t.d,22.48,30.47,37.23,40.21,,,
+East Asia & Pacific,EAS,Mobile money account,account.t.d.1,57.5,70.24,71.17,81.38,,,
\ No newline at end of file
diff --git a/scripts/world_bank/datasets/test_data/input_data/sv_mapping.csv b/scripts/world_bank/datasets/test_data/input_data/sv_mapping.csv
new file mode 100644
index 0000000000..5130e4ac14
--- /dev/null
+++ b/scripts/world_bank/datasets/test_data/input_data/sv_mapping.csv
@@ -0,0 +1,3 @@
+seriescode,unit,statvar
+account.t.d,,worldBank/account_t_d
+account.t.d.1,,worldBank/account_t_d_1