diff --git a/src/tribler/core/libtorrent/restapi/downloads_endpoint.py b/src/tribler/core/libtorrent/restapi/downloads_endpoint.py index 6253aa2450..1d547c0d0b 100644 --- a/src/tribler/core/libtorrent/restapi/downloads_endpoint.py +++ b/src/tribler/core/libtorrent/restapi/downloads_endpoint.py @@ -1,14 +1,15 @@ from __future__ import annotations -from asyncio import CancelledError, wait_for -from asyncio import TimeoutError as AsyncTimeoutError +import mimetypes +from asyncio import get_event_loop, shield, wait_for from binascii import hexlify, unhexlify -from contextlib import suppress from pathlib import Path, PurePosixPath from typing import TYPE_CHECKING, Any, Optional, TypedDict, cast import libtorrent as lt from aiohttp import web +from aiohttp.web_exceptions import HTTPPartialContent, HTTPRequestRangeNotSatisfiable +from aiohttp.web_response import StreamResponse from aiohttp_apispec import docs, json_schema from ipv8.messaging.anonymization.tunnel import PEER_FLAG_EXIT_BT from ipv8.REST.schema import schema @@ -18,7 +19,7 @@ from tribler.core.libtorrent.download_manager.download_config import DownloadConfig from tribler.core.libtorrent.download_manager.download_manager import DownloadManager from tribler.core.libtorrent.download_manager.download_state import DOWNLOAD, UPLOAD, DownloadStatus -from tribler.core.libtorrent.download_manager.stream import STREAM_PAUSE_TIME, Stream, StreamChunk +from tribler.core.libtorrent.download_manager.stream import Stream, StreamChunk from tribler.core.libtorrent.torrentdef import TorrentDef from tribler.core.restapi.rest_endpoint import ( HTTP_BAD_REQUEST, @@ -30,7 +31,7 @@ ) if TYPE_CHECKING: - from aiohttp.abc import Request + from aiohttp.abc import AbstractStreamWriter, Request from tribler.core.database.store import MetadataStore from tribler.core.tunnel.community import TriblerTunnelCommunity @@ -242,7 +243,6 @@ def get_files_info_json_paged(download: Download, view_start: Path, view_size: i "availability": Float, "peers": String, "total_pieces": Integer, - "vod_mode": Boolean, "vod_prebuffering_progress": Float, "vod_prebuffering_progress_consec": Float, "error": String, @@ -328,7 +328,6 @@ async def get_downloads(self, request: Request) -> RESTResponse: # noqa: C901 "max_download_speed": DownloadManager.get_libtorrent_max_download_rate(self.download_manager.config), "destination": str(download.config.get_dest_dir()), "total_pieces": tdef.get_nr_pieces(), - "vod_mode": download.stream and download.stream.enabled, "error": repr(state.get_error()) if state.get_error() else "", "time_added": download.config.get_time_added() } @@ -481,38 +480,6 @@ async def delete_download(self, request: Request) -> RESTResponse: return RESTResponse({"removed": True, "infohash": hexlify(download.get_def().get_infohash()).decode()}) - async def vod_response(self, download: Download, parameters: dict, request: Request, - vod_mode: bool) -> RESTResponse: - """ - Return a response for the VOD status of a download. - """ - modified = False - if vod_mode: - file_index = parameters.get("fileindex") - if file_index is None: - return RESTResponse({"error": "fileindex is necessary to enable vod_mode"}, - status=HTTP_BAD_REQUEST) - if download.stream is None: - download.add_stream() - download.stream = cast(Stream, download.stream) - - if not download.stream.enabled or download.stream.fileindex != file_index: - await wait_for(download.stream.enable(file_index, request.http_range.start or 0), 10) - await download.stream.updateprios() - modified = True - - elif not vod_mode and download.stream is not None and download.stream.enabled: - download.stream.disable() - modified = True - return RESTResponse({"vod_prebuffering_progress": download.stream.prebuffprogress, - "vod_prebuffering_progress_consec": download.stream.prebuffprogress_consec, - "vod_header_progress": download.stream.headerprogress, - "vod_footer_progress": download.stream.footerprogress, - "vod_mode": download.stream.enabled, - "infohash": hexlify(download.get_def().get_infohash()).decode(), - "modified": modified, - }) - @docs( tags=["Libtorrent"], summary="Update a specific download.", @@ -536,7 +503,7 @@ async def vod_response(self, download: Download, parameters: dict, request: Requ "anon_hops": (Integer, "The anonymity of a download can be changed at runtime by passing the anon_hops " "parameter, however, this must be the only parameter in this request.") })) - async def update_download(self, request: Request) -> RESTResponse: # noqa: C901, PLR0911, PLR0912 + async def update_download(self, request: Request) -> RESTResponse: # noqa: C901, PLR0912 """ Update a specific download. """ @@ -546,13 +513,6 @@ async def update_download(self, request: Request) -> RESTResponse: # noqa: C901 return DownloadsEndpoint.return_404() parameters = await request.json() - vod_mode = parameters.get("vod_mode") - if vod_mode is not None: - if not isinstance(vod_mode, bool): - return RESTResponse({"error": "vod_mode must be bool flag"}, - status=HTTP_BAD_REQUEST) - return await self.vod_response(download, parameters, request, vod_mode) - if len(parameters) > 1 and "anon_hops" in parameters: return RESTResponse({"error": "anon_hops must be the only parameter in this request"}, status=HTTP_BAD_REQUEST) @@ -1031,7 +991,7 @@ def _safe_extended_peer_info(self, ext_peer_info: bytes) -> str: 206: {"description": "Contents of the stream"} } ) - async def stream(self, request: Request) -> web.StreamResponse: # noqa: C901 + async def stream(self, request: Request) -> web.StreamResponse: """ Stream the contents of a file that is being downloaded. """ @@ -1041,57 +1001,85 @@ async def stream(self, request: Request) -> web.StreamResponse: # noqa: C901 return DownloadsEndpoint.return_404() file_index = int(request.match_info["fileindex"]) + if not 0 <= file_index < len(download.get_def().get_files()): + return DownloadsEndpoint.return_404() + + return TorrentStreamResponse(download, file_index) + + +class TorrentStreamResponse(StreamResponse): + """A response object to stream the contents of a download.""" + + def __init__(self, download: Download, file_index: int, **kwargs) -> None: + """ + Create a new TorrentStreamResponse. + """ + super().__init__(**kwargs) + self._download = download + self._file_index = file_index + + async def prepare(self, request: Request) -> AbstractStreamWriter | None: + """ + Prepare the response. + """ + file_name, file_size = self._download.get_def().get_files_with_length()[self._file_index] + try: + start = request.http_range.start + stop = request.http_range.stop + except ValueError: + self.headers["Content-Range"] = f"bytes */{file_size}" + self.set_status(HTTPRequestRangeNotSatisfiable.status_code) + return await super().prepare(request) + + todo = file_size + if start is not None or stop is not None: + if start < 0: + start += file_size + start = max(start, 0) + stop = min(stop if stop is not None else file_size, file_size) + todo = stop - start + + if start >= file_size: + self.headers["Content-Range"] = f"bytes */{file_size}" + self.set_status(HTTPRequestRangeNotSatisfiable.status_code) + return await super().prepare(request) + + self.headers["Content-Range"] = f"bytes {start}-{start + todo - 1}/{file_size}" + self.set_status(HTTPPartialContent.status_code) + + content_type, _ = mimetypes.guess_type(str(file_name)) + self.content_type = content_type or "application/octet-stream" + self.content_length = todo + self.headers["Accept-Ranges"] = "bytes" + + if self._download.stream is None: + self._download.add_stream() + self._download.stream = cast(Stream, self._download.stream) + stream = self._download.stream + + start = start or 0 + if not stream.enabled or stream.fileindex != self._file_index: + await wait_for(stream.enable(self._file_index, start), 10) + await stream.updateprios() + + reader = StreamChunk(self._download.stream, start) + await reader.open() + try: + writer = await super().prepare(request) + assert writer is not None + + await reader.seek(start) + # Note that the chuck size is the same as the underlying torrent's piece length + data = await reader.read() + while data: + await writer.write(data) + todo -= len(data) + if todo <= 0: + break + data = await reader.read() - http_range = request.http_range - start = http_range.start or 0 - - if download.stream is None: - download.add_stream() - download.stream = cast(Stream, download.stream) - await wait_for(download.stream.enable(file_index, None if start > 0 else 0), 10) - - stop = download.stream.filesize if http_range.stop is None else min(http_range.stop, download.stream.filesize) - - if not start < stop or not 0 <= start < download.stream.filesize or not 0 < stop <= download.stream.filesize: - return RESTResponse("Requested Range Not Satisfiable", status=416) - - response = web.StreamResponse(status=206, - reason="OK", - headers={"Accept-Ranges": "bytes", - "Content-Type": "application/octet-stream", - "Content-Length": f"{stop - start}", - "Content-Range": f"{start}-{stop}/{download.stream.filesize}"}) - response.force_close() - with suppress(CancelledError, ConnectionResetError): - async with StreamChunk(download.stream, start) as chunk: - await response.prepare(request) - bytes_todo = stop - start - bytes_done = 0 - self._logger.info("Got range request for %s-%s (%s bytes)", start, stop, bytes_todo) - while request.transport is not None and not request.transport.is_closing(): - if chunk.seekpos >= download.stream.filesize: - break - data = await chunk.read() - try: - if len(data) == 0: - break - if bytes_done + len(data) > bytes_todo: - # if we have more data than we need - endlen = bytes_todo - bytes_done - if endlen != 0: - await wait_for(response.write(data[:endlen]), STREAM_PAUSE_TIME) - - bytes_done += endlen - break - await wait_for(response.write(data), STREAM_PAUSE_TIME) - bytes_done += len(data) - - if chunk.resume(): - self._logger.debug("Stream %s-%s is resumed, starting sequential buffer", start, stop) - except AsyncTimeoutError: - # This means that stream writer has a full buffer, in practice means that - # the client keeps the conenction but sets the window size to 0. In this case - # there is no need to keep sequenial buffer if there are other chunks waiting for prios - if chunk.pause(): - self._logger.debug("Stream %s-%s is paused, stopping sequential buffer", start, stop) - return response + await writer.drain() + await writer.write_eof() + return writer + finally: + await shield(get_event_loop().run_in_executor(None, reader.close)) diff --git a/src/tribler/test_unit/core/libtorrent/restapi/test_downloads_endpoint.py b/src/tribler/test_unit/core/libtorrent/restapi/test_downloads_endpoint.py index a66844d2f0..cce69b17bb 100644 --- a/src/tribler/test_unit/core/libtorrent/restapi/test_downloads_endpoint.py +++ b/src/tribler/test_unit/core/libtorrent/restapi/test_downloads_endpoint.py @@ -413,7 +413,6 @@ async def test_get_downloads_normal_download(self) -> None: self.assertEqual(0, response_body_json["downloads"][0]["total_pieces"]) self.assertEqual(0, response_body_json["downloads"][0]["all_time_upload"]) self.assertEqual([], response_body_json["downloads"][0]["trackers"]) - self.assertIsNone(response_body_json["downloads"][0]["vod_mode"]) self.assertEqual(1, response_body_json["checkpoints"]["total"]) self.assertEqual(1, response_body_json["checkpoints"]["loaded"]) self.assertTrue(response_body_json["checkpoints"]["all_loaded"])