Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix "This Session's transaction has been rolled back" #25532

Merged
merged 1 commit into from
Aug 5, 2022

Conversation

ephraimbuddy
Copy link
Contributor

@ephraimbuddy ephraimbuddy commented Aug 4, 2022

Accessing the run_id on exception leads to error because sessions are invalidated on exception. Here we extract the information we need to log the exception before handling the exception

Here's how to reproduce the scheduler crash:
Run this dag in main:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id='mvp_map_task_bug', start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False) as dag:
    @task
    def get_files():
        return [1,2,3,4,5,6]

    @task
    def download_files(file: str):
        print(f"{file}")

    files = download_files.expand(file=get_files())

Stop the scheduler
Reduce the list in the get_files to something like [1,2,3]
Change this difference to symmetric_difference:

missing_indexes.update({k: list(set(new_indexes[k]).difference(v))})

Start the scheduler and clear the above task so it can run again.
Now, it'll try to create new TIs and the Scheduler will crash. Switch to this PR and try the same again. You will notice that the scheduler survived the crash and rollback was successful.

@potiuk
Copy link
Member

potiuk commented Aug 4, 2022

What was the crash you experienced?

@potiuk
Copy link
Member

potiuk commented Aug 4, 2022

BTW. By first look - the crash could be caused by some DB object held by the Integrity Error accessed by secret masker. The difference vs. the original implementation was that our logging (which could have secret masking included) walks through all objects in the exception as well, so it could have accessed something that would be released by rollback(). That could explain why you cannot reproduce it in tests.

An easy way to verify that hypothesis is to disable secret masking (just to be sure - you can remove it from logging config rather than by airflow configuration and repeat your tests @ephraimbuddy .

@potiuk
Copy link
Member

potiuk commented Aug 4, 2022

However - this could also be just "logging" (no secret masking involved) - to verify this hypothesis you would have to repeat the tests without exc_info in the log.

@ephraimbuddy
Copy link
Contributor Author

What was the crash you experienced?

Here's the log:

2022-08-04 11:12:57,341] {process_utils.py:76} INFO - Process psutil.Process(pid=22647, status='terminated', exitcode=0, started='11:12:49') (22647) terminated with exit code 0
[2022-08-04 11:12:57,342] {scheduler_job.py:779} INFO - Exited execute loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1803, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
[2022-08-04 11:12:57 +0000] [22509] [INFO] Handling signal: term
    cursor.execute(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "task_instance_pkey"
DETAIL:  Key (dag_id, task_id, run_id, map_index)=(mvp_map_task_bug, download_files, scheduled__2022-08-03T00:00:00+00:00, 1) already exists.


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/airflow/airflow/models/dagrun.py", line 1143, in _create_task_instances
[2022-08-04 11:12:57 +0000] [22510] [INFO] Worker exiting (pid: 22510)
[2022-08-04 11:12:57 +0000] [22563] [INFO] Worker exiting (pid: 22563)
    session.bulk_insert_mappings(TI, tasks)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3711, in bulk_insert_mappings
    render_nulls,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3811, in _bulk_save_mappings
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 72, in __exit__
    with_traceback=exc_tb,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3805, in _bulk_save_mappings
    render_nulls,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 112, in _bulk_insert
    bookkeeping=return_defaults,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1097, in _emit_insert_statements
    statement, multiparams, execution_options=execution_options
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1614, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 326, in _execute_on_connection
    self, multiparams, params, execution_options
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1491, in _execute_clauseelement
    cache_hit=cache_hit,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1846, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 2027, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
    raise exception
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1803, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 719, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "task_instance_pkey"
DETAIL:  Key (dag_id, task_id, run_id, map_index)=(mvp_map_task_bug, download_files, scheduled__2022-08-03T00:00:00+00:00, 1) already exists.

[SQL: INSERT INTO task_instance (task_id, dag_id, run_id, map_index, try_number, max_tries, hostname, unixname, pool, pool_slots, queue, priority_weight, operator, executor_config) VALUES (%(task_id)s, %(dag_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(max_tries)s, %(hostname)s, %(unixname)s, %(pool)s, %(pool_slots)s, %(queue)s, %(priority_weight)s, %(operator)s, %(executor_config)s)]
[parameters: {'task_id': 'download_files', 'dag_id': 'mvp_map_task_bug', 'run_id': 'scheduled__2022-08-03T00:00:00+00:00', 'map_index': 1, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'root', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0xffff7a066630>}]
(Background on this error at: https://sqlalche.me/e/14/gkpj)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 33, in <module>
    sys.exit(load_entry_point('apache-airflow', 'console_scripts', 'airflow')())
  File "/opt/airflow/airflow/__main__.py", line 38, in main
    args.func(args)
  File "/opt/airflow/airflow/cli/cli_parser.py", line 51, in command
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/utils/cli.py", line 94, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 84, in scheduler
    _run_scheduler_job(args=args)
  File "/opt/airflow/airflow/cli/commands/scheduler_command.py", line 50, in _run_scheduler_job
    job.run()
  File "/opt/airflow/airflow/jobs/base_job.py", line 244, in run
    self._execute()
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 750, in _execute
    self._run_scheduler_loop()
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 859, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 941, in _do_scheduling
    callback_to_run = self._schedule_dag_run(dag_run, session)
  File "/opt/airflow/airflow/jobs/scheduler_job.py", line 1177, in _schedule_dag_run
    schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
  File "/opt/airflow/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/models/dagrun.py", line 527, in update_state
    info = self.task_instance_scheduling_decisions(session)
  File "/opt/airflow/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/models/dagrun.py", line 741, in task_instance_scheduling_decisions
    self.verify_integrity(missing_indexes=missing_indexes, session=session)
  File "/opt/airflow/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/models/dagrun.py", line 950, in verify_integrity
    self._create_task_instances(dag.dag_id, tasks, created_counts, hook_is_noop, session=session)
  File "/opt/airflow/airflow/models/dagrun.py", line 1154, in _create_task_instances
    self.run_id,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
    return self.impl.get(state, dict_)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/attributes.py", line 926, in get
    value = self._fire_loader_callables(state, key, passive)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/attributes.py", line 957, in _fire_loader_callables
    return state._load_expired(state, passive)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/state.py", line 710, in _load_expired
    self.manager.expired_attribute_loader(self, toload, passive)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 1459, in load_scalar_attributes
    no_autoflush=no_autoflush,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 418, in load_on_ident
    execution_options=execution_options,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/loading.py", line 534, in load_on_pk_identity
    bind_arguments=bind_arguments,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1688, in execute
    conn = self._connection_for_bind(bind)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1530, in _connection_for_bind
    engine, execution_options
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 721, in _connection_for_bind
    self._assert_active()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 608, in _assert_active
    code="7s2a",
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "task_instance_pkey"
DETAIL:  Key (dag_id, task_id, run_id, map_index)=(mvp_map_task_bug, download_files, scheduled__2022-08-03T00:00:00+00:00, 1) already exists.

[SQL: INSERT INTO task_instance (task_id, dag_id, run_id, map_index, try_number, max_tries, hostname, unixname, pool, pool_slots, queue, priority_weight, operator, executor_config) VALUES (%(task_id)s, %(dag_id)s, %(run_id)s, %(map_index)s, %(try_number)s, %(max_tries)s, %(hostname)s, %(unixname)s, %(pool)s, %(pool_slots)s, %(queue)s, %(priority_weight)s, %(operator)s, %(executor_config)s)]
[parameters: {'task_id': 'download_files', 'dag_id': 'mvp_map_task_bug', 'run_id': 'scheduled__2022-08-03T00:00:00+00:00', 'map_index': 1, 'try_number': 0, 'max_tries': 0, 'hostname': '', 'unixname': 'root', 'pool': 'default_pool', 'pool_slots': 1, 'queue': 'default', 'priority_weight': 1, 'operator': '_PythonDecoratedOperator', 'executor_config': <psycopg2.extensions.Binary object at 0xffff7a066630>}]
(Background on this error at: https://sqlalche.me/e/14/gkpj) (Background on this error at: https://sqlalche.me/e/14/7s2a)
[2022-08-04 11:12:57 +0000] [22509] [INFO] Shutting down: Master

@potiuk
Copy link
Member

potiuk commented Aug 4, 2022

Nothing obvious - but it would be great to verify these two hypotheses because maybe we can discover a pattern that we should avoid (or even forbid via pre-commit).

@ephraimbuddy
Copy link
Contributor Author

Nothing obvious - but it would be great to verify these two hypotheses because maybe we can discover a pattern that we should avoid (or even forbid via pre-commit).

I removed exc_info and it still failed. It's the logging because once I move it past it, it doesn't crash.

@potiuk
Copy link
Member

potiuk commented Aug 4, 2022

I have another hypothesis:

    run_id = Column(StringID(), nullable=False)

And we are trying to access it in the log:

            self.log.info(
                'Hit IntegrityError while creating the TIs for %s- %s',
                dag_id,
                self.run_id,
                exc_info=True,
            )

Run id is database, sqlalchemy object. And when you access it and It failed to be added due to Integrity Error, MAYBE when you access it before rollback, SQL Alchemy knows it just failed to add it, and it is in a limbo state when it is not yet added but not yet detached either, so it will return the error.

When you do rollback() the DagRun object itself becomes detached and you can access it's run_id.

It's a long shot - but maybe ? You can test THAT hypothesis by logging a bogus string instead of self.run_id.

@ephraimbuddy
Copy link
Contributor Author

Still failed. I moved the log after the other log that used the run_id. That's this log: self.log.info('Doing session rollback.') before the session.rollback but it still failed. I think the issue is that we access self to get the log?

@potiuk
Copy link
Member

potiuk commented Aug 4, 2022

Still failed. I moved the log after the other log that used the run_id. That's this log: self.log.info('Doing session rollback.') before the session.rollback but it still failed. I think the issue is that we access self to get the log?

Just to clarify (cause I am not 100% sure). - did it also fail, when you did that: ?

        except IntegrityError:
            self.log.info('Session rolled back.')
            session.rollback()
            self.log.info(
                'Hit IntegrityError while creating the TIs for %s- %s',
                dag_id,
                self.run_id,
                exc_info=True,
            )

If that's the case, then I am afraid the root cause is somewhere deeper. Probably we see some race condition manifesting tself. Accessing self.log should not trigger it. Anothe option is that one of the logging handlers of yours (do you have any?) Accesses the DB in some way,

@ephraimbuddy
Copy link
Contributor Author

Exactly the code you have above is what I tried and it failed. I don't have any logging handlers and no environment variables was set too.

@potiuk
Copy link
Member

potiuk commented Aug 4, 2022

Exactly the code you have above is what I tried and it failed. I don't have any logging handlers and no environment variables was set too.

Hmm. Can you really reproduce it ? Maybe there was some mistake when you run it. and you run another version (or maybe .pyc file was compiled or something like that).

When I look closer at the stacktrace it DOES seem like the last hypothesis of mine..

  File "/opt/airflow/airflow/models/dagrun.py", line 950, in verify_integrity
    self._create_task_instances(dag.dag_id, tasks, created_counts, hook_is_noop, session=session)
  File "/opt/airflow/airflow/models/dagrun.py", line 1154, in _create_task_instances
    self.run_id,

^^ this is part of the log.info where run_id is accessed.

  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
    return self.impl.get(state, dict_)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/attributes.py", line 926, in get
    value = self._fire_loader_callables(state, key, passive)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/attributes.py", line 957, in _fire_loader_callables
    return state._load_expired(state, passive)

^^ And it seems sql alchemy tries to retrieve the object and load it and it tries to execute SQL command but irt finds out that rollback is Pending and should be executed before..

So this would be rather strange to see it happening with this code:

        except IntegrityError:
            self.log.info('Session rolled back.')
            session.rollback()
            self.log.info(
                'Hit IntegrityError while creating the TIs for %s- %s',
                dag_id,
                self.run_id,
                exc_info=True,
            )

Can you produce such an exception and see that the line number match and get the stack trace?

@potiuk
Copy link
Member

potiuk commented Aug 4, 2022

I'd only think such error would only be possible (with the last code) if the DagRun object was created in a DIFFERENT session than the one that has just got rollback() applied.

@ashb
Copy link
Member

ashb commented Aug 4, 2022

^^ And it seems sql alchemy tries to retrieve the object and load it and it tries to execute SQL command but irt finds out that rollback is Pending and should be executed before..

  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/state.py", line 710, in _load_expired

This attribute has been marked as expired, so SQLA is trying to reload the value. SQLA doesn't do this by itself, so somewhere we are expiring the object (but leaving it attached to the session. If it was detached via expunge it would behave differently.)

Do we do make any calls to expire/expire_all after this object was loaded but before it was accessed in this block?

@potiuk
Copy link
Member

potiuk commented Aug 4, 2022

I tihnk it the same issue as the one here: apache/superset#530

@potiuk
Copy link
Member

potiuk commented Aug 4, 2022

So I think the problem is due to bulk nature of the change above. DagRun is modfied by one of the "succesful" changes, but then the exception makes the connection dirty. The DagRun object is not "expunged" - it's just marked as modified because part of the bulk update actually succeeded.

@potiuk
Copy link
Member

potiuk commented Aug 4, 2022

If that's the case then moving rollback() before is "good" - because it effectively detaches the object from session and it is not going to be refreshed even if it is dirty / modified.

@ephraimbuddy
Copy link
Contributor Author

@potiuk You are right, I had to test now by restarting breeze. With just self.log(...) above the rollback, it didn't crash. The integrity error was caught.

@ephraimbuddy ephraimbuddy force-pushed the catch-integrity-error branch from f36636e to eefb527 Compare August 4, 2022 14:52
@potiuk
Copy link
Member

potiuk commented Aug 4, 2022

@potiuk You are right, I had to test now by restarting breeze. With just self.log(...) above the rollback, it didn't crash. The integrity error was caught.

So we have a VERY plausible explanation then :).

@potiuk
Copy link
Member

potiuk commented Aug 4, 2022

I looked at the code and this seems to be the only place (except tests) where we use session.bulk_* operations so we should be pretty save.

However I think it would be good if we add a description of the problem here and link to the superset issue in a comment here just to capture it somewhere in the code (I am thinking about my fufure self in a year, hitting similar issue and trying to find out what was it about when I recall that something similar happened in the past) :)

@ephraimbuddy
Copy link
Contributor Author

ephraimbuddy commented Aug 5, 2022

I looked at the code and this seems to be the only place (except tests) where we use session.bulk_* operations so we should be pretty save.

However I think it would be good if we add a description of the problem here and link to the superset issue in a comment here just to capture it somewhere in the code (I am thinking about my fufure self in a year, hitting similar issue and trying to find out what was it about when I recall that something similar happened in the past) :)

Look how I did it now. I feel it's much better and understandable.
I tested it too

@potiuk
Copy link
Member

potiuk commented Aug 5, 2022

Perfect!

@potiuk potiuk added this to the Airflow 2.3.4 milestone Aug 5, 2022
@potiuk
Copy link
Member

potiuk commented Aug 5, 2022

You probably want to rename it/update commit description before merge :)

@ephraimbuddy ephraimbuddy force-pushed the catch-integrity-error branch from b96f4e0 to 53a46d3 Compare August 5, 2022 12:22
@ephraimbuddy ephraimbuddy changed the title Move session.rollback closer to the exception Fix Scheduler crash due to PendingRollbackError Aug 5, 2022
@ephraimbuddy ephraimbuddy force-pushed the catch-integrity-error branch from 53a46d3 to 6e6b3af Compare August 5, 2022 12:30
@ephraimbuddy ephraimbuddy changed the title Fix Scheduler crash due to PendingRollbackError Fix PendingRollbackError due to accessing object in invalidated session Aug 5, 2022
Accessing the run_id(self.run_id) on exception leads to error because sessions are invalidated on exception. Here we extract the run_id before handling the exception
@ephraimbuddy ephraimbuddy force-pushed the catch-integrity-error branch from 6e6b3af to 0409c12 Compare August 5, 2022 14:10
@ephraimbuddy ephraimbuddy changed the title Fix PendingRollbackError due to accessing object in invalidated session Fix "This Session's transaction has been rolled back" Aug 5, 2022
@ephraimbuddy ephraimbuddy merged commit 5668888 into apache:main Aug 5, 2022
@ephraimbuddy ephraimbuddy deleted the catch-integrity-error branch August 5, 2022 16:15
@ephraimbuddy ephraimbuddy added the type:bug-fix Changelog: Bug Fixes label Aug 15, 2022
ephraimbuddy added a commit that referenced this pull request Aug 15, 2022
Accessing the run_id(self.run_id) on exception leads to error because sessions are invalidated on exception. Here we extract the run_id before handling the exception

(cherry picked from commit 5668888)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type:bug-fix Changelog: Bug Fixes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Airflow scheduler crashes due to 'duplicate key value violates unique constraint "task_instance_pkey"'
4 participants