Skip to content

Commit

Permalink
Fix(until_expired): Fix test and implementation (#725)
Browse files Browse the repository at this point in the history
  • Loading branch information
mhenrixon authored Jul 12, 2022
1 parent 6c7d48d commit b999c8b
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 13 deletions.
31 changes: 22 additions & 9 deletions lib/sidekiq_unique_jobs/orphans/ruby_reaper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,15 @@ class RubyReaper < Reaper

#
# @!attribute [r] start_time
# @return [Integer] The timestamp this execution started represented as integer
# @return [Integer] The timestamp this execution started represented as Time (used for locks)
attr_reader :start_time

#
# @!attribute [r] start_time
# @return [Integer] The clock stamp this execution started represented as integer
# (used for redis compatibility as it is more accurate than time)
attr_reader :start_source

#
# @!attribute [r] timeout_ms
# @return [Integer] The allowed ms before timeout
Expand All @@ -49,11 +55,12 @@ class RubyReaper < Reaper
#
def initialize(conn)
super(conn)
@digests = SidekiqUniqueJobs::Digests.new
@scheduled = Redis::SortedSet.new(SCHEDULE)
@retried = Redis::SortedSet.new(RETRY)
@start_time = time_source.call
@timeout_ms = SidekiqUniqueJobs.config.reaper_timeout * 1000
@digests = SidekiqUniqueJobs::Digests.new
@scheduled = Redis::SortedSet.new(SCHEDULE)
@retried = Redis::SortedSet.new(RETRY)
@start_time = Time.now
@start_source = time_source.call
@timeout_ms = SidekiqUniqueJobs.config.reaper_timeout * 1000
end

#
Expand All @@ -65,12 +72,18 @@ def initialize(conn)
def call
return if queues_very_full?

BatchDelete.call(orphans, conn)
BatchDelete.call(expired_digests, conn)
BatchDelete.call(orphans, conn)
end

def expired_digests
conn.zrangebyscore(EXPIRING_DIGESTS, 0, @start_time)
max_score = (start_time - reaper_timeout).to_f

if VersionCheck.satisfied?(redis_version, ">= 6.2.0") && VersionCheck.satisfied?(::Redis::VERSION, ">= 4.6.0")
conn.zrange(EXPIRING_DIGESTS, 0, max_score, byscore: true)
else
conn.zrangebyscore(EXPIRING_DIGESTS, 0, max_score)
end
end

#
Expand Down Expand Up @@ -109,7 +122,7 @@ def timeout?
end

def elapsed_ms
time_source.call - start_time
time_source.call - start_source
end

#
Expand Down
14 changes: 10 additions & 4 deletions spec/sidekiq_unique_jobs/orphans/ruby_reaper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
SidekiqUniqueJobs::Lock.create(digest_two, job_id_two)
SidekiqUniqueJobs::Lock.create(digest_three, job_id_three)

elapsed_ms = service.start_time + service.timeout_ms + 10
elapsed_ms = service.start_source + service.timeout_ms + 10

allow(service).to receive(:elapsed_ms).and_return(elapsed_ms)
allow(service).to receive(:belongs_to_job?).and_call_original
Expand Down Expand Up @@ -150,12 +150,18 @@
}
end

before do
# NOTE: The below makes sure that the timing is way of in the future
# which allows the spec to pass and consider existing locks as `old`
allow(service).to receive(:start_time).and_return(Time.now + 100_000)
end

it "clears the lock" do
expect(redis { |conn| conn.zcard(SidekiqUniqueJobs::EXPIRING_DIGESTS) }).to eq 1
sleep 2
expect(zcard(SidekiqUniqueJobs::EXPIRING_DIGESTS)).to eq 1

service.call

expect(redis { |conn| conn.zcard(SidekiqUniqueJobs::EXPIRING_DIGESTS) }).to eq 0
expect(zcard(SidekiqUniqueJobs::EXPIRING_DIGESTS)).to eq 0
end
end
end
Expand Down
8 changes: 8 additions & 0 deletions spec/support/sidekiq_unique_jobs/testing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,10 @@ def unique_keys
SidekiqUniqueJobs::EXPIRING_DIGESTS]
end

def expired_keys
zrange(SidekiqUniqueJobs::EXPIRING_DIGESTS, 0, -1)
end

def changelogs
@changelogs || SidekiqUniqueJobs::Changelog.new
end
Expand Down Expand Up @@ -871,6 +875,10 @@ def locked_jids(key = nil)
end
end

def now
SidekiqUniqueJobs.now
end

def now_f
SidekiqUniqueJobs.now_f
end
Expand Down

0 comments on commit b999c8b

Please sign in to comment.