From b999c8b646b5471e12b182ff6c3565133a2d3787 Mon Sep 17 00:00:00 2001 From: Mikael Henriksson Date: Tue, 12 Jul 2022 11:15:59 +0200 Subject: [PATCH] Fix(until_expired): Fix test and implementation (#725) --- .../orphans/ruby_reaper.rb | 31 +++++++++++++------ .../orphans/ruby_reaper_spec.rb | 14 ++++++--- spec/support/sidekiq_unique_jobs/testing.rb | 8 +++++ 3 files changed, 40 insertions(+), 13 deletions(-) diff --git a/lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb b/lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb index faa47054..95c13953 100644 --- a/lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb +++ b/lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb @@ -34,9 +34,15 @@ class RubyReaper < Reaper # # @!attribute [r] start_time - # @return [Integer] The timestamp this execution started represented as integer + # @return [Integer] The timestamp this execution started represented as Time (used for locks) attr_reader :start_time + # + # @!attribute [r] start_time + # @return [Integer] The clock stamp this execution started represented as integer + # (used for redis compatibility as it is more accurate than time) + attr_reader :start_source + # # @!attribute [r] timeout_ms # @return [Integer] The allowed ms before timeout @@ -49,11 +55,12 @@ class RubyReaper < Reaper # def initialize(conn) super(conn) - @digests = SidekiqUniqueJobs::Digests.new - @scheduled = Redis::SortedSet.new(SCHEDULE) - @retried = Redis::SortedSet.new(RETRY) - @start_time = time_source.call - @timeout_ms = SidekiqUniqueJobs.config.reaper_timeout * 1000 + @digests = SidekiqUniqueJobs::Digests.new + @scheduled = Redis::SortedSet.new(SCHEDULE) + @retried = Redis::SortedSet.new(RETRY) + @start_time = Time.now + @start_source = time_source.call + @timeout_ms = SidekiqUniqueJobs.config.reaper_timeout * 1000 end # @@ -65,12 +72,18 @@ def initialize(conn) def call return if queues_very_full? - BatchDelete.call(orphans, conn) BatchDelete.call(expired_digests, conn) + BatchDelete.call(orphans, conn) end def expired_digests - conn.zrangebyscore(EXPIRING_DIGESTS, 0, @start_time) + max_score = (start_time - reaper_timeout).to_f + + if VersionCheck.satisfied?(redis_version, ">= 6.2.0") && VersionCheck.satisfied?(::Redis::VERSION, ">= 4.6.0") + conn.zrange(EXPIRING_DIGESTS, 0, max_score, byscore: true) + else + conn.zrangebyscore(EXPIRING_DIGESTS, 0, max_score) + end end # @@ -109,7 +122,7 @@ def timeout? end def elapsed_ms - time_source.call - start_time + time_source.call - start_source end # diff --git a/spec/sidekiq_unique_jobs/orphans/ruby_reaper_spec.rb b/spec/sidekiq_unique_jobs/orphans/ruby_reaper_spec.rb index 9d45914d..be2b7494 100644 --- a/spec/sidekiq_unique_jobs/orphans/ruby_reaper_spec.rb +++ b/spec/sidekiq_unique_jobs/orphans/ruby_reaper_spec.rb @@ -45,7 +45,7 @@ SidekiqUniqueJobs::Lock.create(digest_two, job_id_two) SidekiqUniqueJobs::Lock.create(digest_three, job_id_three) - elapsed_ms = service.start_time + service.timeout_ms + 10 + elapsed_ms = service.start_source + service.timeout_ms + 10 allow(service).to receive(:elapsed_ms).and_return(elapsed_ms) allow(service).to receive(:belongs_to_job?).and_call_original @@ -150,12 +150,18 @@ } end + before do + # NOTE: The below makes sure that the timing is way of in the future + # which allows the spec to pass and consider existing locks as `old` + allow(service).to receive(:start_time).and_return(Time.now + 100_000) + end + it "clears the lock" do - expect(redis { |conn| conn.zcard(SidekiqUniqueJobs::EXPIRING_DIGESTS) }).to eq 1 - sleep 2 + expect(zcard(SidekiqUniqueJobs::EXPIRING_DIGESTS)).to eq 1 + service.call - expect(redis { |conn| conn.zcard(SidekiqUniqueJobs::EXPIRING_DIGESTS) }).to eq 0 + expect(zcard(SidekiqUniqueJobs::EXPIRING_DIGESTS)).to eq 0 end end end diff --git a/spec/support/sidekiq_unique_jobs/testing.rb b/spec/support/sidekiq_unique_jobs/testing.rb index ccda62ad..a3b03411 100644 --- a/spec/support/sidekiq_unique_jobs/testing.rb +++ b/spec/support/sidekiq_unique_jobs/testing.rb @@ -831,6 +831,10 @@ def unique_keys SidekiqUniqueJobs::EXPIRING_DIGESTS] end + def expired_keys + zrange(SidekiqUniqueJobs::EXPIRING_DIGESTS, 0, -1) + end + def changelogs @changelogs || SidekiqUniqueJobs::Changelog.new end @@ -871,6 +875,10 @@ def locked_jids(key = nil) end end + def now + SidekiqUniqueJobs.now + end + def now_f SidekiqUniqueJobs.now_f end