Skip to content

Commit

Permalink
ajoute corrections pote 2022
Browse files Browse the repository at this point in the history
  • Loading branch information
clallemand committed Jul 31, 2024
1 parent f1a01d6 commit 570e68e
Show file tree
Hide file tree
Showing 7 changed files with 332 additions and 311 deletions.
80 changes: 40 additions & 40 deletions openfisca_france_data/pote/input_data_builder/console.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,40 @@
from openfisca_survey_manager.survey_collections import SurveyCollection
from openfisca_france_data.pote.input_data_builder.step_00_variables_pote import create_pote_openfisca_variables_list
from openfisca_france_data.pote.input_data_builder.step_01_create_individus import build_individus
from openfisca_france_data.pote.input_data_builder.step_02_a_create_table_presimulation import create_table_foyer_fiscal_preparation
from openfisca_france_data.pote.input_data_builder.step_02_b_simulation_credits_reductions import simulation_preparation_credits_reductions
from openfisca_france_data.pote.input_data_builder.step_02_c_create_table_foyer_fiscal import create_table_foyer_fiscal
from openfisca_france_data.pote.input_data_builder.analyse_variables import liens_variables
import logging
import os
import shutil

# ---------------------------------------------------------------------------
# POTE -> OpenFisca input data builder: driver script.
# Runs every build step in order for a single fiscal year.
# ---------------------------------------------------------------------------

# Run parameters.
year = 2022
chunk_size = 1000000  # batch size handed to build_individus
nrange = 42  # number of batches build_individus loops over
config_files_directory = "C:/Users/Public/Documents/TRAVAIL/Pote_openfisca/.config/openfisca-survey-manager/"

# All working paths are read from the survey-manager configuration of the
# 'raw_pote' collection.
survey = SurveyCollection(
    name = 'raw_pote',
    config_files_directory = config_files_directory,
)
raw_data_directory = survey.config.get('data','raw_pote')
output_path = survey.config.get('data','output_directory')
tmp_directory = survey.config.get('data','tmp_directory')
pote_colonne_file_path = survey.config.get('openfisca_france_data_pote','pote_colonne_file_path')
errors_path = survey.config.get('openfisca_france_data_pote','errors_path')

# Log records go to a file inside the errors directory; errors_path is used as
# a raw prefix, so it has to end with a path separator.
logging.basicConfig(filename=f"{errors_path}builder_log.log", encoding = 'utf-8')

# Resetting the temporary directory is currently disabled:
# if os.path.exists(tmp_directory):
#     shutil.rmtree(tmp_directory)
# os.mkdir(tmp_directory)

# One output sub-directory per entity, created only when missing.
for entity_directory in ("foyer_fiscal", "individu"):
    entity_path = os.path.join(output_path, entity_directory)
    if not os.path.exists(entity_path):
        os.mkdir(entity_path)

# Step 0: POTE columns that map to OpenFisca variables, split by entity.
variables_individu, variables_foyer_fiscal = create_pote_openfisca_variables_list(
    year,
    errors_path,
    pote_colonne_file_path,
)

# Links between variables; enfants_tot and dictionnaire_enfant_parents are
# returned as well but are not used further down in this script.
(
    variables_to_compute,
    enfants_tot,
    dictionnaire_enfant_parents,
    dictionnaire_parent_enfants,
) = liens_variables(year)

# Step 1: individual-level tables, built batch by batch.
build_individus(
    year,
    chunk_size,
    variables_individu,
    config_files_directory,
    raw_data_directory,
    output_path,
    errors_path,
    nrange,
)

# Step 2a: pre-simulation foyer fiscal table.
create_table_foyer_fiscal_preparation(
    raw_data_directory,
    year,
    output_path,
    config_files_directory,
    variables_to_compute,
    dictionnaire_parent_enfants,
    tmp_directory,
)

# Step 2b: simulation of credits and reductions.
simulation_preparation_credits_reductions(
    config_files_directory,
    variables_to_compute,
    dictionnaire_parent_enfants,
    tmp_directory,
)

# Step 2c: final foyer fiscal table.
create_table_foyer_fiscal(
    raw_data_directory,
    variables_foyer_fiscal,
    year,
    output_path,
    config_files_directory,
    variables_to_compute,
    dictionnaire_parent_enfants,
    tmp_directory,
)
from openfisca_survey_manager.survey_collections import SurveyCollection
from openfisca_france_data.pote.input_data_builder.step_00_variables_pote import create_pote_openfisca_variables_list
from openfisca_france_data.pote.input_data_builder.step_01_create_individus import build_individus
from openfisca_france_data.pote.input_data_builder.step_02_a_create_table_presimulation import create_table_foyer_fiscal_preparation
from openfisca_france_data.pote.input_data_builder.step_02_b_simulation_credits_reductions import simulation_preparation_credits_reductions
from openfisca_france_data.pote.input_data_builder.step_02_c_create_table_foyer_fiscal import create_table_foyer_fiscal
from openfisca_france_data.pote.input_data_builder.analyse_variables import liens_variables
import logging
import os
import shutil

# ---------------------------------------------------------------------------
# POTE -> OpenFisca input data builder: driver script.
#
# Reads the working paths from the survey-manager configuration, prepares the
# output directories, then runs the build steps in order:
#   step 0  - list the POTE columns that exist as OpenFisca variables
#   step 1  - build the individual-level tables, batch by batch
#   step 2a - build the pre-simulation foyer fiscal table
#   step 2b - simulate credits and reductions
#   step 2c - build the final foyer fiscal table
# ---------------------------------------------------------------------------

# Run parameters.
year = 2022
chunk_size = 1000000  # number of rows per batch read by build_individus
nrange = 41  # number of batches build_individus iterates over
config_files_directory = "C:/Users/Public/Documents/TRAVAIL/Pote_openfisca/.config/openfisca-survey-manager/"

# Paths come from the configuration attached to the 'raw_pote' collection.
survey = SurveyCollection(name = 'raw_pote', config_files_directory = config_files_directory)
raw_data_directory = survey.config.get('data','raw_pote')
output_path = survey.config.get('data','output_directory')
tmp_directory = survey.config.get('data','tmp_directory')
errors_path = survey.config.get('openfisca_france_data_pote','errors_path')

# errors_path is concatenated as a raw prefix (same convention as the other
# builder modules), so it must end with a path separator.
logging.basicConfig(filename=f"{errors_path}builder_log.log", encoding = 'utf-8')

# NOTE(review): the reset of tmp_directory is disabled - presumably to keep
# intermediate results between runs; confirm before re-enabling.
# if os.path.exists(tmp_directory):
#     shutil.rmtree(tmp_directory)
# os.mkdir(tmp_directory)

# Create one output sub-directory per entity. makedirs(exist_ok=True) avoids
# the check-then-create race of exists() + mkdir() and also works when
# output_path itself has not been created yet.
os.makedirs(os.path.join(output_path, "foyer_fiscal"), exist_ok=True)
os.makedirs(os.path.join(output_path, "individu"), exist_ok=True)

# Step 0: the POTE columns are discovered from the parquet files found in
# raw_data_directory.
variables_individu, variables_foyer_fiscal = create_pote_openfisca_variables_list(year, errors_path, raw_data_directory)

# Links between variables; enfants_tot and dictionnaire_enfant_parents are
# returned too but are not needed by the steps below.
variables_to_compute, enfants_tot, dictionnaire_enfant_parents, dictionnaire_parent_enfants = liens_variables(year)

# Steps 1 to 2c, in order.
build_individus(year, chunk_size, variables_individu, config_files_directory, raw_data_directory, output_path, errors_path, nrange)
create_table_foyer_fiscal_preparation(raw_data_directory, year, output_path, config_files_directory, variables_to_compute, dictionnaire_parent_enfants, tmp_directory)
simulation_preparation_credits_reductions(year, config_files_directory, variables_to_compute, dictionnaire_parent_enfants, tmp_directory)
create_table_foyer_fiscal(raw_data_directory, variables_foyer_fiscal, year, output_path, config_files_directory, variables_to_compute, dictionnaire_parent_enfants, tmp_directory)
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from openfisca_france_data.utils import build_cerfa_fields_by_variable
import pandas as pd
import logging
import glob

def create_pote_openfisca_variables_list(year, errors_path, pote_colonne_file_path):
def create_pote_openfisca_variables_list(year, errors_path, raw_data_directory):
logging.warning("Récupération des colonnes en commun entre Pote et Openfisca")
dict_variables_cerfa_field = build_cerfa_fields_by_variable(year = year)

Expand All @@ -16,12 +17,13 @@ def create_pote_openfisca_variables_list(year, errors_path, pote_colonne_file_pa

del doublons

colonnes_pote_2021 = pd.read_csv(pote_colonne_file_path,sep =",",encoding = "latin_1")
colonnes_pote_2021 = ["f" + str.lower(c[1:]) for c in colonnes_pote_2021.NAME if str.lower(c).startswith('z')]
colonnes_pote = glob.glob(f"{raw_data_directory}*.parquet")
colonnes_pote = [col.split("\\")[-1].split("_")[1].split(".")[0] for col in colonnes_pote]
colonnes_pote = ["f" + str.lower(c[1:]) for c in colonnes_pote if str.lower(c).startswith('z')]

var_to_keep = list(set(colonnes_pote_2021) & set(variables_cerfa_field))
logging.warning(f"Parmi les {len(colonnes_pote_2021)} variables de pote, {len(var_to_keep)} ont été trouvées dans openfisca")
var_not_in_openfisca = [c for c in colonnes_pote_2021 if c not in variables_cerfa_field]
var_to_keep = list(set(colonnes_pote) & set(variables_cerfa_field))
logging.warning(f"Parmi les {len(colonnes_pote)} variables de pote, {len(var_to_keep)} ont été trouvées dans openfisca")
var_not_in_openfisca = [c for c in colonnes_pote if c not in variables_cerfa_field]
pd.DataFrame({'liste_var': var_not_in_openfisca}).to_csv(f"{errors_path}cerfa_manquants_openfisca.csv")

variables_foyer_fiscal = dict()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import pandas as pd
from os.path import exists
import os
from pyarrow.parquet import ParquetFile
import pyarrow as pa
import numpy as np
# from openfisca_survey_manager.input_dataframe_generator import set_table_in_survey
from openfisca_survey_manager.surveys import Survey
import logging
from openfisca_survey_manager.survey_collections import SurveyCollection

def build_individus(year, chunk_size, variables_individu,config_files_directory, raw_data_directory, output_path, errors_path, nrange):
'''
Expand All @@ -26,11 +29,13 @@ def build_individus(year, chunk_size, variables_individu,config_files_directory,
#p = nb petits enfants rattachés alternance
#dnpa4c : années de naissance de tous les pacs (y compris les majeurs par ex)


columns_iter = dict()
for col in columns:
pf = ParquetFile(f"{raw_data_directory}pote_{col}.parquet")
columns_iter[col] = pf.iter_batches(batch_size = chunk_size)

# récupération des colonnes de POTE qui sont dans les cerfa d'openfisca france
columns_revenus_iter = dict()
for openfisca_var, cerfa in variables_individu.items():
for c in cerfa:
Expand All @@ -51,6 +56,8 @@ def build_individus(year, chunk_size, variables_individu,config_files_directory,
"n":list(),
"r":list()}
incoherence_revenus = list()

# script par batch par limite de la capacité sur le CASD
for i in range(nrange):
print(f"Etape 1 : début du round {i} sur {nrange}")
df = pd.DataFrame()
Expand All @@ -60,7 +67,7 @@ def build_individus(year, chunk_size, variables_individu,config_files_directory,
df[col] = df_col

# 1) création des individus déclarants et conjoint si existant
df["foyer_fiscal_id"] = range(len(df))
df["foyer_fiscal_id"] = [j + i * chunk_size for j in range(len(df))]
df_indiv = df[["foyer_fiscal_id","mat","aged",'agec']]
## tests
mat_na = mat_na + list(df_indiv.loc[df_indiv['mat'].isna()].foyer_fiscal_id)
Expand Down Expand Up @@ -119,6 +126,7 @@ def build_individus(year, chunk_size, variables_individu,config_files_directory,

df_indiv = pd.concat([df_indiv,df_pac])

# 3) récupération des variables de revenus individuels pour les individus identifiés dans l'étape précédente
revenus_individu = pd.DataFrame()
for openfisca_var, cerfa in variables_individu.items():
table_temp = pd.DataFrame()
Expand All @@ -131,7 +139,7 @@ def build_individus(year, chunk_size, variables_individu,config_files_directory,
table_temp[str(rang)] = df_col.fillna(0)
value_vars += str(rang)
rang +=1
table_temp["foyer_fiscal_id"] = range(len(table_temp))
table_temp["foyer_fiscal_id"] = [j + i * chunk_size for j in range(len(df))]
table_temp = pd.melt(table_temp, id_vars=["foyer_fiscal_id"], value_vars=value_vars, var_name="rang", value_name=openfisca_var)
if len(revenus_individu) == 0:
revenus_individu = table_temp
Expand Down Expand Up @@ -168,32 +176,29 @@ def build_individus(year, chunk_size, variables_individu,config_files_directory,

revenus_individu.to_parquet(f"{output_path}individu/individu_{i}.parquet")

survey = Survey(
name = f"pote_{year}",
label = None,
survey_collection = survey_collection,
parquet_file_path = output_path
)
survey.tables[f"individu_{year}"] = {
"source_format":"parquet",
"variables":revenus_individu.columns,
"parquet_file":f"{output_path}individu/",
}
survey_collection.surveys = [kept_survey for kept_survey in survey_collection.surveys if kept_survey.name != f"pote-{variable_to_compute}"]
survey_collection.surveys.append(survey)
collections_directory = survey_collection.config.get('collections', 'collections_directory')
collection_json_path = os.path.join(collections_directory, "pote.json")
survey_collection.dump(json_file_path=collection_json_path)

# set_table_in_survey(
# revenus_individu,
# entity = "individu",
# period = year,
# collection = "pote",
# survey_name = f"pote_{year}",
# config_files_directory = config_files_directory,
# source_format = "parquet"
# )
if i == nrange - 1:
columns = revenus_individu.columns

survey = Survey(
name = f"pote_{year}",
label = None,
#survey_collection = survey_collection,
parquet_file_path = output_path
)
survey.tables[f"individu_{year}"] = {
"source_format":"parquet",
"variables":[c for c in columns],
"parquet_file":f"{output_path}individu/",
}

survey_collection = SurveyCollection.load(collection = 'pote', config_files_directory = config_files_directory)
survey_collection.surveys = [kept_survey for kept_survey in survey_collection.surveys if kept_survey.name != f"pote_{year}"]
survey_collection.surveys.append(survey)
collections_directory = survey_collection.config.get('collections', 'collections_directory')
collection_json_path = os.path.join(collections_directory, "pote.json")
print(collection_json_path)
print(survey_collection)
survey_collection.dump(json_file_path=collection_json_path)

# errors_ids = {
# 'mat_na':[mat_na], # un peu crade on met dans une liste pour exporter en json via pandas (donc avec meme nombre d'élements) car le paquet json par sur le casd so far
Expand Down
Loading

0 comments on commit 570e68e

Please sign in to comment.