From a0383611ba86e058a65c7b8d342a60b81133a3ac Mon Sep 17 00:00:00 2001 From: Egbert Bouman Date: Mon, 14 Oct 2024 14:43:46 +0200 Subject: [PATCH] Fix streaming endpoint --- .../libtorrent/restapi/downloads_endpoint.py | 190 ++++++++---------- .../test_unit/core/libtorrent/mocks.py | 13 ++ .../restapi/test_downloads_endpoint.py | 128 ++---------- 3 files changed, 116 insertions(+), 215 deletions(-) diff --git a/src/tribler/core/libtorrent/restapi/downloads_endpoint.py b/src/tribler/core/libtorrent/restapi/downloads_endpoint.py index 6253aa24502..3ce49619dfc 100644 --- a/src/tribler/core/libtorrent/restapi/downloads_endpoint.py +++ b/src/tribler/core/libtorrent/restapi/downloads_endpoint.py @@ -1,14 +1,15 @@ from __future__ import annotations -from asyncio import CancelledError, wait_for -from asyncio import TimeoutError as AsyncTimeoutError +import mimetypes +from asyncio import get_event_loop, shield, wait_for from binascii import hexlify, unhexlify -from contextlib import suppress from pathlib import Path, PurePosixPath from typing import TYPE_CHECKING, Any, Optional, TypedDict, cast import libtorrent as lt from aiohttp import web +from aiohttp.web_exceptions import HTTPPartialContent, HTTPRequestRangeNotSatisfiable +from aiohttp.web_response import StreamResponse from aiohttp_apispec import docs, json_schema from ipv8.messaging.anonymization.tunnel import PEER_FLAG_EXIT_BT from ipv8.REST.schema import schema @@ -18,7 +19,7 @@ from tribler.core.libtorrent.download_manager.download_config import DownloadConfig from tribler.core.libtorrent.download_manager.download_manager import DownloadManager from tribler.core.libtorrent.download_manager.download_state import DOWNLOAD, UPLOAD, DownloadStatus -from tribler.core.libtorrent.download_manager.stream import STREAM_PAUSE_TIME, Stream, StreamChunk +from tribler.core.libtorrent.download_manager.stream import Stream, StreamChunk from tribler.core.libtorrent.torrentdef import TorrentDef from tribler.core.restapi.rest_endpoint import ( HTTP_BAD_REQUEST, @@ -30,7 +31,7 @@ ) if TYPE_CHECKING: - from aiohttp.abc import Request + from aiohttp.abc import AbstractStreamWriter, BaseRequest, Request from tribler.core.database.store import MetadataStore from tribler.core.tunnel.community import TriblerTunnelCommunity @@ -242,7 +243,6 @@ def get_files_info_json_paged(download: Download, view_start: Path, view_size: i "availability": Float, "peers": String, "total_pieces": Integer, - "vod_mode": Boolean, "vod_prebuffering_progress": Float, "vod_prebuffering_progress_consec": Float, "error": String, @@ -328,7 +328,6 @@ async def get_downloads(self, request: Request) -> RESTResponse: # noqa: C901 "max_download_speed": DownloadManager.get_libtorrent_max_download_rate(self.download_manager.config), "destination": str(download.config.get_dest_dir()), "total_pieces": tdef.get_nr_pieces(), - "vod_mode": download.stream and download.stream.enabled, "error": repr(state.get_error()) if state.get_error() else "", "time_added": download.config.get_time_added() } @@ -481,38 +480,6 @@ async def delete_download(self, request: Request) -> RESTResponse: return RESTResponse({"removed": True, "infohash": hexlify(download.get_def().get_infohash()).decode()}) - async def vod_response(self, download: Download, parameters: dict, request: Request, - vod_mode: bool) -> RESTResponse: - """ - Return a response for the VOD status of a download. - """ - modified = False - if vod_mode: - file_index = parameters.get("fileindex") - if file_index is None: - return RESTResponse({"error": "fileindex is necessary to enable vod_mode"}, - status=HTTP_BAD_REQUEST) - if download.stream is None: - download.add_stream() - download.stream = cast(Stream, download.stream) - - if not download.stream.enabled or download.stream.fileindex != file_index: - await wait_for(download.stream.enable(file_index, request.http_range.start or 0), 10) - await download.stream.updateprios() - modified = True - - elif not vod_mode and download.stream is not None and download.stream.enabled: - download.stream.disable() - modified = True - return RESTResponse({"vod_prebuffering_progress": download.stream.prebuffprogress, - "vod_prebuffering_progress_consec": download.stream.prebuffprogress_consec, - "vod_header_progress": download.stream.headerprogress, - "vod_footer_progress": download.stream.footerprogress, - "vod_mode": download.stream.enabled, - "infohash": hexlify(download.get_def().get_infohash()).decode(), - "modified": modified, - }) - @docs( tags=["Libtorrent"], summary="Update a specific download.", @@ -536,7 +503,7 @@ async def vod_response(self, download: Download, parameters: dict, request: Requ "anon_hops": (Integer, "The anonymity of a download can be changed at runtime by passing the anon_hops " "parameter, however, this must be the only parameter in this request.") })) - async def update_download(self, request: Request) -> RESTResponse: # noqa: C901, PLR0911, PLR0912 + async def update_download(self, request: Request) -> RESTResponse: # noqa: C901, PLR0912 """ Update a specific download. """ @@ -546,13 +513,6 @@ async def update_download(self, request: Request) -> RESTResponse: # noqa: C901 return DownloadsEndpoint.return_404() parameters = await request.json() - vod_mode = parameters.get("vod_mode") - if vod_mode is not None: - if not isinstance(vod_mode, bool): - return RESTResponse({"error": "vod_mode must be bool flag"}, - status=HTTP_BAD_REQUEST) - return await self.vod_response(download, parameters, request, vod_mode) - if len(parameters) > 1 and "anon_hops" in parameters: return RESTResponse({"error": "anon_hops must be the only parameter in this request"}, status=HTTP_BAD_REQUEST) @@ -1031,7 +991,7 @@ def _safe_extended_peer_info(self, ext_peer_info: bytes) -> str: 206: {"description": "Contents of the stream"} } ) - async def stream(self, request: Request) -> web.StreamResponse: # noqa: C901 + async def stream(self, request: Request) -> web.StreamResponse: """ Stream the contents of a file that is being downloaded. """ @@ -1041,57 +1001,85 @@ async def stream(self, request: Request) -> web.StreamResponse: # noqa: C901 return DownloadsEndpoint.return_404() file_index = int(request.match_info["fileindex"]) + if not 0 <= file_index < len(download.get_def().get_files()): + return DownloadsEndpoint.return_404() + + return TorrentStreamResponse(download, file_index) + + +class TorrentStreamResponse(StreamResponse): + """A response object to stream the contents of a download.""" + + def __init__(self, download: Download, file_index: int, **kwargs) -> None: + """ + Create a new TorrentStreamResponse. + """ + super().__init__(**kwargs) + self._download = download + self._file_index = file_index + + async def prepare(self, request: BaseRequest) -> AbstractStreamWriter | None: + """ + Prepare the response. + """ + file_name, file_size = self._download.get_def().get_files_with_length()[self._file_index] + try: + start = request.http_range.start + stop = request.http_range.stop + except ValueError: + self.headers["Content-Range"] = f"bytes */{file_size}" + self.set_status(HTTPRequestRangeNotSatisfiable.status_code) + return await super().prepare(request) + + todo = file_size + if start is not None or stop is not None: + if start < 0: + start += file_size + start = max(start, 0) + stop = min(stop if stop is not None else file_size, file_size) + todo = stop - start + + if start >= file_size: + self.headers["Content-Range"] = f"bytes */{file_size}" + self.set_status(HTTPRequestRangeNotSatisfiable.status_code) + return await super().prepare(request) + + self.headers["Content-Range"] = f"bytes {start}-{start + todo - 1}/{file_size}" + self.set_status(HTTPPartialContent.status_code) + + content_type, _ = mimetypes.guess_type(str(file_name)) + self.content_type = content_type or "application/octet-stream" + self.content_length = todo + self.headers["Accept-Ranges"] = "bytes" + + if self._download.stream is None: + self._download.add_stream() + self._download.stream = cast(Stream, self._download.stream) + stream = self._download.stream + + start = start or 0 + if not stream.enabled or stream.fileindex != self._file_index: + await wait_for(stream.enable(self._file_index, start), 10) + await stream.updateprios() + + reader = StreamChunk(self._download.stream, start) + await reader.open() + try: + writer = await super().prepare(request) + assert writer is not None + + await reader.seek(start) + # Note that the chuck size is the same as the underlying torrent's piece length + data = await reader.read() + while data: + await writer.write(data[:todo]) + todo -= len(data) + if todo <= 0: + break + data = await reader.read() - http_range = request.http_range - start = http_range.start or 0 - - if download.stream is None: - download.add_stream() - download.stream = cast(Stream, download.stream) - await wait_for(download.stream.enable(file_index, None if start > 0 else 0), 10) - - stop = download.stream.filesize if http_range.stop is None else min(http_range.stop, download.stream.filesize) - - if not start < stop or not 0 <= start < download.stream.filesize or not 0 < stop <= download.stream.filesize: - return RESTResponse("Requested Range Not Satisfiable", status=416) - - response = web.StreamResponse(status=206, - reason="OK", - headers={"Accept-Ranges": "bytes", - "Content-Type": "application/octet-stream", - "Content-Length": f"{stop - start}", - "Content-Range": f"{start}-{stop}/{download.stream.filesize}"}) - response.force_close() - with suppress(CancelledError, ConnectionResetError): - async with StreamChunk(download.stream, start) as chunk: - await response.prepare(request) - bytes_todo = stop - start - bytes_done = 0 - self._logger.info("Got range request for %s-%s (%s bytes)", start, stop, bytes_todo) - while request.transport is not None and not request.transport.is_closing(): - if chunk.seekpos >= download.stream.filesize: - break - data = await chunk.read() - try: - if len(data) == 0: - break - if bytes_done + len(data) > bytes_todo: - # if we have more data than we need - endlen = bytes_todo - bytes_done - if endlen != 0: - await wait_for(response.write(data[:endlen]), STREAM_PAUSE_TIME) - - bytes_done += endlen - break - await wait_for(response.write(data), STREAM_PAUSE_TIME) - bytes_done += len(data) - - if chunk.resume(): - self._logger.debug("Stream %s-%s is resumed, starting sequential buffer", start, stop) - except AsyncTimeoutError: - # This means that stream writer has a full buffer, in practice means that - # the client keeps the conenction but sets the window size to 0. In this case - # there is no need to keep sequenial buffer if there are other chunks waiting for prios - if chunk.pause(): - self._logger.debug("Stream %s-%s is paused, stopping sequential buffer", start, stop) - return response + await writer.drain() + await writer.write_eof() + return writer + finally: + await shield(get_event_loop().run_in_executor(None, reader.close)) diff --git a/src/tribler/test_unit/core/libtorrent/mocks.py b/src/tribler/test_unit/core/libtorrent/mocks.py index 0cc6c10bff0..308338f1edc 100644 --- a/src/tribler/test_unit/core/libtorrent/mocks.py +++ b/src/tribler/test_unit/core/libtorrent/mocks.py @@ -56,6 +56,19 @@ b'e' ) + +TORRENT_WITH_VIDEO = ( + b'd' + b'4:infod' + b'6:lengthi10e' + b'4:name13:somevideo.mp4' + b'12:piece lengthi524288e' + b'6:pieces10:\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01' + b'e' + b'e' +) + + TORRENT_WITH_DIRS = libtorrent.torrent_info(libtorrent.bdecode(TORRENT_WITH_DIRS_CONTENT)) """ Torrent structure: diff --git a/src/tribler/test_unit/core/libtorrent/restapi/test_downloads_endpoint.py b/src/tribler/test_unit/core/libtorrent/restapi/test_downloads_endpoint.py index a66844d2f06..faa8b6f0527 100644 --- a/src/tribler/test_unit/core/libtorrent/restapi/test_downloads_endpoint.py +++ b/src/tribler/test_unit/core/libtorrent/restapi/test_downloads_endpoint.py @@ -3,7 +3,6 @@ """ from __future__ import annotations -from asyncio import ensure_future, sleep from binascii import hexlify from io import StringIO from pathlib import Path @@ -21,7 +20,7 @@ from tribler.core.libtorrent.torrentdef import TorrentDef, TorrentDefNoMetainfo from tribler.core.restapi.rest_endpoint import HTTP_BAD_REQUEST, HTTP_INTERNAL_SERVER_ERROR, HTTP_NOT_FOUND from tribler.test_unit.base_restapi import BodyCapture, MockRequest, response_to_bytes, response_to_json -from tribler.test_unit.core.libtorrent.mocks import TORRENT_WITH_DIRS, TORRENT_WITH_DIRS_CONTENT +from tribler.test_unit.core.libtorrent.mocks import TORRENT_WITH_DIRS, TORRENT_WITH_DIRS_CONTENT, TORRENT_WITH_VIDEO from tribler.tribler_config import TriblerConfigManager @@ -269,7 +268,9 @@ class StreamRequest(MockRequest): A MockRequest that mimics StreamRequests. """ - def __init__(self, query: dict, infohash: str, fileindex: int) -> None: + __slots__ = ['http_range'] + + def __init__(self, query: dict, infohash: str, fileindex: int, **kwargs) -> None: """ Create a new StreamRequest. """ @@ -277,6 +278,7 @@ def __init__(self, query: dict, infohash: str, fileindex: int) -> None: self._infohash = infohash self._fileindex = fileindex self._payload_writer = BodyCapture() + self.http_range = Mock(**kwargs) def get_transmitted(self) -> bytes: """ @@ -413,7 +415,6 @@ async def test_get_downloads_normal_download(self) -> None: self.assertEqual(0, response_body_json["downloads"][0]["total_pieces"]) self.assertEqual(0, response_body_json["downloads"][0]["all_time_upload"]) self.assertEqual([], response_body_json["downloads"][0]["trackers"]) - self.assertIsNone(response_body_json["downloads"][0]["vod_mode"]) self.assertEqual(1, response_body_json["checkpoints"]["total"]) self.assertEqual(1, response_body_json["checkpoints"]["loaded"]) self.assertTrue(response_body_json["checkpoints"]["all_loaded"]) @@ -634,106 +635,6 @@ async def test_update_download_no_download(self) -> None: self.assertEqual(HTTP_NOT_FOUND, response.status) self.assertEqual("this download does not exist", response_body_json["error"]) - async def test_update_download_bad_vod_mode(self) -> None: - """ - Test if a graceful error is returned when the vod mode parameter is garbage. - """ - self.download_manager.get_download = Mock(return_value=Mock()) - - response = await self.endpoint.update_download(UpdateDownloadRequest({"vod_mode": "bla"}, "01" * 20)) - response_body_json = await response_to_json(response) - - self.assertEqual(HTTP_BAD_REQUEST, response.status) - self.assertEqual("vod_mode must be bool flag", response_body_json["error"]) - - async def test_update_download_vod_mode_no_fileindex(self) -> None: - """ - Test if a download can be turned into a VOD download. - """ - download = self.create_mock_download() - self.download_manager.get_download = Mock(return_value=download) - - response = await self.endpoint.update_download(UpdateDownloadRequest({"vod_mode": True}, "01" * 20)) - response_body_json = await response_to_json(response) - - self.assertEqual(HTTP_BAD_REQUEST, response.status) - self.assertEqual("fileindex is necessary to enable vod_mode", response_body_json["error"]) - - async def test_update_download_vod_mode_enable(self) -> None: - """ - Test if a download can be turned into a VOD download. - """ - download = self.create_mock_download() - self.download_manager.get_download = Mock(return_value=download) - - with patch("tribler.core.libtorrent.download_manager.stream.Stream.enable", AsyncMock()): - response = await self.endpoint.update_download(UpdateDownloadRequest({"vod_mode": True, - "fileindex": 0}, - "01" * 20)) - response_body_json = await response_to_json(response) - download.stream.close() - - self.assertEqual(200, response.status) - self.assertEqual(0, response_body_json["vod_prebuffering_progress"]) - self.assertEqual(0, response_body_json["vod_prebuffering_progress_consec"]) - self.assertEqual(0, response_body_json["vod_header_progress"]) - self.assertEqual(0, response_body_json["vod_footer_progress"]) - self.assertFalse(response_body_json["vod_mode"]) - self.assertEqual("01" * 20, response_body_json["infohash"]) - self.assertTrue(response_body_json["modified"]) - - async def test_update_download_vod_mode_reenable(self) -> None: - """ - Test if a VOD download can be turned into still a VOD download. - """ - download = self.create_mock_download() - download.stream = Stream(download) - self.download_manager.get_download = Mock(return_value=download) - - with patch("tribler.core.libtorrent.download_manager.stream.Stream.enable", AsyncMock()): - response = await self.endpoint.update_download(UpdateDownloadRequest({"vod_mode": True, - "fileindex": 0}, - "01" * 20)) - response_body_json = await response_to_json(response) - download.stream.close() - - self.assertEqual(200, response.status) - self.assertEqual(0, response_body_json["vod_prebuffering_progress"]) - self.assertEqual(0, response_body_json["vod_prebuffering_progress_consec"]) - self.assertEqual(0, response_body_json["vod_header_progress"]) - self.assertEqual(0, response_body_json["vod_footer_progress"]) - self.assertFalse(response_body_json["vod_mode"]) - self.assertEqual("01" * 20, response_body_json["infohash"]) - self.assertTrue(response_body_json["modified"]) - - async def test_update_download_vod_mode_disable(self) -> None: - """ - Test if a VOD download can be turned into a normal download. - """ - download = self.create_mock_download() - download.handle = Mock(is_valid=Mock(return_value=False)) - download.stream = Stream(download) - download.stream.infohash = b"\x01" * 20 - download.stream.fileindex = 0 - self.download_manager.get_download = Mock(return_value=download) - - with patch("tribler.core.libtorrent.download_manager.stream.Stream.enable", AsyncMock()): - response = await self.endpoint.update_download(UpdateDownloadRequest({"vod_mode": False, - "fileindex": 0}, - "01" * 20)) - response_body_json = await response_to_json(response) - download.stream.close() - - self.assertEqual(200, response.status) - self.assertIsNone(download.stream.fileindex) - self.assertEqual(0, response_body_json["vod_prebuffering_progress"]) - self.assertEqual(0, response_body_json["vod_prebuffering_progress_consec"]) - self.assertEqual(0, response_body_json["vod_header_progress"]) - self.assertEqual(0, response_body_json["vod_footer_progress"]) - self.assertFalse(response_body_json["vod_mode"]) - self.assertEqual("01" * 20, response_body_json["infohash"]) - self.assertTrue(response_body_json["modified"]) - async def test_update_download_anon_hops_garbage(self) -> None: """ Test if anon hops can only exist as the only parameter. @@ -1172,15 +1073,16 @@ async def test_stream_unsatisfiable(self) -> None: download.stream.infohash = b"\x01" * 20 download.stream.fileindex = 0 download.stream.filesize = 0 + download.tdef = TorrentDef.load_from_memory(TORRENT_WITH_VIDEO) self.download_manager.get_download = Mock(return_value=download) + request = StreamRequest({}, "01" * 20, 0, start=100, stop=200) with patch("tribler.core.libtorrent.download_manager.stream.Stream.enable", AsyncMock()): - response = await self.endpoint.stream(StreamRequest({}, "01" * 20, 0)) - response_body_bytes = await response_to_bytes(response) - download.stream.close() + response = await self.endpoint.stream(request) + await response.prepare(request) self.assertEqual(416, response.status) - self.assertEqual(b"Requested Range Not Satisfiable", response_body_bytes) + self.assertEqual("Requested Range Not Satisfiable", response.reason) async def test_stream(self) -> None: """ @@ -1202,15 +1104,13 @@ async def test_stream(self) -> None: download.stream.prebuffsize = 0 download.stream.enable = AsyncMock() download.lt_status = Mock(pieces=[True]) + download.tdef = TorrentDef.load_from_memory(TORRENT_WITH_VIDEO) self.download_manager.get_download = Mock(return_value=download) - request = StreamRequest({}, "01" * 20, 0) + request = StreamRequest({}, "01" * 20, 0, start=0, stop=1) with patch("tribler.core.libtorrent.download_manager.stream.Stream.enable", AsyncMock()): - response_future = ensure_future(self.endpoint.stream(request)) - while not request.get_transmitted(): - await sleep(0) - request.transport.closing = True - response = await response_future + response = await self.endpoint.stream(request) + await response.prepare(request) self.assertEqual(206, response.status) self.assertEqual(b'"', request.get_transmitted())