Skip to content

Commit

Permalink
[IMP] queue_job: HA job runner using session level advisory lock
Browse files Browse the repository at this point in the history
  • Loading branch information
sbidoul committed Dec 6, 2024
1 parent 3f6b8bd commit 3fa9137
Showing 1 changed file with 24 additions and 2 deletions.
26 changes: 24 additions & 2 deletions queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,17 @@

SELECT_TIMEOUT = 60
ERROR_RECOVERY_DELAY = 5
PG_ADVISORY_LOCK_ID = 2293787760715711918

_logger = logging.getLogger(__name__)

select = selectors.DefaultSelector


class MasterElectionLost(Exception):
pass


# Unfortunately, it is not possible to extend the Odoo
# server command line arguments, so we resort to environment variables
# to configure the runner (channels mostly).
Expand Down Expand Up @@ -268,6 +273,7 @@ def __init__(self, db_name):
self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
self.has_queue_job = self._has_queue_job()
if self.has_queue_job:
self._acquire_master_lock()
self._initialize()
except BaseException:
self.close()
Expand All @@ -284,6 +290,14 @@ def close(self):
pass
self.conn = None

def _acquire_master_lock(self):
"""Acquire the master runner lock or raise MasterElectionLost"""
with closing(self.conn.cursor()) as cr:
cr.execute("SELECT pg_try_advisory_lock(%s)", (PG_ADVISORY_LOCK_ID,))
if not cr.fetchone()[0]:
msg = f"could not acquire master runner lock on {self.db_name}"
raise MasterElectionLost(msg)

def _has_queue_job(self):
with closing(self.conn.cursor()) as cr:
cr.execute(
Expand Down Expand Up @@ -406,7 +420,7 @@ def get_db_names(self):
db_names = config["db_name"].split(",")
else:
db_names = odoo.service.db.list_dbs(True)
return db_names
return sorted(db_names)

def close_databases(self, remove_jobs=True):
for db_name, db in self.db_by_name.items():
Expand Down Expand Up @@ -515,7 +529,7 @@ def run(self):
while not self._stop:
# outer loop does exception recovery
try:
_logger.info("initializing database connections")
_logger.debug("initializing database connections")
# TODO: how to detect new databases or databases
# on which queue_job is installed after server start?
self.initialize_databases()
Expand All @@ -530,6 +544,14 @@ def run(self):
except InterruptedError:
# Interrupted system call, i.e. KeyboardInterrupt during select
self.stop()
except MasterElectionLost as e:
_logger.debug(
"master election lost: %s, sleeping %ds and retrying",
e,
ERROR_RECOVERY_DELAY,
)
self.close_databases()
time.sleep(ERROR_RECOVERY_DELAY)
except Exception:
_logger.exception(
"exception: sleeping %ds and retrying", ERROR_RECOVERY_DELAY
Expand Down

0 comments on commit 3fa9137

Please sign in to comment.