-
Notifications
You must be signed in to change notification settings - Fork 0
/
main_script.py
56 lines (39 loc) · 1.99 KB
/
main_script.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import os
import pandas as pd
from dotenv import load_dotenv
from modulos.utils import ConexionAPIDescargaJSON, RedshiftManager
load_dotenv() #carga variables ambientales del archivo .env
# VARIABLES
credenciales_redshift = {
'redshift_user': os.getenv('redshift_user'),
'redshift_pass': os.getenv('redshift_pass'),
'redshift_host': os.getenv('redshift_host'),
'redshift_port': os.getenv('redshift_port'),
'redshift_database': os.getenv('redshift_database')
}
schema = 'cjquirozv_coderhouse'
api_key = os.getenv('api_key')
# VARIABLES EDITABLES
latitud = -33.437
longitud = -70.650
nombre_tabla = 'meteorología_santiago_cl'
# Guardo las coordenadas para incluirlas posteriormente en el email_task del dag
with open('/opt/airflow/logs/coordenadas.txt', 'w') as f:
f.write(f'Latitud= {latitud}, Longitud= {longitud}, Nombre tabla: {nombre_tabla}')
url_base = f'https://api.openweathermap.org/data/2.5/forecast?lat={latitud}&lon={longitud}&appid={api_key}'
if __name__=="__main__":
conexion = ConexionAPIDescargaJSON(url_base)
conexion.conectar_API_devolver_json() #archivo sin parsear de la respuesta del servidor que se guarda en el objeto conexion.response_json
conexion.convertir_json_a_dataframe() #retornará self.df que será guardado en el atributo conexion.df
df = conexion.procesar_dataframe()
redshift = RedshiftManager(credenciales_redshift, schema)
redshift.crear_motor_conexion_redshift()
# agrego un condicional en caso de que las coordenadas y ciudad cambien y tenga que generar una nueva tabla. Si no existe se crea
if not redshift.verificar_si_tabla_existe(nombre_tabla):
redshift.crear_nueva_tabla(nombre_tabla)
redshift.actualizar_fechas_horas(df, nombre_tabla)
redshift.cargar_nuevos_datos(df, nombre_tabla)
redshift.cerrar_conexion_redshift()
redshift.actualizar_fechas_horas(df, nombre_tabla)
redshift.cargar_datos_redshift(df, nombre_tabla)
redshift.cerrar_conexion_redshift()