From b8a52db6fdb32322bf8776888a993bb2c1841f55 Mon Sep 17 00:00:00 2001 From: Alex Parsons Date: Mon, 12 Feb 2024 12:50:54 +0000 Subject: [PATCH] Adds division_io.py - Export parquet dumps of divisions and votes - Ingest manual and automated updates of titles from remote sources Requirements.txt until server python is updated --- conf/packages | 1 + requirements.txt | 3 + scripts/division_io.py | 304 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 308 insertions(+) create mode 100644 requirements.txt create mode 100644 scripts/division_io.py diff --git a/conf/packages b/conf/packages index 4d745fabc7..ca647e3cd9 100644 --- a/conf/packages +++ b/conf/packages @@ -24,4 +24,5 @@ php-mysql php7-xapian python3-lxml python3-mysqldb +python3-pip snarf diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000000..aa5a272615 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +rich-click==1.7.3 +pandas==1.3.5 +pyarrow==12.0.1 \ No newline at end of file diff --git a/scripts/division_io.py b/scripts/division_io.py new file mode 100644 index 0000000000..1d9dc862eb --- /dev/null +++ b/scripts/division_io.py @@ -0,0 +1,304 @@ +#!/usr/bin/env python3 +# encoding: utf-8 +""" +division_io.py - Import and export division data from the TWFY database. + +See python scripts/division_io.py --help for usage. + +""" + +import re +from enum import IntEnum +from pathlib import Path +from typing import Dict, cast +from warnings import filterwarnings + +import MySQLdb +import pandas as pd +import rich_click as click +from rich import print +from rich.prompt import Prompt + +# suppress warnings about using mysqldb in pandas +filterwarnings( + "ignore", + category=UserWarning, + message=".*pandas only supports SQLAlchemy connectable.*", +) + + +class TitlePriority(IntEnum): + ORIGINAL_HEADER = 1 + PARLIAMENT_DESCRIBED = 4 + MANUAL = 5 + + +@click.group() +def cli(): + pass + + +def fast_config(config_path: Path) -> Dict[str, str]: + """ + There's a more comprehensive config parser in commonlib/pylib/mysociety/config.py + But this is all we need for this function. + """ + pattern = r"define\s*\(\s*['\"](.*?)['\"]\s*,\s*['\"]?(.*?)['\"]?\s*\);" + + with config_path.open("r") as f: + content = f.read() + + return {key: value for key, value in re.findall(pattern, content)} + + +def get_twfy_db_connection() -> MySQLdb.Connection: + config_path = Path("conf", "general") + config = fast_config(config_path) + + db_connection = cast( + MySQLdb.Connection, + MySQLdb.connect( + host=config["OPTION_TWFY_DB_HOST"], + db=config["OPTION_TWFY_DB_NAME"], + user=config["OPTION_TWFY_DB_USER"], + passwd=config["OPTION_TWFY_DB_PASS"], + charset="utf8", + ), + ) + return db_connection + + +def df_to_db(df: pd.DataFrame, *, new_priority: TitlePriority, verbose: bool = False): + """ + Take a dataframe with: + - a divison_id column or a twfy_votes_url column. + - division_title column + - optionally, yes_text and no_text. + + And load these descriptions into the divisions table. + """ + + if "twfy_votes_url" in df.columns: + df["division_id"] = df["twfy_votes_url"].apply(twfy_votes_url_to_pw_id) + + if "division_id" not in df.columns: + raise ValueError( + "Dataframe to uplaod needs division_id or twfy_votes_url column" + ) + + if "division_title" not in df.columns: + raise ValueError("Dataframe needs a division_title column") + + optional_col_count = 0 + + if "yes_text" in df.columns: + optional_col_count += 1 + + if "no_text" in df.columns: + optional_col_count += 1 + + if optional_col_count == 1: + raise ValueError("Dataframe either needs both yes_text and no_text or neither.") + + db_connection = get_twfy_db_connection() + + # get all divisions with a title_priority below or equal to current priority + existing_df = pd.read_sql( + f"SELECT division_id FROM divisions WHERE title_priority < {new_priority}", + db_connection, + ) + + # limit dataframe to just those that can be updated + + count_before = len(df) + df = df[df["division_id"].isin(existing_df["division_id"])] + count_removed = count_before - len(df) + if verbose: + print( + f"[blue]Updating titles for {len(df)} divisions - ignoring {count_removed} due to priority or absence." + ) + + if optional_col_count == 0: + # update the division_title column in the database + update_command = ( + "UPDATE divisions SET division_title = %s WHERE division_id = %s" + ) + + with db_connection.cursor() as cursor: + update_data = [ + (row["division_title"], row["division_id"]) for _, row in df.iterrows() + ] + cursor.executemany(update_command, update_data) + db_connection.commit() + if verbose: + print(f"[green]{len(df)} rows updated.") + + if optional_col_count == 2: + # update the division_title, yes_text, and no_text columns in the database + update_command = "UPDATE divisions SET division_title = %s, yes_text = %s, no_text = %s WHERE division_id = %s" + + with db_connection.cursor() as cursor: + update_data = [ + ( + row["division_title"], + row["yes_text"], + row["no_text"], + row["division_id"], + ) + for _, row in df.iterrows() + ] + cursor.executemany(update_command, update_data) + db_connection.commit() + if verbose: + print(f"[green]{len(df)} rows updated.") + + db_connection.close() + + +def url_to_db(url: str, *, new_priority: TitlePriority): + """ + Pipe external URL into the update process. + """ + + if url.endswith(".parquet"): + df = pd.read_parquet(url) + elif url.endswith(".csv"): + df = pd.read_csv(url) + elif url.endswith(".json"): + df = pd.read_json(url) + else: + raise ValueError("File not an allowed type (csv, json, or parquet)") + + df_to_db(df, new_priority=new_priority) + + +def run_schema_update(verbose: bool = False): + """ + Check if the title_priority priority column exists in the divisions table + and if not, run db/0023-add-division-title-priority.sql + """ + + db_connection = get_twfy_db_connection() + df = pd.read_sql("SELECT * FROM divisions limit 1", db_connection) + if "title_priority" in df.columns: + if verbose: + print("[blue]Schema already updated[/blue]") + return + + update_command = Path("db", "0023-add-division-title-priority.sql").read_text() + + # Execute the SQL update command + with db_connection.cursor() as cursor: + cursor.execute(update_command) + db_connection.commit() + if verbose: + print("[green]Schema updated successfully[/green]") + db_connection.close() + + +def twfy_votes_url_to_pw_id(url: str) -> str: + """ + Take a twfy_votes style URL (/decisions/division/commons/2024-01-10/37) + and put it in the pw id format + """ + parts = url.split("/") + division_no = parts[-1] + date = parts[-2] + chamber = parts[-3] + return f"pw-{date}-{division_no}-{chamber}" + + +@cli.command() +@click.option("--verbose", is_flag=True, help="Show verbose output") +def export_division_data(verbose: bool = False): + """ + Export division data to publically accessible parquet files + """ + dest_path = Path("data", "pwdata", "votes") + dest_path.mkdir(parents=True, exist_ok=True) + + db_connection = get_twfy_db_connection() + # get divisions + df = pd.read_sql("SELECT * FROM divisions", db_connection) + df.to_parquet(dest_path / "divisions.parquet", index=False) + if verbose: + print(f"[green]Divisions written to {dest_path / 'divisions.parquet'}[/green]") + # get votes + df = pd.read_sql("SELECT * FROM persondivisionvotes", db_connection) + df.to_parquet(dest_path / "votes.parquet", index=False) + if verbose: + print(f"[green]Votes written to {dest_path / 'votes.parquet'}[/green]") + + db_connection.close() + + +@cli.command() +@click.option("--verbose", is_flag=True, help="Show verbose output") +def update_schema(verbose: bool = False): + """ + Run schema update to support division titles if necessary + """ + run_schema_update(verbose=verbose) + + +@cli.command() +def update_division_title(): + """ + Manually update a single division. + """ + item = { + "division_id": Prompt.ask("Division id (pw style)"), + "division_title": Prompt.ask("Division title"), + "yes_text": Prompt.ask( + "Division yes text (optional)", default="", show_default=False + ), + "no_text": Prompt.ask( + "Division no text (optional)", default="", show_default=False + ), + } + + df = pd.DataFrame([item]) + + df_to_db(df, new_priority=TitlePriority.MANUAL) + + +@cli.command() +@click.option( + "--url", + prompt="Link to URL of manual update data", + help="A csv, json or parquet file to update division information from.", +) +@click.option("--verbose", is_flag=True, help="Show verbose output") +def update_from_url(url: str, verbose: bool = False): + """ + Fetch a bulk update file from a remote URL - assumes these are manual updates + and so take priority over other approaches. + """ + url_to_db(url, new_priority=TitlePriority.MANUAL, verbose=verbose) + + +@cli.command() +@click.option("--verbose", is_flag=True, help="Show verbose output") +def update_from_commons_votes(verbose: bool = False): + """ + Import Commons votes description names into local database. + Mid priority - higher than header based approaches, lower than manual approaches. + """ + + df = pd.read_parquet( + "https://pages.mysociety.org/voting-data/data/commons_votes/latest/divisions.parquet" + ) + + df = df[["division_key", "title"]].rename( + columns={"division_key": "division_id", "title": "division_title"} + ) + + df_to_db(df, new_priority=TitlePriority.PARLIAMENT_DESCRIBED, verbose=verbose) + + +def main(): + cli() + + +if __name__ == "__main__": + main()