Problems with an @op that processes a pandas DataFrame to insert it into a DB #21185
Closed
AndreaLlerena2003 started this conversation in Community
Replies: 1 comment
Troubleshooting is taking place in your Dagster Community Slack thread.
Hello, I am trying to run my op in a job, but it never finishes running and never raises an error. I already tried the same code in a Python script outside of Dagster and it works normally. Below are my op and the only feedback the platform gives me; it does not even show the output of my print() calls.
---- MY OP
@op(ins={"df_data_app": In(pd.DataFrame),
         "input_v1": In()},
    out=Out(pd.DataFrame))
def funcion_biTaggeosBi(df_data_app, input_v1):
    print("Inicio")
    #context.log.info("Inicio")
    #params_sql = sql_params()
    print("1")
    #context.log.info("1")
    eventos_excluidos = ['mas', 'mas_huellaDigital', 'et_error', 'et_informativo', 'navegar_interactuar', 'navegar_finalizar', 'navegar_regresar', 'inicio_ingresar', 'inicio_recuperarClave', 'inicio_registrarClave', 'inicio_recuperarClave_resultado', 'inicio_registrarClave_resultado', 'inicio_continuar', 'app_exception', 'app_remove', 'notification_dismiss', 'notification_foreground', 'notification_receive', 'screen_view', 'session_start', 'user_engagement', 'privilegio_busqueda']
    print("2")
    #context.log.info("2")
    tmp1 = df_data_app[~df_data_app['event_name'].isin(eventos_excluidos)]
    #context.log.info("3")
    # group by the columns listed below
    lista_group_by = ['event_date', 'event_name', 'user_id', 'event_timestamp', 'event_params']
    #context.log.info("4")
    tmp1 = tmp1.groupby(lista_group_by).size().reset_index(name='count')
    #context.log.info("5")
    tmp2 = tmp1[['event_params', 'event_name']]
    #context.log.info("6")
    tablas_temporales = creacion_temporales_tipo_eventos_clean(tmp2, 'event_name')
    #context.log.info("7")
    columnas_unicas = generar_columnas_unicas(tablas_temporales)
    #context.log.info("8")
    columnas_unicas = volver_set_lista(columnas_unicas)
    #context.log.info("9")
    dataframes_actualizados = generacion_df_con_columnas_faltantes(tablas_temporales, columnas_unicas)
    #context.log.info("10")
    tabla_resultante = generar_tabla_resultante(dataframes_actualizados)
    #context.log.info("11")
    concatenated_df = tabla_final_v1(tmp1, tabla_resultante)
    #context.log.info("12")
    concatenated_df[['Evento', 'Evento_det']] = concatenated_df.apply(casos_event, axis=1, result_type='expand')
    #context.log.info("13")
    # group by the specific columns and compute the number of transactions and members
    result = concatenated_df.groupby(['event_date', 'Evento', 'Evento_det']).agg(trx=('event_timestamp', pd.Series.nunique), Socios=('user_id', pd.Series.nunique)).reset_index()
    # rename the columns
    #context.log.info("14")
    result.rename(columns={'event_date': 'Fecha'}, inplace=True)
    #context.log.info("15")
    df_hist_app_Taggeos_bi = result.copy()
    #context.log.info("conexion")
    params_sql = sql_params()
    engine = create_engine('mssql+pyodbc:///?odbc_connect={}'.format(params_sql), fast_executemany=True)
    try:
        with engine.connect() as conn:
            try:
                df_hist_app_Taggeos_bi.to_sql('mTaggeos_PBI', conn, if_exists='append', index=False, schema='bi')
                return df_hist_app_Taggeos_bi
            except:
                df_hist_app_Taggeos_bi.to_sql('mTaggeos_PBI', conn, if_exists='replace', index=False, schema='bi')
                return df_hist_app_Taggeos_bi
    except Exception as e:
        print("Error")
        print("Excepción:", str(e))
        return e
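The nested try/except at the end of the op has two side effects worth isolating. Any failure of the `append` silently triggers a `replace`, which drops the existing table, and the outer handler returns the exception object from an op whose output is declared as a DataFrame. Below is a minimal sketch of an alternative that lets the failure propagate instead. It uses the standard-library `sqlite3` module as a stand-in for the SQL Server connection. The `taggeos` table, its columns, and `append_rows` are hypothetical names for illustration, not part of the original op.

```python
import sqlite3


def append_rows(conn, rows):
    """Append rows to the (hypothetical) taggeos table and let failures propagate."""
    try:
        # As a context manager, the connection commits on success
        # and rolls back if the block raises.
        with conn:
            conn.executemany(
                "INSERT INTO taggeos (fecha, evento, trx) VALUES (?, ?, ?)",
                rows,
            )
    except sqlite3.Error as exc:
        # Re-raise with context instead of returning the exception object,
        # so the caller sees a failure rather than an unexpected return value.
        raise RuntimeError(f"insert into taggeos failed: {exc}") from exc
    return len(rows)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE taggeos (fecha TEXT, evento TEXT, trx INTEGER NOT NULL)"
)

# A valid batch is appended and committed.
inserted = append_rows(conn, [("2024-04-11", "login", 3)])

# An invalid batch (NULL in a NOT NULL column) raises; existing rows are untouched.
try:
    append_rows(conn, [("2024-04-12", "login", None)])
    failed = False
except RuntimeError:
    failed = True

row_count = conn.execute("SELECT COUNT(*) FROM taggeos").fetchone()[0]
```

Applied to the op, the equivalent change would presumably be to keep only the `append` call and re-raise inside `except Exception`. Whether dropping the `replace` fallback matches the intended behavior is a decision for the author.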
------------- THE FEEDBACK --------------------
2024-04-12 02:31:11,012 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2024-04-12 02:31:11,012 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2024-04-12 02:31:11,116 - alembic.runtime.migration - INFO - Running stamp_revision -> 46b412388816
2024-04-12 02:31:11,126 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2024-04-12 02:31:11,126 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2024-04-12 02:31:11,245 - alembic.runtime.migration - INFO - Running stamp_revision -> 46b412388816
2024-04-12 02:31:11,257 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2024-04-12 02:31:11,257 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2024-04-12 02:31:11,373 - alembic.runtime.migration - INFO - Running stamp_revision -> 46b412388816
2024-04-12 02:31:11,385 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2024-04-12 02:31:11,386 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2024-04-12 02:31:11,498 - alembic.runtime.migration - INFO - Running stamp_revision -> 46b412388816
2024-04-12 02:31:11,515 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2024-04-12 02:31:11,516 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2024-04-12 02:31:11,622 - alembic.runtime.migration - INFO - Running stamp_revision -> 46b412388816
2024-04-12 02:31:11,632 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2024-04-12 02:31:11,632 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2024-04-12 02:31:11,810 - alembic.runtime.migration - INFO - Running stamp_revision -> 46b412388816
2024-04-12 02:31:11,822 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2024-04-12 02:31:11,823 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2024-04-12 02:31:11,926 - alembic.runtime.migration - INFO - Running stamp_revision -> 46b412388816
2024-04-12 02:31:11,935 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2024-04-12 02:31:11,936 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2024-04-12 02:31:12,032 - alembic.runtime.migration - INFO - Running stamp_revision -> 46b412388816
2024-04-12 02:31:12,043 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2024-04-12 02:31:12,043 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2024-04-12 02:31:12,132 - alembic.runtime.migration - INFO - Running stamp_revision -> 46b412388816
2024-04-12 02:31:12,145 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2024-04-12 02:31:12,146 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2024-04-12 02:31:12,238 - alembic.runtime.migration - INFO - Running stamp_revision -> 46b412388816
2024-04-12 02:31:12,259 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2024-04-12 02:31:12,259 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2024-04-12 02:31:12,361 - alembic.runtime.migration - INFO - Running stamp_revision -> 46b412388816
2024-04-12 02:31:12,376 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2024-04-12 02:31:12,376 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
2024-04-12 02:31:12,472 - alembic.runtime.migration - INFO - Running stamp_revision -> 46b412388816
2024-04-12 02:31:12,484 - alembic.runtime.migration - INFO - Context impl SQLiteImpl.
2024-04-12 02:31:12,484 - alembic.runtime.migration - INFO - Will assume non-transactional DDL.
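Since the output above contains nothing from the op itself, one way to narrow down where a run stalls is to wrap each stage in a timed log step, which is what the numbered `print()` and commented-out `context.log.info()` calls were approximating. Here is a minimal sketch using only the standard library. `step`, `ListHandler`, and the logger name are hypothetical helpers. Note that the op as posted does not declare a `context` parameter, so the commented-out `context.log` calls would need `context` added as the first argument. This sketch assumes that `context.log` could then be passed as `log`, since it only needs an `info()` method that accepts a string.

```python
import logging
import time
from contextlib import contextmanager


@contextmanager
def step(log, name):
    """Log when a named step starts and how long it ran, even if it raises."""
    log.info(f"start {name}")
    started = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - started
        log.info(f"end {name} ({elapsed:.3f}s)")


class ListHandler(logging.Handler):
    """Collects formatted messages in memory so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


log = logging.getLogger("taggeos_debug")
log.setLevel(logging.INFO)
log.propagate = False
handler = ListHandler()
log.addHandler(handler)

# Each stage of the op would get its own step; a stage that never logs
# its "end" line is the one that is hanging.
with step(log, "filter"):
    kept = [e for e in ["login", "screen_view"] if e != "screen_view"]
```

If the last message recorded for a run is a `start ...` line with no matching `end ...`, the stall is inside that step.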