Skip to content

Commit

Permalink
Fix streaming endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
egbertbouman committed Oct 14, 2024
1 parent f32e57b commit 1f06c19
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 102 deletions.
190 changes: 89 additions & 101 deletions src/tribler/core/libtorrent/restapi/downloads_endpoint.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from __future__ import annotations

from asyncio import CancelledError, wait_for
from asyncio import TimeoutError as AsyncTimeoutError
import mimetypes
from asyncio import get_event_loop, shield, wait_for
from binascii import hexlify, unhexlify
from contextlib import suppress
from pathlib import Path, PurePosixPath
from typing import TYPE_CHECKING, Any, Optional, TypedDict, cast

import libtorrent as lt
from aiohttp import web
from aiohttp.web_exceptions import HTTPPartialContent, HTTPRequestRangeNotSatisfiable
from aiohttp.web_response import StreamResponse
from aiohttp_apispec import docs, json_schema
from ipv8.messaging.anonymization.tunnel import PEER_FLAG_EXIT_BT
from ipv8.REST.schema import schema
Expand All @@ -18,7 +19,7 @@
from tribler.core.libtorrent.download_manager.download_config import DownloadConfig
from tribler.core.libtorrent.download_manager.download_manager import DownloadManager
from tribler.core.libtorrent.download_manager.download_state import DOWNLOAD, UPLOAD, DownloadStatus
from tribler.core.libtorrent.download_manager.stream import STREAM_PAUSE_TIME, Stream, StreamChunk
from tribler.core.libtorrent.download_manager.stream import Stream, StreamChunk
from tribler.core.libtorrent.torrentdef import TorrentDef
from tribler.core.restapi.rest_endpoint import (
HTTP_BAD_REQUEST,
Expand All @@ -30,7 +31,7 @@
)

if TYPE_CHECKING:
from aiohttp.abc import Request
from aiohttp.abc import AbstractStreamWriter, Request

from tribler.core.database.store import MetadataStore
from tribler.core.tunnel.community import TriblerTunnelCommunity
Expand Down Expand Up @@ -242,7 +243,6 @@ def get_files_info_json_paged(download: Download, view_start: Path, view_size: i
"availability": Float,
"peers": String,
"total_pieces": Integer,
"vod_mode": Boolean,
"vod_prebuffering_progress": Float,
"vod_prebuffering_progress_consec": Float,
"error": String,
Expand Down Expand Up @@ -328,7 +328,6 @@ async def get_downloads(self, request: Request) -> RESTResponse: # noqa: C901
"max_download_speed": DownloadManager.get_libtorrent_max_download_rate(self.download_manager.config),
"destination": str(download.config.get_dest_dir()),
"total_pieces": tdef.get_nr_pieces(),
"vod_mode": download.stream and download.stream.enabled,
"error": repr(state.get_error()) if state.get_error() else "",
"time_added": download.config.get_time_added()
}
Expand Down Expand Up @@ -481,38 +480,6 @@ async def delete_download(self, request: Request) -> RESTResponse:

return RESTResponse({"removed": True, "infohash": hexlify(download.get_def().get_infohash()).decode()})

async def vod_response(self, download: Download, parameters: dict, request: Request,
vod_mode: bool) -> RESTResponse:
"""
Return a response for the VOD status of a download.
"""
modified = False
if vod_mode:
file_index = parameters.get("fileindex")
if file_index is None:
return RESTResponse({"error": "fileindex is necessary to enable vod_mode"},
status=HTTP_BAD_REQUEST)
if download.stream is None:
download.add_stream()
download.stream = cast(Stream, download.stream)

if not download.stream.enabled or download.stream.fileindex != file_index:
await wait_for(download.stream.enable(file_index, request.http_range.start or 0), 10)
await download.stream.updateprios()
modified = True

elif not vod_mode and download.stream is not None and download.stream.enabled:
download.stream.disable()
modified = True
return RESTResponse({"vod_prebuffering_progress": download.stream.prebuffprogress,
"vod_prebuffering_progress_consec": download.stream.prebuffprogress_consec,
"vod_header_progress": download.stream.headerprogress,
"vod_footer_progress": download.stream.footerprogress,
"vod_mode": download.stream.enabled,
"infohash": hexlify(download.get_def().get_infohash()).decode(),
"modified": modified,
})

@docs(
tags=["Libtorrent"],
summary="Update a specific download.",
Expand All @@ -536,7 +503,7 @@ async def vod_response(self, download: Download, parameters: dict, request: Requ
"anon_hops": (Integer, "The anonymity of a download can be changed at runtime by passing the anon_hops "
"parameter, however, this must be the only parameter in this request.")
}))
async def update_download(self, request: Request) -> RESTResponse: # noqa: C901, PLR0911, PLR0912
async def update_download(self, request: Request) -> RESTResponse: # noqa: C901, PLR0912
"""
Update a specific download.
"""
Expand All @@ -546,13 +513,6 @@ async def update_download(self, request: Request) -> RESTResponse: # noqa: C901
return DownloadsEndpoint.return_404()

parameters = await request.json()
vod_mode = parameters.get("vod_mode")
if vod_mode is not None:
if not isinstance(vod_mode, bool):
return RESTResponse({"error": "vod_mode must be bool flag"},
status=HTTP_BAD_REQUEST)
return await self.vod_response(download, parameters, request, vod_mode)

if len(parameters) > 1 and "anon_hops" in parameters:
return RESTResponse({"error": "anon_hops must be the only parameter in this request"},
status=HTTP_BAD_REQUEST)
Expand Down Expand Up @@ -1031,7 +991,7 @@ def _safe_extended_peer_info(self, ext_peer_info: bytes) -> str:
206: {"description": "Contents of the stream"}
}
)
async def stream(self, request: Request) -> web.StreamResponse: # noqa: C901
async def stream(self, request: Request) -> web.StreamResponse:
"""
Stream the contents of a file that is being downloaded.
"""
Expand All @@ -1041,57 +1001,85 @@ async def stream(self, request: Request) -> web.StreamResponse: # noqa: C901
return DownloadsEndpoint.return_404()

file_index = int(request.match_info["fileindex"])
if not 0 <= file_index < len(download.get_def().get_files()):
return DownloadsEndpoint.return_404()

return TorrentStreamResponse(download, file_index)


class TorrentStreamResponse(StreamResponse):
"""A response object to stream the contents of a download."""

def __init__(self, download: Download, file_index: int, **kwargs) -> None:
"""
Create a new TorrentStreamResponse.
"""
super().__init__(**kwargs)
self._download = download
self._file_index = file_index

async def prepare(self, request: Request) -> AbstractStreamWriter | None:
"""
Prepare the response.
"""
file_name, file_size = self._download.get_def().get_files_with_length()[self._file_index]
try:
start = request.http_range.start
stop = request.http_range.stop
except ValueError:
self.headers["Content-Range"] = f"bytes */{file_size}"
self.set_status(HTTPRequestRangeNotSatisfiable.status_code)
return await super().prepare(request)

todo = file_size
if start is not None or stop is not None:
if start < 0:
start += file_size
start = max(start, 0)
stop = min(stop if stop is not None else file_size, file_size)
todo = stop - start

if start >= file_size:
self.headers["Content-Range"] = f"bytes */{file_size}"
self.set_status(HTTPRequestRangeNotSatisfiable.status_code)
return await super().prepare(request)

self.headers["Content-Range"] = f"bytes {start}-{start + todo - 1}/{file_size}"
self.set_status(HTTPPartialContent.status_code)

content_type, _ = mimetypes.guess_type(str(file_name))
self.content_type = content_type or "application/octet-stream"
self.content_length = todo
self.headers["Accept-Ranges"] = "bytes"

if self._download.stream is None:
self._download.add_stream()
self._download.stream = cast(Stream, self._download.stream)
stream = self._download.stream

start = start or 0
if not stream.enabled or stream.fileindex != self._file_index:
await wait_for(stream.enable(self._file_index, start), 10)
await stream.updateprios()

reader = StreamChunk(self._download.stream, start)
await reader.open()
try:
writer = await super().prepare(request)
assert writer is not None

await reader.seek(start)
# Note that the chuck size is the same as the underlying torrent's piece length
data = await reader.read()
while data:
await writer.write(data)
todo -= len(data)
if todo <= 0:
break
data = await reader.read()

http_range = request.http_range
start = http_range.start or 0

if download.stream is None:
download.add_stream()
download.stream = cast(Stream, download.stream)
await wait_for(download.stream.enable(file_index, None if start > 0 else 0), 10)

stop = download.stream.filesize if http_range.stop is None else min(http_range.stop, download.stream.filesize)

if not start < stop or not 0 <= start < download.stream.filesize or not 0 < stop <= download.stream.filesize:
return RESTResponse("Requested Range Not Satisfiable", status=416)

response = web.StreamResponse(status=206,
reason="OK",
headers={"Accept-Ranges": "bytes",
"Content-Type": "application/octet-stream",
"Content-Length": f"{stop - start}",
"Content-Range": f"{start}-{stop}/{download.stream.filesize}"})
response.force_close()
with suppress(CancelledError, ConnectionResetError):
async with StreamChunk(download.stream, start) as chunk:
await response.prepare(request)
bytes_todo = stop - start
bytes_done = 0
self._logger.info("Got range request for %s-%s (%s bytes)", start, stop, bytes_todo)
while request.transport is not None and not request.transport.is_closing():
if chunk.seekpos >= download.stream.filesize:
break
data = await chunk.read()
try:
if len(data) == 0:
break
if bytes_done + len(data) > bytes_todo:
# if we have more data than we need
endlen = bytes_todo - bytes_done
if endlen != 0:
await wait_for(response.write(data[:endlen]), STREAM_PAUSE_TIME)

bytes_done += endlen
break
await wait_for(response.write(data), STREAM_PAUSE_TIME)
bytes_done += len(data)

if chunk.resume():
self._logger.debug("Stream %s-%s is resumed, starting sequential buffer", start, stop)
except AsyncTimeoutError:
# This means that stream writer has a full buffer, in practice means that
# the client keeps the conenction but sets the window size to 0. In this case
# there is no need to keep sequenial buffer if there are other chunks waiting for prios
if chunk.pause():
self._logger.debug("Stream %s-%s is paused, stopping sequential buffer", start, stop)
return response
await writer.drain()
await writer.write_eof()
return writer
finally:
await shield(get_event_loop().run_in_executor(None, reader.close))
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ async def test_get_downloads_normal_download(self) -> None:
self.assertEqual(0, response_body_json["downloads"][0]["total_pieces"])
self.assertEqual(0, response_body_json["downloads"][0]["all_time_upload"])
self.assertEqual([], response_body_json["downloads"][0]["trackers"])
self.assertIsNone(response_body_json["downloads"][0]["vod_mode"])
self.assertEqual(1, response_body_json["checkpoints"]["total"])
self.assertEqual(1, response_body_json["checkpoints"]["loaded"])
self.assertTrue(response_body_json["checkpoints"]["all_loaded"])
Expand Down

0 comments on commit 1f06c19

Please sign in to comment.