Skip to content

Commit

Permalink
Ensure that deleted jobs are unlocked
Browse files Browse the repository at this point in the history
- Checks that every test cleans up any remaining locks
- Adds a PgLock model for test support
  • Loading branch information
bensheldon committed Dec 16, 2020
1 parent cbbb377 commit 0c4a4b3
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 10 deletions.
7 changes: 4 additions & 3 deletions lib/good_job/lockable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,11 @@ def advisory_lock
# {#advisory_unlock} and {#advisory_lock} the same number of times.
# @return [Boolean] whether the lock was released.
def advisory_unlock
where_sql = <<~SQL.squish
pg_advisory_unlock(('x' || substr(md5(:table_name || :id::text), 1, 16))::bit(64)::bigint)
query = <<~SQL.squish
SELECT 1 AS one
WHERE pg_advisory_unlock(('x'||substr(md5(:table_name || :id::text), 1, 16))::bit(64)::bigint)
SQL
self.class.unscoped.exists?([where_sql, { table_name: self.class.table_name, id: send(self.class.primary_key) }])
self.class.connection.execute(sanitize_sql_for_conditions([query, { table_name: self.class.table_name, id: send(self.class.primary_key) }])).ntuples.positive?
end

# Acquires an advisory lock on this record or raises
Expand Down
4 changes: 2 additions & 2 deletions spec/lib/good_job/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ def perform(result_value = nil, raise_error: false)
end)
end

it_behaves_like 'lockable'

describe '.enqueue' do
let(:active_job) { ExampleJob.new }

Expand Down Expand Up @@ -52,6 +50,8 @@ def perform(result_value = nil, raise_error: false)

locked_good_job = described_class.enqueue(active_job, create_with_advisory_lock: true)
expect(locked_good_job.advisory_locked?).to eq true

locked_good_job.advisory_unlock
end
end

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
RSpec.shared_examples 'lockable' do
require 'rails_helper'

RSpec.describe GoodJob::Lockable do
let(:model_class) { GoodJob::Job }
let(:job) { model_class.create! }

describe '.advisory_lock' do
around do |example|
RSpec.configure do |config|
Expand All @@ -14,7 +19,7 @@
end

it 'generates appropriate SQL' do
query = described_class.where(priority: 99).order(priority: :desc).limit(2).advisory_lock
query = model_class.where(priority: 99).order(priority: :desc).limit(2).advisory_lock

expect(normalize_sql(query.to_sql)).to eq normalize_sql(<<~SQL.squish)
SELECT "good_jobs".*
Expand All @@ -37,21 +42,31 @@

it 'returns first row of the query with a lock' do
expect(job).not_to be_advisory_locked
result_job = described_class.advisory_lock.first
result_job = model_class.advisory_lock.first
expect(result_job).to eq job
expect(job).to be_advisory_locked

job.advisory_unlock
end
end

describe '.with_advisory_lock' do
it 'opens a block with a lock' do
records = nil
described_class.limit(2).with_advisory_lock do |results|
model_class.limit(2).with_advisory_lock do |results|
records = results
expect(records).to all be_advisory_locked
end

expect(records).to all be_advisory_unlocked
expect(PgLock.advisory_lock.count).to eq 0
end

it 'does not leak advisory locks' do
model_class.limit(2).with_advisory_lock do |results|
records = results
expect(records).to all be_advisory_locked
end
end
end

Expand All @@ -63,6 +78,8 @@

other_thread_owns_advisory_lock = Concurrent::Promises.future(job, &:owns_advisory_lock?).value!
expect(other_thread_owns_advisory_lock).to be false

job.advisory_unlock
end
end

Expand All @@ -82,6 +99,17 @@
expect do
job.advisory_unlock
end.not_to change(job, :advisory_locked?).from(true)

job.advisory_unlock
end

it 'unlocks the record even after the record is destroyed' do
job.advisory_lock!
job.destroy!

expect do
job.advisory_unlock
end.to change(PgLock, :count).by(-1)
end
end

Expand All @@ -98,11 +126,13 @@

describe 'create_with_lock' do
it 'causes the job to be saved and locked' do
job = described_class.new
job = model_class.new
job.create_with_advisory_lock = true
job.save!

expect(job).to be_advisory_locked

job.advisory_unlock
end
end

Expand All @@ -113,5 +143,7 @@
expect do
Concurrent::Promises.future(job, &:advisory_lock!).value!
end.to raise_error GoodJob::Lockable::RecordAlreadyAdvisoryLockedError

job.advisory_unlock
end
end
38 changes: 38 additions & 0 deletions spec/support/pg_locks.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
module PostgresXidExtension
def initialize_type_map(map = type_map)
register_class_with_limit map, 'xid', ActiveRecord::Type::String # OID 28
super(map)
end
end

ActiveSupport.on_load :active_record do
ActiveRecord::ConnectionAdapters::PostgreSQLAdapter.prepend PostgresXidExtension
end

class PgLock < ActiveRecord::Base
self.table_name = 'pg_locks'
self.primary_key = 'objid'

scope :advisory_lock, -> { where(locktype: 'advisory') }
scope :owns, -> { where('pid = pg_backend_pid()') }
scope :others, -> { where('pid != pg_backend_pid()') }

def unlock
where_sql = <<~SQL.squish
pg_advisory_unlock((?::bigint << 32) + ?::bigint)
SQL
self.class.unscoped.exists?([where_sql, classid, objid])
end
end

RSpec.configure do |config|
config.before do
PgLock.advisory_lock.each(&:unlock) if PgLock.advisory_lock.count > 0
expect(PgLock.advisory_lock.count).to eq(0), "Existing advisory locks BEFORE test run"
end

config.after do
expect(PgLock.owns.advisory_lock.count).to eq(0), "Existing owned advisory locks AFTER test run"
expect(PgLock.others.advisory_lock.count).to eq(0), "Existing others advisory locks AFTER test run"
end
end

0 comments on commit 0c4a4b3

Please sign in to comment.