From 89fe74f8e8ff80f2af997a6fcdbc3f32520c9996 Mon Sep 17 00:00:00 2001 From: Ankitha Damodara Date: Tue, 27 Aug 2024 14:09:22 +0100 Subject: [PATCH] Lock the rows when taking the advisory lock --- lib/que/adapters/active_record_with_lock.rb | 19 +++++++++++++++---- lib/que/sql.rb | 2 +- spec/active_record_with_lock_spec_helper.rb | 2 +- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/lib/que/adapters/active_record_with_lock.rb b/lib/que/adapters/active_record_with_lock.rb index 2e43644..acc31c8 100644 --- a/lib/que/adapters/active_record_with_lock.rb +++ b/lib/que/adapters/active_record_with_lock.rb @@ -35,13 +35,24 @@ def execute(command, params = []) def lock_job_with_lock_database(queue, cursor) result = [] loop do - result = Que.execute(:find_job_to_lock, [queue, cursor]) + break_loop = false + Que.transaction do + result = Que.execute(:find_job_to_lock, [queue, cursor]) - break if result.empty? + if result.empty? + break_loop = true + break + end - cursor = result.first["job_id"] - break if pg_try_advisory_lock?(cursor) + cursor = result.first["job_id"] + if pg_try_advisory_lock?(cursor) + break_loop = true + break + end + end + break if break_loop end + result end diff --git a/lib/que/sql.rb b/lib/que/sql.rb index 20496d5..a6267ab 100644 --- a/lib/que/sql.rb +++ b/lib/que/sql.rb @@ -184,7 +184,7 @@ module Que AND retryable = true AND job_id >= $2 ORDER BY priority, run_at, job_id - LIMIT 1 + for update skip locked LIMIT 1 }, } # rubocop:enable Style/MutableConstant diff --git a/spec/active_record_with_lock_spec_helper.rb b/spec/active_record_with_lock_spec_helper.rb index 3c10fd0..c639c78 100644 --- a/spec/active_record_with_lock_spec_helper.rb +++ b/spec/active_record_with_lock_spec_helper.rb @@ -4,7 +4,7 @@ class LockDatabaseRecord < ActiveRecord::Base establish_connection( adapter: "postgresql", host: ENV.fetch("LOCK_PGHOST", "localhost"), - user: ENV.fetch("LOCK_PGUSER", "postgres"), + user: ENV.fetch("LOCK_PGUSER", "ubuntu"), password: ENV.fetch("LOCK_PGPASSWORD", "password"), database: ENV.fetch("LOCK_PGDATABASE", "lock-test"), port: ENV.fetch("LOCK_PGPORT", 5434),