From 0c4a4b39ae521a130f93521d8de3f64f0348ca54 Mon Sep 17 00:00:00 2001 From: Ben Sheldon Date: Tue, 15 Dec 2020 17:19:26 -0800 Subject: [PATCH] Ensure that deleted jobs are unlocked - Checks that every test cleans up any remaining locks - Adds a PgLock model for test support --- lib/good_job/lockable.rb | 7 ++-- spec/lib/good_job/job_spec.rb | 4 +- .../good_job/lockable_spec.rb} | 42 ++++++++++++++++--- spec/support/pg_locks.rb | 38 +++++++++++++++++ 4 files changed, 81 insertions(+), 10 deletions(-) rename spec/{support/shared_examples/lockable.rb => lib/good_job/lockable_spec.rb} (76%) create mode 100644 spec/support/pg_locks.rb diff --git a/lib/good_job/lockable.rb b/lib/good_job/lockable.rb index 901a85f66..b054bdb29 100644 --- a/lib/good_job/lockable.rb +++ b/lib/good_job/lockable.rb @@ -151,10 +151,11 @@ def advisory_lock # {#advisory_unlock} and {#advisory_lock} the same number of times. # @return [Boolean] whether the lock was released. def advisory_unlock - where_sql = <<~SQL.squish - pg_advisory_unlock(('x' || substr(md5(:table_name || :id::text), 1, 16))::bit(64)::bigint) + query = <<~SQL.squish + SELECT 1 AS one + WHERE pg_advisory_unlock(('x'||substr(md5(:table_name || :id::text), 1, 16))::bit(64)::bigint) SQL - self.class.unscoped.exists?([where_sql, { table_name: self.class.table_name, id: send(self.class.primary_key) }]) + self.class.connection.execute(sanitize_sql_for_conditions([query, { table_name: self.class.table_name, id: send(self.class.primary_key) }])).ntuples.positive? end # Acquires an advisory lock on this record or raises diff --git a/spec/lib/good_job/job_spec.rb b/spec/lib/good_job/job_spec.rb index 4b3cd1ddb..ec179450b 100644 --- a/spec/lib/good_job/job_spec.rb +++ b/spec/lib/good_job/job_spec.rb @@ -19,8 +19,6 @@ def perform(result_value = nil, raise_error: false) end) end - it_behaves_like 'lockable' - describe '.enqueue' do let(:active_job) { ExampleJob.new } @@ -52,6 +50,8 @@ def perform(result_value = nil, raise_error: false) locked_good_job = described_class.enqueue(active_job, create_with_advisory_lock: true) expect(locked_good_job.advisory_locked?).to eq true + + locked_good_job.advisory_unlock end end diff --git a/spec/support/shared_examples/lockable.rb b/spec/lib/good_job/lockable_spec.rb similarity index 76% rename from spec/support/shared_examples/lockable.rb rename to spec/lib/good_job/lockable_spec.rb index 957568757..bdf4b4967 100644 --- a/spec/support/shared_examples/lockable.rb +++ b/spec/lib/good_job/lockable_spec.rb @@ -1,4 +1,9 @@ -RSpec.shared_examples 'lockable' do +require 'rails_helper' + +RSpec.describe GoodJob::Lockable do + let(:model_class) { GoodJob::Job } + let(:job) { model_class.create! } + describe '.advisory_lock' do around do |example| RSpec.configure do |config| @@ -14,7 +19,7 @@ end it 'generates appropriate SQL' do - query = described_class.where(priority: 99).order(priority: :desc).limit(2).advisory_lock + query = model_class.where(priority: 99).order(priority: :desc).limit(2).advisory_lock expect(normalize_sql(query.to_sql)).to eq normalize_sql(<<~SQL.squish) SELECT "good_jobs".* @@ -37,21 +42,31 @@ it 'returns first row of the query with a lock' do expect(job).not_to be_advisory_locked - result_job = described_class.advisory_lock.first + result_job = model_class.advisory_lock.first expect(result_job).to eq job expect(job).to be_advisory_locked + + job.advisory_unlock end end describe '.with_advisory_lock' do it 'opens a block with a lock' do records = nil - described_class.limit(2).with_advisory_lock do |results| + model_class.limit(2).with_advisory_lock do |results| records = results expect(records).to all be_advisory_locked end expect(records).to all be_advisory_unlocked + expect(PgLock.advisory_lock.count).to eq 0 + end + + it 'does not leak advisory locks' do + model_class.limit(2).with_advisory_lock do |results| + records = results + expect(records).to all be_advisory_locked + end end end @@ -63,6 +78,8 @@ other_thread_owns_advisory_lock = Concurrent::Promises.future(job, &:owns_advisory_lock?).value! expect(other_thread_owns_advisory_lock).to be false + + job.advisory_unlock end end @@ -82,6 +99,17 @@ expect do job.advisory_unlock end.not_to change(job, :advisory_locked?).from(true) + + job.advisory_unlock + end + + it 'unlocks the record even after the record is destroyed' do + job.advisory_lock! + job.destroy! + + expect do + job.advisory_unlock + end.to change(PgLock, :count).by(-1) end end @@ -98,11 +126,13 @@ describe 'create_with_lock' do it 'causes the job to be saved and locked' do - job = described_class.new + job = model_class.new job.create_with_advisory_lock = true job.save! expect(job).to be_advisory_locked + + job.advisory_unlock end end @@ -113,5 +143,7 @@ expect do Concurrent::Promises.future(job, &:advisory_lock!).value! end.to raise_error GoodJob::Lockable::RecordAlreadyAdvisoryLockedError + + job.advisory_unlock end end diff --git a/spec/support/pg_locks.rb b/spec/support/pg_locks.rb new file mode 100644 index 000000000..fdafe883c --- /dev/null +++ b/spec/support/pg_locks.rb @@ -0,0 +1,38 @@ +module PostgresXidExtension + def initialize_type_map(map = type_map) + register_class_with_limit map, 'xid', ActiveRecord::Type::String # OID 28 + super(map) + end +end + +ActiveSupport.on_load :active_record do + ActiveRecord::ConnectionAdapters::PostgreSQLAdapter.prepend PostgresXidExtension +end + +class PgLock < ActiveRecord::Base + self.table_name = 'pg_locks' + self.primary_key = 'objid' + + scope :advisory_lock, -> { where(locktype: 'advisory') } + scope :owns, -> { where('pid = pg_backend_pid()') } + scope :others, -> { where('pid != pg_backend_pid()') } + + def unlock + where_sql = <<~SQL.squish + pg_advisory_unlock((?::bigint << 32) + ?::bigint) + SQL + self.class.unscoped.exists?([where_sql, classid, objid]) + end +end + +RSpec.configure do |config| + config.before do + PgLock.advisory_lock.each(&:unlock) if PgLock.advisory_lock.count > 0 + expect(PgLock.advisory_lock.count).to eq(0), "Existing advisory locks BEFORE test run" + end + + config.after do + expect(PgLock.owns.advisory_lock.count).to eq(0), "Existing owned advisory locks AFTER test run" + expect(PgLock.others.advisory_lock.count).to eq(0), "Existing others advisory locks AFTER test run" + end +end