Skip to content

Commit

Permalink
Reduce reaper threads (#576)
Browse files Browse the repository at this point in the history
* Reduce the number of threads used for reaping

* Fix specs

* Fix specs

* Fix Gemfiles

* Mandatory rubocop commit

* Reduce performance impact of method as block

* Smelly

* Update README to a more sensible default

* Reduce overhead of schedule_next_task as well
  • Loading branch information
mhenrixon authored Feb 17, 2021
1 parent a8f4d2b commit 1e13e2c
Show file tree
Hide file tree
Showing 15 changed files with 184 additions and 21 deletions.
7 changes: 7 additions & 0 deletions .reek.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ detectors:
exclude:
- Sidekiq::JobSet::UniqueExtension#delete_by_value
- Sidekiq::ScheduledSet::UniqueExtension#delete
- SidekiqUniqueJobs::Orphans::Manager#start
- SidekiqUniqueJobs::Orphans::RubyReaper#active?
- SidekiqUniqueJobs::Redis::Hash#entries
DataClump:
Expand Down Expand Up @@ -58,6 +59,9 @@ detectors:
- SidekiqUniqueJobs::SidekiqWorkerMethods#worker_class_constantize
- SidekiqUniqueJobs::Web::Helpers#cparams
- SidekiqUniqueJobs::Web::Helpers#display_lock_args
InstanceVariableAssumption:
exclude:
- SidekiqUniqueJobs::TimerTask
IrresponsibleModule:
enabled: true
LongParameterList:
Expand Down Expand Up @@ -139,6 +143,8 @@ detectors:
- SidekiqUniqueJobs::Script::Caller#call_script
- SidekiqUniqueJobs::Script::Caller#extract_args
- SidekiqUniqueJobs::SidekiqWorkerMethods#worker_class_constantize
- SidekiqUniqueJobs::TimerTask#execute_task
- SidekiqUniqueJobs::TimerTask#timeout_task
- SidekiqUniqueJobs::UpgradeLocks#call
- SidekiqUniqueJobs::UpgradeLocks#upgrade_v6_lock
- SidekiqUniqueJobs::Web#self.registered
Expand All @@ -148,6 +154,7 @@ detectors:
- SidekiqUniqueJobs::LockConfig
- SidekiqUniqueJobs::Locksmith
- SidekiqUniqueJobs::Lock::BaseLock
- SidekiqUniqueJobs::TimerTask
TooManyMethods:
exclude:
- SidekiqUniqueJobs::Lock::BaseLock
Expand Down
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ gem "timecop"
gem "yard"

platforms :mri do
gem "concurrent-ruby-ext"
gem "fasterer"
gem "github_changelog_generator"
gem "guard"
Expand All @@ -30,6 +31,7 @@ platforms :mri do
gem "reek", ">= 5.3"
gem "rspec-benchmark"
gem "rubocop-mhenrixon"
gem "ruby-prof", require: false
gem "simplecov-sublime", "0.21.0", require: false
gem "travis"
end
Expand Down
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,16 @@ Configure SidekiqUniqueJobs in an initializer or the sidekiq initializer on appl

```ruby
SidekiqUniqueJobs.configure do |config|
config.debug_lua = true
config.lock_info = true
config.lock_ttl = 10.minutes
config.lock_timeout = 10.minutes
config.logger = Sidekiq.logger
config.max_history = 10_000
config.reaper = :lua
config.reaper_count = 100
config.reaper_interval = 10
config.reaper_timeout = 5
config.logger = Sidekiq.logger # default, change at your own discretion
config.debug_lua = false # Turn on when debugging
config.lock_info = false # Turn on when debugging
config.lock_ttl = 600 # Expire locks after 10 minutes
config.lock_timeout = nil # turn off lock timeout
config.max_history = 0 # Turn on when debugging
config.reaper = :ruby # :ruby, :lua or :none/nil
config.reaper_count = 1000 # Stop reaping after this many keys
config.reaper_interval = 600 # Reap orphans every 10 minutes
config.reaper_timeout = 150 # Timeout reaper after 1,5 minutes
end
```

Expand Down
42 changes: 42 additions & 0 deletions bin/profiling
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

# Trap interrupts to quit cleanly. See
# https://twitter.com/mitchellh/status/283014103189053442
Signal.trap("INT") { abort }

require "bundler/setup"
require "ruby-prof"
require "sidekiq-unique-jobs"

SidekiqUniqueJobs.configure do |config|
config.reaper_interval = 2
config.reaper_timeout = 1
config.reaper_count = 10_000
end

TASK = SidekiqUniqueJobs::TimerTask.new(SidekiqUniqueJobs::Orphans::Manager.timer_task_options) do
SidekiqUniqueJobs::Orphans::Manager.with_logging_context do
SidekiqUniqueJobs::Orphans::Manager.redis do |_conn|
SidekiqUniqueJobs::Orphans::Manager.refresh_reaper_mutex
sleep(1)
end
end
end

counter = 0
result = RubyProf.profile do
100.times do
SidekiqUniqueJobs::Orphans::Manager.start(TASK)
end

while counter < 60
sleep(1)

counter += 1
end
end

result.exclude_common_methods!
printer = RubyProf::GraphPrinter.new(result)
printer.print($stdout, min_percent: 2)
2 changes: 2 additions & 0 deletions gemfiles/sidekiq_5.0.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ gem "yard"
gem "sidekiq", "~> 5.0.0"

platforms :mri do
gem "concurrent-ruby-ext"
gem "fasterer"
gem "github_changelog_generator"
gem "guard"
Expand All @@ -28,6 +29,7 @@ platforms :mri do
gem "reek", ">= 5.3"
gem "rspec-benchmark"
gem "rubocop-mhenrixon"
gem "ruby-prof", require: false
gem "simplecov-sublime", "0.21.0", require: false
gem "travis"
end
Expand Down
2 changes: 2 additions & 0 deletions gemfiles/sidekiq_5.1.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ gem "yard"
gem "sidekiq", "~> 5.1.0"

platforms :mri do
gem "concurrent-ruby-ext"
gem "fasterer"
gem "github_changelog_generator"
gem "guard"
Expand All @@ -28,6 +29,7 @@ platforms :mri do
gem "reek", ">= 5.3"
gem "rspec-benchmark"
gem "rubocop-mhenrixon"
gem "ruby-prof", require: false
gem "simplecov-sublime", "0.21.0", require: false
gem "travis"
end
Expand Down
2 changes: 2 additions & 0 deletions gemfiles/sidekiq_5.2.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ gem "yard"
gem "sidekiq", "~> 5.2.0"

platforms :mri do
gem "concurrent-ruby-ext"
gem "fasterer"
gem "github_changelog_generator"
gem "guard"
Expand All @@ -28,6 +29,7 @@ platforms :mri do
gem "reek", ">= 5.3"
gem "rspec-benchmark"
gem "rubocop-mhenrixon"
gem "ruby-prof", require: false
gem "simplecov-sublime", "0.21.0", require: false
gem "travis"
end
Expand Down
2 changes: 2 additions & 0 deletions gemfiles/sidekiq_6.0.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ gem "yard"
gem "sidekiq", "~> 6.0.0"

platforms :mri do
gem "concurrent-ruby-ext"
gem "fasterer"
gem "github_changelog_generator"
gem "guard"
Expand All @@ -28,6 +29,7 @@ platforms :mri do
gem "reek", ">= 5.3"
gem "rspec-benchmark"
gem "rubocop-mhenrixon"
gem "ruby-prof", require: false
gem "simplecov-sublime", "0.21.0", require: false
gem "travis"
end
Expand Down
2 changes: 2 additions & 0 deletions gemfiles/sidekiq_6.1.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ gem "yard"
gem "sidekiq", "~> 6.1.0"

platforms :mri do
gem "concurrent-ruby-ext"
gem "fasterer"
gem "github_changelog_generator"
gem "guard"
Expand All @@ -28,6 +29,7 @@ platforms :mri do
gem "reek", ">= 5.3"
gem "rspec-benchmark"
gem "rubocop-mhenrixon"
gem "ruby-prof", require: false
gem "simplecov-sublime", "0.21.0", require: false
gem "travis"
end
Expand Down
2 changes: 2 additions & 0 deletions gemfiles/sidekiq_develop.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ gem "yard"
gem "sidekiq", git: "https://github.com/mperham/sidekiq.git"

platforms :mri do
gem "concurrent-ruby-ext"
gem "fasterer"
gem "github_changelog_generator"
gem "guard"
Expand All @@ -28,6 +29,7 @@ platforms :mri do
gem "reek", ">= 5.3"
gem "rspec-benchmark"
gem "rubocop-mhenrixon"
gem "ruby-prof", require: false
gem "simplecov-sublime", "0.21.0", require: false
gem "travis"
end
Expand Down
4 changes: 3 additions & 1 deletion lib/sidekiq_unique_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
require "brpoplpush/redis_script"
require "concurrent/future"
require "concurrent/promises"
require "concurrent/timer_task"
require "concurrent/map"
require "concurrent/mutable_struct"
require "concurrent/timer_task"
require "concurrent/executor/ruby_single_thread_executor"
require "digest"
require "digest/sha1"
require "erb"
Expand All @@ -14,6 +15,7 @@
require "pathname"
require "sidekiq"

require "sidekiq_unique_jobs/timer_task"
require "sidekiq_unique_jobs/version"
require "sidekiq_unique_jobs/version_check"
require "sidekiq_unique_jobs/constants"
Expand Down
16 changes: 13 additions & 3 deletions lib/sidekiq_unique_jobs/orphans/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ module Manager
# Starts a separate thread that periodically reaps orphans
#
#
# @return [Concurrent::TimerTask] the task that was started
# @return [SidekiqUniqueJobs::TimerTask] the task that was started
#
def start # rubocop:disable
def start(test_task = nil) # rubocop:disable
return if disabled?
return if registered?

self.task = test_task || default_task

with_logging_context do
register_reaper_process
log_info("Starting Reaper")
Expand Down Expand Up @@ -59,7 +61,11 @@ def stop
# @return [<type>] <description>
#
def task
@task ||= Concurrent::TimerTask.new(timer_task_options) do
@task ||= default_task
end

def default_task
SidekiqUniqueJobs::TimerTask.new(timer_task_options) do
with_logging_context do
redis do |conn|
refresh_reaper_mutex
Expand All @@ -69,6 +75,10 @@ def task
end
end

def task=(task)
@task = task
end

#
# Arguments passed on to the timer task
#
Expand Down
78 changes: 78 additions & 0 deletions lib/sidekiq_unique_jobs/timer_task.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# frozen_string_literal: true

module SidekiqUniqueJobs
# @see [Concurrent::TimerTask] https://www.rubydoc.info/gems/concurrent-ruby/Concurrent/TimerTask
#
class TimerTask < ::Concurrent::TimerTask
private

def ns_initialize(opts, &task)
set_deref_options(opts)

self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
self.timeout_interval = opts[:timeout] || opts[:timeout_interval] || TIMEOUT_INTERVAL
@run_now = opts[:now] || opts[:run_now]
@executor = Concurrent::RubySingleThreadExecutor.new
@running = Concurrent::AtomicBoolean.new(false)
@task = task
@value = nil

self.observers = Concurrent::Collection::CopyOnNotifyObserverSet.new
end

def schedule_next_task(interval = execution_interval)
exec_task = ->(completion) { execute_task(completion) }
ScheduledTask.execute(interval, args: [Concurrent::Event.new], &exec_task)
nil
end

# @!visibility private
def execute_task(completion) # rubocop:disable Metrics/MethodLength
return nil unless @running.true?

timeout_task = -> { timeout_task(completion) }

Concurrent::ScheduledTask.execute(
timeout_interval,
args: [completion],
&timeout_task
)
@thread_completed = Concurrent::Event.new

@value = @reason = nil
@executor.post do
@value = @task.call(self)
rescue Exception => ex # rubocop:disable Lint/RescueException
@reason = ex
ensure
@thread_completed.set
end

@thread_completed.wait

if completion.try?
schedule_next_task
time = Time.now
observers.notify_observers do
[time, value, @reason]
end
end
nil
end

# @!visibility private
def timeout_task(completion)
return unless @running.true?
return unless completion.try?

@executor.kill
@executor.wait_for_termination
@executor = Concurrent::RubySingleThreadExecutor.new

@thread_completed.set

schedule_next_task
observers.notify_observers(Time.now, nil, Concurrent::TimeoutError.new)
end
end
end
12 changes: 6 additions & 6 deletions spec/sidekiq_unique_jobs/orphans/manager_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true

RSpec.describe SidekiqUniqueJobs::Orphans::Manager do
let(:task) { instance_spy(Concurrent::TimerTask) }
let(:task) { instance_spy(SidekiqUniqueJobs::TimerTask) }
let(:observer) { instance_spy(SidekiqUniqueJobs::Orphans::Observer) }

describe ".start" do
Expand Down Expand Up @@ -297,20 +297,20 @@
end
end

describe "#task" do
subject(:task) { described_class.task }
describe "#default_task" do
subject(:default_task) { described_class.default_task }

before do
allow(Concurrent::TimerTask).to receive(:new).and_call_original
allow(SidekiqUniqueJobs::TimerTask).to receive(:new).and_call_original
allow(described_class).to receive(:with_logging_context).and_yield
allow(described_class).to receive(:refresh_reaper_mutex).and_return(true)
allow(SidekiqUniqueJobs::Orphans::Reaper).to receive(:call).and_return(true)
end

it "initializes a new timer task with the correct arguments" do
expect(task).to be_a(Concurrent::TimerTask)
expect(default_task).to be_a(SidekiqUniqueJobs::TimerTask)

expect(Concurrent::TimerTask).to have_received(:new)
expect(SidekiqUniqueJobs::TimerTask).to have_received(:new)
.with(described_class.timer_task_options)
end
end
Expand Down
Loading

0 comments on commit 1e13e2c

Please sign in to comment.