From 9e78112c49f27ffa3b238d330d30b7528ce867a4 Mon Sep 17 00:00:00 2001
From: Zina Rasoamanana
Date: Tue, 3 Dec 2024 15:58:42 +0100
Subject: [PATCH] [IMP] queue_job: detect jobs run by workers that have been killed

---
 queue_job/models/queue_job.py | 46 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 46 insertions(+)

diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py
index 8af7468b7c..51fb0214fa 100644
--- a/queue_job/models/queue_job.py
+++ b/queue_job/models/queue_job.py
@@ -5,6 +5,8 @@
 import random
 from datetime import datetime, timedelta
 
+import psutil
+
 from odoo import _, api, exceptions, fields, models
 from odoo.osv import expression
 from odoo.tools import config, html_escape
@@ -417,6 +419,47 @@ def autovacuum(self):
                     break
         return True
 
+    def _check_job_worker_pid(self):
+        """Mark as failed the started jobs whose worker process is gone.
+
+        Search every job in state ``started`` that has a ``worker_pid``
+        and, when no process with that pid exists any more, flag the job
+        as failed with a ``WorkerError`` and store it.
+
+        NOTE(review): ``psutil.pid_exists`` only inspects the host running
+        this method and cannot detect a recycled pid -- confirm this is
+        acceptable for multi-host deployments.
+        """
+        jobs = self.env["queue.job"].search(
+            [
+                ("state", "=", "started"),
+                ("worker_pid", "!=", False),
+            ]
+        )
+
+        for job in jobs:
+            if psutil.pid_exists(job.worker_pid):
+                continue
+            # Pass the values as logging arguments so that the message is
+            # only formatted when the record is actually emitted.
+            _logger.info(
+                "Worker %d executing job %s does not exist",
+                job.worker_pid,
+                job.uuid,
+            )
+            _job = Job.load(job.env, job.uuid)
+            _job.set_failed(
+                exc_name=_("WorkerError"),
+                # The two literals are concatenated: the trailing space
+                # keeps the sentences apart in the stored message.
+                exc_info=_(
+                    "The worker executing the job was killed. "
+                    "This is likely to be due to a timeout"
+                ),
+                exc_message=_("Associated worker was killed"),
+            )
+            _job.store()
+
     def requeue_stuck_jobs(self, enqueued_delta=1, started_delta=0):
         """Fix jobs that are in a bad states
 
@@ -431,6 +474,9 @@ def requeue_stuck_jobs(self, enqueued_delta=1, started_delta=0):
         """
         if started_delta == -1:
             started_delta = (config["limit_time_real"] // 60) + 1
+
+        self._check_job_worker_pid()
+
         return self._get_stuck_jobs_to_requeue(
             enqueued_delta=enqueued_delta, started_delta=started_delta
         ).requeue()