Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Write JSON response using a producer
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Sep 21, 2021
1 parent 0ce9315 commit 16e8580
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 17 deletions.
44 changes: 30 additions & 14 deletions synapse/http/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import urllib
from http import HTTPStatus
from inspect import isawaitable
from io import BytesIO
from typing import (
Any,
Awaitable,
Expand All @@ -45,7 +44,7 @@
from twisted.python import failure
from twisted.web import resource
from twisted.web.server import NOT_DONE_YET, Request
from twisted.web.static import File, NoRangeStaticProducer
from twisted.web.static import File
from twisted.web.util import redirectTo

from synapse.api.errors import (
Expand All @@ -60,6 +59,7 @@
from synapse.logging.opentracing import trace_servlet
from synapse.util import json_encoder
from synapse.util.caches import intern_dict
from synapse.util.iterutils import chunk_seq

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -707,15 +707,35 @@ def respond_with_json_bytes(
if send_cors:
set_cors_headers(request)

# note that this is zero-copy (the bytesio shares a copy-on-write buffer with
# the original `bytes`).
bytes_io = BytesIO(json_bytes)

producer = NoRangeStaticProducer(request, bytes_io)
producer.start()
_write_json_bytes_to_request(request, json_bytes)
return NOT_DONE_YET


def _write_json_bytes_to_request(request: Request, json_bytes: bytes) -> None:
"""Writes the JSON bytes to the request using an appropriate producer.
Note: This should be used instead of `Request.write` to correctly handle
large response bodies.
"""

# The problem with dumping all of the json response into the `Request`
# object at once (via `Request.write`) is that doing so starts the timeout
# for the next request to be received: so if it takes longer than 60s to
# stream back the response to the client, the client never gets it.
#
# The correct solution is to use a Producer; then the timeout is only
# started once all of the content is sent over the TCP connection.

# To make sure we don't write the whole of the json at once we split it up
# into chunks.
chunk_size = 4096
bytes_generator = chunk_seq(json_bytes, chunk_size)

# We use a `_ByteProducer` here rather than `NoRangeStaticProducer` as the
# unit tests can't cope with being given a pull producer.
_ByteProducer(request, bytes_generator)


def set_cors_headers(request: Request):
"""Set the CORS headers so that javascript running in a web browsers can
use this API
Expand Down Expand Up @@ -814,7 +834,7 @@ def finish_request(request: Request):

async def _async_write_json_to_request_in_thread(
request: SynapseRequest,
json_encoder: Callable[[Any], str],
json_encoder: Callable[[Any], bytes],
json_object: Any,
):
"""Encodes the given JSON object on a thread and then writes it to the
Expand All @@ -830,8 +850,4 @@ async def _async_write_json_to_request_in_thread(

json_str = await defer_to_thread(request.reactor, json_encoder, json_object)

try:
request.write(json_str)
request.finish()
except RuntimeError as e:
logger.info("Connection disconnected before response was written: %r", e)
_write_json_bytes_to_request(request, json_str)
19 changes: 17 additions & 2 deletions synapse/util/iterutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,28 @@
Iterable,
Iterator,
Mapping,
Sequence,
Set,
Sized,
Tuple,
TypeVar,
)

from typing_extensions import Protocol

T = TypeVar("T")
S = TypeVar("S", bound="_SelfSlice")


class _SelfSlice(Sized, Protocol):
"""A helper protocol that matches types where taking a slice results in the
same type being returned.
This is more specific than `Sequence`, which allows another `Sequence` to be
returned.
"""

def __getitem__(self: S, i: slice) -> S:
...


def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
Expand All @@ -46,7 +61,7 @@ def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
return iter(lambda: tuple(islice(sourceiter, size)), ())


def chunk_seq(iseq: Sequence[T], maxlen: int) -> Iterable[Sequence[T]]:
def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]:
"""Split the given sequence into chunks of the given size
The last chunk may be shorter than the given size.
Expand Down
2 changes: 1 addition & 1 deletion tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def _throw(*args):
def _callback(request, **kwargs):
d = Deferred()
d.addCallback(_throw)
self.reactor.callLater(1, d.callback, True)
self.reactor.callLater(0.5, d.callback, True)
return make_deferred_yieldable(d)

res = JsonResource(self.homeserver)
Expand Down

0 comments on commit 16e8580

Please sign in to comment.