From 5e165d942e114a4235f0a1dd6a285c263685650c Mon Sep 17 00:00:00 2001 From: HAEKADI Date: Tue, 26 Nov 2024 20:22:31 +0100 Subject: [PATCH] refactor(finess): clean preprocessing method in finess processor --- .../data_pipelines/finess/finess_processor.py | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/workflows/data_pipelines/finess/finess_processor.py b/workflows/data_pipelines/finess/finess_processor.py index 77ffaf7f..40e46852 100644 --- a/workflows/data_pipelines/finess/finess_processor.py +++ b/workflows/data_pipelines/finess/finess_processor.py @@ -1,6 +1,5 @@ import pandas as pd import logging -import requests from dag_datalake_sirene.helpers import Notification, DataProcessor from dag_datalake_sirene.workflows.data_pipelines.finess.config import FINESS_CONFIG from airflow.operators.python import get_current_context @@ -11,21 +10,18 @@ def __init__(self): super().__init__(FINESS_CONFIG) def preprocess_data(self): - r = requests.get(self.config.url) - with open(f"{self.config.tmp_folder}/finess-download.csv", "wb") as f: - for chunk in r.iter_content(1024): - f.write(chunk) + destination_path = f"{self.config.tmp_folder}/finess-download.csv" + self.download_data(destination_path) df_finess = pd.read_csv( - f"{self.config.tmp_folder}/finess-download.csv", + destination_path, dtype=str, sep=";", encoding="Latin-1", skiprows=1, header=None, ) - df_finess = df_finess[[1, 18, 22]] - df_finess = df_finess.rename( + df_finess = df_finess[[1, 18, 22]].rename( columns={1: "finess", 18: "cat_etablissement", 22: "siret"} ) df_finess = df_finess[df_finess["siret"].notna()] @@ -38,15 +34,17 @@ def preprocess_data(self): df_list_finess["liste_finess"] = df_list_finess["liste_finess"].astype(str) df_list_finess.to_csv(f"{self.config.tmp_folder}/finess.csv", index=False) - unique_count = df_list_finess["siret"].nunique() - ti = get_current_context()["ti"] - ti.xcom_push(key="nb_siret_finess", value=str(unique_count)) - - logging.info(f"Processed {unique_count} unique SIRET values.") + self._push_unique_siret_count(df_list_finess) del df_finess del df_list_finess + def _push_unique_siret_count(self, df_finess): + unique_count = df_finess["siret"].nunique() + ti = get_current_context()["ti"] + ti.xcom_push(key="nb_siret_finess", value=str(unique_count)) + logging.info(f"Processed {unique_count} unique SIRET values.") + def send_file_to_minio(self): super().send_file_to_minio() ti = get_current_context()["ti"]