Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New multiprocess manager #2183

Merged
merged 65 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
bb4efdd
New multiprocess manager
abersheeran Dec 15, 2023
fde0284
lint it
abersheeran Dec 15, 2023
efd22a3
Fixed test
abersheeran Dec 15, 2023
c602782
Fixed `Process`.`__init__`
abersheeran Dec 15, 2023
c41e48d
Fix signal handling in Multiprocess class
abersheeran Dec 15, 2023
cf71b61
Update coverage fail_under value
abersheeran Dec 15, 2023
3928b3d
Remove redundant log message
abersheeran Dec 15, 2023
31c300b
Update coverage fail_under value
abersheeran Dec 15, 2023
fb13e27
Update coverage fail_under value
abersheeran Dec 15, 2023
993e695
Update fail_under value in coverage report
abersheeran Dec 15, 2023
29b3ad6
Remove unused threading event
abersheeran Dec 15, 2023
637a372
Merge branch 'master' of https://github.com/encode/uvicorn into multi…
abersheeran Mar 4, 2024
f767d9c
lint
abersheeran Mar 4, 2024
2a7e193
more tests
abersheeran Mar 5, 2024
cc7a2e1
More tests and fix bug
abersheeran Mar 5, 2024
636080a
lint
abersheeran Mar 5, 2024
939ed2b
Add pytest.mark.skipif for SIGHUP test on Windows
abersheeran Mar 5, 2024
e9760f4
delete unused code
abersheeran Mar 5, 2024
a207664
More tests
abersheeran Mar 5, 2024
b8947b0
Try fix tests in Windows
abersheeran Mar 5, 2024
8cbd3c2
make linter feels great
abersheeran Mar 5, 2024
edae1a9
delete pytest-xdist
abersheeran Mar 5, 2024
c724cfa
Try fix test in windows
abersheeran Mar 5, 2024
3511f48
Try make mypy happy
abersheeran Mar 5, 2024
298906a
Skip tests in windows
abersheeran Mar 5, 2024
f1f4d64
lint
abersheeran Mar 5, 2024
fe2fb08
Try test basic run in Windows
abersheeran Mar 5, 2024
fb46e20
Try fix error in Windows
abersheeran Mar 5, 2024
197fa3d
lint
abersheeran Mar 5, 2024
33f5692
Skip tests in window
abersheeran Mar 5, 2024
d3c4484
Try test in window
abersheeran Mar 5, 2024
d3f090a
lint
abersheeran Mar 5, 2024
4bae355
Add import statement and set current working directory in test_multip…
abersheeran Mar 5, 2024
b910b3f
lint
abersheeran Mar 5, 2024
6d2c6b6
giveup
abersheeran Mar 5, 2024
7ee21af
Refactor signal handling in Multiprocess class
abersheeran Mar 6, 2024
9397e8e
Merge branch 'master' into multiprocess-manager
abersheeran Apr 2, 2024
8efcad3
Tests in windows
abersheeran Apr 3, 2024
7c683ca
lint
abersheeran Apr 3, 2024
4e3ac6e
lint
abersheeran Apr 3, 2024
404b864
ignore mypy check in linux
abersheeran Apr 3, 2024
d3a3aab
Add __init__.py
abersheeran Apr 3, 2024
736510a
fix warning
abersheeran Apr 3, 2024
42c7187
coverage ignore
abersheeran Apr 3, 2024
6fce985
Update coverage
abersheeran Apr 4, 2024
6e916f3
coverage
abersheeran Apr 4, 2024
b8ee655
Add documents
abersheeran Apr 5, 2024
66395d0
Update docs/deployment.md
abersheeran Apr 14, 2024
a19f737
Update uvicorn/supervisors/multiprocess.py
abersheeran Apr 14, 2024
658a4fa
Update uvicorn/supervisors/multiprocess.py
abersheeran Apr 14, 2024
43e0190
Update uvicorn/supervisors/multiprocess.py
abersheeran Apr 14, 2024
cc25743
Update uvicorn/supervisors/multiprocess.py
abersheeran Apr 14, 2024
5f23d2f
Update uvicorn/supervisors/multiprocess.py
abersheeran Apr 14, 2024
a5d19cb
Update uvicorn/supervisors/multiprocess.py
abersheeran Apr 14, 2024
1b3ddc7
Update uvicorn/supervisors/multiprocess.py
abersheeran Apr 14, 2024
1e8db13
Update uvicorn/supervisors/multiprocess.py
abersheeran Apr 14, 2024
a4209cf
Do not output the PID information repeatedly.
abersheeran Apr 14, 2024
dbc34c8
Fix occasional abnormal exits.
abersheeran Apr 14, 2024
a9d4b63
Update docs
abersheeran May 25, 2024
b740e2d
Change subprocess termination logic in Multiprocess class
abersheeran May 25, 2024
e78ca8c
Merge branch 'master' into multiprocess-manager
abersheeran May 25, 2024
59aa0be
Update new_console_in_windows function to include pragma statement
abersheeran May 25, 2024
5281eee
Revert coverage to 98.35
abersheeran May 25, 2024
15ffc6f
chore: Remove pragma statements from test_multiprocess functions
abersheeran May 25, 2024
a1054e7
chore: Exclude test_multiprocess.py from coverage on Windows
abersheeran May 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ omit = [

[tool.coverage.report]
precision = 2
fail_under = 98.35
fail_under = 96.82
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not willing to introduce this feature without testing it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I just want to see your attitude towards this PR before adding the test. Because I remember you have implemented something similar before.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, then let me review and get back to you.

show_missing = true
skip_covered = true
exclude_lines = [
Expand Down
11 changes: 9 additions & 2 deletions tests/supervisors/test_multiprocess.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations

import signal
import socket
import threading
import time

from uvicorn import Config
from uvicorn._types import ASGIReceiveCallable, ASGISendCallable, Scope
Expand All @@ -16,6 +17,12 @@ def run(sockets: list[socket.socket] | None) -> None:
pass # pragma: no cover


def stop_run(stop) -> None:
while True:
time.sleep(1)
stop()


def test_multiprocess_run() -> None:
"""
A basic sanity check.
Expand All @@ -25,5 +32,5 @@ def test_multiprocess_run() -> None:
"""
config = Config(app=app, workers=2)
supervisor = Multiprocess(config, target=run, sockets=[])
supervisor.signal_handler(sig=signal.SIGINT, frame=None)
threading.Thread(target=stop_run, args=(supervisor.handle_int,), daemon=True).start()
supervisor.run()
223 changes: 193 additions & 30 deletions uvicorn/supervisors/multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,101 @@
import os
import signal
import threading
from multiprocessing.context import SpawnProcess
from multiprocessing import Pipe
from socket import socket
from types import FrameType
from typing import Callable
from typing import Any, Callable

import click

from uvicorn._subprocess import get_subprocess
from uvicorn.config import Config

HANDLED_SIGNALS = (
signal.SIGINT, # Unix signal 2. Sent by Ctrl+C.
signal.SIGTERM, # Unix signal 15. Sent by `kill <pid>`.
)
UNIX_SIGNALS = {
getattr(signal, f"SIG{x}"): x for x in "HUP QUIT TTIN TTOU USR1 USR2 WINCH".split() if hasattr(signal, f"SIG{x}")
}

logger = logging.getLogger("uvicorn.error")


class Process:
def __init__(
self,
config: Config,
target: Callable[[list[socket] | None], None],
sockets: list[socket],
) -> None:
self.real_target = target

self.parent_conn, self.child_conn = Pipe()
self.process = get_subprocess(config, self.target, sockets)

def ping(self, timeout: float = 5) -> bool:
self.parent_conn.send(b"ping")
if self.parent_conn.poll(timeout):
self.parent_conn.recv()
return True
return False

def pong(self) -> None:
self.child_conn.recv()
self.child_conn.send(b"pong")

def always_pong(self) -> None:
while True:
self.pong()
abersheeran marked this conversation as resolved.
Show resolved Hide resolved

def target(self, sockets: list[socket] | None = None) -> Any:
if os.name == "nt":
# Windows doesn't support SIGTERM, so we use SIGBREAK instead.
# And then we raise SIGTERM when SIGBREAK is received.
# https://learn.microsoft.com/zh-cn/cpp/c-runtime-library/reference/signal?view=msvc-170
signal.signal(
signal.SIGBREAK, # type: ignore[attr-defined]
lambda sig, frame: signal.raise_signal(signal.SIGTERM),
)

threading.Thread(target=self.always_pong, daemon=True).start()
return self.real_target(sockets)

def is_alive(self, timeout: float = 5) -> bool:
if not self.process.is_alive():
return False

return self.ping(timeout)

def start(self) -> None:
self.process.start()
logger.info(f"Started child process [{self.process.pid}]")

def terminate(self) -> None:
if self.process.exitcode is not None:
return
assert self.process.pid is not None
if os.name == "nt":
# Windows doesn't support SIGTERM.
# So send SIGBREAK, and then in process raise SIGTERM.
os.kill(self.process.pid, signal.CTRL_BREAK_EVENT) # type: ignore[attr-defined]
else:
os.kill(self.process.pid, signal.SIGTERM)
logger.info(f"Terminated child process [{self.process.pid}]")

self.parent_conn.close()
self.child_conn.close()

def kill(self) -> None:
# In Windows, the method will call `TerminateProcess` to kill the process.
# In Unix, the method will send SIGKILL to the process.
self.process.kill()

def join(self) -> None:
logger.info(f"Waiting for child process [{self.process.pid}]")
self.process.join()

@property
def pid(self) -> int | None:
return self.process.pid


class Multiprocess:
def __init__(
self,
Expand All @@ -32,39 +109,125 @@ def __init__(
self.config = config
self.target = target
self.sockets = sockets
self.processes: list[SpawnProcess] = []
self.should_exit = threading.Event()
self.pid = os.getpid()

def signal_handler(self, sig: int, frame: FrameType | None) -> None:
"""
A signal handler that is registered with the parent process.
"""
self.should_exit.set()
self.processes_num = config.workers
self.processes: list[Process] = []

def run(self) -> None:
self.startup()
self.should_exit.wait()
self.shutdown()
self.should_exit = threading.Event()

def startup(self) -> None:
message = f"Started parent process [{str(self.pid)}]"
color_message = "Started parent process [{}]".format(click.style(str(self.pid), fg="cyan", bold=True))
logger.info(message, extra={"color_message": color_message})
self.signal_queue: list[int] = []
for sig in UNIX_SIGNALS:
signal.signal(sig, lambda sig, frame: self.signal_queue.append(sig))

for sig in HANDLED_SIGNALS:
signal.signal(sig, self.signal_handler)
# Sent by Ctrl+C.
signal.signal(signal.SIGINT, lambda sig, frame: self.handle_int())
# Sent by `kill <pid>`. Not sent on Windows.
signal.signal(signal.SIGTERM, lambda sig, frame: self.handle_term())
if os.name == "nt":
# Sent by `Ctrl+Break` on Windows.
signal.signal(signal.SIGBREAK, lambda sig, frame: self.handle_break()) # type: ignore[attr-defined]

for _idx in range(self.config.workers):
process = get_subprocess(config=self.config, target=self.target, sockets=self.sockets)
def init_processes(self) -> None:
for _ in range(self.processes_num):
process = Process(self.config, self.target, self.sockets)
process.start()
self.processes.append(process)

def shutdown(self) -> None:
def terminate_all(self) -> None:
for process in self.processes:
process.terminate()

def join_all(self) -> None:
for process in self.processes:
process.join()

message = f"Stopping parent process [{str(self.pid)}]"
color_message = "Stopping parent process [{}]".format(click.style(str(self.pid), fg="cyan", bold=True))
def restart_all(self) -> None:
for idx, process in enumerate(tuple(self.processes)):
process.terminate()
process.join()
del self.processes[idx]
process = Process(self.config, self.target, self.sockets)
process.start()
abersheeran marked this conversation as resolved.
Show resolved Hide resolved
self.processes.append(process)

def run(self) -> None:
message = f"Started parent process [{os.getpid()}]"
color_message = "Started parent process [{}]".format(click.style(str(os.getpid()), fg="cyan", bold=True))
logger.info(message, extra={"color_message": color_message})

self.init_processes()

while not self.should_exit.wait(0.5):
abersheeran marked this conversation as resolved.
Show resolved Hide resolved
self.handle_signals()
self.keep_subprocess_alive()

self.terminate_all()
self.join_all()

message = f"Stopping parent process [{os.getpid()}]"
color_message = "Stopping parent process [{}]".format(click.style(str(os.getpid()), fg="cyan", bold=True))
logger.info(message, extra={"color_message": color_message})

def keep_subprocess_alive(self) -> None:
for idx, process in enumerate(tuple(self.processes)):
if process.is_alive():
continue

process.kill() # process is hung, kill it
process.join()
logger.info(f"Child process [{process.pid}] died")
del self.processes[idx]
process = Process(self.config, self.target, self.sockets)
process.start()
self.processes.append(process)

def handle_signals(self) -> None:
for sig in tuple(self.signal_queue):
self.signal_queue.remove(sig)
sig_name = UNIX_SIGNALS[sig]
sig_handler = getattr(self, f"handle_{sig_name.lower()}", None)
if sig_handler is not None:
sig_handler()
else:
logger.info(f"Received signal [{sig_name}], but nothing to do")
abersheeran marked this conversation as resolved.
Show resolved Hide resolved

def handle_int(self) -> None:
if not self.should_exit.is_set():
self.should_exit.set()
else:
self.terminate_all()

def handle_term(self) -> None:
logger.info("Received SIGTERM, exiting")
abersheeran marked this conversation as resolved.
Show resolved Hide resolved
if not self.should_exit.is_set():
self.should_exit.set()
else:
self.terminate_all()

def handle_break(self) -> None:
logger.info("Received SIGBREAK, exiting")
abersheeran marked this conversation as resolved.
Show resolved Hide resolved
if not self.should_exit.is_set():
self.should_exit.set()
else:
self.terminate_all()
abersheeran marked this conversation as resolved.
Show resolved Hide resolved

def handle_hup(self) -> None:
logger.info("Received SIGHUP, restarting processes")
abersheeran marked this conversation as resolved.
Show resolved Hide resolved
self.restart_all()
abersheeran marked this conversation as resolved.
Show resolved Hide resolved

def handle_ttin(self) -> None:
logger.info("Received SIGTTIN, increasing processes")
abersheeran marked this conversation as resolved.
Show resolved Hide resolved
self.processes_num += 1
process = Process(self.config, self.target, self.sockets)
process.start()
self.processes.append(process)
abersheeran marked this conversation as resolved.
Show resolved Hide resolved

def handle_ttou(self) -> None:
logger.info("Received SIGTTOU, decreasing processes")
abersheeran marked this conversation as resolved.
Show resolved Hide resolved
if self.processes_num <= 1:
logger.info("Cannot decrease processes any more")
abersheeran marked this conversation as resolved.
Show resolved Hide resolved
return
self.processes_num -= 1
process = self.processes.pop()
process.terminate()
process.join()
abersheeran marked this conversation as resolved.
Show resolved Hide resolved