# frozen_string_literal: true

module SidekiqUniqueJobs
  module Orphans
    #
    # Class RubyReaper provides deletion of orphaned digests
    #
    # @note this is a much slower version of the lua script but does not crash redis
    #
    # @author Mikael Henriksson <[email protected]>
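    #
    # @example Reap orphaned digests manually (illustrative sketch)
    #   # Normally the reaper is scheduled by the gem's orphan manager;
    #   # calling it directly is shown here only as a sketch.
    #   Sidekiq.redis do |conn|
    #     SidekiqUniqueJobs::Orphans::RubyReaper.new(conn).call
    #   end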
    #
    # rubocop:disable Metrics/ClassLength
    class RubyReaper < Reaper
      include SidekiqUniqueJobs::Timing

      #
      # @return [Integer] a best guess of Sidekiq::Launcher::BEAT_PAUSE
      SIDEKIQ_BEAT_PAUSE = 10

      #
      # @return [String] the suffix for :RUN locks
      RUN_SUFFIX = ":RUN"

      #
      # @return [Integer] the maximum combined length of sidekiq queues for running the reaper
      MAX_QUEUE_LENGTH = 1000

      #
      # @!attribute [r] digests
      #   @return [SidekiqUniqueJobs::Digests] digest collection
      attr_reader :digests

      #
      # @!attribute [r] scheduled
      #   @return [Redis::SortedSet] the Sidekiq ScheduleSet
      attr_reader :scheduled

      #
      # @!attribute [r] retried
      #   @return [Redis::SortedSet] the Sidekiq RetrySet
      attr_reader :retried

      #
      # @!attribute [r] start_time
      #   @return [Time] the time this execution started (used for locks)
      attr_reader :start_time

      #
      # @!attribute [r] start_source
      #   @return [Integer] the clock stamp this execution started, as an integer
      #     (used for redis compatibility as it is more accurate than time)
      attr_reader :start_source

      #
      # @!attribute [r] timeout_ms
      #   @return [Integer] the allowed ms before timeout
      attr_reader :timeout_ms

      #
      # Initialize a new instance of RubyReaper
      #
      # @param [Redis] conn a connection to redis
      #
      def initialize(conn)
        super
        @digests = SidekiqUniqueJobs::Digests.new
        @scheduled = Redis::SortedSet.new(SCHEDULE)
        @retried = Redis::SortedSet.new(RETRY)
        @start_time = Time.now
        @start_source = time_source.call
        @timeout_ms = SidekiqUniqueJobs.config.reaper_timeout * 1000
      end

      #
      # Delete orphaned digests
      #
      # @return [Integer] the number of reaped locks
      #
      def call
        return if queues_very_full?

        BatchDelete.call(expired_digests, conn)
        BatchDelete.call(orphans, conn)

        # orphans.each_slice(500) do |chunk|
        #   conn.pipelined do |pipeline|
        #     chunk.each do |digest|
        #       next if belongs_to_job?(digest)
        #
        #       pipeline.zadd(ORPHANED_DIGESTS, now_f, digest)
        #     end
        #   end
        # end
      end
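
      #
      # Digests from the expiring set whose score has passed {#max_score}
      #
      # @return [Array<String>] the expired digests
      #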
      def expired_digests
        conn.zrange(EXPIRING_DIGESTS, 0, max_score, "byscore")
      end
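
      #
      # Digests from the orphaned set whose score has passed {#max_score}
      #
      # @return [Array<String>] the orphaned digests
      #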
      def orphaned_digests
        conn.zrange(ORPHANED_DIGESTS, 0, max_score, "byscore")
      end
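
      #
      # The cutoff score for reaping: the reaper's start time minus the
      #   configured reaper timeout and Sidekiq's heartbeat pause
      #
      # @return [Float] the maximum score to consider
      #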
      def max_score
        (start_time - reaper_timeout - SIDEKIQ_BEAT_PAUSE).to_f
      end

      #
      # Find orphaned digests
      #
      # @return [Array<String>] an array of orphaned digests
      #
      def orphans # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
        orphans = []
        page = 0
        per = reaper_count * 2
        results = digests.byscore(0, max_score, offset: page * per, count: (page + 1) * per)

        while results.size.positive?
          results.each do |digest|
            break if timeout?
            next if belongs_to_job?(digest)

            orphans << digest

            break if orphans.size >= reaper_count
          end

          break if timeout?
          break if orphans.size >= reaper_count

          page += 1
          results = digests.byscore(0, max_score, offset: page * per, count: (page + 1) * per)
        end

        orphans
      end
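
      #
      # Checks whether the reaper has used up its allotted execution time
      #
      # @return [true, false] true when {#elapsed_ms} is at or above {#timeout_ms}
      #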
      def timeout?
        elapsed_ms >= timeout_ms
      end
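
      #
      # The time elapsed since this reaper instance started
      #
      # @return [Integer] elapsed time in milliseconds
      #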
      def elapsed_ms
        time_source.call - start_source
      end

      #
      # Checks if the digest has a matching job.
      #   1. It checks the scheduled set
      #   2. It checks the retry set
      #   3. It goes through all queues
      #   4. It checks the workers of all running processes
      #
      # @param [String] digest the digest to search for
      #
      # @return [true] when any of the checks returns true
      # @return [false] when no job was found for this digest
      #
      def belongs_to_job?(digest)
        scheduled?(digest) || retried?(digest) || enqueued?(digest) || active?(digest)
      end

      #
      # Checks if the digest exists in the Sidekiq::ScheduledSet
      #
      # @param [String] digest the current digest
      #
      # @return [true] when digest exists in scheduled set
      #
      def scheduled?(digest)
        in_sorted_set?(SCHEDULE, digest)
      end

      #
      # Checks if the digest exists in the Sidekiq::RetrySet
      #
      # @param [String] digest the current digest
      #
      # @return [true] when digest exists in retry set
      #
      def retried?(digest)
        in_sorted_set?(RETRY, digest)
      end

      #
      # Checks if the digest exists in a Sidekiq::Queue
      #
      # @param [String] digest the current digest
      #
      # @return [true] when digest exists in any queue
      #
      def enqueued?(digest)
        Sidekiq.redis do |conn|
          queues(conn) do |queue|
            entries(conn, queue) do |entry|
              return true if entry.include?(digest)
            end
          end

          false
        end
      end
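
      #
      # Checks if the digest is held by a worker in any running Sidekiq process,
      #   or belongs to a job recent enough to be considered active
      #
      # @param [String] digest the current digest
      #
      # @return [true, false]
      #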
      def active?(digest) # rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
        Sidekiq.redis do |conn|
          procs = conn.sscan("processes").to_a
          return false if procs.empty?

          procs.sort.each do |key|
            valid, workers = conn.pipelined do |pipeline|
              # TODO: Remove the if statement in the future
              if pipeline.respond_to?(:exists?)
                pipeline.exists?(key)
              else
                pipeline.exists(key)
              end
              pipeline.hgetall("#{key}:work")
            end

            next unless valid
            next unless workers.any?

            workers.each_pair do |_tid, job|
              next unless (item = safe_load_json(job))

              payload = safe_load_json(item[PAYLOAD])

              return true if match?(digest, payload[LOCK_DIGEST])
              return true if considered_active?(payload[CREATED_AT])
            end
          end

          false
        end
      end
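
      #
      # Compares two lock digests, ignoring any {RUN_SUFFIX}
      #
      # @param [String, nil] key_one the first digest
      # @param [String, nil] key_two the second digest
      #
      # @return [true, false] true when both digests are present and equal
      #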
      def match?(key_one, key_two)
        return false if key_one.nil? || key_two.nil?

        key_one.delete_suffix(RUN_SUFFIX) == key_two.delete_suffix(RUN_SUFFIX)
      end
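
      #
      # Checks if a job was created recently enough to still count as active
      #
      # @param [Float] time_f the job's created_at value as a unix timestamp
      #
      # @return [true, false] true when the timestamp is newer than {#max_score}
      #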
      def considered_active?(time_f)
        max_score < time_f
      end

      #
      # Loops through all the redis queues and yields them one by one
      #
      # @param [Redis] conn the connection to use for fetching queues
      #
      # @return [void]
      #
      # @yield queues one at a time
      #
      def queues(conn, &block)
        conn.sscan("queues").each(&block)
      end
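
      #
      # Loops through the entries of a queue in pages of 50 and yields each entry,
      #   compensating the page offset when entries are removed mid-scan
      #
      # @param [Redis] conn the connection to use for fetching entries
      # @param [String] queue the name of the queue
      #
      # @return [void]
      #
      # @yield entries one at a time
      #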
      def entries(conn, queue, &block) # rubocop:disable Metrics/MethodLength
        queue_key = "queue:#{queue}"
        initial_size = conn.llen(queue_key)
        deleted_size = 0
        page = 0
        page_size = 50

        loop do
          range_start = (page * page_size) - deleted_size
          range_end = range_start + page_size - 1
          entries = conn.lrange(queue_key, range_start, range_end)
          page += 1

          break if entries.empty?

          entries.each(&block)

          deleted_size = initial_size - conn.llen(queue_key)
          # The queue is growing, not shrinking, just keep looping
          deleted_size = 0 if deleted_size.negative?
        end
      end

      #
      # If sidekiq queues are very full, it becomes highly inefficient for the reaper
      #   because it must check every queued job to verify a digest is safe to delete.
      #   The reaper scans queued jobs in pages of 50, costing two reads (LRANGE and
      #   LLEN) per page; with a queue length of 1,000 jobs, that's over 40 extra reads
      #   per digest.
      #
      # @return [true, false] true when the combined queue length exceeds {MAX_QUEUE_LENGTH}
      #
      def queues_very_full?
        total_queue_size = 0

        Sidekiq.redis do |conn|
          queues(conn) do |queue|
            total_queue_size += conn.llen("queue:#{queue}")

            return true if total_queue_size > MAX_QUEUE_LENGTH
          end
        end

        false
      end

      #
      # Checks a sorted set for the existence of this digest
      #
      # @param [String] key the key for the sorted set
      # @param [String] digest the digest to scan for
      #
      # @return [true] when found
      # @return [false] when missing
      #
      def in_sorted_set?(key, digest)
        conn.zscan(key, match: "*#{digest}*", count: 1).to_a.any?
      end
    end
    # rubocop:enable Metrics/ClassLength
  end
end