Skip to content

Commit

Permalink
Merge pull request #74 from jvivian/jvivian/issue56
Browse files Browse the repository at this point in the history
Refactor codebase to use AnnData (Fixes #56)
  • Loading branch information
jvivian authored Mar 29, 2024
2 parents dd5a202 + 8d7c3c5 commit 69ce2ed
Show file tree
Hide file tree
Showing 29 changed files with 14,513 additions and 814 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ COPY . /code/
# Project initialization:
RUN poetry install --no-interaction

CMD ["c19dfm", "--help"]
ENTRYPOINT ["c19dfm"]
CMD ["--help"]
361 changes: 243 additions & 118 deletions coverage.xml

Large diffs are not rendered by default.

69 changes: 49 additions & 20 deletions covid19_drdfm/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,39 +9,68 @@
Process data and generate parquet DataFrame
- `c19_dfm process ./outfile.xlsx`
"""
import anndata as ann

import subprocess
from pathlib import Path
from typing import Optional

import typer
from rich import print

from covid19_drdfm.dfm import run_parameterized_model
from covid19_drdfm.processing import get_df, write
from covid19_drdfm.covid19 import get_project_h5ad

from covid19_drdfm.io import DataLoader
from covid19_drdfm.dfm import ModelRunner

app = typer.Typer()


class PreprocessingFailure(Exception):
"""Raised when preprocessing has failed"""
@app.command("run")
def run_dfm(
h5ad: Path,
outdir: Path,
batch: str = typer.Option(help="Name of column in h5ad.obs to use as batch variable"),
global_multiplier: int = 1,
maxiter: int = 10_000,
):
ad = ann.read_h5ad(h5ad)
model = ModelRunner(ad, outdir, batch)
model.run(maxiter, global_multiplier)

pass

@app.command("create_input_data")
def create_input_h5ad(
h5ad_out: Path,
data_path: Path,
factor_path: Path,
metadata_path: Optional[Path] = typer.Option(help="Path to metadata (needed if batching data)"),
):
"""
Convert data, factor, and metadata CSVs to H5AD and save output
@app.command("run")
def run_dfm(outdir: str):
"""Run Model"""
raw = get_df()
# ? Add multiprocessing step here
state = "NY"
run_parameterized_model(raw, state, Path(outdir))
Example: c19dfm create_input_h5ad data.h5ad ./data.csv ./factors.csv --metadata ./metadata.csv
"""
print(f"Creating H5AD at {h5ad_out}")
data = DataLoader().load(data_path, factor_path, metadata_path)
data.write_h5ad(h5ad_out)


@app.command("create_covid_project_data")
def create_project_data(outdir: Path):
"""
Create H5AD object of covid19 response and economic data
"""
ad = get_project_h5ad()
ad.write(outdir / "data.h5ad")
print(f"Project data successfully created at {outdir}/data.h5ad !")


@app.command("process")
def process_data(output_file: str):
@app.command("launch_dashboard")
def launch_dashboard():
"""
Process input data into single `outfile.{xlsx|csv|parquet}` DataFrame
Launch the Dashboard
"""
try:
df = get_df()
write(df, Path(output_file))
except Exception as e:
raise PreprocessingFailure(f"preprocessing failed!: {e}") from e
current_dir = Path(__file__).resolve().parent
dashboard_path = current_dir / "streamlit" / "Dashboard.py"
subprocess.run(["streamlit", "run", dashboard_path])
5 changes: 5 additions & 0 deletions covid19_drdfm/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@
FACTORS_GROUPED[second].append(key)
FACTORS_GROUPED = dict(FACTORS_GROUPED)

# Reverse lookup: factor value -> its group key. If a value appears under
# several groups, the last group wins (same as the original loop's overwrite).
GROUPED_FACTORS = {value: key for key, values in FACTORS_GROUPED.items() for value in values}

DIFF_COLS = [
"Cases1",
"Cases2",
Expand Down
139 changes: 139 additions & 0 deletions covid19_drdfm/covid19.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
from fractions import Fraction
from functools import reduce
from pathlib import Path

import anndata as ann
import pandas as pd
import yaml
from anndata import AnnData

from covid19_drdfm.constants import NAME_MAP

ROOT_DIR = Path(__file__).parent.absolute()
DATA_DIR = ROOT_DIR / "data/processed"


def _get_raw_df() -> pd.DataFrame:
    """
    Merge the CSV files listed in 'df_paths.txt' into a single DataFrame.
    Returns:
        pd.DataFrame: All listed CSVs left-joined on State/Year/Period
    """
    with open(DATA_DIR / "df_paths.txt") as handle:
        frames = [pd.read_csv(ROOT_DIR / line.strip()) for line in handle]
    merge_keys = ["State", "Year", "Period"]
    return reduce(lambda left, right: left.merge(right, on=merge_keys, how="left"), frames)


def get_raw() -> pd.DataFrame:
    """
    Retrieves the raw data as a pandas DataFrame.
    Returns:
        pd.DataFrame: The raw data.
    """
    df = _get_raw_df()
    # Duplicate Monetary columns come from the merge; keep the `_y` versions.
    df = df.drop(columns=["Monetary_1_x", "Monetary_11_x"])
    df = df.rename(columns={"Monetary_1_y": "Monetary_1", "Monetary_11_y": "Monetary_11"})
    df = df.drop(columns=["Proportion", "proportion_vax2", "Pandemic_Response_8", "Distributed"])
    df = add_datetime(df)
    return fix_names(df)


def get_df() -> pd.DataFrame:
    """
    Retrieves and processes the raw covid19 data to generate a cleaned DataFrame.
    Returns:
        pd.DataFrame: The cleaned DataFrame.
    """
    deflated = adjust_inflation(get_raw())
    return adjust_pandemic_response(deflated)


def get_project_h5ad() -> AnnData:
    """
    Load covid19 data.h5ad from the DATA_DIR and return.
    Returns:
        AnnData: The loaded AnnData object.
    """
    h5ad_path = DATA_DIR / "data.h5ad"
    return ann.read_h5ad(h5ad_path)


def get_govt_fund_dist() -> list[float]:
    """Reads in govt fund distribution from data/raw/govt_fund_dist.yml
    Returns:
        list[float]: Distribution values. Length equates to num_months
    """
    with open(DATA_DIR / "govt_fund_dist.yml") as handle:
        entries = yaml.safe_load(handle)
    # Entries may be fractions like "1/3"; Fraction handles both forms.
    return [float(Fraction(entry)) for entry in entries]


def adjust_inflation(df: pd.DataFrame) -> pd.DataFrame:
    """Adjust nominal columns for inflation using the PCE price index.

    Args:
        df (pd.DataFrame): Input DataFrame (see `get_df`) with the nominal
            columns below and a `PCE` column (index, 100 = base period).
    Returns:
        pd.DataFrame: New DataFrame with each column divided by PCE/100;
        the input DataFrame is not modified.
    """
    # One loop replaces seven copy-pasted .assign calls — same deflator each time.
    nominal_cols = ["Cons1", "Cons2", "Cons3", "Cons4", "Cons5", "GDP", "FixAss"]
    out = df.copy()
    deflator = out.PCE / 100
    for col in nominal_cols:
        out[col] = out[col].div(deflator)
    return out


def adjust_pandemic_response(df: pd.DataFrame) -> pd.DataFrame:
    """Spread each one-time pandemic-response fund over the months given by
    the government fund distribution.

    Mutates `df` in place (columns are cast to float and rewritten) and also
    returns it, matching the original pipe-style usage.

    Args:
        df (pd.DataFrame): Input DataFrame with ARP/PPP/CARES columns
    Returns:
        pd.DataFrame: The same DataFrame with response funds distributed
    """
    govt_fund_dist = get_govt_fund_dist()
    for response in ["ARP", "PPP", "CARES"]:
        df[response] = df[response].astype(float)
        positive_idx = df.index[df[response] > 0]
        if len(positive_idx) == 0:
            # No disbursement recorded for this program; the old code raised
            # IndexError here — skip instead.
            continue
        start = positive_idx[0]
        fund = df.loc[start, response]
        # NOTE(review): `start + offset` assumes a contiguous integer index —
        # confirm upstream callers always pass a default RangeIndex.
        for offset, fraction in enumerate(govt_fund_dist):
            df.loc[start + offset, response] = fund * fraction
    return df


def add_datetime(df: pd.DataFrame) -> pd.DataFrame:
    """Replace `Year`/`Period` with a single datetime `Time` column.

    Args:
        df (pd.DataFrame): Input DataFrame with `Year` and `Period`
            (e.g. "M3") columns
    Returns:
        pd.DataFrame: Copy with `Time` set to the first day of each month
            and `Period`/`Year` dropped
    """
    # Strip the leading "M" from Period to get the month number.
    month = df.Period.str[1:].astype(int)
    stamped = df.assign(Time=pd.to_datetime({"year": df.Year, "month": month, "day": 1}))
    return stamped.drop(columns=["Period", "Year"])


def fix_names(df: pd.DataFrame) -> pd.DataFrame:
    """Map sensible names to the merged input dataframe

    Args:
        df (pd.DataFrame): Input DataFrame after merging all input data
    Returns:
        pd.DataFrame: DataFrame with columns renamed per NAME_MAP
    """
    renamed = df.rename(columns=NAME_MAP)
    return renamed
Loading

0 comments on commit 69ce2ed

Please sign in to comment.