Tasks taking too long time after 2.7.0 Airflow update #33688

Closed
potiuk opened this issue Aug 24, 2023 Discussed in #33664 · 53 comments
Labels
affected_version:2.7 (issues reported for 2.7), area:Scheduler (including HA (high availability) scheduler), kind:bug (this is clearly a bug)
Milestone

Comments

@potiuk
Member

potiuk commented Aug 24, 2023

Discussed in #33664

Originally posted by jaetma August 23, 2023
Hi community!
We have been using Airflow for quite a long time, and right after updating from version 2.6.3 to 2.7.0, task running times increased dramatically. Tasks that used to take 15 seconds to complete are now taking 10 minutes!

This is problematic because tasks are being queued faster than they finish.

We've detected this issue in 3 projects running with Airflow, across 2 instances in Kubernetes and 1 with Docker.
Illustrating image:
image

@potiuk potiuk added this to the Airflow 2.7.1 milestone Aug 24, 2023
@potiuk
Member Author

potiuk commented Aug 24, 2023

From the discussion:

Hi @jaetma,
Could you answer the following questions to help us investigate this further?

  1. Do you have some monitoring in place? Can you check the resource consumption of each of the Airflow components/pods?
  2. Have you added any new DAGs that could be eating up a lot of resources and hence tasks are contending for resources?
  3. What do the task logs say? Are they stuck at some specific step always?
  4. How and where is Airflow deployed?
  5. Which executor? Celery/Kubernetes/?

Answers:

  1. Currently, we only have this DAG running, with a maximum of 4 DAG runs, and those are taking all the machine resources. Tasks are taking over 2 minutes to start, and when running, they are also slow:
  2. No, recently we have not added any DAGs with high resource consumption.
  3. Logs take too long to load in the Airflow UI. Tasks are always stuck before starting, in the queued state.
  4. Airflow is deployed on a server with docker-compose.
  5. We use Celery as the executor.

image

image

Today we downgraded to 2.6.3 again, and the issue seems to be resolved!

image

@potiuk
Member Author

potiuk commented Aug 24, 2023

I converted that discussion to an issue, as I think it is something we should at least try to diagnose before 2.7.1. Maybe @jaetma you could help with it and, assuming we find a root cause, test it with the release candidate of 2.7.1 that should go out soon.

@jaetma - would it be possible to get some logs from the time when it was slow, namely logs from the workers and tasks? Ideally, logs from the same task from before and after the change to 2.7.0, showing what was going on so we can compare. Seeing the same logs from 2.7.0 and 2.6.3 could help us form some hypotheses about what went wrong.

Ideally maybe two gists showing logs from similar run of the same task?

@pankajkoti
Member

pankajkoti commented Aug 24, 2023

Also @jaetma, is it that the tasks remain queued for a long time before they begin execution? Can you check whether the scheduler logs say anything about it as well?

@jaetma

jaetma commented Aug 25, 2023

@potiuk

Thanks for your response, I can test in our dev environment!

After the downgrade the task run time is normal:
image

The following lines are from the Airflow task logs for 2.6.3 and 2.7.0:

AIRFLOW 2.6.3 TASK RUN LOGS

TASK: fetch_header

9b9bc0eb214e
*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195/task_id=fetch_header/attempt=2.log
[2023-08-23, 19:28:45 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.fetch_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:28:45 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.fetch_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:28:45 UTC] {taskinstance.py:1308} INFO - Starting attempt 2 of 2
[2023-08-23, 19:28:45 UTC] {taskinstance.py:1327} INFO - Executing <Task(PythonOperator): fetch_header> on 2023-08-22 01:36:30+00:00
[2023-08-23, 19:28:45 UTC] {standard_task_runner.py:57} INFO - Started process 4198 to run task
[2023-08-23, 19:28:45 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'fetch_header', 'mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195', '--job-id', '1182100', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmpqdgbw1tb']
[2023-08-23, 19:28:45 UTC] {standard_task_runner.py:85} INFO - Job 1182100: Subtask fetch_header
[2023-08-23, 19:28:45 UTC] {task_command.py:410} INFO - Running <TaskInstance: mydagname.fetch_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [running]> on host 9b9bc0eb214e
[2023-08-23, 19:28:46 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='[email protected]' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='fetch_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:30+00:00' AIRFLOW_CTX_TRY_NUMBER='2' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195'
[2023-08-23, 19:28:46 UTC] {logging_mixin.py:150} INFO - { censored }
[2023-08-23, 19:28:46 UTC] {python.py:183} INFO - Done. Returned value was: { censored }
[2023-08-23, 19:28:46 UTC] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=fetch_header, execution_date=20230822T013630, start_date=20230823T192845, end_date=20230823T192846
[2023-08-23, 19:28:46 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-08-23, 19:28:46 UTC] {taskinstance.py:2653} INFO - 1 downstream tasks scheduled from follow-on schedule check


TASK: add_fixed_values_header

9b9bc0eb214e
*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195/task_id=add_fixed_values_header/attempt=2.log
[2023-08-23, 19:28:51 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.add_fixed_values_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:28:51 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.add_fixed_values_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:28:51 UTC] {taskinstance.py:1308} INFO - Starting attempt 2 of 2
[2023-08-23, 19:28:51 UTC] {taskinstance.py:1327} INFO - Executing <Task(PythonOperator): add_fixed_values_header> on 2023-08-22 01:36:30+00:00
[2023-08-23, 19:28:51 UTC] {standard_task_runner.py:57} INFO - Started process 4240 to run task
[2023-08-23, 19:28:51 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'add_fixed_values_header', 'mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195', '--job-id', '1182114', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmpqkq5pev4']
[2023-08-23, 19:28:51 UTC] {standard_task_runner.py:85} INFO - Job 1182114: Subtask add_fixed_values_header
[2023-08-23, 19:28:52 UTC] {task_command.py:410} INFO - Running <TaskInstance: mydagname.add_fixed_values_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [running]> on host 9b9bc0eb214e
[2023-08-23, 19:28:52 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='[email protected]' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='add_fixed_values_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:30+00:00' AIRFLOW_CTX_TRY_NUMBER='2' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195'
[2023-08-23, 19:28:52 UTC] {python.py:183} INFO - Done. Returned value was: { censored }
[2023-08-23, 19:28:52 UTC] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=add_fixed_values_header, execution_date=20230822T013630, start_date=20230823T192851, end_date=20230823T192852
[2023-08-23, 19:28:53 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-08-23, 19:28:53 UTC] {taskinstance.py:2653} INFO - 1 downstream tasks scheduled from follow-on schedule check


TASK: schema_validation_header

9b9bc0eb214e
*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195/task_id=schema_validation_header/attempt=1.log
[2023-08-23, 19:29:01 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.schema_validation_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:29:01 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.schema_validation_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:29:01 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-08-23, 19:29:01 UTC] {taskinstance.py:1327} INFO - Executing <Task(PythonOperator): schema_validation_header> on 2023-08-22 01:36:30+00:00
[2023-08-23, 19:29:01 UTC] {standard_task_runner.py:57} INFO - Started process 4293 to run task
[2023-08-23, 19:29:01 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'schema_validation_header', 'mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195', '--job-id', '1182141', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmpx34mwefz']
[2023-08-23, 19:29:01 UTC] {standard_task_runner.py:85} INFO - Job 1182141: Subtask schema_validation_header
[2023-08-23, 19:29:02 UTC] {task_command.py:410} INFO - Running <TaskInstance: mydagname.schema_validation_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [running]> on host 9b9bc0eb214e
[2023-08-23, 19:29:03 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='[email protected]' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='schema_validation_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:30+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195'
[2023-08-23, 19:29:03 UTC] {python.py:183} INFO - Done. Returned value was: { censored }
[2023-08-23, 19:29:03 UTC] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=schema_validation_header, execution_date=20230822T013630, start_date=20230823T192901, end_date=20230823T192903
[2023-08-23, 19:29:03 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-08-23, 19:29:03 UTC] {taskinstance.py:2653} INFO - 1 downstream tasks scheduled from follow-on schedule check


TASK: generate_sql_header

9b9bc0eb214e
*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195/task_id=generate_sql_header/attempt=1.log
[2023-08-23, 19:29:08 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.generate_sql_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:29:08 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.generate_sql_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:29:08 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-08-23, 19:29:08 UTC] {taskinstance.py:1327} INFO - Executing <Task(PythonOperator): generate_sql_header> on 2023-08-22 01:36:30+00:00
[2023-08-23, 19:29:08 UTC] {standard_task_runner.py:57} INFO - Started process 4345 to run task
[2023-08-23, 19:29:08 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'generate_sql_header', 'mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195', '--job-id', '1182162', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmp03kw7z3y']
[2023-08-23, 19:29:08 UTC] {standard_task_runner.py:85} INFO - Job 1182162: Subtask generate_sql_header
[2023-08-23, 19:29:08 UTC] {task_command.py:410} INFO - Running <TaskInstance: mydagname.generate_sql_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [running]> on host 9b9bc0eb214e
[2023-08-23, 19:29:09 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='[email protected]' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='generate_sql_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:30+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195'
[2023-08-23, 19:29:09 UTC] {python.py:183} INFO - Done. censored
[2023-08-23, 19:29:09 UTC] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=generate_sql_header, execution_date=20230822T013630, start_date=20230823T192908, end_date=20230823T192909
[2023-08-23, 19:29:10 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-08-23, 19:29:10 UTC] {taskinstance.py:2653} INFO - 1 downstream tasks scheduled from follow-on schedule check


TASK: execute_sql

9b9bc0eb214e
*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195/task_id=execute_sql/attempt=1.log
[2023-08-23, 19:29:14 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.execute_sql mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:29:14 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.execute_sql mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [queued]>
[2023-08-23, 19:29:14 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 1
[2023-08-23, 19:29:14 UTC] {taskinstance.py:1327} INFO - Executing <Task(PostgresOperator): execute_sql> on 2023-08-22 01:36:30+00:00
[2023-08-23, 19:29:14 UTC] {standard_task_runner.py:57} INFO - Started process 4380 to run task
[2023-08-23, 19:29:14 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'execute_sql', 'mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195', '--job-id', '1182181', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmp_7byh4dx']
[2023-08-23, 19:29:14 UTC] {standard_task_runner.py:85} INFO - Job 1182181: Subtask execute_sql
[2023-08-23, 19:29:15 UTC] {task_command.py:410} INFO - Running <TaskInstance: mydagname.execute_sql mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [running]> on host 9b9bc0eb214e
[2023-08-23, 19:29:15 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='[email protected]' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='execute_sql' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:30+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195'
[2023-08-23, 19:29:15 UTC] {sql.py:265} INFO - censored
[2023-08-23, 19:29:16 UTC] {base.py:73} INFO - Using connection ID 'conn' for task execution.
[2023-08-23, 19:29:16 UTC] {base.py:73} INFO - Using connection ID 'conn' for task execution.
[2023-08-23, 19:29:16 UTC] {sql.py:374} INFO - censored
[2023-08-23, 19:29:16 UTC] {sql.py:383} INFO - Rows affected: 1
[2023-08-23, 19:29:16 UTC] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=execute_sql, execution_date=20230822T013630, start_date=20230823T192914, end_date=20230823T192916
[2023-08-23, 19:29:16 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-08-23, 19:29:16 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check









AIRFLOW 2.7.0 TASK RUN LOGS

TASK: fetch_header

90c66b7612c1
*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083/task_id=fetch_header/attempt=1.log
[2023-08-23, 18:23:04 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.fetch_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:23:04 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.fetch_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:23:04 UTC] {taskinstance.py:1361} INFO - Starting attempt 1 of 1
[2023-08-23, 18:23:04 UTC] {taskinstance.py:1382} INFO - Executing <Task(PythonOperator): fetch_header> on 2023-08-22 01:36:29+00:00
[2023-08-23, 18:23:04 UTC] {standard_task_runner.py:57} INFO - Started process 32217 to run task
[2023-08-23, 18:23:04 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'fetch_header', 'mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083', '--job-id', '1180119', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmp6qr0sdyy']
[2023-08-23, 18:23:04 UTC] {standard_task_runner.py:85} INFO - Job 1180119: Subtask fetch_header
[2023-08-23, 18:23:35 UTC] {task_command.py:415} INFO - Running <TaskInstance: mydagname.fetch_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [running]> on host 90c66b7612c1
[2023-08-23, 18:24:06 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='[email protected]' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='fetch_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:29+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083'
[2023-08-23, 18:24:06 UTC] {logging_mixin.py:151} INFO - {censored }
[2023-08-23, 18:24:06 UTC] {python.py:194} INFO - Done. Returned value was: { censored }
[2023-08-23, 18:24:06 UTC] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=fetch_header, execution_date=20230822T013629, start_date=20230823T182304, end_date=20230823T182406
[2023-08-23, 18:24:06 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2023-08-23, 18:24:06 UTC] {taskinstance.py:2784} INFO - 1 downstream tasks scheduled from follow-on schedule check


TASK: add_fixed_values_header

90c66b7612c1
*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083/task_id=add_fixed_values_header/attempt=1.log
[2023-08-23, 18:25:22 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.add_fixed_values_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:25:22 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.add_fixed_values_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:25:22 UTC] {taskinstance.py:1361} INFO - Starting attempt 1 of 1
[2023-08-23, 18:25:22 UTC] {taskinstance.py:1382} INFO - Executing <Task(PythonOperator): add_fixed_values_header> on 2023-08-22 01:36:29+00:00
[2023-08-23, 18:25:22 UTC] {standard_task_runner.py:57} INFO - Started process 32288 to run task
[2023-08-23, 18:25:22 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'add_fixed_values_header', 'mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083', '--job-id', '1180123', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmpxx8qvkq_']
[2023-08-23, 18:25:22 UTC] {standard_task_runner.py:85} INFO - Job 1180123: Subtask add_fixed_values_header
[2023-08-23, 18:25:48 UTC] {task_command.py:415} INFO - Running <TaskInstance: mydagname.add_fixed_values_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [running]> on host 90c66b7612c1
[2023-08-23, 18:26:12 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='[email protected]' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='add_fixed_values_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:29+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083'
[2023-08-23, 18:26:12 UTC] {python.py:194} INFO - Done. Returned value was: { censored }
[2023-08-23, 18:26:12 UTC] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=add_fixed_values_header, execution_date=20230822T013629, start_date=20230823T182522, end_date=20230823T182612
[2023-08-23, 18:26:12 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2023-08-23, 18:26:12 UTC] {taskinstance.py:2784} INFO - 1 downstream tasks scheduled from follow-on schedule check


TASK: schema_validation_header

90c66b7612c1
*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083/task_id=schema_validation_header/attempt=1.log
[2023-08-23, 18:27:28 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.schema_validation_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:27:28 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.schema_validation_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:27:28 UTC] {taskinstance.py:1361} INFO - Starting attempt 1 of 1
[2023-08-23, 18:27:28 UTC] {taskinstance.py:1382} INFO - Executing <Task(PythonOperator): schema_validation_header> on 2023-08-22 01:36:29+00:00
[2023-08-23, 18:27:28 UTC] {standard_task_runner.py:57} INFO - Started process 32360 to run task
[2023-08-23, 18:27:28 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'schema_validation_header', 'mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083', '--job-id', '1180127', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmp7fjqwb36']
[2023-08-23, 18:27:28 UTC] {standard_task_runner.py:85} INFO - Job 1180127: Subtask schema_validation_header
[2023-08-23, 18:27:54 UTC] {task_command.py:415} INFO - Running <TaskInstance: mydagname.schema_validation_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [running]> on host 90c66b7612c1
[2023-08-23, 18:28:20 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='[email protected]' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='schema_validation_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:29+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083'
[2023-08-23, 18:28:20 UTC] {python.py:194} INFO - Done. Returned value was: { censored }
[2023-08-23, 18:28:20 UTC] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=schema_validation_header, execution_date=20230822T013629, start_date=20230823T182728, end_date=20230823T182820
[2023-08-23, 18:28:20 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2023-08-23, 18:28:20 UTC] {taskinstance.py:2784} INFO - 1 downstream tasks scheduled from follow-on schedule check


TASK: generate_sql_header

*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083/task_id=generate_sql_header/attempt=1.log
[2023-08-23, 18:29:32 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.generate_sql_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:29:32 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.generate_sql_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:29:32 UTC] {taskinstance.py:1361} INFO - Starting attempt 1 of 1
[2023-08-23, 18:29:32 UTC] {taskinstance.py:1382} INFO - Executing <Task(PythonOperator): generate_sql_header> on 2023-08-22 01:36:29+00:00
[2023-08-23, 18:29:32 UTC] {standard_task_runner.py:57} INFO - Started process 32423 to run task
[2023-08-23, 18:29:32 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'generate_sql_header', 'mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083', '--job-id', '1180131', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmpq9g7hxl_']
[2023-08-23, 18:29:32 UTC] {standard_task_runner.py:85} INFO - Job 1180131: Subtask generate_sql_header
[2023-08-23, 18:30:00 UTC] {task_command.py:415} INFO - Running <TaskInstance: mydagname.generate_sql_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [running]> on host 90c66b7612c1
[2023-08-23, 18:30:41 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='[email protected]' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='generate_sql_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:29+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083'
[2023-08-23, 18:30:41 UTC] {python.py:194} INFO - Done. Returned value was: censored
[2023-08-23, 18:30:41 UTC] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=generate_sql_header, execution_date=20230822T013629, start_date=20230823T182932, end_date=20230823T183041
[2023-08-23, 18:30:42 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2023-08-23, 18:30:42 UTC] {taskinstance.py:2784} INFO - 0 downstream tasks scheduled from follow-on schedule check


TASK: execute_sql

90c66b7612c1
*** Found local files:
***   * /opt/airflow/logs/dag_id=mydagname/run_id=mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083/task_id=execute_sql/attempt=1.log
[2023-08-23, 18:32:47 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: mydagname.execute_sql mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:32:47 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: mydagname.execute_sql mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [queued]>
[2023-08-23, 18:32:47 UTC] {taskinstance.py:1361} INFO - Starting attempt 1 of 1
[2023-08-23, 18:32:47 UTC] {taskinstance.py:1382} INFO - Executing <Task(PostgresOperator): execute_sql> on 2023-08-22 01:36:29+00:00
[2023-08-23, 18:32:47 UTC] {standard_task_runner.py:57} INFO - Started process 32534 to run task
[2023-08-23, 18:32:47 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'mydagname', 'execute_sql', 'mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083', '--job-id', '1180141', '--raw', '--subdir', 'DAGS_FOLDER/example/mydagname.py', '--cfg-path', '/tmp/tmpfmvj5dnz']
[2023-08-23, 18:32:47 UTC] {standard_task_runner.py:85} INFO - Job 1180141: Subtask execute_sql
[2023-08-23, 18:33:22 UTC] {task_command.py:415} INFO - Running <TaskInstance: mydagname.execute_sql mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [running]> on host 90c66b7612c1
[2023-08-23, 18:33:43 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='[email protected]' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='execute_sql' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:29+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083'
[2023-08-23, 18:33:43 UTC] {sql.py:274} INFO - Executing: censored
[2023-08-23, 18:33:43 UTC] {base.py:73} INFO - Using connection ID 'conn' for task execution.
[2023-08-23, 18:33:43 UTC] {base.py:73} INFO - Using connection ID 'conn' for task execution.
[2023-08-23, 18:33:43 UTC] {sql.py:418} INFO - Running statement: censored
[2023-08-23, 18:33:43 UTC] {sql.py:427} INFO - Rows affected: 1
[2023-08-23, 18:33:43 UTC] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=mydagname, task_id=execute_sql, execution_date=20230822T013629, start_date=20230823T183247, end_date=20230823T183343
[2023-08-23, 18:33:43 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2023-08-23, 18:33:44 UTC] {taskinstance.py:2784} INFO - 0 downstream tasks scheduled from follow-on schedule check

@jaetma

jaetma commented Aug 25, 2023

Hi @pankajkoti!

Sadly, I cannot obtain the scheduler logs from the time it was slow :/
However, you're correct: the tasks remained queued for a long time before they began execution. Every task was stuck in a queued state for ~2 minutes before starting its execution. Once running, they took several more minutes to finish.
You can see that in the logs I shared.

This resulted in a DAG run time of 10 minutes!
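
For anyone who wants to put numbers on this from the logs above, here is a minimal sketch that reads the "Marking task as SUCCESS" lines and reports each task's run time plus the wait since the previous task ended. The helper names (`parse`, `summarize`) are hypothetical, and treating the gap between consecutive tasks as queue/scheduling time is an assumption that only holds because these tasks appear to run one after another.

```python
import re
from datetime import datetime

# Pulls task_id, start_date and end_date out of the
# "Marking task as SUCCESS" lines from the 2.7.0 logs above.
SUCCESS = re.compile(
    r"task_id=(\w+), .*start_date=(\d{8}T\d{6}), end_date=(\d{8}T\d{6})"
)

# The five SUCCESS lines from the 2.7.0 run, without the timestamp prefix.
LOGS_270 = [
    "Marking task as SUCCESS. dag_id=mydagname, task_id=fetch_header, execution_date=20230822T013629, start_date=20230823T182304, end_date=20230823T182406",
    "Marking task as SUCCESS. dag_id=mydagname, task_id=add_fixed_values_header, execution_date=20230822T013629, start_date=20230823T182522, end_date=20230823T182612",
    "Marking task as SUCCESS. dag_id=mydagname, task_id=schema_validation_header, execution_date=20230822T013629, start_date=20230823T182728, end_date=20230823T182820",
    "Marking task as SUCCESS. dag_id=mydagname, task_id=generate_sql_header, execution_date=20230822T013629, start_date=20230823T182932, end_date=20230823T183041",
    "Marking task as SUCCESS. dag_id=mydagname, task_id=execute_sql, execution_date=20230822T013629, start_date=20230823T183247, end_date=20230823T183343",
]


def parse(stamp):
    """Convert the compact log timestamp (e.g. 20230823T182304) to a datetime."""
    return datetime.strptime(stamp, "%Y%m%dT%H%M%S")


def summarize(lines):
    """Return (task_id, run_seconds, wait_seconds_since_previous_task) tuples."""
    rows = []
    previous_end = None
    for line in lines:
        match = SUCCESS.search(line)
        if not match:
            continue  # not a SUCCESS line
        task_id = match.group(1)
        start, end = parse(match.group(2)), parse(match.group(3))
        # Assumption: tasks run sequentially, so the gap is queue/scheduling time.
        wait = None if previous_end is None else (start - previous_end).total_seconds()
        rows.append((task_id, (end - start).total_seconds(), wait))
        previous_end = end
    return rows


for task_id, run_s, wait_s in summarize(LOGS_270):
    wait_text = "n/a" if wait_s is None else f"{wait_s:.0f}s"
    print(f"{task_id}: ran {run_s:.0f}s, waited {wait_text}")
```

Summed over the five tasks, the run times and waits cover the span from the first start_date to the last end_date, which is a little over 10 minutes.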

@potiuk
Member Author

potiuk commented Aug 25, 2023

OK. That gives us a clue. It seems there are huge delays (20-30 seconds) between some steps that were almost immediate in 2.6.3:

2.7.0

[2023-08-23, 18:23:04 UTC] {standard_task_runner.py:85} INFO - Job 1180119: Subtask fetch_header
+ 31 s
[2023-08-23, 18:23:35 UTC] {task_command.py:415} INFO - Running <TaskInstance: mydagname.fetch_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [running]> on host 90c66b7612c1
+ 31 s
[2023-08-23, 18:24:06 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='[email protected]' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='fetch_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:29+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083'
[2023-08-23, 18:24:06 UTC] {logging_mixin.py:151} INFO - {censored }

Compare it with 2.6.3

[2023-08-23, 19:28:45 UTC] {standard_task_runner.py:85} INFO - Job 1182100: Subtask fetch_header
[2023-08-23, 19:28:45 UTC] {task_command.py:410} INFO - Running <TaskInstance: mydagname.fetch_header mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195 [running]> on host 9b9bc0eb214e
[2023-08-23, 19:28:46 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_EMAIL='[email protected]' AIRFLOW_CTX_DAG_OWNER='admin' AIRFLOW_CTX_DAG_ID='mydagname' AIRFLOW_CTX_TASK_ID='fetch_header' AIRFLOW_CTX_EXECUTION_DATE='2023-08-22T01:36:30+00:00' AIRFLOW_CTX_TRY_NUMBER='2' AIRFLOW_CTX_DAG_RUN_ID='mydagname__bot_extract_sales__v10678132plzv__2023-08-21T20:36:30.234195'
[2023-08-23, 19:28:46 UTC] {logging_mixin.py:150} INFO - { censored }

So at least we might come up with some hypotheses. I will take a closer look, but maybe others might have some ideas.
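A minimal sketch of how such gaps could be spotted automatically, assuming the task log lines keep the `[YYYY-MM-DD, HH:MM:SS UTC] {file.py:line}` prefix seen in the excerpts above. The `find_gaps` helper and its `threshold_seconds` parameter are hypothetical names for illustration, not part of Airflow:

```python
import re
from datetime import datetime

# Matches the prefix of the task log lines quoted above, e.g.
# "[2023-08-23, 18:23:04 UTC] {standard_task_runner.py:85} INFO - ..."
LOG_PREFIX = re.compile(
    r"^\[(\d{4}-\d{2}-\d{2}), (\d{2}:\d{2}:\d{2}) UTC\] \{([^}]+)\}"
)


def find_gaps(lines, threshold_seconds=5):
    """Return (gap_seconds, previous_source, current_source) for slow steps."""
    gaps = []
    previous = None
    for line in lines:
        match = LOG_PREFIX.match(line)
        if not match:
            continue  # skip anything that is not a timestamped log line
        stamp = datetime.strptime(
            f"{match.group(1)} {match.group(2)}", "%Y-%m-%d %H:%M:%S"
        )
        source = match.group(3)
        if previous is not None:
            delta = (stamp - previous[0]).total_seconds()
            if delta >= threshold_seconds:
                gaps.append((delta, previous[1], source))
        previous = (stamp, source)
    return gaps
```

Fed the 2.7.0 excerpt, this should flag the two roughly 30-second pauses and nothing in the 2.6.3 excerpt, which makes it easy to compare many task logs at once.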

@pankajkoti
Member

I think we might need some more insight into the specific DAG (what it does) or its dependencies.
In general, in our deployments, we run a suite that triggers a few DAGs using TriggerDagRunOperator, each with numerous tasks including deferrable operators (execution duration ~40-45 mins), and I have not observed any such slowdown after upgrading to 2.7.0.

Screenshot 2023-08-25 at 2 46 31 PM

@sunank200
Collaborator

@jaetma could you share the DAG you are using? That way I can try to replicate it locally. As @pankajkoti mentioned, I cannot see any slowness in our deployments.

@potiuk
Member Author

potiuk commented Aug 25, 2023

Yes. I do not suspect it's a "widespread" problem. IMHO something environmental on your side triggers it, @jaetma: some dependency that introduces a lot of overhead, the specific database connectivity you have, or some resource limits in your environment.

A few questions: can you share a very detailed description of your environment? That includes the OS versions you run on, whether the machines run in a cloud or on bare metal, whether you use any virtualisation techniques, how your Docker Compose is configured, whether any special resource limits are applied (memory, sockets, I/O, CPU), which database you are using and whether it has any limits, and whether you use PgBouncer if you are on Postgres.

My question is also: is the Airflow version the ONLY thing that changes when you migrate? Do you also migrate other things together with Airflow (Docker, Docker Compose, a different DB instance, a different account with different resource limits, etc.)? I want to rule out that Airflow is the culprit: maybe you migrated other things together with Airflow, and they are the guilty ones?

@potiuk
Member Author

potiuk commented Aug 25, 2023

I took a quick look:

The logs below are coming from the same forked process.

Between this:

[2023-08-23, 18:23:04 UTC] {standard_task_runner.py:85} INFO - Job 1180119: Subtask fetch_header

and this:

[2023-08-23, 18:23:35 UTC] {task_command.py:415} INFO - Running <TaskInstance: mydagname.fetch_header mydagname__bot_extract_sales__v10678108plzv__2023-08-21T20:36:29.046083 [running]> on host 90c66b7612c1

the following things happen:

  1. setproctitle -> setting the title of the process
  2. setting a few environment variables: _AIRFLOW_PARSING_CONTEXT_DAG_ID, _AIRFLOW_PARSING_CONTEXT_TASK_ID
  3. parsing command parameters ("airflow tasks run --raw ....")
  4. loading the config file prepared before the process is forked
  5. on_starting() listener is fired
  6. DAG/tasks are parsed from DAG files or, if you are using pickling (likely now), from the pickled representation

From those actions, I could likely exclude the first 4 (unless your filesystem is broken), and by the method of deduction, if we exclude the impossible, what remains must be the reason. So it's either 5) or 6).
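One way to test step 6 in isolation, sketched under the assumption that the DAG file can be imported on its own inside the worker container: time how long executing its top-level code takes. The helper name `time_top_level_code` is hypothetical and only the standard library is used:

```python
import importlib.util
import time


def time_top_level_code(path, module_name="dag_under_test"):
    """Execute a Python file's top-level code and return the elapsed seconds."""
    spec = importlib.util.spec_from_file_location(module_name, path)
    module = importlib.util.module_from_spec(spec)
    start = time.perf_counter()
    # Runs everything at module level, which is what parsing a DAG file does.
    spec.loader.exec_module(module)
    return time.perf_counter() - start
```

Calling it on a DAG file under `/opt/airflow/dags` from inside the worker would show whether parsing alone accounts for the pause. A result near 30 seconds would point at 6), while a near-instant result would leave 5) as the main suspect.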

My best guess @jaetma is that either:

Hypothesis 1

Your on_starting listener is hanging on something -> it's not very likely that you already have a listener, but since you were on 2.6, it's possible.

Hypothesis 2

Parsing your DAGs inside the Celery worker is hanging on something. The most likely reason is that you have some top-level code that (for example) makes a networking call that hangs.

My educated guess, given the 30 seconds, is that your DNS is misconfigured/broken or networking prevents it from responding quickly.

Hanging on a DNS call is quite a plausible hypothesis. From what I remember, 30 seconds is often the default DNS resolution timeout. So my best guess is that somewhere during your migration the networking in your Docker Compose environment got broken and the DNS you have is not working properly, thus making whatever you do at the top level of your DAG (which BTW you should not do) slow to run.
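A quick way to check the DNS hypothesis from inside the worker container, as a minimal sketch using only the standard library. The helper name `time_dns_lookup` is hypothetical, and which hostnames are worth probing is an assumption (the Compose service names such as `postgres` and `redis`, plus any external host the DAG code contacts):

```python
import socket
import time


def time_dns_lookup(hostname):
    """Return (elapsed_seconds, resolved) for a single name resolution."""
    start = time.perf_counter()
    try:
        socket.getaddrinfo(hostname, None)
        resolved = True
    except socket.gaierror:
        # Resolution failed; the elapsed time still shows how long it hung.
        resolved = False
    return time.perf_counter() - start, resolved
```

If a lookup takes tens of seconds instead of milliseconds, whether it succeeds or fails, the resolver configuration of the container is the first thing to inspect.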

Obligatory Haiku:

image

Hypothesis 2a

Another variant of Hypothesis 2: if you are using Airflow Variables at the top level of your DAG code, or any other database access (which BTW you should not do), this might lead to opening a database connection. And opening a new DB connection might be problematic from your server's point of view if there are already many connections open. You would see this in your database server as a high number of open connections. Postgres does not cope well with the number of connections Airflow opens, so if you use Postgres and do not have PgBouncer between Airflow and Postgres, this might be the reason. I would love it if you could check this, because I have reason to believe we could have many more connections opened in Airflow 2.7 (it is just a suspicion). If my guess is right, you should see a much larger number of connections to your DB when you run 2.7.
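A hedged sketch of how the connection count could be compared between 2.6.3 and 2.7.0. It assumes a Postgres metadata database and any DB-API cursor that uses `%s` placeholders (for example one from psycopg2). The helper name `count_connections` and the default database name `airflow` are assumptions for illustration:

```python
from collections import Counter

# pg_stat_activity is PostgreSQL's built-in view of current sessions.
QUERY = "SELECT state FROM pg_stat_activity WHERE datname = %s"


def count_connections(cursor, database="airflow"):
    """Count sessions on `database`, grouped by state, using a DB-API cursor."""
    cursor.execute(QUERY, (database,))
    # state can be NULL for some sessions, so label those explicitly.
    return Counter(row[0] or "unknown" for row in cursor.fetchall())
```

Running this periodically on both versions while the same DAGs execute would show whether 2.7 really holds noticeably more connections.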

@mpolatcan
Contributor

mpolatcan commented Aug 27, 2023

I have encountered the same problem in our production environment, which uses KubernetesExecutor. We have 150+ DAGs, all generated by a single generator DAG file. The interesting part is that worker pod memory usage reached 3x that of previous versions. Previously we gave 500 MB of memory to a worker pod. Now we have had to give 1.5 GB, and in general the worker pod doesn't do anything heavy, so I was shocked when I saw the memory usage :( Also, the Airflow scheduler dies frequently, and I don't really understand why. I monitored the database, scheduler pod memory usage, etc. Everything looks normal, but the scheduler dies frequently and the DagBag import time increased. So I downgraded to the previous version, 2.6.3, and the problem is solved for now 👍 ☺️

P.S. We are running 3 PgBouncer replicas in front of our AWS RDS instance.

@jaetma

jaetma commented Aug 28, 2023

Hello @sunank200

This is the DAG; it's just a simple query insertion:

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import timedelta, datetime
import logging
import os

logger = logging.getLogger("airflow.task")

default_run_days = 1

interval = {
    'dev': None,
    'qa': None,
    'prod': None
}

emails = {
    'dev': ['{ censored }'],
    'qa': ['{ censored }'],
    'prod': ['{ censored }']
}

# default arguments
default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'start_date': days_ago(0),
    'email': emails[os.environ['ENVIRONMENT']],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=2)
}

def on_failure_callback(context):
    from sendgrid import SendGridAPIClient
    from sendgrid.helpers.mail import Mail
    import traceback

    ti = context['task_instance']
    print("FAILURE CALLBACK")

    exception = context.get('exception')

    formatted_exception = ''.join(
        traceback.format_exception(type(exception), exception, exception.__traceback__)
    ).strip()
    
    message = Mail(
        from_email=os.environ.get('SENDGRID_MAIL_FROM'),
        to_emails=emails[os.environ['ENVIRONMENT']],
        subject=f'Airflow error: {ti.dag_id}, Env:' + str(os.environ['ENVIRONMENT']),
        html_content=f"""
        <p>ENV: {str(os.environ['ENVIRONMENT'])}</p>
        <p>DAG ID: {ti.dag_id}</p>
        <p>TASK ID: {ti.task_id}</p>
        <p>DATE: {str(datetime.now())}</p>
        <p>EXCEPTION: {formatted_exception}</p>
        """
    )
    sg = SendGridAPIClient(open(os.environ.get('SENDGRID_API_KEY')).read().replace('\n', ''))
    sg.send(message)

# initializing dag
dag = DAG(
    'sale_transaction',
    default_args=default_args,
    catchup=False,
    schedule_interval=interval[os.environ['ENVIRONMENT']],
    max_active_runs=20,
    tags=['sale_transaction'],
)

def fetch_header(ti, **kwargs):
    record = kwargs["dag_run"].conf
    print(record)
    output_json = { 
        "trx_id": 1, 
        "trx_tipo_trx": record['acquirer']['transaction_type_code'],
        "trx_emisor": record['acquirer']['acquirer_code'],
        "trx_comercio": record['merchant']['merchant_code'], 
        "trx_local": record['merchant']['store_code'], 
        "trx_pos": record['merchant']['terminal_code'], 
        "trx_pais": record['merchant']['country'],
        "trx_boleta": record['transaction']['document_number'], 
        "trx_fecha": datetime.utcfromtimestamp(record['transaction']['requested_at']).strftime('%Y-%m-%d %H:%M:%S'), 
        "trx_hora": datetime.utcfromtimestamp(record['transaction']['requested_at']).strftime('%Y-%m-%d %H:%M:%S'), 
        "trx_numero": record['transaction']['transaction_number'], 
        "trx_estado": record['status']['status_code'], 
        "trx_cod_rechazo": record['status']['rejected_code'], 
        "trx_glosa_rech": record['status']['rejected_description'], 
        "trx_estado_portal": record['status']['portal'], 
        "trx_version": record['status']['version'], 
        "trx_obs": record['status']['obs'],
        "trx_tarjeta": record['acquirer']['card_type_code'],
        "trx_monto": record['payment']['amount'],
        "trx_cuotas": record['payment']['installments'],
        "trx_ult4_dig": 0, #record['payment']['card_number'][-4:], #GET LAST 4 characters
        "trx_ts_req": (datetime.utcfromtimestamp(record['transaction']['accounted_at']).strftime('%Y-%m-%d %H:%M:%S')) if record['transaction']['accounted_at'] else None, 
        "trx_ts_rsp": (datetime.utcfromtimestamp(record['transaction']['accounted_at']).strftime('%Y-%m-%d %H:%M:%S')) if record['transaction']['accounted_at'] else None, 
        "trx_numtarjeta": record['payment']['card_number'],
        "trx_codautor": record['payment']['authorization_code'],
        "trx_mpago": record['payment']['m'],
        "trx_pin": record['payment']['pin'],
        "trx_cadena": record['merchant']['channel_code'], 
        "trx_vend_caj": record['merchant']['operator_code'], 
        "trx_fechacont": datetime.utcfromtimestamp(record['transaction']['requested_at']).strftime('%Y-%m-%d 00:00:00'), 
        "trx_oc_merchant_code": record['agoraweb']['purchase_order_merchant_code'], 
        "trx_oc_merchant_description": record['agoraweb']['purchase_order_merchant_description'], 
        "trx_oc_order_code": record['agoraweb']['purchase_order_code'], 
        "trx_oc_order_amount": record['agoraweb']['purchase_order_monto'], 
        "trx_usuario1": record['metadata']['1'],
        "trx_usuario2": record['metadata']['2'],
        "trx_usuario3": record['metadata']['3'],
        "trx_usuario4": record['metadata']['4'],
        "trx_usuario5": record['metadata']['5'],
        "trx_usuario6": record['metadata']['6'],
        "trx_usuario7": record['metadata']['7'],
        "trx_usuario8": record['metadata']['8'],
        "trx_llave": record['metadata']['ref_id'],
        "trx_id_con": record['metadata']['ref_id_con'],
        "trx_journal": record['metadata']['journal'],
    }
    return output_json


def add_fixed_values_header(ti, **kwargs):
    record = ti.xcom_pull(task_ids=['fetch_header'])[0]
    record['trx_ip'] = 0
    record['trx_vuelto'] = 'NULL'
    record['trx_donacion'] = 'NULL'
    record['trx_nro_lote'] = 'NULL'
    record['trx_lote_abono_com'] = 0
    record['trx_lote_abono_ban'] = 0
    return record


def schema_validation_header(ti):
    from schema import Schema, And, Use
    record = ti.xcom_pull(task_ids=['add_fixed_values_header'])[0]
    conf_schema = Schema({
        "trx_id": And(Use(int)),
        "trx_tipo_trx": And(Use(str)),
        "trx_emisor": And(Use(str)),
        "trx_comercio": And(Use(str)),
        "trx_local": And(Use(int)),
        "trx_pos": And(Use(int)),
        "trx_boleta": And(Use(str)),
        "trx_fecha": And(Use(str)),
        "trx_hora": And(Use(str)),
        "trx_numero": And(Use(str)),
        "trx_estado": And(Use(int)),
        "trx_cod_rechazo": And(Use(str)),
        "trx_glosa_rech": And(Use(str)),
        "trx_tarjeta": And(Use(str)),
        "trx_monto": And(Use(int)),
        "trx_cuotas": And(Use(int)),
        "trx_ult4_dig": And(Use(str)),
        "trx_ts_req": And(Use(str)),
        "trx_ts_rsp": And(Use(str)),
        "trx_numtarjeta": And(Use(str)),
        "trx_codautor": And(Use(str)),
        "trx_cadena": And(Use(int)),
        "trx_vend_caj": And(Use(str)),
        "trx_fechacont": And(Use(str)),
        "trx_oc_merchant_code": And(Use(str)),
        "trx_oc_merchant_description": And(Use(str)),
        "trx_oc_order_code": And(Use(str)),
        "trx_oc_order_amount": And(Use(str)),
        #FIXED
        "trx_version": And(Use(str)),
        "trx_journal": And(Use(str)),
        "trx_id_con": And(Use(str)),
        "trx_pais": And(Use(str)),
        "trx_estado_portal": And(Use(int)),
        "trx_llave": And(Use(str)),
        "trx_mpago": And(Use(str)),
        "trx_pin": And(Use(str)),
        "trx_ip": And(Use(str)),
        "trx_vuelto": And(Use(str)),
        "trx_donacion": And(Use(str)),
        "trx_nro_lote": And(Use(str)),
        "trx_usuario1": And(Use(str)),
        "trx_usuario2": And(Use(str)),
        "trx_usuario3": And(Use(str)),
        "trx_usuario4": And(Use(str)),
        "trx_usuario5": And(Use(str)),
        "trx_usuario6": And(Use(str)),
        "trx_usuario7": And(Use(str)),
        "trx_usuario8": And(Use(str)),
        "trx_obs": And(Use(str)),
        "trx_lote_abono_com": And(Use(int)),
        "trx_lote_abono_ban": And(Use(int)),
    })
    conf_schema.validate(record)
    return record


def generate_sql_header(ti):
    record = ti.xcom_pull(task_ids=['schema_validation_header'])[0]
    SQL = ""
    table = "{ censored }"
    SQL+='{ censored }'
    SQL+="""
        INSERT INTO {table} (
            trx_id,
            trx_tipo_trx, trx_emisor, trx_comercio, trx_local, 
            trx_pos, trx_boleta, trx_fecha, trx_hora,
            trx_numero, trx_estado, trx_cod_rechazo, trx_glosa_rech,
            trx_tarjeta, trx_monto, trx_cuotas, trx_ult4_dig,
            trx_ts_req, trx_ts_rsp, trx_numtarjeta, trx_codautor,
            trx_cadena, trx_vend_caj, trx_fechacont, trx_oc_merchant_code,
            trx_oc_merchant_description, trx_oc_order_code, trx_oc_order_amount, trx_version,
            trx_journal, trx_id_con, trx_pais, trx_estado_portal, 
            trx_llave, trx_mpago, trx_pin, trx_ip, 
            trx_vuelto, trx_donacion, trx_nro_lote, trx_usuario1,
            trx_usuario2, trx_usuario3, trx_usuario4, trx_usuario5,
            trx_usuario6, trx_usuario7, trx_usuario8, trx_obs,
            trx_lote_abono_com, trx_lote_abono_ban
        ) VALUES (
            { censored },
            '{trx_tipo_trx}', '{trx_emisor}', {trx_comercio}, {trx_local}, 
            {trx_pos}, {trx_boleta}, '{trx_fecha}', '{trx_hora}',
            {trx_numero}, {trx_estado}, {trx_cod_rechazo}, '{trx_glosa_rech}',
            '{trx_tarjeta}', {trx_monto}, {trx_cuotas}, '{trx_ult4_dig}',
            {trx_ts_req}, {trx_ts_rsp}, '{trx_numtarjeta}', {trx_codautor},
            {trx_cadena}, {trx_vend_caj}, '{trx_fechacont}', {trx_oc_merchant_code},
            {trx_oc_merchant_description}, {trx_oc_order_code}, {trx_oc_order_amount}, {trx_version},
            {trx_journal}, '{trx_id_con}', {trx_pais}, {trx_estado_portal}, 
            '{trx_llave}', {trx_mpago}, {trx_pin}, {trx_ip}, 
            {trx_vuelto}, {trx_donacion}, {trx_nro_lote}, {trx_usuario1},
            {trx_usuario2}, {trx_usuario3}, {trx_usuario4}, {trx_usuario5},
            {trx_usuario6}, {trx_usuario7}, {trx_usuario8}, {trx_obs},
            {trx_lote_abono_com}, {trx_lote_abono_ban}
        );
    """.format(
        table=table, 
        trx_tipo_trx=record["trx_tipo_trx"],
        trx_emisor=record["trx_emisor"],
        trx_comercio=record["trx_comercio"],
        trx_local=record["trx_local"],
        trx_pos=record["trx_pos"],
        trx_boleta=record["trx_boleta"],
        trx_fecha=record['trx_fecha'],
        trx_hora=record['trx_hora'],
        trx_numero=record['trx_numero'],
        trx_estado=record["trx_estado"],
        trx_cod_rechazo=record["trx_cod_rechazo"] if record["trx_cod_rechazo"] else '0',
        trx_glosa_rech=record["trx_glosa_rech"],
        trx_tarjeta=record["trx_tarjeta"],
        trx_monto=record["trx_monto"],
        trx_cuotas=record["trx_cuotas"],
        trx_ult4_dig=record["trx_ult4_dig"],
        trx_ts_req=("'" + record["trx_ts_req"] + "'") if record["trx_ts_req"] else 'NULL',
        trx_ts_rsp=("'" + record["trx_ts_rsp"] + "'") if record["trx_ts_rsp"] else 'NULL',
        trx_numtarjeta=record["trx_numtarjeta"],
        trx_codautor=("'" + record["trx_codautor"] + "'") if record["trx_codautor"] else 'NULL',
        trx_cadena=record["trx_cadena"],
        trx_vend_caj=record["trx_vend_caj"],
        trx_fechacont=record["trx_fechacont"],
        trx_oc_merchant_code=("'" + record["trx_oc_merchant_code"] + "'") if record["trx_oc_merchant_code"] else 'NULL',
        trx_oc_merchant_description=("'" + record["trx_oc_merchant_description"] + "'") if record["trx_oc_merchant_description"] else 'NULL',
        trx_oc_order_code=("'" + record["trx_oc_order_code"] + "'") if record["trx_oc_order_code"] else 'NULL',
        trx_oc_order_amount=record["trx_oc_order_amount"] if record["trx_oc_order_amount"] else 'NULL',
        trx_version=record["trx_version"] if record["trx_version"] else 'NULL',
        trx_journal=("'" + record["trx_journal"] + "'") if record["trx_journal"] else 'NULL',
        trx_id_con=record["trx_id_con"],
        trx_pais=record["trx_pais"],
        trx_estado_portal=record["trx_estado_portal"],
        trx_llave=record["trx_llave"],
        trx_mpago=("'" + record["trx_mpago"] + "'") if record["trx_mpago"] else 'NULL',
        trx_pin=record["trx_pin"],
        trx_ip=record["trx_ip"],
        trx_vuelto=record["trx_vuelto"],
        trx_donacion=record["trx_donacion"],
        trx_nro_lote=record["trx_nro_lote"],
        trx_usuario1=record["trx_usuario1"],
        trx_usuario2=record["trx_usuario2"],
        trx_usuario3=("'" + str(record["trx_usuario3"]) + "'") if record["trx_usuario3"] else 'NULL',
        trx_usuario4=("'" + str(record["trx_usuario4"]).replace("'", '') + "'") if record["trx_usuario4"] else 'NULL',
        trx_usuario5=("'" + record["trx_usuario5"] + "'") if record["trx_usuario5"] else 'NULL',
        trx_usuario6=("'" + str(record["trx_usuario6"]) + "'") if record["trx_usuario6"] else 'NULL',
        trx_usuario7=("'" + str(record["trx_usuario7"]).replace("'", '') + "'") if record["trx_usuario7"] else 'NULL',
        trx_usuario8=("'" + str(record["trx_usuario8"]) + "'") if record["trx_usuario8"] else 'NULL',
        trx_obs=("'" + str(record["trx_obs"]) + "'") if record["trx_obs"] else 'NULL',
        trx_lote_abono_com=record["trx_lote_abono_com"],
        trx_lote_abono_ban=record["trx_lote_abono_ban"],
    )
    return SQL


fetch_header = PythonOperator(
    task_id='fetch_header',
    provide_context=True,
    python_callable=fetch_header,
    on_failure_callback=on_failure_callback,
    dag=dag
)

add_fixed_values_header = PythonOperator(
    task_id='add_fixed_values_header',
    provide_context=True,
    python_callable=add_fixed_values_header,
    on_failure_callback=on_failure_callback,
    dag=dag
)

schema_validation_header = PythonOperator(
    task_id='schema_validation_header',
    provide_context=True,
    python_callable=schema_validation_header,
    on_failure_callback=on_failure_callback,
    dag=dag
)

generate_sql_header = PythonOperator(
    task_id='generate_sql_header',
    provide_context=True,
    python_callable=generate_sql_header,
    on_failure_callback=on_failure_callback,
    dag=dag
)

execute_sql = PostgresOperator(
    task_id="execute_sql",
    postgres_conn_id='ple_postgres_' + os.environ['ENVIRONMENT'],
    sql="{{ ti.xcom_pull(task_ids=['generate_sql_header'])[0] }}",
    dag=dag
)

fetch_header >> add_fixed_values_header >> schema_validation_header >> generate_sql_header >> execute_sql

@jaetma

jaetma commented Aug 28, 2023

This is my docker-compose file running in:

Red Hat Enterprise Linux Server, cloud

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME           - Docker image name used to run Airflow.
#                                Default: apache/airflow:2.1.4
# AIRFLOW_UID                  - User ID in Airflow containers
#                                Default: 50000
# AIRFLOW_GID                  - Group ID in Airflow containers
#                                Default: 0
#
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
#                                Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
#                                Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
#                                Default: ''
#
# Feel free to modify this file to suit your needs.
---
version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.6.3-python3.8}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_OVERFLOW: -1
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__SMTP__STARTTLS: 'true'
    AIRFLOW__SMTP__SSL: 'false'
    AIRFLOW__SMTP__SMTP_HOST: smtp.gmail.com
    AIRFLOW__SMTP__SMTP_PORT: 587
    AIRFLOW__SMTP__SMTP_USER: censored
    AIRFLOW__SMTP__SMTP_PASSWORD: censored
    AIRFLOW__SMTP__SMTP_MAIL_FROM: censored
    AIRFLOW__EMAIL__SUBJECT_TEMPLATE: /opt/airflow/dags/email_subject_template.j2
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW__API__ACCESS_CONTROL_ALLOW_HEADERS: 'origin,content-type,accept,authorization'
    AIRFLOW__API__ACCESS_CONTROL_ALLOW_METHODS: 'POST,GET,OPTIONS,DELETE'
    AIRFLOW__API__ACCESS_CONTROL_ALLOW_ORIGINS: '*'
    _AIRFLOW_WWW_USER_USERNAME: 'censored'
    _AIRFLOW_WWW_USER_PASSWORD: 'censored'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-pyiso8583 plyvel schema jsonschema redis==4.3.4}
    CENSORED_ENVIRONMENT: ${CENSORED_ENVIRONMENT:-dev} # dev - qa - prod
    SENDGRID_MAIL_FROM: censored
    SENDGRID_API_KEY: /opt/airflow/keys/sendgrid-agoraqr.txt
    GOOGLE_APPLICATION_CREDENTIALS: /opt/airflow/keys/censored.json
  volumes:
    - /etc/localtime:/etc/localtime:ro  # This is to ensure docker container date is the same host date
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./keys:/opt/airflow/keys
    - ./files:/opt/airflow/files
    - /opt/censored/files/comercio/censored:/opt/airflow/censored
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-0}"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis/redis-stack-server:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 7093:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:7093/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.1.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID and AIRFLOW_GID environment variables, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:${AIRFLOW_GID}" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-16DF48936F837B402448FBBFF}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:${AIRFLOW_GID:-0}"
    volumes:
      - .:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5556:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5556/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  db-init:
    environment:
      ENVIRONMENT: ${ENVIRONMENT:-dev} # dev - qa - prod
    build:
      context: db-init
      dockerfile: Dockerfile.init
    depends_on:
      - postgres

volumes:
  postgres-db-volume:

@jaetma

jaetma commented Aug 28, 2023

Hi @potiuk!

Our hypothesis is that this issue might be related to changes in Python's pickle library. Airflow 2.6.3 was primarily used with Python 3.7 in our environment. For the downgrade to work correctly, we had to update our image to use Python 3.8 due to the changes in the pickle library between these Python versions. We suspect that these changes might be affecting Airflow's internal handling of task serialization and deserialization, thus causing the performance degradation.

@potiuk
Member Author

potiuk commented Aug 28, 2023

@jaetma and @mpolatcan

So can you reiterate and explain again what the differences were between the environments you have? It's clear that you seem to hit the problem when you upgrade Airflow to 2.7.0, but it seems this is not the only thing you are upgrading. @mpolatcan you mentioned that you also upgraded the Python version and you mentioned the pickle library, but this is a bit vague. Can you please help us narrow it down and maybe even do some experimenting to help us?

  1. Can you please spell out the differences between your environments? Ideally in a short summary:
    A. Airflow 2.6.3, Python version, any other differences <- does not have problems
    B. Airflow 2.7.0, Python version, any other differences <- has problems

  2. @mpolatcan especially, you mentioned some suspicion about Python version 3.8 and the pickle library. Can you please elaborate on this and maybe (if that is possible) attempt to do the same upgrades you did for Airflow 2.7 but WITHOUT upgrading Airflow (staying with 2.6.3) and see if you observe the same memory growth and stability issues? Is this possible? If not, can you explain what requirements etc. are preventing it? That would help us enormously in an attempt to track down the root cause of the problem. Also @jaetma - maybe you have a possibility to do a similar exercise.
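
One way to produce the short summary asked for above is to run `pip freeze` in each image and diff the results. A rough sketch, assuming both outputs have been saved as text; `parse_freeze` and `diff_environments` are illustrative names:

```python
def parse_freeze(text: str) -> dict:
    """Map package name -> pinned version from `pip freeze` output."""
    packages = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blanks, comments, and editable/URL installs without a "==" pin.
        if not line or line.startswith("#") or "==" not in line:
            continue
        name, version = line.split("==", 1)
        packages[name.strip().lower()] = version.strip()
    return packages


def diff_environments(freeze_a: str, freeze_b: str) -> list:
    """Return sorted (package, version_in_a, version_in_b) for every difference."""
    a, b = parse_freeze(freeze_a), parse_freeze(freeze_b)
    changes = []
    for name in sorted(set(a) | set(b)):
        if a.get(name) != b.get(name):
            changes.append((name, a.get(name), b.get(name)))
    return changes
```

A package that exists in only one environment shows up with `None` on the other side, which makes added or dropped dependencies easy to spot next to plain version bumps.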

Also - are you using pickling? https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#donot-pickle - Airflow's "donot-pickle" is set to "True" by default. Similarly https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#enable-xcom-pickling is set to "False" - XCom pickling is disabled by default. You mentioned the pickle library, but (except for the Python Virtualenv/External Python operators) we are not really using pickling for anything in Airflow. We do use serialization, but without pickling, so I'd be surprised to see the pickle library have an effect. But maybe you have changed those parameters? Can you also (if you get to the point that you have an installation that exhibits the problem) try to change those values (if your current configuration uses pickling)?
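
To check which values are actually in effect, `airflow config get-value core donot_pickle` (and the same for `enable_xcom_pickling`) prints them from inside the deployment. The lookup order can also be sketched offline. This is a simplified approximation, not Airflow's own config loader: it assumes a setting in `airflow.cfg` under `[core]` is overridden by an `AIRFLOW__CORE__<KEY>` environment variable, and it uses the defaults quoted above.

```python
import configparser

# Defaults as stated above: donot_pickle is True, enable_xcom_pickling is False.
DEFAULTS = {"donot_pickle": True, "enable_xcom_pickling": False}
# Simplified truthiness check; anything else is treated as False in this sketch.
TRUE_VALUES = {"true", "t", "1"}


def pickling_settings(cfg_text: str, env: dict) -> dict:
    """Resolve the two pickling flags from airflow.cfg text plus environment."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    result = {}
    for key, default in DEFAULTS.items():
        value = default
        if parser.has_option("core", key):
            value = parser.get("core", key).strip().lower() in TRUE_VALUES
        env_name = "AIRFLOW__CORE__" + key.upper()
        if env_name in env:
            # Environment variables take precedence over the file.
            value = env[env_name].strip().lower() in TRUE_VALUES
        result[key] = value
    return result
```

Calling `pickling_settings(open("airflow.cfg").read(), dict(os.environ))` (with `import os`) in a container would show whether either flag has been moved away from its default.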

I am trying to narrow down the problem - we know already that in some cases, Airflow 2.7.0 upgrade might trigger some problems. But we also know that it is not in all environments, only some. So we need to narrow down and track what is really causing it - is it just Airflow, or Python version or some dependencies.

I'd really love it if we could do these remote experiments with you to see if we can track down the root cause of the problem.

And many thanks for the reports so far, this is already helpful to see that there is something we need to track down.

@Taragolis
Contributor

Taragolis commented Aug 28, 2023

@jaetma Just wondering, are you using this env var in production?

_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-pyiso8583 plyvel schema jsonschema redis==4.3.4}

I'm not sure this is the problem here; however, the basic recommendation is not to use _PIP_ADDITIONAL_REQUIREMENTS. More details were added to the Docker Compose sample stack in Airflow 2.5.2:

# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
#                                Use this option ONLY for quick checks. Installing requirements at container
#                                startup is done EVERY TIME the service is started.
#                                A better way is to build a custom image or extend the official image
#                                as described in https://airflow.apache.org/docs/docker-stack/build.html.
#                                Default: ''
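
A small guard along the lines of the comment above could be run before deploying, to catch a compose file or environment that still sets the variable. This is only a sketch; the function names are hypothetical and nothing like this ships with the official image:

```python
import os


def additional_requirements(env=None) -> list:
    """Return the requirement strings listed in _PIP_ADDITIONAL_REQUIREMENTS."""
    env = os.environ if env is None else env
    return env.get("_PIP_ADDITIONAL_REQUIREMENTS", "").split()


def assert_no_startup_installs(env=None) -> None:
    """Raise if the container would pip-install packages at every startup."""
    requirements = additional_requirements(env)
    if requirements:
        raise RuntimeError(
            "_PIP_ADDITIONAL_REQUIREMENTS is set (%s); bake these into a "
            "custom image instead" % ", ".join(requirements)
        )
```

An empty or unset variable passes silently, which matches the documented default of `''`.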

@potiuk
Member Author

potiuk commented Aug 28, 2023

I'm not sure this is the problem here; however, the basic recommendation is not to use _PIP_ADDITIONAL_REQUIREMENTS. More details were added to the Docker Compose sample stack in Airflow 2.5.2

This is an exceptionally good point @Taragolis. NEVER EVER UNDER ANY CIRCUMSTANCES you should use this option in anything close to production. Please build your custom image instead and see if you continue having the problem.

@mobuchowski
Contributor

Hypothesis 1

Your on_start_listener is hanging on something -> it's not very likely that you already have a listener, but since you were on 2.6, it's possible.

Enabling DEBUG logging might help verify this.

@Taragolis
Contributor

NEVER EVER UNDER ANY CIRCUMSTANCES you should use this option in anything close to production

... test, development. IMHO, this variable should only be used if your life or career depends on it 😥

I hope one day we will remove this env variable completely (Airflow 3?). In my past experience with companies that already use Airflow, the chance that _PIP_ADDITIONAL_REQUIREMENTS is also used everywhere is close to 100% (or maybe I'm just unlucky), with additional side effects: broken dependencies, health checks disabled or set to a 10-minute timeout, etc.

And I can't see any better solution than disabling it (neutral) or terminating the container (chaotic evil 👿 ) when this variable is found; a lot of suggestions on the Internet say to use it 😞 😢

pyiso8583 plyvel schema jsonschema redis==4.3.4

I'd bet that Airflow 2.7 uses redis >=5.2.3,<6

@potiuk
Member Author

potiuk commented Aug 28, 2023

Hypothesis 3.

So it might be that _PIP_ADDITIONAL_REQUIREMENTS causes pip to try to install conflicting requirements, or to take a very long time to resolve them, and that is causing it to fail and crash the scheduler. It's unlikely (then Airflow would not even start, I guess). If that's the case then.... I will single-handedly remove that variable immediately and raise an Exception in the image if someone uses it.
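
One rough way to test this hypothesis from inside a running container is to compare the `==` pins from the variable with what actually ended up installed (`pip check` additionally reports declared dependency conflicts). A sketch using `importlib.metadata` from the standard library (Python 3.8+); `pin_mismatches` is an illustrative name:

```python
from importlib import metadata


def pin_mismatches(requirements, lookup=metadata.version):
    """Return (name, pinned, installed) for every "name==version" pin that
    does not match the installed distribution; installed is None if absent."""
    mismatches = []
    for requirement in requirements:
        if "==" not in requirement:
            continue  # unpinned entries cannot be compared this way
        name, pinned = requirement.split("==", 1)
        try:
            installed = lookup(name)
        except metadata.PackageNotFoundError:
            installed = None
        if installed != pinned:
            mismatches.append((name, pinned, installed))
    return mismatches
```

For the list quoted earlier, `pin_mismatches("pyiso8583 plyvel schema jsonschema redis==4.3.4".split())` would only examine the redis pin, since the other entries carry no version.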

@potiuk
Member Author

potiuk commented Aug 28, 2023

(BTW. It's not A LOT @Taragolis - just 4 pages :D)

@Taragolis
Contributor

@potiuk and I've also got 131 results from SO: https://www.google.com/search?q=_PIP_ADDITIONAL_REQUIREMENTS+site:stackoverflow.com 🤣

I've just looked at a couple of them; hopefully not all of them suggest using it, and sometimes they even say the opposite: "Please do not use it"

This one is funny: https://stackoverflow.com/questions/76424725/airflow-docker-pip-additional-requirements-argument-why-there-is-a-comment-to

In addition, Google does not return results for some locale-specific resources, but I definitely know of some where this solution is suggested

@mpolatcan
Contributor

mpolatcan commented Aug 28, 2023

Hi again @potiuk, I didn't mention anything about the pickle library; @jaetma mentioned it ☺️ Also, our environment runs a custom-packaged Docker image that basically installs the main dependencies and the apache-airflow[all]==2.7.0 package with the constraints file. We built the Docker image on the official python:3.9 image. Our DAG basically spawns a Kubernetes pod that executes Python code, which generates an Excel file and sends it via e-mail. It is a very simple DAG used for on-demand report triggering in our company. The interesting part is that the standalone DAG processor gets error -1 for this DAG, while other, more complex DAGs render without trouble. Other parts are problematic too: memory and CPU usage of the intermediate worker pods of the Kubernetes executor increased dramatically. I couldn't really understand why; I upgraded from 2.3.2 to 2.7.0 directly. But after I saw that @jaetma downgraded to 2.6.3 and everything worked fine, I downgraded too and everything works perfectly. Also, we are using an EKS and Karpenter combination in our production environment, so I think there is no problem related to compute environment resources ☺️

from airflow.models import DAG
from datetime import datetime, timedelta

from utils import AirflowCallbacks
from operators import GetirKubernetesCronJobOperator


default_args = {
    "owner": "Getir",
    "catchup": False,
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(seconds=15),
    "start_date": datetime(year=2019, month=7, day=10)
}

dag = DAG(
    dag_id="client_list",
    default_args=default_args,
    schedule_interval=None,
    tags=["client-list", "reports"]
)


GetirKubernetesCronJobOperator(
    name="cl-",
    task_id="clientlist",
    namespace="airflow",
    service_account_name="airflow-sa-role-sa",
    image="164762854291.dkr.ecr.eu-west-1.amazonaws.com/getir-python:3.7-buster",
    image_pull_policy="IfNotPresent",
    repo_name="client-list-cron",
    exec_env="eks",
    eks_nodegroup="ng-airflow-reports",
    eks_cpu_limit="500m",
    eks_memory_limit="16000Mi",
    cmds=[
        "bash", "-c",
        (
            "mkdir -p /usr/src/app && cp -r /repo/* /usr/src/app && "
            "cd /usr/src/app && pip install -r requirements.txt -q && "
            "python -u main.py {{ dag_run.conf.get('parameters') }}"
        )
    ],
    labels={"report": "yes"},
    annotations={
        "report_type": "panel",
        "report_name": "client-list",
        "exec_env": "{{dag_run.conf.get('exec_env', 'fargate')}}",
    },
    startup_timeout_seconds=600,
    is_delete_operator_pod=False,
    reattach_on_restart=False,
    pool="client_list_pool",
    on_failure_callback=AirflowCallbacks.client_list_failed_callback,
    dag=dag
)

@Taragolis
Contributor

@mpolatcan Just wondering, is there any chance that you use some DB performance analyzer on top of your RDS instance? pgBadger, Amazon RDS Performance Insights, etc. Maybe it could show some anomalous activity?

@eladkal eladkal added kind:bug This is a clearly a bug area:Scheduler including HA (high availability) scheduler affected_version:2.7 Issues Reported for 2.7 labels Aug 29, 2023
@Taragolis
Contributor

Metadata db runs on RDS for prod and preprod with same instance type,

@raphaelsimeon Any chance that you have enabled Performance Insights in your prod/preprod RDS instances?

@Github-dm-CDE

Github-dm-CDE commented Aug 30, 2023

Any chance to see the logs with debug turned on for tasks as asked above @Github-dm-CDE ? And a bit more info on the deployment of yours ?

Hello @potiuk , here is the code of the DAG including the logs before and after the update to Airflow 2.7.0. Currently the debug logging is still deactivated, but I will have a look at that in a moment. Is there a good way to enable debug logging only for this one specific DAG?

DAG Config

import itertools
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Iterator

import etl_env as env
from airflow import DAG
from airflow.operators.latest_only import LatestOnlyOperator
from airflow_extensions.macros.teams_plugin import on_failure_message_to_teams
from airflow_extensions.operators.cli_task_builder.cli_task_builder import (
    CLIOperator,
    CliTaskBuilder,
    ConnectionBuilder,
)
from bda_prj_self_service_cas_upload.table_config import config as self_service
from etl_env import PEX_BASE_DIR

default_args = {
    "owner": "CK",
    "depends_on_past": False,
    "start_date": datetime(2021, 6, 1, 7, 0, 0),
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

PEX_PATH = os.path.join(PEX_BASE_DIR, "viyacli.pex")
CASLIB_COUNTRIES = [country.upper() for country in env.COUNTRIES_XX]


def get_self_service_cas_upload_configs() -> Iterator[Path]:
    table_config_path = os.path.dirname(self_service.__file__)
    return Path(table_config_path).glob(
        f"{self_service.SELF_SERVICE_CONFIG_PREFIX}*.yaml"
    )


def get_caslib_teams() -> list:
    caslib_teams = ["CK"]
    for config in get_self_service_cas_upload_configs():
        team_name = self_service.get_team_name_from_config_filename(config.name)
        caslib_teams.append(team_name.upper())
    return list(set(caslib_teams))


def get_all_caslibs() -> Iterator:
    yield "PUBLIC"
    caslib_teams = get_caslib_teams()
    for country, team in itertools.product(CASLIB_COUNTRIES, caslib_teams):
        yield f"{country}_{team}"


def create_cas_table_monitoring_task(dag: DAG, caslib: str) -> CLIOperator:
    return (
        CliTaskBuilder()
        .with_task_args(
            task_id=f"collect_CAS_table_metrics_{caslib.lower()}",
            dag=dag,
        )
        .with_command(f"{PEX_PATH} cas-monitoring")
        .with_connection(
            ConnectionBuilder("sas_viya", env_prefix="VIYA")
            .with_login("USERNAME")
            .with_password("PASSWORD")
            .with_host("HOST_URL")
        )
        .with_connection(
            ConnectionBuilder("mssql_db", env_prefix="MSSQL")
            .with_login("USERNAME")
            .with_password("PASSWORD")
            .with_host("HOST_URL")
            .with_port("HOST_PORT")
            .with_schema("DATABASE")
        )
        .with_cli_arg(f"--ssl_cert_file {SSL_CERT_PATH}")
        .with_cli_arg(f"--viya_caslib {CASLIB}")
        .with_cli_arg(f"--viya_client_secret {CLIENT_SECRET}")
        .with_cli_arg("--monitoring_table sas_cas_table_status")
        .create()
    )


def create_dag():
    return DAG(
        "prj_ck_cas_table_monitoring",
        description=__doc__,
        default_args=default_args,
        schedule="*/20 * * * *",
        max_active_tasks=3,
        max_active_runs=1,
        on_failure_callback=on_failure_message_to_teams,
    )


dag = create_dag()
latest_only = LatestOnlyOperator(task_id="latest_only", dag=dag)
for caslib in get_all_caslibs():
    cas_table_metrics_tasks = create_cas_table_monitoring_task(dag, caslib)
    latest_only >> cas_table_metrics_tasks

and here are the logs for one collect_CAS_table_metrics_* task:
Airflow 2.6.3:

*** Found local files:
***   * /var/log/airflow/dags/dag_id=prj_ck_cas_table_monitoring/run_id=scheduled__2023-08-28T09:20:00+00:00/task_id=collect_CAS_table_metrics_public/attempt=1.log
[2023-08-28, 11:40:02 CEST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public scheduled__2023-08-28T09:20:00+00:00 [queued]>
[2023-08-28, 11:40:02 CEST] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public scheduled__2023-08-28T09:20:00+00:00 [queued]>
[2023-08-28, 11:40:02 CEST] {taskinstance.py:1308} INFO - Starting attempt 1 of 3
[2023-08-28, 11:40:02 CEST] {taskinstance.py:1327} INFO - Executing <Task(CLIOperator): collect_CAS_table_metrics_public> on 2023-08-28 09:20:00+00:00
[2023-08-28, 11:40:02 CEST] {standard_task_runner.py:57} INFO - Started process 3717615 to run task
[2023-08-28, 11:40:02 CEST] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'prj_ck_cas_table_monitoring', 'collect_CAS_table_metrics_public', 'scheduled__2023-08-28T09:20:00+00:00', '--job-id', '8827796', '--raw', '--subdir', 'DAGS_FOLDER/prj_ck_cas_table_monitoring/prj_ck_cas_table_monitoring.py', '--cfg-path', '/tmp/tmp73qdvuk2']
[2023-08-28, 11:40:02 CEST] {standard_task_runner.py:85} INFO - Job 8827796: Subtask collect_CAS_table_metrics_public
[2023-08-28, 11:40:02 CEST] {task_command.py:410} INFO - Running <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public scheduled__2023-08-28T09:20:00+00:00 [running]> on host OBSCURED_HOSTNAME
[2023-08-28, 11:40:02 CEST] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='CK' AIRFLOW_CTX_DAG_ID='prj_ck_cas_table_monitoring' AIRFLOW_CTX_TASK_ID='collect_CAS_table_metrics_public' AIRFLOW_CTX_EXECUTION_DATE='2023-08-28T09:20:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-08-28T09:20:00+00:00'
[2023-08-28, 11:40:02 CEST] {logging_mixin.py:150} INFO - Execute cli task
[2023-08-28, 11:40:02 CEST] {base.py:73} INFO - Using connection ID 'sas_viya_conn' for task execution.
[2023-08-28, 11:40:02 CEST] {base.py:73} INFO - Using connection ID 'cxa_mssql_db_domain_user_format' for task execution.
[2023-08-28, 11:40:02 CEST] {logging_mixin.py:150} INFO - Environment variables: ['LANG', 'PATH', 'VIYA_USERNAME', 'VIYA_PASSWORD', 'VIYA_HOST_URL', 'MSSQL_USERNAME', 'MSSQL_PASSWORD', 'MSSQL_HOST_URL', 'MSSQL_HOST_PORT', 'MSSQL_DATABASE']
[2023-08-28, 11:40:02 CEST] {logging_mixin.py:150} INFO - Bash command: /srv/team-workspaces/ck/pex/viyacli.pex cas-monitoring --ssl_cert_file OBSCURED_SSL_CERT_PATH --viya_caslib PUBLIC --viya_client_secret OBSCURED_VIYA_CLIENT_SECRET --monitoring_table sas_cas_table_status
[2023-08-28, 11:40:02 CEST] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2023-08-28, 11:40:02 CEST] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', '/srv/team-workspaces/ck/pex/viyacli.pex cas-monitoring --ssl_cert_file OBSCURED_SSL_CERT_PATH --viya_caslib PUBLIC --viya_client_secret OBSCURED_VIYA_CLIENT_SECRET --monitoring_table sas_cas_table_status']
[2023-08-28, 11:40:02 CEST] {subprocess.py:86} INFO - Output:
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO - 2023-08-28 11:40:06.260 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:39 - Item missing key created - skipping for monitoring:
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO - 2023-08-28 11:40:06.260 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:40 - {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "version": 3,
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "name": "OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "tableReference": {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "version": 2,
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "tableUri": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "sessionId": "OBSCURED_SESSION_ID",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "sourceTableName": "OBSCURED_USER_MAPPINGTEST.sashdat",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "sourceCaslibName": "PUBLIC"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "state": "unloaded",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "repeated": false,
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "sourceLastModified": "2023-06-09T09:40:45.761Z",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "serverName": "cas-shared-default",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "caslibName": "PUBLIC",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "attributes": {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "owner": "cas",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "size": 8448,
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "encryption": "NONE",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "time": "2023-06-09T10:40:46+01:00",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "group": "sas"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "links": [
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "up",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.collection",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "itemType": "application/vnd.sas.cas.table"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "self",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.cas.table"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "DELETE",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "delete",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "PUT",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "updateState",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST/state",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST/state",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "responseType": "application/json;text/plain"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "columns",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST/columns",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST/columns",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.collection",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "itemType": "application/vnd.sas.cas.column"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "dataTable",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.data.table"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         }
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     ]
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO - }
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO - 2023-08-28 11:40:06.260 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:39 - Item missing key created - skipping for monitoring:
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO - 2023-08-28 11:40:06.260 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:40 - {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "version": 3,
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "name": "MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "tableReference": {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "version": 2,
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "tableUri": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "sessionId": "OBSCURED_SESSION_ID",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "sourceTableName": "MAP_DEVICE_TYPE_FIXED.sashdat",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "sourceCaslibName": "PUBLIC"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "state": "unloaded",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "repeated": false,
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "sourceLastModified": "2023-06-09T14:18:52.223Z",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "serverName": "cas-shared-default",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "caslibName": "PUBLIC",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "attributes": {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "owner": "cas",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "size": 8544,
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "encryption": "NONE",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "time": "2023-06-09T15:18:52+01:00",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         "group": "sas"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     "links": [
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "up",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.collection",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "itemType": "application/vnd.sas.cas.table"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "self",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.cas.table"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "DELETE",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "delete",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "PUT",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "updateState",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED/state",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED/state",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "responseType": "application/json;text/plain"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "columns",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED/columns",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED/columns",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.collection",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "itemType": "application/vnd.sas.cas.column"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "rel": "dataTable",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "href": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "uri": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.data.table"
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -         }
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO -     ]
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO - }
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO - 2023-08-28 11:40:06.260 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:45 - The following tables have been skipped: ['OBSCURED_USER_MAPPINGTEST', 'MAP_DEVICE_TYPE_FIXED']
[2023-08-28, 11:40:06 CEST] {subprocess.py:93} INFO - 2023-08-28 11:40:06.260 | WARNING  | viya.mssql_utils:write_to_mssql:35 - No data was passed to insert into the Database.
[2023-08-28, 11:40:06 CEST] {subprocess.py:97} INFO - Command exited with return code 0
[2023-08-28, 11:40:06 CEST] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=prj_ck_cas_table_monitoring, task_id=collect_CAS_table_metrics_public, execution_date=20230828T092000, start_date=20230828T094002, end_date=20230828T094006
[2023-08-28, 11:40:06 CEST] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-08-28, 11:40:06 CEST] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check
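
To see where a run spends its time, the pauses between consecutive log timestamps can be measured. In the 2.6.3 log above the only pause is about four seconds, while the command itself runs. A minimal sketch, assuming the `[YYYY-MM-DD, HH:MM:SS TZ]` prefix shown in these logs; `find_gaps` is an illustrative name, and the time zone abbreviation is ignored, so a log that spans a clock change would be misread:

```python
import re
from datetime import datetime

# Matches the prefix used in the task logs above, e.g. "[2023-08-28, 11:40:02 CEST]".
TIMESTAMP = re.compile(r"^\[(\d{4}-\d{2}-\d{2}), (\d{2}:\d{2}:\d{2}) [A-Za-z]+\]")


def find_gaps(log_text: str, min_seconds: float = 2.0) -> list:
    """Return (seconds, previous_line, next_line) for each pause of at least min_seconds."""
    gaps = []
    previous_time = None
    previous_line = None
    for line in log_text.splitlines():
        match = TIMESTAMP.match(line)
        if not match:
            continue  # header lines and wrapped output carry no timestamp
        current = datetime.strptime(" ".join(match.groups()), "%Y-%m-%d %H:%M:%S")
        if previous_time is not None:
            delta = (current - previous_time).total_seconds()
            if delta >= min_seconds:
                gaps.append((delta, previous_line, line))
        previous_time, previous_line = current, line
    return gaps
```

Running the same function over the logs of both versions makes it easier to compare which step each pause follows.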

Airflow 2.7.0:

*** Found local files:
***   * /var/log/airflow/dags/dag_id=prj_ck_cas_table_monitoring/run_id=scheduled__2023-08-28T09:40:00+00:00/task_id=collect_CAS_table_metrics_public/attempt=1.log
[2023-08-28, 12:16:16 CEST] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public scheduled__2023-08-28T09:40:00+00:00 [queued]>
[2023-08-28, 12:16:16 CEST] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public scheduled__2023-08-28T09:40:00+00:00 [queued]>
[2023-08-28, 12:16:16 CEST] {taskinstance.py:1361} INFO - Starting attempt 1 of 3
[2023-08-28, 12:16:16 CEST] {taskinstance.py:1382} INFO - Executing <Task(CLIOperator): collect_CAS_table_metrics_public> on 2023-08-28 09:40:00+00:00
[2023-08-28, 12:16:16 CEST] {standard_task_runner.py:57} INFO - Started process 3763639 to run task
[2023-08-28, 12:16:16 CEST] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'prj_ck_cas_table_monitoring', 'collect_CAS_table_metrics_public', 'scheduled__2023-08-28T09:40:00+00:00', '--job-id', '8827969', '--raw', '--subdir', 'DAGS_FOLDER/prj_ck_cas_table_monitoring/prj_ck_cas_table_monitoring.py', '--cfg-path', '/tmp/tmpvgdhcef6']
[2023-08-28, 12:16:16 CEST] {standard_task_runner.py:85} INFO - Job 8827969: Subtask collect_CAS_table_metrics_public
[2023-08-28, 12:16:24 CEST] {task_command.py:415} INFO - Running <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public scheduled__2023-08-28T09:40:00+00:00 [running]> on host OBSCURED_HOSTNAME
[2023-08-28, 12:16:32 CEST] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='CK' AIRFLOW_CTX_DAG_ID='prj_ck_cas_table_monitoring' AIRFLOW_CTX_TASK_ID='collect_CAS_table_metrics_public' AIRFLOW_CTX_EXECUTION_DATE='2023-08-28T09:40:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-08-28T09:40:00+00:00'
[2023-08-28, 12:16:32 CEST] {logging_mixin.py:151} INFO - Execute cli task
[2023-08-28, 12:16:32 CEST] {base.py:73} INFO - Using connection ID 'sas_viya_conn' for task execution.
[2023-08-28, 12:16:32 CEST] {base.py:73} INFO - Using connection ID 'cxa_mssql_db_domain_user_format' for task execution.
[2023-08-28, 12:16:32 CEST] {logging_mixin.py:151} INFO - Environment variables: ['LANG', 'PATH', 'VIYA_USERNAME', 'VIYA_PASSWORD', 'VIYA_HOST_URL', 'MSSQL_USERNAME', 'MSSQL_PASSWORD', 'MSSQL_HOST_URL', 'MSSQL_HOST_PORT', 'MSSQL_DATABASE']
[2023-08-28, 12:16:32 CEST] {logging_mixin.py:151} INFO - Bash command: /srv/team-workspaces/ck/pex/viyacli.pex cas-monitoring --ssl_cert_file OBSCURED_SSL_CERT_PATH --viya_caslib PUBLIC --viya_client_secret OBSCURED_VIYA_CLIENT_SECRET --monitoring_table sas_cas_table_status
[2023-08-28, 12:16:32 CEST] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2023-08-28, 12:16:32 CEST] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', '/srv/team-workspaces/ck/pex/viyacli.pex cas-monitoring --ssl_cert_file OBSCURED_SSL_CERT_PATH --viya_caslib PUBLIC --viya_client_secret OBSCURED_VIYA_CLIENT_SECRET --monitoring_table sas_cas_table_status']
[2023-08-28, 12:16:32 CEST] {subprocess.py:86} INFO - Output:
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO - 2023-08-28 12:16:35.935 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:39 - Item missing key created - skipping for monitoring:
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO - 2023-08-28 12:16:35.936 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:40 - {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "version": 3,
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "name": "OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "tableReference": {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "version": 2,
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "tableUri": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "sessionId": "OBSCURED_SESSION_ID",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "sourceTableName": "OBSCURED_USER_MAPPINGTEST.sashdat",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "sourceCaslibName": "PUBLIC"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "state": "unloaded",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "repeated": false,
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "sourceLastModified": "2023-06-09T09:40:45.761Z",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "serverName": "cas-shared-default",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "caslibName": "PUBLIC",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "attributes": {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "owner": "cas",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "size": 8448,
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "encryption": "NONE",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "time": "2023-06-09T10:40:46+01:00",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "group": "sas"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "links": [
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "up",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.collection",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "itemType": "application/vnd.sas.cas.table"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "self",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.cas.table"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "DELETE",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "delete",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "PUT",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "updateState",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST/state",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST/state",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "responseType": "application/json;text/plain"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "columns",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST/columns",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/OBSCURED_USER_MAPPINGTEST/columns",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.collection",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "itemType": "application/vnd.sas.cas.column"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "dataTable",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/OBSCURED_USER_MAPPINGTEST",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.data.table"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         }
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     ]
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO - }
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO - 2023-08-28 12:16:35.936 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:39 - Item missing key created - skipping for monitoring:
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO - 2023-08-28 12:16:35.936 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:40 - {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "version": 3,
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "name": "MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "tableReference": {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "version": 2,
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "tableUri": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "sessionId": "OBSCURED_SESSION_ID",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "sourceTableName": "MAP_DEVICE_TYPE_FIXED.sashdat",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "sourceCaslibName": "PUBLIC"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "state": "unloaded",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "repeated": false,
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "sourceLastModified": "2023-06-09T14:18:52.223Z",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "serverName": "cas-shared-default",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "caslibName": "PUBLIC",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "attributes": {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "owner": "cas",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "size": 8544,
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "encryption": "NONE",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "time": "2023-06-09T15:18:52+01:00",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         "group": "sas"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     "links": [
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "up",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.collection",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "itemType": "application/vnd.sas.cas.table"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "self",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.cas.table"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "DELETE",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "delete",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "PUT",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "updateState",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED/state",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED/state",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "responseType": "application/json;text/plain"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "columns",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED/columns",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/casManagement/servers/cas-shared-default/caslibs/PUBLIC/tables/MAP_DEVICE_TYPE_FIXED/columns",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.collection",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "itemType": "application/vnd.sas.cas.column"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         },
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         {
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "method": "GET",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "rel": "dataTable",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "href": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "uri": "/dataTables/dataSources/cas~fs~cas-shared-default~fs~PUBLIC/tables/MAP_DEVICE_TYPE_FIXED",
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -             "type": "application/vnd.sas.data.table"
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -         }
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO -     ]
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO - }
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO - 2023-08-28 12:16:35.936 | WARNING  | viya.cas_monitoring.cas_monitoring:_transform_data:45 - The following tables have been skipped: ['OBSCURED_USER_MAPPINGTEST', 'MAP_DEVICE_TYPE_FIXED']
[2023-08-28, 12:16:35 CEST] {subprocess.py:93} INFO - 2023-08-28 12:16:35.936 | WARNING  | viya.mssql_utils:write_to_mssql:35 - No data was passed to insert into the Database.
[2023-08-28, 12:16:36 CEST] {subprocess.py:97} INFO - Command exited with return code 0
[2023-08-28, 12:16:36 CEST] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=prj_ck_cas_table_monitoring, task_id=collect_CAS_table_metrics_public, execution_date=20230828T094000, start_date=20230828T101616, end_date=20230828T101636
[2023-08-28, 12:16:36 CEST] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2023-08-28, 12:16:36 CEST] {taskinstance.py:2784} INFO - 0 downstream tasks scheduled from follow-on schedule check

This is one of the faster-running tasks in the DAG, but as you can clearly see, the running time has increased fivefold, from ~4 seconds to ~20 seconds.
With Airflow 2.7.0, most of that time is spent before the actual CLI task starts.
With Airflow 2.6.3, the step from "Started process" to "Execute cli task" is almost instantaneous. With Airflow 2.7.0, 16 seconds pass there alone.
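
For anyone who wants to measure the same gap in their own task logs, here is a minimal sketch. It assumes log lines in the `[YYYY-MM-DD, HH:MM:SS TZ]` format shown above and that both lines are in the same time zone; the helper name `seconds_between` is made up for illustration and is not part of Airflow.

```python
import re
from datetime import datetime

# Matches the leading "[2023-08-28, 12:16:16 CEST]" timestamp of a task log line.
# The time zone abbreviation is matched but ignored (assumed equal on both lines).
TS_RE = re.compile(r"^\[(\d{4}-\d{2}-\d{2}), (\d{2}:\d{2}:\d{2}) \w+\]")


def seconds_between(log_lines, start_marker, end_marker):
    """Return the seconds elapsed between the first lines containing each marker."""

    def first_timestamp(marker):
        for line in log_lines:
            match = TS_RE.match(line)
            if match and marker in line:
                return datetime.strptime(" ".join(match.groups()), "%Y-%m-%d %H:%M:%S")
        raise ValueError(f"marker not found: {marker!r}")

    return (first_timestamp(end_marker) - first_timestamp(start_marker)).total_seconds()


# Two lines copied from the Airflow 2.7.0 log above.
sample_270 = [
    "[2023-08-28, 12:16:16 CEST] {standard_task_runner.py:57} INFO - Started process 3763639 to run task",
    "[2023-08-28, 12:16:32 CEST] {logging_mixin.py:151} INFO - Execute cli task",
]
startup_gap = seconds_between(sample_270, "Started process", "Execute cli task")
print(startup_gap)  # 16.0
```

Running the same helper over a full 2.6.3 log and a full 2.7.0 log would give a like-for-like comparison of the startup overhead.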

@raphaelsimeon

@Taragolis yes, here is the performance activity for the last 5 hours. The peak we see at 3 AM also corresponds to a peak in the number of DAGs running. Is there anything in particular you want to check?
image

@ephraimbuddy
Contributor

Is there a good way to enable debug logging only for this one specific DAG?

@Github-dm-CDE, we would also like to see the scheduler logs if possible, so enabling debug logging for a single DAG would not be enough. And no, there is no straightforward way to enable debug logging for only one specific DAG.

@Taragolis
Contributor

here is the performance activity for the last 5 hours. The peak we see at 3 AM also corresponds to a peak in the number of DAGs running. Is there anything in particular you want to check?

According to Average Active Sessions (AAS), the most time-consuming operations are locks (transactionid and tuple in the legend). I'm not sure there is anything serious on the DB backend, although all these metrics are averaged over the period, so an actual spike could be missed.
If you click on transactionid in the legend, the view keeps only the top 10 queries that contribute to this event in the selected period. I guess it would be SELECT ... FROM dag_run WHERE dag_run.dag_id = '{DAG_ID}' AND dag_run.run_id = '{RUN_ID}' FOR UPDATE;


In addition, I ran a simple DAG locally on Airflow 2.6.3 and Airflow 2.7.0.

Important

I ran my "performance comparison" on a local machine, so latency to the DB was almost zero, the executor was Local, the DAG/task was pretty simple, and nothing else was running at that moment. So the result might be very far from the actual problem.

from airflow.decorators import task
from airflow import DAG
import pendulum


# Daily DAG with catchup from 2021-01-01, so enabling it creates a large backlog of DAG runs.
with DAG(
    dag_id="performance-check",
    start_date=pendulum.datetime(2021, 1, 1, tz='UTC'),
    end_date=None,
    schedule="@daily",
    catchup=True,
    tags=["performance"],
    max_active_runs=64,
) as dag:
    @task
    def sample_task(test_data=None, ti=None):
        # Trivial task body: print the identifying fields of the task instance.
        print(f"{ti.dag_id}-{ti.run_id}-{ti.task_id}[{ti.map_index}]: {test_data}")

    sample_task()

I also turned on most of the logging on Postgres, cleaned the Postgres log file before turning on the DAG, and after all 970 DAG runs completed, ran pgBadger on the Postgres log: pgbadger-report-airflow-2-results.zip

The main difference was in obtaining information about the previous DAG run.

Airflow 2.7.0: ranked first, with a total cumulative execution time of 24s820ms for 9700 queries
image

Airflow 2.6.3: ranked second (let's ignore COMMIT), with a total cumulative execution time of 6s525ms for 9700 queries
image

This behaviour has been fixed, and the fix should be part of 2.7.1. In some circumstances it could cause performance degradation, for example if the DB backend is far from Airflow (high latency) and quite a few previous DAG runs exist. It could also be a reason why RAM/CPU usage increased. However, in my deployment, which is far from production usage, the impact was only about an additional 100ms-1s.
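
To put the two pgBadger totals quoted above on a per-query basis, here is a small back-of-the-envelope calculation. It uses only the figures from this comment (cumulative time and query count); the variable names are illustrative.

```python
# Figures quoted above from the two pgBadger reports.
QUERIES = 9700
total_ms_270 = 24 * 1000 + 820  # 24s820ms on Airflow 2.7.0
total_ms_263 = 6 * 1000 + 525   # 6s525ms on Airflow 2.6.3

avg_ms_270 = total_ms_270 / QUERIES
avg_ms_263 = total_ms_263 / QUERIES
slowdown = total_ms_270 / total_ms_263

print(f"2.7.0: {avg_ms_270:.2f} ms per query")  # 2.7.0: 2.56 ms per query
print(f"2.6.3: {avg_ms_263:.2f} ms per query")  # 2.6.3: 0.67 ms per query
print(f"ratio: {slowdown:.1f}x")                # ratio: 3.8x
```

These averages come from a local setup with near-zero DB latency, so they say nothing certain about a remote database; they only show that the same number of queries took roughly 3.8 times longer in total on 2.7.0.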

@raphaelsimeon

@Taragolis indeed, it does not look like the DB is the problem here. Here are the top SQL statements by latency in the last 24 hours, along with the performance metrics (there is one latency peak at 8:45 UTC, but none during the other "problematic hours").
image

image

@Github-dm-CDE

Github-dm-CDE commented Aug 30, 2023

Hi @potiuk & @ephraimbuddy,
here are the task logs with AF 2.7.0 again (but this time from our RLS stage, which has the same problem), with logging_level=DEBUG.

OBSCURED_CLIENT_SECRET Found local files:
OBSCURED_CLIENT_SECRET   * /var/log/airflow/dags/dag_id=prj_ck_cas_table_monitoring/run_id=manual__2023-08-30T14:22:51.684136+00:00/task_id=collect_CAS_table_metrics_public/attempt=1.log
[2023-08-30, 14:23:49 UTC] {taskinstance.py:1094} DEBUG - previous_execution_date was called
[2023-08-30, 14:23:54 UTC] {__init__.py:51} DEBUG - Loading core task runner: StandardTaskRunner
[2023-08-30, 14:23:58 UTC] {taskinstance.py:1094} DEBUG - previous_execution_date was called
[2023-08-30, 14:24:02 UTC] {base_task_runner.py:68} DEBUG - Planning to run as the  user
[2023-08-30, 14:24:02 UTC] {taskinstance.py:844} DEBUG - Refreshing TaskInstance <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]> from DB
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1168} DEBUG - <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]> dependency 'Task Instance State' PASSED: True, Task state queued was valid.
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1168} DEBUG - <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1168} DEBUG - <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]> dependency 'Task Instance Not Running' PASSED: True, Task is not in running state.
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1168} DEBUG - <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]>
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1168} DEBUG - <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1168} DEBUG - <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]> dependency 'Task Concurrency' PASSED: True, Task concurrency is not set.
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1168} DEBUG - <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]> dependency 'Pool Slots Available' PASSED: True, There are enough open slots in default_pool to execute the task
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1168} DEBUG - <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [queued]>
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1361} INFO - Starting attempt 1 of 3
[2023-08-30, 14:24:02 UTC] {taskinstance.py:1382} INFO - Executing <Task(CLIOperator): collect_CAS_table_metrics_public> on 2023-08-30 14:22:51.684136+00:00
[2023-08-30, 14:24:02 UTC] {standard_task_runner.py:57} INFO - Started process 2355015 to run task
[2023-08-30, 14:24:02 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'prj_ck_cas_table_monitoring', 'collect_CAS_table_metrics_public', 'manual__2023-08-30T14:22:51.684136+00:00', '--job-id', '2428156', '--raw', '--subdir', 'DAGS_FOLDER/prj_ck_cas_table_monitoring/prj_ck_cas_table_monitoring.py', '--cfg-path', '/tmp/tmpo_h27u9p']
[2023-08-30, 14:24:02 UTC] {standard_task_runner.py:85} INFO - Job 2428156: Subtask collect_CAS_table_metrics_public
[2023-08-30, 14:24:02 UTC] {cli_action_loggers.py:67} DEBUG - Calling callbacks: [<function default_action_log at 0x7f880eea13a0>]
[2023-08-30, 14:24:07 UTC] {taskinstance.py:1094} DEBUG - previous_execution_date was called
[2023-08-30, 14:24:08 UTC] {taskinstance.py:844} DEBUG - Refreshing TaskInstance <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [running]> from DB
[2023-08-30, 14:24:08 UTC] {job.py:216} DEBUG - [heartbeat]
[2023-08-30, 14:24:11 UTC] {task_command.py:415} INFO - Running <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [running]> on host OBSCURED_HOSTNAME
[2023-08-30, 14:24:11 UTC] {settings.py:353} DEBUG - Disposing DB connection pool (PID 2355015)
[2023-08-30, 14:24:11 UTC] {settings.py:212} DEBUG - Setting up DB connection pool (PID 2355015)
[2023-08-30, 14:24:11 UTC] {settings.py:285} DEBUG - settings.prepare_engine_args(): Using NullPool
[2023-08-30, 14:24:11 UTC] {taskinstance.py:844} DEBUG - Refreshing TaskInstance <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [running]> from DB
[2023-08-30, 14:24:13 UTC] {taskinstance.py:844} DEBUG - Refreshing TaskInstance <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [running]> from DB
[2023-08-30, 14:24:13 UTC] {job.py:216} DEBUG - [heartbeat]
[2023-08-30, 14:24:16 UTC] {taskinstance.py:1094} DEBUG - previous_execution_date was called
[2023-08-30, 14:24:18 UTC] {taskinstance.py:844} DEBUG - Refreshing TaskInstance <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [running]> from DB
[2023-08-30, 14:24:18 UTC] {job.py:216} DEBUG - [heartbeat]
[2023-08-30, 14:24:20 UTC] {taskinstance.py:925} DEBUG - Clearing XCom data
[2023-08-30, 14:24:20 UTC] {retries.py:92} DEBUG - Running RenderedTaskInstanceFields.write with retries. Try 1 of 3
[2023-08-30, 14:24:20 UTC] {retries.py:92} DEBUG - Running RenderedTaskInstanceFields._do_delete_old_records with retries. Try 1 of 3
[2023-08-30, 14:24:20 UTC] {taskinstance.py:1660} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='CK' AIRFLOW_CTX_DAG_ID='prj_ck_cas_table_monitoring' AIRFLOW_CTX_TASK_ID='collect_CAS_table_metrics_public' AIRFLOW_CTX_EXECUTION_DATE='2023-08-30T14:22:51.684136+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-08-30T14:22:51.684136+00:00'
[2023-08-30, 14:24:20 UTC] {__init__.py:117} DEBUG - Preparing lineage inlets and outlets
[2023-08-30, 14:24:20 UTC] {__init__.py:158} DEBUG - inlets: [], outlets: []
[2023-08-30, 14:24:20 UTC] {logging_mixin.py:151} INFO - Execute cli task
[2023-08-30, 14:24:20 UTC] {base.py:73} INFO - Using connection ID 'sas_viya_conn' for task execution.
[2023-08-30, 14:24:20 UTC] {base.py:73} INFO - Using connection ID 'cxa_mssql_db_domain_user_format' for task execution.
[2023-08-30, 14:24:20 UTC] {logging_mixin.py:151} INFO - Environment variables: ['LANG', 'PATH', 'VIYA_USERNAME', 'VIYA_PASSWORD', 'VIYA_HOST_URL', 'MSSQL_USERNAME', 'MSSQL_PASSWORD', 'MSSQL_HOST_URL', 'MSSQL_HOST_PORT', 'MSSQL_DATABASE']
[2023-08-30, 14:24:20 UTC] {logging_mixin.py:151} INFO - Bash command: /srv/team-workspaces/ck/pex/viyacli.pex cas-monitoring --ssl_cert_file OBSCURED_SSL_CERT_PATH --viya_caslib PUBLIC --viya_client_secret OBSCURED_CLIENT_SECRET --monitoring_table sas_cas_table_status
[2023-08-30, 14:24:20 UTC] {bash.py:186} DEBUG - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='CK' AIRFLOW_CTX_DAG_ID='prj_ck_cas_table_monitoring' AIRFLOW_CTX_TASK_ID='collect_CAS_table_metrics_public' AIRFLOW_CTX_EXECUTION_DATE='2023-08-30T14:22:51.684136+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-08-30T14:22:51.684136+00:00'
[2023-08-30, 14:24:20 UTC] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2023-08-30, 14:24:20 UTC] {subprocess.py:75} INFO - Running command: ['/usr/bin/bash', '-c', '/srv/team-workspaces/ck/pex/viyacli.pex cas-monitoring --ssl_cert_file OBSCURED_SSL_CERT_PATH --viya_caslib PUBLIC --viya_client_secret OBSCURED_CLIENT_SECRET --monitoring_table sas_cas_table_status']
[2023-08-30, 14:24:20 UTC] {subprocess.py:86} INFO - Output:
[2023-08-30, 14:24:23 UTC] {taskinstance.py:844} DEBUG - Refreshing TaskInstance <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [running]> from DB
[2023-08-30, 14:24:23 UTC] {job.py:216} DEBUG - [heartbeat]
[2023-08-30, 14:24:23 UTC] {subprocess.py:93} INFO - 2023-08-30 16:24:23.723 | WARNING  | viya.mssql_utils:write_to_mssql:35 - No data was passed to insert into the Database.
[2023-08-30, 14:24:23 UTC] {subprocess.py:97} INFO - Command exited with return code 0
[2023-08-30, 14:24:23 UTC] {__init__.py:75} DEBUG - Lineage called with inlets: [], outlets: []
[2023-08-30, 14:24:23 UTC] {taskinstance.py:844} DEBUG - Refreshing TaskInstance <TaskInstance: prj_ck_cas_table_monitoring.collect_CAS_table_metrics_public manual__2023-08-30T14:22:51.684136+00:00 [running]> from DB
[2023-08-30, 14:24:23 UTC] {taskinstance.py:1458} DEBUG - Clearing next_method and next_kwargs.
[2023-08-30, 14:24:23 UTC] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=prj_ck_cas_table_monitoring, task_id=collect_CAS_table_metrics_public, execution_date=20230830T142251, start_date=20230830T142402, end_date=20230830T142423
[2023-08-30, 14:24:23 UTC] {taskinstance.py:2436} DEBUG - Task Duration set to 20.913392
[2023-08-30, 14:24:23 UTC] {cli_action_loggers.py:85} DEBUG - Calling callbacks: []
[2023-08-30, 14:24:23 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 0
[2023-08-30, 14:24:23 UTC] {dagrun.py:740} DEBUG - number of tis tasks for <DagRun prj_ck_cas_table_monitoring @ 2023-08-30 14:22:51.684136+00:00: manual__2023-08-30T14:22:51.684136+00:00, state:running, queued_at: 2023-08-30 16:22:51.722581+00:00. externally triggered: True>: 0 task(s)
[2023-08-30, 14:24:23 UTC] {taskinstance.py:2784} INFO - 0 downstream tasks scheduled from follow-on schedule check

@nick-msk-ai

Like other users above, when upgrading I also noticed major increases to time taken to execute my dag_run that was previously taking < 1 minute to execute (scheduled every minute). It led to a cascade of unfinished dag_runs across this dag, and subsequently all my other dags were affected.

Like @raphaelsimeon, the performance insights from my RDS instance showed that the query to get the DAG history was taking a long time to return. I also noticed that each running dag_run triggers a query to get this data (all entries before the current dag_run), and re-issues this query at multiple points while the DAG is running. This seemed to be the root of the issue: even when the first query does not return in a timely manner, a subsequent one is automatically triggered by the scheduler, and so on, until there is a log-jam of pending queries (until max_active_runs is reached), which affects database performance for every other process and DAG.

For context I run airflow db clean as part of a daily DAG to ensure that only the last 21 days of metadata is kept. So for my DAG on a 1 minute interval, I had ~28000 rows in the dag_run table. For another of my DAGs that runs every 10 mins, there are consistently ~ 2900 rows.
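As a rough sanity check on those row counts, the sketch below estimates how many dag_run rows a fixed schedule leaves inside a retention window, and assembles an `airflow db clean` invocation for the matching cutoff. The helper names (`expected_dag_run_rows`, `db_clean_command`) are hypothetical. The 21-day window and the two intervals come from the numbers above. Restricting the clean to the `dag_run` table is an assumption made for illustration only, not necessarily what the daily DAG does.

```python
from datetime import datetime, timedelta, timezone


def expected_dag_run_rows(interval_minutes: int, retention_days: int) -> int:
    """Upper bound on rows kept for one DAG if every scheduled run is recorded."""
    return (retention_days * 24 * 60) // interval_minutes


def db_clean_command(now: datetime, retention_days: int) -> list[str]:
    """Build (but do not run) an `airflow db clean` call for the given cutoff."""
    cutoff = now - timedelta(days=retention_days)
    return [
        "airflow", "db", "clean",
        "--clean-before-timestamp", cutoff.isoformat(),
        "--tables", "dag_run",  # assumption: only this table, for illustration
        "--yes",                # skip the interactive confirmation
    ]


print(expected_dag_run_rows(1, 21))   # 1-minute DAG
print(expected_dag_run_rows(10, 21))  # 10-minute DAG
print(" ".join(db_clean_command(datetime(2023, 8, 31, tzinfo=timezone.utc), 21)))
```

The upper bounds work out to 30240 and 3024 rows, which is in line with the ~28000 and ~2900 rows reported above.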

Experiment 1:
If I turned off the DAG on the 1 minute interval (also setting all dag runs of that dag to a non-running state) then the rest of my DAGs would execute well within my expected intervals.

Experiment 2:
If I turned on the DAG with 1 minute interval having deleted all relevant entries from the dag_run table, the DAG executes within the interval.

So I have found a workaround for my particular case by deleting rows from the dag_run metadata table. I would be interested to know what state the dag_run tables are in for the users who have posted above, and whether they are performing any scheduled maintenance on those tables.
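For anyone who wants to compare table state, a minimal sketch of the kind of count that would answer this is below. It uses an in-memory SQLite table as a stand-in for the real metadata database, with only two of the dag_run columns and made-up DAG names, so everything except the shape of the `GROUP BY` query is an assumption.

```python
import sqlite3

# Stand-in for the metadata DB: only the columns needed for the count.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (dag_id TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?)",
    [("every_minute", "success")] * 5
    + [("every_minute", "running")] * 2
    + [("every_ten_minutes", "success")] * 3,
)

# Rows per DAG and state, largest groups first.
row_counts = conn.execute(
    """
    SELECT dag_id, state, COUNT(*) AS n
    FROM dag_run
    GROUP BY dag_id, state
    ORDER BY n DESC, dag_id, state
    """
).fetchall()

for dag_id, state, n in row_counts:
    print(f"{dag_id:20} {state:10} {n}")
```

The same `SELECT` should be usable against the real dag_run table, since it only relies on the `dag_id` and `state` columns.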

@phanikumv
Contributor

@nick-msk-ai you mentioned that the query to get the DAG history was taking a long time to return. Can you please paste the exact query here, so that we can see if there were any recent changes to it?

@nick-msk-ai

nick-msk-ai commented Aug 31, 2023

SELECT dag_run.state, dag_run.id, dag_run.dag_id, dag_run.queued_at, dag_run.execution_date, dag_run.start_date, dag_run.end_date, dag_run.run_id, dag_run.creating_job_id, dag_run.external_trigger, dag_run.run_type, dag_run.conf, dag_run.data_interval_start, dag_run.data_interval_end, dag_run.last_scheduling_decision, dag_run.dag_hash, dag_run.log_template_id, dag_run.updated_at
FROM dag_run
WHERE dag_run.dag_id = 'patient_lists' AND dag_run.execution_date < '2023-08-31T12:40:00+00:00'::timestamptz AND dag_run.state = 'success'
ORDER BY dag_run.execution_date DESC

This is for a 10-minute-interval DAG run scheduled at 2023-08-31, 12:50:00 UTC.
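One way to see why this query shape gets more expensive as dag_run grows is that it has no `LIMIT`, so it returns every earlier successful run of the DAG. The sketch below reproduces that with an in-memory SQLite table as a stand-in for PostgreSQL, using three of the real columns and 3000 synthetic runs. Whether the caller needs anything beyond the most recent row is an assumption here, and the `LIMIT 1` variant is shown only for comparison.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, state TEXT)")

# 3000 synthetic successful runs, 10 minutes apart (roughly 21 days of history).
start = datetime(2023, 8, 10, tzinfo=timezone.utc)
runs = [
    ("patient_lists", (start + timedelta(minutes=10 * i)).isoformat(), "success")
    for i in range(3000)
]
conn.executemany("INSERT INTO dag_run VALUES (?, ?, ?)", runs)

# Same filter and ordering as the query above, reduced to three columns.
BASE_QUERY = (
    "SELECT dag_id, execution_date, state FROM dag_run "
    "WHERE dag_id = ? AND execution_date < ? AND state = 'success' "
    "ORDER BY execution_date DESC"
)
cutoff = (start + timedelta(minutes=10 * 3000)).isoformat()
params = ("patient_lists", cutoff)

all_previous = conn.execute(BASE_QUERY, params).fetchall()
latest_only = conn.execute(BASE_QUERY + " LIMIT 1", params).fetchall()

print(len(all_previous), "rows without LIMIT")
print(len(latest_only), "row with LIMIT 1")
```

Both variants agree on the most recent run (`all_previous[0] == latest_only[0]`), but the unbounded one transfers the full history on every call, so its cost scales with the number of rows kept per DAG.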

@phanikumv

I'm not 100% sure, but I think the query itself is dynamically generated in airflow/api_connexion/endpoints/dag_run_endpoint.py.

@raphaelsimeon

@nick-msk-ai actually, in our case performance insights in RDS don't seem to show the very high latency you described. The slowest query has about 100 ms latency on average, which does not sound overwhelming, does it?

@Taragolis
Contributor

SELECT dag_run.state, dag_run.id, dag_run.dag_id, dag_run.queued_at, dag_run.execution_date, dag_run.start_date, dag_run.end_date, dag_run.run_id, dag_run.creating_job_id, dag_run.external_trigger, dag_run.run_type, dag_run.conf, dag_run.data_interval_start, dag_run.data_interval_end, dag_run.last_scheduling_decision, dag_run.dag_hash, dag_run.log_template_id, dag_run.updated_at
FROM dag_run
WHERE dag_run.dag_id = 'patient_lists' AND dag_run.execution_date < '2023-08-31T12:40:00+00:00'::timestamptz AND dag_run.state = 'success'
ORDER BY dag_run.execution_date DESC

This query is fixed in #33672.

@Taragolis
Contributor

@jaetma @raphaelsimeon @nick-msk-ai @Github-dm-CDE could you check whether task execution has become more reliable and faster in Airflow 2.7.1 compared with 2.7.0?

@LuisLarisch

@Taragolis,
Sorry for intruding; since no one is answering, I'm giving my feedback.
I have been following this issue since we were facing similar problems after the 2.7.0 upgrade.
We upgraded to 2.7.1 this weekend, and the performance issue was solved.
For context:

  1. 2.6.3: Specific task took 2.5 hrs to finish
  2. 2.7.0: Same task took 6 hrs to finish. (It got to a point where a few tasks in the DAG stopped working.)
  3. 2.7.1: Back to 2.5 hrs.

@potiuk
Member Author

potiuk commented Sep 11, 2023

Can others here confirm it? I am going to close this one as it seems that the fix is working, but having others confirm it would be great.

@potiuk potiuk closed this as completed Sep 11, 2023
@potiuk potiuk modified the milestones: Airflow 2.7.2, Airflow 2.7.1 Sep 11, 2023
@CM000n

CM000n commented Sep 12, 2023

I can confirm that. The update from 2.7.0 to 2.7.1 seems to have solved the problem.

@jaetma

jaetma commented Sep 12, 2023

I can confirm that my Kubernetes instance, upgraded from 2.6.3 to 2.7.1, does not have that problem anymore.

@potiuk
Member Author

potiuk commented Sep 12, 2023

Thanks everyone. Great job @Taragolis on getting to the bottom of it and fixing it :). That was pretty remote from what I would expect could be the reason ;)

@raphaelsimeon

raphaelsimeon commented Sep 26, 2023

Late to the party here, but same for us, upgrade to 2.7.1 did the trick. Thanks all for having a look :)

@BenoCharlo

Hello everyone, I just got to this thread because I'm facing the same issue on AF 2.7.2. I'm using this version because it is the latest version available on AWS MWAA. Simple tasks that used to take seconds to run are now taking a huge amount of time.
Is anyone else facing the same delay issue on AWS Airflow 2.7.2?

@pankajkoti
Member

pankajkoti commented Jan 23, 2024

@BenoCharlo Do you have a local Airflow env setup? If yes, could you try running your DAGs locally with Airflow 2.7.2 vs your previous version and see if you are able to reproduce it locally? Would be nice to check first that it is not an infra issue on your cloud deployment.

@BenoCharlo

BenoCharlo commented Jan 23, 2024

@pankajkoti yes, I have a local setup with mwaa-local-runner. My previous version was AF 2.2.2 (a bit outdated). I do experience this issue with the local setup as well.

@pankajkoti
Member

pankajkoti commented Jan 23, 2024

Thanks @BenoCharlo. Since you have a local setup, would you be able to help with some more testing? Instead of upgrading directly from 2.2.2 to 2.7.2, can you try upgrading to 2.6.3 first and check whether you observe the same slowness there too?

Previously, it was observed that performance decreased between 2.6.3 and 2.7.
If you could carry out that experiment, it would help diagnose this better.

Also, would you be able to share some insight into what your DAGs are doing? A small reproducible DAG that causes this performance degradation would be nice too.

@BenoCharlo

@pankajkoti I tried version 2.6.3 today. A lot of DAGs are starting to run as expected. It has gotten a bit better, but I am still experiencing the same delay problem for some DAGs. Some tasks are delayed in 1 out of 2 runs. The delay is actually the task never ending. I suspect that writing to an S3 bucket using pandas is causing this issue.

Thanks for the workaround.
