Skip to content

Commit

Permalink
Fix handling completed job with expired result when work horse dies
Browse files Browse the repository at this point in the history
  • Loading branch information
fancyweb committed Dec 6, 2024
1 parent fc86e9a commit 5e6baa9
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 5 deletions.
11 changes: 6 additions & 5 deletions rq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
DEFAULT_RESULT_TTL,
DEFAULT_WORKER_TTL,
)
from .exceptions import DequeueTimeout, DeserializationError, ShutDownImminentException
from .exceptions import DequeueTimeout, DeserializationError, InvalidJobOperation, ShutDownImminentException
from .executions import Execution
from .group import Group
from .job import Job, JobStatus, Retry
Expand Down Expand Up @@ -1376,11 +1376,12 @@ def monitor_work_horse(self, job: 'Job', queue: 'Queue'):
if ret_val == os.EX_OK: # The process exited normally.
return

job_status = job.get_status()
try:
job_status = job.get_status()
except InvalidJobOperation:
return # Job completed and its ttl has expired

if job_status is None: # Job completed and its ttl has expired
return
elif self._stopped_job_id == job.id:
if self._stopped_job_id == job.id:
# Work-horse killed deliberately
self.log.warning('Job stopped by user, moving job to FailedJobRegistry')
if job.stopped_callback:
Expand Down
11 changes: 11 additions & 0 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,17 @@ def test_dequeue_round_robin(self):

self.assertEqual(expected, sorted_ids)

def test_monitor_work_horse_handles_performed_job_with_non_zero_exit_code_and_result_ttl_0(self):
q = Queue(connection=self.connection)
w = Worker([q])
perform_job = w.perform_job
def p(*args, **kwargs):
perform_job(*args, **kwargs)
raise Exception

w.perform_job = p
q.enqueue(say_hello, args=('ccc',), result_ttl=0)
self.assertTrue(w.work(burst=True))

def wait_and_kill_work_horse(pid, time_to_wait=0.0):
time.sleep(time_to_wait)
Expand Down

0 comments on commit 5e6baa9

Please sign in to comment.