Merge branch 'datacommonsorg:master' into us_bea_harish
HarishC727 authored Nov 7, 2023
2 parents 1480cac + 63c7388 commit 6354261
Showing 2 changed files with 60 additions and 38 deletions.
5 changes: 3 additions & 2 deletions requirements.txt
@@ -20,10 +20,10 @@ google-cloud-scheduler==2.10.0
 gspread
 lxml==4.9.1
 matplotlib==3.3.0
-netCDF4
+netCDF4==1.6.4
 numpy
 openpyxl==3.0.7
-pandas==1.3.5
+pandas
 pylint
 pytest
 rasterio
@@ -39,3 +39,4 @@ xlrd==1.2.0
 yapf
 zipp
 beautifulsoup4
+ratelimit
93 changes: 57 additions & 36 deletions scripts/us_usda/quickstats/process.py
@@ -21,21 +21,21 @@
 If the key is not specified as above, it falls back to using a key specified
 in a GCS config file. However, that file is available to DC team members only.
 """

-import json
-
-import requests
-import sys
 import csv
-import multiprocessing
+from datetime import datetime
 from itertools import repeat
+import json
+import multiprocessing
 import os
-from datetime import datetime
-from google.cloud import storage
+import sys

 from absl import app
 from absl import flags
+from google.cloud import storage
+from ratelimit import limits
+import requests

 API_BASE = 'https://quickstats.nass.usda.gov/api'

@@ -53,15 +53,25 @@
     '998',  # "OTHER" county code
 ])

-_GCS_PROJECT_ID = "datcom-204919"
-_GCS_BUCKET = "datcom-csv"
-_GCS_FILE_PATH = "usda/agriculture_survey/config.json"
+_GCS_PROJECT_ID = 'datcom-204919'
+_GCS_BUCKET = 'datcom-csv'
+_GCS_FILE_PATH = 'usda/agriculture_survey/config.json'

 _USDA_API_KEY = 'usda_api_key'

 _FLAGS = flags.FLAGS

 flags.DEFINE_string(_USDA_API_KEY, None, 'USDA quickstats API key.')
+flags.DEFINE_integer(
+    'start_year',
+    os.getenv('start_year', 2000),
+    'Year from which data is processed.',
+)
+flags.DEFINE_integer(
+    'num_counties',
+    os.getenv('num_counties', 5000),
+    'Number of counties for which data is processed.',
+)


 def process_survey_data(year, svs, out_dir):
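A note on the two new flags: `os.getenv('start_year', 2000)` makes an environment variable override the hard-coded default, and passing the flag on the command line overrides both. A minimal usage sketch, assuming the script is run directly (these invocations are illustrative, not part of the commit):

    python3 process.py                                         # start_year=2000, num_counties=5000
    start_year=2010 python3 process.py                         # env var overrides the default
    python3 process.py --start_year=2015 --num_counties=100   # flags override both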
@@ -75,14 +85,16 @@ def process_survey_data(year, svs, out_dir):

     print('Getting county names')
     county_names = get_param_values('county_name')
+    county_names = county_names[:_FLAGS.num_counties]
     print('# counties =', len(county_names))

     pool_size = max(2, multiprocessing.cpu_count() - 1)

     with multiprocessing.Pool(pool_size) as pool:
         pool.starmap(
             fetch_and_write,
-            zip(county_names, repeat(year), repeat(svs), repeat(out_dir)))
+            zip(county_names, repeat(year), repeat(svs), repeat(out_dir)),
+        )

     write_aggregate_csv(year, out_dir)

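In the fan-out above, `zip` pairs each county name with the constant `year`, `svs`, and `out_dir` supplied by `itertools.repeat`, and `starmap` unpacks each resulting tuple into positional arguments of `fetch_and_write`. A self-contained sketch of the same pattern (the worker function and data are illustrative):

    from itertools import repeat
    import multiprocessing


    def work(county, year):
        return f'{county}:{year}'


    if __name__ == '__main__':
        with multiprocessing.Pool(2) as pool:
            # ('Ada', 2023) is unpacked into work('Ada', 2023), and so on.
            print(pool.starmap(work, zip(['Ada', 'Kent'], repeat(2023))))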
@@ -96,15 +108,15 @@ def get_parts_dir(out_dir, year):


 def get_response_dir(out_dir, year):
-    return f"{out_dir}/response/{year}"
+    return f'{out_dir}/response/{year}'


 def get_response_file_path(out_dir, year, county):
-    return f"{get_response_dir(out_dir, year)}/{county}.json"
+    return f'{get_response_dir(out_dir, year)}/{county}.json'


 def get_year_csv_file_path(out_dir, year):
-    return f"{out_dir}/ag-{year}.csv"
+    return f'{out_dir}/ag-{year}.csv'


 def write_aggregate_csv(year, out_dir):
@@ -120,13 +132,13 @@ def write_aggregate_csv(year, out_dir):
                                     lineterminator='\n')
         csv_writer.writeheader()
         for part_file in part_files:
-            if part_file.endswith(".csv"):
-                with open(f"{parts_dir}/{part_file}", 'r') as part:
+            if part_file.endswith('.csv'):
+                with open(f'{parts_dir}/{part_file}', 'r') as part:
                     csv_writer.writerows(csv.DictReader(part))


 def write_consolidated_csv(years, out_dir):
-    out_file = f"{out_dir}/consolidated.csv"
+    out_file = f'{out_dir}/consolidated.csv'

     print('Writing consolidated CSV', out_file)

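`write_aggregate_csv` streams each per-county part file through `csv.DictReader` and re-emits the rows with a single `csv.DictWriter`, so the aggregate gets one header and a consistent column order. A minimal sketch of that merge pattern (file names and columns are illustrative):

    import csv

    with open('merged.csv', 'w', newline='') as out:
        writer = csv.DictWriter(out, fieldnames=['a', 'b'], lineterminator='\n')
        writer.writeheader()
        for part in ('part1.csv', 'part2.csv'):
            with open(part, newline='') as f:
                writer.writerows(csv.DictReader(f))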
@@ -141,11 +153,19 @@ def write_consolidated_csv(years, out_dir):


 def fetch_and_write(county_name, year, svs, out_dir):
-    out_file = f"{get_parts_dir(out_dir, year)}/{county_name.replace('[^a-zA-Z0-9]', '')}.csv"
+    out_file = (
+        f"{get_parts_dir(out_dir, year)}/{county_name.replace('[^a-zA-Z0-9]', '')}.csv"
+    )
     api_data = get_survey_county_data(year, county_name, out_dir)
     county_csv_rows = to_csv_rows(api_data, svs)
-    print('Writing', len(county_csv_rows), 'rows for county', county_name,
-          'to file', out_file)
+    print(
+        'Writing',
+        len(county_csv_rows),
+        'rows for county',
+        county_name,
+        'to file',
+        out_file,
+    )
     with open(out_file, 'w', newline='') as out:
         write_csv(out, county_csv_rows)

@@ -161,9 +181,9 @@ def get_survey_county_data(year, county, out_dir):
     else:
         params = {
             'key': get_usda_api_key(),
-            'source_desc': "SURVEY",
+            'source_desc': 'SURVEY',
             'year': year,
-            'county_name': county
+            'county_name': county,
         }
         response = get_data(params)
         with open(response_file, 'w') as f:
@@ -178,6 +198,7 @@
     return response


+@limits(calls=10, period=60)
 def get_data(params):
     return requests.get(f'{API_BASE}/api_GET', params=params).json()

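The new `@limits(calls=10, period=60)` decorator caps `get_data` at 10 QuickStats API calls per minute. Per the `ratelimit` package's documented behavior, exceeding the budget raises `ratelimit.RateLimitException` instead of blocking; a caller that may exceed the cap would need something like the following sketch (an assumption about usage, not part of the commit):

    from ratelimit import limits, sleep_and_retry

    @sleep_and_retry              # block until the 60-second window frees up
    @limits(calls=10, period=60)
    def get_data(params):
        ...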
@@ -189,19 +210,19 @@ def get_param_values(param):
     return [] if param not in response else response[param]


-'''Converts a quickstats data row to a DC CSV row.
+"""Converts a quickstats data row to a DC CSV row.
 data = quickstats data row
 svs = {name: {name: ..., sv: ..., unit: ...}}
 returns = {variableMeasured: ..., observationAbout: ..., value: ..., unit: ...}
-'''
+"""


 def to_csv_row(data_row, svs):
     name = data_row['short_desc']
-    if data_row['domaincat_desc'] and data_row[
-            'domaincat_desc'] != 'NOT SPECIFIED':
+    if (data_row['domaincat_desc'] and
+            data_row['domaincat_desc'] != 'NOT SPECIFIED'):
         name = f"{name}%%{data_row['domaincat_desc']}"

@@ -213,16 +234,16 @@ def to_csv_row(data_row, svs):
         eprint('SKIPPED, Unsupported county code', county_code)
         return None

-    value = (data_row['value'] if 'value' in data_row else
-             data_row['Value']).strip().replace(',', '')
+    value = ((data_row['value'] if 'value' in data_row else
+              data_row['Value']).strip().replace(',', ''))
     if value in SKIPPED_VALUES:
         eprint('SKIPPED, Invalid value', f"'{value}'", 'for', name)
         return None
     value = int(value)

-    observation_about = f"dcid:geoId/{data_row['state_fips_code']}{county_code}" if \
-        data_row[
-            'state_fips_code'] else 'dcid:country/USA'
+    observation_about = (
+        f"dcid:geoId/{data_row['state_fips_code']}{county_code}"
+        if data_row['state_fips_code'] else 'dcid:country/USA')

     sv = svs[name]

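Concretely, per the docstring and the logic above, a QuickStats row maps to a DC CSV row roughly as follows (all values invented for illustration):

    # Hypothetical input row:
    # {'short_desc': 'CORN - ACRES PLANTED', 'domaincat_desc': 'NOT SPECIFIED',
    #  'state_fips_code': '19', 'county_code': '001', 'Value': '1,234'}
    # Hypothetical output row, with sv and unit looked up in sv.csv:
    # {'variableMeasured': <sv>, 'observationAbout': 'dcid:geoId/19001',
    #  'value': 1234, 'unit': <unit>}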
@@ -248,7 +269,7 @@ def to_csv_rows(api_data, svs):

 def load_svs():
     svs = {}
-    with open("sv.csv", newline='') as csvfile:
+    with open('sv.csv', newline='') as csvfile:
         reader = csv.DictReader(csvfile)
         for row in reader:
             svs[row['name']] = row
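`load_svs` keys the lookup table on the `name` column of sv.csv; the docstring earlier implies each row also carries at least `sv` and `unit` columns. An invented example row, shown only for shape:

    name,sv,unit
    CORN - ACRES PLANTED,dcid:SomeVariable,Acre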
@@ -267,16 +288,16 @@ def eprint(*args, **kwargs):

 def get_all_counties():
     svs = load_svs()
-    process_survey_data(2023, svs, "output")
+    process_survey_data(2023, svs, 'output')


 def get_multiple_years():
     start = datetime.now()
     print('Start', start)

-    out_dir = "output"
+    out_dir = 'output'
     svs = load_svs()
-    years = range(2000, datetime.now().year + 1)
+    years = range(_FLAGS.start_year, datetime.now().year + 1)
     for year in years:
         process_survey_data(year, svs, out_dir)

