Skip to content

Commit

Permalink
feat: start execution based on last saved json file in minio
Browse files Browse the repository at this point in the history
  • Loading branch information
Hajar AIT EL KADI committed Nov 16, 2023
1 parent 8d61a1c commit da0e433
Showing 1 changed file with 69 additions and 11 deletions.
80 changes: 69 additions & 11 deletions data_pipelines/rne/flux/flux_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
from dag_datalake_sirene.utils.tchap import send_message
from dag_datalake_sirene.utils.minio_helpers import (
get_files_from_prefix,
get_object_minio,
send_files,
)
from dag_datalake_sirene.data_pipelines.rne.flux.rne_api import ApiRNEClient
from dag_datalake_sirene.config import (
AIRFLOW_DAG_TMP,
AIRFLOW_ENV,
MINIO_URL,
MINIO_BUCKET,
MINIO_USER,
Expand Down Expand Up @@ -49,13 +51,46 @@ def get_last_json_file_date():
return None


def get_latest_json_file(ti):
    """Download the most recent RNE flux JSON file from MinIO.

    The remote object name is derived from the start date returned by
    ``compute_start_date()``. The local download path is pushed to XCom
    under the key ``"last_json_file_path"`` for downstream tasks.

    Args:
        ti: Airflow task instance, used to push the path via XCom.

    Returns:
        str: Local filesystem path of the downloaded JSON file.
    """
    file_date = compute_start_date()
    file_name = f"rne_flux_{file_date}.json"
    local_path = f"{DATADIR}/{file_name}"
    get_object_minio(
        file_name,
        f"ae/{AIRFLOW_ENV}/{MINIO_DATA_PATH}",
        local_path,
        MINIO_BUCKET,
    )
    logging.info(f"***** Got file: {local_path}")
    ti.xcom_push(key="last_json_file_path", value=local_path)
    return local_path


def get_last_siren(ti):
    """Return the siren of the last company saved in the latest flux file.

    Fetches the most recent JSON file from MinIO (one JSON document per
    line), parses the final line, and extracts the ``siren`` field of the
    last company entry in that document.

    Args:
        ti: Airflow task instance, forwarded to ``get_latest_json_file``.

    Returns:
        The last siren found, or None when the file has no lines.
    """
    file_path = get_latest_json_file(ti)
    with open(file_path, "r") as fh:
        lines = fh.read().splitlines()
    if not lines:
        return None
    # Each line is an independent JSON document; the final line holds the
    # most recently written page of results.
    latest_page = json.loads(lines[-1])
    # Indexed like a sequence here — presumably each document is a list of
    # company records; take the last one. (Matches original behavior.)
    last_record = latest_page[-1]
    siren = last_record.get("company", {}).get("siren")
    logging.info(
        f"****Last siren in saved file {file_path}: {siren}"
    )
    return siren


def compute_start_date():
last_json_date = get_last_json_file_date()

if last_json_date:
last_date_obj = datetime.strptime(last_json_date, "%Y-%m-%d")
next_day = last_date_obj + timedelta(days=1)
start_date = next_day.strftime("%Y-%m-%d")
start_date = last_date_obj.strftime("%Y-%m-%d")
logging.info(f"++++++++Start date: {start_date}")
else:
start_date = DEFAULT_START_DATE

Expand All @@ -65,6 +100,8 @@ def compute_start_date():
def get_and_save_daily_flux_rne(
start_date: str,
end_date: str,
first_exec: bool,
ti,
):
"""
Fetches daily flux data from RNE API,
Expand All @@ -84,7 +121,10 @@ def get_and_save_daily_flux_rne(
logging.info(f"********** Creating {DATADIR}")
os.makedirs(DATADIR)

last_siren = None # Initialize last_siren
if first_exec:
last_siren = get_last_siren(ti)
else:
last_siren = None # Initialize last_siren
page_data = True

rne_client = ApiRNEClient()
Expand All @@ -100,10 +140,27 @@ def get_and_save_daily_flux_rne(
json.dump(page_data, json_file)
json_file.write("\n") # Add a newline for multiple JSON objects
except Exception as e:
# If exception accures, save uncompleted file as tmp file
tmp_json_file_name = f"tmp_rne_flux_{start_date}.json"
if os.path.exists(json_file_path):
send_files(
MINIO_URL=MINIO_URL,
MINIO_BUCKET=MINIO_BUCKET,
MINIO_USER=MINIO_USER,
MINIO_PASSWORD=MINIO_PASSWORD,
list_files=[
{
"source_path": f"{DATADIR}/",
"source_name": f"{json_file_name}",
"dest_path": MINIO_DATA_PATH,
"dest_name": f"{tmp_json_file_name}",
},
],
)
# If the API request failed, delete the current
# JSON file and break the loop
logging.info(f"****** Deleting file: {json_file_path}")
os.remove(json_file_path)
# os.remove(json_file_path)
raise Exception(f"Error occurred during the API request: {e}")

if os.path.exists(json_file_path):
Expand All @@ -124,7 +181,7 @@ def get_and_save_daily_flux_rne(
logging.info(f"****** Sent file to MinIO: {json_file_name}")


def get_every_day_flux(**kwargs):
def get_every_day_flux(ti):
"""
Fetches daily flux data from the Registre National des Entreprises (RNE) API
and saves it to JSON files for a range of dates. This function iterates through
Expand All @@ -138,18 +195,19 @@ def get_every_day_flux(**kwargs):

current_date = datetime.strptime(start_date, "%Y-%m-%d")
end_date_dt = datetime.strptime(end_date, "%Y-%m-%d")

first_exec = True
while current_date <= end_date_dt:
start_date_formatted = current_date.strftime("%Y-%m-%d")
next_day = current_date + timedelta(days=1)
next_day_formatted = next_day.strftime("%Y-%m-%d")

get_and_save_daily_flux_rne(start_date_formatted, next_day_formatted)

get_and_save_daily_flux_rne(
start_date_formatted, next_day_formatted, first_exec, ti
)
first_exec = False
current_date = next_day

kwargs["ti"].xcom_push(key="rne_flux_start_date", value=start_date)
kwargs["ti"].xcom_push(key="rne_flux_end_date", value=end_date)
ti.xcom_push(key="rne_flux_start_date", value=start_date)
ti.xcom_push(key="rne_flux_end_date", value=end_date)


def send_notification_success_tchap(**kwargs):
Expand Down

0 comments on commit da0e433

Please sign in to comment.