From d02f98614c579956097448107c82112388849195 Mon Sep 17 00:00:00 2001
From: Jorge Rivera
Date: Thu, 26 Jan 2023 18:58:16 +0100
Subject: [PATCH] reading data and indicator functions

---
 requirements.txt      |   2 +
 scripts/indicators.py |  45 ++++++++++++
 scripts/read_data.py  | 157 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 204 insertions(+)
 create mode 100644 scripts/indicators.py
 create mode 100644 scripts/read_data.py

diff --git a/requirements.txt b/requirements.txt
index e69de29..c40d0f3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -0,0 +1,2 @@
+pymongo
+pandas
\ No newline at end of file
diff --git a/scripts/indicators.py b/scripts/indicators.py
new file mode 100644
index 0000000..76573a5
--- /dev/null
+++ b/scripts/indicators.py
@@ -0,0 +1,45 @@
+from functools import partial
+
+import pandas as pd
+
+from scripts.read_data import get_indicator, CollectionCursor, COLLECTION_NAME
+
+# Create a partial function that will always use the same collection cursor.
+_get_indicator = partial(get_indicator, CollectionCursor(COLLECTION_NAME))
+
+
+def get_current_health_exp() -> pd.DataFrame:
+    """Get current health expenditure data"""
+    return _get_indicator("ghed_current_health_expenditure")
+
+
+def get_health_exp_by_source() -> pd.DataFrame:
+    """Get health expenditure by source data"""
+    return _get_indicator("ghed_current_health_expenditure_by_source")
+
+
+def get_health_exp_by_function() -> pd.DataFrame:
+    """Get health expenditure by function data"""
+    return _get_indicator("ghed_current_health_expenditure_by_health_care_function")
+
+
+def get_health_exp_by_disease() -> pd.DataFrame:
+    """Get health expenditure by disease data"""
+    return _get_indicator("ghed_current_health_expenditure_by_disease_and_conditions")
+
+
+def get_health_exp_by_financing_scheme() -> pd.DataFrame:
+    """Get health expenditure by financing scheme data"""
+    return _get_indicator("ghed_current_health_expenditure_by_financing_schemes")
+
+
+if __name__ == "__main__":
+    exp = get_current_health_exp()
+
+    exp_source = get_health_exp_by_source()
+
+    exp_function = get_health_exp_by_function()
+
+    exp_disease = get_health_exp_by_disease()
+
+    exp_scheme = get_health_exp_by_financing_scheme()
diff --git a/scripts/read_data.py b/scripts/read_data.py
new file mode 100644
index 0000000..3ca38df
--- /dev/null
+++ b/scripts/read_data.py
@@ -0,0 +1,157 @@
+"""Read data from the Policy Database"""
+
+import logging
+import os
+from contextlib import contextmanager
+from urllib.parse import quote_plus
+
+import pandas as pd
+import pymongo
+
+CLUSTER = "gpdata"
+DATABASE = "policy_data"
+METADATA = "metadata"
+COLLECTION_NAME = "ghed"
+
+
+def check_credentials(username: str | None, password: str | None) -> tuple[str, str]:
+    """Return the credentials, reading any missing one from the environment.
+
+    Parameters:
+        username: MongoDB username, or None to use ``MONGO_USERNAME``.
+        password: MongoDB password, or None to use ``MONGO_PASSWORD``.
+
+    Returns:
+        The ``(username, password)`` pair.
+
+    Raises:
+        KeyError: if a value is None and its environment variable is not set.
+    """
+
+    if username is None:
+        try:
+            username = os.environ["MONGO_USERNAME"]
+        except KeyError:
+            logging.critical("No username provided")
+            # "from None" drops the redundant environment-lookup traceback.
+            raise KeyError("No username provided") from None
+
+    if password is None:
+        try:
+            password = os.environ["MONGO_PASSWORD"]
+        except KeyError:
+            logging.critical("No password provided")
+            raise KeyError("No password provided") from None
+
+    return username, password
+
+
+def get_client(
+    username: str | None = None, password: str | None = None
+) -> pymongo.MongoClient:
+    """Create a MongoDB client for the cluster.
+
+    Parameters:
+        username: MongoDB username, or None to use ``MONGO_USERNAME``.
+        password: MongoDB password, or None to use ``MONGO_PASSWORD``.
+
+    Returns:
+        A new client; the caller is responsible for closing it.
+
+    Raises:
+        KeyError: if a credential is missing and not set in the environment.
+    """
+
+    username, password = check_credentials(username, password)
+    # Credentials must be percent-escaped: a raw "@", ":" or "/" would
+    # otherwise be parsed as part of the connection string structure.
+    return pymongo.MongoClient(
+        f"mongodb+srv://{quote_plus(username)}:{quote_plus(password)}@{CLUSTER}."
+        "egoty6s.mongodb.net/?retryWrites=true&w=majority"
+    )
+
+
+class CollectionCursor:
+    """An object to connect to a data collection in the policy_data database
+
+    Parameters:
+        data_collection_name: name of the collection to connect to
+    """
+
+    def __init__(self, data_collection_name):
+        # Connection handles stay None until connect() is called.
+        self.client = None
+        self.database = None
+        self.metadata = None
+        self.data = None
+        self.data_collection_name = data_collection_name
+
+    def connect(self, username: str | None = None, password: str | None = None) -> None:
+        """Connect to MongoDB database.
+
+        Raises:
+            KeyError: if a credential is missing and not set in the environment.
+            ValueError: if the data collection does not exist in the database.
+        """
+
+        self.client = get_client(username, password)
+        self.database = self.client[DATABASE]
+        self.metadata = self.database[METADATA]
+
+        if self.data_collection_name in self.database.list_collection_names():
+            self.data = self.database[self.data_collection_name]
+            logging.info("Connected to database.")
+        else:
+            logging.critical(f"Collection does not exist: {self.data_collection_name} ")
+            raise ValueError(f"Collection does not exist: {self.data_collection_name} ")
+
+    def close(self) -> None:
+        """Close connection to MongoDB database, if one was opened."""
+        # connect() can fail before a client exists (e.g. missing credentials);
+        # closing must then be a no-op instead of raising AttributeError.
+        if self.client is None:
+            return
+        self.client.close()
+        logging.info("Closed connection to database.")
+
+    @contextmanager
+    def managed_connection(
+        self, username: str | None = None, password: str | None = None
+    ):
+        """Context manager that connects on entry and closes on exit.
+
+        Yields:
+            This object, connected.
+        """
+        try:
+            self.connect(username=username, password=password)
+            yield self
+        finally:
+            # Runs even when connect() fails, so a client opened before a
+            # failed collection check is still closed.
+            self.close()
+
+
+def get_indicator(cursor: CollectionCursor, indicator_code: str) -> pd.DataFrame:
+    """Get data for a given indicator code
+
+    Parameters:
+        cursor: collection to query; a connection is opened for this call only.
+        indicator_code: value of the ``indicator_code`` field to match.
+
+    Returns:
+        The matching documents as a DataFrame, without the ``_id`` field.
+    """
+
+    with cursor.managed_connection() as connection:
+        response = connection.data.find({"indicator_code": indicator_code}, {"_id": 0})
+        # Materialise the results before the connection is closed on exit.
+        return pd.DataFrame(list(response))
+
+
+if __name__ == "__main__":
+    sample_indicator = "ghed_current_health_expenditure"
+
+    ghed_collection = CollectionCursor(data_collection_name=COLLECTION_NAME)
+
+    data = get_indicator(cursor=ghed_collection, indicator_code=sample_indicator)