diff --git a/lib/sidekiq_unique_jobs/orphans/manager.rb b/lib/sidekiq_unique_jobs/orphans/manager.rb index 0d23d54b4..6dbe7758b 100644 --- a/lib/sidekiq_unique_jobs/orphans/manager.rb +++ b/lib/sidekiq_unique_jobs/orphans/manager.rb @@ -10,6 +10,8 @@ module Orphans module Manager module_function + DRIFT_FACTOR = 0.02 + include SidekiqUniqueJobs::Connection include SidekiqUniqueJobs::Logging @@ -56,6 +58,7 @@ def task @task ||= Concurrent::TimerTask.new(timer_task_options) do with_logging_context do redis do |conn| + refresh_reaper_mutex Orphans::Reaper.call(conn) end end @@ -117,7 +120,9 @@ def logging_context # @return [true, false] # def registered? - redis { |conn| conn.get(UNIQUE_REAPER) }.to_i == 1 + redis do |conn| + conn.get(UNIQUE_REAPER).to_i + drift_reaper_interval > current_timestamp + end end def disabled? @@ -131,7 +136,17 @@ def disabled? # @return [void] # def register_reaper_process - redis { |conn| conn.set(UNIQUE_REAPER, 1) } + redis { |conn| conn.set(UNIQUE_REAPER, current_timestamp, nx: true, ex: drift_reaper_interval) } + end + + # + # Updates mutex key + # + # + # @return [void] + # + def refresh_reaper_mutex + redis { |conn| conn.set(UNIQUE_REAPER, current_timestamp, ex: drift_reaper_interval) } end # @@ -143,6 +158,14 @@ def register_reaper_process def unregister_reaper_process redis { |conn| conn.del(UNIQUE_REAPER) } end + + def drift_reaper_interval + reaper_interval + (reaper_interval * DRIFT_FACTOR).to_i + end + + def current_timestamp + Time.now.to_i + end end end end diff --git a/spec/sidekiq_unique_jobs/orphans/manager_spec.rb b/spec/sidekiq_unique_jobs/orphans/manager_spec.rb index d196d8ebe..f73508ba0 100644 --- a/spec/sidekiq_unique_jobs/orphans/manager_spec.rb +++ b/spec/sidekiq_unique_jobs/orphans/manager_spec.rb @@ -7,6 +7,12 @@ describe ".start" do subject(:start) { described_class.start } + let(:frozen_time) { Time.new(1982, 6, 8, 14, 15, 34) } + + around do |example| + Timecop.freeze(frozen_time, &example) + end + before do allow(SidekiqUniqueJobs::Orphans::Observer).to receive(:new).and_return(observer) @@ -18,7 +24,7 @@ end context "when registered?" do - before { redis { |conn| conn.set(SidekiqUniqueJobs::UNIQUE_REAPER, 1) } } + before { described_class.register_reaper_process } it { is_expected.to eq(nil) } end @@ -29,7 +35,7 @@ it "sets a mutex" do start - expect(get(SidekiqUniqueJobs::UNIQUE_REAPER)).to eq("1") + expect(get(SidekiqUniqueJobs::UNIQUE_REAPER)).to eq(frozen_time.to_i.to_s) end it "logs a start message" do @@ -111,6 +117,23 @@ it { is_expected.to eq(SidekiqUniqueJobs.config.reaper_timeout) } end + describe ".register_reaper_process" do + subject(:register_reaper_process) { described_class.register_reaper_process } + + let(:frozen_time) { Time.new(1982, 6, 8, 14, 15, 34) } + + around do |example| + Timecop.freeze(frozen_time, &example) + end + + it "writes a redis key with timestamp" do + expect { register_reaper_process }.to change { get(SidekiqUniqueJobs::UNIQUE_REAPER) } + .from(nil).to(frozen_time.to_i.to_s) + + expect(ttl(SidekiqUniqueJobs::UNIQUE_REAPER)).to be_within(20).of(SidekiqUniqueJobs.config.reaper_interval) + end + end + describe ".logging_context" do subject(:logging_context) { described_class.logging_context } diff --git a/spec/sidekiq_unique_jobs/web/helpers_spec.rb b/spec/sidekiq_unique_jobs/web/helpers_spec.rb index e58bd3ecb..b2ac27189 100644 --- a/spec/sidekiq_unique_jobs/web/helpers_spec.rb +++ b/spec/sidekiq_unique_jobs/web/helpers_spec.rb @@ -10,9 +10,9 @@ let(:time) { Time.now.to_f } let(:stamp) { Time.now.getutc.iso8601 } - before { Timecop.freeze(frozen_time) } - - after { Timecop.return } + around do |example| + Timecop.freeze(frozen_time, &example) + end it "returns relative time html" do expect(safe_relative_time).to eq(<<~HTML.chop) @@ -26,9 +26,9 @@ let(:frozen_time) { Time.new(1982, 6, 8, 14, 15, 34) } - before { Timecop.freeze(frozen_time) } - - after { Timecop.return } + around do |example| + Timecop.freeze(frozen_time, &example) + end context "when time is an Integer" do let(:time) { Time.now.to_i }