Skip to content

Commit

Permalink
Fix lingering threads in tests
Browse files Browse the repository at this point in the history
Also, allow embedding process to provide their own heartbeat thread.
  • Loading branch information
mperham committed Nov 1, 2022
1 parent b1fc7fa commit 636fa9a
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 130 deletions.
28 changes: 18 additions & 10 deletions lib/sidekiq/launcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@ def initialize(config, embedded: false)
@done = false
end

def run
# Start this Sidekiq instance. If an embedding process already
# has a heartbeat thread, caller can use `async_beat: false`
# and instead have thread call Launcher#heartbeat every N seconds.
def run(async_beat: true)
Sidekiq.freeze!
@thread = safe_thread("heartbeat", &method(:start_heartbeat))
@thread = safe_thread("heartbeat", &method(:start_heartbeat)) if async_beat
@poller.start
@managers.each(&:start)
end

# Stops this instance from processing any more jobs,
#
def quiet
return if @done

Expand Down Expand Up @@ -71,18 +73,30 @@ def stopping?
@done
end

# If embedding Sidekiq, you can have the process heartbeat
# call this method to regularly heartbeat rather than creating
# a separate thread.
def heartbeat
end

private unless $TESTING

BEAT_PAUSE = 10

def start_heartbeat
loop do
heartbeat
beat
sleep BEAT_PAUSE
end
logger.info("Heartbeat stopping...")
end

def beat
$0 = PROCTITLES.map { |proc| proc.call(self, to_data) }.compact.join(" ") unless @embedded
end

def clear_heartbeat
flush_stats

Expand All @@ -99,12 +113,6 @@ def clear_heartbeat
# best effort, ignore network errors
end

def heartbeat
$0 = PROCTITLES.map { |proc| proc.call(self, to_data) }.compact.join(" ") unless @embedded

end

def flush_stats
fails = Processor::FAILURE.reset
procd = Processor::PROCESSED.reset
Expand Down
2 changes: 1 addition & 1 deletion test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

require "minitest/pride"
require "maxitest/autorun"
# require "maxitest/threads"
require "maxitest/threads"

$TESTING = true
# disable minitest/parallel threads
Expand Down
151 changes: 39 additions & 112 deletions test/launcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@
require "sidekiq/launcher"

describe Sidekiq::Launcher do
subject do
Sidekiq::Launcher.new(@config)
end

before do
@config = reset!
@config.default_capsule.concurrency = 3
Expand All @@ -23,13 +19,15 @@
end

it "starts and stops" do
subject.run
subject.stop
l = Sidekiq::Launcher.new(@config)
l.run(async_beat: false)
l.stop
end

describe "heartbeat" do
before do
@id = subject.identity
@launcher = Sidekiq::Launcher.new(@config)
@id = @launcher.identity

Sidekiq::Processor::WORK_STATE.set("a", {"b" => 1})

Expand All @@ -41,123 +39,52 @@
$0 = @proctitle
end

describe "#heartbeat" do
describe "run" do
it "sets sidekiq version, tag and the number of busy workers to proctitle" do
subject.heartbeat

assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy]", $0
end

it "stores process info in redis" do
subject.heartbeat

workers, rtt = @config.redis { |c| c.hmget(subject.identity, "busy", "rtt_us") }

assert_equal "1", workers
refute_nil rtt
assert_in_delta 1000, rtt.to_i, 1000
it "stores process info in redis" do
@launcher.beat

expires = @config.redis { |c| c.pttl(subject.identity) }
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy]", $0
workers, rtt = @config.redis { |c| c.hmget(@id, "busy", "rtt_us") }

assert_in_delta 60000, expires, 500
end
assert_equal "1", workers
refute_nil rtt
assert_in_delta 1000, rtt.to_i, 1000

describe "events" do
before do
@cnt = 0
expires = @config.redis { |c| c.pttl(@id) }
assert_in_delta 60000, expires, 500
end

@config.on(:heartbeat) do
@cnt += 1
end
end
it "fires start heartbeat event only once" do
cnt = 0

it "fires start heartbeat event only once" do
assert_equal 0, @cnt
subject.heartbeat
assert_equal 1, @cnt
subject.heartbeat
assert_equal 1, @cnt
end
end
@config.on(:heartbeat) do
cnt += 1
end
assert_equal 0, cnt
@launcher.heartbeat
assert_equal 1, cnt
@launcher.heartbeat
assert_equal 1, cnt
end

describe "quiet" do
before do
subject.quiet
end

it "sets stopping proctitle" do
subject.heartbeat

assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] stopping", $0
end

it "stores process info in redis" do
subject.heartbeat

info = @config.redis { |c| c.hmget(subject.identity, "busy") }

assert_equal ["1"], info

expires = @config.redis { |c| c.pttl(subject.identity) }
it "quiets" do
@launcher.quiet
@launcher.beat

assert_in_delta 60000, expires, 50
end
end
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] stopping", $0

it "fires new heartbeat events" do
i = 0
@config.on(:heartbeat) do
i += 1
end
assert_equal 0, i
subject.heartbeat
assert_equal 1, i
subject.heartbeat
assert_equal 1, i
end
@launcher.beat
info = @config.redis { |c| c.hmget(@id, "busy") }
assert_equal ["1"], info

describe "when manager is active" do
before do
Sidekiq::Launcher::PROCTITLES << proc { "xyz" }
subject.heartbeat
Sidekiq::Launcher::PROCTITLES.pop
end

it "sets useful info to proctitle" do
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] xyz", $0
end

it "stores process info in redis" do
info = @config.redis { |c| c.hmget(@id, "busy") }
assert_equal ["1"], info
expires = @config.redis { |c| c.pttl(@id) }
assert_in_delta 60000, expires, 500
end
end
expires = @config.redis { |c| c.pttl(@id) }
assert_in_delta 60000, expires, 50
end

describe "when manager is stopped" do
before do
subject.quiet
subject.heartbeat
end

# after do
# puts system('redis-cli -n 15 keys "*" | while read LINE ; do TTL=`redis-cli -n 15 ttl "$LINE"`; if [ "$TTL" -eq -1 ]; then echo "$LINE"; fi; done;')
# end

it "indicates stopping status in proctitle" do
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] stopping", $0
end

it "stores process info in redis" do
info = @config.redis { |c| c.hmget(@id, "busy") }
assert_equal ["1"], info
expires = @config.redis { |c| c.pttl(@id) }
assert_in_delta 60000, expires, 50
end
it "allows arbitrary proctitle extensions" do
Sidekiq::Launcher::PROCTITLES << proc { "xyz" }
@launcher.beat
Sidekiq::Launcher::PROCTITLES.pop
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] xyz", $0
end
end
end
11 changes: 4 additions & 7 deletions test/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,10 @@ def new_manager
mgr = new_manager
init_size = mgr.workers.size
processor = mgr.workers.first
begin
mgr.processor_result(processor, "ignored")
mgr.quiet
mgr.processor_result(processor, "ignored")

assert_equal init_size, mgr.workers.size
refute mgr.workers.include?(processor)
ensure
mgr.quiet
end
assert_equal init_size - 1, mgr.workers.size
refute mgr.workers.include?(processor)
end
end

0 comments on commit 636fa9a

Please sign in to comment.