From 34182dd875660e37c13f4e85ef4c21f6d70beb14 Mon Sep 17 00:00:00 2001 From: Dominik Sander Date: Mon, 2 Aug 2021 17:53:27 +0200 Subject: [PATCH] Fix `on_conflict` strategy handling This fixes the exception posted below when using `on_conflict: replace` and potentially other bugs related to `on_conflict`. Before [this change][1] `BasicLock.lock` always returned either the acquired lock or the return value of `call_strategy`. Since `Middleware::Client#lock` now only yields when `lock_instance.lock` yields we have to also `yield` inside the lock instances when the lock was acquired via `lock_failed` (i.e. a `on_conflict` strategy). ``` NoMethodError: undefined method `key?' for "f5d69f8fd2e1f3dde8cee02e":String from sidekiq/client.rb:197:in `atomic_push' from sidekiq/client.rb:190:in `block (2 levels) in raw_push' from redis.rb:2489:in `block in multi' from redis.rb:69:in `block in synchronize' from monitor.rb:202:in `synchronize' from monitor.rb:202:in `mon_synchronize' from redis.rb:69:in `synchronize' from redis.rb:2483:in `multi' from sidekiq/client.rb:189:in `block in raw_push' from connection_pool.rb:63:in `block (2 levels) in with' from connection_pool.rb:62:in `handle_interrupt' from connection_pool.rb:62:in `block in with' from connection_pool.rb:59:in `handle_interrupt' from connection_pool.rb:59:in `with' from sidekiq/client.rb:188:in `raw_push' from sidekiq/client.rb:74:in `push' from sidekiq/worker.rb:240:in `client_push' from sidekiq/worker.rb:215:in `perform_in' ``` #590 [1]: s://github.com/mhenrixon/sidekiq-unique-jobs/commit/8c8d54c8b9dea363a7d8b8aeaceb2e82966b8503 --- .../lock/until_and_while_executing.rb | 4 +-- .../lock/until_executed.rb | 4 +-- .../lock/until_executing.rb | 4 +-- lib/sidekiq_unique_jobs/lock/until_expired.rb | 4 +-- .../until_and_while_executing_replace_job.rb | 17 +++++++++++ ...il_and_while_executing_replace_job_spec.rb | 30 +++++++++++++++++++ 6 files changed, 55 insertions(+), 8 deletions(-) create mode 100644 spec/support/workers/until_and_while_executing_replace_job.rb create mode 100644 spec/workers/until_and_while_executing_replace_job_spec.rb diff --git a/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb b/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb index ced00745..33ec3e96 100644 --- a/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb +++ b/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb @@ -23,8 +23,8 @@ class UntilAndWhileExecuting < BaseLock # @yield to the caller when given a block # def lock(origin: :client) - return lock_failed(origin: origin) unless (token = locksmith.lock) - return yield token if block_given? + token = locksmith.lock || lock_failed(origin: origin) + return yield token if token && block_given? token end diff --git a/lib/sidekiq_unique_jobs/lock/until_executed.rb b/lib/sidekiq_unique_jobs/lock/until_executed.rb index 01a8ec61..82148344 100644 --- a/lib/sidekiq_unique_jobs/lock/until_executed.rb +++ b/lib/sidekiq_unique_jobs/lock/until_executed.rb @@ -18,8 +18,8 @@ class UntilExecuted < BaseLock # @yield to the caller when given a block # def lock - return lock_failed(origin: :client) unless (token = locksmith.lock) - return yield token if block_given? + token = locksmith.lock || lock_failed(origin: :client) + return yield token if token && block_given? token end diff --git a/lib/sidekiq_unique_jobs/lock/until_executing.rb b/lib/sidekiq_unique_jobs/lock/until_executing.rb index 745136ea..4b1fa69c 100644 --- a/lib/sidekiq_unique_jobs/lock/until_executing.rb +++ b/lib/sidekiq_unique_jobs/lock/until_executing.rb @@ -16,8 +16,8 @@ class UntilExecuting < BaseLock # @return [String, nil] the locked jid when properly locked, else nil. # def lock - return lock_failed unless (job_id = locksmith.lock) - return yield job_id if block_given? + job_id = locksmith.lock || lock_failed + return yield job_id if job_id && block_given? job_id end diff --git a/lib/sidekiq_unique_jobs/lock/until_expired.rb b/lib/sidekiq_unique_jobs/lock/until_expired.rb index f498dfab..f247a966 100644 --- a/lib/sidekiq_unique_jobs/lock/until_expired.rb +++ b/lib/sidekiq_unique_jobs/lock/until_expired.rb @@ -18,8 +18,8 @@ class UntilExpired < UntilExecuted # @yield to the caller when given a block # def lock - return lock_failed unless (job_id = locksmith.lock) - return yield job_id if block_given? + job_id = locksmith.lock || lock_failed + return yield job_id if job_id && block_given? job_id end diff --git a/spec/support/workers/until_and_while_executing_replace_job.rb b/spec/support/workers/until_and_while_executing_replace_job.rb new file mode 100644 index 00000000..25f3b7a5 --- /dev/null +++ b/spec/support/workers/until_and_while_executing_replace_job.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +# :nocov: + +class UntilAndWhileExecutingReplaceJob + include Sidekiq::Worker + + sidekiq_options lock: :until_and_while_executing, + queue: :working, + on_conflict: :replace + + def self.lock_args(args) + [args[0]] + end + + def perform(key); end +end diff --git a/spec/workers/until_and_while_executing_replace_job_spec.rb b/spec/workers/until_and_while_executing_replace_job_spec.rb new file mode 100644 index 00000000..c8c4372f --- /dev/null +++ b/spec/workers/until_and_while_executing_replace_job_spec.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +RSpec.describe UntilAndWhileExecutingReplaceJob do + it_behaves_like "sidekiq with options" do + let(:options) do + { + "queue" => :working, + "retry" => true, + "lock" => :until_and_while_executing, + "on_conflict" => :replace, + } + end + end + + it "replaces the previous job successfully" do + Sidekiq::Testing.disable! do + set = Sidekiq::ScheduledSet.new + + described_class.perform_at(Time.now + 30, "unique", "first argument") + expect(set.size).to eq(1) + expect(set.first.item["args"]).to eq(["unique", "first argument"]) + + described_class.perform_at(Time.now + 30, "unique", "new argument") + expect(set.size).to eq(1) + expect(set.first.item["args"]).to eq(["unique", "new argument"]) + + set.each(&:delete) + end + end +end