Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce reaper threads #576

Merged
merged 9 commits into from
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .reek.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ detectors:
exclude:
- Sidekiq::JobSet::UniqueExtension#delete_by_value
- Sidekiq::ScheduledSet::UniqueExtension#delete
- SidekiqUniqueJobs::Orphans::Manager#start
- SidekiqUniqueJobs::Orphans::RubyReaper#active?
- SidekiqUniqueJobs::Redis::Hash#entries
DataClump:
Expand Down Expand Up @@ -58,6 +59,9 @@ detectors:
- SidekiqUniqueJobs::SidekiqWorkerMethods#worker_class_constantize
- SidekiqUniqueJobs::Web::Helpers#cparams
- SidekiqUniqueJobs::Web::Helpers#display_lock_args
InstanceVariableAssumption:
exclude:
- SidekiqUniqueJobs::TimerTask
IrresponsibleModule:
enabled: true
LongParameterList:
Expand Down Expand Up @@ -139,6 +143,8 @@ detectors:
- SidekiqUniqueJobs::Script::Caller#call_script
- SidekiqUniqueJobs::Script::Caller#extract_args
- SidekiqUniqueJobs::SidekiqWorkerMethods#worker_class_constantize
- SidekiqUniqueJobs::TimerTask#execute_task
- SidekiqUniqueJobs::TimerTask#timeout_task
- SidekiqUniqueJobs::UpgradeLocks#call
- SidekiqUniqueJobs::UpgradeLocks#upgrade_v6_lock
- SidekiqUniqueJobs::Web#self.registered
Expand All @@ -148,6 +154,7 @@ detectors:
- SidekiqUniqueJobs::LockConfig
- SidekiqUniqueJobs::Locksmith
- SidekiqUniqueJobs::Lock::BaseLock
- SidekiqUniqueJobs::TimerTask
TooManyMethods:
exclude:
- SidekiqUniqueJobs::Lock::BaseLock
Expand Down
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ gem "timecop"
gem "yard"

platforms :mri do
gem "concurrent-ruby-ext"
gem "fasterer"
gem "github_changelog_generator"
gem "guard"
Expand All @@ -30,6 +31,7 @@ platforms :mri do
gem "reek", ">= 5.3"
gem "rspec-benchmark"
gem "rubocop-mhenrixon"
gem "ruby-prof", require: false
gem "simplecov-sublime", "0.21.0", require: false
gem "travis"
end
Expand Down
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,16 @@ Configure SidekiqUniqueJobs in an initializer or the sidekiq initializer on appl

```ruby
SidekiqUniqueJobs.configure do |config|
config.debug_lua = true
config.lock_info = true
config.lock_ttl = 10.minutes
config.lock_timeout = 10.minutes
config.logger = Sidekiq.logger
config.max_history = 10_000
config.reaper = :lua
config.reaper_count = 100
config.reaper_interval = 10
config.reaper_timeout = 5
config.logger = Sidekiq.logger # default, change at your own discretion
config.debug_lua = false # Turn on when debugging
config.lock_info = false # Turn on when debugging
config.lock_ttl = 600 # Expire locks after 10 minutes
config.lock_timeout = nil # turn off lock timeout
config.max_history = 0 # Turn on when debugging
config.reaper = :ruby # :ruby, :lua or :none/nil
config.reaper_count = 1000 # Stop reaping after this many keys
config.reaper_interval = 600 # Reap orphans every 10 minutes
config.reaper_timeout = 150 # Timeout reaper after 1,5 minutes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

150/60 = 2.5 minutes

I keep in my own config 595 seconds as timeout. It's close to interval 600s.

end
```

Expand Down
42 changes: 42 additions & 0 deletions bin/profiling
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

# Trap interrupts to quit cleanly. See
# https://twitter.com/mitchellh/status/283014103189053442
Signal.trap("INT") { abort }

require "bundler/setup"
require "ruby-prof"
require "sidekiq-unique-jobs"

SidekiqUniqueJobs.configure do |config|
config.reaper_interval = 2
config.reaper_timeout = 1
config.reaper_count = 10_000
end

TASK = SidekiqUniqueJobs::TimerTask.new(SidekiqUniqueJobs::Orphans::Manager.timer_task_options) do
SidekiqUniqueJobs::Orphans::Manager.with_logging_context do
SidekiqUniqueJobs::Orphans::Manager.redis do |_conn|
SidekiqUniqueJobs::Orphans::Manager.refresh_reaper_mutex
sleep(1)
end
end
end

counter = 0
result = RubyProf.profile do
100.times do
SidekiqUniqueJobs::Orphans::Manager.start(TASK)
end

while counter < 60
sleep(1)

counter += 1
end
end

result.exclude_common_methods!
printer = RubyProf::GraphPrinter.new(result)
printer.print($stdout, min_percent: 2)
2 changes: 2 additions & 0 deletions gemfiles/sidekiq_5.0.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ gem "yard"
gem "sidekiq", "~> 5.0.0"

platforms :mri do
gem "concurrent-ruby-ext"
gem "fasterer"
gem "github_changelog_generator"
gem "guard"
Expand All @@ -28,6 +29,7 @@ platforms :mri do
gem "reek", ">= 5.3"
gem "rspec-benchmark"
gem "rubocop-mhenrixon"
gem "ruby-prof", require: false
gem "simplecov-sublime", "0.21.0", require: false
gem "travis"
end
Expand Down
2 changes: 2 additions & 0 deletions gemfiles/sidekiq_5.1.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ gem "yard"
gem "sidekiq", "~> 5.1.0"

platforms :mri do
gem "concurrent-ruby-ext"
gem "fasterer"
gem "github_changelog_generator"
gem "guard"
Expand All @@ -28,6 +29,7 @@ platforms :mri do
gem "reek", ">= 5.3"
gem "rspec-benchmark"
gem "rubocop-mhenrixon"
gem "ruby-prof", require: false
gem "simplecov-sublime", "0.21.0", require: false
gem "travis"
end
Expand Down
2 changes: 2 additions & 0 deletions gemfiles/sidekiq_5.2.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ gem "yard"
gem "sidekiq", "~> 5.2.0"

platforms :mri do
gem "concurrent-ruby-ext"
gem "fasterer"
gem "github_changelog_generator"
gem "guard"
Expand All @@ -28,6 +29,7 @@ platforms :mri do
gem "reek", ">= 5.3"
gem "rspec-benchmark"
gem "rubocop-mhenrixon"
gem "ruby-prof", require: false
gem "simplecov-sublime", "0.21.0", require: false
gem "travis"
end
Expand Down
2 changes: 2 additions & 0 deletions gemfiles/sidekiq_6.0.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ gem "yard"
gem "sidekiq", "~> 6.0.0"

platforms :mri do
gem "concurrent-ruby-ext"
gem "fasterer"
gem "github_changelog_generator"
gem "guard"
Expand All @@ -28,6 +29,7 @@ platforms :mri do
gem "reek", ">= 5.3"
gem "rspec-benchmark"
gem "rubocop-mhenrixon"
gem "ruby-prof", require: false
gem "simplecov-sublime", "0.21.0", require: false
gem "travis"
end
Expand Down
2 changes: 2 additions & 0 deletions gemfiles/sidekiq_6.1.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ gem "yard"
gem "sidekiq", "~> 6.1.0"

platforms :mri do
gem "concurrent-ruby-ext"
gem "fasterer"
gem "github_changelog_generator"
gem "guard"
Expand All @@ -28,6 +29,7 @@ platforms :mri do
gem "reek", ">= 5.3"
gem "rspec-benchmark"
gem "rubocop-mhenrixon"
gem "ruby-prof", require: false
gem "simplecov-sublime", "0.21.0", require: false
gem "travis"
end
Expand Down
2 changes: 2 additions & 0 deletions gemfiles/sidekiq_develop.gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ gem "yard"
gem "sidekiq", git: "https://github.com/mperham/sidekiq.git"

platforms :mri do
gem "concurrent-ruby-ext"
gem "fasterer"
gem "github_changelog_generator"
gem "guard"
Expand All @@ -28,6 +29,7 @@ platforms :mri do
gem "reek", ">= 5.3"
gem "rspec-benchmark"
gem "rubocop-mhenrixon"
gem "ruby-prof", require: false
gem "simplecov-sublime", "0.21.0", require: false
gem "travis"
end
Expand Down
4 changes: 3 additions & 1 deletion lib/sidekiq_unique_jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
require "brpoplpush/redis_script"
require "concurrent/future"
require "concurrent/promises"
require "concurrent/timer_task"
require "concurrent/map"
require "concurrent/mutable_struct"
require "concurrent/timer_task"
require "concurrent/executor/ruby_single_thread_executor"
require "digest"
require "digest/sha1"
require "erb"
Expand All @@ -14,6 +15,7 @@
require "pathname"
require "sidekiq"

require "sidekiq_unique_jobs/timer_task"
require "sidekiq_unique_jobs/version"
require "sidekiq_unique_jobs/version_check"
require "sidekiq_unique_jobs/constants"
Expand Down
16 changes: 13 additions & 3 deletions lib/sidekiq_unique_jobs/orphans/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ module Manager
# Starts a separate thread that periodically reaps orphans
#
#
# @return [Concurrent::TimerTask] the task that was started
# @return [SidekiqUniqueJobs::TimerTask] the task that was started
#
def start # rubocop:disable
def start(test_task = nil) # rubocop:disable
return if disabled?
return if registered?

self.task = test_task || default_task

with_logging_context do
register_reaper_process
log_info("Starting Reaper")
Expand Down Expand Up @@ -59,7 +61,11 @@ def stop
# @return [<type>] <description>
#
def task
@task ||= Concurrent::TimerTask.new(timer_task_options) do
@task ||= default_task
end

def default_task
SidekiqUniqueJobs::TimerTask.new(timer_task_options) do
with_logging_context do
redis do |conn|
refresh_reaper_mutex
Expand All @@ -69,6 +75,10 @@ def task
end
end

def task=(task)
@task = task
end

#
# Arguments passed on to the timer task
#
Expand Down
78 changes: 78 additions & 0 deletions lib/sidekiq_unique_jobs/timer_task.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# frozen_string_literal: true

module SidekiqUniqueJobs
# @see [Concurrent::TimerTask] https://www.rubydoc.info/gems/concurrent-ruby/Concurrent/TimerTask
#
class TimerTask < ::Concurrent::TimerTask
private

def ns_initialize(opts, &task)
set_deref_options(opts)

self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
self.timeout_interval = opts[:timeout] || opts[:timeout_interval] || TIMEOUT_INTERVAL
@run_now = opts[:now] || opts[:run_now]
@executor = Concurrent::RubySingleThreadExecutor.new
@running = Concurrent::AtomicBoolean.new(false)
@task = task
@value = nil

self.observers = Concurrent::Collection::CopyOnNotifyObserverSet.new
end

def schedule_next_task(interval = execution_interval)
exec_task = ->(completion) { execute_task(completion) }
ScheduledTask.execute(interval, args: [Concurrent::Event.new], &exec_task)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should there be prefix Concurrent::ScheduledTask?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added fix in this PR #577

nil
end

# @!visibility private
def execute_task(completion) # rubocop:disable Metrics/MethodLength
return nil unless @running.true?

timeout_task = -> { timeout_task(completion) }

Concurrent::ScheduledTask.execute(
timeout_interval,
args: [completion],
&timeout_task
)
@thread_completed = Concurrent::Event.new

@value = @reason = nil
@executor.post do
@value = @task.call(self)
rescue Exception => ex # rubocop:disable Lint/RescueException
@reason = ex
ensure
@thread_completed.set
end

@thread_completed.wait

if completion.try?
schedule_next_task
time = Time.now
observers.notify_observers do
[time, value, @reason]
end
end
nil
end

# @!visibility private
def timeout_task(completion)
return unless @running.true?
return unless completion.try?

@executor.kill
@executor.wait_for_termination
@executor = Concurrent::RubySingleThreadExecutor.new

@thread_completed.set

schedule_next_task
observers.notify_observers(Time.now, nil, Concurrent::TimeoutError.new)
end
end
end
12 changes: 6 additions & 6 deletions spec/sidekiq_unique_jobs/orphans/manager_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true

RSpec.describe SidekiqUniqueJobs::Orphans::Manager do
let(:task) { instance_spy(Concurrent::TimerTask) }
let(:task) { instance_spy(SidekiqUniqueJobs::TimerTask) }
let(:observer) { instance_spy(SidekiqUniqueJobs::Orphans::Observer) }

describe ".start" do
Expand Down Expand Up @@ -297,20 +297,20 @@
end
end

describe "#task" do
subject(:task) { described_class.task }
describe "#default_task" do
subject(:default_task) { described_class.default_task }

before do
allow(Concurrent::TimerTask).to receive(:new).and_call_original
allow(SidekiqUniqueJobs::TimerTask).to receive(:new).and_call_original
allow(described_class).to receive(:with_logging_context).and_yield
allow(described_class).to receive(:refresh_reaper_mutex).and_return(true)
allow(SidekiqUniqueJobs::Orphans::Reaper).to receive(:call).and_return(true)
end

it "initializes a new timer task with the correct arguments" do
expect(task).to be_a(Concurrent::TimerTask)
expect(default_task).to be_a(SidekiqUniqueJobs::TimerTask)

expect(Concurrent::TimerTask).to have_received(:new)
expect(SidekiqUniqueJobs::TimerTask).to have_received(:new)
.with(described_class.timer_task_options)
end
end
Expand Down
Loading