Skip to content

Commit

Permalink
Merge pull request #684 from Stremio/feat/convert-stream-structs
Browse files Browse the repository at this point in the history
Feat/Convert stream structs prepartion
  • Loading branch information
elpiel authored Aug 19, 2024
2 parents 30e92be + 82dca80 commit 0e64b56
Show file tree
Hide file tree
Showing 16 changed files with 545 additions and 165 deletions.
14 changes: 7 additions & 7 deletions src/deep_links/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl From<(&Stream, Option<&Url>, &Settings)> for ExternalPlayerLink {
"choose" => Some(OpenPlayerLink {
android: Some(format!(
"{}#Intent;type=video/any;scheme=https;end",
http_regex.replace(url, "intent://"),
http_regex.replace(url.as_str(), "intent://"),
)),
..Default::default()
}),
Expand All @@ -96,27 +96,27 @@ impl From<(&Stream, Option<&Url>, &Settings)> for ExternalPlayerLink {
visionos: Some(format!("vlc-x-callback://x-callback-url/stream?url={url}")),
android: Some(format!(
"{}#Intent;package=org.videolan.vlc;type=video;scheme=https;end",
http_regex.replace(url, "intent://"),
http_regex.replace(url.as_str(), "intent://"),
)),
..Default::default()
}),
"mxplayer" => Some(OpenPlayerLink {
android: Some(format!(
"{}#Intent;package=com.mxtech.videoplayer.ad;type=video;scheme=https;end",
http_regex.replace(url, "intent://"),
http_regex.replace(url.as_str(), "intent://"),
)),
..Default::default()
}),
"justplayer" => Some(OpenPlayerLink {
android: Some(format!(
"{}#Intent;package=com.brouken.player;type=video;scheme=https;end",
http_regex.replace(url, "intent://"),
http_regex.replace(url.as_str(), "intent://"),
)),
..Default::default()
}),
"outplayer" => Some(OpenPlayerLink {
ios: Some(format!("{}", http_regex.replace(url, "outplayer://"))),
visionos: Some(format!("{}", http_regex.replace(url, "outplayer://"))),
ios: Some(http_regex.replace(url.as_str(), "outplayer://").to_string()),
visionos: Some(http_regex.replace(url.as_str(), "outplayer://").to_string()),
..Default::default()
}),
"infuse" => Some(OpenPlayerLink {
Expand Down Expand Up @@ -166,7 +166,7 @@ impl From<(&Stream, Option<&Url>, &Settings)> for ExternalPlayerLink {
};
ExternalPlayerLink {
download,
streaming,
streaming: streaming.as_ref().map(ToString::to_string),
playlist,
file_name,
open_player,
Expand Down
184 changes: 82 additions & 102 deletions src/models/streaming_server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
use enclose::enclose;
use futures::{FutureExt, TryFutureExt};
use http::request::Request;
use magnet_url::{Magnet, MagnetError};
use serde::{Deserialize, Serialize};
use sha1::{Digest, Sha1};
use url::Url;

use crate::constants::META_RESOURCE_NAME;
use crate::models::common::{eq_update, Loadable};
use crate::models::ctx::{Ctx, CtxError};
Expand All @@ -10,16 +18,10 @@ use crate::types::api::SuccessResponse;
use crate::types::empty_string_as_null;
use crate::types::profile::{AuthKey, Profile};
use crate::types::streaming_server::{
DeviceInfo, GetHTTPSResponse, NetworkInfo, Settings, SettingsResponse, Statistics,
CreateMagnetRequest, CreateTorrentBlobRequest, DeviceInfo, GetHTTPSResponse, NetworkInfo,
Settings, SettingsResponse, Statistics, StatisticsRequest, TorrentStatisticsRequest,
};
use enclose::enclose;
use futures::{FutureExt, TryFutureExt};
use http::request::Request;
use magnet_url::{Magnet, MagnetError};
use serde::{Deserialize, Serialize};
use sha1::{Digest, Sha1};
use std::iter;
use url::Url;
use crate::types::torrent::InfoHash;

#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
Expand All @@ -29,13 +31,6 @@ pub struct PlaybackDevice {
pub r#type: String,
}

#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct StatisticsRequest {
pub info_hash: String,
pub file_idx: u16,
}

#[derive(Clone, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Selected {
Expand All @@ -53,7 +48,7 @@ pub struct StreamingServer {
pub playback_devices: Loadable<Vec<PlaybackDevice>, EnvError>,
pub network_info: Loadable<NetworkInfo, EnvError>,
pub device_info: Loadable<DeviceInfo, EnvError>,
pub torrent: Option<(String, Loadable<ResourcePath, EnvError>)>,
pub torrent: Option<(InfoHash, Loadable<ResourcePath, EnvError>)>,
/// [`Loadable::Loading`] is used only on the first statistics request.
pub statistics: Option<Loadable<Statistics, EnvError>>,
}
Expand Down Expand Up @@ -124,12 +119,10 @@ impl<E: Env + 'static> UpdateWithCtx<E> for StreamingServer {
CreateTorrentArgs::Magnet(magnet),
))) => match parse_magnet(magnet) {
Ok((info_hash, announce)) => {
let torrent_effects = eq_update(
&mut self.torrent,
Some((info_hash.to_owned(), Loadable::Loading)),
);
let torrent_effects =
eq_update(&mut self.torrent, Some((info_hash, Loadable::Loading)));
Effects::many(vec![
create_magnet::<E>(&self.selected.transport_url, &info_hash, &announce),
create_magnet::<E>(&self.selected.transport_url, info_hash, &announce),
Effect::Msg(Box::new(Msg::Event(Event::MagnetParsed {
magnet: magnet.to_owned(),
}))),
Expand Down Expand Up @@ -160,7 +153,11 @@ impl<E: Env + 'static> UpdateWithCtx<E> for StreamingServer {
Some((info_hash.to_owned(), Loadable::Loading)),
);
Effects::many(vec![
create_torrent::<E>(&self.selected.transport_url, &info_hash, torrent),
create_torrent_request::<E>(
&self.selected.transport_url,
info_hash,
torrent,
),
Effect::Msg(Box::new(Msg::Event(Event::TorrentParsed {
torrent: torrent.to_owned(),
}))),
Expand Down Expand Up @@ -520,55 +517,24 @@ fn set_settings<E: Env + 'static>(url: &Url, settings: &Settings) -> Effect {
.into()
}

fn create_magnet<E: Env + 'static>(url: &Url, info_hash: &str, announce: &[String]) -> Effect {
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct PeerSearch {
sources: Vec<String>,
min: u32,
max: u32,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct Torrent {
info_hash: String,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct Body {
torrent: Torrent,
peer_search: Option<PeerSearch>,
}
let info_hash = info_hash.to_owned();
let endpoint = url
.join(&format!("{info_hash}/"))
.expect("url builder failed")
.join("create")
.expect("url builder failed");
let body = Body {
torrent: Torrent {
info_hash: info_hash.to_owned(),
},
peer_search: if !announce.is_empty() {
Some(PeerSearch {
sources: iter::once(&format!("dht:{info_hash}"))
.chain(announce.iter())
.cloned()
.collect(),
min: 40,
max: 200,
})
} else {
None
},
pub async fn create_magnet_request<E: Env + 'static>(
url: Url,
info_hash: InfoHash,
announce: Vec<String>,
) -> Result<serde_json::Value, EnvError> {
let request = CreateMagnetRequest {
server_url: url.to_owned(),
info_hash,
announce: announce.to_vec(),
};
let request = Request::post(endpoint.as_str())
.header(http::header::CONTENT_TYPE, "application/json")
.body(body)
.expect("request builder failed");

E::fetch::<_, serde_json::Value>(request.into()).await
}

fn create_magnet<E: Env + 'static>(url: &Url, info_hash: InfoHash, announce: &[String]) -> Effect {
EffectFuture::Concurrent(
E::fetch::<_, serde_json::Value>(request)
.map_ok(|_| ())
create_magnet_request::<E>(url.to_owned(), info_hash, announce.to_vec())
.map_ok(|_response| ())
.map(enclose!((info_hash) move |result| {
Msg::Internal(Internal::StreamingServerCreateTorrentResult(
info_hash, result,
Expand All @@ -579,21 +545,18 @@ fn create_magnet<E: Env + 'static>(url: &Url, info_hash: &str, announce: &[Strin
.into()
}

fn create_torrent<E: Env + 'static>(url: &Url, info_hash: &str, torrent: &[u8]) -> Effect {
#[derive(Serialize)]
struct Body {
blob: String,
}
let info_hash = info_hash.to_owned();
let endpoint = url.join("/create").expect("url builder failed");
let request = Request::post(endpoint.as_str())
.header(http::header::CONTENT_TYPE, "application/json")
.body(Body {
blob: hex::encode(torrent),
})
.expect("request builder failed");
pub fn create_torrent_request<E: Env + 'static>(
url: &Url,
info_hash: InfoHash,
torrent: &[u8],
) -> Effect {
let request = CreateTorrentBlobRequest {
server_url: url.to_owned(),
torrent: torrent.to_vec(),
};

EffectFuture::Concurrent(
E::fetch::<_, serde_json::Value>(request)
E::fetch::<_, serde_json::Value>(request.into())
.map_ok(|_| ())
.map(enclose!((info_hash) move |result| {
Msg::Internal(Internal::StreamingServerCreateTorrentResult(
Expand All @@ -605,14 +568,18 @@ fn create_torrent<E: Env + 'static>(url: &Url, info_hash: &str, torrent: &[u8])
.into()
}

fn parse_magnet(magnet: &Url) -> Result<(String, Vec<String>), MagnetError> {
fn parse_magnet(magnet: &Url) -> Result<(InfoHash, Vec<String>), MagnetError> {
let magnet = Magnet::new(magnet.as_str())?;
let info_hash = magnet.xt.ok_or(MagnetError::NotAMagnetURL)?;
let info_hash = info_hash
.parse()
.map_err(|_err| MagnetError::NotAMagnetURL)?;

let announce = magnet.tr;
Ok((info_hash, announce))
}

fn parse_torrent(torrent: &[u8]) -> Result<(String, Vec<String>), serde_bencode::Error> {
fn parse_torrent(torrent: &[u8]) -> Result<(InfoHash, Vec<String>), serde_bencode::Error> {
#[derive(Deserialize)]
struct TorrentFile {
info: serde_bencode::value::Value,
Expand All @@ -626,7 +593,8 @@ fn parse_torrent(torrent: &[u8]) -> Result<(String, Vec<String>), serde_bencode:
let info_bytes = serde_bencode::to_bytes(&torrent_file.info)?;
let mut hasher = Sha1::new();
hasher.update(info_bytes);
let info_hash = hex::encode(hasher.finalize());
let info_hash = InfoHash::new(hasher.finalize().into());

let mut announce = vec![];
if let Some(announce_entry) = torrent_file.announce {
announce.push(announce_entry);
Expand All @@ -641,26 +609,25 @@ fn parse_torrent(torrent: &[u8]) -> Result<(String, Vec<String>), serde_bencode:
}

fn get_torrent_statistics<E: Env + 'static>(url: &Url, request: &StatisticsRequest) -> Effect {
let statistics_request = request.clone();
let endpoint = url
.join(&format!(
"/{}/{}/stats.json",
statistics_request.info_hash.clone(),
statistics_request.file_idx
))
.expect("url builder failed");
let request = Request::get(endpoint.as_str())
.header(http::header::CONTENT_TYPE, "application/json")
.body(())
.expect("request builder failed");
let fetch_fut = enclose!((url, request) async move {
let request = TorrentStatisticsRequest {
server_url: url,
request,
};

let statistics: Option<Statistics> = E::fetch(request.into()).await?;

Ok(statistics)
});

// let statistics_request = request.to_owned();
// It's happening when the engine is destroyed for inactivity:
// If it was downloaded to 100% and that the stream is paused, then played,
// it will create a new engine and return the correct stats
EffectFuture::Concurrent(
E::fetch::<_, Option<Statistics>>(request)
.map(enclose!((url) move |result|
Msg::Internal(Internal::StreamingServerStatisticsResult((url, statistics_request), result))
fetch_fut
.map(enclose!((url, request) move |result|
Msg::Internal(Internal::StreamingServerStatisticsResult((url, request), result))
))
.boxed_env(),
)
Expand Down Expand Up @@ -736,3 +703,16 @@ fn update_remote_url<E: Env + 'static>(
_ => eq_update(remote_url, None),
}
}

#[cfg(test)]
mod tests {
use magnet_url::Magnet;

#[test]
fn test_magnet_hash() {
let magnet = Magnet::new("magnet:?xt=urn:btih:0d54e2339706f173ac20f4effb4ad42d9c7a84e9&dn=Halo.S02.1080p.WEBRip.x265.DDP5.1.Atmos-WAR").expect("Should be valid magnet Url");

// assert_eq!(magnet.xt)
dbg!(magnet);
}
}
6 changes: 4 additions & 2 deletions src/runtime/msg/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ use crate::{
library_with_filters::Selected as LibraryWithFiltersSelected,
meta_details::Selected as MetaDetailsSelected,
player::{Selected as PlayerSelected, VideoParams},
streaming_server::StatisticsRequest as StreamingServerStatisticsRequest,
},
types::{
addon::Descriptor,
api::AuthRequest,
library::LibraryItemId,
profile::Settings as ProfileSettings,
resource::{MetaItemId, MetaItemPreview, Video},
streaming_server::Settings as StreamingServerSettings,
streaming_server::{
Settings as StreamingServerSettings,
StatisticsRequest as StreamingServerStatisticsRequest,
},
},
};

Expand Down
13 changes: 8 additions & 5 deletions src/runtime/msg/internal.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::models::common::ResourceLoadable;
use url::Url;

use crate::models::common::ResourceLoadable;
use crate::models::ctx::CtxError;
use crate::models::link::LinkError;
use crate::models::local_search::Searchable;
use crate::models::streaming_server::{PlaybackDevice, StatisticsRequest};
use crate::models::streaming_server::PlaybackDevice;
use crate::runtime::EnvError;
use crate::types::addon::{Descriptor, Manifest, ResourceRequest, ResourceResponse};
use crate::types::api::{
Expand All @@ -14,11 +14,14 @@ use crate::types::api::{
};
use crate::types::library::{LibraryBucket, LibraryItem, LibraryItemId};
use crate::types::profile::{Auth, AuthKey, Profile, User};
use crate::types::resource::{MetaItem, Stream};
use crate::types::streaming_server::{
DeviceInfo, GetHTTPSResponse, NetworkInfo, SettingsResponse, Statistics,
DeviceInfo, GetHTTPSResponse, NetworkInfo, SettingsResponse, Statistics, StatisticsRequest,
};
use crate::types::streams::StreamItemState;
use crate::types::{
resource::{MetaItem, Stream},
torrent::InfoHash,
};

pub type CtxStorageResponse = (
Option<Profile>,
Expand Down Expand Up @@ -112,7 +115,7 @@ pub enum Internal {
/// Result for updating streaming server settings.
StreamingServerUpdateSettingsResult(Url, Result<(), EnvError>),
/// Result for creating a torrent.
StreamingServerCreateTorrentResult(String, Result<(), EnvError>),
StreamingServerCreateTorrentResult(InfoHash, Result<(), EnvError>),
/// Result for playing on device.
StreamingServerPlayOnDeviceResult(String, Result<(), EnvError>),
// Result for get https endpoint request
Expand Down
1 change: 1 addition & 0 deletions src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod resource;
pub mod search_history;
pub mod streaming_server;
pub mod streams;
pub mod torrent;

mod query_params_encode;
pub use query_params_encode::*;
Expand Down
Loading

0 comments on commit 0e64b56

Please sign in to comment.