Skip to content

Commit

Permalink
refactor: group db injection functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Hajar AIT EL KADI committed Nov 2, 2023
1 parent 5526c4c commit 38fc9f9
Showing 1 changed file with 81 additions and 100 deletions.
181 changes: 81 additions & 100 deletions data_pipelines/rne/database/task_functions.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
from minio.error import S3Error

from datetime import datetime, timedelta
import os
from minio import Minio
import sqlite3
import json
import re
import logging
Expand All @@ -13,40 +10,27 @@
get_files,
)
from dag_datalake_sirene.data_pipelines.rne.database.process_rne import (
insert_record,
extract_dirigeants_data,
create_tables,
inject_records_into_db,
)
from dag_datalake_sirene.data_pipelines.rne.database.db_connexion import (
connect_to_db,
)
from dag_datalake_sirene.config import (
AIRFLOW_DAG_TMP,
from dag_datalake_sirene.utils.tchap import send_message
from dag_datalake_sirene.data_pipelines.rne.database.vars import (
MINIO_URL,
MINIO_BUCKET,
MINIO_USER,
MINIO_PASSWORD,
PATH_MINIO_RNE_DATA,
LATEST_DATE_FILE,
TMP_FOLDER,
MINIO_STOCK_DATA_PATH,
MINIO_FLUX_DATA_PATH,
)

from dag_datalake_sirene.utils.tchap import send_message

# Minio prefix used as destination for the RNE database upload and as the
# location of the latest-date file.
PATH_MINIO_RNE_DATA = "rne/database/"
# Name of the JSON file holding the "latest_date" entry.
LATEST_DATE_FILE = "latest_rne_date.json"
# presumably the Minio prefixes of the daily flux and stock JSON files —
# TODO confirm against the callers that list those files
MINIO_FLUX_DATA_PATH = "rne/flux/data/"
MINIO_STOCK_DATA_PATH = "rne/stock/data/"

# NOTE(review): not referenced in the visible part of this module — confirm
# it is still needed.
DAG_FOLDER = "datagouvfr_data_pipelines/data_processing/"
# Local working folder that the tasks read files from and write files to.
TMP_FOLDER = f"{AIRFLOW_DAG_TMP}rne/database/"


# Module-level Minio client; it is constructed as soon as the module is
# imported.
client = Minio(
MINIO_URL,
access_key=MINIO_USER,
secret_key=MINIO_PASSWORD,
secure=True,
)

# Yesterday's date formatted as YYYY-MM-DD. Evaluated once, at import time,
# so it is not refreshed while the module stays loaded.
yesterday = (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")


def get_start_date_minio(ti):
def get_start_date_minio(**kwargs):
try:
get_files(
MINIO_URL=MINIO_URL,
Expand All @@ -66,53 +50,77 @@ def get_start_date_minio(ti):
with open(f"{TMP_FOLDER}/latest_rne_date.json") as fp:
data = json.load(fp)

start_date = data["latest_date"]
dt_sd = datetime.strptime(start_date, "%Y-%m-%d")
start_date = datetime.strftime((dt_sd + timedelta(days=1)), "%Y-%m-%d")
ti.xcom_push(key="start_date", value=start_date)
previous_latest_date = data["latest_date"]
previous_latest_date = datetime.strptime(previous_latest_date, "%Y-%m-%d")
start_date = datetime.strftime(
(previous_latest_date + timedelta(days=1)), "%Y-%m-%d"
)
kwargs["ti"].xcom_push(key="start_date", value=start_date)
except S3Error as e:
if e.code == "NoSuchKey":
logging.info(
f"The file {PATH_MINIO_RNE_DATA + LATEST_DATE_FILE} "
f"does not exist in the bucket {MINIO_BUCKET}."
)
ti.xcom_push(key="start_date", value=None)
kwargs["ti"].xcom_push(key="start_date", value=None)
else:
raise Exception(
f"An error occurred while trying to get latest date file: {e}"
)


def connect_to_db(database_location):
    """
    Open a SQLite database and hand back the connection with a cursor on it.

    Args:
        database_location: Location passed as-is to ``sqlite3.connect``.

    Returns:
        tuple: ``(connection, cursor)``; the caller is responsible for
        committing and closing the connection.
    """
    db_connection = sqlite3.connect(database_location)
    return db_connection, db_connection.cursor()
def create_db_path(start_date):
    """
    Create a database path for RNE data.

    Args:
        start_date (str): The start date for the RNE data.

    Returns:
        str or None: None when ``start_date`` is truthy; otherwise the path
        ``TMP_FOLDER`` + ``rne_{start_date}.db``.
    """
    # Only return a path if start_date does not already exist.
    if start_date:
        return None
    # NOTE(review): start_date is falsy here, so the file name embeds that
    # falsy value verbatim (e.g. "rne_None.db") — confirm this is intended.
    return f"{TMP_FOLDER}rne_{start_date}.db"


def create_db(**kwargs):
    """
    Create an RNE database, if it doesn't already exist.

    Args:
        **kwargs: Keyword arguments including 'ti' (TaskInstance), used to
            pull 'start_date' and to push 'rne_db_path'.

    Returns:
        None: Always. Returns early, without creating anything, when
        ``create_db_path`` yields no path.
    """
    ti = kwargs["ti"]
    start_date = ti.xcom_pull(key="start_date", task_ids="get_start_date")

    rne_db_path = create_db_path(start_date)
    # Pushed even when it is None: the stock and flux processing tasks pull
    # "rne_db_path" from this task.
    ti.xcom_push(key="rne_db_path", value=rne_db_path)
    logging.info(f"***********RNE database path: {rne_db_path}")

    if rne_db_path is None:
        return None

    # Drop any leftover file so the tables are created in an empty database.
    if os.path.exists(rne_db_path):
        os.remove(rne_db_path)

    connection, cursor = connect_to_db(rne_db_path)
    try:
        create_tables(cursor)
        connection.commit()
    finally:
        # Release the connection even if table creation fails.
        connection.close()


def get_latest_db(**kwargs):
"""
This function retrieves the RNE database file associated with the
provided start date from a Minio server and saves it to a
temporary folder for further processing.
"""
start_date = kwargs["ti"].xcom_pull(key="start_date", task_ids="get_start_date")
if start_date is not None:
get_files(
Expand All @@ -133,8 +141,7 @@ def get_latest_db(**kwargs):

def process_stock_json_files(**kwargs):
start_date = kwargs["ti"].xcom_pull(key="start_date", task_ids="get_start_date")
rne_database_location = get_database_location(**kwargs)
logging.info(f"^^^^^^^^^^^ rne database location : {rne_database_location}")
rne_db_path = kwargs["ti"].xcom_pull(key="rne_db_path", task_ids="create_db")

# Only process stock files if a date doesn't already exist
if start_date is not None:
Expand All @@ -152,7 +159,7 @@ def process_stock_json_files(**kwargs):
raise Exception("No RNE stock files found!!!")

for file_path in json_stock_rne_files:
logging.info(f"Processing stock file: {file_path}...")
logging.info(f"*******Processing stock file: {file_path}...")
get_files(
MINIO_URL=MINIO_URL,
MINIO_BUCKET=MINIO_BUCKET,
Expand All @@ -167,16 +174,17 @@ def process_stock_json_files(**kwargs):
}
],
)
inject_stock_records_into_database(file_path, rne_database_location)
inject_records_into_db(file_path, rne_db_path, file_type="stock")
logging.info(
f"File {file_path} processed and stock records injected into the database."
)
os.remove(file_path)
logging.info(f"******Removed file: {file_path}")


def process_flux_json_files(**kwargs):
start_date = kwargs["ti"].xcom_pull(key="start_date", task_ids="get_start_date")
rne_database_location = get_database_location(**kwargs)
logging.info(f"^^^^^^^^^^^ rne database location : {rne_database_location}")
rne_db_path = kwargs["ti"].xcom_pull(key="rne_db_path", task_ids="create_db")

json_daily_flux_files = get_files_from_prefix(
MINIO_URL=MINIO_URL,
Expand All @@ -192,7 +200,7 @@ def process_flux_json_files(**kwargs):
if start_date is None:
start_date = "0000-00-00"

for file_path in sorted(json_daily_flux_files, reverse=True):
for file_path in sorted(json_daily_flux_files, reverse=False):
date_match = re.search(r"rne_flux_(\d{4}-\d{2}-\d{2})", file_path)
if date_match:
file_date = date_match.group(1)
Expand All @@ -213,7 +221,7 @@ def process_flux_json_files(**kwargs):
],
)
json_path = f"{TMP_FOLDER}rne_flux_{file_date}.json"
inject_records_into_database(json_path, rne_database_location)
inject_records_into_db(json_path, rne_db_path, file_type="flux")
logging.info(
f"File {json_path} processed and"
" records injected into the database."
Expand All @@ -225,46 +233,11 @@ def process_flux_json_files(**kwargs):
re.findall(r"rne_flux_(\d{4}-\d{2}-\d{2})", " ".join(json_daily_flux_files))
)
if dates:
last_date = dates[-1]
logging.info(f"***** Last date saved: {last_date}")
last_date_processed = dates[-1]
logging.info(f"***** Last date saved: {last_date_processed}")
else:
last_date = None
kwargs["ti"].xcom_push(key="last_date", value=last_date)


def inject_stock_records_into_database(file_path, db_path):
    """
    Load an RNE stock JSON file and insert its records into the database.

    The whole file is parsed as a single JSON document and iterated as a
    collection of records; each record goes through
    ``extract_dirigeants_data`` (with ``file_type="stock"``) and the result
    is handed to ``insert_record``.

    Args:
        file_path: Path of the stock JSON file to read.
        db_path: Database path, forwarded unchanged to ``insert_record``.

    Raises:
        Exception: If the file content is not valid JSON; the original
            ``json.JSONDecodeError`` is chained as the cause.
    """
    with open(file_path, "r") as file:
        logging.info(f"Processing stock file: {file_path}")
        try:
            data = json.load(file)
        except json.JSONDecodeError as e:
            raise Exception(f"JSONDecodeError: {e} in file {file_path}") from e

    # Only log a sample record when there is one: an empty file content such
    # as "[]" must not fail on data[0].
    if data:
        logging.info(f"/////////{data[0]}")

    for record in data:
        list_dirigeants_pp, list_dirigeants_pm = extract_dirigeants_data(
            record, file_type="stock"
        )
        insert_record(list_dirigeants_pp, list_dirigeants_pm, file_path, db_path)

def inject_records_into_database(file_path, db_path):
    """
    Read an RNE flux file line by line and insert its records.

    Each non-blank line is parsed as a JSON document and iterated as a
    collection of records; each record goes through
    ``extract_dirigeants_data`` (with ``file_type="flux"``) and the result
    is handed to ``insert_record``.

    Args:
        file_path: Path of the flux file to read.
        db_path: Database path, forwarded unchanged to ``insert_record``.

    Raises:
        Exception: If a non-blank line is not valid JSON; the original
            ``json.JSONDecodeError`` is chained as the cause.
    """
    with open(file_path, "r") as file:
        logging.info(f"Processing flux file: {file_path}")
        for line in file:
            # Blank lines carry no records; skipping them keeps a stray
            # empty line from aborting the whole file.
            if not line.strip():
                continue
            try:
                data = json.loads(line)
            except json.JSONDecodeError as e:
                raise Exception(
                    f"JSONDecodeError: {e} in file {file_path}"
                ) from e
            for record in data:
                list_dirigeants_pp, list_dirigeants_pm = extract_dirigeants_data(
                    record, file_type="flux"
                )
                insert_record(
                    list_dirigeants_pp, list_dirigeants_pm, file_path, db_path
                )
last_date_processed = None
kwargs["ti"].xcom_push(key="last_date_processed", value=last_date_processed)


def send_to_minio(list_files):
Expand All @@ -278,23 +251,28 @@ def send_to_minio(list_files):


def upload_db_to_minio(ti):
    """
    Upload the RNE database file from the temporary folder to Minio.

    The file name is derived from the 'last_date_processed' value pushed by
    the 'process_flux_json_files' task, and the same name is used on both
    sides of the transfer.

    Args:
        ti: Task instance used to pull 'last_date_processed'.
    """
    last_date_processed = ti.xcom_pull(
        key="last_date_processed", task_ids="process_flux_json_files"
    )
    # NOTE(review): the flux task pushes None when it finds no dates, which
    # would give "rne_None.db" here — confirm that case cannot reach this task.
    db_file_name = f"rne_{last_date_processed}.db"
    send_to_minio(
        [
            {
                "source_path": TMP_FOLDER,
                "source_name": db_file_name,
                "dest_path": PATH_MINIO_RNE_DATA,
                "dest_name": db_file_name,
            }
        ]
    )


def upload_latest_date_rne_minio(ti):
start_date = ti.xcom_pull(key="start_date", task_ids="get_start_date")
start_date = datetime.strptime(start_date, "%Y-%m-%d")
latest_date = (start_date + timedelta(days=1)).strftime("%Y-%m-%d")
"""Start date saved is the next day"""
last_date_processed = ti.xcom_pull(
key="last_date_processed", task_ids="process_flux_json_files"
)
last_date_processed = datetime.strptime(last_date_processed, "%Y-%m-%d")
latest_date = (last_date_processed + timedelta(days=1)).strftime("%Y-%m-%d")
data = {}
data["latest_date"] = latest_date
with open(TMP_FOLDER + "latest_rne_date.json", "w") as write_file:
Expand All @@ -313,10 +291,13 @@ def upload_latest_date_rne_minio(ti):
ti.xcom_push(key="latest_date", value=latest_date)


def notification_tchap(ti):
    """
    Send a message reporting the range of RNE dates that was processed.

    The range goes from the 'start_date' pushed by 'get_start_date' to the
    'last_date_processed' pushed by 'process_flux_json_files'. The message is
    sent through ``send_message``.

    Args:
        ti: Task instance used to pull both XCom values.
    """
    start_date = ti.xcom_pull(key="start_date", task_ids="get_start_date")
    last_date_processed = ti.xcom_pull(
        key="last_date_processed", task_ids="process_flux_json_files"
    )
    send_message(
        f"Données RNE traitées de {start_date} à {last_date_processed} "
        "et stockées sur la base de données sur Minio "
        f"- Bucket {MINIO_BUCKET}",
    )


# Former name of this task callable, kept so existing references still resolve.
notification_mattermost = notification_tchap

0 comments on commit 38fc9f9

Please sign in to comment.