From 860908d50a24495ffb3fcc893b6222e2840b18d1 Mon Sep 17 00:00:00 2001
From: Ajai
Date: Wed, 18 Oct 2023 16:17:44 +0530
Subject: [PATCH 1/2] Add options to limit downloads by year, counties (#909)

* add options to limit downloads by year, counties

* fix limit
---
 requirements.txt                      |  1 +
 scripts/us_usda/quickstats/process.py | 93 ++++++++++++++++-----------
 2 files changed, 58 insertions(+), 36 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index 663ced4214..dc970e0aac 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -39,3 +39,4 @@ xlrd==1.2.0
 yapf
 zipp
 beautifulsoup4
+ratelimit
diff --git a/scripts/us_usda/quickstats/process.py b/scripts/us_usda/quickstats/process.py
index b0db6fb150..90768e668c 100644
--- a/scripts/us_usda/quickstats/process.py
+++ b/scripts/us_usda/quickstats/process.py
@@ -21,21 +21,21 @@
 
 If the key is not specified as above, it falls back to using a key specified
 in a GCS config file. However, that file is available to DC team members only.
-
 """
-import json
-
-import requests
-import sys
 import csv
-import multiprocessing
+from datetime import datetime
 from itertools import repeat
+import json
+import multiprocessing
 import os
-from datetime import datetime
-from google.cloud import storage
+import sys
+
 from absl import app
 from absl import flags
+from google.cloud import storage
+from ratelimit import limits
+import requests
 
 
 API_BASE = 'https://quickstats.nass.usda.gov/api'
 
@@ -53,15 +53,25 @@
     '998',  # "OTHER" county code
 ])
 
-_GCS_PROJECT_ID = "datcom-204919"
-_GCS_BUCKET = "datcom-csv"
-_GCS_FILE_PATH = "usda/agriculture_survey/config.json"
+_GCS_PROJECT_ID = 'datcom-204919'
+_GCS_BUCKET = 'datcom-csv'
+_GCS_FILE_PATH = 'usda/agriculture_survey/config.json'
 
 _USDA_API_KEY = 'usda_api_key'
 
 _FLAGS = flags.FLAGS
 
 flags.DEFINE_string(_USDA_API_KEY, None, 'USDA quickstats API key.')
+flags.DEFINE_integer(
+    'start_year',
+    os.getenv('start_year', 2000),
+    'Year from which data is processed.',
+)
+flags.DEFINE_integer(
+    'num_counties',
+    os.getenv('num_counties', 5000),
+    'Number of counties for which data is processed.',
+)
 
 
 def process_survey_data(year, svs, out_dir):
@@ -75,6 +85,7 @@ def process_survey_data(year, svs, out_dir):
 
     print('Getting county names')
     county_names = get_param_values('county_name')
+    county_names = county_names[:_FLAGS.num_counties]
     print('# counties =', len(county_names))
 
     pool_size = max(2, multiprocessing.cpu_count() - 1)
@@ -82,7 +93,8 @@ def process_survey_data(year, svs, out_dir):
     with multiprocessing.Pool(pool_size) as pool:
         pool.starmap(
             fetch_and_write,
-            zip(county_names, repeat(year), repeat(svs), repeat(out_dir)))
+            zip(county_names, repeat(year), repeat(svs), repeat(out_dir)),
+        )
 
     write_aggregate_csv(year, out_dir)
 
@@ -96,15 +108,15 @@ def get_parts_dir(out_dir, year):
 
 
 def get_response_dir(out_dir, year):
-    return f"{out_dir}/response/{year}"
+    return f'{out_dir}/response/{year}'
 
 
 def get_response_file_path(out_dir, year, county):
-    return f"{get_response_dir(out_dir, year)}/{county}.json"
+    return f'{get_response_dir(out_dir, year)}/{county}.json'
 
 
 def get_year_csv_file_path(out_dir, year):
-    return f"{out_dir}/ag-{year}.csv"
+    return f'{out_dir}/ag-{year}.csv'
 
 
 def write_aggregate_csv(year, out_dir):
@@ -120,13 +132,13 @@ def write_aggregate_csv(year, out_dir):
                                     lineterminator='\n')
         csv_writer.writeheader()
         for part_file in part_files:
-            if part_file.endswith(".csv"):
-                with open(f"{parts_dir}/{part_file}", 'r') as part:
+            if part_file.endswith('.csv'):
+                with open(f'{parts_dir}/{part_file}', 'r') as part:
                     csv_writer.writerows(csv.DictReader(part))
 
 
 def write_consolidated_csv(years, out_dir):
-    out_file = f"{out_dir}/consolidated.csv"
+    out_file = f'{out_dir}/consolidated.csv'
 
     print('Writing consolidated CSV', out_file)
 
@@ -141,11 +153,19 @@ def write_consolidated_csv(years, out_dir):
 
 
 def fetch_and_write(county_name, year, svs, out_dir):
-    out_file = f"{get_parts_dir(out_dir, year)}/{county_name.replace('[^a-zA-Z0-9]', '')}.csv"
+    out_file = (
+        f"{get_parts_dir(out_dir, year)}/{county_name.replace('[^a-zA-Z0-9]', '')}.csv"
+    )
     api_data = get_survey_county_data(year, county_name, out_dir)
     county_csv_rows = to_csv_rows(api_data, svs)
-    print('Writing', len(county_csv_rows), 'rows for county', county_name,
-          'to file', out_file)
+    print(
+        'Writing',
+        len(county_csv_rows),
+        'rows for county',
+        county_name,
+        'to file',
+        out_file,
+    )
     with open(out_file, 'w', newline='') as out:
         write_csv(out, county_csv_rows)
 
@@ -161,9 +181,9 @@ def get_survey_county_data(year, county, out_dir):
     else:
         params = {
             'key': get_usda_api_key(),
-            'source_desc': "SURVEY",
+            'source_desc': 'SURVEY',
             'year': year,
-            'county_name': county
+            'county_name': county,
         }
         response = get_data(params)
         with open(response_file, 'w') as f:
@@ -178,6 +198,7 @@ def get_survey_county_data(year, county, out_dir):
     return response
 
 
+@limits(calls=10, period=60)
 def get_data(params):
     return requests.get(f'{API_BASE}/api_GET', params=params).json()
 
@@ -189,19 +210,19 @@ def get_param_values(param):
     return [] if param not in response else response[param]
 
 
-'''Converts a quickstats data row to a DC CSV row.
+"""Converts a quickstats data row to a DC CSV row.
 
 data = quickstats data row
 svs = {name: {name: ..., sv: ..., unit: ...}}
 
 returns = {variableMeasured: ..., observationAbout: ..., value: ..., unit: ...}
-'''
+"""
 
 
 def to_csv_row(data_row, svs):
     name = data_row['short_desc']
-    if data_row['domaincat_desc'] and data_row[
-            'domaincat_desc'] != 'NOT SPECIFIED':
+    if (data_row['domaincat_desc'] and
+            data_row['domaincat_desc'] != 'NOT SPECIFIED'):
         name = f"{name}%%{data_row['domaincat_desc']}"
 
     if name not in svs:
@@ -213,16 +234,16 @@ def to_csv_row(data_row, svs):
         eprint('SKIPPED, Unsupported county code', county_code)
         return None
 
-    value = (data_row['value'] if 'value' in data_row else
-             data_row['Value']).strip().replace(',', '')
+    value = ((data_row['value'] if 'value' in data_row else
+              data_row['Value']).strip().replace(',', ''))
     if value in SKIPPED_VALUES:
         eprint('SKIPPED, Invalid value', f"'{value}'", 'for', name)
         return None
     value = int(value)
 
-    observation_about = f"dcid:geoId/{data_row['state_fips_code']}{county_code}" if \
-        data_row[
-            'state_fips_code'] else 'dcid:country/USA'
+    observation_about = (
+        f"dcid:geoId/{data_row['state_fips_code']}{county_code}"
+        if data_row['state_fips_code'] else 'dcid:country/USA')
 
     sv = svs[name]
 
@@ -248,7 +269,7 @@ def to_csv_rows(api_data, svs):
 
 def load_svs():
     svs = {}
-    with open("sv.csv", newline='') as csvfile:
+    with open('sv.csv', newline='') as csvfile:
         reader = csv.DictReader(csvfile)
         for row in reader:
             svs[row['name']] = row
@@ -267,16 +288,16 @@ def eprint(*args, **kwargs):
 
 
 def get_all_counties():
     svs = load_svs()
-    process_survey_data(2023, svs, "output")
+    process_survey_data(2023, svs, 'output')
 
 
 def get_multiple_years():
     start = datetime.now()
     print('Start', start)
-    out_dir = "output"
+    out_dir = 'output'
     svs = load_svs()
-    years = range(2000, datetime.now().year + 1)
+    years = range(_FLAGS.start_year, datetime.now().year + 1)
     for year in years:
         process_survey_data(year, svs,
                             out_dir)

From 63c73888c483d743f4ab361fbae65a4e8d5d2846 Mon Sep 17 00:00:00 2001
From: hareesh-ms <101345004+hareesh-ms@users.noreply.github.com>
Date: Tue, 31 Oct 2023 17:01:46 +0530
Subject: [PATCH 2/2] Remove pandas version number from requirements.txt
 (#916)

* Remove pandas version number from requirements.txt

* Set version 1.6.4 for netCDF4 in requirements.txt
---
 requirements.txt | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index dc970e0aac..8f52b0b021 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -20,10 +20,10 @@ google-cloud-scheduler==2.10.0
 gspread
 lxml==4.9.1
 matplotlib==3.3.0
-netCDF4
+netCDF4==1.6.4
 numpy
 openpyxl==3.0.7
-pandas==1.3.5
+pandas
 pylint
 pytest
 rasterio
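
Note on the rate limiting added in PATCH 1/2: the ratelimit package's
@limits(calls=10, period=60) decorator keeps its call counter in
process-local memory and, on its own, raises ratelimit.RateLimitException
once a process exceeds 10 calls in a 60-second window; it does not wait.
Because get_data() runs inside a multiprocessing.Pool, each worker counts
its own calls. Below is a minimal sketch of the commonly paired
sleep_and_retry wrapper, which blocks until the window resets instead of
raising. It is illustrative only, not part of the patch, and assumes just
the documented ratelimit API.

    import requests
    from ratelimit import limits, sleep_and_retry

    API_BASE = 'https://quickstats.nass.usda.gov/api'


    @sleep_and_retry  # on RateLimitException, sleep out the window, then retry
    @limits(calls=10, period=60)  # at most 10 calls per 60-second window
    def get_data(params):
        # Same request as in process.py; the counter is per process, so the
        # cap applies to each pool worker independently, not to the whole pool.
        return requests.get(f'{API_BASE}/api_GET', params=params).json()

With the new flags, a bounded run would look something like
"python3 process.py --start_year=2010 --num_counties=100" (assuming the
script's existing absl entry point parses these flags).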