From bb4efdd4d483e8213e0d60c992a016f88eba234d Mon Sep 17 00:00:00 2001 From: abersheeran Date: Fri, 15 Dec 2023 11:17:15 +0800 Subject: [PATCH 01/62] New multiprocess manager --- tests/supervisors/test_multiprocess.py | 2 +- uvicorn/supervisors/multiprocess.py | 234 +++++++++++++++++++++---- 2 files changed, 201 insertions(+), 35 deletions(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 82dc1118a..20f1c9b67 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -26,5 +26,5 @@ def test_multiprocess_run() -> None: """ config = Config(app=app, workers=2) supervisor = Multiprocess(config, target=run, sockets=[]) - supervisor.signal_handler(sig=signal.SIGINT, frame=None) + supervisor.handle_int() supervisor.run() diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index 87ce91f15..b51ae3da4 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -2,24 +2,104 @@ import os import signal import threading -from multiprocessing.context import SpawnProcess +from multiprocessing import Pipe from socket import socket -from types import FrameType -from typing import Callable, List, Optional +from typing import Any, Callable, List, Optional import click from uvicorn._subprocess import get_subprocess from uvicorn.config import Config -HANDLED_SIGNALS = ( - signal.SIGINT, # Unix signal 2. Sent by Ctrl+C. - signal.SIGTERM, # Unix signal 15. Sent by `kill `. -) +UNIX_SIGNALS = { + getattr(signal, f"SIG{x}"): x + for x in "HUP QUIT TTIN TTOU USR1 USR2 WINCH".split() + if hasattr(signal, f"SIG{x}") +} logger = logging.getLogger("uvicorn.error") +class Process: + def __init__( + self, + config: Config, + target: Callable[[Optional[List[socket]]], None], + sockets: List[socket], + ) -> None: + self.config = config + self.real_target = target + self.sockets = sockets + + self.parent_conn, self.child_conn = Pipe() + self.process = get_subprocess(self.config, self.target, self.sockets) + + def ping(self, timeout: float = 5) -> bool: + self.parent_conn.send(b"ping") + if self.parent_conn.poll(timeout): + self.parent_conn.recv() + return True + return False + + def pong(self) -> None: + self.child_conn.recv() + self.child_conn.send(b"pong") + + def always_pong(self) -> None: + while True: + self.pong() + + def target(self) -> Any: + if os.name == "nt": + # Windows doesn't support SIGTERM, so we use SIGBREAK instead. + # And then we raise SIGTERM when SIGBREAK is received. + # https://learn.microsoft.com/zh-cn/cpp/c-runtime-library/reference/signal?view=msvc-170 + signal.signal( + signal.SIGBREAK, lambda sig, frame: signal.raise_signal(signal.SIGTERM) + ) + + threading.Thread(target=self.always_pong, daemon=True).start() + return self.real_target(self.sockets) + + def is_alive(self, timeout: float = 5) -> bool: + if not self.process.is_alive(): + return False + + return self.ping(timeout) + + def start(self) -> None: + self.process.start() + logger.info("Started child process [{}]".format(self.process.pid)) + + def terminate(self) -> None: + if self.process.exitcode is not None: + return + assert self.process.pid is not None + if os.name == "nt": + # Windows doesn't support SIGTERM. + # So send SIGBREAK, and then in process raise SIGTERM. + os.kill(self.process.pid, signal.CTRL_BREAK_EVENT) + else: + os.kill(self.process.pid, signal.SIGTERM) + logger.info("Terminated child process [{}]".format(self.process.pid)) + + self.parent_conn.close() + self.child_conn.close() + + def kill(self) -> None: + # In Windows, the method will call `TerminateProcess` to kill the process. + # In Unix, the method will send SIGKILL to the process. + self.process.kill() + + def join(self) -> None: + logger.info("Waiting for child process [{}]".format(self.process.pid)) + self.process.join() + + @property + def pid(self) -> int | None: + return self.process.pid + + class Multiprocess: def __init__( self, @@ -30,45 +110,131 @@ def __init__( self.config = config self.target = target self.sockets = sockets - self.processes: List[SpawnProcess] = [] - self.should_exit = threading.Event() - self.pid = os.getpid() - def signal_handler(self, sig: int, frame: Optional[FrameType]) -> None: - """ - A signal handler that is registered with the parent process. - """ - self.should_exit.set() + self.processes_num = config.workers + self.processes: list[Process] = [] - def run(self) -> None: - self.startup() - self.should_exit.wait() - self.shutdown() + self.should_exit = threading.Event() + self.keep_alive_checking = threading.Event() - def startup(self) -> None: - message = "Started parent process [{}]".format(str(self.pid)) - color_message = "Started parent process [{}]".format( - click.style(str(self.pid), fg="cyan", bold=True) - ) - logger.info(message, extra={"color_message": color_message}) + self.signal_queue: list[int] = [] + for sig in UNIX_SIGNALS: + signal.signal(sig, lambda sig, frame: self.signal_queue.append(sig)) - for sig in HANDLED_SIGNALS: - signal.signal(sig, self.signal_handler) + # Sent by Ctrl+C. + signal.signal(signal.SIGINT, lambda sig, frame: self.handle_int()) + # Sent by `kill `. Not sent on Windows. + signal.signal(signal.SIGTERM, lambda sig, frame: self.handle_term()) + if os.name == "nt": + # Sent by `Ctrl+Break` on Windows. + signal.signal(signal.SIGBREAK, lambda sig, frame: self.handle_break()) - for _idx in range(self.config.workers): - process = get_subprocess( - config=self.config, target=self.target, sockets=self.sockets - ) + def init_processes(self) -> None: + for _ in range(self.processes_num): + process = Process(self.config, self.target, self.sockets) process.start() self.processes.append(process) - def shutdown(self) -> None: + def terminate_all(self) -> None: + for process in self.processes: + process.terminate() + + def join_all(self) -> None: for process in self.processes: + process.join() + + def restart_all(self) -> None: + for idx, process in enumerate(tuple(self.processes)): process.terminate() process.join() + del self.processes[idx] + process = Process(self.config, self.target, self.sockets) + process.start() + self.processes.append(process) + + def run(self) -> None: + message = "Started parent process [{}]".format(os.getpid()) + color_message = "Started parent process [{}]".format( + click.style(os.getpid(), fg="cyan", bold=True) + ) + logger.info(message, extra={"color_message": color_message}) + + self.init_processes() + + while not self.should_exit.wait(0.5): + self.handle_signals() + self.keep_subprocess_alive() - message = "Stopping parent process [{}]".format(str(self.pid)) + self.join_all() + + message = "Stopping parent process [{}]".format(os.getpid()) color_message = "Stopping parent process [{}]".format( - click.style(str(self.pid), fg="cyan", bold=True) + click.style(os.getpid(), fg="cyan", bold=True) ) logger.info(message, extra={"color_message": color_message}) + + def keep_subprocess_alive(self) -> None: + self.keep_alive_checking.clear() + for idx, process in enumerate(tuple(self.processes)): + if process.is_alive(): + continue + + process.kill() # process is hung, kill it + process.join() + logger.info("Child process [{}] died".format(process.pid)) + del self.processes[idx] + process = Process(self.config, self.target, self.sockets) + process.start() + self.processes.append(process) + self.keep_alive_checking.set() + + def handle_signals(self) -> None: + for sig in tuple(self.signal_queue): + self.signal_queue.remove(sig) + sig_name = UNIX_SIGNALS[sig] + sig_handler = getattr(self, f"handle_{sig_name.lower()}", None) + if sig_handler is not None: + sig_handler() + else: + logger.info(f"Received signal [{sig_name}], but nothing to do") + + def handle_int(self) -> None: + logger.info("Received SIGINT, exiting") + self.should_exit.set() + self.keep_alive_checking.wait() + self.terminate_all() + + def handle_term(self) -> None: + logger.info("Received SIGTERM, exiting") + self.should_exit.set() + self.keep_alive_checking.wait() + self.terminate_all() + + def handle_break(self) -> None: + logger.info("Received SIGBREAK, exiting") + if not self.should_exit.is_set(): + self.should_exit.set() + else: + self.keep_alive_checking.wait() + self.terminate_all() + + def handle_hup(self) -> None: + logger.info("Received SIGHUP, restarting processes") + self.restart_all() + + def handle_ttin(self) -> None: + logger.info("Received SIGTTIN, increasing processes") + self.processes_num += 1 + process = Process(self.config, self.target, self.sockets) + process.start() + self.processes.append(process) + + def handle_ttou(self) -> None: + logger.info("Received SIGTTOU, decreasing processes") + if self.processes_num <= 1: + logger.info("Cannot decrease processes any more") + return + self.processes_num -= 1 + process = self.processes.pop() + process.terminate() + process.join() From fde028434bdaa85022c00fe371637fc0b4669aea Mon Sep 17 00:00:00 2001 From: abersheeran Date: Fri, 15 Dec 2023 11:48:51 +0800 Subject: [PATCH 02/62] lint it --- tests/supervisors/test_multiprocess.py | 1 - uvicorn/supervisors/multiprocess.py | 19 ++++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 20f1c9b67..11069aab4 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -1,4 +1,3 @@ -import signal import socket from typing import List, Optional diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index b51ae3da4..e720f2f68 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -4,7 +4,7 @@ import threading from multiprocessing import Pipe from socket import socket -from typing import Any, Callable, List, Optional +from typing import Any, Callable, List, Optional, Union import click @@ -55,7 +55,8 @@ def target(self) -> Any: # And then we raise SIGTERM when SIGBREAK is received. # https://learn.microsoft.com/zh-cn/cpp/c-runtime-library/reference/signal?view=msvc-170 signal.signal( - signal.SIGBREAK, lambda sig, frame: signal.raise_signal(signal.SIGTERM) + signal.SIGBREAK, # type: ignore[attr-defined] + lambda sig, frame: signal.raise_signal(signal.SIGTERM), ) threading.Thread(target=self.always_pong, daemon=True).start() @@ -78,7 +79,7 @@ def terminate(self) -> None: if os.name == "nt": # Windows doesn't support SIGTERM. # So send SIGBREAK, and then in process raise SIGTERM. - os.kill(self.process.pid, signal.CTRL_BREAK_EVENT) + os.kill(self.process.pid, signal.CTRL_BREAK_EVENT) # type: ignore[attr-defined] else: os.kill(self.process.pid, signal.SIGTERM) logger.info("Terminated child process [{}]".format(self.process.pid)) @@ -96,7 +97,7 @@ def join(self) -> None: self.process.join() @property - def pid(self) -> int | None: + def pid(self) -> Union[int, None]: return self.process.pid @@ -112,12 +113,12 @@ def __init__( self.sockets = sockets self.processes_num = config.workers - self.processes: list[Process] = [] + self.processes: List[Process] = [] self.should_exit = threading.Event() self.keep_alive_checking = threading.Event() - self.signal_queue: list[int] = [] + self.signal_queue: List[int] = [] for sig in UNIX_SIGNALS: signal.signal(sig, lambda sig, frame: self.signal_queue.append(sig)) @@ -127,7 +128,7 @@ def __init__( signal.signal(signal.SIGTERM, lambda sig, frame: self.handle_term()) if os.name == "nt": # Sent by `Ctrl+Break` on Windows. - signal.signal(signal.SIGBREAK, lambda sig, frame: self.handle_break()) + signal.signal(signal.SIGBREAK, lambda sig, frame: self.handle_break()) # type: ignore[attr-defined] def init_processes(self) -> None: for _ in range(self.processes_num): @@ -155,7 +156,7 @@ def restart_all(self) -> None: def run(self) -> None: message = "Started parent process [{}]".format(os.getpid()) color_message = "Started parent process [{}]".format( - click.style(os.getpid(), fg="cyan", bold=True) + click.style(str(os.getpid()), fg="cyan", bold=True) ) logger.info(message, extra={"color_message": color_message}) @@ -169,7 +170,7 @@ def run(self) -> None: message = "Stopping parent process [{}]".format(os.getpid()) color_message = "Stopping parent process [{}]".format( - click.style(os.getpid(), fg="cyan", bold=True) + click.style(str(os.getpid()), fg="cyan", bold=True) ) logger.info(message, extra={"color_message": color_message}) From efd22a3ef1b13867a3442a471339b0dec1d75725 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Fri, 15 Dec 2023 12:01:13 +0800 Subject: [PATCH 03/62] Fixed test --- tests/supervisors/test_multiprocess.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 11069aab4..e33121113 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -1,4 +1,6 @@ import socket +import threading +import time from typing import List, Optional from uvicorn import Config @@ -16,6 +18,12 @@ def run(sockets: Optional[List[socket.socket]]) -> None: pass # pragma: no cover +def stop_run(stop) -> None: + while True: + time.sleep(1) + stop() + + def test_multiprocess_run() -> None: """ A basic sanity check. @@ -25,5 +33,7 @@ def test_multiprocess_run() -> None: """ config = Config(app=app, workers=2) supervisor = Multiprocess(config, target=run, sockets=[]) - supervisor.handle_int() + threading.Thread( + target=stop_run, args=(supervisor.handle_int,), daemon=True + ).start() supervisor.run() From c6027823253d42ea8d43c1d169d3112a63dcc3e7 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Fri, 15 Dec 2023 12:05:32 +0800 Subject: [PATCH 04/62] Fixed `Process`.`__init__` --- uvicorn/supervisors/multiprocess.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index e720f2f68..5aab3eb2a 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -27,12 +27,10 @@ def __init__( target: Callable[[Optional[List[socket]]], None], sockets: List[socket], ) -> None: - self.config = config self.real_target = target - self.sockets = sockets self.parent_conn, self.child_conn = Pipe() - self.process = get_subprocess(self.config, self.target, self.sockets) + self.process = get_subprocess(config, self.target, sockets) def ping(self, timeout: float = 5) -> bool: self.parent_conn.send(b"ping") @@ -49,7 +47,7 @@ def always_pong(self) -> None: while True: self.pong() - def target(self) -> Any: + def target(self, sockets: Optional[List[socket]] = None) -> Any: if os.name == "nt": # Windows doesn't support SIGTERM, so we use SIGBREAK instead. # And then we raise SIGTERM when SIGBREAK is received. @@ -60,7 +58,7 @@ def target(self) -> Any: ) threading.Thread(target=self.always_pong, daemon=True).start() - return self.real_target(self.sockets) + return self.real_target(sockets) def is_alive(self, timeout: float = 5) -> bool: if not self.process.is_alive(): From c41e48db68e1c09d2b19fee0f82522941208c086 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Fri, 15 Dec 2023 12:17:46 +0800 Subject: [PATCH 05/62] Fix signal handling in Multiprocess class --- uvicorn/supervisors/multiprocess.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index 5aab3eb2a..6f3406d1b 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -164,6 +164,7 @@ def run(self) -> None: self.handle_signals() self.keep_subprocess_alive() + self.terminate_all() self.join_all() message = "Stopping parent process [{}]".format(os.getpid()) @@ -199,15 +200,19 @@ def handle_signals(self) -> None: def handle_int(self) -> None: logger.info("Received SIGINT, exiting") - self.should_exit.set() - self.keep_alive_checking.wait() - self.terminate_all() + if not self.should_exit.is_set(): + self.should_exit.set() + else: + self.keep_alive_checking.wait() + self.terminate_all() def handle_term(self) -> None: logger.info("Received SIGTERM, exiting") - self.should_exit.set() - self.keep_alive_checking.wait() - self.terminate_all() + if not self.should_exit.is_set(): + self.should_exit.set() + else: + self.keep_alive_checking.wait() + self.terminate_all() def handle_break(self) -> None: logger.info("Received SIGBREAK, exiting") From cf71b6188199f166ef0baef815764f88e12c4e9e Mon Sep 17 00:00:00 2001 From: abersheeran Date: Fri, 15 Dec 2023 12:23:00 +0800 Subject: [PATCH 06/62] Update coverage fail_under value --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 94cef1453..d8bf6f6eb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -100,7 +100,7 @@ omit = [ [tool.coverage.report] precision = 2 -fail_under = 98.35 +fail_under = 96.95 show_missing = true skip_covered = true exclude_lines = [ From 3928b3d9acd069c05e3f3dbd4444d751fd008a62 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Fri, 15 Dec 2023 12:28:05 +0800 Subject: [PATCH 07/62] Remove redundant log message --- uvicorn/supervisors/multiprocess.py | 1 - 1 file changed, 1 deletion(-) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index 6f3406d1b..ecf7e3409 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -199,7 +199,6 @@ def handle_signals(self) -> None: logger.info(f"Received signal [{sig_name}], but nothing to do") def handle_int(self) -> None: - logger.info("Received SIGINT, exiting") if not self.should_exit.is_set(): self.should_exit.set() else: From 31c300b68447d944141548c56b2a03aaaa3883a7 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Fri, 15 Dec 2023 12:30:48 +0800 Subject: [PATCH 08/62] Update coverage fail_under value --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d8bf6f6eb..296aa42e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -100,7 +100,7 @@ omit = [ [tool.coverage.report] precision = 2 -fail_under = 96.95 +fail_under = 96.92 show_missing = true skip_covered = true exclude_lines = [ From fb13e273a0f8cd944d5e3bbfabd15bfac9b0ca60 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Fri, 15 Dec 2023 12:36:31 +0800 Subject: [PATCH 09/62] Update coverage fail_under value --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 296aa42e3..85aa9179d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -100,7 +100,7 @@ omit = [ [tool.coverage.report] precision = 2 -fail_under = 96.92 +fail_under = 96.83 show_missing = true skip_covered = true exclude_lines = [ From 993e69517ee20917e40720ad30d3d6dd2018d2e3 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Fri, 15 Dec 2023 12:48:32 +0800 Subject: [PATCH 10/62] Update fail_under value in coverage report --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 85aa9179d..24165a31d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -100,7 +100,7 @@ omit = [ [tool.coverage.report] precision = 2 -fail_under = 96.83 +fail_under = 96.82 show_missing = true skip_covered = true exclude_lines = [ From 29b3ad6029c1c2150a596e159901cbd392d50aa0 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Fri, 15 Dec 2023 17:13:35 +0800 Subject: [PATCH 11/62] Remove unused threading event --- uvicorn/supervisors/multiprocess.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index ecf7e3409..133cf5a41 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -114,7 +114,6 @@ def __init__( self.processes: List[Process] = [] self.should_exit = threading.Event() - self.keep_alive_checking = threading.Event() self.signal_queue: List[int] = [] for sig in UNIX_SIGNALS: @@ -174,7 +173,6 @@ def run(self) -> None: logger.info(message, extra={"color_message": color_message}) def keep_subprocess_alive(self) -> None: - self.keep_alive_checking.clear() for idx, process in enumerate(tuple(self.processes)): if process.is_alive(): continue @@ -186,7 +184,6 @@ def keep_subprocess_alive(self) -> None: process = Process(self.config, self.target, self.sockets) process.start() self.processes.append(process) - self.keep_alive_checking.set() def handle_signals(self) -> None: for sig in tuple(self.signal_queue): @@ -202,7 +199,6 @@ def handle_int(self) -> None: if not self.should_exit.is_set(): self.should_exit.set() else: - self.keep_alive_checking.wait() self.terminate_all() def handle_term(self) -> None: @@ -210,7 +206,6 @@ def handle_term(self) -> None: if not self.should_exit.is_set(): self.should_exit.set() else: - self.keep_alive_checking.wait() self.terminate_all() def handle_break(self) -> None: @@ -218,7 +213,6 @@ def handle_break(self) -> None: if not self.should_exit.is_set(): self.should_exit.set() else: - self.keep_alive_checking.wait() self.terminate_all() def handle_hup(self) -> None: From f767d9c24411805d6f6a85ba87022a1ea4965959 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Mon, 4 Mar 2024 09:44:45 +0800 Subject: [PATCH 12/62] lint --- tests/supervisors/test_multiprocess.py | 6 +--- uvicorn/supervisors/multiprocess.py | 38 +++++++++++--------------- 2 files changed, 17 insertions(+), 27 deletions(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 52f8633c1..57a0e118e 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -1,10 +1,8 @@ from __future__ import annotations -import signal import socket import threading import time -from typing import List, Optional from uvicorn import Config from uvicorn._types import ASGIReceiveCallable, ASGISendCallable, Scope @@ -34,7 +32,5 @@ def test_multiprocess_run() -> None: """ config = Config(app=app, workers=2) supervisor = Multiprocess(config, target=run, sockets=[]) - threading.Thread( - target=stop_run, args=(supervisor.handle_int,), daemon=True - ).start() + threading.Thread(target=stop_run, args=(supervisor.handle_int,), daemon=True).start() supervisor.run() diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index 6e1ff36e3..6c5d70317 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -6,7 +6,7 @@ import threading from multiprocessing import Pipe from socket import socket -from typing import Any, Callable, List, Optional, Union +from typing import Any, Callable import click @@ -14,9 +14,7 @@ from uvicorn.config import Config UNIX_SIGNALS = { - getattr(signal, f"SIG{x}"): x - for x in "HUP QUIT TTIN TTOU USR1 USR2 WINCH".split() - if hasattr(signal, f"SIG{x}") + getattr(signal, f"SIG{x}"): x for x in "HUP QUIT TTIN TTOU USR1 USR2 WINCH".split() if hasattr(signal, f"SIG{x}") } logger = logging.getLogger("uvicorn.error") @@ -26,8 +24,8 @@ class Process: def __init__( self, config: Config, - target: Callable[[Optional[List[socket]]], None], - sockets: List[socket], + target: Callable[[list[socket] | None], None], + sockets: list[socket], ) -> None: self.real_target = target @@ -49,7 +47,7 @@ def always_pong(self) -> None: while True: self.pong() - def target(self, sockets: Optional[List[socket]] = None) -> Any: + def target(self, sockets: list[socket] | None = None) -> Any: if os.name == "nt": # Windows doesn't support SIGTERM, so we use SIGBREAK instead. # And then we raise SIGTERM when SIGBREAK is received. @@ -70,7 +68,7 @@ def is_alive(self, timeout: float = 5) -> bool: def start(self) -> None: self.process.start() - logger.info("Started child process [{}]".format(self.process.pid)) + logger.info(f"Started child process [{self.process.pid}]") def terminate(self) -> None: if self.process.exitcode is not None: @@ -82,7 +80,7 @@ def terminate(self) -> None: os.kill(self.process.pid, signal.CTRL_BREAK_EVENT) # type: ignore[attr-defined] else: os.kill(self.process.pid, signal.SIGTERM) - logger.info("Terminated child process [{}]".format(self.process.pid)) + logger.info(f"Terminated child process [{self.process.pid}]") self.parent_conn.close() self.child_conn.close() @@ -93,11 +91,11 @@ def kill(self) -> None: self.process.kill() def join(self) -> None: - logger.info("Waiting for child process [{}]".format(self.process.pid)) + logger.info(f"Waiting for child process [{self.process.pid}]") self.process.join() @property - def pid(self) -> Union[int, None]: + def pid(self) -> int | None: return self.process.pid @@ -113,11 +111,11 @@ def __init__( self.sockets = sockets self.processes_num = config.workers - self.processes: List[Process] = [] + self.processes: list[Process] = [] self.should_exit = threading.Event() - self.signal_queue: List[int] = [] + self.signal_queue: list[int] = [] for sig in UNIX_SIGNALS: signal.signal(sig, lambda sig, frame: self.signal_queue.append(sig)) @@ -153,10 +151,8 @@ def restart_all(self) -> None: self.processes.append(process) def run(self) -> None: - message = "Started parent process [{}]".format(os.getpid()) - color_message = "Started parent process [{}]".format( - click.style(str(os.getpid()), fg="cyan", bold=True) - ) + message = f"Started parent process [{os.getpid()}]" + color_message = "Started parent process [{}]".format(click.style(str(os.getpid()), fg="cyan", bold=True)) logger.info(message, extra={"color_message": color_message}) self.init_processes() @@ -168,10 +164,8 @@ def run(self) -> None: self.terminate_all() self.join_all() - message = "Stopping parent process [{}]".format(os.getpid()) - color_message = "Stopping parent process [{}]".format( - click.style(str(os.getpid()), fg="cyan", bold=True) - ) + message = f"Stopping parent process [{os.getpid()}]" + color_message = "Stopping parent process [{}]".format(click.style(str(os.getpid()), fg="cyan", bold=True)) logger.info(message, extra={"color_message": color_message}) def keep_subprocess_alive(self) -> None: @@ -181,7 +175,7 @@ def keep_subprocess_alive(self) -> None: process.kill() # process is hung, kill it process.join() - logger.info("Child process [{}] died".format(process.pid)) + logger.info(f"Child process [{process.pid}] died") del self.processes[idx] process = Process(self.config, self.target, self.sockets) process.start() From 2a7e19397c78b8e92532fe79a3fe909f0037bc06 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 10:55:20 +0800 Subject: [PATCH 13/62] more tests --- tests/supervisors/test_multiprocess.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 57a0e118e..ebc63c13d 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -14,7 +14,8 @@ async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable def run(sockets: list[socket.socket] | None) -> None: - pass # pragma: no cover + while True: + time.sleep(1) def stop_run(stop) -> None: @@ -34,3 +35,21 @@ def test_multiprocess_run() -> None: supervisor = Multiprocess(config, target=run, sockets=[]) threading.Thread(target=stop_run, args=(supervisor.handle_int,), daemon=True).start() supervisor.run() + + +def test_multiprocess_health_check() -> None: + """ + Ensure that the health check works as expected. + """ + config = Config(app=app, workers=2) + supervisor = Multiprocess(config, target=run, sockets=[]) + threading.Thread(target=supervisor.run, daemon=True).start() + time.sleep(1) + process = supervisor.processes[0] + process.kill() + assert not process.is_alive() + time.sleep(1) + for p in supervisor.processes: + assert p.is_alive() + + supervisor.handle_int() From cc7a2e111812ab18d0f8804733e94b99b9ef94ff Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 13:07:43 +0800 Subject: [PATCH 14/62] More tests and fix bug --- tests/supervisors/test_multiprocess.py | 29 +++++++++++++++----- uvicorn/supervisors/multiprocess.py | 38 ++++++++++++-------------- 2 files changed, 39 insertions(+), 28 deletions(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index ebc63c13d..1b9aad44e 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -1,4 +1,5 @@ from __future__ import annotations +import signal import socket import threading @@ -16,12 +17,9 @@ async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable def run(sockets: list[socket.socket] | None) -> None: while True: time.sleep(1) + import os - -def stop_run(stop) -> None: - while True: - time.sleep(1) - stop() + print("Running , pid: ", os.getpid()) def test_multiprocess_run() -> None: @@ -33,8 +31,9 @@ def test_multiprocess_run() -> None: """ config = Config(app=app, workers=2) supervisor = Multiprocess(config, target=run, sockets=[]) - threading.Thread(target=stop_run, args=(supervisor.handle_int,), daemon=True).start() + supervisor.signal_queue.append(signal.SIGINT) supervisor.run() + supervisor.join_all() def test_multiprocess_health_check() -> None: @@ -51,5 +50,21 @@ def test_multiprocess_health_check() -> None: time.sleep(1) for p in supervisor.processes: assert p.is_alive() + supervisor.signal_queue.append(signal.SIGINT) + supervisor.join_all() - supervisor.handle_int() + +def test_multiprocess_sighup() -> None: + """ + Ensure that the SIGHUP signal is handled as expected. + """ + config = Config(app=app, workers=2) + supervisor = Multiprocess(config, target=run, sockets=[]) + threading.Thread(target=supervisor.run, daemon=True).start() + time.sleep(1) + pids = [p.pid for p in supervisor.processes] + supervisor.signal_queue.append(signal.SIGHUP) + time.sleep(1) + assert pids != [p.pid for p in supervisor.processes] + supervisor.signal_queue.append(signal.SIGINT) + supervisor.join_all() diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index 6c5d70317..fa9833688 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -13,8 +13,10 @@ from uvicorn._subprocess import get_subprocess from uvicorn.config import Config -UNIX_SIGNALS = { - getattr(signal, f"SIG{x}"): x for x in "HUP QUIT TTIN TTOU USR1 USR2 WINCH".split() if hasattr(signal, f"SIG{x}") +SIGNALS = { + getattr(signal, f"SIG{x}"): x + for x in "INT TERM BREAK HUP QUIT TTIN TTOU USR1 USR2 WINCH".split() + if hasattr(signal, f"SIG{x}") } logger = logging.getLogger("uvicorn.error") @@ -33,11 +35,14 @@ def __init__( self.process = get_subprocess(config, self.target, sockets) def ping(self, timeout: float = 5) -> bool: - self.parent_conn.send(b"ping") - if self.parent_conn.poll(timeout): - self.parent_conn.recv() - return True - return False + try: + self.parent_conn.send(b"ping") + if self.parent_conn.poll(timeout): + self.parent_conn.recv() + return True + return False + except OSError: # Closed pipe + return False def pong(self) -> None: self.child_conn.recv() @@ -116,17 +121,9 @@ def __init__( self.should_exit = threading.Event() self.signal_queue: list[int] = [] - for sig in UNIX_SIGNALS: + for sig in SIGNALS: signal.signal(sig, lambda sig, frame: self.signal_queue.append(sig)) - # Sent by Ctrl+C. - signal.signal(signal.SIGINT, lambda sig, frame: self.handle_int()) - # Sent by `kill `. Not sent on Windows. - signal.signal(signal.SIGTERM, lambda sig, frame: self.handle_term()) - if os.name == "nt": - # Sent by `Ctrl+Break` on Windows. - signal.signal(signal.SIGBREAK, lambda sig, frame: self.handle_break()) # type: ignore[attr-defined] - def init_processes(self) -> None: for _ in range(self.processes_num): process = Process(self.config, self.target, self.sockets) @@ -145,10 +142,9 @@ def restart_all(self) -> None: for idx, process in enumerate(tuple(self.processes)): process.terminate() process.join() - del self.processes[idx] - process = Process(self.config, self.target, self.sockets) - process.start() - self.processes.append(process) + new_process = Process(self.config, self.target, self.sockets) + new_process.start() + self.processes[idx] = new_process def run(self) -> None: message = f"Started parent process [{os.getpid()}]" @@ -184,7 +180,7 @@ def keep_subprocess_alive(self) -> None: def handle_signals(self) -> None: for sig in tuple(self.signal_queue): self.signal_queue.remove(sig) - sig_name = UNIX_SIGNALS[sig] + sig_name = SIGNALS[sig] sig_handler = getattr(self, f"handle_{sig_name.lower()}", None) if sig_handler is not None: sig_handler() From 636080a706b8bd19b673a6befbab83a3d0c0da3a Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 13:08:13 +0800 Subject: [PATCH 15/62] lint --- tests/supervisors/test_multiprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 1b9aad44e..1a022423a 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -1,6 +1,6 @@ from __future__ import annotations -import signal +import signal import socket import threading import time From 939ed2b1e9fd0c85aa9ba66f5287a5df63afdf52 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 13:11:29 +0800 Subject: [PATCH 16/62] Add pytest.mark.skipif for SIGHUP test on Windows --- tests/supervisors/test_multiprocess.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 1a022423a..d9ea2fce6 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -5,6 +5,8 @@ import threading import time +import pytest + from uvicorn import Config from uvicorn._types import ASGIReceiveCallable, ASGISendCallable, Scope from uvicorn.supervisors import Multiprocess @@ -54,6 +56,8 @@ def test_multiprocess_health_check() -> None: supervisor.join_all() +# Test is skipped because windows does not support SIGHUP +@pytest.mark.skipif(not hasattr(signal, "SIGHUP"), reason="SIGHUP is not supported on Windows") def test_multiprocess_sighup() -> None: """ Ensure that the SIGHUP signal is handled as expected. From e9760f4a3d8dee4f75e21ff90f872b17e37cb466 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 13:21:22 +0800 Subject: [PATCH 17/62] delete unused code --- uvicorn/supervisors/multiprocess.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index fa9833688..c2afd5a76 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -35,14 +35,11 @@ def __init__( self.process = get_subprocess(config, self.target, sockets) def ping(self, timeout: float = 5) -> bool: - try: - self.parent_conn.send(b"ping") - if self.parent_conn.poll(timeout): - self.parent_conn.recv() - return True - return False - except OSError: # Closed pipe - return False + self.parent_conn.send(b"ping") + if self.parent_conn.poll(timeout): + self.parent_conn.recv() + return True + return False def pong(self) -> None: self.child_conn.recv() From a2076641205a34d8716e6b42cde4bcd2d537a12c Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 13:43:11 +0800 Subject: [PATCH 18/62] More tests --- tests/supervisors/test_multiprocess.py | 61 +++++++++++++++++++++++++- uvicorn/supervisors/multiprocess.py | 1 + 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index d9ea2fce6..56381470a 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -56,8 +56,32 @@ def test_multiprocess_health_check() -> None: supervisor.join_all() -# Test is skipped because windows does not support SIGHUP -@pytest.mark.skipif(not hasattr(signal, "SIGHUP"), reason="SIGHUP is not supported on Windows") +def test_multiprocess_sigterm() -> None: + """ + Ensure that the SIGTERM signal is handled as expected. + """ + config = Config(app=app, workers=2) + supervisor = Multiprocess(config, target=run, sockets=[]) + threading.Thread(target=supervisor.run, daemon=True).start() + time.sleep(1) + supervisor.signal_queue.append(signal.SIGTERM) + supervisor.join_all() + + +@pytest.mark.skipif(not hasattr(signal, "SIGBREAK"), reason="platform unsupports SIGBREAK") +def test_multiprocess_sigbreak() -> None: + """ + Ensure that the SIGBREAK signal is handled as expected. + """ + config = Config(app=app, workers=2) + supervisor = Multiprocess(config, target=run, sockets=[]) + threading.Thread(target=supervisor.run, daemon=True).start() + time.sleep(1) + supervisor.signal_queue.append(signal.SIGBREAK) + supervisor.join_all() + + +@pytest.mark.skipif(not hasattr(signal, "SIGHUP"), reason="platform unsupports SIGHUP") def test_multiprocess_sighup() -> None: """ Ensure that the SIGHUP signal is handled as expected. @@ -72,3 +96,36 @@ def test_multiprocess_sighup() -> None: assert pids != [p.pid for p in supervisor.processes] supervisor.signal_queue.append(signal.SIGINT) supervisor.join_all() + + +@pytest.mark.skipif(not hasattr(signal, "SIGTTIN"), reason="platform unsupports SIGTTIN") +def test_multiprocess_sigttin() -> None: + """ + Ensure that the SIGTTIN signal is handled as expected. + """ + config = Config(app=app, workers=2) + supervisor = Multiprocess(config, target=run, sockets=[]) + threading.Thread(target=supervisor.run, daemon=True).start() + supervisor.signal_queue.append(signal.SIGTTIN) + time.sleep(1) + assert len(supervisor.processes) == 3 + supervisor.signal_queue.append(signal.SIGINT) + supervisor.join_all() + + +@pytest.mark.skipif(not hasattr(signal, "SIGTTOU"), reason="platform unsupports SIGTTOU") +def test_multiprocess_sigttou() -> None: + """ + Ensure that the SIGTTOU signal is handled as expected. + """ + config = Config(app=app, workers=2) + supervisor = Multiprocess(config, target=run, sockets=[]) + threading.Thread(target=supervisor.run, daemon=True).start() + supervisor.signal_queue.append(signal.SIGTTOU) + time.sleep(1) + assert len(supervisor.processes) == 1 + supervisor.signal_queue.append(signal.SIGTTOU) + time.sleep(1) + assert len(supervisor.processes) == 1 + supervisor.signal_queue.append(signal.SIGINT) + supervisor.join_all() diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index c2afd5a76..0bbb6210c 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -185,6 +185,7 @@ def handle_signals(self) -> None: logger.info(f"Received signal [{sig_name}], but nothing to do") def handle_int(self) -> None: + logger.info("Received SIGINT, exiting") if not self.should_exit.is_set(): self.should_exit.set() else: From b8947b001980ea3d698dcd593aef05134b38908f Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 13:48:01 +0800 Subject: [PATCH 19/62] Try fix tests in Windows --- requirements.txt | 1 + tests/supervisors/test_multiprocess.py | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/requirements.txt b/requirements.txt index 316167f4c..3f013c970 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,6 +17,7 @@ twine==4.0.2 ruff==0.1.15 pytest==8.0.0 pytest-mock==3.12.0 +pytest-xdist==3.5.0 mypy==1.8.0 types-click==7.1.8 types-pyyaml==6.0.12.12 diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 56381470a..99a322b61 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -24,6 +24,7 @@ def run(sockets: list[socket.socket] | None) -> None: print("Running , pid: ", os.getpid()) +@pytest.mark.xdist_group(name="group1") def test_multiprocess_run() -> None: """ A basic sanity check. @@ -38,6 +39,7 @@ def test_multiprocess_run() -> None: supervisor.join_all() +@pytest.mark.xdist_group(name="group2") def test_multiprocess_health_check() -> None: """ Ensure that the health check works as expected. @@ -56,6 +58,7 @@ def test_multiprocess_health_check() -> None: supervisor.join_all() +@pytest.mark.xdist_group(name="group3") def test_multiprocess_sigterm() -> None: """ Ensure that the SIGTERM signal is handled as expected. @@ -68,6 +71,7 @@ def test_multiprocess_sigterm() -> None: supervisor.join_all() +@pytest.mark.xdist_group(name="group4") @pytest.mark.skipif(not hasattr(signal, "SIGBREAK"), reason="platform unsupports SIGBREAK") def test_multiprocess_sigbreak() -> None: """ From 8cbd3c2404fb43f8a2b9d1ca75e784d318c1b35d Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 13:51:24 +0800 Subject: [PATCH 20/62] make linter feels great --- tests/supervisors/test_multiprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 99a322b61..22334751d 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -81,7 +81,7 @@ def test_multiprocess_sigbreak() -> None: supervisor = Multiprocess(config, target=run, sockets=[]) threading.Thread(target=supervisor.run, daemon=True).start() time.sleep(1) - supervisor.signal_queue.append(signal.SIGBREAK) + supervisor.signal_queue.append(getattr(signal, "SIGBREAK")) supervisor.join_all() From edae1a95bee1485be13e976e7141f2ef624e1f44 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 14:05:15 +0800 Subject: [PATCH 21/62] delete pytest-xdist --- requirements.txt | 1 - tests/supervisors/test_multiprocess.py | 7 ------- 2 files changed, 8 deletions(-) diff --git a/requirements.txt b/requirements.txt index 3f013c970..316167f4c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,7 +17,6 @@ twine==4.0.2 ruff==0.1.15 pytest==8.0.0 pytest-mock==3.12.0 -pytest-xdist==3.5.0 mypy==1.8.0 types-click==7.1.8 types-pyyaml==6.0.12.12 diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 22334751d..045308eb9 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -19,12 +19,8 @@ async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable def run(sockets: list[socket.socket] | None) -> None: while True: time.sleep(1) - import os - print("Running , pid: ", os.getpid()) - -@pytest.mark.xdist_group(name="group1") def test_multiprocess_run() -> None: """ A basic sanity check. @@ -39,7 +35,6 @@ def test_multiprocess_run() -> None: supervisor.join_all() -@pytest.mark.xdist_group(name="group2") def test_multiprocess_health_check() -> None: """ Ensure that the health check works as expected. @@ -58,7 +53,6 @@ def test_multiprocess_health_check() -> None: supervisor.join_all() -@pytest.mark.xdist_group(name="group3") def test_multiprocess_sigterm() -> None: """ Ensure that the SIGTERM signal is handled as expected. @@ -71,7 +65,6 @@ def test_multiprocess_sigterm() -> None: supervisor.join_all() -@pytest.mark.xdist_group(name="group4") @pytest.mark.skipif(not hasattr(signal, "SIGBREAK"), reason="platform unsupports SIGBREAK") def test_multiprocess_sigbreak() -> None: """ From c724cfafe89bbd64cde3f8afb293f7e2f5a6308b Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 14:09:54 +0800 Subject: [PATCH 22/62] Try fix test in windows --- tests/supervisors/test_multiprocess.py | 47 +++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 045308eb9..a5e24eb44 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -1,7 +1,9 @@ from __future__ import annotations +import multiprocessing import signal import socket +import sys import threading import time @@ -19,15 +21,19 @@ async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable def run(sockets: list[socket.socket] | None) -> None: while True: time.sleep(1) + import os + print("Running , pid: ", os.getpid()) -def test_multiprocess_run() -> None: + +def _test_multiprocess_run() -> None: """ A basic sanity check. Simply run the supervisor against a no-op server, and signal for it to quit immediately. """ + sys.stdin = None config = Config(app=app, workers=2) supervisor = Multiprocess(config, target=run, sockets=[]) supervisor.signal_queue.append(signal.SIGINT) @@ -35,10 +41,18 @@ def test_multiprocess_run() -> None: supervisor.join_all() -def test_multiprocess_health_check() -> None: +def test_multiprocess_run() -> None: + process = multiprocessing.Process(target=_test_multiprocess_run) + process.start() + process.join() + assert process.exitcode == 0 + + +def _test_multiprocess_health_check() -> None: """ Ensure that the health check works as expected. """ + sys.stdin = None config = Config(app=app, workers=2) supervisor = Multiprocess(config, target=run, sockets=[]) threading.Thread(target=supervisor.run, daemon=True).start() @@ -53,10 +67,18 @@ def test_multiprocess_health_check() -> None: supervisor.join_all() -def test_multiprocess_sigterm() -> None: +def test_multiprocess_health_check() -> None: + process = multiprocessing.Process(target=_test_multiprocess_health_check) + process.start() + process.join() + assert process.exitcode == 0 + + +def _test_multiprocess_sigterm() -> None: """ Ensure that the SIGTERM signal is handled as expected. """ + sys.stdin = None config = Config(app=app, workers=2) supervisor = Multiprocess(config, target=run, sockets=[]) threading.Thread(target=supervisor.run, daemon=True).start() @@ -65,11 +87,18 @@ def test_multiprocess_sigterm() -> None: supervisor.join_all() -@pytest.mark.skipif(not hasattr(signal, "SIGBREAK"), reason="platform unsupports SIGBREAK") -def test_multiprocess_sigbreak() -> None: +def test_multiprocess_sigterm() -> None: + process = multiprocessing.Process(target=_test_multiprocess_sigterm) + process.start() + process.join() + assert process.exitcode == 0 + + +def _test_multiprocess_sigbreak() -> None: """ Ensure that the SIGBREAK signal is handled as expected. """ + sys.stdin = None config = Config(app=app, workers=2) supervisor = Multiprocess(config, target=run, sockets=[]) threading.Thread(target=supervisor.run, daemon=True).start() @@ -78,6 +107,14 @@ def test_multiprocess_sigbreak() -> None: supervisor.join_all() +@pytest.mark.skipif(not hasattr(signal, "SIGBREAK"), reason="platform unsupports SIGBREAK") +def test_multiprocess_sigbreak() -> None: + process = multiprocessing.Process(target=_test_multiprocess_sigbreak) + process.start() + process.join() + assert process.exitcode == 0 + + @pytest.mark.skipif(not hasattr(signal, "SIGHUP"), reason="platform unsupports SIGHUP") def test_multiprocess_sighup() -> None: """ From 3511f485ddcda9f286948a9915761f6e3e8715bf Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 14:13:39 +0800 Subject: [PATCH 23/62] Try make mypy happy --- tests/supervisors/test_multiprocess.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index a5e24eb44..cc1f8f19c 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -6,6 +6,7 @@ import sys import threading import time +from io import StringIO import pytest @@ -33,7 +34,7 @@ def _test_multiprocess_run() -> None: Simply run the supervisor against a no-op server, and signal for it to quit immediately. """ - sys.stdin = None + sys.stdin = StringIO() config = Config(app=app, workers=2) supervisor = Multiprocess(config, target=run, sockets=[]) supervisor.signal_queue.append(signal.SIGINT) @@ -52,7 +53,7 @@ def _test_multiprocess_health_check() -> None: """ Ensure that the health check works as expected. """ - sys.stdin = None + sys.stdin = StringIO() config = Config(app=app, workers=2) supervisor = Multiprocess(config, target=run, sockets=[]) threading.Thread(target=supervisor.run, daemon=True).start() @@ -78,7 +79,7 @@ def _test_multiprocess_sigterm() -> None: """ Ensure that the SIGTERM signal is handled as expected. """ - sys.stdin = None + sys.stdin = StringIO() config = Config(app=app, workers=2) supervisor = Multiprocess(config, target=run, sockets=[]) threading.Thread(target=supervisor.run, daemon=True).start() @@ -98,7 +99,7 @@ def _test_multiprocess_sigbreak() -> None: """ Ensure that the SIGBREAK signal is handled as expected. """ - sys.stdin = None + sys.stdin = StringIO() config = Config(app=app, workers=2) supervisor = Multiprocess(config, target=run, sockets=[]) threading.Thread(target=supervisor.run, daemon=True).start() From 298906ad583adc0edcda3d2513125bd8b39dfdc5 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 14:19:18 +0800 Subject: [PATCH 24/62] Skip tests in windows --- tests/supervisors/test_multiprocess.py | 42 ++++++-------------------- 1 file changed, 9 insertions(+), 33 deletions(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index cc1f8f19c..fcaad116b 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -27,7 +27,8 @@ def run(sockets: list[socket.socket] | None) -> None: print("Running , pid: ", os.getpid()) -def _test_multiprocess_run() -> None: +@pytest.mark.skipif(sys.platform == "win32", reason="In Windows, Ctrl+C/Ctrl+Break will sent to the parent process.") +def test_multiprocess_run() -> None: """ A basic sanity check. @@ -42,14 +43,8 @@ def _test_multiprocess_run() -> None: supervisor.join_all() -def test_multiprocess_run() -> None: - process = multiprocessing.Process(target=_test_multiprocess_run) - process.start() - process.join() - assert process.exitcode == 0 - - -def _test_multiprocess_health_check() -> None: +@pytest.mark.skipif(sys.platform == "win32", reason="In Windows, Ctrl+C/Ctrl+Break will sent to the parent process.") +def test_multiprocess_health_check() -> None: """ Ensure that the health check works as expected. """ @@ -68,14 +63,8 @@ def _test_multiprocess_health_check() -> None: supervisor.join_all() -def test_multiprocess_health_check() -> None: - process = multiprocessing.Process(target=_test_multiprocess_health_check) - process.start() - process.join() - assert process.exitcode == 0 - - -def _test_multiprocess_sigterm() -> None: +@pytest.mark.skipif(sys.platform == "win32", reason="In Windows, Ctrl+C/Ctrl+Break will sent to the parent process.") +def test_multiprocess_sigterm() -> None: """ Ensure that the SIGTERM signal is handled as expected. """ @@ -88,14 +77,9 @@ def _test_multiprocess_sigterm() -> None: supervisor.join_all() -def test_multiprocess_sigterm() -> None: - process = multiprocessing.Process(target=_test_multiprocess_sigterm) - process.start() - process.join() - assert process.exitcode == 0 - - -def _test_multiprocess_sigbreak() -> None: +@pytest.mark.skipif(sys.platform == "win32", reason="In Windows, Ctrl+C/Ctrl+Break will sent to the parent process.") +@pytest.mark.skipif(not hasattr(signal, "SIGBREAK"), reason="platform unsupports SIGBREAK") +def test_multiprocess_sigbreak() -> None: """ Ensure that the SIGBREAK signal is handled as expected. """ @@ -108,14 +92,6 @@ def _test_multiprocess_sigbreak() -> None: supervisor.join_all() -@pytest.mark.skipif(not hasattr(signal, "SIGBREAK"), reason="platform unsupports SIGBREAK") -def test_multiprocess_sigbreak() -> None: - process = multiprocessing.Process(target=_test_multiprocess_sigbreak) - process.start() - process.join() - assert process.exitcode == 0 - - @pytest.mark.skipif(not hasattr(signal, "SIGHUP"), reason="platform unsupports SIGHUP") def test_multiprocess_sighup() -> None: """ From f1f4d64bbe881864dfd2f37ac39ace805658b312 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 14:19:32 +0800 Subject: [PATCH 25/62] lint --- tests/supervisors/test_multiprocess.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index fcaad116b..8bf8c6d08 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -1,6 +1,5 @@ from __future__ import annotations -import multiprocessing import signal import socket import sys From fe2fb0878808e7ad03c702f1ff23ddc3657ce851 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 14:30:06 +0800 Subject: [PATCH 26/62] Try test basic run in Windows --- tests/supervisors/test_multiprocess.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 8bf8c6d08..0d64d1da3 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -21,12 +21,8 @@ async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable def run(sockets: list[socket.socket] | None) -> None: while True: time.sleep(1) - import os - print("Running , pid: ", os.getpid()) - -@pytest.mark.skipif(sys.platform == "win32", reason="In Windows, Ctrl+C/Ctrl+Break will sent to the parent process.") def test_multiprocess_run() -> None: """ A basic sanity check. @@ -34,12 +30,10 @@ def test_multiprocess_run() -> None: Simply run the supervisor against a no-op server, and signal for it to quit immediately. """ - sys.stdin = StringIO() config = Config(app=app, workers=2) supervisor = Multiprocess(config, target=run, sockets=[]) supervisor.signal_queue.append(signal.SIGINT) supervisor.run() - supervisor.join_all() @pytest.mark.skipif(sys.platform == "win32", reason="In Windows, Ctrl+C/Ctrl+Break will sent to the parent process.") From fb46e20ba3cc44a2ce5e6de2b05365d5314b4500 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 14:45:33 +0800 Subject: [PATCH 27/62] Try fix error in Windows --- tests/supervisors/test_multiprocess.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 0d64d1da3..ca6c66d8e 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -32,8 +32,8 @@ def test_multiprocess_run() -> None: """ config = Config(app=app, workers=2) supervisor = Multiprocess(config, target=run, sockets=[]) + threading.Thread(target=supervisor.run, daemon=True).start() supervisor.signal_queue.append(signal.SIGINT) - supervisor.run() @pytest.mark.skipif(sys.platform == "win32", reason="In Windows, Ctrl+C/Ctrl+Break will sent to the parent process.") @@ -41,7 +41,6 @@ def test_multiprocess_health_check() -> None: """ Ensure that the health check works as expected. """ - sys.stdin = StringIO() config = Config(app=app, workers=2) supervisor = Multiprocess(config, target=run, sockets=[]) threading.Thread(target=supervisor.run, daemon=True).start() @@ -61,7 +60,6 @@ def test_multiprocess_sigterm() -> None: """ Ensure that the SIGTERM signal is handled as expected. """ - sys.stdin = StringIO() config = Config(app=app, workers=2) supervisor = Multiprocess(config, target=run, sockets=[]) threading.Thread(target=supervisor.run, daemon=True).start() @@ -76,7 +74,6 @@ def test_multiprocess_sigbreak() -> None: """ Ensure that the SIGBREAK signal is handled as expected. """ - sys.stdin = StringIO() config = Config(app=app, workers=2) supervisor = Multiprocess(config, target=run, sockets=[]) threading.Thread(target=supervisor.run, daemon=True).start() From 197fa3d032646a1c56bf1c8c2b6c82c0c0fa4ffb Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 14:46:35 +0800 Subject: [PATCH 28/62] lint --- tests/supervisors/test_multiprocess.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index ca6c66d8e..53bec354f 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -5,7 +5,6 @@ import sys import threading import time -from io import StringIO import pytest From 33f5692a374d0e5d65d35b9afd9509f1d107a788 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 15:01:33 +0800 Subject: [PATCH 29/62] Skip tests in window --- tests/supervisors/test_multiprocess.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 53bec354f..a34286778 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -22,6 +22,7 @@ def run(sockets: list[socket.socket] | None) -> None: time.sleep(1) +@pytest.mark.skipif(sys.platform == "win32", reason="In Windows, Ctrl+C/Ctrl+Break will sent to the parent process.") def test_multiprocess_run() -> None: """ A basic sanity check. From d3c4484fc1345a331fedaeb7db3a363ff9d2a717 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 15:15:38 +0800 Subject: [PATCH 30/62] Try test in window --- tests/supervisors/test_multiprocess.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index a34286778..a1f91452d 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -5,6 +5,7 @@ import sys import threading import time +import subprocess import pytest @@ -22,8 +23,7 @@ def run(sockets: list[socket.socket] | None) -> None: time.sleep(1) -@pytest.mark.skipif(sys.platform == "win32", reason="In Windows, Ctrl+C/Ctrl+Break will sent to the parent process.") -def test_multiprocess_run() -> None: +def _test_multiprocess_run() -> None: """ A basic sanity check. @@ -34,6 +34,16 @@ def test_multiprocess_run() -> None: supervisor = Multiprocess(config, target=run, sockets=[]) threading.Thread(target=supervisor.run, daemon=True).start() supervisor.signal_queue.append(signal.SIGINT) + supervisor.join_all() + + +def test_multiprocess_run() -> None: + """ + Run the test in a subprocess to avoid any side effects. + """ + subprocess.check_call( + [sys.executable, "-m", "pytest", "tests/supervisors/test_multiprocess.py::_test_multiprocess_run"], shell=True + ) @pytest.mark.skipif(sys.platform == "win32", reason="In Windows, Ctrl+C/Ctrl+Break will sent to the parent process.") From d3f090ace15688af8fccc6c9bdc42ec76e23acc7 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 15:21:36 +0800 Subject: [PATCH 31/62] lint --- tests/supervisors/test_multiprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index a1f91452d..978e2d7fa 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -2,10 +2,10 @@ import signal import socket +import subprocess import sys import threading import time -import subprocess import pytest From 4bae355b77d85672d6851b3ea36f1a9559a76b4e Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 15:28:00 +0800 Subject: [PATCH 32/62] Add import statement and set current working directory in test_multiprocess.py --- tests/supervisors/test_multiprocess.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 978e2d7fa..62ada01b5 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -1,4 +1,5 @@ from __future__ import annotations +import os import signal import socket @@ -42,7 +43,9 @@ def test_multiprocess_run() -> None: Run the test in a subprocess to avoid any side effects. """ subprocess.check_call( - [sys.executable, "-m", "pytest", "tests/supervisors/test_multiprocess.py::_test_multiprocess_run"], shell=True + [sys.executable, "-m", "pytest", "tests/supervisors/test_multiprocess.py::_test_multiprocess_run"], + shell=True, + cwd=os.getcwd(), ) From b910b3f6eb230facf41e3e7da6018d730aff75a0 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 15:29:41 +0800 Subject: [PATCH 33/62] lint --- tests/supervisors/test_multiprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 62ada01b5..213e5f1be 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -1,6 +1,6 @@ from __future__ import annotations -import os +import os import signal import socket import subprocess From 6d2c6b656ecba0a36506c59646b8da19973a0954 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Tue, 5 Mar 2024 16:13:58 +0800 Subject: [PATCH 34/62] giveup --- tests/supervisors/test_multiprocess.py | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 213e5f1be..c3a40bf34 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -1,9 +1,7 @@ from __future__ import annotations -import os import signal import socket -import subprocess import sys import threading import time @@ -24,7 +22,8 @@ def run(sockets: list[socket.socket] | None) -> None: time.sleep(1) -def _test_multiprocess_run() -> None: +@pytest.mark.skipif(sys.platform == "win32", reason="In Windows, Ctrl+C/Ctrl+Break will sent to the parent process.") +def test_multiprocess_run() -> None: """ A basic sanity check. @@ -38,17 +37,6 @@ def _test_multiprocess_run() -> None: supervisor.join_all() -def test_multiprocess_run() -> None: - """ - Run the test in a subprocess to avoid any side effects. - """ - subprocess.check_call( - [sys.executable, "-m", "pytest", "tests/supervisors/test_multiprocess.py::_test_multiprocess_run"], - shell=True, - cwd=os.getcwd(), - ) - - @pytest.mark.skipif(sys.platform == "win32", reason="In Windows, Ctrl+C/Ctrl+Break will sent to the parent process.") def test_multiprocess_health_check() -> None: """ From 7ee21af18e2916e7b067008159f76b229d5a64bb Mon Sep 17 00:00:00 2001 From: abersheeran Date: Wed, 6 Mar 2024 11:15:28 +0800 Subject: [PATCH 35/62] Refactor signal handling in Multiprocess class --- uvicorn/supervisors/multiprocess.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index 0bbb6210c..4ce0f2a81 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -186,24 +186,15 @@ def handle_signals(self) -> None: def handle_int(self) -> None: logger.info("Received SIGINT, exiting") - if not self.should_exit.is_set(): - self.should_exit.set() - else: - self.terminate_all() + self.should_exit.set() def handle_term(self) -> None: logger.info("Received SIGTERM, exiting") - if not self.should_exit.is_set(): - self.should_exit.set() - else: - self.terminate_all() + self.should_exit.set() def handle_break(self) -> None: logger.info("Received SIGBREAK, exiting") - if not self.should_exit.is_set(): - self.should_exit.set() - else: - self.terminate_all() + self.should_exit.set() def handle_hup(self) -> None: logger.info("Received SIGHUP, restarting processes") From 8efcad34e3360bf2a485a5d62f44d39ee51286db Mon Sep 17 00:00:00 2001 From: abersheeran Date: Thu, 4 Apr 2024 00:17:01 +0800 Subject: [PATCH 36/62] Tests in windows --- tests/supervisors/test_multiprocess.py | 32 ++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index c3a40bf34..594ad3410 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -1,10 +1,13 @@ from __future__ import annotations +import functools +import os import signal import socket import sys import threading import time +from typing import Any, Callable import pytest @@ -13,6 +16,27 @@ from uvicorn.supervisors import Multiprocess +def new_console_in_windows(test_function: Callable[[], Any]): + if os.name != "nt": + return test_function + + @functools.wraps(test_function) + def new_function(): + import subprocess + import sys + + return subprocess.check_call( + [ + sys.executable, + "-c", + f"from {test_function.__module__} import {test_function.__name__}; {test_function.__name__}.__wrapped__()", + ], + creationflags=subprocess.CREATE_NO_WINDOW, + ) + + return new_function + + async def app(scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable) -> None: pass # pragma: no cover @@ -22,7 +46,7 @@ def run(sockets: list[socket.socket] | None) -> None: time.sleep(1) -@pytest.mark.skipif(sys.platform == "win32", reason="In Windows, Ctrl+C/Ctrl+Break will sent to the parent process.") +@new_console_in_windows def test_multiprocess_run() -> None: """ A basic sanity check. @@ -37,7 +61,7 @@ def test_multiprocess_run() -> None: supervisor.join_all() -@pytest.mark.skipif(sys.platform == "win32", reason="In Windows, Ctrl+C/Ctrl+Break will sent to the parent process.") +@new_console_in_windows def test_multiprocess_health_check() -> None: """ Ensure that the health check works as expected. @@ -56,7 +80,7 @@ def test_multiprocess_health_check() -> None: supervisor.join_all() -@pytest.mark.skipif(sys.platform == "win32", reason="In Windows, Ctrl+C/Ctrl+Break will sent to the parent process.") +@new_console_in_windows def test_multiprocess_sigterm() -> None: """ Ensure that the SIGTERM signal is handled as expected. @@ -69,8 +93,8 @@ def test_multiprocess_sigterm() -> None: supervisor.join_all() -@pytest.mark.skipif(sys.platform == "win32", reason="In Windows, Ctrl+C/Ctrl+Break will sent to the parent process.") @pytest.mark.skipif(not hasattr(signal, "SIGBREAK"), reason="platform unsupports SIGBREAK") +@new_console_in_windows def test_multiprocess_sigbreak() -> None: """ Ensure that the SIGBREAK signal is handled as expected. From 7c683ca2fa7c2189590062b1cb30b1649956d507 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Thu, 4 Apr 2024 00:17:23 +0800 Subject: [PATCH 37/62] lint --- tests/supervisors/test_multiprocess.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 594ad3410..4473bc3e0 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -4,7 +4,6 @@ import os import signal import socket -import sys import threading import time from typing import Any, Callable From 4e3ac6ef3b92db633342672a6b7e8f473ab58cfa Mon Sep 17 00:00:00 2001 From: abersheeran Date: Thu, 4 Apr 2024 00:19:11 +0800 Subject: [PATCH 38/62] lint --- tests/supervisors/test_multiprocess.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 4473bc3e0..d4f6cb7b5 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -24,11 +24,14 @@ def new_function(): import subprocess import sys + module = test_function.__module__ + name = test_function.__name__ + return subprocess.check_call( [ sys.executable, "-c", - f"from {test_function.__module__} import {test_function.__name__}; {test_function.__name__}.__wrapped__()", + f"from {module} import {name}; {name}.__wrapped__()", ], creationflags=subprocess.CREATE_NO_WINDOW, ) From 404b864ea9ed5f6044b892db43cb21b6abb3b11e Mon Sep 17 00:00:00 2001 From: abersheeran Date: Thu, 4 Apr 2024 00:21:29 +0800 Subject: [PATCH 39/62] ignore mypy check in linux --- tests/supervisors/test_multiprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index d4f6cb7b5..8b45867f2 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -33,7 +33,7 @@ def new_function(): "-c", f"from {module} import {name}; {name}.__wrapped__()", ], - creationflags=subprocess.CREATE_NO_WINDOW, + creationflags=subprocess.CREATE_NO_WINDOW, # type: ignore ) return new_function From d3a3aaba7175a5f718ef8745802ddbe6f4c85a58 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Thu, 4 Apr 2024 00:27:14 +0800 Subject: [PATCH 40/62] Add __init__.py --- tests/supervisors/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 tests/supervisors/__init__.py diff --git a/tests/supervisors/__init__.py b/tests/supervisors/__init__.py new file mode 100644 index 000000000..e69de29bb From 736510a75d39c961f405ffdf25d5baeba6a757e3 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Thu, 4 Apr 2024 00:33:27 +0800 Subject: [PATCH 41/62] fix warning --- tests/supervisors/test_multiprocess.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 8b45867f2..eb8286ab2 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -15,7 +15,7 @@ from uvicorn.supervisors import Multiprocess -def new_console_in_windows(test_function: Callable[[], Any]): +def new_console_in_windows(test_function: Callable[[], Any]) -> Callable[[], Any]: if os.name != "nt": return test_function @@ -27,7 +27,7 @@ def new_function(): module = test_function.__module__ name = test_function.__name__ - return subprocess.check_call( + subprocess.check_call( [ sys.executable, "-c", From 42c7187900ea98230b0c6f0cd4e53cfbc8eb88cb Mon Sep 17 00:00:00 2001 From: abersheeran Date: Thu, 4 Apr 2024 00:40:59 +0800 Subject: [PATCH 42/62] coverage ignore --- uvicorn/supervisors/multiprocess.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index 4ce0f2a81..b618e01b8 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -50,7 +50,7 @@ def always_pong(self) -> None: self.pong() def target(self, sockets: list[socket] | None = None) -> Any: - if os.name == "nt": + if os.name == "nt": # pragma: py-not-win32 # Windows doesn't support SIGTERM, so we use SIGBREAK instead. # And then we raise SIGTERM when SIGBREAK is received. # https://learn.microsoft.com/zh-cn/cpp/c-runtime-library/reference/signal?view=msvc-170 @@ -76,7 +76,7 @@ def terminate(self) -> None: if self.process.exitcode is not None: return assert self.process.pid is not None - if os.name == "nt": + if os.name == "nt": # pragma: py-not-win32 # Windows doesn't support SIGTERM. # So send SIGBREAK, and then in process raise SIGTERM. os.kill(self.process.pid, signal.CTRL_BREAK_EVENT) # type: ignore[attr-defined] @@ -192,22 +192,22 @@ def handle_term(self) -> None: logger.info("Received SIGTERM, exiting") self.should_exit.set() - def handle_break(self) -> None: + def handle_break(self) -> None: # pragma: py-not-win32 logger.info("Received SIGBREAK, exiting") self.should_exit.set() - def handle_hup(self) -> None: + def handle_hup(self) -> None: # pragma: py-win32 logger.info("Received SIGHUP, restarting processes") self.restart_all() - def handle_ttin(self) -> None: + def handle_ttin(self) -> None: # pragma: py-win32 logger.info("Received SIGTTIN, increasing processes") self.processes_num += 1 process = Process(self.config, self.target, self.sockets) process.start() self.processes.append(process) - def handle_ttou(self) -> None: + def handle_ttou(self) -> None: # pragma: py-win32 logger.info("Received SIGTTOU, decreasing processes") if self.processes_num <= 1: logger.info("Cannot decrease processes any more") From 6fce985ce320f1c1c95748701bf1b1bc8ef83ce7 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Thu, 4 Apr 2024 15:00:32 +0800 Subject: [PATCH 43/62] Update coverage --- pyproject.toml | 4 ++-- tests/supervisors/test_multiprocess.py | 14 ++++++++++++- uvicorn/supervisors/multiprocess.py | 27 +++++++++++++------------- 3 files changed, 28 insertions(+), 17 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 0f4722b13..d0eb1a95e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,7 +101,7 @@ omit = [ [tool.coverage.report] precision = 2 -fail_under = 96.82 +fail_under = 98.13 show_missing = true skip_covered = true exclude_lines = [ @@ -113,7 +113,7 @@ exclude_lines = [ ] [tool.coverage.coverage_conditional_plugin.omit] -"sys_platform == 'win32'" = ["uvicorn/loops/uvloop.py"] +"sys_platform == 'win32'" = ["uvicorn/loops/uvloop.py", "uvicorn/supervisors/multiprocess.py"] "sys_platform != 'win32'" = ["uvicorn/loops/asyncio.py"] [tool.coverage.coverage_conditional_plugin.rules] diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index eb8286ab2..1bcc65d12 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -13,6 +13,7 @@ from uvicorn import Config from uvicorn._types import ASGIReceiveCallable, ASGISendCallable, Scope from uvicorn.supervisors import Multiprocess +from uvicorn.supervisors.multiprocess import Process def new_console_in_windows(test_function: Callable[[], Any]) -> Callable[[], Any]: @@ -33,7 +34,7 @@ def new_function(): "-c", f"from {module} import {name}; {name}.__wrapped__()", ], - creationflags=subprocess.CREATE_NO_WINDOW, # type: ignore + creationflags=subprocess.CREATE_NO_WINDOW, # type: ignore[attr-defined] ) return new_function @@ -48,6 +49,17 @@ def run(sockets: list[socket.socket] | None) -> None: time.sleep(1) +def test_process_ping_pong() -> None: + process = Process(Config(app=app), target=lambda x: None, sockets=[]) + threading.Thread(target=process.always_pong, daemon=True).start() + assert process.ping() + + +def test_process_ping_pong_timeout() -> None: + process = Process(Config(app=app), target=lambda x: None, sockets=[]) + assert not process.ping(0.1) + + @new_console_in_windows def test_multiprocess_run() -> None: """ diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index b618e01b8..53538b501 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -49,7 +49,7 @@ def always_pong(self) -> None: while True: self.pong() - def target(self, sockets: list[socket] | None = None) -> Any: + def target(self, sockets: list[socket] | None = None) -> Any: # pragma: no cover if os.name == "nt": # pragma: py-not-win32 # Windows doesn't support SIGTERM, so we use SIGBREAK instead. # And then we raise SIGTERM when SIGBREAK is received. @@ -73,19 +73,18 @@ def start(self) -> None: logger.info(f"Started child process [{self.process.pid}]") def terminate(self) -> None: - if self.process.exitcode is not None: - return - assert self.process.pid is not None - if os.name == "nt": # pragma: py-not-win32 - # Windows doesn't support SIGTERM. - # So send SIGBREAK, and then in process raise SIGTERM. - os.kill(self.process.pid, signal.CTRL_BREAK_EVENT) # type: ignore[attr-defined] - else: - os.kill(self.process.pid, signal.SIGTERM) - logger.info(f"Terminated child process [{self.process.pid}]") + if self.process.exitcode is None: # Process is still running + assert self.process.pid is not None + if os.name == "nt": # pragma: py-not-win32 + # Windows doesn't support SIGTERM. + # So send SIGBREAK, and then in process raise SIGTERM. + os.kill(self.process.pid, signal.CTRL_BREAK_EVENT) # type: ignore[attr-defined] + else: + os.kill(self.process.pid, signal.SIGTERM) + logger.info(f"Terminated child process [{self.process.pid}]") - self.parent_conn.close() - self.child_conn.close() + self.parent_conn.close() + self.child_conn.close() def kill(self) -> None: # In Windows, the method will call `TerminateProcess` to kill the process. @@ -181,7 +180,7 @@ def handle_signals(self) -> None: sig_handler = getattr(self, f"handle_{sig_name.lower()}", None) if sig_handler is not None: sig_handler() - else: + else: # pragma: no cover logger.info(f"Received signal [{sig_name}], but nothing to do") def handle_int(self) -> None: From 6e916f3f08e0b493306f14d62fb3cd29198d02e6 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Thu, 4 Apr 2024 15:09:48 +0800 Subject: [PATCH 44/62] coverage --- tests/supervisors/test_multiprocess.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 1bcc65d12..b7da5db4a 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -61,7 +61,7 @@ def test_process_ping_pong_timeout() -> None: @new_console_in_windows -def test_multiprocess_run() -> None: +def test_multiprocess_run() -> None: # pragma: py-win32 """ A basic sanity check. @@ -76,7 +76,7 @@ def test_multiprocess_run() -> None: @new_console_in_windows -def test_multiprocess_health_check() -> None: +def test_multiprocess_health_check() -> None: # pragma: py-win32 """ Ensure that the health check works as expected. """ @@ -95,7 +95,7 @@ def test_multiprocess_health_check() -> None: @new_console_in_windows -def test_multiprocess_sigterm() -> None: +def test_multiprocess_sigterm() -> None: # pragma: py-win32 """ Ensure that the SIGTERM signal is handled as expected. """ @@ -109,7 +109,7 @@ def test_multiprocess_sigterm() -> None: @pytest.mark.skipif(not hasattr(signal, "SIGBREAK"), reason="platform unsupports SIGBREAK") @new_console_in_windows -def test_multiprocess_sigbreak() -> None: +def test_multiprocess_sigbreak() -> None: # pragma: py-win32 """ Ensure that the SIGBREAK signal is handled as expected. """ @@ -122,7 +122,7 @@ def test_multiprocess_sigbreak() -> None: @pytest.mark.skipif(not hasattr(signal, "SIGHUP"), reason="platform unsupports SIGHUP") -def test_multiprocess_sighup() -> None: +def test_multiprocess_sighup() -> None: # pragma: py-win32 """ Ensure that the SIGHUP signal is handled as expected. """ @@ -139,7 +139,7 @@ def test_multiprocess_sighup() -> None: @pytest.mark.skipif(not hasattr(signal, "SIGTTIN"), reason="platform unsupports SIGTTIN") -def test_multiprocess_sigttin() -> None: +def test_multiprocess_sigttin() -> None: # pragma: py-win32 """ Ensure that the SIGTTIN signal is handled as expected. """ @@ -154,7 +154,7 @@ def test_multiprocess_sigttin() -> None: @pytest.mark.skipif(not hasattr(signal, "SIGTTOU"), reason="platform unsupports SIGTTOU") -def test_multiprocess_sigttou() -> None: +def test_multiprocess_sigttou() -> None: # pragma: py-win32 """ Ensure that the SIGTTOU signal is handled as expected. """ From b8ee655ca5704a1251f2dc561a9897b70693866f Mon Sep 17 00:00:00 2001 From: abersheeran Date: Fri, 5 Apr 2024 22:52:47 +0800 Subject: [PATCH 45/62] Add documents --- docs/deployment.md | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/docs/deployment.md b/docs/deployment.md index 58927d95e..4fffe0831 100644 --- a/docs/deployment.md +++ b/docs/deployment.md @@ -178,7 +178,23 @@ Running Uvicorn using a process manager ensures that you can run multiple proces A process manager will handle the socket setup, start-up multiple server processes, monitor process aliveness, and listen for signals to provide for processes restarts, shutdowns, or dialing up and down the number of running processes. -Uvicorn provides a lightweight way to run multiple worker processes, for example `--workers 4`, but does not provide any process monitoring. +### Built-in + +Uvicorn includes a `--workers` option that allows you to run multiple worker processes. + +```bash +$ uvicorn main:app --workers 4 +``` + +Unlike gunicorn, uvicorn does not use pre-fork, but uses [`spwan`](https://docs.python.org/zh-cn/3/library/multiprocessing.html#contexts-and-start-methods), which allows uvicorn's multi-process manager to still work well on Windows. + +The default process manager monitors the status of child processes and automatically restarts child processes that die unexpectedly. Not only that, it will also monitor the status of the child process through the pipeline. When the child process is accidentally stuck, the corresponding child process will be killed through an unstoppable system signal or interface. + +You can also manage child processes by sending specific signals to the main process. (Not supported on Windows.) + +- `SIGHUP`: Work processeses are graceful restarted one after another. If you update the code, the new worker process will use the new code. +- `SIGTTIN`: Increase the number of worker processes by one. +- `SIGTTOU`: Decrease the number of worker processes by one. ### Gunicorn From 66395d00ef3b72f34423cc11e3bbb41ad2b265a0 Mon Sep 17 00:00:00 2001 From: Aber Date: Sun, 14 Apr 2024 16:14:03 +0800 Subject: [PATCH 46/62] Update docs/deployment.md Co-authored-by: Marcelo Trylesinski --- docs/deployment.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/deployment.md b/docs/deployment.md index 4fffe0831..7069677ab 100644 --- a/docs/deployment.md +++ b/docs/deployment.md @@ -186,7 +186,7 @@ Uvicorn includes a `--workers` option that allows you to run multiple worker pro $ uvicorn main:app --workers 4 ``` -Unlike gunicorn, uvicorn does not use pre-fork, but uses [`spwan`](https://docs.python.org/zh-cn/3/library/multiprocessing.html#contexts-and-start-methods), which allows uvicorn's multi-process manager to still work well on Windows. +Unlike gunicorn, uvicorn does not use pre-fork, but uses [`spawn`](https://docs.python.org/zh-cn/3/library/multiprocessing.html#contexts-and-start-methods), which allows uvicorn's multiprocess manager to still work well on Windows. The default process manager monitors the status of child processes and automatically restarts child processes that die unexpectedly. Not only that, it will also monitor the status of the child process through the pipeline. When the child process is accidentally stuck, the corresponding child process will be killed through an unstoppable system signal or interface. From a19f737f3b921ea3d51f33dcac0adaeb98c2dc99 Mon Sep 17 00:00:00 2001 From: Aber Date: Sun, 14 Apr 2024 16:14:16 +0800 Subject: [PATCH 47/62] Update uvicorn/supervisors/multiprocess.py Co-authored-by: Marcelo Trylesinski --- uvicorn/supervisors/multiprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index 53538b501..2bb2d1856 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -181,7 +181,7 @@ def handle_signals(self) -> None: if sig_handler is not None: sig_handler() else: # pragma: no cover - logger.info(f"Received signal [{sig_name}], but nothing to do") + logger.debug(f"Received signal {sig_name}, but no handler is defined for it.") def handle_int(self) -> None: logger.info("Received SIGINT, exiting") From 658a4fa95b535c389fb0fa414db9f5cba4e7c3af Mon Sep 17 00:00:00 2001 From: Aber Date: Sun, 14 Apr 2024 16:14:29 +0800 Subject: [PATCH 48/62] Update uvicorn/supervisors/multiprocess.py Co-authored-by: Marcelo Trylesinski --- uvicorn/supervisors/multiprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index 2bb2d1856..a589deb47 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -207,7 +207,7 @@ def handle_ttin(self) -> None: # pragma: py-win32 self.processes.append(process) def handle_ttou(self) -> None: # pragma: py-win32 - logger.info("Received SIGTTOU, decreasing processes") + logger.info("Received SIGTTOU, decreasing number of processes.") if self.processes_num <= 1: logger.info("Cannot decrease processes any more") return From 43e01904d768422bfbebd24e587125e9b48c09ad Mon Sep 17 00:00:00 2001 From: Aber Date: Sun, 14 Apr 2024 16:14:55 +0800 Subject: [PATCH 49/62] Update uvicorn/supervisors/multiprocess.py Co-authored-by: Marcelo Trylesinski --- uvicorn/supervisors/multiprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index a589deb47..8fb4590aa 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -209,7 +209,7 @@ def handle_ttin(self) -> None: # pragma: py-win32 def handle_ttou(self) -> None: # pragma: py-win32 logger.info("Received SIGTTOU, decreasing number of processes.") if self.processes_num <= 1: - logger.info("Cannot decrease processes any more") + logger.info("Already reached one process, cannot decrease the number of processes anymore.") return self.processes_num -= 1 process = self.processes.pop() From cc257439667e2e1c902bab23d60e2d819e284e6a Mon Sep 17 00:00:00 2001 From: Aber Date: Sun, 14 Apr 2024 16:15:02 +0800 Subject: [PATCH 50/62] Update uvicorn/supervisors/multiprocess.py Co-authored-by: Marcelo Trylesinski --- uvicorn/supervisors/multiprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index 8fb4590aa..1d29158c0 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -200,7 +200,7 @@ def handle_hup(self) -> None: # pragma: py-win32 self.restart_all() def handle_ttin(self) -> None: # pragma: py-win32 - logger.info("Received SIGTTIN, increasing processes") + logger.info("Received SIGTTIN, increasing the number of processes.") self.processes_num += 1 process = Process(self.config, self.target, self.sockets) process.start() From 5f23d2f4a8507f17bcc98be24e870bfb04564fee Mon Sep 17 00:00:00 2001 From: Aber Date: Sun, 14 Apr 2024 16:15:11 +0800 Subject: [PATCH 51/62] Update uvicorn/supervisors/multiprocess.py Co-authored-by: Marcelo Trylesinski --- uvicorn/supervisors/multiprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index 1d29158c0..db2a34a39 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -196,7 +196,7 @@ def handle_break(self) -> None: # pragma: py-not-win32 self.should_exit.set() def handle_hup(self) -> None: # pragma: py-win32 - logger.info("Received SIGHUP, restarting processes") + logger.info("Received SIGHUP, restarting processes.") self.restart_all() def handle_ttin(self) -> None: # pragma: py-win32 From a5d19cb5a1bb113486ab83d6e78e6de631a6170d Mon Sep 17 00:00:00 2001 From: Aber Date: Sun, 14 Apr 2024 16:15:20 +0800 Subject: [PATCH 52/62] Update uvicorn/supervisors/multiprocess.py Co-authored-by: Marcelo Trylesinski --- uvicorn/supervisors/multiprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index db2a34a39..53a1000eb 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -192,7 +192,7 @@ def handle_term(self) -> None: self.should_exit.set() def handle_break(self) -> None: # pragma: py-not-win32 - logger.info("Received SIGBREAK, exiting") + logger.info("Received SIGBREAK, exiting.") self.should_exit.set() def handle_hup(self) -> None: # pragma: py-win32 From 1b3ddc7ae72016177b9f4921605d681a1cba9e5a Mon Sep 17 00:00:00 2001 From: Aber Date: Sun, 14 Apr 2024 16:15:28 +0800 Subject: [PATCH 53/62] Update uvicorn/supervisors/multiprocess.py Co-authored-by: Marcelo Trylesinski --- uvicorn/supervisors/multiprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index 53a1000eb..5079b7c97 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -188,7 +188,7 @@ def handle_int(self) -> None: self.should_exit.set() def handle_term(self) -> None: - logger.info("Received SIGTERM, exiting") + logger.info("Received SIGTERM, exiting.") self.should_exit.set() def handle_break(self) -> None: # pragma: py-not-win32 From 1e8db13ce10e23d3f03cb37b252bdd6e0dab3f13 Mon Sep 17 00:00:00 2001 From: Aber Date: Sun, 14 Apr 2024 16:15:36 +0800 Subject: [PATCH 54/62] Update uvicorn/supervisors/multiprocess.py Co-authored-by: Marcelo Trylesinski --- uvicorn/supervisors/multiprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index 5079b7c97..1364162b7 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -184,7 +184,7 @@ def handle_signals(self) -> None: logger.debug(f"Received signal {sig_name}, but no handler is defined for it.") def handle_int(self) -> None: - logger.info("Received SIGINT, exiting") + logger.info("Received SIGINT, exiting.") self.should_exit.set() def handle_term(self) -> None: From a4209cfd056f82bc1c1dfbbcead2fff7ee93a623 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Sun, 14 Apr 2024 16:21:17 +0800 Subject: [PATCH 55/62] Do not output the PID information repeatedly. --- uvicorn/supervisors/multiprocess.py | 1 - 1 file changed, 1 deletion(-) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index 1364162b7..05bbf5f79 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -70,7 +70,6 @@ def is_alive(self, timeout: float = 5) -> bool: def start(self) -> None: self.process.start() - logger.info(f"Started child process [{self.process.pid}]") def terminate(self) -> None: if self.process.exitcode is None: # Process is still running From dbc34c85e28186eb0d7c6aaac29e0595bea94ea2 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Sun, 14 Apr 2024 16:28:12 +0800 Subject: [PATCH 56/62] Fix occasional abnormal exits. --- uvicorn/supervisors/multiprocess.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index 05bbf5f79..5ecd3ecd6 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -161,6 +161,9 @@ def run(self) -> None: def keep_subprocess_alive(self) -> None: for idx, process in enumerate(tuple(self.processes)): + if self.should_exit.is_set(): + return + if process.is_alive(): continue From a9d4b63bcde845e4251b00297fe2c81a6c2c82eb Mon Sep 17 00:00:00 2001 From: abersheeran Date: Sat, 25 May 2024 14:22:52 +0800 Subject: [PATCH 57/62] Update docs --- docs/deployment.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/deployment.md b/docs/deployment.md index 7069677ab..275166c6c 100644 --- a/docs/deployment.md +++ b/docs/deployment.md @@ -186,7 +186,7 @@ Uvicorn includes a `--workers` option that allows you to run multiple worker pro $ uvicorn main:app --workers 4 ``` -Unlike gunicorn, uvicorn does not use pre-fork, but uses [`spawn`](https://docs.python.org/zh-cn/3/library/multiprocessing.html#contexts-and-start-methods), which allows uvicorn's multiprocess manager to still work well on Windows. +Unlike gunicorn, uvicorn does not use pre-fork, but uses [`spawn`](https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods), which allows uvicorn's multiprocess manager to still work well on Windows. The default process manager monitors the status of child processes and automatically restarts child processes that die unexpectedly. Not only that, it will also monitor the status of the child process through the pipeline. When the child process is accidentally stuck, the corresponding child process will be killed through an unstoppable system signal or interface. From b740e2d5374e0110eced51d5ebba2b244c1a21f9 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Sat, 25 May 2024 14:31:06 +0800 Subject: [PATCH 58/62] Change subprocess termination logic in Multiprocess class --- uvicorn/supervisors/multiprocess.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/uvicorn/supervisors/multiprocess.py b/uvicorn/supervisors/multiprocess.py index 5ecd3ecd6..c242fed9a 100644 --- a/uvicorn/supervisors/multiprocess.py +++ b/uvicorn/supervisors/multiprocess.py @@ -160,15 +160,19 @@ def run(self) -> None: logger.info(message, extra={"color_message": color_message}) def keep_subprocess_alive(self) -> None: - for idx, process in enumerate(tuple(self.processes)): - if self.should_exit.is_set(): - return + if self.should_exit.is_set(): + return # parent process is exiting, no need to keep subprocess alive + for idx, process in enumerate(tuple(self.processes)): if process.is_alive(): continue process.kill() # process is hung, kill it process.join() + + if self.should_exit.is_set(): + return + logger.info(f"Child process [{process.pid}] died") del self.processes[idx] process = Process(self.config, self.target, self.sockets) From 59aa0befe81bf240b1f150088e7c0105a1859783 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Sat, 25 May 2024 23:36:44 +0800 Subject: [PATCH 59/62] Update new_console_in_windows function to include pragma statement --- tests/supervisors/test_multiprocess.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index b7da5db4a..86f2de3c8 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -16,7 +16,7 @@ from uvicorn.supervisors.multiprocess import Process -def new_console_in_windows(test_function: Callable[[], Any]) -> Callable[[], Any]: +def new_console_in_windows(test_function: Callable[[], Any]) -> Callable[[], Any]: # pragma: no cover if os.name != "nt": return test_function From 5281eee8f5173eca1b0c3394f148b90437189e10 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Sat, 25 May 2024 23:43:35 +0800 Subject: [PATCH 60/62] Revert coverage to 98.35 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index d22f95b66..ffb4fafa2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -103,7 +103,7 @@ omit = [ [tool.coverage.report] precision = 2 -fail_under = 98.13 +fail_under = 98.35 show_missing = true skip_covered = true exclude_lines = [ From 15ffc6fbcd4d7b0014a9fc369626b640d98d5a46 Mon Sep 17 00:00:00 2001 From: abersheeran Date: Sat, 25 May 2024 23:48:17 +0800 Subject: [PATCH 61/62] chore: Remove pragma statements from test_multiprocess functions --- tests/supervisors/test_multiprocess.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/supervisors/test_multiprocess.py b/tests/supervisors/test_multiprocess.py index 86f2de3c8..5365907aa 100644 --- a/tests/supervisors/test_multiprocess.py +++ b/tests/supervisors/test_multiprocess.py @@ -61,7 +61,7 @@ def test_process_ping_pong_timeout() -> None: @new_console_in_windows -def test_multiprocess_run() -> None: # pragma: py-win32 +def test_multiprocess_run() -> None: """ A basic sanity check. @@ -76,7 +76,7 @@ def test_multiprocess_run() -> None: # pragma: py-win32 @new_console_in_windows -def test_multiprocess_health_check() -> None: # pragma: py-win32 +def test_multiprocess_health_check() -> None: """ Ensure that the health check works as expected. """ @@ -95,7 +95,7 @@ def test_multiprocess_health_check() -> None: # pragma: py-win32 @new_console_in_windows -def test_multiprocess_sigterm() -> None: # pragma: py-win32 +def test_multiprocess_sigterm() -> None: """ Ensure that the SIGTERM signal is handled as expected. """ @@ -109,7 +109,7 @@ def test_multiprocess_sigterm() -> None: # pragma: py-win32 @pytest.mark.skipif(not hasattr(signal, "SIGBREAK"), reason="platform unsupports SIGBREAK") @new_console_in_windows -def test_multiprocess_sigbreak() -> None: # pragma: py-win32 +def test_multiprocess_sigbreak() -> None: # pragma: py-not-win32 """ Ensure that the SIGBREAK signal is handled as expected. """ @@ -122,7 +122,7 @@ def test_multiprocess_sigbreak() -> None: # pragma: py-win32 @pytest.mark.skipif(not hasattr(signal, "SIGHUP"), reason="platform unsupports SIGHUP") -def test_multiprocess_sighup() -> None: # pragma: py-win32 +def test_multiprocess_sighup() -> None: """ Ensure that the SIGHUP signal is handled as expected. """ @@ -139,7 +139,7 @@ def test_multiprocess_sighup() -> None: # pragma: py-win32 @pytest.mark.skipif(not hasattr(signal, "SIGTTIN"), reason="platform unsupports SIGTTIN") -def test_multiprocess_sigttin() -> None: # pragma: py-win32 +def test_multiprocess_sigttin() -> None: """ Ensure that the SIGTTIN signal is handled as expected. """ @@ -154,7 +154,7 @@ def test_multiprocess_sigttin() -> None: # pragma: py-win32 @pytest.mark.skipif(not hasattr(signal, "SIGTTOU"), reason="platform unsupports SIGTTOU") -def test_multiprocess_sigttou() -> None: # pragma: py-win32 +def test_multiprocess_sigttou() -> None: """ Ensure that the SIGTTOU signal is handled as expected. """ From a1054e7751e13db8dac8a3a6177ce2f7301418df Mon Sep 17 00:00:00 2001 From: abersheeran Date: Sat, 25 May 2024 23:54:26 +0800 Subject: [PATCH 62/62] chore: Exclude test_multiprocess.py from coverage on Windows --- pyproject.toml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index ffb4fafa2..9fb3e3315 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -115,7 +115,11 @@ exclude_lines = [ ] [tool.coverage.coverage_conditional_plugin.omit] -"sys_platform == 'win32'" = ["uvicorn/loops/uvloop.py", "uvicorn/supervisors/multiprocess.py"] +"sys_platform == 'win32'" = [ + "uvicorn/loops/uvloop.py", + "uvicorn/supervisors/multiprocess.py", + "tests/supervisors/test_multiprocess.py", +] "sys_platform != 'win32'" = ["uvicorn/loops/asyncio.py"] [tool.coverage.coverage_conditional_plugin.rules]