Skip to content

Commit

Permalink
Various improvements (#240)
Browse files Browse the repository at this point in the history
* Fix running tests

* Avoid warnings when testing console.start

* Only hide errors for valid constant names

* Improve test readability

* Lock on the instance again

* Clean up more tests

* Allow raising name errors caught by configuration

* Add change log entry

* Parallelize rubocop

* Styles

* Close ##236
  • Loading branch information
mhenrixon authored Aug 19, 2017
1 parent 3e7361a commit aac5133
Show file tree
Hide file tree
Showing 18 changed files with 333 additions and 259 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ cache: bundler
services:
- redis-server
script:
- if [[ "${STYLE}" = "true" ]]; then bundle exec rubocop; fi;
- if [[ "${STYLE}" = "true" ]]; then bundle exec rubocop - P; fi;
- bundle exec rspec spec
- if [[ "${STYLE}" = "true" ]]; then CODECLIMATE_REPO_TOKEN=88e524e8f638efe690def7a6e2c72b1a9db5cdfa74548921b734d609a5858ee5 bundle exec codeclimate-test-reporter; fi;
rvm:
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
## v5.0.10

- Cleans up test setup and make tests more readable
- Allow raising all errors
- Log previously hidden errors
- Revert the changes of v5.0.5 (8a4b7648b8b0ee5d7dc1f5f5a036f41d52ad9a42)
- Allow name errors in unique args method to be raised

## v5.0.9
- [#229](https://github.com/mhenrixon/sidekiq-unique-jobs/issues/#229) Use HSCAN for expiring keys
- [#232](https://github.com/mhenrixon/sidekiq-unique-jobs/issues/#232) Fix testing helper
Expand Down
14 changes: 11 additions & 3 deletions lib/sidekiq-unique-jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def config
default_run_lock_expiration: 60,
default_lock: :while_executing,
redis_test_mode: :redis, # :mock
raise_unique_args_errors: false,
)
end

Expand All @@ -55,12 +56,19 @@ def namespace
end

# Attempt to constantize a string worker_class argument, always
# failing back to the original argument.
# failing back to the original argument when the constant can't be found
#
# raises an error for other errors
def worker_class_constantize(worker_class)
return worker_class unless worker_class.is_a?(String)
Object.const_get(worker_class)
rescue NameError
worker_class
rescue NameError => ex
case ex.message
when /uninitialized constant/
worker_class
else
raise
end
end

def mocked?
Expand Down
24 changes: 12 additions & 12 deletions lib/sidekiq_unique_jobs/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ def console
console_class.start
end

private

def logger
SidekiqUniqueJobs.logger
end

def console_class
require 'pry'
Pry
rescue LoadError
require 'irb'
IRB
no_commands do
def logger
SidekiqUniqueJobs.logger
end

def console_class
require 'pry'
Pry
rescue LoadError
require 'irb'
IRB
end
end
end
end
12 changes: 5 additions & 7 deletions lib/sidekiq_unique_jobs/lock/while_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
module SidekiqUniqueJobs
module Lock
class WhileExecuting
MUTEX = Mutex.new

def self.synchronize(item, redis_pool = nil)
new(item, redis_pool).synchronize { yield }
end
Expand All @@ -13,19 +11,19 @@ def initialize(item, redis_pool = nil)
@item = item
@redis_pool = redis_pool
@unique_digest = "#{create_digest}:run"
@mutex = Mutex.new
end

def synchronize
MUTEX.lock
sleep 0.1 until locked?

yield
@mutex.synchronize do
sleep 0.1 until locked?
yield
end
rescue Sidekiq::Shutdown
logger.fatal { "the unique_key: #{@unique_digest} needs to be unlocked manually" }
raise
ensure
SidekiqUniqueJobs.connection(@redis_pool) { |conn| conn.del @unique_digest }
MUTEX.unlock
end

def locked?
Expand Down
12 changes: 8 additions & 4 deletions lib/sidekiq_unique_jobs/unique_args.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ module SidekiqUniqueJobs
class UniqueArgs
CLASS_NAME = 'SidekiqUniqueJobs::UniqueArgs'
extend Forwardable
include Normalizer

def_delegators :SidekiqUniqueJobs, :config, :worker_class_constantize
def_delegators :Sidekiq, :logger
Expand Down Expand Up @@ -62,9 +61,13 @@ def unique_args(args)
logger.debug { "#{__method__} : unique arguments disabled" }
args
end
rescue NameError
# fallback to not filtering args when class can't be instantiated
return args
rescue NameError => ex
logger.error "#{__method__}(#{args}) : failed with (#{ex.message})"
logger.error ex

raise if config.raise_unique_args_errors

args
end

def unique_on_all_queues?
Expand Down Expand Up @@ -114,6 +117,7 @@ def filter_by_proc(args)
logger.warn { "#{__method__} : unique_args_method is nil. Returning (#{args})" }
return args
end

filter_args = unique_args_method.call(args)
logger.debug { "#{__method__} : #{args} -> #{filter_args}" }
filter_args
Expand Down
46 changes: 29 additions & 17 deletions lib/sidekiq_unique_jobs/util.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
# frozen_string_literal: true

module SidekiqUniqueJobs
module Util
SCAN_PATTERN ||= '*'
DEFAULT_COUNT ||= 1_000
KEYS_METHOD ||= 'keys'
SCAN_METHOD ||= 'scan'
EXPIRE_BATCH_SIZE ||= 100
module Util # rubocop:disable Metrics/ModuleLength
COUNT = 'COUNT'
DEFAULT_COUNT = 1_000
EXPIRE_BATCH_SIZE = 100
MATCH = 'MATCH'
KEYS_METHOD = 'keys'
SCAN_METHOD = 'scan'
SCAN_PATTERN = '*'

module_function
extend self # rubocop:disable Style/ModuleFunction

def keys(pattern = SCAN_PATTERN, count = DEFAULT_COUNT)
send("keys_by_#{redis_keys_method}", pattern, count)
Expand All @@ -26,7 +28,7 @@ def del(pattern = SCAN_PATTERN, count = 0, dry_run = true)
keys, time = timed { keys(pattern, count) }
logger.debug { "#{keys.size} matching keys found in #{time} sec." }
keys = dry_run(keys)
logger.debug { "#{keys.size} matching keys after postprocessing" }
logger.debug { "#{keys.size} matching keys after post-processing" }
unless dry_run
logger.debug { "deleting #{keys}..." }
_, time = timed { batch_delete(keys) }
Expand All @@ -41,23 +43,33 @@ def unique_hash
end
end

def expire
def expire # rubocop:disable Metrics/MethodLength
removed_keys = {}
connection do |conn|
cursor = '0'
cursor, jobs = conn.hscan(SidekiqUniqueJobs::HASH_KEY, [cursor, 'MATCH', '*', 'COUNT', EXPIRE_BATCH_SIZE])
jobs.each do |job_array|
jid, unique_key = job_array
next if conn.get(unique_key)
conn.hdel(SidekiqUniqueJobs::HASH_KEY, jid)
removed_keys[jid] = unique_key
end
loop do
cursor, jobs = get_jobs(conn, cursor)
jobs.each do |job_array|
jid, unique_key = job_array

next if conn.get(unique_key)
conn.hdel(SidekiqUniqueJobs::HASH_KEY, jid)
removed_keys[jid] = unique_key
end

break if cursor == '0'
break if cursor == '0'
end
end

removed_keys
end

private

def get_jobs(conn, cursor)
conn.hscan(SidekiqUniqueJobs::HASH_KEY, [cursor, MATCH, SCAN_PATTERN, COUNT, EXPIRE_BATCH_SIZE])
end

def keys_by_scan(pattern, count)
connection { |conn| conn.scan_each(match: prefix(pattern), count: count).to_a }
end
Expand Down
2 changes: 2 additions & 0 deletions spec/jobs/custom_queue_job_with_filter_method.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

require_relative 'custom_queue_job'

class CustomQueueJobWithFilterMethod < CustomQueueJob
sidekiq_options unique: :until_executed, unique_args: :args_filter

Expand Down
4 changes: 1 addition & 3 deletions spec/lib/sidekiq_unique_jobs/cli_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,7 @@

specify do
expect(Object).to receive(:include).with(SidekiqUniqueJobs::Util).and_return(true)
console = double(:console)
allow_any_instance_of(SidekiqUniqueJobs::Cli).to receive(:console_class).and_return(console)
allow(console).to receive(:start)
allow(Pry).to receive(:start).and_return(true)
expect(output).to eq(expected)
end
end
Expand Down
3 changes: 3 additions & 0 deletions spec/lib/sidekiq_unique_jobs/client/middleware_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def digest_for(item)
end

Sidekiq::Simulator.process_queue(:notify_worker) do
sleep 1
Sidekiq.redis do |conn|
wait(10).for { conn.llen('queue:notify_worker') }.to eq(0)
end
Expand All @@ -48,6 +49,7 @@ def digest_for(item)
end

Sidekiq::Simulator.process_queue(:not_default) do
sleep(1)
Sidekiq.redis do |conn|
expect(conn.llen('queue:default')).to eq(1)
wait(10).for { conn.llen('queue:not_default') }.to eq(0)
Expand All @@ -60,6 +62,7 @@ def digest_for(item)
end

Sidekiq::Simulator.process_queue(:default) do
sleep(1)
Sidekiq.redis do |conn|
expect(conn.llen('queue:not_default')).to eq(0)
wait(10).for { conn.llen('queue:default') }.to eq(0)
Expand Down
40 changes: 23 additions & 17 deletions spec/lib/sidekiq_unique_jobs/run_lock_timeout_calculator_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,42 @@
require 'spec_helper'

RSpec.describe SidekiqUniqueJobs::RunLockTimeoutCalculator do
shared_context 'generic unscheduled job' do
subject { described_class.new('class' => 'JustAWorker') }
end
subject { calculator }

describe 'public api' do
it_behaves_like 'generic unscheduled job' do
it { is_expected.to respond_to(:seconds) }
end
end
let(:calculator) { described_class.new(args) }
let(:args) { { 'class' => 'JustAWorker' } }

it { is_expected.to respond_to(:seconds) }

describe '.for_item' do
subject { described_class.for_item('WAT') }

it 'initializes a new calculator' do
expect(described_class).to receive(:new).with('WAT')
described_class.for_item('WAT')
subject
end
end

describe '#seconds' do
context 'using default run_lock_expiration' do
subject { described_class.new(nil) }
before { allow(subject).to receive(:worker_class_run_lock_expiration).and_return(9) }
subject { calculator.seconds }

before do
allow(calculator).to receive(:worker_class_run_lock_expiration)
.and_return(expiration)
end

context 'when worker_class_run_lock_expiration is configured' do
let(:args) { nil }
let(:expiration) { 9 }

its(:seconds) { is_expected.to eq(9) }
it { is_expected.to eq(9) }
end

context 'using specified sidekiq option run_lock_expiration' do
subject { described_class.new(nil) }
before { allow(subject).to receive(:worker_class_run_lock_expiration).and_return(nil) }
context 'when worker_class_run_lock_expiration is not configured' do
let(:args) { nil }
let(:expiration) { nil }

its(:seconds) { is_expected.to eq(60) }
it { is_expected.to eq(60) }
end
end
end
Loading

0 comments on commit aac5133

Please sign in to comment.