From 8de220ae2edcad86704c8e1257921323d6cff17c Mon Sep 17 00:00:00 2001 From: mhenrixon Date: Mon, 27 Sep 2021 14:19:15 +0200 Subject: [PATCH] Ensure the replace strategy yields This should be considered a success --- lib/sidekiq_unique_jobs/lock/base_lock.rb | 23 ++------- .../lock/until_and_while_executing.rb | 12 +++-- .../lock/until_executed.rb | 12 +++-- .../lock/until_executing.rb | 14 ++++-- lib/sidekiq_unique_jobs/lock/until_expired.rb | 14 ++++-- .../lock/while_executing.rb | 10 ++-- lib/sidekiq_unique_jobs/middleware/client.rb | 2 +- .../on_conflict/replace.rb | 2 - .../until_and_while_executing_reject_job.rb | 20 ++++++++ .../until_and_while_executing_replace_job.rb | 20 ++++++++ ...til_and_while_executing_reject_job_spec.rb | 40 ++++++++++++++++ ...il_and_while_executing_replace_job_spec.rb | 48 +++++++++++++++++++ 12 files changed, 178 insertions(+), 39 deletions(-) create mode 100644 spec/support/workers/until_and_while_executing_reject_job.rb create mode 100644 spec/support/workers/until_and_while_executing_replace_job.rb create mode 100644 spec/workers/until_and_while_executing_reject_job_spec.rb create mode 100644 spec/workers/until_and_while_executing_replace_job_spec.rb diff --git a/lib/sidekiq_unique_jobs/lock/base_lock.rb b/lib/sidekiq_unique_jobs/lock/base_lock.rb index a27cd12af..a9f3d77c3 100644 --- a/lib/sidekiq_unique_jobs/lock/base_lock.rb +++ b/lib/sidekiq_unique_jobs/lock/base_lock.rb @@ -99,26 +99,13 @@ def prepare_item SidekiqUniqueJobs::Job.prepare(item) end - # - # Handle when lock failed - # - # @param [Symbol] origin either `:client` or `:server` - # - # @return [void] - # - def lock_failed(origin: :client) - reflect(:lock_failed, item) - call_strategy(origin: origin) - nil - end - def call_strategy(origin:) - strategy = strategy_for(origin) - strategy.call { lock if strategy.replace? && @attempt < 2 } - - @attempt += 1 + new_job_id = nil + strategy = strategy_for(origin) + @attempt += 1 - nil + strategy.call { new_job_id = lock if strategy.replace? && @attempt < 2 } + yield if new_job_id && block_given? end def unlock_and_callback diff --git a/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb b/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb index ced00745a..cb1672324 100644 --- a/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb +++ b/lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb @@ -22,9 +22,15 @@ class UntilAndWhileExecuting < BaseLock # # @yield to the caller when given a block # - def lock(origin: :client) - return lock_failed(origin: origin) unless (token = locksmith.lock) - return yield token if block_given? + def lock(origin: :client, &block) + unless (token = locksmith.lock) + reflect(:lock_failed, item) + call_strategy(origin: origin, &block) + + return + end + + yield if block token end diff --git a/lib/sidekiq_unique_jobs/lock/until_executed.rb b/lib/sidekiq_unique_jobs/lock/until_executed.rb index 01a8ec616..b90467a37 100644 --- a/lib/sidekiq_unique_jobs/lock/until_executed.rb +++ b/lib/sidekiq_unique_jobs/lock/until_executed.rb @@ -17,9 +17,15 @@ class UntilExecuted < BaseLock # # @yield to the caller when given a block # - def lock - return lock_failed(origin: :client) unless (token = locksmith.lock) - return yield token if block_given? + def lock(&block) + unless (token = locksmith.lock) + reflect(:lock_failed, item) + call_strategy(origin: :client, &block) + + return + end + + yield if block token end diff --git a/lib/sidekiq_unique_jobs/lock/until_executing.rb b/lib/sidekiq_unique_jobs/lock/until_executing.rb index 745136ea9..7903b356d 100644 --- a/lib/sidekiq_unique_jobs/lock/until_executing.rb +++ b/lib/sidekiq_unique_jobs/lock/until_executing.rb @@ -15,11 +15,17 @@ class UntilExecuting < BaseLock # # @return [String, nil] the locked jid when properly locked, else nil. # - def lock - return lock_failed unless (job_id = locksmith.lock) - return yield job_id if block_given? + def lock(&block) + unless (token = locksmith.lock) + reflect(:lock_failed, item) + call_strategy(origin: :client, &block) - job_id + return + end + + yield if block + + token end # Executes in the Sidekiq server process diff --git a/lib/sidekiq_unique_jobs/lock/until_expired.rb b/lib/sidekiq_unique_jobs/lock/until_expired.rb index f498dfab2..1b39f26d8 100644 --- a/lib/sidekiq_unique_jobs/lock/until_expired.rb +++ b/lib/sidekiq_unique_jobs/lock/until_expired.rb @@ -17,11 +17,17 @@ class UntilExpired < UntilExecuted # # @yield to the caller when given a block # - def lock - return lock_failed unless (job_id = locksmith.lock) - return yield job_id if block_given? + def lock(&block) + unless (token = locksmith.lock) + reflect(:lock_failed, item) + call_strategy(origin: :client, &block) - job_id + return + end + + yield if block + + token end # Executes in the Sidekiq server process diff --git a/lib/sidekiq_unique_jobs/lock/while_executing.rb b/lib/sidekiq_unique_jobs/lock/while_executing.rb index c6ce8e300..713c73d39 100644 --- a/lib/sidekiq_unique_jobs/lock/while_executing.rb +++ b/lib/sidekiq_unique_jobs/lock/while_executing.rb @@ -11,7 +11,7 @@ class Lock # # @author Mikael Henriksson class WhileExecuting < BaseLock - RUN_SUFFIX ||= ":RUN" + RUN_SUFFIX = ":RUN" include SidekiqUniqueJobs::OptionsWithFallback include SidekiqUniqueJobs::Logging::Middleware @@ -30,7 +30,7 @@ def initialize(item, callback, redis_pool = nil) # @return [true] always returns true def lock job_id = item[JID] - yield job_id if block_given? + yield if block_given? job_id end @@ -38,14 +38,16 @@ def lock # Executes in the Sidekiq server process. # These jobs are locked in the server process not from the client # @yield to the worker class perform method - def execute + def execute(&block) with_logging_context do - return call_strategy(origin: :server) unless locksmith.execute do + executed = locksmith.execute do yield callback_safely if locksmith.unlock ensure locksmith.unlock end + + call_strategy(origin: :server, &block) unless executed end end diff --git a/lib/sidekiq_unique_jobs/middleware/client.rb b/lib/sidekiq_unique_jobs/middleware/client.rb index 5b73daf9d..ea033903d 100644 --- a/lib/sidekiq_unique_jobs/middleware/client.rb +++ b/lib/sidekiq_unique_jobs/middleware/client.rb @@ -30,7 +30,7 @@ def call(*, &block) private def lock - lock_instance.lock do |_locked_jid| + lock_instance.lock do reflect(:locked, item) return yield end diff --git a/lib/sidekiq_unique_jobs/on_conflict/replace.rb b/lib/sidekiq_unique_jobs/on_conflict/replace.rb index 1d851de67..d434da9e7 100644 --- a/lib/sidekiq_unique_jobs/on_conflict/replace.rb +++ b/lib/sidekiq_unique_jobs/on_conflict/replace.rb @@ -43,8 +43,6 @@ def call(&block) end block&.call - - nil # Ensure we always return nil end # diff --git a/spec/support/workers/until_and_while_executing_reject_job.rb b/spec/support/workers/until_and_while_executing_reject_job.rb new file mode 100644 index 000000000..4e9c21332 --- /dev/null +++ b/spec/support/workers/until_and_while_executing_reject_job.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +# :nocov: + +class UntilAndWhileExecutingRejectJob + include Sidekiq::Worker + + sidekiq_options lock: :until_and_while_executing, + queue: :working, + on_conflict: { + client: :reject, + server: :reject, + } + + def self.lock_args(args) + [args[0]] + end + + def perform(key); end +end diff --git a/spec/support/workers/until_and_while_executing_replace_job.rb b/spec/support/workers/until_and_while_executing_replace_job.rb new file mode 100644 index 000000000..c4d84c891 --- /dev/null +++ b/spec/support/workers/until_and_while_executing_replace_job.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +# :nocov: + +class UntilAndWhileExecutingReplaceJob + include Sidekiq::Worker + + sidekiq_options lock: :until_and_while_executing, + queue: :working, + on_conflict: { + client: :replace, + server: :reschedule, + } + + def self.lock_args(args) + [args[0]] + end + + def perform(key); end +end diff --git a/spec/workers/until_and_while_executing_reject_job_spec.rb b/spec/workers/until_and_while_executing_reject_job_spec.rb new file mode 100644 index 000000000..3f9f2b1cc --- /dev/null +++ b/spec/workers/until_and_while_executing_reject_job_spec.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +RSpec.describe UntilAndWhileExecutingRejectJob do + it_behaves_like "sidekiq with options" do + let(:options) do + { + "queue" => :working, + "retry" => true, + "lock" => :until_and_while_executing, + "on_conflict" => { client: :reject, server: :reject }, + } + end + end + + it "rejects the job successfully" do + Sidekiq::Testing.disable! do + set = Sidekiq::ScheduledSet.new + + described_class.perform_at(Time.now + 30, 1) + expect(set.size).to eq(1) + + expect(described_class.perform_at(Time.now + 30, 1)).to be_nil + + set.each(&:delete) + end + end + + it "rejects job successfully when using perform_in" do + Sidekiq::Testing.disable! do + set = Sidekiq::ScheduledSet.new + + described_class.perform_in(30, 1) + expect(set.size).to eq(1) + + expect(described_class.perform_in(30, 1)).to be_nil + + set.each(&:delete) + end + end +end diff --git a/spec/workers/until_and_while_executing_replace_job_spec.rb b/spec/workers/until_and_while_executing_replace_job_spec.rb new file mode 100644 index 000000000..13d889bc6 --- /dev/null +++ b/spec/workers/until_and_while_executing_replace_job_spec.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +RSpec.describe UntilAndWhileExecutingReplaceJob do + it_behaves_like "sidekiq with options" do + let(:options) do + { + "queue" => :working, + "retry" => true, + "lock" => :until_and_while_executing, + "on_conflict" => { client: :replace, server: :reschedule }, + } + end + end + + it "replaces the previous job successfully" do + Sidekiq::Testing.disable! do + set = Sidekiq::ScheduledSet.new + + described_class.perform_at(Time.now + 30, "unique", "first argument") + expect(set.size).to eq(1) + expect(set.first.item["args"]).to eq(["unique", "first argument"]) + + job_id = described_class.perform_at(Time.now + 30, "unique", "new argument") + expect(job_id).not_to be_nil + expect(set.size).to eq(1) + expect(set.first.item["args"]).to eq(["unique", "new argument"]) + + set.each(&:delete) + end + end + + it "replaces the previous job successfully when using perform_in" do + Sidekiq::Testing.disable! do + set = Sidekiq::ScheduledSet.new + + described_class.perform_in(30, "unique", "first argument") + expect(set.size).to eq(1) + expect(set.first.item["args"]).to eq(["unique", "first argument"]) + + job_id = described_class.perform_in(30, "unique", "new argument") + expect(job_id).not_to be_nil + expect(set.size).to eq(1) + expect(set.first.item["args"]).to eq(["unique", "new argument"]) + + set.each(&:delete) + end + end +end