From 51581978ac41320be09a9a45f29e6dc12d530bf6 Mon Sep 17 00:00:00 2001 From: Hajar AIT EL KADI Date: Thu, 26 Oct 2023 22:38:33 +0200 Subject: [PATCH] feat: add tchap failure and success notifications --- DAG-insert-elk-sirene.py | 11 +++++++++++ task_functions/send_notification.py | 16 ++++++++++++++++ 2 files changed, 27 insertions(+) create mode 100644 task_functions/send_notification.py diff --git a/DAG-insert-elk-sirene.py b/DAG-insert-elk-sirene.py index eea8f95b..0f8634ba 100755 --- a/DAG-insert-elk-sirene.py +++ b/DAG-insert-elk-sirene.py @@ -63,6 +63,10 @@ from dag_datalake_sirene.task_functions.replace_unite_legale_table import ( replace_unite_legale_table, ) +from dag_datalake_sirene.task_functions.send_notification import ( + send_notification_success_tchap, + send_notification_failure_tchap, +) from dag_datalake_sirene.task_functions.update_color_file import update_color_file from dag_datalake_sirene.task_functions.update_sitemap import update_sitemap from dag_datalake_sirene.tests.e2e_tests.run_tests import run_e2e_tests @@ -97,6 +101,7 @@ dagrun_timeout=timedelta(minutes=60 * 15), tags=["siren"], catchup=False, # False to ignore past runs + on_failure_callback=send_notification_failure_tchap, max_active_runs=1, ) as dag: get_colors = PythonOperator( @@ -340,6 +345,11 @@ dag=dag, ) + send_notification_tchap = PythonOperator( + task_id="send_notification_tchap", + python_callable=send_notification_success_tchap, + ) + clean_previous_folder.set_upstream(get_colors) create_sqlite_database.set_upstream(clean_previous_folder) @@ -386,3 +396,4 @@ send_email.set_upstream(flush_cache) send_email.set_upstream(update_sitemap) + send_notification_tchap.set_upstream(send_email) diff --git a/task_functions/send_notification.py b/task_functions/send_notification.py new file mode 100644 index 00000000..2aed01a4 --- /dev/null +++ b/task_functions/send_notification.py @@ -0,0 +1,16 @@ +from dag_datalake_sirene.utils.tchap import send_message + + +def send_notification_success_tchap(**kwargs): + doc_count = kwargs["ti"].xcom_pull( + key="doc_count", task_ids="fill_elastic_siren_index" + ) + send_message( + f"\U0001F7E2 Données :" + f"\nDAG d'indexation a été exécuté avec succès." + f"\n - Nombre de documents indexés : {doc_count}" + ) + + +def send_notification_failure_tchap(context): + send_message("\U0001F534 Données :" "\nFail DAG d'indexation!!!!")