Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds coverage for job retries #321

Merged
merged 4 commits into from
Aug 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions lib/sidekiq_unique_jobs/locksmith.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module SidekiqUniqueJobs
# Lock manager class that handles all the various locks
#
# @author Mikael Henriksson <[email protected]>
class Locksmith # rubocop:disable ClassLength
class Locksmith
include SidekiqUniqueJobs::Connection

# @param [Hash] item a Sidekiq job hash
Expand All @@ -20,17 +20,6 @@ def initialize(item, redis_pool = nil)
@redis_pool = redis_pool
end

# Creates the necessary keys in redis to attempt a lock
# @return [String] the Sidekiq job_id
def create
Scripts.call(
:create,
redis_pool,
keys: [exists_key, grabbed_key, available_key, UNIQUE_SET, unique_digest],
argv: [jid, expiration],
)
end

# Checks if the exists key is created in redis
# @return [true, false]
def exists?
Expand Down Expand Up @@ -66,7 +55,9 @@ def delete!
# @yield the block to execute if a lock is successful
# @return the Sidekiq job_id (jid)
def lock(timeout = nil, &block)
create
Scripts.call(:lock, redis_pool,
keys: [exists_key, grabbed_key, available_key, UNIQUE_SET, unique_digest],
argv: [jid, expiration])

grab_token(timeout) do |token|
touch_grabbed_token(token)
Expand Down
7 changes: 6 additions & 1 deletion lib/sidekiq_unique_jobs/util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def keys_with_ttl(pattern = SCAN_PATTERN, count = DEFAULT_COUNT)
# @return [Integer] the number of keys deleted
def del(pattern = SCAN_PATTERN, count = 0)
raise ArgumentError, 'Please provide a number of keys to delete greater than zero' if count.zero?
pattern = "#{pattern}:*" unless pattern.end_with?(':*')
pattern = suffix(pattern)

log_debug { "Deleting keys by: #{pattern}" }
keys, time = timed { keys(pattern, count) }
Expand Down Expand Up @@ -87,6 +87,11 @@ def prefix(key)
"#{unique_prefix}:#{key}"
end

def suffix(key)
return "#{key}*" unless key.end_with?(':*')
key
end

def unique_prefix
SidekiqUniqueJobs.config.unique_prefix
end
Expand Down
12 changes: 12 additions & 0 deletions redis/create.lua → redis/lock.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ local unique_digest = KEYS[5]
local job_id = ARGV[1]
local expiration = tonumber(ARGV[2])

local function current_time()
local time = redis.call('time')
local s = time[1]
local ms = time[2]
local number = tonumber((s .. '.' .. ms))

return number
end

-- redis.log(redis.LOG_DEBUG, "create.lua - investigate possibility of locking jid: " .. job_id)

local stored_token = redis.call('GET', exists_key)
Expand All @@ -34,6 +43,9 @@ end

redis.call('SADD', unique_keys, unique_digest)
redis.call('DEL', grabbed_key)
-- TODO: Move this to LUA when redis 3.2 is the least supported
-- redis.call('HSET', grabbed_key, job_id, current_time())
---------------------------------------------------------------
redis.call('DEL', available_key)
redis.call('RPUSH', available_key, job_id)

Expand Down
52 changes: 52 additions & 0 deletions spec/integration/sidekiq/retry_set_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

require 'spec_helper'

RSpec.describe Sidekiq::RetrySet, redis: :redis do
let(:locksmith) { SidekiqUniqueJobs::Locksmith.new(item) }
let(:args) { [1, 2] }
let(:worker_class) { MyUniqueJob }
let(:jid) { 'ajobid' }
let(:lock) { :until_executed }
let(:lock_expiration) { 7_200 }
let(:queue) { :customqueue }
let(:retry_at) { Time.now.to_f + 360 }
let(:unique_digest) { 'uniquejobs:9e9b5ce5d423d3ea470977004b50ff84' }
let(:item) do
{
'args' => args,
'class' => worker_class,
'failed_at' => Time.now.to_f,
'jid' => jid,
'lock' => lock,
'lock_expiration' => lock_expiration,
'queue' => queue,
'retry_at' => retry_at,
'retry_count' => 1,
'unique_digest' => unique_digest,
}
end

before do
zadd('retry', retry_at.to_s, Sidekiq.dump_json(item))
expect(retry_count).to eq(1)
end

context 'when a job is locked' do
before do
expect(locksmith.lock).to eq(jid)
expect(unique_keys).to match_array(%W[
#{unique_digest}:EXISTS
#{unique_digest}:GRABBED
])
expect(ttl("#{unique_digest}:EXISTS")).to eq(lock_expiration)
expect(ttl("#{unique_digest}:GRABBED")).to eq(-1)
end

it 'can be put back on queue' do
expect { described_class.new.retry_all }
.to change { queue_count(queue) }
.from(0).to(1)
end
end
end
6 changes: 3 additions & 3 deletions spec/integration/sidekiq_unique_jobs/locksmith_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
it 'disappears without a trace when calling `delete!`' do
original_key_size = keys.size

locksmith_one.create
locksmith_one.lock
locksmith_one.delete!

expect(keys.size).to eq(original_key_size)
Expand Down Expand Up @@ -162,7 +162,7 @@

it 'expires keys' do
Sidekiq.redis(&:flushdb)
locksmith_one.create
locksmith_one.lock
keys = unique_keys
expect(unique_keys).not_to include(keys)
end
Expand All @@ -181,7 +181,7 @@
# it_behaves_like 'a lock'

# it 'can dynamically add resources' do
# locksmith_one.create
# locksmith_one.lock

# 3.times do
# locksmith_one.unlock
Expand Down
4 changes: 4 additions & 0 deletions spec/support/sidekiq_helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ def unique_keys
keys('uniquejobs:*')
end

def zadd(queue, timestamp, item)
redis { |conn| conn.zadd(queue, timestamp, item) }
end

def zcard(queue)
redis { |conn| conn.zcard(queue) }
end
Expand Down
106 changes: 75 additions & 31 deletions spec/unit/sidekiq_unique_jobs/util_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
RSpec.describe SidekiqUniqueJobs::Util, redis: :redis do
let(:item_hash) do
{
'class' => 'MyUniqueJob',
'args' => [[1, 2]],
'at' => 1_492_341_850.358196,
'retry' => true,
'queue' => 'customqueue',
'unique' => :until_executed,
'expiration' => 7200,
'retry_count' => 10,
'jid' => jid,
'created_at' => 1_492_341_790.358217,
'class' => 'MyUniqueJob',
'args' => [[1, 2]],
'at' => 1_492_341_850.358196,
'retry' => true,
'queue' => 'customqueue',
'lock' => :until_executed,
'lock_expiration' => 7200,
'retry_count' => 10,
'jid' => jid,
'created_at' => 1_492_341_790.358217,
}
end

Expand All @@ -24,49 +24,93 @@
my_item
end

let(:unique_key) { item['unique_digest'] }
let(:jid) { 'e3049b05b0bd9c809182bbe0' }
let(:lock) { SidekiqUniqueJobs::Locksmith.new(item) }
let(:unique_digest) { item['unique_digest'] }
let(:jid) { 'e3049b05b0bd9c809182bbe0' }
let(:lock) { SidekiqUniqueJobs::Locksmith.new(item) }
let(:expected_keys) do
%W[
#{unique_key}:EXISTS
#{unique_key}:GRABBED
#{unique_digest}:EXISTS
#{unique_digest}:GRABBED
]
end

shared_context 'with an old lock' do
before do
result = SidekiqUniqueJobs::Scripts.call(
:acquire_lock,
nil,
keys: [unique_digest],
argv: [jid, 7200],
)
expect(result).to eq(1)
expect(described_class.keys).to include(unique_digest)
end
end

describe '.keys' do
subject(:keys) { described_class.keys }

before do
lock.lock(0)
context 'when old lock exists' do
include_context 'with an old lock'

it { is_expected.to match_array([unique_digest]) }
end

it { is_expected.to match_array(expected_keys) }
context 'when new lock exists' do
before do
lock.lock(0)
end

it { is_expected.to match_array(expected_keys) }
end
end

describe '.del' do
subject(:del) { described_class.del(pattern, 100) }

before do
lock.lock(0)
end
context 'when an old lock exists' do
include_context 'with an old lock'

it { expect(described_class.keys).to match_array(expected_keys) }
it { expect(described_class.keys).to match_array([unique_digest]) }

context 'when pattern is a wildcard' do
let(:pattern) { described_class::SCAN_PATTERN }
context 'when pattern is a wildcard' do
let(:pattern) { described_class::SCAN_PATTERN }

it { is_expected.to eq(2) }
end
it { is_expected.to eq(1) }
it { expect { del }.to change(described_class, :keys).to([]) }
end

context 'when pattern is a specific key' do
let(:pattern) { unique_key }
context 'when pattern is a specific key' do
let(:pattern) { unique_digest }

it { is_expected.to eq(2) }
it { expect { del }.to change(described_class, :keys).to([]) }
it { is_expected.to eq(1) }
it { expect { del }.to change(described_class, :keys).to([]) }
end
end

after { lock.delete }
context 'when a new lock exists' do
before do
lock.lock(0)
end

it { expect(described_class.keys).to match_array(expected_keys) }

context 'when pattern is a wildcard' do
let(:pattern) { described_class::SCAN_PATTERN }

it { is_expected.to eq(2) }
it { expect { del }.to change(described_class, :keys).to([]) }
end

context 'when pattern is a specific key' do
let(:pattern) { unique_digest }

it { is_expected.to eq(2) }
it { expect { del }.to change(described_class, :keys).to([]) }
end

after { lock.delete }
end
end

describe '.prefix' do
Expand Down