Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename remaining unique_* keys to lock_* #475

Merged
merged 6 commits into from
Mar 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/bug_report.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class MyWorker
sidekiq_options lock: :until_executed, queue: :undefault
def perform(args); end

def self.unique_args(args)
def self.lock_args(args)
# the way you consider unique arguments
end
end
Expand Down
24 changes: 12 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
- [Worker Configuration](#worker-configuration)
- [lock_ttl](#lock_ttl-1)
- [lock_timeout](#lock_timeout-1)
- [unique_across_queues](#uniqueacrossqueues)
- [unique_across_workers](#uniqueacrossworkers)
- [unique_across_queues](#unique_across_queues)
- [unique_across_workers](#unique_across_workers)
- [Locks](#locks)
- [Until Executing](#until-executing)
- [Until Executed](#until-executed)
Expand All @@ -33,14 +33,14 @@
- [While Executing](#while-executing)
- [Custom Locks](#custom-locks)
- [Conflict Strategy](#conflict-strategy)
- [lib/strategies/my_custom_strategy.rb](#libstrategiesmycustomstrategyrb)
- [lib/strategies/my_custom_strategy.rb](#libstrategiesmy_custom_strategyrb)
- [For rails application](#for-rails-application)
- [config/initializers/sidekiq_unique_jobs.rb](#configinitializerssidekiquniquejobsrb)
- [config/initializers/sidekiq_unique_jobs.rb](#configinitializerssidekiq_unique_jobsrb)
- [For other projects, whenever you prefer](#for-other-projects-whenever-you-prefer)
- [this goes in your initializer](#this-goes-in-your-initializer)
- [app/config/routes.rb](#appconfigroutesrb)
- [app/workers/bad_worker.rb](#appworkersbad_workerrb)
- [spec/workers/bad_worker_spec.rb](#specworkersbadworkerspecrb)
- [spec/workers/bad_worker_spec.rb](#specworkersbad_worker_specrb)
- [OR](#or)
- [Contributing](#contributing)
- [Contributors](#contributors)
Expand Down Expand Up @@ -504,9 +504,9 @@ Requiring the gem in your gemfile should be sufficient to enable unique jobs.

### Finer Control over Uniqueness

Sometimes it is desired to have a finer control over which arguments are used in determining uniqueness of the job, and others may be _transient_. For this use-case, you need to define either a `unique_args` method, or a ruby proc.
Sometimes it is desired to have a finer control over which arguments are used in determining uniqueness of the job, and others may be _transient_. For this use-case, you need to define either a `lock_args` method, or a ruby proc.

*NOTE:* The unique_args method need to return an array of values to use for uniqueness check.
*NOTE:* The lock_args method need to return an array of values to use for uniqueness check.

*NOTE:* The arguments passed to the proc or the method is always an array. If your method takes a single array as argument the value of args will be `[[...]]`.

Expand All @@ -516,9 +516,9 @@ The method or the proc can return a modified version of args without the transie
class UniqueJobWithFilterMethod
include Sidekiq::Worker
sidekiq_options lock: :until_and_while_executing,
unique_args: :unique_args # this is default and will be used if such a method is defined
lock_args: :lock_args # this is default and will be used if such a method is defined

def self.unique_args(args)
def self.lock_args(args)
[ args[0], args[2][:type] ]
end

Expand All @@ -529,7 +529,7 @@ end
class UniqueJobWithFilterProc
include Sidekiq::Worker
sidekiq_options lock: :until_executed,
unique_args: ->(args) { [ args.first ] }
lock_args: ->(args) { [ args.first ] }

...

Expand All @@ -541,9 +541,9 @@ It is possible to ensure different types of unique args based on context. I can'
```ruby
class UniqueJobWithFilterMethod
include Sidekiq::Worker
sidekiq_options lock: :until_and_while_executing, unique_args: :unique_args
sidekiq_options lock: :until_and_while_executing, lock_args: :lock_args

def self.unique_args(args)
def self.lock_args(args)
if Sidekiq::ProcessSet.new.size > 1
# sidekiq runtime; uniqueness for the object (first arg)
args.first
Expand Down
2 changes: 1 addition & 1 deletion bin/bench
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def get_item(num)
"args" => [num, num],
"queue" => QUEUES.sample,
"jid" => JOB_IDS[num],
"unique_digest" => DIGESTS[num] }
"lock_digest" => DIGESTS[num] }
end

Benchmark.ips do |ips|
Expand Down
2 changes: 2 additions & 0 deletions lib/sidekiq_unique_jobs/constants.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ module SidekiqUniqueJobs
LIMIT ||= "limit"
LIVE_VERSION ||= "uniquejobs:live"
LOCK ||= "lock"
LOCK_ARGS ||= "lock_args"
LOCK_DIGEST ||= "lock_digest"
LOCK_EXPIRATION ||= "lock_expiration"
LOCK_INFO ||= "lock_info"
LOCK_LIMIT ||= "lock_limit"
Expand Down
18 changes: 10 additions & 8 deletions lib/sidekiq_unique_jobs/exceptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class UniqueJobsError < ::RuntimeError
# @author Mikael Henriksson <[email protected]>
class Conflict < UniqueJobsError
def initialize(item)
super("Item with the key: #{item[UNIQUE_DIGEST]} is already scheduled or processing")
super("Item with the key: #{item[LOCK_DIGEST]} is already scheduled or processing")
end
end

Expand Down Expand Up @@ -61,15 +61,17 @@ def initialize(lock_config)
# @author Mikael Henriksson <[email protected]>
class InvalidUniqueArguments < UniqueJobsError
def initialize(options)
given = options[:given]
worker_class = options[:worker_class]
unique_args_method = options[:unique_args_method]
uniq_args_meth = worker_class.method(unique_args_method)
num_args = uniq_args_meth.arity
# source_location = uniq_args_meth.source_location
given = options[:given]
worker_class = options[:worker_class]
lock_args_method = options[:lock_args_method]
lock_args_meth = worker_class.method(lock_args_method)
num_args = lock_args_meth.arity
source_location = lock_args_meth.source_location

super(
"#{worker_class}#unique_args takes #{num_args} arguments, received #{given.inspect}"
"#{worker_class}##{lock_args_method} takes #{num_args} arguments, received #{given.inspect}" \
"\n\n" \
" #{source_location.join(':')}"
)
end
end
Expand Down
24 changes: 11 additions & 13 deletions lib/sidekiq_unique_jobs/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,20 @@ module SidekiqUniqueJobs
module Job
extend self

# Adds timeout, expiration, unique_args, unique_prefix and unique_digest to the sidekiq job hash
# Adds timeout, expiration, lock_args, lock_prefix and lock_digest to the sidekiq job hash
# @return [Hash] the job hash
def prepare(item)
add_lock_timeout(item)
add_lock_ttl(item)
add_digest(item)

item
end

# Adds unique_args, unique_prefix and unique_digest to the sidekiq job hash
# Adds lock_args, lock_prefix and lock_digest to the sidekiq job hash
# @return [Hash] the job hash
def add_digest(item)
add_unique_prefix(item)
add_unique_args(item)
add_unique_digest(item)
add_lock_prefix(item)
add_lock_args(item)
add_lock_digest(item)

item
end
Expand All @@ -37,16 +35,16 @@ def add_lock_timeout(item)
item[LOCK_TIMEOUT] = SidekiqUniqueJobs::LockTimeout.calculate(item)
end

def add_unique_args(item)
item[UNIQUE_ARGS] = SidekiqUniqueJobs::LockArgs.call(item)
def add_lock_args(item)
item[LOCK_ARGS] = SidekiqUniqueJobs::LockArgs.call(item)
end

def add_unique_digest(item)
item[UNIQUE_DIGEST] = SidekiqUniqueJobs::LockDigest.call(item)
def add_lock_digest(item)
item[LOCK_DIGEST] = SidekiqUniqueJobs::LockDigest.call(item)
end

def add_unique_prefix(item)
item[UNIQUE_PREFIX] = SidekiqUniqueJobs.config.unique_prefix
def add_lock_prefix(item)
item[LOCK_PREFIX] = SidekiqUniqueJobs.config.unique_prefix
end
end
end
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/lock/base_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def locksmith
private

def prepare_item
return if item.key?(UNIQUE_DIGEST)
return if item.key?(LOCK_DIGEST)

# The below should only be done to ease testing
# in production this will be done by the middleware
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def execute
runtime_lock.execute { return yield }
end
else
log_warn "couldn't unlock digest: #{item[UNIQUE_DIGEST]} #{item[JID]}"
log_warn "couldn't unlock digest: #{item[LOCK_DIGEST]} #{item[JID]}"
end
end

Expand Down
16 changes: 16 additions & 0 deletions lib/sidekiq_unique_jobs/lock/validator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ class Lock
# @author Mikael Henriksson <[email protected]>
#
class Validator
DEPRECATED_KEYS = {
UNIQUE.to_sym => LOCK.to_sym,
UNIQUE_ARGS.to_sym => LOCK_ARGS.to_sym,
UNIQUE_PREFIX.to_sym => LOCK_PREFIX.to_sym,
}.freeze

#
# Shorthand for `new(options).validate`
#
Expand All @@ -30,7 +36,9 @@ def self.validate(options)
# @param [Hash] options the sidekiq_options for the worker being validated
#
def initialize(options)
@options = options.transform_keys(&:to_sym)
@lock_config = LockConfig.new(options)
handle_deprecations
end

#
Expand All @@ -53,6 +61,14 @@ def validate
lock_config
end

def handle_deprecations
DEPRECATED_KEYS.each do |old, new|
next unless @options.key?(old)

lock_config.errors[old] = "is deprecated, use `#{new}: #{@options[old]}` instead."
end
end

#
# Validates the client configuration
#
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/lock/while_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def execute
# This is safe as the base_lock always creates a new digest
# The append there for needs to be done every time
def append_unique_key_suffix
item[UNIQUE_DIGEST] = item[UNIQUE_DIGEST] + RUN_SUFFIX
item[LOCK_DIGEST] = item[LOCK_DIGEST] + RUN_SUFFIX
end
end
end
Expand Down
62 changes: 34 additions & 28 deletions lib/sidekiq_unique_jobs/lock_args.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class LockArgs
# @param [Hash] item a Sidekiq job hash
# @return [String] a unique digest
def self.call(item)
new(item).unique_args
new(item).lock_args
end

# The sidekiq job hash
Expand All @@ -32,42 +32,42 @@ def initialize(item)
end

# The unique arguments to use for creating a lock
# @return [Array] the arguments filters by the {#filtered_args} method if {#unique_args_enabled?}
def unique_args
@unique_args ||= filtered_args
# @return [Array] the arguments filters by the {#filtered_args} method if {#lock_args_enabled?}
def lock_args
@lock_args ||= filtered_args
end

# Checks if the worker class has enabled unique_args
# Checks if the worker class has enabled lock_args
# @return [true, false]
def unique_args_enabled?
# return false unless unique_args_method_valid?
def lock_args_enabled?
# return false unless lock_args_method_valid?

unique_args_method
lock_args_method
end

# Validate that the unique_args_method is acceptable
# Validate that the lock_args_method is acceptable
# @return [true, false]
def unique_args_method_valid?
[NilClass, TrueClass, FalseClass].none? { |klass| unique_args_method.is_a?(klass) }
def lock_args_method_valid?
[NilClass, TrueClass, FalseClass].none? { |klass| lock_args_method.is_a?(klass) }
end

# Checks if the worker class has disabled unique_args
# Checks if the worker class has disabled lock_args
# @return [true, false]
def unique_args_disabled?
!unique_args_method
def lock_args_disabled?
!lock_args_method
end

# Filters unique arguments by proc or symbol
# @return [Array] {#filter_by_proc} when {#unique_args_method} is a Proc
# @return [Array] {#filter_by_symbol} when {#unique_args_method} is a Symbol
# @return [Array] {#filter_by_proc} when {#lock_args_method} is a Proc
# @return [Array] {#filter_by_symbol} when {#lock_args_method} is a Symbol
# @return [Array] args unfiltered when neither of the above
def filtered_args
return args if unique_args_disabled?
return args if lock_args_disabled?
return args if args.empty?

json_args = Normalizer.jsonify(args)

case unique_args_method
case lock_args_method
when Proc
filter_by_proc(json_args)
when Symbol
Expand All @@ -79,34 +79,40 @@ def filtered_args
# @param [Array] args the arguments passed to the sidekiq worker
# @return [Array] with the filtered arguments
def filter_by_proc(args)
unique_args_method.call(args)
lock_args_method.call(args)
end

# Filters unique arguments by method configured in the sidekiq worker
# @param [Array] args the arguments passed to the sidekiq worker
# @return [Array] unfiltered unless {#worker_method_defined?}
# @return [Array] with the filtered arguments
def filter_by_symbol(args)
return args unless worker_method_defined?(unique_args_method)
return args unless worker_method_defined?(lock_args_method)

worker_class.send(unique_args_method, args)
worker_class.send(lock_args_method, args)
rescue ArgumentError
raise SidekiqUniqueJobs::InvalidUniqueArguments,
given: args,
worker_class: worker_class,
unique_args_method: unique_args_method
lock_args_method: lock_args_method
end

# The method to use for filtering unique arguments
def unique_args_method
@unique_args_method ||= worker_options[UNIQUE_ARGS]
@unique_args_method ||= :unique_args if worker_method_defined?(:unique_args)
@unique_args_method ||= default_unique_args_method
def lock_args_method
@lock_args_method ||= worker_options[LOCK_ARGS] || worker_options[UNIQUE_ARGS]
@lock_args_method ||= :lock_args if worker_method_defined?(:lock_args)
@lock_args_method ||= :unique_args if worker_method_defined?(:unique_args)
@lock_args_method ||= default_lock_args_method
end

# The global worker options defined in Sidekiq directly
def default_unique_args_method
Sidekiq.default_worker_options.stringify_keys[UNIQUE_ARGS]
def default_lock_args_method
default_worker_options[LOCK_ARGS] ||
default_worker_options[UNIQUE_ARGS]
end

def default_worker_options
@default_worker_options ||= Sidekiq.default_worker_options.stringify_keys
end
end
end
Loading