Skip to content

Commit

Permalink
Ensure the replace strategy yields
Browse files Browse the repository at this point in the history
This should be considered a success
  • Loading branch information
mhenrixon committed Sep 27, 2021
1 parent 756b456 commit 8de220a
Show file tree
Hide file tree
Showing 12 changed files with 178 additions and 39 deletions.
23 changes: 5 additions & 18 deletions lib/sidekiq_unique_jobs/lock/base_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,26 +99,13 @@ def prepare_item
SidekiqUniqueJobs::Job.prepare(item)
end

#
# Handle when lock failed
#
# @param [Symbol] origin either `:client` or `:server`
#
# @return [void]
#
def lock_failed(origin: :client)
reflect(:lock_failed, item)
call_strategy(origin: origin)
nil
end

def call_strategy(origin:)
strategy = strategy_for(origin)
strategy.call { lock if strategy.replace? && @attempt < 2 }

@attempt += 1
new_job_id = nil
strategy = strategy_for(origin)
@attempt += 1

nil
strategy.call { new_job_id = lock if strategy.replace? && @attempt < 2 }
yield if new_job_id && block_given?
end

def unlock_and_callback
Expand Down
12 changes: 9 additions & 3 deletions lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@ class UntilAndWhileExecuting < BaseLock
#
# @yield to the caller when given a block
#
def lock(origin: :client)
return lock_failed(origin: origin) unless (token = locksmith.lock)
return yield token if block_given?
def lock(origin: :client, &block)
unless (token = locksmith.lock)
reflect(:lock_failed, item)
call_strategy(origin: origin, &block)

return
end

yield if block

token
end
Expand Down
12 changes: 9 additions & 3 deletions lib/sidekiq_unique_jobs/lock/until_executed.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ class UntilExecuted < BaseLock
#
# @yield to the caller when given a block
#
def lock
return lock_failed(origin: :client) unless (token = locksmith.lock)
return yield token if block_given?
def lock(&block)
unless (token = locksmith.lock)
reflect(:lock_failed, item)
call_strategy(origin: :client, &block)

return
end

yield if block

token
end
Expand Down
14 changes: 10 additions & 4 deletions lib/sidekiq_unique_jobs/lock/until_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,17 @@ class UntilExecuting < BaseLock
#
# @return [String, nil] the locked jid when properly locked, else nil.
#
def lock
return lock_failed unless (job_id = locksmith.lock)
return yield job_id if block_given?
def lock(&block)
unless (token = locksmith.lock)
reflect(:lock_failed, item)
call_strategy(origin: :client, &block)

job_id
return
end

yield if block

token
end

# Executes in the Sidekiq server process
Expand Down
14 changes: 10 additions & 4 deletions lib/sidekiq_unique_jobs/lock/until_expired.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ class UntilExpired < UntilExecuted
#
# @yield to the caller when given a block
#
def lock
return lock_failed unless (job_id = locksmith.lock)
return yield job_id if block_given?
def lock(&block)
unless (token = locksmith.lock)
reflect(:lock_failed, item)
call_strategy(origin: :client, &block)

job_id
return
end

yield if block

token
end

# Executes in the Sidekiq server process
Expand Down
10 changes: 6 additions & 4 deletions lib/sidekiq_unique_jobs/lock/while_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Lock
#
# @author Mikael Henriksson <[email protected]>
class WhileExecuting < BaseLock
RUN_SUFFIX ||= ":RUN"
RUN_SUFFIX = ":RUN"

include SidekiqUniqueJobs::OptionsWithFallback
include SidekiqUniqueJobs::Logging::Middleware
Expand All @@ -30,22 +30,24 @@ def initialize(item, callback, redis_pool = nil)
# @return [true] always returns true
def lock
job_id = item[JID]
yield job_id if block_given?
yield if block_given?

job_id
end

# Executes in the Sidekiq server process.
# These jobs are locked in the server process not from the client
# @yield to the worker class perform method
def execute
def execute(&block)
with_logging_context do
return call_strategy(origin: :server) unless locksmith.execute do
executed = locksmith.execute do
yield
callback_safely if locksmith.unlock
ensure
locksmith.unlock
end

call_strategy(origin: :server, &block) unless executed
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/middleware/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def call(*, &block)
private

def lock
lock_instance.lock do |_locked_jid|
lock_instance.lock do
reflect(:locked, item)
return yield
end
Expand Down
2 changes: 0 additions & 2 deletions lib/sidekiq_unique_jobs/on_conflict/replace.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ def call(&block)
end

block&.call

nil # Ensure we always return nil
end

#
Expand Down
20 changes: 20 additions & 0 deletions spec/support/workers/until_and_while_executing_reject_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# frozen_string_literal: true

# :nocov:

class UntilAndWhileExecutingRejectJob
include Sidekiq::Worker

sidekiq_options lock: :until_and_while_executing,
queue: :working,
on_conflict: {
client: :reject,
server: :reject,
}

def self.lock_args(args)
[args[0]]
end

def perform(key); end
end
20 changes: 20 additions & 0 deletions spec/support/workers/until_and_while_executing_replace_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# frozen_string_literal: true

# :nocov:

class UntilAndWhileExecutingReplaceJob
include Sidekiq::Worker

sidekiq_options lock: :until_and_while_executing,
queue: :working,
on_conflict: {
client: :replace,
server: :reschedule,
}

def self.lock_args(args)
[args[0]]
end

def perform(key); end
end
40 changes: 40 additions & 0 deletions spec/workers/until_and_while_executing_reject_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# frozen_string_literal: true

RSpec.describe UntilAndWhileExecutingRejectJob do
it_behaves_like "sidekiq with options" do
let(:options) do
{
"queue" => :working,
"retry" => true,
"lock" => :until_and_while_executing,
"on_conflict" => { client: :reject, server: :reject },
}
end
end

it "rejects the job successfully" do
Sidekiq::Testing.disable! do
set = Sidekiq::ScheduledSet.new

described_class.perform_at(Time.now + 30, 1)
expect(set.size).to eq(1)

expect(described_class.perform_at(Time.now + 30, 1)).to be_nil

set.each(&:delete)
end
end

it "rejects job successfully when using perform_in" do
Sidekiq::Testing.disable! do
set = Sidekiq::ScheduledSet.new

described_class.perform_in(30, 1)
expect(set.size).to eq(1)

expect(described_class.perform_in(30, 1)).to be_nil

set.each(&:delete)
end
end
end
48 changes: 48 additions & 0 deletions spec/workers/until_and_while_executing_replace_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# frozen_string_literal: true

RSpec.describe UntilAndWhileExecutingReplaceJob do
it_behaves_like "sidekiq with options" do
let(:options) do
{
"queue" => :working,
"retry" => true,
"lock" => :until_and_while_executing,
"on_conflict" => { client: :replace, server: :reschedule },
}
end
end

it "replaces the previous job successfully" do
Sidekiq::Testing.disable! do
set = Sidekiq::ScheduledSet.new

described_class.perform_at(Time.now + 30, "unique", "first argument")
expect(set.size).to eq(1)
expect(set.first.item["args"]).to eq(["unique", "first argument"])

job_id = described_class.perform_at(Time.now + 30, "unique", "new argument")
expect(job_id).not_to be_nil
expect(set.size).to eq(1)
expect(set.first.item["args"]).to eq(["unique", "new argument"])

set.each(&:delete)
end
end

it "replaces the previous job successfully when using perform_in" do
Sidekiq::Testing.disable! do
set = Sidekiq::ScheduledSet.new

described_class.perform_in(30, "unique", "first argument")
expect(set.size).to eq(1)
expect(set.first.item["args"]).to eq(["unique", "first argument"])

job_id = described_class.perform_in(30, "unique", "new argument")
expect(job_id).not_to be_nil
expect(set.size).to eq(1)
expect(set.first.item["args"]).to eq(["unique", "new argument"])

set.each(&:delete)
end
end
end

0 comments on commit 8de220a

Please sign in to comment.