diff --git a/scripts/us_hud/income/README.md b/scripts/us_hud/income/README.md
index 57522f078c..6a1674484c 100644
--- a/scripts/us_hud/income/README.md
+++ b/scripts/us_hud/income/README.md
@@ -15,4 +15,7 @@ The `match_bq.csv` file contains places that have additional dcids that we would
 To run unit tests:
 ```
 python3 -m unittest discover -v -s ../ -p "*_test.py"
 ```
+
+Running the script or tests requires the packages in `requirements.txt`
+(`python-calamine` is used to read the Excel workbooks): `pip install -r requirements.txt`
diff --git a/scripts/us_hud/income/process.py b/scripts/us_hud/income/process.py
index fb9fc767b9..9dc97771bb 100644
--- a/scripts/us_hud/income/process.py
+++ b/scripts/us_hud/income/process.py
@@ -1,4 +1,4 @@
-# Copyright 2023 Google LLC
+# Copyright 2024 Google LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 '''Generates cleaned CSVs for HUD Income Limits data.
 
 Produces:
@@ -19,12 +20,16 @@
 Usage: python3 process.py
 '''
 import csv
 import datetime
 import os
+from typing import IO, Iterator
 
 import pandas as pd
+import python_calamine
+import requests
 from absl import app
 from absl import flags
+from absl import logging
 
 FLAGS = flags.FLAGS
 flags.DEFINE_string('income_output_dir', 'csv', 'Path to write cleaned CSVs.')
@@ -33,14 +38,7 @@
 
 
 def get_url(year):
-    '''Return xls url for year.
-
-    Args:
-      year: Input year.
-
-    Returns:
-      xls url for given year.
-    '''
+    '''Return xls url for year.'''
    if year < 2006:
        return ''
    suffix = str(year)[-2:]
@@ -64,68 +62,124 @@
     return ''
 
 
-def compute_150(df, person):
-    '''Compute 150th percentile income in-place.
+def download_file(url: str, filename: str, input_folder: str):
+    '''Downloads url and saves the response as filename under input_folder.
+
+    Raises on failure so the caller can decide whether to skip the year.
+    '''
+    os.makedirs(input_folder, exist_ok=True)
+    file_path = os.path.join(input_folder, filename)
+    response = requests.get(url)
+    # Surface HTTP errors (e.g. a year whose file is not published yet).
+    response.raise_for_status()
+    with open(file_path, 'wb') as file:
+        file.write(response.content)
+    logging.info(f'Downloaded file: {file_path}')
+
+
+def iter_excel_calamine(file: IO[bytes]) -> Iterator[dict[str, object]]:
+    '''Yields one dict per row of the first sheet, keyed by the header row.'''
+    workbook = python_calamine.CalamineWorkbook.from_filelike(file)  # type: ignore[arg-type]
+    rows = iter(workbook.get_sheet_by_index(0).to_python())
+    headers = list(map(str, next(rows)))  # The header row comes first.
+    for row in rows:
+        yield dict(zip(headers, row))
+
 
-    Args:
-      df: Input dataframe (will be modified).
-      person: Number of people in household.
-    '''
+def compute_150(df, person):
+    '''Compute 150th percentile income in-place.'''
     df[f'l150_{person}'] = df.apply(
         lambda x: round(x[f'l80_{person}'] / 80 * 150), axis=1)
 
 
-def process(year, matches, output_dir):
-    '''Generate cleaned CSV.
-
-    Args:
-      year: Input year.
-      matches: Map of fips dcid -> city dcid.
-      output_dir: Directory to write cleaned CSV.
-    '''
+def process(year, matches, output_data, input_folder):
+    '''Generates cleaned data for one year and appends it to output_data.'''
     url = get_url(year)
-    try:
-        df = pd.read_excel(url)
-    except:
-        print(f'No file found for {url}.')
-        return
+
+    # HUD publishes .xlsx workbooks from FY2023 onward; earlier years use .xls.
+    if year >= 2023:
+        try:
+            filename = f'Section8-FY{year}.xlsx'
+            download_file(url, filename, input_folder)
+            with open(os.path.join(input_folder, filename), 'rb') as f:
+                df = pd.DataFrame(list(iter_excel_calamine(f)))
+        except Exception as e:
+            logging.error(f'Error processing {year} ({url}): {e}')
+            return
+    else:
+        try:
+            filename = f'Section8-FY{year}.xls'
+            download_file(url, filename, input_folder)
+            df = pd.read_excel(os.path.join(input_folder, filename))
+        except Exception as e:
+            logging.error(f'Error processing {year} ({url}): {e}')
+            return
+
+    # Process the DataFrame (common code for all years).
     if 'fips2010' in df:
         df = df.rename(columns={'fips2010': 'fips'})
 
     # Filter to 80th percentile income stats for each household size.
     df = df.loc[:, [
         'fips', 'l80_1', 'l80_2', 'l80_3', 'l80_4', 'l80_5', 'l80_6', 'l80_7',
         'l80_8'
     ]]
+    # Format the fips codes as geoId dcids; state-level rows drop the '99999' suffix.
     df['fips'] = df.apply(lambda x: 'dcs:geoId/' + str(x['fips']).zfill(10),
                           axis=1)
     df['fips'] = df.apply(lambda x: x['fips'][:-5]
                           if x['fips'][-5:] == '99999' else x['fips'],
                           axis=1)
+
+    # Compute 150th percentile for each household size.
     for i in range(1, 9):
         compute_150(df, i)
-    df['year'] = [year for i in range(len(df))]
+
+    # Add year column.
+    df['year'] = year
 
     # Add stats for matching dcids.
     df_match = df.copy().loc[df['fips'].isin(matches)]
     if not df_match.empty:
         df_match['fips'] = df_match.apply(lambda x: matches[x['fips']], axis=1)
         df = pd.concat([df, df_match])
-    df.to_csv(os.path.join(output_dir, f'output_{year}.csv'), index=False)
+    # Append this year's cleaned data for the final merge.
+    output_data.append(df)
 
 
 def main(argv):
+    '''Processes every year and merges the results into a single CSV.'''
     with open('match_bq.csv') as f:
         reader = csv.DictReader(f)
         matches = {'dcs:' + row['fips']: 'dcs:' + row['city'] for row in reader}
+
+    # Ensure the output directory exists.
     if not os.path.exists(FLAGS.income_output_dir):
         os.makedirs(FLAGS.income_output_dir)
     today = datetime.date.today()
-    for year in range(2006, today.year):
+
+    # Accumulates one cleaned DataFrame per year.
+    output_data = []
+
+    # Folder for the downloaded HUD workbooks.
+    input_folder = 'input'
+
+    # Process data from 2006 through the current year.
+    for year in range(2006, today.year + 1):
         print(year)
-        process(year, matches, FLAGS.income_output_dir)
+        process(year, matches, output_data, input_folder)
+
+    if not output_data:
+        logging.fatal('No data was processed for any year.')
+
+    # Merge all years into a single CSV.
+    final_df = pd.concat(output_data, ignore_index=True)
+    final_df.to_csv(os.path.join(FLAGS.income_output_dir,
+                                 'output_all_years.csv'),
+                    index=False)
+    logging.info(
+        f'Merged data saved to {FLAGS.income_output_dir}/output_all_years.csv')
 
 
 if __name__ == '__main__':
diff --git a/scripts/us_hud/income/requirements.txt b/scripts/us_hud/income/requirements.txt
new file mode 100644
index 0000000000..3a30cef53c
--- /dev/null
+++ b/scripts/us_hud/income/requirements.txt
@@ -0,0 +1 @@
+python-calamine==0.3.0
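As a quick sanity check of the new calamine-based reader and the percentile math, a minimal sketch like the one below can be run against any local workbook. It is not part of the change itself: `sample.xlsx` and its `l80_1` column are hypothetical stand-ins for a downloaded HUD file, and `process.py` is assumed to be importable from the working directory.

```python
# Minimal sketch: exercise iter_excel_calamine() and compute_150() from the
# patched process.py. 'sample.xlsx' and its 'l80_1' column are hypothetical.
import pandas as pd

from process import compute_150, iter_excel_calamine

with open('sample.xlsx', 'rb') as f:
    df = pd.DataFrame(list(iter_excel_calamine(f)))

# compute_150 derives the 150th percentile from the 80th:
# l150_n = round(l80_n / 80 * 150), e.g. l80_1 = 48000 -> l150_1 = 90000.
compute_150(df, 1)
print(df[['l80_1', 'l150_1']].head())
```

With the per-year CSVs merged, a full rebuild is a single invocation, `python3 process.py --income_output_dir=csv`, which writes `csv/output_all_years.csv`.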