Skip to content

Commit

Permalink
feat: add task to check dirigeants tables count
Browse files Browse the repository at this point in the history
  • Loading branch information
Hajar AIT EL KADI committed Nov 10, 2023
1 parent c376ce4 commit 2cdf04d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 1 deletion.
7 changes: 6 additions & 1 deletion data_pipelines/rne/database/DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from dag_datalake_sirene.data_pipelines.rne.database.task_functions import (
get_start_date_minio,
get_latest_db,
check_db_count,
create_db,
process_flux_json_files,
process_stock_json_files,
Expand Down Expand Up @@ -53,6 +54,9 @@
# Wrap the flux-processing callable in a PythonOperator task.
# NOTE: the name on the left rebinds the imported function of the same name;
# the right-hand side is evaluated first, so the operator still receives the
# original callable.
process_flux_json_files = PythonOperator(
task_id="process_flux_json_files", python_callable=process_flux_json_files
)
# Task running the `check_db_count` callable imported from task_functions.
# Same rebinding pattern: from here on `check_db_count` is the operator.
check_db_count = PythonOperator(
task_id="check_db_count", python_callable=check_db_count
)
# Task running the `upload_db_to_minio` callable
# (presumably imported alongside the others — its import is not visible here).
upload_db_to_minio = PythonOperator(
task_id="upload_db_to_minio", python_callable=upload_db_to_minio
)
Expand All @@ -69,6 +73,7 @@
# Task ordering: every task is declared downstream of exactly one other task,
# which yields a single linear chain.
get_latest_db.set_upstream(create_db)
process_stock_json_files.set_upstream(get_latest_db)
process_flux_json_files.set_upstream(process_stock_json_files)
# The count check sits between processing and upload. The former direct edge
# from process_flux_json_files to upload_db_to_minio is dropped: it was
# redundant once the upload depends on check_db_count.
check_db_count.set_upstream(process_flux_json_files)
upload_db_to_minio.set_upstream(check_db_count)
upload_latest_date_rne_minio.set_upstream(upload_db_to_minio)
notification_tchap.set_upstream(upload_latest_date_rne_minio)
12 changes: 12 additions & 0 deletions data_pipelines/rne/database/process_rne.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,18 @@ def insert_dirigeants_into_db(
connection.close()


def get_tables_count(db_path):
    """Return the row counts of the two dirigeants tables.

    Args:
        db_path: Database location, passed unchanged to ``connect_to_db``.

    Returns:
        A ``(count_pp, count_pm)`` tuple holding the number of rows in
        ``dirigeants_pp`` and ``dirigeants_pm`` respectively.
    """
    connection, cursor = connect_to_db(db_path)
    try:
        cursor.execute("SELECT COUNT(*) FROM dirigeants_pp")
        count_pp = cursor.fetchone()[0]

        cursor.execute("SELECT COUNT(*) FROM dirigeants_pm")
        count_pm = cursor.fetchone()[0]
    finally:
        # Close the connection even when a query raises (e.g. a missing
        # table), so a failed check does not leak the database handle.
        connection.close()
    return count_pp, count_pm


def get_company_data_from_stock(entity):
siren = entity.get("siren")
date_maj = entity.get("updatedAt")
Expand Down
17 changes: 17 additions & 0 deletions data_pipelines/rne/database/task_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
)
from dag_datalake_sirene.data_pipelines.rne.database.process_rne import (
create_tables,
get_tables_count,
inject_records_into_db,
)
from dag_datalake_sirene.data_pipelines.rne.database.db_connexion import (
Expand Down Expand Up @@ -240,6 +241,22 @@ def process_flux_json_files(**kwargs):
kwargs["ti"].xcom_push(key="last_date_processed", value=last_date_processed)


def check_db_count(ti, min_pp_table_count=12000000, min_pm_table_count=1000000):
    """Fail when either dirigeants table holds fewer rows than required.

    Args:
        ti: Task instance; used to pull the ``rne_db_path`` XCom from the
            ``create_db`` task.
        min_pp_table_count: Minimum accepted row count for ``dirigeants_pp``.
        min_pm_table_count: Minimum accepted row count for ``dirigeants_pm``.

    Raises:
        Exception: If the counts cannot be retrieved, or if at least one
            count is below its minimum.
    """
    # Only the retrieval is guarded: the threshold failure raised below must
    # not be caught and re-wrapped by this handler.
    try:
        rne_db_path = ti.xcom_pull(key="rne_db_path", task_ids="create_db")
        count_pp, count_pm = get_tables_count(rne_db_path)
    except Exception as e:
        # Chain the cause so the original traceback stays visible in logs.
        raise Exception(f"An error occurred: {e}") from e

    if count_pp < min_pp_table_count or count_pm < min_pm_table_count:
        # The "An error occurred: " prefix is kept so the message matches what
        # the previous wrapping produced; the separator between the two counts
        # was missing and made the numbers run together.
        raise Exception(
            "An error occurred: Counts below the minimum threshold: "
            f"count pp : {count_pp}, "
            f"count pm : {count_pm}"
        )


def send_to_minio(list_files):
send_files(
MINIO_URL=MINIO_URL,
Expand Down

0 comments on commit 2cdf04d

Please sign in to comment.