Airflow crashes with a psycopg2.errors.DeadlockDetected exception #19957

Closed
1 of 2 tasks
stablum opened this issue Dec 2, 2021 · 47 comments · Fixed by #20894 or #21362
Labels
area:core, kind:bug
Milestone

Comments

@stablum

stablum commented Dec 2, 2021

Apache Airflow version

2.2.2 (latest released)

Operating System

Ubuntu 21.04 on a VM

Versions of Apache Airflow Providers

root@AI-Research:~/learning_sets/airflow# pip freeze | grep apache-airflow-providers
apache-airflow-providers-ftp==2.0.1
apache-airflow-providers-http==2.0.1
apache-airflow-providers-imap==2.0.1
apache-airflow-providers-sqlite==2.0.1

Deployment

Other

Deployment details

Airflow is at version 2.2.2
psql (PostgreSQL) 13.5 (Ubuntu 13.5-0ubuntu0.21.04.1)

The DAG contains thousands of tasks for data download, preprocessing, and preparation, with the results destined for a MongoDB database (so I'm not using PostgreSQL inside my tasks).

What happened

[2021-12-01 19:41:57,556] {scheduler_job.py:644} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL: Process 322086 waits for ShareLock on transaction 2391367; blocked by process 340345.
Process 340345 waits for AccessExclusiveLock on tuple (0,26) of relation 19255 of database 19096; blocked by process 340300.
Process 340300 waits for ShareLock on transaction 2391361; blocked by process 322086.
HINT: See server log for query details.
CONTEXT: while updating tuple (1335,10) in relation "task_instance"

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 628, in _execute
self._run_scheduler_loop()
File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 709, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 792, in _do_scheduling
callback_to_run = self._schedule_dag_run(dag_run, session)
File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 1049, in _schedule_dag_run
dag_run.schedule_tis(schedulable_tis, session)
File "/usr/local/lib/python3.9/dist-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/dist-packages/airflow/models/dagrun.py", line 898, in schedule_tis
session.query(TI)
File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/query.py", line 4063, in update
update_op.exec_()
File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
self._do_exec()
File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1895, in _do_exec
self._execute_stmt(update_stmt)
File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
self.result = self.query._execute_crud(stmt, self.mapper)
File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
return conn.execute(stmt, self._params)
File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
ret = self._execute_context(
File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
self._handle_dbapi_exception(
File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
util.raise_(
File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
DETAIL: Process 322086 waits for ShareLock on transaction 2391367; blocked by process 340345.
Process 340345 waits for AccessExclusiveLock on tuple (0,26) of relation 19255 of database 19096; blocked by process 340300.
Process 340300 waits for ShareLock on transaction 2391361; blocked by process 322086.
HINT: See server log for query details.
CONTEXT: while updating tuple (1335,10) in relation "task_instance"

[SQL: UPDATE task_instance SET state=%(state)s WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.task_id IN (%(task_id_1)s, %(task_id_2)s, %(task_id_3)s, %(task_id_4)s, %(task_id_5)s, %(task_id_6)s, %(task_id_7)s, %(task_id_8)s, %(task_id_9)s, %(task_id_10)s, %(task_id_11)s, %(task_id_12)s, %(task_id_13)s, %(task_id_14)s, %(task_id_15)s, %(task_id_16)s, %(task_id_17)s, %(task_id_18)s, %(task_id_19)s, %(task_id_20)s)]
[parameters: {'state': <TaskInstanceState.SCHEDULED: 'scheduled'>, 'dag_id_1': 'download_and_preprocess_sets', 'run_id_1': 'manual__2021-12-01T17:31:23.684597+00:00', 'task_id_1': 'download_1379', 'task_id_2': 'download_1438', 'task_id_3': 'download_1363', 'task_id_4': 'download_1368', 'task_id_5': 'download_138', 'task_id_6': 'download_1432', 'task_id_7': 'download_1435', 'task_id_8': 'download_1437', 'task_id_9': 'download_1439', 'task_id_10': 'download_1457', 'task_id_11': 'download_168', 'task_id_12': 'download_203', 'task_id_13': 'download_782', 'task_id_14': 'download_1430', 'task_id_15': 'download_1431', 'task_id_16': 'download_1436', 'task_id_17': 'download_167', 'task_id_18': 'download_174', 'task_id_19': 'download_205', 'task_id_20': 'download_1434'}]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
[2021-12-01 19:41:57,566] {local_executor.py:388} INFO - Shutting down LocalExecutor; waiting for running tasks to finish. Signal again if you don't want to wait.
[2021-12-01 19:42:18,013] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 285470
[2021-12-01 19:42:18,105] {process_utils.py:66} INFO - Process psutil.Process(pid=285470, status='terminated', exitcode=0, started='18:56:21') (285470) terminated with exit code 0
[2021-12-01 19:42:18,106] {scheduler_job.py:655} INFO - Exited execute loop

What you expected to happen

Maybe 24 concurrent processes/tasks are too many?

How to reproduce

Reproducibility is challenging, but maybe the exception provides enough info for a fix.

Anything else

It happens every time, some time after the DAG starts running.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@stablum stablum added the area:core and kind:bug labels Dec 2, 2021
@potiuk
Member

potiuk commented Dec 6, 2021

Do you happen to have one or multiple schedulers?

@potiuk
Member

potiuk commented Dec 6, 2021

Also - is there a chance to find out from the logs what the "other" query was that the deadlock happened on? Is it also possible that you have another person or process (either manually or via some direct DB access) accessing the DB (for example, some scripts that perform retention or similar)?

@potiuk
Member

potiuk commented Dec 6, 2021

(The reason I am asking: we receive a very low number of Postgres deadlock reports, because (unlike MySQL) Postgres usually shows "real" deadlocks, and we'd expect this kind of problem to appear more often among our users, so I am looking for any special circumstances you might have.)

@stablum
Author

stablum commented Dec 6, 2021

Hmm, it's difficult to retrieve this bit of information. I don't think there were multiple instances of the scheduler running, but I will keep this possibility in mind, so thank you for the hint. Anyway, I'm not experiencing the problem anymore right now, maybe because I upgraded the system packages and rebooted. If this bug happens again I will update this thread with more info about the SQL query or any multiple scheduler instances being run.

@stablum
Author

stablum commented Dec 15, 2021

Unfortunately, this keeps happening (after a couple of weeks where it was running smoothly):

[2021-12-15 01:54:30,915] {dagbag.py:500} INFO - Filling up the DagBag from /root/learning_sets/models/
dag_bag <airflow.models.dagbag.DagBag object at 0x7f56aa88cf70>
Running <TaskInstance: download_and_preprocess_sets.download_1466 manual__2021-12-14T06:28:19.872227+00:00 [queued]> on host AI-Research
Running <TaskInstance: download_and_preprocess_sets.download_952 manual__2021-12-14T06:28:19.872227+00:00 [queued]> on host AI-Research
[2021-12-15 01:54:43,539] {scheduler_job.py:644} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL:  Process 1117623 waits for ShareLock on transaction 4526903; blocked by process 1206850.
Process 1206850 waits for AccessExclusiveLock on tuple (1,17) of relation 19255 of database 19096; blocked by process 1206469.
Process 1206469 waits for ShareLock on transaction 4526895; blocked by process 1117623.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (1899,3) in relation "task_instance"


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 628, in _execute
    self._run_scheduler_loop()
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 709, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 792, in _do_scheduling
    callback_to_run = self._schedule_dag_run(dag_run, session)
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 1049, in _schedule_dag_run
    dag_run.schedule_tis(schedulable_tis, session)
  File "/usr/local/lib/python3.9/dist-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/airflow/models/dagrun.py", line 898, in schedule_tis
    session.query(TI)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/query.py", line 4063, in update
    update_op.exec_()
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
    self._do_exec()
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1895, in _do_exec
    self._execute_stmt(update_stmt)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
    self.result = self.query._execute_crud(stmt, self.mapper)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
    return conn.execute(stmt, self._params)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
DETAIL:  Process 1117623 waits for ShareLock on transaction 4526903; blocked by process 1206850.
Process 1206850 waits for AccessExclusiveLock on tuple (1,17) of relation 19255 of database 19096; blocked by process 1206469.
Process 1206469 waits for ShareLock on transaction 4526895; blocked by process 1117623.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (1899,3) in relation "task_instance"

[SQL: UPDATE task_instance SET state=%(state)s WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.task_id IN (%(task_id_1)s, %(task_id_2)s, %(task_id_3)s, %(task_id_4)s, %(task_id_5)s, %(task_id_6)s, %(task_id_7)s)]
[parameters: {'state': <TaskInstanceState.SCHEDULED: 'scheduled'>, 'dag_id_1': 'download_and_preprocess_sets', 'run_id_1': 'manual__2021-12-14T06:28:19.872227+00:00', 'task_id_1': 'download_936', 'task_id_2': 'download_937', 'task_id_3': 'download_938', 'task_id_4': 'download_939', 'task_id_5': 'download_944', 'task_id_6': 'download_946', 'task_id_7': 'download_950'}]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
[2021-12-15 01:54:43,544] {local_executor.py:388} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you don't want to wait.
[2021-12-15 01:54:53,885] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 1117612
[2021-12-15 01:54:54,098] {process_utils.py:66} INFO - Process psutil.Process(pid=1117612, status='terminated', exitcode=0, started='01:38:54') (1117612) terminated with exit code 0
[2021-12-15 01:54:54,098] {scheduler_job.py:655} INFO - Exited execute loop
[2021-12-15 01:54:54 +0100] [1117543] [INFO] Handling signal: term
[2021-12-15 01:54:54 +0100] [1117568] [INFO] Worker exiting (pid: 1117568)
[2021-12-15 01:54:54 +0100] [1117549] [INFO] Worker exiting (pid: 1117549)
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL:  Process 1117623 waits for ShareLock on transaction 4526903; blocked by process 1206850.
Process 1206850 waits for AccessExclusiveLock on tuple (1,17) of relation 19255 of database 19096; blocked by process 1206469.
Process 1206469 waits for ShareLock on transaction 4526895; blocked by process 1117623.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (1899,3) in relation "task_instance"


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.9/dist-packages/airflow/__main__.py", line 48, in main
    args.func(args)
  File "/usr/local/lib/python3.9/dist-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
    _run_scheduler_job(args=args)
  File "/usr/local/lib/python3.9/dist-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
    job.run()
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/base_job.py", line 245, in run
    self._execute()
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 628, in _execute
    self._run_scheduler_loop()
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 709, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 792, in _do_scheduling
    callback_to_run = self._schedule_dag_run(dag_run, session)
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 1049, in _schedule_dag_run
    dag_run.schedule_tis(schedulable_tis, session)
  File "/usr/local/lib/python3.9/dist-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/airflow/models/dagrun.py", line 898, in schedule_tis
    session.query(TI)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/query.py", line 4063, in update
    update_op.exec_()
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
    self._do_exec()
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1895, in _do_exec
    self._execute_stmt(update_stmt)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
    self.result = self.query._execute_crud(stmt, self.mapper)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
    return conn.execute(stmt, self._params)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
DETAIL:  Process 1117623 waits for ShareLock on transaction 4526903; blocked by process 1206850.
Process 1206850 waits for AccessExclusiveLock on tuple (1,17) of relation 19255 of database 19096; blocked by process 1206469.
Process 1206469 waits for ShareLock on transaction 4526895; blocked by process 1117623.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (1899,3) in relation "task_instance"

[SQL: UPDATE task_instance SET state=%(state)s WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.task_id IN (%(task_id_1)s, %(task_id_2)s, %(task_id_3)s, %(task_id_4)s, %(task_id_5)s, %(task_id_6)s, %(task_id_7)s)]
[parameters: {'state': <TaskInstanceState.SCHEDULED: 'scheduled'>, 'dag_id_1': 'download_and_preprocess_sets', 'run_id_1': 'manual__2021-12-14T06:28:19.872227+00:00', 'task_id_1': 'download_936', 'task_id_2': 'download_937', 'task_id_3': 'download_938', 'task_id_4': 'download_939', 'task_id_5': 'download_944', 'task_id_6': 'download_946', 'task_id_7': 'download_950'}]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
[2021-12-15 01:54:54 +0100] [1117543] [INFO] Shutting down: Master

I just restarted it and these are the scheduler processes running (I launched it with screen):

root@AI-Research:~# ps aux | grep airflow | grep sched | grep -v grep
root     3166838  0.0  0.0   9024  2460 ?        Ss   07:17   0:00 SCREEN -L -Logfile logs/airflow_scheduler_20211215_071716.log -S airflow_scheduler -d -m airflow scheduler
root     3166845 56.1  1.0 212516 179704 pts/66  Rs+  07:17   0:16 /usr/bin/python3 /usr/local/bin/airflow scheduler
root     3167171  0.5  0.4 111728 74756 pts/66   S    07:17   0:00 airflow scheduler -- DagFileProcessorManager

And this is the launching script:

root@AI-Research:~/learning_sets/airflow# cat launch_airflow.sh 
#!/bin/bash

TS=$(date +%Y%m%d_%H%M%S)

screen -X -S airflow_scheduler quit
screen -X -S airflow_webserver quit

sleep 1

ps aux | grep airflow | grep -v launch | awk '{print $2}' | xargs kill

sleep 1

screen -L -Logfile logs/airflow_scheduler_${TS}.log -S airflow_scheduler -d -m airflow scheduler
screen -L -Logfile logs/airflow_webserver_${TS}.log -S airflow_webserver -d -m airflow webserver

Responding to your questions:

  • no other process or script is using the PostgreSQL db at the moment
  • The last two (very long) queries can be read here: https://nopaste.net/nMCqRrrUR1

@stablum
Copy link
Author

stablum commented Dec 15, 2021

Limiting concurrency does not solve the issue. Even after reducing the number of concurrent tasks to 1, the exception is triggered after some time.

@stablum stablum changed the title AIrflow crashes with a psycopg2.errors.DeadlockDetected exception Airflow crashes with a psycopg2.errors.DeadlockDetected exception Dec 15, 2021
@potiuk
Member

potiuk commented Dec 15, 2021

Thanks @stablum! Great to see all the detailed information (and the pastebin).

@ashb - I think this one might need a closer look from both of us :) and some hypotheses on where it could come from.
I plan to take a look at it later this week and maybe we can figure out where it comes from.

@potiuk potiuk added this to the Airflow 2.2.4 milestone Dec 15, 2021
@potiuk
Member

potiuk commented Dec 15, 2021

Marked it provisionally for 2.2.4 in case we release it (it might go straight to 2.3.0, depending on how serious things are, whether we have a fix, and how close we are to 2.3.0).

@stablum
Author

stablum commented Dec 16, 2021

It seems that by dropping Airflow's database entirely and recreating it from scratch, the bug is not re-occurring. So it might have been something in Airflow's DB data.

@stablum
Author

stablum commented Dec 16, 2021

It seems that by dropping Airflow's database entirely and recreating it from scratch, the bug is not re-occurring. So it might have been something in Airflow's DB data.

I take this back: it actually keeps crashing, unfortunately.

@AmarEL
Contributor

AmarEL commented Dec 22, 2021

We are facing the very same problem with Postgres.

Even though it is a different database, the stack trace shows the same line/method being called in the Airflow "layer" before moving to the concrete database class, as in #19832:

File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)

I didn't have time to test, but there is a chance that #20030 "fixes" this one together with #19832.

@stablum
Author

stablum commented Dec 28, 2021

Just upgraded to Airflow 2.2.3, and unfortunately it keeps crashing as well

@potiuk
Member

potiuk commented Dec 29, 2021

I looked at the code and the places where this can occur, and I have a hypothesis on what could cause it. Maybe some of the people here can verify my theory.

Could you please disable https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#schedule-after-task-execution and see if the problem still occurs?

@potiuk
Member

potiuk commented Jan 1, 2022

To bump it up - just let us know, @stablum @AmarEL, whether disabling the mini-scheduler works.

I think this might be caused by the mini-scheduler in the task deadlocking with the "actual" scheduler while trying to lock the same rows in a different order. But before we attempt to analyse it in detail and fix it - since you have an easily reproducible and repeatable case - please disable the mini-scheduler and see if it helps. If it does, it will help us narrow down the reason and possibly fix it.

The only drawback of disabling the mini-scheduler is potentially slightly longer latency in scheduling subsequent tasks in the same DAG.

@potiuk
Member

potiuk commented Jan 1, 2022

cc: @ashb -> do you think this is a plausible hypothesis ^^ ?

@ashb
Member

ashb commented Jan 4, 2022

Yes, that seems plausible.

@potiuk
Member

potiuk commented Jan 5, 2022

@stablum @AmarEL - is it possible for you to disable this configuration and see if it helps?

@stablum
Author

stablum commented Jan 8, 2022

I managed to run a large number of tasks without Airflow crashing, so changing that setting as you suggested did indeed help! Thank you! :)

@potiuk
Member

potiuk commented Jan 8, 2022

I managed to run a large number of tasks without Airflow crashing, so changing that setting as you suggested did indeed help! Thank you! :)

Cool! Thanks for confirming! Now we need to find the root cause and fix it!

@dwiajik

dwiajik commented Jan 13, 2022

I am experiencing the same problem and have set schedule_after_task_execution to False. The issue still persists. I am running multiple schedulers on Kubernetes pods. Do you have any suggestions? Thanks

@potiuk
Member

potiuk commented Jan 13, 2022

I actually spent some time a few days ago looking at the mini-scheduler code, but I could not really find a flaw there. The fact that it did not help you indicates that my hypothesis was unfortunately unfounded, and maybe the reason was different (and the fact that it worked for @stablum was mainly a coincidence or some side effect of that change).

@dwiajik - it might also be that your case is a bit different - could you please report (maybe create a gist with a few examples of) some of the logs of your deadlocks? Ideally, send us the logs of the failing scheduler and the corresponding logs of the Postgres server from the same time - I believe it will be much easier to investigate if we see a few examples - and the server logs should tell us exactly which two queries deadlocked, which should help us a lot.

What we really need is something in the /var/lib/pgsql/data/pg_log/*.log; there should be entries at the time the deadlock happens that look like this:

ERROR:  deadlock detected
DETAIL:  Process 21535 waits for AccessExclusiveLock on relation 342640 of database 41454; blocked by process 21506.
Process 21506 waits for AccessExclusiveLock on relation 342637 of database 41454; blocked by process 21535.
HINT:  See server log for query details.
CONTEXT:  SQL statement "UPDATE ..."

Ideally, we need those and some of the logs around them, if possible.

@potiuk
Member

potiuk commented Jan 23, 2022

I am afraid we need to reopen this one. IMHO #20894 has no chance of fixing the problem because it does not really change Airflow's behaviour (see discussion in #20894). @dwiajik @stablum if you still experience this problem - I think we really need some server-side logs that will tell us what other query is deadlocking with this one.

@potiuk potiuk reopened this Jan 23, 2022
@potiuk
Member

potiuk commented Jan 23, 2022

@dwiajik @stablum - is there any chance you have some customisations (plugins?) or users running DB operations (backfill? API calls? UI modifications) that might have caused the deadlock? Looking at the code, my intuition tells me that this must have been something external. Having the server logs could help to pinpoint it.

jedcunningham pushed a commit that referenced this issue Feb 10, 2022
The scheduler job performs scheduling after locking the "scheduled"
DagRun row for writing. This should prevent the DagRun and related
task instances from being modified by another scheduler or
"mini-scheduler" run after the task is completed.

However there is apparently one more case where the DagRun is being
locked by "Task" processes - namely when a task throws
AirflowRescheduleException. In this case a new "TaskReschedule"
entity is inserted into the database, and it also takes a lock
on the DagRun (because TaskReschedule has a "DagRun" relationship).

This PR modifies the handling of AirflowRescheduleException to obtain
the very same DagRun lock before it attempts to insert the
TaskReschedule entity.

It seems that TaskReschedule is the only entity that has this
relationship, so likely all the mysterious SchedulerJob deadlock cases
we experienced might be explained (and fixed) by this one.

It is likely that this one:

* Fixes: #16982
* Fixes: #19957

(cherry picked from commit 6d110b5)
jedcunningham pushed a commit that referenced this issue Feb 17, 2022
The scheduler job performs scheduling after locking the "scheduled"
DagRun row for writing. This should prevent the DagRun and related
task instances from being modified by another scheduler or
"mini-scheduler" run after the task is completed.

However there is apparently one more case where the DagRun is being
locked by "Task" processes - namely when a task throws
AirflowRescheduleException. In this case a new "TaskReschedule"
entity is inserted into the database, and it also takes a lock
on the DagRun (because TaskReschedule has a "DagRun" relationship).

This PR modifies the handling of AirflowRescheduleException to obtain
the very same DagRun lock before it attempts to insert the
TaskReschedule entity.

It seems that TaskReschedule is the only entity that has this
relationship, so likely all the mysterious SchedulerJob deadlock cases
we experienced might be explained (and fixed) by this one.

It is likely that this one:

* Fixes: #16982
* Fixes: #19957

(cherry picked from commit 6d110b5)
@stablum
Author

stablum commented Mar 27, 2022

Unfortunately I'm still experiencing this bug with Airflow 2.2.4 (it's crashing every 5-10 mins):

[2022-03-27 16:14:12,804] {scheduler_job.py:433} INFO - Setting the following tasks to queued state:
	
[2022-03-27 16:14:12,805] {scheduler_job.py:527} INFO - Executor reports execution of download_and_preprocess_sets.download_1544 run_id=manual__2022-03-24T13:43:29.617461+00:00 exited with status success for try_number 2
[2022-03-27 16:14:12,809] {scheduler_job.py:570} INFO - TaskInstance Finished: dag_id=download_and_preprocess_sets, task_id=download_1544, run_id=manual__2022-03-24T13:43:29.617461+00:00, run_start_date=2022-03-27 16:13:00.591762+00:00, run_end_date=2022-03-27 16:13:37.437128+00:00, run_duration=36.845366, state=success, executor_state=success, try_number=2, max_tries=2, job_id=10383, pool=default_pool, queue=default, priority_weight=59, operator=PythonOperator
[2022-03-27 16:14:12,829] {scheduler_job.py:1137} INFO - Resetting orphaned tasks for active dag runs
[2022-03-27 16:14:12,830] {scheduler_job.py:1160} INFO - Marked 1 SchedulerJob instances as failed
[2022-03-27 16:14:12,835] {scheduler_job.py:1201} INFO - Reset the following 1 orphaned TaskInstances:
	<TaskInstance: download_and_preprocess_sets.download_1546 manual__2022-03-24T13:43:29.617461+00:00 [running]>
[2022-03-27 16:14:45,853] {scheduler_job.py:667} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL:  Process 309686 waits for ShareLock on transaction 12660839; blocked by process 314699.
Process 314699 waits for ShareLock on transaction 12660838; blocked by process 309686.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (4928,21) in relation "task_instance"


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 651, in _execute
    self._run_scheduler_loop()
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 732, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 815, in _do_scheduling
    callback_to_run = self._schedule_dag_run(dag_run, session)
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 1072, in _schedule_dag_run
    dag_run.schedule_tis(schedulable_tis, session)
  File "/usr/local/lib/python3.9/dist-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/airflow/models/dagrun.py", line 901, in schedule_tis
    session.query(TI)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/query.py", line 4063, in update
    update_op.exec_()
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
    self._do_exec()
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1895, in _do_exec
    self._execute_stmt(update_stmt)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
    self.result = self.query._execute_crud(stmt, self.mapper)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
    return conn.execute(stmt, self._params)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
DETAIL:  Process 309686 waits for ShareLock on transaction 12660839; blocked by process 314699.
Process 314699 waits for ShareLock on transaction 12660838; blocked by process 309686.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (4928,21) in relation "task_instance"

[SQL: UPDATE task_instance SET state=%(state)s WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.task_id IN (%(task_id_1)s)]
[parameters: {'state': <TaskInstanceState.SCHEDULED: 'scheduled'>, 'dag_id_1': 'download_and_preprocess_sets', 'run_id_1': 'manual__2022-03-24T13:43:29.617461+00:00', 'task_id_1': 'download_1546'}]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
[2022-03-27 16:14:45,858] {local_executor.py:388} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you don't want to wait.
[2022-03-27 16:14:46,970] {process_utils.py:120} INFO - Sending Signals.SIGTERM to group 309683. PIDs of all processes in the group: [309683]
[2022-03-27 16:14:46,970] {process_utils.py:75} INFO - Sending the signal Signals.SIGTERM to group 309683
[2022-03-27 16:14:47,022] {process_utils.py:70} INFO - Process psutil.Process(pid=309683, status='terminated', exitcode=0, started='16:08:59') (309683) terminated with exit code 0
[2022-03-27 16:14:47,022] {scheduler_job.py:678} INFO - Exited execute loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL:  Process 309686 waits for ShareLock on transaction 12660839; blocked by process 314699.
Process 314699 waits for ShareLock on transaction 12660838; blocked by process 309686.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (4928,21) in relation "task_instance"


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.9/dist-packages/airflow/__main__.py", line 48, in main
    args.func(args)
  File "/usr/local/lib/python3.9/dist-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
    _run_scheduler_job(args=args)
  File "/usr/local/lib/python3.9/dist-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
    job.run()
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/base_job.py", line 246, in run
    self._execute()
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 651, in _execute
    self._run_scheduler_loop()
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 732, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 815, in _do_scheduling
    callback_to_run = self._schedule_dag_run(dag_run, session)
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 1072, in _schedule_dag_run
    dag_run.schedule_tis(schedulable_tis, session)
  File "/usr/local/lib/python3.9/dist-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/airflow/models/dagrun.py", line 901, in schedule_tis
    session.query(TI)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/query.py", line 4063, in update
    update_op.exec_()
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
    self._do_exec()
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1895, in _do_exec
    self._execute_stmt(update_stmt)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
    self.result = self.query._execute_crud(stmt, self.mapper)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
    return conn.execute(stmt, self._params)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
DETAIL:  Process 309686 waits for ShareLock on transaction 12660839; blocked by process 314699.
Process 314699 waits for ShareLock on transaction 12660838; blocked by process 309686.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (4928,21) in relation "task_instance"

[SQL: UPDATE task_instance SET state=%(state)s WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.task_id IN (%(task_id_1)s)]
[parameters: {'state': <TaskInstanceState.SCHEDULED: 'scheduled'>, 'dag_id_1': 'download_and_preprocess_sets', 'run_id_1': 'manual__2022-03-24T13:43:29.617461+00:00', 'task_id_1': 'download_1546'}]
(Background on this error at: http://sqlalche.me/e/13/e3q8)

I also set concurrency=1 and max_active_tasks=1, but the bug keeps reappearing

This is my PostgreSQL log:

2022-03-27 15:37:25.192 UTC [273303] airflow_user@airflow_db LOG:  could not receive data from client: Connection reset by peer
2022-03-27 15:37:25.201 UTC [300223] airflow_user@airflow_db LOG:  could not receive data from client: Connection reset by peer
2022-03-27 15:37:25.203 UTC [298660] airflow_user@airflow_db LOG:  could not receive data from client: Connection reset by peer
2022-03-27 15:37:25.205 UTC [297039] airflow_user@airflow_db LOG:  could not receive data from client: Connection reset by peer
2022-03-27 15:37:25.209 UTC [301829] airflow_user@airflow_db LOG:  could not receive data from client: Connection reset by peer
2022-03-27 15:37:26.193 UTC [276786] airflow_user@airflow_db FATAL:  terminating connection due to administrator command
2022-03-27 15:39:22.382 UTC [304375] airflow_user@airflow_db LOG:  could not receive data from client: Connection reset by peer
2022-03-27 15:40:31.194 UTC [305340] airflow_user@airflow_db LOG:  could not receive data from client: Connection reset by peer
2022-03-27 15:41:38.554 UTC [306673] airflow_user@airflow_db LOG:  could not receive data from client: Connection reset by peer
2022-03-27 15:42:51.378 UTC [307629] airflow_user@airflow_db LOG:  could not receive data from client: Connection reset by peer
2022-03-27 15:43:23.546 UTC [303336] airflow_user@airflow_db ERROR:  deadlock detected
2022-03-27 15:43:23.546 UTC [303336] airflow_user@airflow_db DETAIL:  Process 303336 waits for ShareLock on transaction 12660394; blocked by process 309021.
	Process 309021 waits for ShareLock on transaction 12660383; blocked by process 303336.
	Process 303336: UPDATE task_instance SET state='scheduled' WHERE task_instance.dag_id = 'download_and_preprocess_sets' AND task_instance.run_id = 'manual__2022-03-24T13:43:29.617461+00:00' AND task_instance.task_id IN ('parse_1381', 'download_1385')
	Process 309021: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_insta
2022-03-27 15:43:23.546 UTC [303336] airflow_user@airflow_db HINT:  See server log for query details.
2022-03-27 15:43:23.546 UTC [303336] airflow_user@airflow_db CONTEXT:  while updating tuple (4903,44) in relation "task_instance"
2022-03-27 15:43:23.546 UTC [303336] airflow_user@airflow_db STATEMENT:  UPDATE task_instance SET state='scheduled' WHERE task_instance.dag_id = 'download_and_preprocess_sets' AND task_instance.run_id = 'manual__2022-03-24T13:43:29.617461+00:00' AND task_instance.task_id IN ('parse_1381', 'download_1385')
2022-03-27 15:43:32.520 UTC [308612] airflow_user@airflow_db LOG:  could not receive data from client: Connection reset by peer
2022-03-27 15:43:33.596 UTC [303337] airflow_user@airflow_db LOG:  could not receive data from client: Connection reset by peer
2022-03-27 16:10:41.435 UTC [310498] airflow_user@airflow_db LOG:  could not receive data from client: Connection reset by peer
2022-03-27 16:11:54.228 UTC [311405] airflow_user@airflow_db LOG:  could not receive data from client: Connection reset by peer
2022-03-27 16:12:26.266 UTC [312643] airflow_user@airflow_db LOG:  could not receive data from client: Connection reset by peer
2022-03-27 16:13:37.542 UTC [312957] airflow_user@airflow_db LOG:  could not receive data from client: Connection reset by peer
2022-03-27 16:14:45.851 UTC [309686] airflow_user@airflow_db ERROR:  deadlock detected
2022-03-27 16:14:45.851 UTC [309686] airflow_user@airflow_db DETAIL:  Process 309686 waits for ShareLock on transaction 12660839; blocked by process 314699.
	Process 314699 waits for ShareLock on transaction 12660838; blocked by process 309686.
	Process 309686: UPDATE task_instance SET state='scheduled' WHERE task_instance.dag_id = 'download_and_preprocess_sets' AND task_instance.run_id = 'manual__2022-03-24T13:43:29.617461+00:00' AND task_instance.task_id IN ('download_1546')
	Process 314699: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_insta
2022-03-27 16:14:45.851 UTC [309686] airflow_user@airflow_db HINT:  See server log for query details.
2022-03-27 16:14:45.851 UTC [309686] airflow_user@airflow_db CONTEXT:  while updating tuple (4928,21) in relation "task_instance"
2022-03-27 16:14:45.851 UTC [309686] airflow_user@airflow_db STATEMENT:  UPDATE task_instance SET state='scheduled' WHERE task_instance.dag_id = 'download_and_preprocess_sets' AND task_instance.run_id = 'manual__2022-03-24T13:43:29.617461+00:00' AND task_instance.task_id IN ('download_1546')
2022-03-27 16:14:45.958 UTC [314289] airflow_user@airflow_db LOG:  could not receive data from client: Connection reset by peer
2022-03-27 16:14:47.017 UTC [309687] airflow_user@airflow_db LOG:  could not receive data from client: Connection reset by peer

Edit: still have the bug with 2.2.5

@tanelk
Contributor

tanelk commented Apr 11, 2022

In your last log you have some long lines that seem to be truncated. For example:
Process 309021: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_insta

It would be nice to see the full query - what filters there are, etc.

@stablum
Author

stablum commented Apr 11, 2022

Here is the log of an occurrence of the crash, even after migrating to 2.2.5:

dag_bag <airflow.models.dagbag.DagBag object at 0x7f622100b9d0>
dag_bag <airflow.models.dagbag.DagBag object at 0x7f6221009af0>
dag_bag <airflow.models.dagbag.DagBag object at 0x7f622100e820>
Running <TaskInstance: download_and_preprocess_sets.persist_activity_ids_42 manual__2022-04-06T21:19:58.326877+00:00 [None]> on host ml1
Running <TaskInstance: download_and_preprocess_sets.persist_activity_ids_56 manual__2022-04-06T21:19:58.326877+00:00 [None]> on host ml1
dag_bag <airflow.models.dagbag.DagBag object at 0x7f622100f5e0>
Running <TaskInstance: download_and_preprocess_sets.persist_activity_ids_323 manual__2022-04-06T21:19:58.326877+00:00 [None]> on host ml1
Running <TaskInstance: download_and_preprocess_sets.persist_activity_ids_300 manual__2022-04-06T21:19:58.326877+00:00 [None]> on host ml1
Running <TaskInstance: download_and_preprocess_sets.persist_activity_ids_28 manual__2022-04-06T21:19:58.326877+00:00 [None]> on host ml1
Running <TaskInstance: download_and_preprocess_sets.persist_activity_ids_330 manual__2022-04-06T21:19:58.326877+00:00 [None]> on host ml1
[2022-04-11 13:28:11,900] {scheduler_job.py:742} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL:  Process 1773623 waits for ShareLock on transaction 13027262; blocked by process 1775826.
Process 1775826 waits for AccessExclusiveLock on tuple (3,73) of relation 25305 of database 25146; blocked by process 1775778.
Process 1775778 waits for ShareLock on transaction 13027236; blocked by process 1773623.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (6496,46) in relation "task_instance"


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 726, in _execute
    self._run_scheduler_loop()
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 807, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 890, in _do_scheduling
    callback_to_run = self._schedule_dag_run(dag_run, session)
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 1147, in _schedule_dag_run
    dag_run.schedule_tis(schedulable_tis, session)
  File "/usr/local/lib/python3.9/dist-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/airflow/models/dagrun.py", line 903, in schedule_tis
    session.query(TI)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/query.py", line 4063, in update
    update_op.exec_()
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
    self._do_exec()
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1895, in _do_exec
    self._execute_stmt(update_stmt)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
    self.result = self.query._execute_crud(stmt, self.mapper)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
    return conn.execute(stmt, self._params)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
DETAIL:  Process 1773623 waits for ShareLock on transaction 13027262; blocked by process 1775826.
Process 1775826 waits for AccessExclusiveLock on tuple (3,73) of relation 25305 of database 25146; blocked by process 1775778.
Process 1775778 waits for ShareLock on transaction 13027236; blocked by process 1773623.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (6496,46) in relation "task_instance"

[SQL: UPDATE task_instance SET state=%(state)s WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.task_id IN (%(task_id_1)s, %(task_id_2)s, %(task_id_3)s, %(task_id_4)s, %(task_id_5)s, %(task_id_6)s, %(task_id_7)s, %(task_id_8)s, %(task_id_9)s, %(task_id_10)s)]
[parameters: {'state': <TaskInstanceState.SCHEDULED: 'scheduled'>, 'dag_id_1': 'download_and_preprocess_sets', 'run_id_1': 'manual__2022-04-06T21:19:58.326877+00:00', 'task_id_1': 'persist_activity_ids_28', 'task_id_2': 'persist_activity_ids_41', 'task_id_3': 'persist_activity_ids_42', 'task_id_4': 'persist_activity_ids_497', 'task_id_5': 'persist_activity_ids_300', 'task_id_6': 'persist_activity_ids_303', 'task_id_7': 'persist_activity_ids_322', 'task_id_8': 'persist_activity_ids_323', 'task_id_9': 'persist_activity_ids_330', 'task_id_10': 'persist_activity_ids_56'}]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
[2022-04-11 13:28:11,904] {local_executor.py:388} INFO - Shutting down LocalExecutor; waiting for running tasks to finish.  Signal again if you don't want to wait.
[2022-04-11 13:28:21,673] {process_utils.py:120} INFO - Sending Signals.SIGTERM to group 1773618. PIDs of all processes in the group: [1773618]
[2022-04-11 13:28:21,673] {process_utils.py:75} INFO - Sending the signal Signals.SIGTERM to group 1773618
[2022-04-11 13:28:21,765] {process_utils.py:70} INFO - Process psutil.Process(pid=1773618, status='terminated', exitcode=0, started='13:16:01') (1773618) terminated with exit code 0
[2022-04-11 13:28:21,766] {scheduler_job.py:753} INFO - Exited execute loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL:  Process 1773623 waits for ShareLock on transaction 13027262; blocked by process 1775826.
Process 1775826 waits for AccessExclusiveLock on tuple (3,73) of relation 25305 of database 25146; blocked by process 1775778.
Process 1775778 waits for ShareLock on transaction 13027236; blocked by process 1773623.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (6496,46) in relation "task_instance"


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.9/dist-packages/airflow/__main__.py", line 48, in main
    args.func(args)
  File "/usr/local/lib/python3.9/dist-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
    _run_scheduler_job(args=args)
  File "/usr/local/lib/python3.9/dist-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
    job.run()
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/base_job.py", line 246, in run
    self._execute()
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 726, in _execute
    self._run_scheduler_loop()
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 807, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 890, in _do_scheduling
    callback_to_run = self._schedule_dag_run(dag_run, session)
  File "/usr/local/lib/python3.9/dist-packages/airflow/jobs/scheduler_job.py", line 1147, in _schedule_dag_run
    dag_run.schedule_tis(schedulable_tis, session)
  File "/usr/local/lib/python3.9/dist-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/airflow/models/dagrun.py", line 903, in schedule_tis
    session.query(TI)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/query.py", line 4063, in update
    update_op.exec_()
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1697, in exec_
    self._do_exec()
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1895, in _do_exec
    self._execute_stmt(update_stmt)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/persistence.py", line 1702, in _execute_stmt
    self.result = self.query._execute_crud(stmt, self.mapper)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/orm/query.py", line 3568, in _execute_crud
    return conn.execute(stmt, self._params)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
    ret = self._execute_context(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
    util.raise_(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.9/dist-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
DETAIL:  Process 1773623 waits for ShareLock on transaction 13027262; blocked by process 1775826.
Process 1775826 waits for AccessExclusiveLock on tuple (3,73) of relation 25305 of database 25146; blocked by process 1775778.
Process 1775778 waits for ShareLock on transaction 13027236; blocked by process 1773623.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (6496,46) in relation "task_instance"

[SQL: UPDATE task_instance SET state=%(state)s WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.task_id IN (%(task_id_1)s, %(task_id_2)s, %(task_id_3)s, %(task_id_4)s, %(task_id_5)s, %(task_id_6)s, %(task_id_7)s, %(task_id_8)s, %(task_id_9)s, %(task_id_10)s)]
[parameters: {'state': <TaskInstanceState.SCHEDULED: 'scheduled'>, 'dag_id_1': 'download_and_preprocess_sets', 'run_id_1': 'manual__2022-04-06T21:19:58.326877+00:00', 'task_id_1': 'persist_activity_ids_28', 'task_id_2': 'persist_activity_ids_41', 'task_id_3': 'persist_activity_ids_42', 'task_id_4': 'persist_activity_ids_497', 'task_id_5': 'persist_activity_ids_300', 'task_id_6': 'persist_activity_ids_303', 'task_id_7': 'persist_activity_ids_322', 'task_id_8': 'persist_activity_ids_323', 'task_id_9': 'persist_activity_ids_330', 'task_id_10': 'persist_activity_ids_56'}]
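For what it's worth, from PostgreSQL's point of view a deadlock like the one above is a retryable error: one of the two transactions is chosen as the victim and aborted, and can simply be re-run. Below is a minimal sketch, not Airflow's actual code, of retrying a transaction when psycopg2 reports DeadlockDetected; the engine URL and the statement passed in are placeholders. Airflow handles this differently internally (see the pull requests referenced at the top of this issue), so this only applies to ad-hoc scripts that touch the metadata database directly.

# Minimal sketch: retry a transaction when PostgreSQL aborts it as a deadlock victim.
# This is NOT Airflow's implementation; the engine URL and statement are placeholders.
import time

import psycopg2.errors
from sqlalchemy import create_engine, text
from sqlalchemy.exc import OperationalError

engine = create_engine("postgresql+psycopg2://airflow_user:***@localhost/airflow_db")

def run_with_deadlock_retry(statement, params=None, retries=3, backoff=1.0):
    for attempt in range(1, retries + 1):
        try:
            with engine.begin() as conn:  # one transaction per attempt
                return conn.execute(text(statement), params or {})
        except OperationalError as exc:
            if isinstance(exc.orig, psycopg2.errors.DeadlockDetected) and attempt < retries:
                time.sleep(backoff * attempt)  # back off a little before retrying
                continue
            raise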

@tanelk
Contributor

tanelk commented Apr 11, 2022

The Python logs always seem to show the UPDATE query in dagrun.schedule_tis. It would be most helpful to see the other half of this deadlock. In the PostgreSQL log there was a hint of a SELECT task_instance.try_number AS task_instance_try_number... query, but it was truncated.

@stablum
Author

stablum commented Apr 11, 2022

One thing that I noticed is that the crashing query is particularly long, as I have several thousand tasks in this DAG. And since the query is 22 MB, the only way I have to "paste" it is via WeTransfer: https://we.tl/t-x8FM4tI0XR

@stablum
Author

stablum commented Apr 11, 2022

I wonder if the SQL IN clause is the one creating problems, as it seems anti-pattern-ish: would it be possible to avoid it by using a subquery and/or a JOIN, maybe?
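For context on the IN-list question: the IN itself is usually not the culprit; this kind of deadlock comes from two transactions locking the same task_instance rows in different orders. A common mitigation, sketched below with hypothetical names (this is not necessarily what Airflow does or should do), is to lock and update the rows in one deterministic order, e.g. sorted by task_id, so concurrent writers queue up behind each other instead of deadlocking.

# Sketch only: update task_instance rows in a deterministic order so that two
# concurrent transactions acquire row locks in the same sequence.
# "engine", dag_id, run_id and task_ids are hypothetical inputs.
from sqlalchemy import text

def set_tasks_scheduled(engine, dag_id, run_id, task_ids):
    ordered_ids = sorted(task_ids)  # consistent lock order across all writers
    with engine.begin() as conn:
        # Lock the rows first, in sorted order, then update them in the same transaction.
        conn.execute(
            text(
                "SELECT task_id FROM task_instance "
                "WHERE dag_id = :dag_id AND run_id = :run_id AND task_id = ANY(:task_ids) "
                "ORDER BY task_id FOR UPDATE"
            ),
            {"dag_id": dag_id, "run_id": run_id, "task_ids": ordered_ids},
        )
        conn.execute(
            text(
                "UPDATE task_instance SET state = 'scheduled' "
                "WHERE dag_id = :dag_id AND run_id = :run_id AND task_id = ANY(:task_ids)"
            ),
            {"dag_id": dag_id, "run_id": run_id, "task_ids": ordered_ids},
        )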

@stablum
Author

stablum commented Apr 11, 2022

Hmm, actually the last recorded query starts with:

UPDATE serialized_dag SET data='{"__version": 1, "dag": {"_dag_id": "download_and_preprocess_sets", "schedule_interval": null, "_description": "Downloads sets data from IATI.cloud", "_task_group": {"_group_id": null, "prefix_group_id": true, "tooltip": "", "ui_color": "CornflowerBlue", "ui_fgcolor": "#000", "children": {"codelists": ["operator", "codelists"], "clear_activity_ids": ["operator", "clear_activity_ids"], "clear_activity_date": ["operator", "clear_activity_date"], "clear_budget": ["operator", "clear_budget"], "clear_result": ["operator", "clear_result"], "clear_participating_org": ["operator", "clear_participating_org"], "clear_contact_info": ["operator", "clear_contact_info"], "clear_location": ["operator", "clear_location"], "clear_sector": ["operator", "clear_sector"], "clear_policy_marker": ["operator", "clear_policy_marker"], "clear_default_aid_type": ["operator", "clear_default_aid_type"], "clear_transaction": ["operator", "clear_transaction"], "clear_activity_without_rels": ["operator", "clear_activity_without_rels"], "download_0": ["operator", "download_0"], "persist_activity_ids_0": ["operator", "persist_activity_ids_0"], "parse_0": ["operator", "parse_0"], "clear_page_0": ["operator", "clear_page_0"], "persist_0_activity_date": ["operator", "persist_0_activity_date"], "persist_0_budget": ["operator", "persist_0_budget"], "persist_0_result": ["operator", "persist_0_result"], "persist_0_participating_org": ["operator", "persist_0_participating_org"], "persist_0_contact_info
...

So I'm wondering if storing the serialization of such a huge dag is creating problems. Maybe deactivating the serialization would prevent the issue?

@stablum
Author

stablum commented Apr 11, 2022

Hmm, it seems that the serialization is something that is done at a certain interval. Could it be that a serialization operation conflicts with the subsequent one if the first one has not completed yet?

@stablum
Author

stablum commented Apr 11, 2022

I will try to increase min_serialized_dag_update_interval and min_serialized_dag_fetch_interval by a factor of 100
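For reference, in this Airflow version both options live in the [core] section of airflow.cfg (defaults are 30 and 10 seconds respectively, as far as I know) and can be overridden through the usual AIRFLOW__CORE__... environment variables. A small sketch of checking the effective values from Python, assuming the standard configuration API:

# Sketch: inspect the effective serialized-DAG intervals the scheduler will use.
# Section/option names are those documented for Airflow 2.2.x; adjust for other versions.
from airflow.configuration import conf

update_interval = conf.getint("core", "min_serialized_dag_update_interval")
fetch_interval = conf.getint("core", "min_serialized_dag_fetch_interval")
print(f"serialized DAG update interval: {update_interval}s, fetch interval: {fetch_interval}s")

# A 100-fold increase without editing airflow.cfg would be the env-var overrides, e.g.:
#   AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL=3000
#   AIRFLOW__CORE__MIN_SERIALIZED_DAG_FETCH_INTERVAL=1000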

@stablum
Author

stablum commented Apr 11, 2022

The Python logs always seem to show the UPDATE query in dagrun.schedule_tis. It would be most helpful to see the other half of this deadlock. In the PostgreSQL log there was a hint of a SELECT task_instance.try_number AS task_instance_try_number... query, but it was truncated.

I found it:

2022-04-11 13:19:13.265 UTC [1774251] airflow_user@airflow_db LOG:  statement: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash 

@tanelk
Contributor

tanelk commented Apr 11, 2022

The Python logs always seem to show the UPDATE query in dagrun.schedule_tis. It would be most helpful to see the other half of this deadlock. In the PostgreSQL log there was a hint of a SELECT task_instance.try_number AS task_instance_try_number... query, but it was truncated.

I found it:

2022-04-11 13:19:13.265 UTC [1774251] airflow_user@airflow_db LOG:  statement: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash 

Is the query even longer, or is this it? It has no WHERE clause.

@stablum
Author

stablum commented Apr 11, 2022

The Python logs always seem to show the UPDATE query in dagrun.schedule_tis. It would be most helpful to see the other half of this deadlock. In the PostgreSQL log there was a hint of a SELECT task_instance.try_number AS task_instance_try_number... query, but it was truncated.

I found it:

2022-04-11 13:19:13.265 UTC [1774251] airflow_user@airflow_db LOG:  statement: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash 

Is the query even longer, or is this it? It has no WHERE clause.

Oh, my mistake: I used grep, but the query is multi-line. Here it is with some surrounding SQL context:

2022-04-11 13:19:13.197 UTC [1774178] airflow_user@airflow_db LOG:  statement: COMMIT
2022-04-11 13:19:13.265 UTC [1774251] airflow_user@airflow_db LOG:  statement: BEGIN
2022-04-11 13:19:13.265 UTC [1774251] airflow_user@airflow_db LOG:  statement: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs, dag_run_1.state AS dag_run_1_state, dag_run_1.id AS dag_run_1_id, dag_run_1.dag_id AS dag_run_1_dag_id, dag_run_1.queued_at AS dag_run_1_queued_at, dag_run_1.execution_date AS dag_run_1_execution_date, dag_run_1.start_date AS dag_run_1_start_date, dag_run_1.end_date AS dag_run_1_end_date, dag_run_1.run_id AS dag_run_1_run_id, dag_run_1.creating_job_id AS dag_run_1_creating_job_id, dag_run_1.external_trigger AS dag_run_1_external_trigger, dag_run_1.run_type AS dag_run_1_run_type, dag_run_1.conf AS dag_run_1_conf, dag_run_1.data_interval_start AS dag_run_1_data_interval_start, dag_run_1.data_interval_end AS dag_run_1_data_interval_end, dag_run_1.last_scheduling_decision AS dag_run_1_last_scheduling_decision, dag_run_1.dag_hash AS dag_run_1_dag_hash
        FROM task_instance JOIN dag_run AS dag_run_1 ON dag_run_1.dag_id = task_instance.dag_id AND dag_run_1.run_id = task_instance.run_id
        WHERE task_instance.dag_id = 'download_and_preprocess_sets' AND task_instance.task_id = 'persist_activity_ids_499' AND task_instance.run_id = 'manual__2022-04-06T21:19:58.326877+00:00'
         LIMIT 1
2022-04-11 13:19:13.268 UTC [1774251] airflow_user@airflow_db LOG:  statement: COMMIT
2022-04-11 13:19:13.274 UTC [1774252] airflow_user@airflow_db LOG:  statement: BEGIN
2022-04-11 13:19:13.274 UTC [1774252] airflow_user@airflow_db LOG:  statement: SELECT dag.is_paused AS dag_is_paused
        FROM dag
        WHERE dag.dag_id = 'download_and_preprocess_sets'
2022-04-11 13:19:13.275 UTC [1774252] airflow_user@airflow_db LOG:  statement: COMMIT
2022-04-11 13:19:13.280 UTC [1774253] airflow_user@airflow_db LOG:  statement: BEGIN

@stablum
Author

stablum commented Apr 11, 2022

Would it make sense that the SELECT task_instance.try_number... query is deadlocking with the query that stores the serialization?

@stablum
Author

stablum commented Apr 11, 2022

Unfortunately it crashed again, and this time one of the deadlocking queries is the following:

2022-04-11 15:20:12.648 UTC [1797892] airflow_user@airflow_db STATEMENT:  UPDATE task_instance SET state='scheduled' WHERE task_instance.dag_id = 'download_and_preprocess_sets' AND task_instance.run_id = 'manual__2022-04-06T21:19:58.326877+00:00' AND task_instance.task_id IN ('persist_activity_ids_119', 'persist_activity_ids_16', 'persist_activity_ids_174', 'persist_activity_ids_191', 'persist_activity_ids_256', 'persist_activity_ids_331', 'persist_activity_ids_333', 'persist_activity_ids_335', 'persist_activity_ids_340', 'persist_activity_ids_631', 'persist_activity_ids_746', 'persist_activity_ids_986', 'persist_activity_ids_1551')
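When the other half of a deadlock is hard to catch because the statement is truncated in the log, it can help to watch the live lock graph on the metadata database while the DAG run is in progress; enabling log_lock_waits in postgresql.conf also surfaces which statements are waiting on locks before an actual deadlock fires. A small sketch using psycopg2 with the standard pg_stat_activity view and pg_blocking_pids() function (connection details are placeholders):

# Sketch: list sessions currently blocked on the Airflow metadata DB, together with
# the sessions blocking them.  Connection parameters below are placeholders.
import psycopg2

conn = psycopg2.connect("dbname=airflow_db user=airflow_user host=localhost")
with conn, conn.cursor() as cur:
    cur.execute(
        """
        SELECT blocked.pid    AS blocked_pid,
               blocked.query  AS blocked_query,
               blocking.pid   AS blocking_pid,
               blocking.query AS blocking_query
        FROM pg_stat_activity AS blocked
        JOIN pg_stat_activity AS blocking
          ON blocking.pid = ANY(pg_blocking_pids(blocked.pid))
        WHERE cardinality(pg_blocking_pids(blocked.pid)) > 0
        """
    )
    for row in cur.fetchall():
        print(row)
conn.close()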

@tanelk
Contributor

tanelk commented Apr 11, 2022

The UPDATE statement seems to be constant, but I can't figure out where the SELECT statement is coming from. SQLAlchemy is probably obfuscating it a bit also.

@tanelk
Contributor

tanelk commented Apr 11, 2022

I managed to run a large number of tasks without Airflow crashing, so changing that setting as you suggested did indeed help! Thank you! :)

Does this still apply, or did it fail later on?

@stablum
Author

stablum commented Apr 11, 2022

I managed to run a large number of tasks without Airflow crashing, so changing that setting as you suggested did indeed help! Thank you! :)

Does this still apply, or did it fail later on?

It failed, and I'm not finding the UPDATE serialized_dag query, so it's become more of a mystery.

I mean, after I wrote what you quoted (8th January) it began crashing again. Today I thought it was because of the UPDATE serialized_dag, and after increasing min_serialized_dag_update_interval and min_serialized_dag_fetch_interval 100-fold it seemed not to crash anymore, but then it crashed again, and the UPDATE serialized_dag is not there this time.

@tanelk
Contributor

tanelk commented Apr 11, 2022

In the PostgreSQL log, the block that starts with ERROR: deadlock detected should tell which queries are conflicting. There might be other unrelated queries around it; most likely the serialization query was one of them.

@potiuk
Member

potiuk commented Apr 25, 2022

Question: I am a bit lost about where we currently stand, and since this issue is long and relates to another, likely already fixed, problem, may I make a kind request: can someone create a new issue with the deadlock they are currently experiencing, with all the details, including the deadlock log and the server-side logs from around the deadlock (plus some detail on how frequently/when it happens and all the usual versioning information)?

I might finally get some time to take a closer look.

@hkc-8010

Created a new issue to track the deadlock exception: #23361

@nehemiascr

I am experiencing this issue in 2.5.1 when I try to change the state of 100 task instances of a DAG run through the REST API. I am calling the api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} endpoint concurrently, and then it happens:

[SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.map_index AS task_instance_map_index, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.updated_at AS task_instance_updated_at, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs
FROM task_instance
WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.run_id IN (%(run_id_1_1)s) AND task_instance.task_id IN (%(task_id_1_1)s, %(task_id_1_2)s, %(task_id_1_3)s, %(task_id_1_4)s, %(task_id_1_5)s, %(task_id_1_6)s, %(task_id_1_7)s, %(task_id_1_8)s, %(task_id_1_9)s, %(task_id_1_10)s, %(task_id_1_11)s, %(task_id_1_12)s, %(task_id_1_13)s, %(task_id_1_14)s, %(task_id_1_15)s, %(task_id_1_16)s, %(task_id_1_17)s, %(task_id_1_18)s, %(task_id_1_19)s, %(task_id_1_20)s, %(task_id_1_21)s, %(task_id_1_22)s, %(task_id_1_23)s, %(task_id_1_24)s, %(task_id_1_25)s, %(task_id_1_26)s, %(task_id_1_27)s, %(task_id_1_28)s, %(task_id_1_29)s, %(task_id_1_30)s, %(task_id_1_31)s, %(task_id_1_32)s, %(task_id_1_33)s, %(task_id_1_34)s, %(task_id_1_35)s, %(task_id_1_36)s, %(task_id_1_37)s, %(task_id_1_38)s, %(task_id_1_39)s, %(task_id_1_40)s, %(task_id_1_41)s, %(task_id_1_42)s, %(task_id_1_43)s, %(task_id_1_44)s, %(task_id_1_45)s, %(task_id_1_46)s, %(task_id_1_47)s, %(task_id_1_48)s, %(task_id_1_49)s, %(task_id_1_50)s, %(task_id_1_51)s, %(task_id_1_52)s, %(task_id_1_53)s) AND (task_instance.state IS NULL OR task_instance.state != %(state_1)s) FOR UPDATE]
[parameters: {'dag_id_1': 'publish_pipeline', 'state_1': 'failed', 'run_id_1_1': 'api_2023-06-15T14:50:02.094531-db-857', 'task_id_1_1': 'agg_historical_weekly_rollup_29', 'task_id_1_2': 'agg_historical_weekly_calculate_metrics_49', 'task_id_1_3': 'agg_historical_weekly_rollup_50', 'task_id_1_4': 'pipeline_end', 'task_id_1_5': 'agg_historical_weekly_rollup_37', 'task_id_1_6': 'agg_historical_weekly_rollup_42', 'task_id_1_7': 'agg_historical_weekly_calculate_metrics_40', 'task_id_1_8': 'agg_historical_weekly_calculate_metrics_34', 'task_id_1_9': 'agg_historical_weekly_rollup_30', 'task_id_1_10': 'agg_historical_weekly_rollup_43', 'task_id_1_11': 'agg_historical_weekly_rollup_41', 'task_id_1_12': 'publish_job_log_finalizer', 'task_id_1_13': 'agg_historical_weekly_calculate_metrics_33', 'task_id_1_14': 'agg_historical_weekly_calculate_metrics_46', 'task_id_1_15': 'agg_historical_weekly_rollup_47', 'task_id_1_16': 'agg_historical_weekly_calculate_metrics_42', 'task_id_1_17': 'agg_historical_weekly_rollup_51', 'task_id_1_18': 'agg_historical_weekly_calculate_metrics_39', 'task_id_1_19': 'agg_historical_weekly_rollup_36', 'task_id_1_20': 'agg_historical_weekly_rollup_45', 'task_id_1_21': 'agg_historical_weekly_rollup_48', 'task_id_1_22': 'wrap_up_publish', 'task_id_1_23': 'agg_historical_weekly_rollup_32', 'task_id_1_24': 'agg_historical_weekly_rollup_34', 'task_id_1_25': 'agg_historical_weekly_rollup_33', 'task_id_1_26': 'agg_historical_weekly_rollup_46', 'task_id_1_27': 'agg_historical_weekly_rollup_49', 'task_id_1_28': 'agg_historical_weekly_calculate_metrics_45', 'task_id_1_29': 'agg_historical_weekly_rollup_39', 'task_id_1_30': 'send_email_on_fail', 'task_id_1_31': 'agg_historical_weekly_rollup_35', 'task_id_1_32': 'agg_historical_weekly_rollup_38', 'task_id_1_33': 'agg_historical_weekly_calculate_metrics_43', 'task_id_1_34': 'agg_historical_weekly_calculate_metrics_36', 'task_id_1_35': 'agg_historical_weekly_calculate_metrics_51', 'task_id_1_36': 'db_quality_check', 'task_id_1_37': 'agg_historical_weekly_calculate_metrics_37', 'task_id_1_38': 'agg_historical_weekly_calculate_metrics_32', 'task_id_1_39': 'agg_historical_weekly_calculate_metrics_30', 'task_id_1_40': 'agg_historical_weekly_calculate_metrics_44', 'task_id_1_41': 'agg_historical_weekly_calculate_metrics_50', 'task_id_1_42': 'agg_historical_weekly_calculate_metrics_48', 'task_id_1_43': 'agg_historical_weekly_calculate_metrics_41', 'task_id_1_44': 'agg_historical_weekly_calculate_metrics_35', 'task_id_1_45': 'agg_historical_weekly_calculate_metrics_38', 'task_id_1_46': 'agg_historical_weekly_rollup_31', 'task_id_1_47': 'agg_historical_weekly_rollup_40', 'task_id_1_48': 'agg_historical_weekly_rollup_44', 'task_id_1_49': 'agg_historical_weekly_calculate_metrics_52', 'task_id_1_50': 'agg_historical_weekly_calculate_metrics_31', 'task_id_1_51': 'agg_historical_weekly_rollup_52', 'task_id_1_52': 'historical_weekly_aggregations_done', 'task_id_1_53': 'agg_historical_weekly_calculate_metrics_47'}]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
10.41.6.131 - airflowdags [15/Jun/2023:15:01:02 +0000] "POST /api/v1/dags/publish_pipeline/updateTaskInstancesState HTTP/1.1" 500 1560 "-" "python-httpx/0.24.1"
[2023-06-15 15:01:02,879] {manager.py:226} INFO - Updated user admin admin
[2023-06-15 15:01:03,842] {app.py:1741} ERROR - Exception on /api/v1/dags/publish_pipeline/updateTaskInstancesState [POST]
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.DeadlockDetected: deadlock detected
DETAIL:  Process 19389 waits for ShareLock on transaction 34948119; blocked by process 19388.
Process 19388 waits for ShareLock on transaction 34948109; blocked by process 19389.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (1379,12) in relation "dag_run"


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 2525, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1822, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1820, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/airflow/.local/lib/python3.9/site-packages/flask/app.py", line 1796, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/decorator.py", line 68, in wrapper
    response = function(request)
  File "/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/uri_parsing.py", line 149, in wrapper
    response = function(request)
  File "/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/validation.py", line 196, in wrapper
    response = function(request)
  File "/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/validation.py", line 399, in wrapper
    return function(request)
  File "/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/response.py", line 112, in wrapper
    response = function(request)
  File "/home/airflow/.local/lib/python3.9/site-packages/connexion/decorators/parameter.py", line 120, in wrapper
    return function(**kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/api_connexion/security.py", line 51, in decorated
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/api_connexion/endpoints/task_instance_endpoint.py", line 539, in post_set_task_instances_state
    tis = dag.set_task_instance_state(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dag.py", line 1888, in set_task_instance_state
    subdag.clear(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/dag.py", line 2051, in clear
    clear_task_instances(
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 281, in clear_task_instances
    session.flush()
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3345, in flush
    self._flush(objects)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3485, in _flush
    transaction.rollback(_capture_exception=True)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 3445, in _flush
    flush_context.execute()
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
    rec.execute(self)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 236, in save_obj
    _emit_update_statements(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/orm/persistence.py", line 1000, in _emit_update_statements
    c = connection._execute_20(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 325, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1481, in _execute_clauseelement
    ret = self._execute_context(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1845, in _execute_context
    self._handle_dbapi_exception(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2026, in _handle_dbapi_exception
    util.raise_(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1802, in _execute_context
    self.dialect.do_execute(
  File "/home/airflow/.local/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.errors.DeadlockDetected) deadlock detected
DETAIL:  Process 19389 waits for ShareLock on transaction 34948119; blocked by process 19388.
Process 19388 waits for ShareLock on transaction 34948109; blocked by process 19389.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (1379,12) in relation "dag_run"

[SQL: UPDATE dag_run SET queued_at=%(queued_at)s, start_date=%(start_date)s, state=%(state)s, updated_at=%(updated_at)s WHERE dag_run.id = %(dag_run_id)s]
[parameters: {'queued_at': datetime.datetime(2023, 6, 15, 15, 1, 2, 833204, tzinfo=Timezone('UTC')), 'start_date': None, 'state': <DagRunState.QUEUED: 'queued'>, 'updated_at': datetime.datetime(2023, 6, 15, 15, 1, 2, 837800, tzinfo=Timezone('UTC')), 'dag_run_id': 25068}]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
10.41.6.185 - airflowdags [15/Jun/2023:15:01:03 +0000] "POST /api/v1/dags/publish_pipeline/updateTaskInstancesState HTTP/1.1" 500 1560 "-" "python-httpx/0.24.1"
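Regarding the concurrent REST API calls above: since every request ends up updating the same dag_run row, firing many of them in parallel makes this kind of deadlock very likely. One client-side workaround, sketched below on the assumption that httpx and the updateTaskInstancesState endpoint are being used as in the log (the payload shape, run id and credentials are illustrative, not a verified recipe), is to bound the concurrency and retry once when the server answers 500:

# Sketch: bound concurrency of task-state updates against the Airflow REST API and
# retry once on a 500 response (which is how a deadlock surfaces to the client).
# Endpoint payload, run id and auth below are illustrative placeholders.
import asyncio

import httpx

BASE_URL = "http://localhost:8080/api/v1"
SEMAPHORE = asyncio.Semaphore(4)  # at most 4 state changes in flight at once

async def set_task_state(client, dag_id, dag_run_id, task_id, state):
    payload = {"dry_run": False, "task_id": task_id, "dag_run_id": dag_run_id, "new_state": state}
    async with SEMAPHORE:
        for attempt in range(2):
            resp = await client.post(f"{BASE_URL}/dags/{dag_id}/updateTaskInstancesState", json=payload)
            if resp.status_code != 500:  # anything but a 500 is returned as-is
                return resp
            await asyncio.sleep(1 + attempt)  # brief pause before the single retry
        return resp

async def main(task_ids):
    async with httpx.AsyncClient(auth=("admin", "***"), timeout=30) as client:
        await asyncio.gather(
            *(set_task_state(client, "publish_pipeline", "api_2023-06-15T...", task_id, "failed")
              for task_id in task_ids)
        )

# asyncio.run(main(["agg_historical_weekly_rollup_29", "agg_historical_weekly_rollup_30"]))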


@ephraimbuddy
Contributor

For those still having this issue, I think you should open a separate issue after testing with the latest release, which is 2.6.1.
