Skip to content

Commit

Permalink
refactor(finess): clean preprocessing method in finess processor
Browse files (browse the repository at this point in the history)
  • Loading branch information
HAEKADI committed Nov 27, 2024
1 parent f1c3e02 commit 5e165d9
Showing 1 changed file with 11 additions and 13 deletions.
24 changes: 11 additions & 13 deletions workflows/data_pipelines/finess/finess_processor.py
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,5 @@
import pandas as pd
import logging
import requests
from dag_datalake_sirene.helpers import Notification, DataProcessor
from dag_datalake_sirene.workflows.data_pipelines.finess.config import FINESS_CONFIG
from airflow.operators.python import get_current_context
Expand All @@ -11,21 +10,18 @@ def __init__(self):
super().__init__(FINESS_CONFIG)

def preprocess_data(self):
r = requests.get(self.config.url)
with open(f"{self.config.tmp_folder}/finess-download.csv", "wb") as f:
for chunk in r.iter_content(1024):
f.write(chunk)
destination_path = f"{self.config.tmp_folder}/finess-download.csv"
self.download_data(destination_path)

df_finess = pd.read_csv(
f"{self.config.tmp_folder}/finess-download.csv",
destination_path,
dtype=str,
sep=";",
encoding="Latin-1",
skiprows=1,
header=None,
)
df_finess = df_finess[[1, 18, 22]]
df_finess = df_finess.rename(
df_finess = df_finess[[1, 18, 22]].rename(
columns={1: "finess", 18: "cat_etablissement", 22: "siret"}
)
df_finess = df_finess[df_finess["siret"].notna()]
Expand All @@ -38,15 +34,17 @@ def preprocess_data(self):
df_list_finess["liste_finess"] = df_list_finess["liste_finess"].astype(str)
df_list_finess.to_csv(f"{self.config.tmp_folder}/finess.csv", index=False)

unique_count = df_list_finess["siret"].nunique()
ti = get_current_context()["ti"]
ti.xcom_push(key="nb_siret_finess", value=str(unique_count))

logging.info(f"Processed {unique_count} unique SIRET values.")
self._push_unique_siret_count(df_list_finess)

del df_finess
del df_list_finess

def _push_unique_siret_count(self, df_finess):
    """Publish the count of distinct SIRET values to XCom and log it.

    The count is taken from the ``siret`` column of *df_finess* via
    ``nunique()``, pushed as a string under the XCom key
    ``nb_siret_finess`` on the task instance found in the current
    Airflow context, and then reported in an info-level log message.
    """
    siret_total = df_finess["siret"].nunique()
    # XCom receives the count as a string, not as an integer.
    get_current_context()["ti"].xcom_push(
        key="nb_siret_finess",
        value=str(siret_total),
    )
    logging.info(f"Processed {siret_total} unique SIRET values.")

def send_file_to_minio(self):
super().send_file_to_minio()
ti = get_current_context()["ti"]
Expand Down

0 comments on commit 5e165d9

Please sign in to comment.