From 6a076450d0a648a5edb136df407b7a9af25ba6cf Mon Sep 17 00:00:00 2001 From: John Vivian Date: Thu, 7 Dec 2023 14:01:02 -0800 Subject: [PATCH] Consolidate data processing to single `pipe` function --- covid19_drdfm/processing.py | 181 ++++++++++-------------------------- 1 file changed, 48 insertions(+), 133 deletions(-) diff --git a/covid19_drdfm/processing.py b/covid19_drdfm/processing.py index 588f898..0c947d9 100644 --- a/covid19_drdfm/processing.py +++ b/covid19_drdfm/processing.py @@ -12,9 +12,12 @@ from pathlib import Path import fastparquet +import numpy as np import pandas as pd import yaml -from covid19_drdfm.constants import NAME_MAP +from sklearn.preprocessing import MinMaxScaler + +from covid19_drdfm.constants import DIFF_COLS, LOG_DIFF_COLS, NAME_MAP ROOT_DIR = Path(__file__).parent.absolute() DATA_DIR = ROOT_DIR / "data/processed" @@ -40,20 +43,16 @@ def get_df() -> pd.DataFrame: .pipe(adjust_inflation) .pipe(add_datetime) .pipe(fix_names) + .pipe(adjust_pandemic_response) + .pipe(diff_vars, cols=DIFF_COLS) + .pipe(diff_vars, cols=LOG_DIFF_COLS, log=True) + .fillna(0) + .pipe(normalize) + .drop(index=0) # Drop first row with NaNs from diff ) -# def get_factors() -> dict[str, (str, str)]: -# """Fetch pre-defined factors for model - -# Returns: -# dict[str, (str, str)]: Factors from `./data/processed/factors.yaml` -# """ -# with open(DATA_DIR / "factors.json") as f: -# return json.load(f) - - -def write(df: pd.DataFrame, outpath: Path) -> Path: +def write(df: pd.DataFrame, outpath: Path): """Write dataframe given the extension""" ext = outpath.suffix if ext == ".xlsx": @@ -106,7 +105,6 @@ def adjust_pandemic_response(df: pd.DataFrame) -> pd.DataFrame: pd.DataFrame: Adjusted DataFrame """ govt_fund_dist = get_govt_fund_dist() - # responses = [f"Pandemic_Response_{x}" for x in [13, 14, 15]] responses = ["ARP", "PPP", "CARES"] for r in responses: df[r] = df[r].astype(float) @@ -118,7 +116,7 @@ def adjust_pandemic_response(df: pd.DataFrame) -> pd.DataFrame: def add_datetime(df: pd.DataFrame) -> pd.DataFrame: - """Sets `Time` column to `DateTime` dtype + """Set `Time` column to `DateTime` dtype Args: df (pd.DataFrame): Input DataFrame @@ -143,124 +141,41 @@ def fix_names(df: pd.DataFrame) -> pd.DataFrame: return df.rename(columns=NAME_MAP) -def get_factors(): - """Returns the pre-assigned factors for the model""" - return { - "Cases1": ("Global", "Pandemic"), - "Cases2": ("Global", "Pandemic"), - "Cases3": ("Global", "Pandemic"), - "Cases4": ("Global", "Pandemic"), - "Cases5": ("Global", "Pandemic"), - "Hosp1": ("Global", "Pandemic"), - "Hosp2": ("Global", "Pandemic"), - "Deaths1": ("Global", "Pandemic"), - "Deaths2": ("Global", "Pandemic"), - "Deaths3": ("Global", "Pandemic"), - "Deaths4": ("Global", "Pandemic"), - "Deaths5": ("Global", "Pandemic"), - "Vax1": ("Global", "Response"), - "Vax2": ("Global", "Response"), - "Vax3": ("Global", "Response"), - "Gather1": ("Global", "Response"), - "Gather2": ("Global", "Response"), - "Gather3": ("Global", "Response"), - "Gather4": ("Global", "Response"), - "SaH": ("Global", "Response"), - "Curfew": ("Global", "Response"), - "Mask1": ("Global", "Response"), - "Mask2": ("Global", "Response"), - "School": ("Global", "Response"), - "ARP": ("Global", "Response"), - "PPP": ("Global", "Response"), - "CARES": ("Global", "Response"), - "School": ("Global", "Response"), - "School": ("Global", "Response"), - "Cons1": ("Global", "Consumption"), - "Cons2": ("Global", "Consumption"), - "Cons3": ("Global", "Consumption"), - "Cons4": ("Global", "Consumption"), - "Cons5": ("Global", "Consumption"), - "Employment1": ("Global", "Employment"), - "Employment2": ("Global", "Employment"), - "UI": ("Global", "Employment"), - "PartR": ("Global", "Employment"), - "UR": ("Global", "Employment"), - "CPI": ("Global", "Inflation"), - "CPIU": ("Global", "Inflation"), - "PCE": ("Global", "Inflation"), - "PCEC": ("Global", "Inflation"), - "RPFI": ("Global", "Uncat"), - "FixAss": ("Global", "Uncat"), - "Prod": ("Global", "Uncat"), - "GDP": ("Global", "Uncat"), - "TBill1mo": ("Global", "Uncat"), - "TBill6mo": ("Global", "Uncat"), - "TBill1yr": ("Global", "Uncat"), - "TBill5yr": ("Global", "Uncat"), - "TBill10yr": ("Global", "Uncat"), - "TBill30yr": ("Global", "Uncat"), - "FFR": ("Global", "Uncat"), - } +def diff_vars(df: pd.DataFrame, cols: list[str], log: bool = False) -> pd.DataFrame: + """Differences the set of variables within the dataframe + NOTE: Leaves a row with Nas -""" -Diff - "Cases1" - "Cases2" - "Cases3" - "Cases4" - "Cases5" - "Hosp1" - "Hosp2" - "Deaths1" - "Deaths2" - "Deaths3" - "Deaths4" - "Deaths5" - -Log-Diff - "Cons1" - "Cons2" - "Cons3" - "Cons4" - "Cons5" - "Employment1" - "Employment2" - "CPI" - "CPIU" - "PCE" - "PCEC" - "RPFI" - "FixAss" - "Prod" - "GDP" - "UI" - -Nothing - - "Vax1" - "Vax2" - "Vax3" - "Gather1" - "Gather2" - "Gather3" - "Gather4" - "SaH" - "Curfew" - "Mask1" - "Mask2" - "School" - "ARP" - "PPP" - "CARES" - "TBill1mo" - "TBill6mo" - "TBill1yr" - "TBill5yr" - "TBill10yr" - "TBill30yr" - "FFR" - "PartR" - "UR" - - """ + Args: + df (pd.DataFrame): Input DataFrame + vars (List[str]): List of columns to difference + log bool: Whether to take the log(difference) or not + + Returns: + pd.DataFrame: DataFrame with given vars differenced + """ + if log: + # df[cols] = np.log(df[cols]).diff().fillna(0).apply(lambda x: np.log(x + 0)) + df[cols] = df[cols].apply(lambda x: np.log(x + 1)).diff() + else: + df[cols] = df[cols].diff() + return df + + +def normalize(df: pd.DataFrame) -> pd.DataFrame: + """Normalize data and make stationary - scaling for post-DFM Synthetic Control Model + + Args: + df (pd.DataFrame): State data, pre-normalization + + Returns: + pd.DataFrame: Normalized and stationary DataFrame + """ + meta_cols = df[["State", "Time"]] + # df = df.drop(columns=["Time"]) if "Time" in df.columns else df + df = df.drop(columns=["State", "Time"]) + # Normalize data + scaler = MinMaxScaler() + new = pd.DataFrame(scaler.fit_transform(df), columns=df.columns) + new["State"] = meta_cols["State"] + return new