Skip to content

Commit

Permalink
refactor(colter): refactor dag with DataProcessor
Browse files Browse the repository at this point in the history
The original DAG is split into two to avoid changing the DataProcessor paradigm
which should produce only one output file.

The `data_processing_collectivite_territoriale` triggers the
`data_processing_collectivite_territoriale_elus` regardless of the status
of its tasks.
  • Loading branch information
hacherix committed Dec 31, 2024
1 parent 1969e83 commit b297238
Show file tree
Hide file tree
Showing 8 changed files with 447 additions and 489 deletions.
42 changes: 0 additions & 42 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ class DataSourceConfig:
MARCHE_INCLUSION_TMP_FOLDER = f"{AIRFLOW_DAG_TMP}marche_inclusion/"
AGENCE_BIO_TMP_FOLDER = f"{AIRFLOW_DAG_TMP}agence_bio/"
BILANS_FINANCIERS_TMP_FOLDER = f"{AIRFLOW_DAG_TMP}bilans_financiers/"
COLTER_TMP_FOLDER = f"{AIRFLOW_DAG_TMP}colter/"
CC_TMP_FOLDER = f"{AIRFLOW_DAG_TMP}convention_collective/"
MINIO_DATA_SOURCE_UPDATE_DATES_FILE = "data_source_updates.json"

Expand Down Expand Up @@ -171,47 +170,6 @@ class DataSourceConfig:
f"https://object.files.data.gouv.fr/opendata/ae/{AIRFLOW_ENV}/bilans_financiers"
"/latest/metadata.json"
)
RESOURCE_ID_COLTER_REGIONS = "619ee62e-8f9e-4c62-b166-abc6f2b86201"
URL_COLTER_REGIONS = (
f"https://www.data.gouv.fr/fr/datasets/r/{RESOURCE_ID_COLTER_REGIONS}"
)
RESOURCE_ID_COLTER_DEP = "2f4f901d-e3ce-4760-b122-56a311340fc4"
URL_COLTER_DEP = f"https://www.data.gouv.fr/fr/datasets/r/{RESOURCE_ID_COLTER_DEP}"
RESOURCE_ID_COLTER_COMMUNES = "42b16d68-958e-4518-8551-93e095fe8fda"
URL_COLTER_COMMUNES = (
f"https://www.data.gouv.fr/fr/datasets/r/{RESOURCE_ID_COLTER_COMMUNES}"
)
RESOURCE_ID_ELUS_EPCI = "41d95d7d-b172-4636-ac44-32656367cdc7"
URL_ELUS_EPCI = f"https://www.data.gouv.fr/fr/datasets/r/{RESOURCE_ID_ELUS_EPCI}"
RESOURCE_ID_CONSEILLERS_REGIONAUX = "430e13f9-834b-4411-a1a8-da0b4b6e715c"
URL_CONSEILLERS_REGIONAUX = (
f"https://www.data.gouv.fr/fr/datasets/r/{RESOURCE_ID_CONSEILLERS_REGIONAUX}"
)
RESOURCE_ID_CONSEILLERS_DEPARTEMENTAUX = "601ef073-d986-4582-8e1a-ed14dc857fba"
URL_CONSEILLERS_DEPARTEMENTAUX = (
f"https://www.data.gouv.fr/fr/datasets/r/{RESOURCE_ID_CONSEILLERS_DEPARTEMENTAUX}"
)
RESOURCE_ID_CONSEILLERS_MUNICIPAUX = "d5f400de-ae3f-4966-8cb6-a85c70c6c24a"
URL_CONSEILLERS_MUNICIPAUX = (
f"https://www.data.gouv.fr/fr/datasets/r/{RESOURCE_ID_CONSEILLERS_MUNICIPAUX}"
)
RESOURCE_ID_ASSEMBLEE_COL_STATUT_PARTICULIER = "a595be27-cfab-4810-b9d4-22e193bffe35"
URL_ASSEMBLEE_COL_STATUT_PARTICULIER = (
"https://www.data.gouv.fr/fr/datasets/"
f"r/{RESOURCE_ID_ASSEMBLEE_COL_STATUT_PARTICULIER}"
)
URL_MINIO_COLTER = (
f"https://object.files.data.gouv.fr/opendata/ae/{AIRFLOW_ENV}/colter"
"/latest/colter.csv"
)
URL_MINIO_COLTER_METADATA = (
f"https://object.files.data.gouv.fr/opendata/ae/{AIRFLOW_ENV}/colter"
"/latest/metadata.json"
)
URL_MINIO_ELUS = (
f"https://object.files.data.gouv.fr/opendata/ae/{AIRFLOW_ENV}/colter"
"/latest/elus.csv"
)
RESOURCE_ID_CONVENTION_COLLECTIVE = "a22e54f7-b937-4483-9a72-aad2ea1316f1"
URL_CONVENTION_COLLECTIVE = (
f"https://www.data.gouv.fr/fr/datasets/r/{RESOURCE_ID_CONVENTION_COLLECTIVE}"
Expand Down
72 changes: 0 additions & 72 deletions workflows/data_pipelines/colter/DAG.py

This file was deleted.

63 changes: 63 additions & 0 deletions workflows/data_pipelines/colter/colter_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from dag_datalake_sirene.config import (
DataSourceConfig,
MINIO_BASE_URL,
DATA_GOUV_BASE_URL,
)


COLTER_CONFIG = DataSourceConfig(
name="colter",
tmp_folder=f"{DataSourceConfig.base_tmp_folder}/colter",
minio_path="colter",
file_name="colter",
files_to_download={
"colter_regions": {
"url": f"{DATA_GOUV_BASE_URL}619ee62e-8f9e-4c62-b166-abc6f2b86201",
"resource_id": "619ee62e-8f9e-4c62-b166-abc6f2b86201",
},
"colter_deps": {
"url": f"{DATA_GOUV_BASE_URL}2f4f901d-e3ce-4760-b122-56a311340fc4",
"resource_id": "2f4f901d-e3ce-4760-b122-56a311340fc4",
},
"colter_communes": {
"url": f"{DATA_GOUV_BASE_URL}42b16d68-958e-4518-8551-93e095fe8fda",
"resource_id": "42b16d68-958e-4518-8551-93e095fe8fda",
},
},
url_minio=f"{MINIO_BASE_URL}colter/latest/colter.csv",
url_minio_metadata=f"{MINIO_BASE_URL}colter/latest/metadata.json",
file_output=f"{DataSourceConfig.base_tmp_folder}/colter/colter.csv",
)


ELUS_CONFIG = DataSourceConfig(
name="colter_elus",
tmp_folder=f"{DataSourceConfig.base_tmp_folder}/colter_elus",
minio_path="colter_elus",
file_name="elus",
files_to_download={
"assemblee_col_statut_particulier": {
"url": f"{DATA_GOUV_BASE_URL}a595be27-cfab-4810-b9d4-22e193bffe35",
"resource_id": "a595be27-cfab-4810-b9d4-22e193bffe35",
},
"conseillers_regionaux": {
"url": f"{DATA_GOUV_BASE_URL}430e13f9-834b-4411-a1a8-da0b4b6e715c",
"resource_id": "430e13f9-834b-4411-a1a8-da0b4b6e715c",
},
"conseillers_departementaux": {
"url": f"{DATA_GOUV_BASE_URL}601ef073-d986-4582-8e1a-ed14dc857fba",
"resource_id": "601ef073-d986-4582-8e1a-ed14dc857fba",
},
"conseillers_municipaux": {
"url": f"{DATA_GOUV_BASE_URL}d5f400de-ae3f-4966-8cb6-a85c70c6c24a",
"resource_id": "d5f400de-ae3f-4966-8cb6-a85c70c6c24a",
},
"elus_epci": {
"url": f"{DATA_GOUV_BASE_URL}41d95d7d-b172-4636-ac44-32656367cdc7",
"resource_id": "41d95d7d-b172-4636-ac44-32656367cdc7",
},
},
url_minio=f"{MINIO_BASE_URL}colter_elus/latest/elus.csv",
url_minio_metadata=f"{MINIO_BASE_URL}colter_elus/latest/metadata.json",
file_output=f"{DataSourceConfig.base_tmp_folder}/colter_elus/elus.csv",
)
118 changes: 118 additions & 0 deletions workflows/data_pipelines/colter/colter_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from datetime import timedelta

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.datasets import Dataset

from dag_datalake_sirene.config import EMAIL_LIST
from dag_datalake_sirene.helpers import Notification
from dag_datalake_sirene.workflows.data_pipelines.colter.colter_config import (
COLTER_CONFIG,
ELUS_CONFIG,
)
from dag_datalake_sirene.workflows.data_pipelines.colter.colter_processor import (
ColterProcessor,
ElusProcessor,
)

default_args = {
"depends_on_past": False,
"email_on_failure": True,
"email_on_retry": False,
"email": EMAIL_LIST,
"retries": 1,
}

dataset_colter = Dataset(COLTER_CONFIG.name)


@dag(
tags=["collectivités", "communes", "régions", "départements"],
default_args=default_args,
schedule_interval="0 16 * * *",
start_date=days_ago(8),
dagrun_timeout=timedelta(minutes=60),
on_failure_callback=Notification.send_notification_tchap,
on_success_callback=Notification.send_notification_tchap,
params={},
catchup=False,
)
def data_processing_collectivite_territoriale():
colter_processor = ColterProcessor()

@task.bash
def clean_previous_outputs():
return (
f"rm -rf {COLTER_CONFIG.tmp_folder} && mkdir -p {COLTER_CONFIG.tmp_folder}"
)

@task()
def preprocess_data():
return colter_processor.preprocess_data()

@task
def save_date_last_modified():
return colter_processor.save_date_last_modified()

@task
def send_file_to_minio():
return colter_processor.send_file_to_minio()

@task
def compare_files_minio(outlets=[dataset_colter]):
return colter_processor.compare_files_minio()

(
clean_previous_outputs()
>> preprocess_data()
>> save_date_last_modified()
>> send_file_to_minio()
>> compare_files_minio()
)


@dag(
tags=["collectivités", "élus", "conseillers", "epci"],
default_args=default_args,
schedule=[dataset_colter],
start_date=days_ago(8),
dagrun_timeout=timedelta(minutes=60),
on_failure_callback=Notification.send_notification_tchap,
on_success_callback=Notification.send_notification_tchap,
params={},
catchup=False,
)
def data_processing_collectivite_territoriale_elus():
elus_processor = ElusProcessor()

@task.bash
def clean_previous_outputs():
return f"rm -rf {ELUS_CONFIG.tmp_folder} && mkdir -p {ELUS_CONFIG.tmp_folder}"

@task
def preprocess_data():
return elus_processor.preprocess_data()

@task
def save_date_last_modified():
return elus_processor.save_date_last_modified()

@task
def send_file_to_minio():
return elus_processor.send_file_to_minio()

@task
def compare_files_minio():
return elus_processor.compare_files_minio()

(
clean_previous_outputs()
>> preprocess_data()
>> save_date_last_modified()
>> send_file_to_minio()
>> compare_files_minio()
)


data_processing_collectivite_territoriale()
data_processing_collectivite_territoriale_elus()
Loading

0 comments on commit b297238

Please sign in to comment.