-
Notifications
You must be signed in to change notification settings - Fork 0
/
Estabelecimentos.py
323 lines (257 loc) · 14.5 KB
/
Estabelecimentos.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
import pandas as pd
import wget
from tqdm import tqdm
from zipfile import ZipFile
import os
import psycopg2
from psycopg2 import Error
def main(comp):
competencia = comp #É Necessário Digitar a competência que deseja buscar.
data = []
#Cria a pasta com a competência deseja, se a mesma ainda não existir.
try:
os.mkdir(f"{competencia}")
except OSError:
print(f"O diretório {competencia} já existe")
#Define os caminhos onde o arquivo será buscado, o nome do arquivo específico
#dentro do arquivo .zip com todos os dados que é baixado do cnes e define o
#local onde os dados vão ser salvo (a pasta que foi criada anteriormente).
link_down = f"ftp://ftp.datasus.gov.br/cnes/BASE_DE_DADOS_CNES_{competencia}.ZIP"
arquivo = f"BASE_DE_DADOS_CNES_{competencia}.ZIP"
save_dir = f"{competencia}/"
#Faz o download dos dados, cajo já não tenha sido baixados.
def DownloadData():
if os.listdir().count(save_dir+arquivo) == 0:
try:
print("Baixando os dados da competência solicitada")
tqdm(wget.download(link_down, save_dir))
print("Download efetuado com sucesso")
except:
print("A Competência não foi liberada ainda")
else:
print(f" O arquivo {arquivo} já foi baixado")
DownloadData()
if os.listdir().count(save_dir+arquivo) > 0:
#Extrai somente o arquivo que contem informações sobre os estabelecimentos
with ZipFile(save_dir+arquivo, 'r') as zip:
zip.extract(f"tbEstabelecimento{competencia}.csv", path=save_dir)
zip.extract(f"rlEstabServClass{competencia}.csv", path=save_dir)
#Faz a leitura dos dados para um Data Frame (data) para realizar o tratamento
#dos dados.
def ReadAndSaveData():
global data
global telefone
#Leitura dos dados
data = pd.read_csv(save_dir+f"tbEstabelecimento{competencia}.csv",
delimiter = ";", low_memory=False)
atende_sus = pd.read_csv(save_dir+f"rlEstabServClass{competencia}.csv",
delimiter = ";", low_memory=False)
#Inserindo uma coluna de competência na tabala de dados
competencia_list = pd.DataFrame([competencia for i in data.index], columns = ['COMPETENCIA'])
data = pd.concat([competencia_list, data], axis = 1)
#Eliminio todos os dados que não são do Ceará, ou seja, aqueles em que o
#CO_ESTADO_GESTOR são diferentes de 23. O Código do Ceará é 23.
data = data.loc[(data['CO_ESTADO_GESTOR'] == 23)]
#Separa os dados de telefone para serem analisados separadamente
telefone = data[['CO_UNIDADE','NU_TELEFONE']]
#Convertendo o Data Frame para dados tipo STRING
data = data.astype(str)
atende_sus = atende_sus.astype(str)
#Eliminar as colunas que possuem dados desnecessários
data = data.drop(columns=['CO_REGIAO_SAUDE',
'NU_FAX','TP_PFPJ','NIVEL_DEP',
'CO_MICRO_REGIAO','CO_TURNO_ATENDIMENTO',
'CO_DISTRITO_SANITARIO',
'CO_DISTRITO_ADMINISTRATIVO',
'CO_ATIVIDADE','CO_CLIENTELA',
'NU_ALVARA','DT_EXPEDICAO',
'TP_ORGAO_EXPEDIDOR',
'DT_VAL_LIC_SANI',
'TP_LIC_SANI','NU_TELEFONE',
'CO_USUARIO','CO_CPFDIRETORCLN',
'REG_DIRETORCLN','ST_ADESAO_FILANTROP',
'CO_MOTIVO_DESAB',
'NO_URL',
'TO_CHAR(DT_ATU_GEO,\'DD/MM/YYYY\')',
'NO_USUARIO_GEO','CO_NATUREZA_JUR',
'TP_ESTAB_SEMPRE_ABERTO',
'ST_GERACREDITO_GERENTE_SGIF',
'ST_CONEXAO_INTERNET','CO_ESTADO_GESTOR',
'CO_TIPO_UNIDADE','NO_FANTASIA_ABREV',
'TP_GESTAO',
'TO_CHAR(DT_ATUALIZACAO_ORIGEM,\'DD/MM/YYYY\')',
'ST_CONTRATO_FORMALIZADO'
])
data = data.rename(columns = {"TO_CHAR(DT_ATUALIZACAO,'DD/MM/YYYY')":"DT_ATUALIZACAO"})
atende_sus = atende_sus.drop(columns =['CO_SERVICO',
'CO_CLASSIFICACAO','TP_CARACTERISTICA',
'CO_CNPJCPF','CO_AMBULATORIAL',
'CO_HOSPITALAR','CO_END_COMPL',
'ST_ATIVO_SN','TO_CHAR(DT_ATUALIZACAO,\'DD/MM/YYYY\')',
'CO_USUARIO'
])
atende_sus_list = ['NÃO' if (atende_sus['CO_AMBULATORIAL_SUS'][i] == '2' and atende_sus['CO_HOSPITALAR_SUS'][i] == '2') else 'SIM' for i in range(len(atende_sus['CO_AMBULATORIAL_SUS']))]
atende_sus['ATENDE_SUS'] = atende_sus_list
atende_sus = atende_sus.drop(columns =['CO_AMBULATORIAL_SUS','CO_HOSPITALAR_SUS'])
data = pd.merge(data, atende_sus, how="left", on="CO_UNIDADE")
data = data.drop_duplicates()
def CnesTreatment(lista):
if len(lista) == 6:
lista = "0" + lista
elif len(lista) == 5:
lista = "00" + lista
else:
lista = lista
return lista
data["CO_CNES"] = [CnesTreatment(lis) for lis in data["CO_CNES"]]
def ativTreatment(lista):
if lista == "nan":
lista = 0
else:
lista = str(int(float(lista)))
return lista
data["CO_ATIVIDADE_PRINCIPAL"] = [ativTreatment(lis) for lis in data["CO_ATIVIDADE_PRINCIPAL"]]
ReadAndSaveData()
#------------------------------------------------------------------------------
#_________________ Fazendo a Normalização dos telefones _____________________
def PhoneTreatment():
global telefone
global data
telefone = telefone.dropna()
telefone = telefone.loc[(telefone['NU_TELEFONE'] != '( )')]
telefone = telefone.reset_index(drop = True)
listaInitial = telefone.values.tolist()
lista = [item[1] for item in listaInitial]
def NumericChar(numero):
return ''.join([digit for digit in numero if digit.isnumeric()])
def InitialZero(listaTelefonica):
for i in range(len(listaTelefonica) - 1):
if listaTelefonica[i][0] == '0':
listaTelefonica[i] = listaTelefonica[i][1:]
return(listaTelefonica)
def RemoveDDD(listaTelefonica):
for i in range(len(listaTelefonica) - 1):
if listaTelefonica[i][0] == '8' and len(listaTelefonica) >= 10:
listaTelefonica[i] = listaTelefonica[i][2:]
return(listaTelefonica)
def DigitFilter(listaTelefonica):
result = [number if (len(number) == 8) or (len(number) == 9 and number[0] == '9') else '' for number in listaTelefonica]
return(result)
def SizeFilter(listaTelefonica,val1,val2,val3):
result = [num if (len(num) == val1 or len(num) == val2 or len(num) == val3) else '' for num in listaTelefonica]
return(result)
def Treatment(listaTelefonica):
print("Realizando o processo de Normalização dos dados de telefone")
print("Elimindando dados não-numéricos...")
listaTelefonica = [NumericChar(numero) for numero in listaTelefonica if len(NumericChar(numero)) >= 1]
print("Eliminando o zero inicial")
listaTelefonica = InitialZero(listaTelefonica)
print("Eliminando o DDD")
listaTelefonica = RemoveDDD(listaTelefonica)
#print("Eliminando dados com tamanhos fora do escopo")
#listaTelefonica = SizeFilter(listaTelefonica, 8, 9, 16)
print("Elimina números com nove dígitos que não iniciam com 9")
listaTelefonica = DigitFilter(listaTelefonica)
print("Limpeza completa")
return(listaTelefonica)
lista_tratada = Treatment(lista)
telefone['NU_TELEFONE'] = lista_tratada
#Unindo os dados dos estabelecimentos com os dados de telefone
data = pd.merge(data, telefone, how="left", on="CO_UNIDADE")
data = data.astype(str)
data = data.reset_index(drop = True)
def PhoneTreatment2(lista):
if lista == "nan":
lista = ""
return lista
data["NU_TELEFONE"] = [PhoneTreatment2(lis) for lis in data["NU_TELEFONE"]]
#Salva os dados em um arquivo .csv local
data.to_csv(save_dir+f"Estab-{competencia}.csv", index=False)
PhoneTreatment()
#------------------------------------------------------------------------------
#____________________ Fazendo a unificação dos dados ________________________
#Realizar a conexão com o banco de dados
def Connect():
global data
print("Conectando ao Banco de Dados PostgreSQL...")
try:
#Conexão com o banco de dados do Mapa Produção
conn = psycopg2.connect(
host = '',
database = '',
user = '',
port = '',
password = ''
)
#Realiza as conexões
cur = conn.cursor()
cur.execute("SELECT version();")
rec = cur.fetchone()
print("You are connected to - ", rec, "\n")
#EXECUTA AS QUERRIES AQUI
#Querry para criar a tabela cnesEstabelecimentos se não existir
#e truncar a tabela para inserção dos novos dados
sql = '''CREATE TABLE IF NOT EXISTS cnesEstabelecimentos(
COMPETENCIA TEXT,
CO_UNIDADE TEXT,
CO_CNES TEXT,
NU_CNPJ_MANTENEDORA TEXT,
NO_RAZAO_SOCIAL TEXT,
NO_FANTASIA TEXT,
NO_LOGRADOURO TEXT,
NU_ENDERECO TEXT,
NO_COMPLEMENTO TEXT,
NO_BAIRRO TEXT,
CO_CEP TEXT,
NO_EMAIL TEXT,
NU_CPF TEXT,
NU_CNPJ TEXT,
TP_UNIDADE TEXT,
CO_MUNICIPIO_GESTOR TEXT,
DT_ATUALIZACAO TEXT,
NU_LATITUDE TEXT,
NU_LONGITUDE TEXT,
CO_TIPO_ESTABELECIMENTO TEXT,
CO_ATIVIDADE_PRINCIPAL TEXT,
ATENDE_SUS TEXT,
NU_TELEFONE TEXT);
TRUNCATE TABLE cnesEstabelecimentos;
'''
cur.execute(sql)
conn.commit()
print("Tabela Criada com Sucesso")
for i in tqdm(data.index):
#insere os dados do Data Frame data
sql = ''' INSERT INTO cnesEstabelecimentos
(COMPETENCIA,CO_UNIDADE,CO_CNES,NU_CNPJ_MANTENEDORA,NO_RAZAO_SOCIAL,
NO_FANTASIA,NO_LOGRADOURO,NU_ENDERECO, NO_COMPLEMENTO,
NO_BAIRRO,CO_CEP,NO_EMAIL,NU_CPF,
NU_CNPJ,TP_UNIDADE,CO_MUNICIPIO_GESTOR,DT_ATUALIZACAO,
NU_LATITUDE,NU_LONGITUDE,CO_TIPO_ESTABELECIMENTO,
CO_ATIVIDADE_PRINCIPAL,ATENDE_SUS,NU_TELEFONE)
VALUES ('%s','%s','%s','%s','%s','%s','%s','%s','%s',
'%s','%s','%s','%s','%s','%s','%s',
'%s','%s','%s','%s','%s','%s','%s');''' % (data["COMPETENCIA"][i],data["CO_UNIDADE"][i],
data["CO_CNES"][i],data["NU_CNPJ_MANTENEDORA"][i],data["NO_RAZAO_SOCIAL"][i],
data["NO_FANTASIA"][i],data["NO_LOGRADOURO"][i],data["NU_ENDERECO"][i],
data["NO_COMPLEMENTO"][i],data["NO_BAIRRO"][i],data["CO_CEP"][i],
data["NO_EMAIL"][i],data["NU_CPF"][i],data["NU_CNPJ"][i],
data["TP_UNIDADE"][i],data["CO_MUNICIPIO_GESTOR"][i],
data["DT_ATUALIZACAO"][i],data["NU_LATITUDE"][i],data["NU_LONGITUDE"][i],
data["CO_TIPO_ESTABELECIMENTO"][i],data["CO_ATIVIDADE_PRINCIPAL"][i],
data["ATENDE_SUS"][i],data["NU_TELEFONE"][i])
cur.execute(sql)
conn.commit()
print("Table was succesfullt updated")
#-------------------------------
return conn
except (Exception, Error) as error:
print("Error while connecting to PostgreSQL", error)
finally:
if (conn):
cur.close()
conn.close()
print("PostgreSQL connection is closed")
Connect()
if __name__ == '__main__':
main()