Skip to content

Commit

Permalink
Fix numerous small issues with locking (#616)
Browse files Browse the repository at this point in the history
* Update readme

* Prefer explicit use of locksmith

over rarely used methods. Not idea why I added those methods.

* Fix a number of minor issues with locking

* Sort reflections alphabetically

* Add note about upgrading

* Bump version
  • Loading branch information
mhenrixon authored Jun 29, 2021
1 parent 2a588a6 commit 87a6120
Show file tree
Hide file tree
Showing 23 changed files with 250 additions and 209 deletions.
2 changes: 2 additions & 0 deletions .reek.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ detectors:
- SidekiqUniqueJobs::Changelog#page
- SidekiqUniqueJobs::Digests#page
- SidekiqUniqueJobs::Locksmith#create_lock
- SidekiqUniqueJobs::Locksmith#lock!
- SidekiqUniqueJobs::Middleware#self.configure_client
- SidekiqUniqueJobs::Middleware#self.configure_server
- SidekiqUniqueJobs::Orphans::RubyReaper#active?
Expand All @@ -128,6 +129,7 @@ detectors:
- SidekiqUniqueJobs::Digests#page
- SidekiqUniqueJobs::Lock#lock
- SidekiqUniqueJobs::Lock::BaseLock#call_strategy
- SidekiqUniqueJobs::Lock::UntilAndWhileExecuting#execute
- SidekiqUniqueJobs::Lock::WhileExecuting#execute
- SidekiqUniqueJobs::LockArgs#filtered_args
- SidekiqUniqueJobs::LockInfo#set
Expand Down
14 changes: 7 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@ To setup reflections for logging or metrics, use the following API:

```ruby

def extract_log_from_job(message, item)
worker = item['class']
args = item['args']
lock_args = item['lock_args']
queue = item['queue']
def extract_log_from_job(message, job_hash)
worker = job_hash['class']
args = job_hash['args']
lock_args = job_hash['lock_args']
queue = job_hash['queue']
{
message: message,
worker: worker,
Expand All @@ -208,8 +208,8 @@ def extract_log_from_job(message, item)
end

SidekiqUniqueJobs.reflect do |on|
on.lock_failed do |item|
message = extract_log_from_job('Lock Failed', item)
on.lock_failed do |job_hash|
message = extract_log_from_job('Lock Failed', job_hash)
Sidekiq.logger.warn(message)
end
end
Expand Down
79 changes: 79 additions & 0 deletions UPGRADING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Upgrading

## v7.1.0

### Reflection API

SidekiqUniqueJobs do not log by default anymore. Instead I have a reflection API that I shamelessly borrowed from Rpush.

To use the new notification/reflection system please define them as follows in an initializer of your choosing.

```ruby
SidekiqUniqueJobs.reflect do |on|
# Only raised when you have defined such a callback
on.after_unlock_callback_failed do |job_hash|
logger.warn(job_hash.merge(message: "Unlock callback failed"))
end

# This job is skipped because it is a duplicate
on.duplicate do |job_hash|
logger.warn(job_hash.merge(message: "Duplicate Job"))
end

# This means your code broke and we caught the execption to provide this reflection for you. It allows your to gather metrics and details about the error. Those details allow you to act on it as you see fit.
on.execution_failed do |job_hash|
logger.warn(job_hash.merge(message: "Execution failed"))
end

# Failed to acquire lock in a timely fashion
on.lock_failed do |job_hash|
logger.warn(job_hash.merge(message: "Lock failed"))
end

# In case you want to collect metrics
on.locked do |job_hash|
logger.debug(job_hash.merge(message: "Lock success"))
end

# When your conflict strategy is to reschedule and it failed
on.reschedule_failed do |job_hash|
logger.debug(job_hash.merge(message: "Reschedule failed"))
end

# When your conflict strategy is to reschedule and it failed
# Mostly for metrics I guess
on.rescheduled do |job_hash|
logger.debug(job_hash.merge(message: "Reschedule success"))
end

# You asked to wait for a lock to be achieved but we timed out waiting
on.timeout do |job_hash|
logger.warn(job_hash.merge(message: "Oh no! Timeout!! Timeout!!"))
end

# The current worker isn't part of this sidekiq servers workers
on.unknown_sidekiq_worker do |job_hash|
logger.warn(job_hash.merge(message: "WAT!? Why? What is this worker?"))
end

# Unlock failed! Not good
on.unlock_failed do |job_hash|
logger.warn(job_hash.merge(message: "Unlock failed"))
end

# Unlock was successful, perhaps mostly interesting for metrics
on.unlocked do |job_hash|
logger.warn(job_hash.merge(message: "Unlock success"))
end
```

You don't need to configure them all. Some of them are just informational, some of them more for metrics and a couple of them (failures, timeouts) might be of real interest.

I leave it up to you to decided what to do about it.

### Reaper Resurrector

In [#604](https://github.com/mhenrixon/sidekiq-unique-jobs/pull/604) a reaper resurrector was added. This is configured by default so that if the current reaper process dies, another one kicks off again.

With the recent fixes in [#616](https://github.com/mhenrixon/sidekiq-unique-jobs/pull/616) there should be even less need for reaping.

52 changes: 23 additions & 29 deletions lib/sidekiq_unique_jobs/lock/base_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ class Lock
# @abstract
# @author Mikael Henriksson <[email protected]>
class BaseLock
extend Forwardable

# includes "SidekiqUniqueJobs::Logging"
# @!parse include SidekiqUniqueJobs::Logging
include SidekiqUniqueJobs::Logging
Expand All @@ -26,6 +28,10 @@ def self.validate_options(options = {})
Validator.validate(options)
end

# NOTE: Mainly used for a clean testing API
#
def_delegators :locksmith, :locked?

# @param [Hash] item the Sidekiq job hash
# @param [Proc] callback the callback to use after unlock
# @param [Sidekiq::RedisConnection, ConnectionPool] redis_pool the redis connection
Expand Down Expand Up @@ -57,31 +63,6 @@ def execute
raise NotImplementedError, "##{__method__} needs to be implemented in #{self.class}"
end

# Unlocks the job from redis
# @return [String] sidekiq job id when successful
# @return [false] when unsuccessful
def unlock
locksmith.unlock # Only signal to release the lock
end

# Deletes the job from redis if it is locked.
def delete
locksmith.delete # Soft delete (don't forcefully remove when expiration is set)
end

# Forcefully deletes the job from redis.
# This is good for jobs when a previous lock was not unlocked
def delete!
locksmith.delete! # Force delete the lock
end

# Checks if the item has achieved a lock
# @return [true] when this jid has locked the job
# @return [false] when this jid has not locked the job
def locked?
locksmith.locked?
end

#
# The lock manager/client
#
Expand Down Expand Up @@ -118,15 +99,22 @@ def prepare_item
SidekiqUniqueJobs::Job.prepare(item)
end

def lock_failed
#
# Handle when lock failed
#
# @param [Symbol] location: :client or :server
#
# @return [void]
#
def lock_failed(origin: :client)
reflect(:lock_failed, item)
call_strategy(of: :client)
call_strategy(origin: origin)
end

def call_strategy(of:) # rubocop:disable Naming/MethodParameterName
def call_strategy(origin:)
@attempt += 1

case of
case origin
when :client
client_strategy.call { lock if replace? }
when :server
Expand All @@ -141,6 +129,12 @@ def replace?
client_strategy.replace? && attempt < 2
end

def unlock_and_callback
return callback_safely if locksmith.unlock

reflect(:unlock_failed, item)
end

def callback_safely
callback&.call
item[JID]
Expand Down
26 changes: 16 additions & 10 deletions lib/sidekiq_unique_jobs/lock/until_and_while_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,28 @@ class UntilAndWhileExecuting < BaseLock
#
# @yield to the caller when given a block
#
def lock
return lock_failed unless (locked_token = locksmith.lock)
return yield locked_token if block_given?
def lock(origin: :client)
return lock_failed(origin: origin) unless (token = locksmith.lock)
return yield token if block_given?

locked_token
token
end

# Executes in the Sidekiq server process
# @yield to the worker class perform method
def execute
if unlock
ensure_relocked do
runtime_lock.execute { return yield }
end
if locksmith.unlock
# ensure_relocked do
runtime_lock.execute { return yield }
# end
else
reflect(:unlock_failed, item)
end
rescue Exception # rubocop:disable Lint/RescueException
reflect(:execution_failed, item)
locksmith.lock(wait: 2)

raise
end

private
Expand All @@ -47,12 +52,13 @@ def ensure_relocked
yield
rescue Exception # rubocop:disable Lint/RescueException
reflect(:execution_failed, item)
lock
locksmith.lock

raise
end

def runtime_lock
@runtime_lock ||= SidekiqUniqueJobs::Lock::WhileExecuting.new(item, callback, redis_pool)
@runtime_lock ||= SidekiqUniqueJobs::Lock::WhileExecuting.new(item.dup, callback, redis_pool)
end
end
end
Expand Down
10 changes: 4 additions & 6 deletions lib/sidekiq_unique_jobs/lock/until_executed.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,18 @@ class UntilExecuted < BaseLock
# @yield to the caller when given a block
#
def lock
return lock_failed unless (job_id = locksmith.lock)
return yield job_id if block_given?
return lock_failed(origin: :client) unless (token = locksmith.lock)
return yield token if block_given?

job_id
token
end

# Executes in the Sidekiq server process
# @yield to the worker class perform method
def execute
locksmith.execute do
yield
return reflect(:unlock_failed, item) unless unlock

callback_safely
unlock_and_callback
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/lock/until_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def lock
# Executes in the Sidekiq server process
# @yield to the worker class perform method
def execute
callback_safely if unlock
callback_safely if locksmith.unlock
yield
end
end
Expand Down
21 changes: 21 additions & 0 deletions lib/sidekiq_unique_jobs/lock/until_expired.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,27 @@ class Lock
# @author Mikael Henriksson <[email protected]>
#
class UntilExpired < UntilExecuted
#
# Locks a sidekiq job
#
# @note Will call a conflict strategy if lock can't be achieved.
#
# @return [String, nil] the locked jid when properly locked, else nil.
#
# @yield to the caller when given a block
#
def lock
return lock_failed unless (job_id = locksmith.lock)
return yield job_id if block_given?

job_id
end

# Executes in the Sidekiq server process
# @yield to the worker class perform method
def execute(&block)
locksmith.execute(&block)
end
end
end
end
12 changes: 7 additions & 5 deletions lib/sidekiq_unique_jobs/lock/while_executing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,23 @@ def lock
# @yield to the worker class perform method
def execute
with_logging_context do
call_strategy(of: :server) unless locksmith.execute do
call_strategy(origin: :server) unless locksmith.execute do
yield
callback_safely
callback_safely if locksmith.unlock
ensure
locksmith.unlock
end
end
ensure
locksmith.unlock
end

private

# This is safe as the base_lock always creates a new digest
# The append there for needs to be done every time
def append_unique_key_suffix
item[LOCK_DIGEST] = item[LOCK_DIGEST] + RUN_SUFFIX
return if (lock_digest = item[LOCK_DIGEST]).end_with?(RUN_SUFFIX)

item[LOCK_DIGEST] = lock_digest + RUN_SUFFIX
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/sidekiq_unique_jobs/lock_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def lock_info?
# @return [true,fakse]
#
def wait_for_lock?
timeout.nil? || timeout.positive?
timeout && (timeout.zero? || timeout.positive?)
end

#
Expand Down
Loading

0 comments on commit 87a6120

Please sign in to comment.