From 2cdf04dcdc7f167379566a8ecdb3521f66b929cc Mon Sep 17 00:00:00 2001
From: Hajar AIT EL KADI
Date: Fri, 10 Nov 2023 11:23:48 +0100
Subject: [PATCH] feat: add task to check dirigeants tables count

---
 data_pipelines/rne/database/DAG.py            |  7 ++++++-
 data_pipelines/rne/database/process_rne.py    | 12 ++++++++++++
 data_pipelines/rne/database/task_functions.py | 17 +++++++++++++++++
 3 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/data_pipelines/rne/database/DAG.py b/data_pipelines/rne/database/DAG.py
index f8065082..cb516f34 100644
--- a/data_pipelines/rne/database/DAG.py
+++ b/data_pipelines/rne/database/DAG.py
@@ -8,6 +8,7 @@
 from dag_datalake_sirene.data_pipelines.rne.database.task_functions import (
     get_start_date_minio,
     get_latest_db,
+    check_db_count,
     create_db,
     process_flux_json_files,
     process_stock_json_files,
@@ -53,6 +54,9 @@
     process_flux_json_files = PythonOperator(
         task_id="process_flux_json_files", python_callable=process_flux_json_files
     )
+    check_db_count = PythonOperator(
+        task_id="check_db_count", python_callable=check_db_count
+    )
     upload_db_to_minio = PythonOperator(
         task_id="upload_db_to_minio", python_callable=upload_db_to_minio
     )
@@ -69,6 +73,7 @@
     get_latest_db.set_upstream(create_db)
     process_stock_json_files.set_upstream(get_latest_db)
     process_flux_json_files.set_upstream(process_stock_json_files)
-    upload_db_to_minio.set_upstream(process_flux_json_files)
+    check_db_count.set_upstream(process_flux_json_files)
+    upload_db_to_minio.set_upstream(check_db_count)
     upload_latest_date_rne_minio.set_upstream(upload_db_to_minio)
     notification_tchap.set_upstream(upload_latest_date_rne_minio)
diff --git a/data_pipelines/rne/database/process_rne.py b/data_pipelines/rne/database/process_rne.py
index 774007bd..a24a86ef 100644
--- a/data_pipelines/rne/database/process_rne.py
+++ b/data_pipelines/rne/database/process_rne.py
@@ -200,6 +200,18 @@ def insert_dirigeants_into_db(
     connection.close()
 
 
+def get_tables_count(db_path):
+    connection, cursor = connect_to_db(db_path)
+    cursor.execute("SELECT COUNT(*) FROM dirigeants_pp")
+    count_pp = cursor.fetchone()[0]
+
+    cursor.execute("SELECT COUNT(*) FROM dirigeants_pm")
+    count_pm = cursor.fetchone()[0]
+
+    connection.close()
+    return count_pp, count_pm
+
+
 def get_company_data_from_stock(entity):
     siren = entity.get("siren")
     date_maj = entity.get("updatedAt")
diff --git a/data_pipelines/rne/database/task_functions.py b/data_pipelines/rne/database/task_functions.py
index 44891c68..b467afd8 100644
--- a/data_pipelines/rne/database/task_functions.py
+++ b/data_pipelines/rne/database/task_functions.py
@@ -11,6 +11,7 @@
 )
 from dag_datalake_sirene.data_pipelines.rne.database.process_rne import (
     create_tables,
+    get_tables_count,
     inject_records_into_db,
 )
 from dag_datalake_sirene.data_pipelines.rne.database.db_connexion import (
@@ -240,6 +241,22 @@
     kwargs["ti"].xcom_push(key="last_date_processed", value=last_date_processed)
 
 
+def check_db_count(ti, min_pp_table_count=12000000, min_pm_table_count=1000000):
+    try:
+        rne_db_path = ti.xcom_pull(key="rne_db_path", task_ids="create_db")
+        count_pp, count_pm = get_tables_count(rne_db_path)
+
+        if count_pp < min_pp_table_count or count_pm < min_pm_table_count:
+            raise Exception(
+                f"Counts below the minimum threshold: "
+                f"count pp : {count_pp}, "
+                f"count pm : {count_pm}"
+            )
+
+    except Exception as e:
+        raise Exception(f"An error occurred: {e}")
+
+
 def send_to_minio(list_files):
     send_files(
         MINIO_URL=MINIO_URL,