Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Complement image: propagate SIGTERM to all workers (#13914)
Browse files Browse the repository at this point in the history
This should mean that logs from worker processes are flushed before shutdown.

When a test completes, Complement stops the docker container, which means that
synapse will receive a SIGTERM. Currently, the `complement_fork_starter` exits
immediately (without notifying the worker processes), which means that the
workers never get a chance to flush their logs before the whole container is
vaped. We can fix this by propagating the SIGTERM to the children.
  • Loading branch information
richvdh authored Sep 26, 2022
1 parent 2fae1a3 commit d6b85a2
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
1 change: 1 addition & 0 deletions changelog.d/13914.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Complement image: propagate SIGTERM to all workers.
32 changes: 30 additions & 2 deletions synapse/app/complement_fork_starter.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,18 @@
import importlib
import itertools
import multiprocessing
import os
import signal
import sys
from typing import Any, Callable, List
from types import FrameType
from typing import Any, Callable, List, Optional

from twisted.internet.main import installReactor

# a list of the original signal handlers, before we installed our custom ones.
# We restore these in our child processes.
_original_signal_handlers: dict[int, Any] = {}


class ProxiedReactor:
"""
Expand Down Expand Up @@ -105,6 +112,11 @@ def _worker_entrypoint(

sys.argv = args

# reset the custom signal handlers that we installed, so that the children start
# from a clean slate.
for sig, handler in _original_signal_handlers.items():
signal.signal(sig, handler)

from twisted.internet.epollreactor import EPollReactor

proxy_reactor._install_real_reactor(EPollReactor())
Expand Down Expand Up @@ -167,13 +179,29 @@ def main() -> None:
update_proc.join()
print("===== PREPARED DATABASE =====", file=sys.stderr)

processes: List[multiprocessing.Process] = []

# Install signal handlers to propagate signals to all our children, so that they
# shut down cleanly. This also inhibits our own exit, but that's good: we want to
# wait until the children have exited.
def handle_signal(signum: int, frame: Optional[FrameType]) -> None:
print(
f"complement_fork_starter: Caught signal {signum}. Stopping children.",
file=sys.stderr,
)
for p in processes:
if p.pid:
os.kill(p.pid, signum)

for sig in (signal.SIGINT, signal.SIGTERM):
_original_signal_handlers[sig] = signal.signal(sig, handle_signal)

# At this point, we've imported all the main entrypoints for all the workers.
# Now we basically just fork() out to create the workers we need.
# Because we're using fork(), all the workers get a clone of this launcher's
# memory space and don't need to repeat the work of loading the code!
# Instead of using fork() directly, we use the multiprocessing library,
# which uses fork() on Unix platforms.
processes = []
for (func, worker_args) in zip(worker_functions, args_by_worker):
process = multiprocessing.Process(
target=_worker_entrypoint, args=(func, proxy_reactor, worker_args)
Expand Down

0 comments on commit d6b85a2

Please sign in to comment.