From 16e85805719483dfd4a9b4b3972de84e173875d6 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 20 Sep 2021 09:42:00 +0100
Subject: [PATCH] Write JSON response using a producer

---
 synapse/http/server.py    | 44 ++++++++++++++++++++++++++-------------
 synapse/util/iterutils.py | 19 +++++++++++++++--
 tests/test_server.py      |  2 +-
 3 files changed, 48 insertions(+), 17 deletions(-)

diff --git a/synapse/http/server.py b/synapse/http/server.py
index ece71c78b208..0930f645fd12 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -21,7 +21,6 @@
 import urllib
 from http import HTTPStatus
 from inspect import isawaitable
-from io import BytesIO
 from typing import (
     Any,
     Awaitable,
@@ -45,7 +44,7 @@
 from twisted.python import failure
 from twisted.web import resource
 from twisted.web.server import NOT_DONE_YET, Request
-from twisted.web.static import File, NoRangeStaticProducer
+from twisted.web.static import File
 from twisted.web.util import redirectTo
 
 from synapse.api.errors import (
@@ -60,6 +59,7 @@
 from synapse.logging.opentracing import trace_servlet
 from synapse.util import json_encoder
 from synapse.util.caches import intern_dict
+from synapse.util.iterutils import chunk_seq
 
 logger = logging.getLogger(__name__)
 
@@ -707,15 +707,35 @@ def respond_with_json_bytes(
     if send_cors:
         set_cors_headers(request)
 
-    # note that this is zero-copy (the bytesio shares a copy-on-write buffer with
-    # the original `bytes`).
-    bytes_io = BytesIO(json_bytes)
-
-    producer = NoRangeStaticProducer(request, bytes_io)
-    producer.start()
+    _write_json_bytes_to_request(request, json_bytes)
     return NOT_DONE_YET
 
 
+def _write_json_bytes_to_request(request: Request, json_bytes: bytes) -> None:
+    """Writes the JSON bytes to the request using an appropriate producer.
+
+    Note: This should be used instead of `Request.write` to correctly handle
+    large response bodies.
+    """
+
+    # The problem with dumping all of the json response into the `Request`
+    # object at once (via `Request.write`) is that doing so starts the timeout
+    # for the next request to be received: so if it takes longer than 60s to
+    # stream back the response to the client, the client never gets it.
+    #
+    # The correct solution is to use a Producer; then the timeout is only
+    # started once all of the content is sent over the TCP connection.
+
+    # To make sure we don't write the whole of the json at once we split it up
+    # into chunks.
+    chunk_size = 4096
+    bytes_generator = chunk_seq(json_bytes, chunk_size)
+
+    # We use a `_ByteProducer` here rather than `NoRangeStaticProducer` as the
+    # unit tests can't cope with being given a pull producer.
+    _ByteProducer(request, bytes_generator)
+
+
 def set_cors_headers(request: Request):
     """Set the CORS headers so that javascript running in a web browsers can use
     this API
@@ -814,7 +834,7 @@ def finish_request(request: Request):
 
 async def _async_write_json_to_request_in_thread(
     request: SynapseRequest,
-    json_encoder: Callable[[Any], str],
+    json_encoder: Callable[[Any], bytes],
     json_object: Any,
 ):
     """Encodes the given JSON object on a thread and then writes it to the
@@ -830,8 +850,4 @@ async def _async_write_json_to_request_in_thread(
 
     json_str = await defer_to_thread(request.reactor, json_encoder, json_object)
 
-    try:
-        request.write(json_str)
-        request.finish()
-    except RuntimeError as e:
-        logger.info("Connection disconnected before response was written: %r", e)
+    _write_json_bytes_to_request(request, json_str)
diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py
index 8ac3eab2f54f..4938ddf70321 100644
--- a/synapse/util/iterutils.py
+++ b/synapse/util/iterutils.py
@@ -21,13 +21,28 @@
     Iterable,
     Iterator,
     Mapping,
-    Sequence,
     Set,
+    Sized,
     Tuple,
     TypeVar,
 )
 
+from typing_extensions import Protocol
+
 T = TypeVar("T")
+S = TypeVar("S", bound="_SelfSlice")
+
+
+class _SelfSlice(Sized, Protocol):
+    """A helper protocol that matches types where taking a slice results in the
+    same type being returned.
+
+    This is more specific than `Sequence`, which allows another `Sequence` to be
+    returned.
+    """
+
+    def __getitem__(self: S, i: slice) -> S:
+        ...
 
 
 def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
@@ -46,7 +61,7 @@ def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
     return iter(lambda: tuple(islice(sourceiter, size)), ())
 
 
-def chunk_seq(iseq: Sequence[T], maxlen: int) -> Iterable[Sequence[T]]:
+def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]:
     """Split the given sequence into chunks of the given size
 
     The last chunk may be shorter than the given size.
diff --git a/tests/test_server.py b/tests/test_server.py
index 7f469defd215..f2ffbc895b88 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -105,7 +105,7 @@ def _throw(*args):
         def _callback(request, **kwargs):
             d = Deferred()
             d.addCallback(_throw)
-            self.reactor.callLater(1, d.callback, True)
+            self.reactor.callLater(0.5, d.callback, True)
             return make_deferred_yieldable(d)
 
         res = JsonResource(self.homeserver)
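Note (illustration, not part of the patch): the new `_write_json_bytes_to_request` hands the chunk iterator to a pre-existing `_ByteProducer` that is not shown in this diff. The sketch below is a rough, assumption-laden outline of what such a Twisted push producer looks like (class name, pacing and error handling are simplified and hypothetical), not Synapse's actual implementation; it only illustrates why registering a producer defers the response timeout until the body has been written.

    from typing import Iterator

    from zope.interface import implementer

    from twisted.internet.interfaces import IPushProducer
    from twisted.web.server import Request


    @implementer(IPushProducer)
    class ExampleByteProducer:  # hypothetical stand-in for `_ByteProducer`
        """Streams an iterator of byte chunks to a request, honouring backpressure."""

        def __init__(self, request: Request, chunks: Iterator[bytes]):
            self._request = request
            self._chunks = chunks
            self._paused = False
            # streaming=True registers this as a push producer: the transport
            # calls pauseProducing()/resumeProducing() as its buffers fill and drain.
            request.registerProducer(self, True)
            self.resumeProducing()

        def pauseProducing(self) -> None:
            self._paused = True

        def resumeProducing(self) -> None:
            self._paused = False
            for chunk in self._chunks:
                self._request.write(chunk)
                if self._paused:
                    return
            # All chunks written: deregister and finish the response, so the
            # timeout only starts once the content has gone out over the connection.
            self._request.unregisterProducer()
            self._request.finish()

        def stopProducing(self) -> None:
            # The connection went away; drop the remaining chunks.
            self._paused = True
            self._chunks = iter(())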
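Note (illustration, not part of the patch): the `chunk_seq` change relaxes its input type from `Sequence[T]` to any sized type whose slices return the same type (`bytes`, `str`, `list`, ...), which is what lets `_write_json_bytes_to_request` feed raw `bytes` through it. A minimal sketch of the intended behaviour, using made-up example values:

    from synapse.util.iterutils import chunk_seq

    json_bytes = b'{"example": "payload"}' * 1000  # hypothetical response body
    chunks = list(chunk_seq(json_bytes, 4096))

    # Each chunk is itself `bytes` (not some other Sequence), at most 4096 bytes
    # long, and concatenating the chunks reproduces the original payload.
    assert all(isinstance(c, bytes) and len(c) <= 4096 for c in chunks)
    assert b"".join(chunks) == json_bytes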