Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid littering postgres server logs with "could not obtain lock" with HA schedulers #19842

Merged
merged 7 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from datetime import timedelta
from typing import Collection, DefaultDict, Dict, Iterator, List, Optional, Tuple

from sqlalchemy import and_, func, not_, or_, tuple_
from sqlalchemy import and_, func, not_, or_, text, tuple_
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import load_only, selectinload
from sqlalchemy.orm.session import Session, make_transient
Expand Down Expand Up @@ -234,8 +234,25 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
:type max_tis: int
:return: list[airflow.models.TaskInstance]
"""
from airflow.utils.db import DBLocks

executable_tis: List[TI] = []

if session.get_bind().dialect.name == "postgresql":
# Optimization: to avoid littering the DB errors of "ERROR: canceling statement due to lock
# timeout", try to take out a transactional advisory lock (unlocks automatically on
# COMMIT/ROLLBACK)
lock_acquired = session.execute(
text("SELECT pg_try_advisory_xact_lock(:id)").bindparams(
id=DBLocks.SCHEDULER_CRITICAL_SECTION
ashb marked this conversation as resolved.
Show resolved Hide resolved
)
).scalar()
if not lock_acquired:
# Throw an error like the one that would happen with NOWAIT
raise OperationalError(
"Failed to acquire advisory lock", params=None, orig=RuntimeError('55P03')
)

# Get the pool settings. We get a lock on the pool rows, treating this as a "critical section"
# Throws an exception if lock cannot be obtained, rather than blocking
pools = models.Pool.slots_stats(lock_rows=True, session=session)
Expand Down
54 changes: 50 additions & 4 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import contextlib
import enum
import logging
import os
import time
Expand Down Expand Up @@ -52,7 +54,7 @@
from airflow.models.serialized_dag import SerializedDagModel # noqa: F401

# TODO: remove create_session once we decide to break backward compatibility
from airflow.utils.session import create_global_lock, create_session, provide_session # noqa: F401
from airflow.utils.session import create_session, provide_session # noqa: F401

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -594,7 +596,7 @@ def initdb(session=None):
if conf.getboolean('core', 'LOAD_DEFAULT_CONNECTIONS'):
create_default_connections(session=session)

with create_global_lock(session=session):
with create_global_lock(session=session, lock=DBLocks.INIT):

dagbag = DagBag()
# Save DAGs in the ORM
Expand Down Expand Up @@ -910,7 +912,7 @@ def upgradedb(session=None):
if errors_seen:
exit(1)

with create_global_lock(session=session, pg_lock_id=2, lock_name="upgrade"):
with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
log.info("Creating tables")
command.upgrade(config, 'heads')
add_default_pool_if_not_exists()
Expand All @@ -923,7 +925,7 @@ def resetdb(session=None):

connection = settings.engine.connect()

with create_global_lock(session=session, pg_lock_id=4, lock_name="reset"):
with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
drop_airflow_models(connection)
drop_flask_models(connection)

Expand Down Expand Up @@ -986,3 +988,47 @@ def check(session=None):
"""
session.execute('select 1 as is_alive;')
log.info("Connection successful.")


@enum.unique
class DBLocks(enum.IntEnum):
    """
    Cross-db Identifiers for advisory global database locks.

    Postgres uses int64 lock ids so we use the integer value, MySQL uses names, so we use the ``_name_``
    field.
    """

    # NOTE: enum.auto() assigns 1, 2, 3 in declaration order; the integer value is
    # what Postgres advisory locks key on, so never reorder existing members.
    INIT = enum.auto()
    MIGRATIONS = enum.auto()
    SCHEDULER_CRITICAL_SECTION = enum.auto()

    def __str__(self):
        # Name-based identifier used for MySQL GET_LOCK()/RELEASE_LOCK(),
        # prefixed to avoid colliding with other applications' lock names.
        return f"airflow_{self._name_}"


@contextlib.contextmanager
def create_global_lock(session, lock: "DBLocks", lock_timeout=1800):
    """
    Contextmanager that will create and teardown a global db lock.

    :param session: ORM session whose connection the lock is taken on; the lock is
        held for the duration of the ``with`` block.
    :param lock: which cross-db advisory lock to take (integer id on Postgres,
        stringified name on MySQL).
    :param lock_timeout: how long to wait for the lock before giving up.
        NOTE(review): on Postgres this is passed to ``SET LOCK_TIMEOUT`` (which
        takes milliseconds), on MySQL to ``GET_LOCK`` (which takes seconds) —
        the units differ between dialects; confirm intended semantics.
    """
    conn = session.connection()
    dialect = conn.dialect
    try:
        if dialect.name == 'postgresql':
            conn.execute(text('SET LOCK_TIMEOUT to :timeout'), timeout=lock_timeout)
            # pg_advisory_lock blocks until acquired (bounded by LOCK_TIMEOUT above).
            conn.execute(text('SELECT pg_advisory_lock(:id)'), id=lock.value)
        elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
            # MySQL >= 5.6 supports multiple named user locks per connection.
            conn.execute(text("SELECT GET_LOCK(:id, :timeout)"), id=str(lock), timeout=lock_timeout)
        elif dialect.name == 'mssql':
            # TODO: make locking works for MSSQL
            pass

        yield None
    finally:
        # Release the lock even if the protected block raised.
        if dialect.name == 'postgresql':
            conn.execute('SET LOCK_TIMEOUT TO DEFAULT')
            conn.execute(text('SELECT pg_advisory_unlock(:id)'), id=lock.value)
        elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
            conn.execute(text("select RELEASE_LOCK(:id)"), id=str(lock))
        elif dialect.name == 'mssql':
            # TODO: make locking works for MSSQL
            pass
36 changes: 0 additions & 36 deletions airflow/utils/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import contextlib
from functools import wraps
from inspect import signature
Expand Down Expand Up @@ -70,38 +69,3 @@ def wrapper(*args, **kwargs) -> RT:
return func(*args, session=session, **kwargs)

return wrapper


@provide_session
@contextlib.contextmanager
def create_global_lock(session=None, pg_lock_id=1, lock_name='init', mysql_lock_timeout=1800):
    """Contextmanager that will create and teardown a global db lock."""
    connection = session.connection()
    dialect = connection.dialect
    try:
        if dialect.name == 'postgresql':
            # Blocking advisory lock, keyed on the integer id.
            connection.execute(f'select PG_ADVISORY_LOCK({pg_lock_id});')
        elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
            # Named user lock; only MySQL >= 5.6 allows multiple per connection.
            connection.execute(f"select GET_LOCK('{lock_name}',{mysql_lock_timeout});")
        elif dialect.name == 'mssql':
            # TODO: make locking works for MSSQL
            pass

        yield None
    finally:
        # Always release, even when the guarded block raised.
        if dialect.name == 'postgresql':
            connection.execute(f'select PG_ADVISORY_UNLOCK({pg_lock_id});')
        elif dialect.name == 'mysql' and dialect.server_version_info >= (5, 6):
            connection.execute(f"select RELEASE_LOCK('{lock_name}');")
        elif dialect.name == 'mssql':
            # TODO: make locking works for MSSQL
            pass