Skip to content

Commit

Permalink
Consolidate data processing to single pipe function
Browse files Browse the repository at this point in the history
  • Loading branch information
jvivian committed Dec 7, 2023
1 parent ebd7665 commit 6a07645
Showing 1 changed file with 48 additions and 133 deletions.
181 changes: 48 additions & 133 deletions covid19_drdfm/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
from pathlib import Path

import fastparquet
import numpy as np
import pandas as pd
import yaml
from covid19_drdfm.constants import NAME_MAP
from sklearn.preprocessing import MinMaxScaler

from covid19_drdfm.constants import DIFF_COLS, LOG_DIFF_COLS, NAME_MAP

ROOT_DIR = Path(__file__).parent.absolute()
DATA_DIR = ROOT_DIR / "data/processed"
Expand All @@ -40,20 +43,16 @@ def get_df() -> pd.DataFrame:
.pipe(adjust_inflation)
.pipe(add_datetime)
.pipe(fix_names)
.pipe(adjust_pandemic_response)
.pipe(diff_vars, cols=DIFF_COLS)
.pipe(diff_vars, cols=LOG_DIFF_COLS, log=True)
.fillna(0)
.pipe(normalize)
.drop(index=0) # Drop first row with NaNs from diff
)


# def get_factors() -> dict[str, (str, str)]:
# """Fetch pre-defined factors for model

# Returns:
# dict[str, (str, str)]: Factors from `./data/processed/factors.yaml`
# """
# with open(DATA_DIR / "factors.json") as f:
# return json.load(f)


def write(df: pd.DataFrame, outpath: Path) -> Path:
def write(df: pd.DataFrame, outpath: Path):
"""Write dataframe given the extension"""
ext = outpath.suffix
if ext == ".xlsx":
Expand Down Expand Up @@ -106,7 +105,6 @@ def adjust_pandemic_response(df: pd.DataFrame) -> pd.DataFrame:
pd.DataFrame: Adjusted DataFrame
"""
govt_fund_dist = get_govt_fund_dist()
# responses = [f"Pandemic_Response_{x}" for x in [13, 14, 15]]
responses = ["ARP", "PPP", "CARES"]
for r in responses:
df[r] = df[r].astype(float)
Expand All @@ -118,7 +116,7 @@ def adjust_pandemic_response(df: pd.DataFrame) -> pd.DataFrame:


def add_datetime(df: pd.DataFrame) -> pd.DataFrame:
"""Sets `Time` column to `DateTime` dtype
"""Set `Time` column to `DateTime` dtype
Args:
df (pd.DataFrame): Input DataFrame
Expand All @@ -143,124 +141,41 @@ def fix_names(df: pd.DataFrame) -> pd.DataFrame:
return df.rename(columns=NAME_MAP)


def get_factors():
    """Return the pre-assigned factors for the model.

    Variables are declared once per factor group, so a variable cannot be
    listed twice by accident (the previous dict literal repeated "School").

    Returns:
        dict: Maps each variable name to a ``("Global", <factor>)`` tuple,
            ordered by factor group in the order declared below.
    """
    variables_by_factor = {
        "Pandemic": [
            "Cases1",
            "Cases2",
            "Cases3",
            "Cases4",
            "Cases5",
            "Hosp1",
            "Hosp2",
            "Deaths1",
            "Deaths2",
            "Deaths3",
            "Deaths4",
            "Deaths5",
        ],
        "Response": [
            "Vax1",
            "Vax2",
            "Vax3",
            "Gather1",
            "Gather2",
            "Gather3",
            "Gather4",
            "SaH",
            "Curfew",
            "Mask1",
            "Mask2",
            "School",
            "ARP",
            "PPP",
            "CARES",
        ],
        "Consumption": ["Cons1", "Cons2", "Cons3", "Cons4", "Cons5"],
        "Employment": ["Employment1", "Employment2", "UI", "PartR", "UR"],
        "Inflation": ["CPI", "CPIU", "PCE", "PCEC"],
        "Uncat": [
            "RPFI",
            "FixAss",
            "Prod",
            "GDP",
            "TBill1mo",
            "TBill6mo",
            "TBill1yr",
            "TBill5yr",
            "TBill10yr",
            "TBill30yr",
            "FFR",
        ],
    }
    # Every variable loads on the shared "Global" factor plus its own group factor.
    return {
        variable: ("Global", factor)
        for factor, variables in variables_by_factor.items()
        for variable in variables
    }
def diff_vars(df: pd.DataFrame, cols: list[str], log: bool = False) -> pd.DataFrame:
    """First-difference the given columns of the DataFrame.

    The selected columns are overwritten on ``df`` itself and the same object
    is returned, so the function can be chained with ``DataFrame.pipe``.

    NOTE: The first row of every differenced column is NaN, because it has no
    preceding row to subtract.

    Args:
        df (pd.DataFrame): Input DataFrame
        cols (list[str]): Columns to difference
        log (bool): If True, difference ``log(x + 1)`` of each column instead
            of the raw values

    Returns:
        pd.DataFrame: The input DataFrame with the given columns differenced
    """
    # log1p(x) == log(x + 1); the +1 offset keeps zero-valued entries finite.
    values = np.log1p(df[cols]) if log else df[cols]
    df[cols] = values.diff()
    return df


def normalize(df: pd.DataFrame) -> pd.DataFrame:
    """Scale every non-metadata column with ``MinMaxScaler``.

    The scaler is fit on all rows at once. The ``Time`` column is dropped and
    is not restored; ``State`` is re-attached as the last column. The result
    carries a fresh default index rather than the index of ``df``.

    Args:
        df (pd.DataFrame): State data, pre-normalization; must contain
            ``State`` and ``Time`` columns

    Returns:
        pd.DataFrame: Scaled data plus the ``State`` column
    """
    # Take the raw values: the scaled frame below gets a new RangeIndex, so
    # assigning the Series itself would align on the old index and produce
    # NaN / misplaced states whenever `df` does not have a default index.
    states = df["State"].to_numpy()
    features = df.drop(columns=["State", "Time"])
    scaler = MinMaxScaler()
    scaled = pd.DataFrame(scaler.fit_transform(features), columns=features.columns)
    scaled["State"] = states
    return scaled

0 comments on commit 6a07645

Please sign in to comment.