From 73a6e89fad1296c1891fc60e01eaadfc56bb5705 Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Thu, 3 Feb 2022 13:46:59 +0100 Subject: [PATCH 1/2] Revert "fix: move all data handle to protocol & ensure connection is closed (#1332)" This reverts commit fc6e056e628923b0ae41a39f02b8fee8f1fc932e. --- uvicorn/_handlers/http.py | 23 +++++++++++++++-------- uvicorn/protocols/http/h11_impl.py | 4 ---- uvicorn/protocols/http/httptools_impl.py | 2 -- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/uvicorn/_handlers/http.py b/uvicorn/_handlers/http.py index 0d0fc9bab..f70ab708c 100644 --- a/uvicorn/_handlers/http.py +++ b/uvicorn/_handlers/http.py @@ -7,9 +7,6 @@ from uvicorn.server import ServerState -MAX_RECV = 2 ** 16 - - async def handle_http( reader: asyncio.StreamReader, writer: asyncio.StreamWriter, @@ -36,13 +33,13 @@ async def handle_http( # Use a future to coordinate between the protocol and this handler task. # https://docs.python.org/3/library/asyncio-protocol.html#connecting-existing-sockets loop = asyncio.get_event_loop() - reader_read = asyncio.create_task(reader.read(MAX_RECV)) + connection_lost = loop.create_future() # Switch the protocol from the stream reader to our own HTTP protocol class. protocol = config.http_protocol_class( # type: ignore[call-arg, operator] config=config, server_state=server_state, - on_connection_lost=reader_read.cancel, + on_connection_lost=lambda: connection_lost.set_result(True), ) transport = writer.transport transport.set_protocol(protocol) @@ -59,7 +56,7 @@ async def handle_http( @task.add_done_callback def retrieve_exception(task: asyncio.Task) -> None: - exc = task.exception() if not task.cancelled() else None + exc = task.exception() if exc is None: return @@ -77,5 +74,15 @@ def retrieve_exception(task: asyncio.Task) -> None: # Kick off the HTTP protocol. protocol.connection_made(transport) - data = await reader_read - protocol.data_received(data) + + # Pass any data already in the read buffer. + # The assumption here is that we haven't read any data off the stream reader + # yet: all data that the client might have already sent since the connection has + # been established is in the `_buffer`. + data = reader._buffer # type: ignore + if data: + protocol.data_received(data) + + # Let the transport run in the background. When closed, this future will complete + # and we'll exit here. + await connection_lost diff --git a/uvicorn/protocols/http/h11_impl.py b/uvicorn/protocols/http/h11_impl.py index b4f4e1ac9..76af34393 100644 --- a/uvicorn/protocols/http/h11_impl.py +++ b/uvicorn/protocols/http/h11_impl.py @@ -222,10 +222,6 @@ def handle_events(self): continue self.cycle.more_body = False self.cycle.message_event.set() - elif event_type is h11.ConnectionClosed: - break - if self.conn.our_state is h11.MUST_CLOSE and not self.transport.is_closing(): - self.transport.close() def handle_upgrade(self, event): upgrade_value = None diff --git a/uvicorn/protocols/http/httptools_impl.py b/uvicorn/protocols/http/httptools_impl.py index bac061771..34f66c97c 100644 --- a/uvicorn/protocols/http/httptools_impl.py +++ b/uvicorn/protocols/http/httptools_impl.py @@ -136,8 +136,6 @@ def data_received(self, data): self.transport.close() except httptools.HttpParserUpgrade: self.handle_upgrade() - if data == b"" and not self.transport.is_closing(): - self.transport.close() def handle_upgrade(self): upgrade_value = None From 9170b0647fc1e46372b64d07769a5c2740123add Mon Sep 17 00:00:00 2001 From: Marcelo Trylesinski Date: Thu, 3 Feb 2022 13:54:27 +0100 Subject: [PATCH 2/2] Revert stream interface --- setup.cfg | 1 - uvicorn/_handlers/__init__.py | 0 uvicorn/_handlers/http.py | 88 ------------------- uvicorn/protocols/http/h11_impl.py | 13 +-- uvicorn/protocols/http/httptools_impl.py | 13 +-- .../protocols/websockets/websockets_impl.py | 8 +- uvicorn/protocols/websockets/wsproto_impl.py | 8 +- uvicorn/server.py | 37 +++----- 8 files changed, 20 insertions(+), 148 deletions(-) delete mode 100644 uvicorn/_handlers/__init__.py delete mode 100644 uvicorn/_handlers/http.py diff --git a/setup.cfg b/setup.cfg index 58832b22d..df6ccb49d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -30,7 +30,6 @@ files = uvicorn/supervisors/watchgodreload.py, uvicorn/logging.py, uvicorn/middleware/asgi2.py, - uvicorn/_handlers, uvicorn/server.py, uvicorn/__init__.py, uvicorn/__main__.py, diff --git a/uvicorn/_handlers/__init__.py b/uvicorn/_handlers/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/uvicorn/_handlers/http.py b/uvicorn/_handlers/http.py deleted file mode 100644 index f70ab708c..000000000 --- a/uvicorn/_handlers/http.py +++ /dev/null @@ -1,88 +0,0 @@ -import asyncio -from typing import TYPE_CHECKING - -from uvicorn.config import Config - -if TYPE_CHECKING: # pragma: no cover - from uvicorn.server import ServerState - - -async def handle_http( - reader: asyncio.StreamReader, - writer: asyncio.StreamWriter, - server_state: "ServerState", - config: Config, -) -> None: - # Run transport/protocol session from streams. - # - # This is a bit fiddly, so let me explain why we do this in the first place. - # - # This was introduced to switch to the asyncio streams API while retaining our - # existing protocols-based code. - # - # The aim was to: - # * Make it easier to support alternative async libaries (all of which expose - # a streams API, rather than anything similar to asyncio's transports and - # protocols) while keeping the change footprint (and risk) at a minimum. - # * Keep a "fast track" for asyncio that's as efficient as possible, by reusing - # our asyncio-optimized protocols-based implementation. - # - # See: https://github.com/encode/uvicorn/issues/169 - # See: https://github.com/encode/uvicorn/pull/869 - - # Use a future to coordinate between the protocol and this handler task. - # https://docs.python.org/3/library/asyncio-protocol.html#connecting-existing-sockets - loop = asyncio.get_event_loop() - connection_lost = loop.create_future() - - # Switch the protocol from the stream reader to our own HTTP protocol class. - protocol = config.http_protocol_class( # type: ignore[call-arg, operator] - config=config, - server_state=server_state, - on_connection_lost=lambda: connection_lost.set_result(True), - ) - transport = writer.transport - transport.set_protocol(protocol) - - # Asyncio stream servers don't `await` handler tasks (like the one we're currently - # running), so we must make sure exceptions that occur in protocols but outside the - # ASGI cycle (e.g. bugs) are properly retrieved and logged. - # Vanilla asyncio handles exceptions properly out-of-the-box, but uvloop doesn't. - # So we need to attach a callback to handle exceptions ourselves for that case. - # (It's not easy to know which loop we're effectively running on, so we attach the - # callback in all cases. In practice it won't be called on vanilla asyncio.) - task = asyncio.current_task() - assert task is not None - - @task.add_done_callback - def retrieve_exception(task: asyncio.Task) -> None: - exc = task.exception() - - if exc is None: - return - - loop.call_exception_handler( - { - "message": "Fatal error in server handler", - "exception": exc, - "transport": transport, - "protocol": protocol, - } - ) - # Hang up the connection so the client doesn't wait forever. - transport.close() - - # Kick off the HTTP protocol. - protocol.connection_made(transport) - - # Pass any data already in the read buffer. - # The assumption here is that we haven't read any data off the stream reader - # yet: all data that the client might have already sent since the connection has - # been established is in the `_buffer`. - data = reader._buffer # type: ignore - if data: - protocol.data_received(data) - - # Let the transport run in the background. When closed, this future will complete - # and we'll exit here. - await connection_lost diff --git a/uvicorn/protocols/http/h11_impl.py b/uvicorn/protocols/http/h11_impl.py index 76af34393..78e5ffdc1 100644 --- a/uvicorn/protocols/http/h11_impl.py +++ b/uvicorn/protocols/http/h11_impl.py @@ -1,7 +1,6 @@ import asyncio import http import logging -from typing import Callable from urllib.parse import unquote import h11 @@ -35,15 +34,12 @@ def _get_status_phrase(status_code): class H11Protocol(asyncio.Protocol): - def __init__( - self, config, server_state, on_connection_lost: Callable = None, _loop=None - ): + def __init__(self, config, server_state, _loop=None): if not config.loaded: config.load() self.config = config self.app = config.loaded_app - self.on_connection_lost = on_connection_lost self.loop = _loop or asyncio.get_event_loop() self.logger = logging.getLogger("uvicorn.error") self.access_logger = logging.getLogger("uvicorn.access") @@ -113,9 +109,6 @@ def connection_lost(self, exc): if exc is None: self.transport.close() - if self.on_connection_lost is not None: - self.on_connection_lost() - def eof_received(self): pass @@ -266,9 +259,7 @@ def handle_upgrade(self, event): output += [name, b": ", value, b"\r\n"] output.append(b"\r\n") protocol = self.ws_protocol_class( - config=self.config, - server_state=self.server_state, - on_connection_lost=self.on_connection_lost, + config=self.config, server_state=self.server_state ) protocol.connection_made(self.transport) protocol.data_received(b"".join(output)) diff --git a/uvicorn/protocols/http/httptools_impl.py b/uvicorn/protocols/http/httptools_impl.py index 34f66c97c..e6183bbcb 100644 --- a/uvicorn/protocols/http/httptools_impl.py +++ b/uvicorn/protocols/http/httptools_impl.py @@ -4,7 +4,6 @@ import re import urllib from collections import deque -from typing import Callable import httptools @@ -41,15 +40,12 @@ def _get_status_line(status_code): class HttpToolsProtocol(asyncio.Protocol): - def __init__( - self, config, server_state, on_connection_lost: Callable = None, _loop=None - ): + def __init__(self, config, server_state, _loop=None): if not config.loaded: config.load() self.config = config self.app = config.loaded_app - self.on_connection_lost = on_connection_lost self.loop = _loop or asyncio.get_event_loop() self.logger = logging.getLogger("uvicorn.error") self.access_logger = logging.getLogger("uvicorn.access") @@ -114,9 +110,6 @@ def connection_lost(self, exc): if exc is None: self.transport.close() - if self.on_connection_lost is not None: - self.on_connection_lost() - def eof_received(self): pass @@ -180,9 +173,7 @@ def handle_upgrade(self): output += [name, b": ", value, b"\r\n"] output.append(b"\r\n") protocol = self.ws_protocol_class( - config=self.config, - server_state=self.server_state, - on_connection_lost=self.on_connection_lost, + config=self.config, server_state=self.server_state ) protocol.connection_made(self.transport) protocol.data_received(b"".join(output)) diff --git a/uvicorn/protocols/websockets/websockets_impl.py b/uvicorn/protocols/websockets/websockets_impl.py index 4a19fc384..a918c6f9d 100644 --- a/uvicorn/protocols/websockets/websockets_impl.py +++ b/uvicorn/protocols/websockets/websockets_impl.py @@ -1,7 +1,6 @@ import asyncio import http import logging -from typing import Callable from urllib.parse import unquote import websockets @@ -25,15 +24,12 @@ def is_serving(self): class WebSocketProtocol(websockets.WebSocketServerProtocol): - def __init__( - self, config, server_state, on_connection_lost: Callable = None, _loop=None - ): + def __init__(self, config, server_state, _loop=None): if not config.loaded: config.load() self.config = config self.app = config.loaded_app - self.on_connection_lost = on_connection_lost self.loop = _loop or asyncio.get_event_loop() self.root_path = config.root_path @@ -96,8 +92,6 @@ def connection_lost(self, exc): self.handshake_completed_event.set() super().connection_lost(exc) - if self.on_connection_lost is not None: - self.on_connection_lost() if exc is None: self.transport.close() diff --git a/uvicorn/protocols/websockets/wsproto_impl.py b/uvicorn/protocols/websockets/wsproto_impl.py index 73529698a..5093fb32a 100644 --- a/uvicorn/protocols/websockets/wsproto_impl.py +++ b/uvicorn/protocols/websockets/wsproto_impl.py @@ -1,6 +1,5 @@ import asyncio import logging -from typing import Callable from urllib.parse import unquote import h11 @@ -20,15 +19,12 @@ class WSProtocol(asyncio.Protocol): - def __init__( - self, config, server_state, on_connection_lost: Callable = None, _loop=None - ): + def __init__(self, config, server_state, _loop=None): if not config.loaded: config.load() self.config = config self.app = config.loaded_app - self.on_connection_lost = on_connection_lost self.loop = _loop or asyncio.get_event_loop() self.logger = logging.getLogger("uvicorn.error") self.root_path = config.root_path @@ -81,8 +77,6 @@ def connection_lost(self, exc): prefix = "%s:%d - " % tuple(self.client) if self.client else "" self.logger.log(TRACE_LOG_LEVEL, "%sWebSocket connection lost", prefix) - if self.on_connection_lost is not None: - self.on_connection_lost() if exc is None: self.transport.close() diff --git a/uvicorn/server.py b/uvicorn/server.py index 8fe6d76c8..b909b764d 100644 --- a/uvicorn/server.py +++ b/uvicorn/server.py @@ -1,4 +1,5 @@ import asyncio +import functools import logging import os import platform @@ -9,11 +10,10 @@ import time from email.utils import formatdate from types import FrameType -from typing import TYPE_CHECKING, Any, List, Optional, Set, Tuple, Union +from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union import click -from uvicorn._handlers.http import handle_http from uvicorn.config import Config if TYPE_CHECKING: @@ -24,13 +24,6 @@ Protocols = Union[H11Protocol, HttpToolsProtocol, WSProtocol, WebSocketProtocol] -if sys.platform != "win32": - from asyncio import start_unix_server as _start_unix_server -else: - - async def _start_unix_server(*args: Any, **kwargs: Any) -> Any: - raise NotImplementedError("Cannot start a unix server on win32") - HANDLED_SIGNALS = ( signal.SIGINT, # Unix signal 2. Sent by Ctrl+C. @@ -99,12 +92,10 @@ async def startup(self, sockets: list = None) -> None: config = self.config - async def handler( - reader: asyncio.StreamReader, writer: asyncio.StreamWriter - ) -> None: - await handle_http( - reader, writer, server_state=self.server_state, config=config - ) + create_protocol = functools.partial( + config.http_protocol_class, config=config, server_state=self.server_state + ) + loop = asyncio.get_running_loop() if sockets is not None: # Explicitly passed a list of open sockets. @@ -122,8 +113,8 @@ def _share_socket(sock: socket.SocketType) -> socket.SocketType: for sock in sockets: if config.workers > 1 and platform.system() == "Windows": sock = _share_socket(sock) - server = await asyncio.start_server( - handler, sock=sock, ssl=config.ssl, backlog=config.backlog + server = await loop.create_server( + create_protocol, sock=sock, ssl=config.ssl, backlog=config.backlog ) self.servers.append(server) listeners = sockets @@ -131,8 +122,8 @@ def _share_socket(sock: socket.SocketType) -> socket.SocketType: elif config.fd is not None: # Use an existing socket, from a file descriptor. sock = socket.fromfd(config.fd, socket.AF_UNIX, socket.SOCK_STREAM) - server = await asyncio.start_server( - handler, sock=sock, ssl=config.ssl, backlog=config.backlog + server = await loop.create_server( + create_protocol, sock=sock, ssl=config.ssl, backlog=config.backlog ) assert server.sockets is not None # mypy listeners = server.sockets @@ -143,8 +134,8 @@ def _share_socket(sock: socket.SocketType) -> socket.SocketType: uds_perms = 0o666 if os.path.exists(config.uds): uds_perms = os.stat(config.uds).st_mode - server = await _start_unix_server( - handler, path=config.uds, ssl=config.ssl, backlog=config.backlog + server = await loop.create_server( # type: ignore[call-overload] + create_protocol, path=config.uds, ssl=config.ssl, backlog=config.backlog ) os.chmod(config.uds, uds_perms) assert server.sockets is not None # mypy @@ -154,8 +145,8 @@ def _share_socket(sock: socket.SocketType) -> socket.SocketType: else: # Standard case. Create a socket from a host/port pair. try: - server = await asyncio.start_server( - handler, + server = await loop.create_server( + create_protocol, host=config.host, port=config.port, ssl=config.ssl,