diff --git a/data_pipelines/rne/database/task_functions.py b/data_pipelines/rne/database/task_functions.py
index 66aff8da..28c84367 100644
--- a/data_pipelines/rne/database/task_functions.py
+++ b/data_pipelines/rne/database/task_functions.py
@@ -1,9 +1,6 @@
 from minio.error import S3Error
-
 from datetime import datetime, timedelta
 import os
-from minio import Minio
-import sqlite3
 import json
 import re
 import logging
@@ -13,40 +10,27 @@
     get_files,
 )
 from dag_datalake_sirene.data_pipelines.rne.database.process_rne import (
-    insert_record,
-    extract_dirigeants_data,
     create_tables,
+    inject_records_into_db,
+)
+from dag_datalake_sirene.data_pipelines.rne.database.db_connexion import (
+    connect_to_db,
 )
-from dag_datalake_sirene.config import (
-    AIRFLOW_DAG_TMP,
+from dag_datalake_sirene.utils.tchap import send_message
+from dag_datalake_sirene.data_pipelines.rne.database.vars import (
     MINIO_URL,
     MINIO_BUCKET,
     MINIO_USER,
     MINIO_PASSWORD,
+    PATH_MINIO_RNE_DATA,
+    LATEST_DATE_FILE,
+    TMP_FOLDER,
+    MINIO_STOCK_DATA_PATH,
+    MINIO_FLUX_DATA_PATH,
 )
-from dag_datalake_sirene.utils.tchap import send_message
-
-PATH_MINIO_RNE_DATA = "rne/database/"
-LATEST_DATE_FILE = "latest_rne_date.json"
-MINIO_FLUX_DATA_PATH = "rne/flux/data/"
-MINIO_STOCK_DATA_PATH = "rne/stock/data/"
-
-DAG_FOLDER = "datagouvfr_data_pipelines/data_processing/"
-TMP_FOLDER = f"{AIRFLOW_DAG_TMP}rne/database/"
-
-client = Minio(
-    MINIO_URL,
-    access_key=MINIO_USER,
-    secret_key=MINIO_PASSWORD,
-    secure=True,
-)
-
-yesterday = (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")
-
-
 
 
-def get_start_date_minio(ti):
+def get_start_date_minio(**kwargs):
     try:
         get_files(
             MINIO_URL=MINIO_URL,
@@ -66,31 +50,36 @@ def get_start_date_minio(ti):
         with open(f"{TMP_FOLDER}/latest_rne_date.json") as fp:
             data = json.load(fp)
-            start_date = data["latest_date"]
-            dt_sd = datetime.strptime(start_date, "%Y-%m-%d")
-            start_date = datetime.strftime((dt_sd + timedelta(days=1)), "%Y-%m-%d")
-            ti.xcom_push(key="start_date", value=start_date)
+            previous_latest_date = data["latest_date"]
+            previous_latest_date = datetime.strptime(previous_latest_date, "%Y-%m-%d")
+            start_date = datetime.strftime(
+                (previous_latest_date + timedelta(days=1)), "%Y-%m-%d"
+            )
+            kwargs["ti"].xcom_push(key="start_date", value=start_date)
     except S3Error as e:
         if e.code == "NoSuchKey":
             logging.info(
                 f"The file {PATH_MINIO_RNE_DATA + LATEST_DATE_FILE} "
                 f"does not exist in the bucket {MINIO_BUCKET}."
             )
-            ti.xcom_push(key="start_date", value=None)
+            kwargs["ti"].xcom_push(key="start_date", value=None)
         else:
             raise Exception(
                 f"An error occurred while trying to get latest date file: {e}"
             )
 
 
-def connect_to_db(database_location):
-    connection = sqlite3.connect(database_location)
-    cursor = connection.cursor()
-    return connection, cursor
+def create_db_path(start_date):
+    """
+    Create a database path for RNE data.
 
+    Args:
+        start_date (str): The start date for the RNE data.
 
-def get_database_location(**kwargs):
-    start_date = kwargs["ti"].xcom_pull(key="start_date", task_ids="get_start_date")
+    Returns:
+        str or None: The database path if it doesn't already exist, otherwise None.
+    """
+    # Only return path if start_date does not already exist
     if start_date:
         return None
     rne_database_location = TMP_FOLDER + f"rne_{start_date}.db"
@@ -98,21 +87,40 @@ def create_db(**kwargs):
-    rne_database_location = get_database_location(**kwargs)
+    """
+    Create an RNE database, if it doesn't already exist.
+
+    Args:
+        **kwargs: Keyword arguments including 'ti' (TaskInstance) for 'start_date'.
+
+    Returns:
+        None: If the database already exists or couldn't be created.
+    """
+    start_date = kwargs["ti"].xcom_pull(key="start_date", task_ids="get_start_date")
 
-    if rne_database_location is None:
+    rne_db_path = create_db_path(start_date)
+    kwargs["ti"].xcom_push(key="rne_db_path", value=rne_db_path)
+    logging.info(f"***********RNE database path: {rne_db_path}")
+
+    if rne_db_path is None:
         return None
 
-    if os.path.exists(rne_database_location):
-        os.remove(rne_database_location)
+    if os.path.exists(rne_db_path):
+        os.remove(rne_db_path)
 
-    connection, cursor = connect_to_db(rne_database_location)
+    connection, cursor = connect_to_db(rne_db_path)
     create_tables(cursor)
+    connection.commit()
     connection.close()
 
 
 def get_latest_db(**kwargs):
+    """
+    This function retrieves the RNE database file associated with the
+    provided start date from a Minio server and saves it to a
+    temporary folder for further processing.
+    """
     start_date = kwargs["ti"].xcom_pull(key="start_date", task_ids="get_start_date")
     if start_date is not None:
         get_files(
@@ -133,8 +141,7 @@ def process_stock_json_files(**kwargs):
     start_date = kwargs["ti"].xcom_pull(key="start_date", task_ids="get_start_date")
-    rne_database_location = get_database_location(**kwargs)
-    logging.info(f"^^^^^^^^^^^ rne database location : {rne_database_location}")
+    rne_db_path = kwargs["ti"].xcom_pull(key="rne_db_path", task_ids="create_db")
 
     # Only process stock files if a date doesn't already exist
     if start_date is not None:
@@ -152,7 +159,7 @@
            raise Exception("No RNE stock files found!!!")
 
        for file_path in json_stock_rne_files:
-            logging.info(f"Processing stock file: {file_path}...")
+            logging.info(f"*******Processing stock file: {file_path}...")
            get_files(
                MINIO_URL=MINIO_URL,
                MINIO_BUCKET=MINIO_BUCKET,
@@ -167,16 +174,17 @@
                    }
                ],
            )
-            inject_stock_records_into_database(file_path, rne_database_location)
+            inject_records_into_db(file_path, rne_db_path, file_type="stock")
            logging.info(
                f"File {file_path} processed and stock records injected into the database."
            )
+            os.remove(file_path)
+            logging.info(f"******Removed file: {file_path}")
 
 
 def process_flux_json_files(**kwargs):
     start_date = kwargs["ti"].xcom_pull(key="start_date", task_ids="get_start_date")
-    rne_database_location = get_database_location(**kwargs)
-    logging.info(f"^^^^^^^^^^^ rne database location : {rne_database_location}")
+    rne_db_path = kwargs["ti"].xcom_pull(key="rne_db_path", task_ids="create_db")
 
     json_daily_flux_files = get_files_from_prefix(
         MINIO_URL=MINIO_URL,
@@ -192,7 +200,7 @@
     if start_date is None:
         start_date = "0000-00-00"
 
-    for file_path in sorted(json_daily_flux_files, reverse=True):
+    for file_path in sorted(json_daily_flux_files, reverse=False):
         date_match = re.search(r"rne_flux_(\d{4}-\d{2}-\d{2})", file_path)
         if date_match:
             file_date = date_match.group(1)
@@ -213,7 +221,7 @@
                ],
            )
            json_path = f"{TMP_FOLDER}rne_flux_{file_date}.json"
-            inject_records_into_database(json_path, rne_database_location)
+            inject_records_into_db(json_path, rne_db_path, file_type="flux")
            logging.info(
                f"File {json_path} processed and"
                " records injected into the database."
@@ -225,46 +233,11 @@ def process_flux_json_files(**kwargs):
        re.findall(r"rne_flux_(\d{4}-\d{2}-\d{2})", " ".join(json_daily_flux_files))
    )
    if dates:
-        last_date = dates[-1]
-        logging.info(f"***** Last date saved: {last_date}")
+        last_date_processed = dates[-1]
+        logging.info(f"***** Last date saved: {last_date_processed}")
    else:
-        last_date = None
-    kwargs["ti"].xcom_push(key="last_date", value=last_date)
-
-
-def inject_stock_records_into_database(file_path, db_path):
-    with open(file_path, "r") as file:
-        logging.info(f"Processing stock file: {file_path}")
-        try:
-            json_data = file.read()
-            data = json.loads(json_data)
-            logging.info(f"/////////{data[0]}")
-            for record in data:
-                list_dirigeants_pp, list_dirigeants_pm = extract_dirigeants_data(
-                    record, file_type="stock"
-                )
-                insert_record(
-                    list_dirigeants_pp, list_dirigeants_pm, file_path, db_path
-                )
-        except json.JSONDecodeError as e:
-            raise Exception(f"JSONDecodeError: {e} in file {file_path}")
-
-
-def inject_records_into_database(file_path, db_path):
-    with open(file_path, "r") as file:
-        logging.info(f"Processing flux file: {file_path}")
-        for line in file:
-            try:
-                data = json.loads(line)
-                for record in data:
-                    list_dirigeants_pp, list_dirigeants_pm = extract_dirigeants_data(
-                        record, file_type="flux"
-                    )
-                    insert_record(
-                        list_dirigeants_pp, list_dirigeants_pm, file_path, db_path
-                    )
-            except json.JSONDecodeError as e:
-                raise Exception(f"JSONDecodeError: {e} in file {file_path}")
+        last_date_processed = None
+    kwargs["ti"].xcom_push(key="last_date_processed", value=last_date_processed)
 
 
 def send_to_minio(list_files):
@@ -278,23 +251,28 @@ def send_to_minio(list_files):
 
 def upload_db_to_minio(ti):
-    start_date = ti.xcom_pull(key="start_date", task_ids="get_start_date")
+    last_date_processed = ti.xcom_pull(
+        key="last_date_processed", task_ids="process_flux_json_files"
+    )
     send_to_minio(
         [
             {
                 "source_path": TMP_FOLDER,
-                "source_name": f"rne_{start_date}.db",
+                "source_name": f"rne_{last_date_processed}.db",
                 "dest_path": PATH_MINIO_RNE_DATA,
-                "dest_name": f"rne_{start_date}.db",
+                "dest_name": f"rne_{last_date_processed}.db",
             }
         ]
     )
 
 
 def upload_latest_date_rne_minio(ti):
-    start_date = ti.xcom_pull(key="start_date", task_ids="get_start_date")
-    start_date = datetime.strptime(start_date, "%Y-%m-%d")
-    latest_date = (start_date + timedelta(days=1)).strftime("%Y-%m-%d")
+    """Start date saved is the next day"""
+    last_date_processed = ti.xcom_pull(
+        key="last_date_processed", task_ids="process_flux_json_files"
+    )
+    last_date_processed = datetime.strptime(last_date_processed, "%Y-%m-%d")
+    latest_date = (last_date_processed + timedelta(days=1)).strftime("%Y-%m-%d")
     data = {}
     data["latest_date"] = latest_date
     with open(TMP_FOLDER + "latest_rne_date.json", "w") as write_file:
@@ -313,10 +291,13 @@ def upload_latest_date_rne_minio(ti):
     ti.xcom_push(key="latest_date", value=latest_date)
 
 
-def notification_mattermost(ti):
+def notification_tchap(ti):
     start_date = ti.xcom_pull(key="start_date", task_ids="get_start_date")
-    end_date = ti.xcom_pull(key="end_date", task_ids="upload_latest_date_rne_minio")
+    last_date_processed = ti.xcom_pull(
+        key="last_date_processed", task_ids="process_flux_json_files"
+    )
     send_message(
-        f"Données RNE traitées de {start_date} à {end_date} sur Minio "
+        f"Données RNE traitées de {start_date} à {last_date_processed} "
+        "et stockées sur la base de données sur Minio "
         f"- Bucket {MINIO_BUCKET}",
     )
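
The xcom_pull calls in this file pin down how the task callables hang together: "get_start_date" publishes start_date, "create_db" publishes rne_db_path for the two processing tasks, and "process_flux_json_files" publishes last_date_processed for the upload and notification steps. For reference, a minimal sketch of the DAG wiring this implies, assuming Airflow 2; only the task ids "get_start_date", "create_db" and "process_flux_json_files" are taken from the diff, while the dag_id, schedule and the remaining task ids are illustrative guesses:

    from datetime import datetime

    from airflow.models import DAG
    from airflow.operators.python import PythonOperator

    from dag_datalake_sirene.data_pipelines.rne.database.task_functions import (
        create_db,
        get_latest_db,
        get_start_date_minio,
        notification_tchap,
        process_flux_json_files,
        process_stock_json_files,
        upload_db_to_minio,
        upload_latest_date_rne_minio,
    )

    with DAG(
        dag_id="get_rne_database",  # hypothetical name
        start_date=datetime(2023, 1, 1),
        schedule_interval="0 4 * * *",  # hypothetical schedule
        catchup=False,
    ) as dag:
        # The task ids referenced by xcom_pull() (get_start_date, create_db,
        # process_flux_json_files) must keep these exact names; the others
        # are only assumed here.
        callables = [
            ("get_start_date", get_start_date_minio),
            ("create_db", create_db),
            ("get_latest_db", get_latest_db),
            ("process_stock_json_files", process_stock_json_files),
            ("process_flux_json_files", process_flux_json_files),
            ("upload_db_to_minio", upload_db_to_minio),
            ("upload_latest_date_rne_minio", upload_latest_date_rne_minio),
            ("notification_tchap", notification_tchap),
        ]
        tasks = [
            PythonOperator(task_id=task_id, python_callable=func)
            for task_id, func in callables
        ]
        # Chain the tasks linearly, in the order listed above.
        for upstream, downstream in zip(tasks, tasks[1:]):
            upstream >> downstream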
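The two loaders deleted at the bottom of the file are replaced by a single inject_records_into_db(file_path, db_path, file_type) imported from process_rne.py, whose implementation is not part of this diff. A plausible sketch of the consolidation, assembled from the two removed bodies and framed as it would sit in process_rne.py next to the existing extract_dirigeants_data and insert_record helpers (the actual code may differ):

    import json
    import logging

    # task_functions.py used to import these two helpers from process_rne;
    # inside process_rne.py itself they are plain module-level functions and
    # this import would not be needed.
    from dag_datalake_sirene.data_pipelines.rne.database.process_rne import (
        extract_dirigeants_data,
        insert_record,
    )


    def inject_records_into_db(file_path, db_path, file_type):
        """Insert dirigeants records from one RNE file into the SQLite database.

        Stock files hold a single JSON array for the whole file; flux files
        hold one JSON array per line.
        """
        with open(file_path, "r") as file:
            logging.info(f"Processing {file_type} file: {file_path}")
            # Normalize both layouts to an iterable of JSON payloads.
            payloads = [file.read()] if file_type == "stock" else file
            for payload in payloads:
                try:
                    data = json.loads(payload)
                except json.JSONDecodeError as e:
                    raise Exception(f"JSONDecodeError: {e} in file {file_path}")
                for record in data:
                    list_dirigeants_pp, list_dirigeants_pm = extract_dirigeants_data(
                        record, file_type=file_type
                    )
                    insert_record(
                        list_dirigeants_pp, list_dirigeants_pm, file_path, db_path
                    )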
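On the date bookkeeping: get_start_date_minio resumes one day after the latest_date stored in latest_rne_date.json, and upload_latest_date_rne_minio writes back last_date_processed plus one day (per its docstring, the stored date is the next start). A small self-contained illustration of the two computations exactly as written in the diff, with made-up dates:

    from datetime import datetime, timedelta

    # get_start_date_minio: resume the day after the stored latest_date.
    stored = {"latest_date": "2023-10-03"}  # hypothetical file content
    previous_latest_date = datetime.strptime(stored["latest_date"], "%Y-%m-%d")
    start_date = datetime.strftime(previous_latest_date + timedelta(days=1), "%Y-%m-%d")
    assert start_date == "2023-10-04"

    # upload_latest_date_rne_minio: store the day after the last processed flux file.
    last_date_processed = "2023-10-07"  # e.g. rne_flux_2023-10-07.json came last
    latest_date = (
        datetime.strptime(last_date_processed, "%Y-%m-%d") + timedelta(days=1)
    ).strftime("%Y-%m-%d")
    assert latest_date == "2023-10-08"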