Handle fork_sibling failing repeatedly
Ref: #79

It can happen that the new mold is forked at an unsafe point,
causing the middle process to crash.

When we detect this happening, we abandon the mold.

Currently, abandoning the mold causes a graceful shutdown;
in the future we could try creating a new mold to replace it.

Co-Authored-By: Étienne Barrié <[email protected]>
byroot and etiennebarrie committed Dec 8, 2023
1 parent 1be123e commit 9f1fd81
Showing 8 changed files with 131 additions and 32 deletions.
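
As a rough illustration (not Pitchfork's API; the helper and error names below are made up), the new handling boils down to a bounded retry around the worker spawn, followed by a non-zero exit so the monitor process can notice the broken mold and shut down gracefully:

SpawnError = Class.new(StandardError)

# Retry the spawn once; if it fails twice in a row, assume the process
# itself is corrupted and exit so a supervisor can react.
def spawn_worker_with_retry(retries: 1)
  begin
    yield # attempt to fork/boot the worker
  rescue SpawnError => error
    if retries > 0
      warn "Failed to spawn a worker (#{error.message}). Retrying."
      retries -= 1
      retry
    else
      warn "Failed to spawn a worker twice in a row. Giving up."
      exit(1)
    end
  end
end

# Example: the first attempt fails, the retry succeeds.
attempts = 0
spawn_worker_with_retry do
  attempts += 1
  raise SpawnError, "fork at an unsafe point" if attempts == 1
  puts "worker spawned on attempt #{attempts}"
end
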
2 changes: 1 addition & 1 deletion bin/dev-console
@@ -4,7 +4,7 @@
# this script gives a quick dev environment for macOS
# and Windows contributors.
runner="docker"
if command -v podman >/dev/null 2>&1; then
if ! command -v docker >/dev/null 2>&1; then
runner="podman"
fi

29 changes: 23 additions & 6 deletions lib/pitchfork.rb
@@ -33,9 +33,11 @@ module Pitchfork
ClientShutdown = Class.new(EOFError)

BootFailure = Class.new(StandardError)
ForkFailure = Class.new(StandardError)

# :stopdoc:

FORK_TIMEOUT = 5
FORK_LOCK = Monitor.new
@socket_type = :SOCK_SEQPACKET

@@ -198,39 +200,54 @@ def clean_fork(setpgid: true, &block)

def fork_sibling(role, &block)
if REFORKING_AVAILABLE
r, w = IO.pipe
# We double fork so that the new worker is re-attached back
# to the master.
# This requires either PR_SET_CHILD_SUBREAPER which is exclusive to Linux 3.4
# or the master to be PID 1.
if middle_pid = FORK_LOCK.synchronize { Process.fork } # parent
w.close
# We need to wait(2) so that the middle process doesn't end up a zombie.
# The process only calls fork again and exits, so it should be pretty fast.
# However it might need to execute some `Process._fork` or `at_exit` callbacks,
# so in case it takes more than 5 seconds to exit, we kill it with SIGBUS
# to produce a crash report, as this is indicative of a nasty bug.
process_wait_with_timeout(middle_pid, 5, :BUS)
status = process_wait_with_timeout(middle_pid, FORK_TIMEOUT, :BUS)
pid_str = r.gets
r.close
if pid_str
Integer(pid_str)
else
raise ForkFailure, "fork_sibling didn't succeed in #{FORK_TIMEOUT} seconds"
end
else # first child
r.close
Process.setproctitle("<pitchfork fork_sibling(#{role})>")
clean_fork(&block) # detach into a grand child
pid = clean_fork do
# detach into a grand child
w.close
yield
end
w.puts(pid)
w.close
exit
end
else
clean_fork(&block)
end

nil # it's tricky to return the PID
end

def process_wait_with_timeout(pid, timeout, timeout_signal = :KILL)
(timeout * 200).times do
status = Process.wait(pid, Process::WNOHANG)
_, status = Process.waitpid2(pid, Process::WNOHANG)
return status if status
sleep 0.005 # 200 * 5ms => 1s
end

# The process didn't exit in the allotted time, so we kill it.
Process.kill(timeout_signal, pid)
Process.wait(pid)
_, status = Process.waitpid2(pid)
status
end

def time_now(int = false)
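
The double fork plus pipe handshake above can be hard to picture in isolation. Here is a simplified, self-contained sketch of the same mechanism, with a hypothetical fork_detached helper, no fork lock, and plain SIGKILL instead of SIGBUS; it assumes the parent is PID 1 or a Linux PR_SET_CHILD_SUBREAPER process so the re-parenting matters:

# Simplified sketch: double fork so the grandchild gets re-parented, and use
# a pipe so the short-lived middle process can report the grandchild PID
# before exiting.
def fork_detached(timeout: 5)
  r, w = IO.pipe

  middle_pid = Process.fork do
    r.close
    child_pid = Process.fork do
      w.close
      yield # the long-lived grandchild
    end
    w.puts(child_pid)
    w.close
    exit!(0) # exit immediately; the real code runs normal exit callbacks
  end

  w.close
  # Reap the middle process so it doesn't linger as a zombie, but don't wait
  # forever: it should only fork, write one line, and exit.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  status = nil
  while status.nil? && Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
    _, status = Process.waitpid2(middle_pid, Process::WNOHANG)
    sleep 0.005 unless status
  end
  unless status
    Process.kill(:KILL, middle_pid) # Pitchfork sends SIGBUS here to get a crash report
    Process.waitpid(middle_pid)
  end

  pid_str = r.gets
  r.close
  # If the middle process died before writing, gets returns nil: the fork failed.
  pid_str ? Integer(pid_str) : raise("fork_detached: no child PID reported")
end

pid = fork_detached { sleep 0.1 } # grandchild just sleeps briefly
puts "grandchild pid=#{pid}"
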
8 changes: 8 additions & 0 deletions lib/pitchfork/children.rb
@@ -66,13 +66,21 @@ def nr_alive?(nr)
@workers.key?(nr)
end

def abandon(worker)
@workers.delete(worker.nr)
@pending_workers.delete(worker.nr)
end

def reap(pid)
if child = @children.delete(pid)
@pending_workers.delete(child.nr)
@pending_molds.delete(child.pid)
@molds.delete(child.pid)
@workers.delete(child.nr)
if @mold == child
@pending_workers.reject! do |nr, worker|
worker.generation == @mold.generation
end
@mold = nil
end
end
29 changes: 24 additions & 5 deletions lib/pitchfork/http_server.rb
@@ -315,7 +315,7 @@ def join
end
end
stop # gracefully shutdown all workers on our way out
logger.info "master complete"
logger.info "master complete status=#{@exit_status}"
@exit_status
end

@@ -368,7 +368,7 @@ def monitor_loop(sleep = true)
when Message::WorkerSpawned
worker = @children.update(message)
# TODO: should we send a message to the worker to acknowledge?
logger.info "worker=#{worker.nr} pid=#{worker.pid} registered"
logger.info "worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation} registered"
when Message::MoldSpawned
new_mold = @children.update(message)
logger.info("mold pid=#{new_mold.pid} gen=#{new_mold.generation} spawned")
@@ -409,6 +409,7 @@ def stop(graceful = true)
end

def worker_exit(worker)
logger.info "worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation} exiting"
proc_name status: "exiting"

if @before_worker_exit
@@ -514,6 +515,12 @@ def murder_lazy_workers
end

def hard_timeout(worker)
if worker.pid.nil? # Not yet registered, likely never spawned
logger.error "worker=#{worker.nr} timed out during spawn, abandoning"
@children.abandon(worker)
return
end

if @after_worker_hard_timeout
begin
@after_worker_hard_timeout.call(self, worker)
@@ -523,9 +530,9 @@ def hard_timeout(worker)
end

if worker.mold?
logger.error "mold pid=#{worker.pid} timed out, killing"
logger.error "mold pid=#{worker.pid} gen=#{worker.generation} timed out, killing"
else
logger.error "worker=#{worker.nr} pid=#{worker.pid} timed out, killing"
logger.error "worker=#{worker.nr} pid=#{worker.pid} gen=#{worker.generation} timed out, killing"
end
@children.hard_timeout(worker) # take no prisoners for hard timeout violations
end
@@ -599,6 +606,8 @@ def spawn_missing_workers
worker = Pitchfork::Worker.new(worker_nr)

if REFORKING_AVAILABLE
worker.generation = @children.mold&.generation || 0

unless @children.mold&.spawn_worker(worker)
@logger.error("Failed to send a spawn_worker command")
end
@@ -854,7 +863,7 @@ def worker_loop(worker)
if @refork_condition.met?(worker, logger)
proc_name status: "requests: #{worker.requests_count}, spawning mold"
if spawn_mold(worker.generation)
logger.info("Refork condition met, promoting ourselves")
logger.info("worker=#{worker.nr} gen=#{worker.generation} Refork condition met, promoting ourselves")
end
@refork_condition.backoff!
end
@@ -910,8 +919,18 @@ def mold_loop(mold)
when false
# no message, keep looping
when Message::SpawnWorker
retries = 1
begin
spawn_worker(Worker.new(message.nr, generation: mold.generation), detach: true)
rescue ForkFailure
if retries > 0
@logger.fatal("mold pid=#{mold.pid} gen=#{mold.generation} Failed to spawn a worker. Retrying.")
retries -= 1
retry
else
@logger.fatal("mold pid=#{mold.pid} gen=#{mold.generation} Failed to spawn a worker twice in a row. Corrupted mold process?")
Process.exit(1)
end
rescue => error
raise BootFailure, error.message
end
4 changes: 2 additions & 2 deletions test/integration/test_boot.rb
@@ -104,8 +104,8 @@ def test_boot_worker_stuck_in_spawn
assert_healthy("http://#{addr}:#{port}")

assert_stderr("worker=0 gen=0 ready")
assert_stderr(/worker=1 pid=\d+ registered/)
assert_stderr(/worker=1 pid=\d+ timed out, killing/, timeout: 4)
assert_stderr(/worker=1 pid=\d+ gen=0 registered/)
assert_stderr(/worker=1 pid=\d+ gen=0 timed out, killing/, timeout: 4)

assert_clean_shutdown(pid)
end
38 changes: 38 additions & 0 deletions test/integration/test_reforking.rb
@@ -64,6 +64,44 @@ def test_reforking_broken_after_mold_fork_hook
assert_clean_shutdown(pid)
end

def test_broken_mold
addr, port = unused_port

pid = spawn_server(app: File.join(ROOT, "test/integration/env.ru"), config: <<~CONFIG)
listen "#{addr}:#{port}"
worker_processes 2
spawn_timeout 2
refork_after [5, 5]
after_mold_fork do |_server, mold|
if mold.generation > 0
def Process.fork
# simulate some issue causing children to fail.
# Typically some native background thread holding a lock
# right when we fork.
Process.spawn("false")
end
end
end
CONFIG

assert_healthy("http://#{addr}:#{port}")
assert_stderr "worker=0 gen=0 ready"
assert_stderr "worker=1 gen=0 ready", timeout: 5

9.times do
assert_equal true, healthy?("http://#{addr}:#{port}")
end

assert_stderr "Refork condition met, promoting ourselves", timeout: 3

assert_stderr "Failed to spawn a worker. Retrying."
assert_stderr "Failed to spawn a worker twice in a row. Corrupted mold process?"
assert_stderr "No mold alive, shutting down"
assert_stderr "timed out during spawn, abandoning", timeout: 5

assert_exited(pid, 1, timeout: 5)
end

def test_fork_unsafe
addr, port = unused_port

51 changes: 33 additions & 18 deletions test/integration_test_helper.rb
@@ -97,8 +97,10 @@ def unused_port
private

def assert_stderr(pattern, timeout: 1)
wait_stderr?(pattern, timeout)
assert_match(pattern, read_stderr)
print_stderr_on_error do
wait_stderr?(pattern, timeout)
assert_match(pattern, read_stderr)
end
end

def read_stderr
@@ -115,26 +117,30 @@ def wait_stderr?(pattern, timeout)
false
end

def assert_clean_shutdown(pid, timeout = 4)
Process.kill("QUIT", pid)
status = nil
(timeout * 2).times do
Process.kill(0, pid)
break if status = Process.wait2(pid, Process::WNOHANG)
sleep 0.5
def assert_clean_shutdown(pid, timeout: 4)
print_stderr_on_error do
Process.kill("QUIT", pid)
status = nil
(timeout * 2).times do
Process.kill(0, pid)
break if status = Process.wait2(pid, Process::WNOHANG)
sleep 0.5
end
assert status, "process pid=#{pid} didn't exit in #{timeout} seconds"
assert_predicate status[1], :success?
end
assert status, "process pid=#{pid} didn't exit in #{timeout} seconds"
assert_predicate status[1], :success?
end

def assert_exited(pid, exitstatus, timeout = 4)
status = nil
(timeout * 2).times do
break if status = Process.wait2(pid, Process::WNOHANG)
sleep 0.5
def assert_exited(pid, exitstatus, timeout: 4)
print_stderr_on_error do
status = nil
(timeout * 2).times do
break if status = Process.wait2(pid, Process::WNOHANG)
sleep 0.5
end
assert status, "process pid=#{pid} didn't exit in #{timeout} seconds"
assert_equal exitstatus, status[1].exitstatus
end
assert status, "process pid=#{pid} didn't exit in #{timeout} seconds"
assert_equal exitstatus, status[1].exitstatus
end

def assert_healthy(host, timeout = 2)
@@ -179,5 +185,14 @@ def spawn(*args)
@_pids << pid
pid
end

def print_stderr_on_error
yield
rescue Minitest::Assertion
puts '=' * 40
puts read_stderr
puts '=' * 40
raise
end
end
end
2 changes: 2 additions & 0 deletions test/unit/test_children.rb
@@ -22,9 +22,11 @@ def test_message_worker_spawned
worker = Worker.new(0)
@children.register(worker)
assert_predicate @children, :pending_workers?
assert @children.nr_alive?(0)

@children.update(Message::WorkerSpawned.new(0, 42, 0, pipe))
refute_predicate @children, :pending_workers?
assert @children.nr_alive?(0), @children.inspect
assert_equal 42, worker.pid
assert_equal [worker], @children.workers
end
