-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #7 from ONEcampaign/1-establish-a-connection-to-the-data
reading data and indicator functions
- Loading branch information
Showing
3 changed files
with
150 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
pymongo | ||
pandas |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
from functools import partial | ||
|
||
import pandas as pd | ||
|
||
from scripts.read_data import get_indicator, CollectionCursor, COLLECTION_NAME | ||
|
||
# Bind one collection cursor up front so that every getter below only has to
# supply an indicator code.
_get_indicator = partial(
    get_indicator,
    CollectionCursor(COLLECTION_NAME),
)
|
||
|
||
def get_current_health_exp() -> pd.DataFrame:
    """Return the data stored under ``ghed_current_health_expenditure``.

    The lookup is delegated to the module-level ``_get_indicator`` partial.
    """
    indicator_code = "ghed_current_health_expenditure"
    return _get_indicator(indicator_code)
|
||
|
||
def get_health_exp_by_source() -> pd.DataFrame:
    """Return the data stored under ``ghed_current_health_expenditure_by_source``.

    The lookup is delegated to the module-level ``_get_indicator`` partial.
    """
    indicator_code = "ghed_current_health_expenditure_by_source"
    return _get_indicator(indicator_code)
|
||
|
||
def get_health_exp_by_function() -> pd.DataFrame:
    """Return health expenditure broken down by health care function.

    Looks up ``ghed_current_health_expenditure_by_health_care_function`` via
    the module-level ``_get_indicator`` partial.
    """
    indicator_code = "ghed_current_health_expenditure_by_health_care_function"
    return _get_indicator(indicator_code)
|
||
|
||
def get_health_exp_by_disease() -> pd.DataFrame:
    """Return health expenditure broken down by disease and conditions.

    Looks up ``ghed_current_health_expenditure_by_disease_and_conditions`` via
    the module-level ``_get_indicator`` partial.
    """
    indicator_code = "ghed_current_health_expenditure_by_disease_and_conditions"
    return _get_indicator(indicator_code)
|
||
|
||
def get_health_exp_by_financing_scheme() -> pd.DataFrame:
    """Return health expenditure broken down by financing scheme.

    Looks up ``ghed_current_health_expenditure_by_financing_schemes`` via the
    module-level ``_get_indicator`` partial.
    """
    indicator_code = "ghed_current_health_expenditure_by_financing_schemes"
    return _get_indicator(indicator_code)
|
||
|
||
if __name__ == "__main__":
    # Manual run: call every getter once, in declaration order, and keep each
    # result under its own name for inspection.
    exp = get_current_health_exp()
    exp_source = get_health_exp_by_source()
    exp_function = get_health_exp_by_function()
    exp_disease = get_health_exp_by_disease()
    exp_scheme = get_health_exp_by_financing_scheme()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
"""Read data from the Policy Database""" | ||
|
||
import logging | ||
import os | ||
from contextlib import contextmanager | ||
|
||
import pandas as pd | ||
import pymongo | ||
|
||
# Cluster name: first label of the host in the connection URI built by
# get_client().
CLUSTER = "gpdata"

# Name of the database opened by CollectionCursor.connect().
DATABASE = "policy_data"

# Name of the collection exposed as CollectionCursor.metadata.
METADATA = "metadata"

# Data collection name used by the __main__ example at the bottom of this file.
COLLECTION_NAME = "ghed"
|
||
|
||
def check_credentials(username: str | None, password: str | None) -> tuple: | ||
"""check credentials, return from environment if not provided""" | ||
|
||
if username is None: | ||
try: | ||
username = os.environ["MONGO_USERNAME"] | ||
except KeyError: | ||
logging.critical("No username provided") | ||
raise KeyError("No username provided") | ||
|
||
if password is None: | ||
try: | ||
password = os.environ["MONGO_PASSWORD"] | ||
except KeyError: | ||
logging.critical("No password provided") | ||
raise KeyError("No password provided") | ||
|
||
return username, password | ||
|
||
|
||
def get_client(
    username: str | None = None, password: str | None = None
) -> pymongo.MongoClient:
    """Create a MongoDB client for the configured cluster.

    Parameters:
        username: database user; resolved by ``check_credentials`` when None.
        password: database password; resolved by ``check_credentials`` when None.

    Returns:
        A ``pymongo.MongoClient`` built from a ``mongodb+srv`` connection URI.
        The caller is responsible for closing it.

    Raises:
        KeyError: propagated from ``check_credentials`` when a credential is
            neither passed in nor available from the environment.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote_plus

    username, password = check_credentials(username, password)

    # Credentials embedded in a MongoDB URI must be percent-escaped, otherwise
    # characters such as "@", ":", "/" or "%" corrupt the URI.
    # NOTE(review): callers that already pass pre-escaped credentials would now
    # be escaped twice - confirm none do.
    escaped_user = quote_plus(username)
    escaped_password = quote_plus(password)

    return pymongo.MongoClient(
        f"mongodb+srv://{escaped_user}:{escaped_password}@{CLUSTER}."
        "egoty6s.mongodb.net/?retryWrites=true&w=majority"
    )
|
||
|
||
class CollectionCursor: | ||
"""An object to connect to a data collection in the policy_data database | ||
Parameters: | ||
data_collection_name: name of the collection to connect to | ||
""" | ||
|
||
def __init__(self, data_collection_name): | ||
|
||
self.client = None | ||
self.database = None | ||
self.metadata = None | ||
self.data = None | ||
self.data_collection_name = data_collection_name | ||
|
||
def connect(self, username: str = None, password: str = None) -> None: | ||
"""Connect to MongoDB database.""" | ||
|
||
self.client = get_client(username, password) | ||
self.database = self.client[DATABASE] | ||
self.metadata = self.database[METADATA] | ||
|
||
if self.data_collection_name in self.database.list_collection_names(): | ||
self.data = self.database[self.data_collection_name] | ||
logging.info(f"Connected to database.") | ||
else: | ||
logging.critical(f"Collection does not exist: {self.data_collection_name} ") | ||
raise ValueError(f"Collection does not exist: {self.data_collection_name} ") | ||
|
||
def close(self): | ||
"""Close connection to MongoDB database.""" | ||
self.client.close() | ||
logging.info(f"Closed connection to database.") | ||
|
||
@contextmanager | ||
def managed_connection(self, username: str = None, password: str = None): | ||
"""Context manager for MongoDB client.""" | ||
try: | ||
self.connect(username=username, password=password) | ||
yield self | ||
|
||
finally: | ||
self.close() | ||
|
||
|
||
def get_indicator(cursor: CollectionCursor, indicator_code: str) -> pd.DataFrame:
    """Fetch every document matching an indicator code as a DataFrame.

    Parameters:
        cursor: collection cursor whose managed connection is opened for the
            duration of the query.
        indicator_code: value matched against the ``indicator_code`` field.

    Returns:
        A DataFrame built from the matching documents, with the ``_id`` field
        excluded by the projection.
    """
    query = {"indicator_code": indicator_code}
    projection = {"_id": 0}

    with cursor.managed_connection() as connection:
        # Materialise the results while the connection is still open.
        records = list(connection.data.find(query, projection))
        return pd.DataFrame(records)
|
||
|
||
if __name__ == "__main__":
    # Manual example: fetch one indicator from the default collection.
    sample_indicator = "ghed_current_health_expenditure"
    ghed_collection = CollectionCursor(data_collection_name=COLLECTION_NAME)
    data = get_indicator(
        cursor=ghed_collection,
        indicator_code=sample_indicator,
    )