Skip to content

Commit

Permalink
feat!: implement streaming server remote endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
tymmesyde committed Nov 22, 2023
1 parent 1f78803 commit 5b71c1d
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 94 deletions.
193 changes: 107 additions & 86 deletions src/models/streaming_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::runtime::msg::{
use crate::runtime::{Effect, EffectFuture, Effects, Env, EnvError, EnvFutureExt, UpdateWithCtx};
use crate::types::addon::ResourcePath;
use crate::types::api::SuccessResponse;
use crate::types::profile::Profile;
use crate::types::streaming_server::Statistics;
use crate::types::profile::{AuthKey, Profile};
use crate::types::streaming_server::{Settings, Statistics};
use enclose::enclose;
use futures::{FutureExt, TryFutureExt};
use http::request::Request;
Expand All @@ -18,21 +18,6 @@ use sha1::{Digest, Sha1};
use std::iter;
use url::Url;

#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Settings {
pub app_path: String,
pub cache_root: String,
pub server_version: String,
pub cache_size: Option<f64>,
pub bt_max_connections: u64,
pub bt_handshake_timeout: u64,
pub bt_request_timeout: u64,
pub bt_download_speed_soft_limit: f64,
pub bt_download_speed_hard_limit: f64,
pub bt_min_peers_for_stable: u64,
}

#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct PlaybackDevice {
Expand All @@ -48,6 +33,14 @@ pub struct StatisticsRequest {
pub file_idx: u16,
}

#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct GetHTTPSResponse {
pub ip_address: String,
pub domain: String,
pub port: u16,
}

#[derive(Clone, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Selected {
Expand All @@ -60,7 +53,8 @@ pub struct Selected {
pub struct StreamingServer {
pub selected: Selected,
pub settings: Loadable<Settings, EnvError>,
pub base_url: Loadable<Url, EnvError>,
pub base_url: Option<Url>,
pub remote_url: Option<Url>,
pub playback_devices: Loadable<Vec<PlaybackDevice>, EnvError>,
pub torrent: Option<(String, Loadable<ResourcePath, EnvError>)>,
pub statistics: Option<Loadable<Statistics, EnvError>>,
Expand All @@ -70,7 +64,6 @@ impl StreamingServer {
pub fn new<E: Env + 'static>(profile: &Profile) -> (Self, Effects) {
let effects = Effects::many(vec![
get_settings::<E>(&profile.settings.streaming_server_url),
get_base_url::<E>(&profile.settings.streaming_server_url),
get_playback_devices::<E>(&profile.settings.streaming_server_url),
]);
(
Expand All @@ -80,7 +73,8 @@ impl StreamingServer {
statistics: None,
},
settings: Loadable::Loading,
base_url: Loadable::Loading,
base_url: None,
remote_url: None,
playback_devices: Loadable::Loading,
torrent: None,
statistics: None,
Expand All @@ -95,24 +89,28 @@ impl<E: Env + 'static> UpdateWithCtx<E> for StreamingServer {
match msg {
Msg::Action(Action::StreamingServer(ActionStreamingServer::Reload)) => {
let settings_effects = eq_update(&mut self.settings, Loadable::Loading);
let base_url_effects = eq_update(&mut self.base_url, Loadable::Loading);
let base_url_effects = eq_update(&mut self.base_url, None);
let remote_url_effects = eq_update(&mut self.remote_url, None);
Effects::many(vec![
get_settings::<E>(&self.selected.transport_url),
get_base_url::<E>(&self.selected.transport_url),
get_playback_devices::<E>(&self.selected.transport_url),
])
.unchanged()
.join(settings_effects)
.join(base_url_effects)
.join(remote_url_effects)
}
Msg::Action(Action::StreamingServer(ActionStreamingServer::UpdateSettings(
settings,
))) if self.settings.is_ready() => {
let settings_effects =
eq_update(&mut self.settings, Loadable::Ready(settings.to_owned()));
let remote_url_effects =
update_remote_url::<E>(&mut self.remote_url, &self.selected, settings, ctx);
Effects::one(set_settings::<E>(&self.selected.transport_url, settings))
.unchanged()
.join(settings_effects)
.join(remote_url_effects)
}
Msg::Action(Action::StreamingServer(ActionStreamingServer::CreateTorrent(
CreateTorrentArgs::Magnet(magnet),
Expand Down Expand Up @@ -217,12 +215,12 @@ impl<E: Env + 'static> UpdateWithCtx<E> for StreamingServer {
statistics: None,
};
self.settings = Loadable::Loading;
self.base_url = Loadable::Loading;
self.base_url = None;
self.remote_url = None;
self.torrent = None;
self.statistics = None;
Effects::many(vec![
get_settings::<E>(&self.selected.transport_url),
get_base_url::<E>(&self.selected.transport_url),
get_playback_devices::<E>(&self.selected.transport_url),
])
}
Expand All @@ -231,39 +229,30 @@ impl<E: Env + 'static> UpdateWithCtx<E> for StreamingServer {
{
match result {
Ok(settings) => {
eq_update(&mut self.settings, Loadable::Ready(settings.to_owned()))
}
Err(error) => {
let base_url_effects =
eq_update(&mut self.base_url, Loadable::Err(error.to_owned()));
let playback_devices_effects =
eq_update(&mut self.playback_devices, Loadable::Err(error.to_owned()));
let settings_effects =
eq_update(&mut self.settings, Loadable::Err(error.to_owned()));
let torrent_effects = eq_update(&mut self.torrent, None);
base_url_effects
.join(playback_devices_effects)
.join(settings_effects)
.join(torrent_effects)
}
}
}
Msg::Internal(Internal::StreamingServerBaseURLResult(url, result))
if self.selected.transport_url == *url && self.base_url.is_loading() =>
{
match result {
Ok(base_url) => {
eq_update(&mut self.base_url, Loadable::Ready(base_url.to_owned()))
eq_update(&mut self.settings, Loadable::Ready(settings.to_owned()));
let base_url_effects =
eq_update(&mut self.base_url, Some(settings.base_url.to_owned()));
let remote_url_effects = update_remote_url::<E>(
&mut self.remote_url,
&self.selected,
settings,
ctx,
);
settings_effects
.join(base_url_effects)
.join(remote_url_effects)
}
Err(error) => {
let base_url_effects =
eq_update(&mut self.base_url, Loadable::Err(error.to_owned()));
let base_url_effects = eq_update(&mut self.base_url, None);
let remote_url_effects = eq_update(&mut self.remote_url, None);
let playback_devices_effects =
eq_update(&mut self.playback_devices, Loadable::Err(error.to_owned()));
let settings_effects =
eq_update(&mut self.settings, Loadable::Err(error.to_owned()));
let torrent_effects = eq_update(&mut self.torrent, None);
base_url_effects
.join(remote_url_effects)
.join(playback_devices_effects)
.join(settings_effects)
.join(torrent_effects)
Expand All @@ -279,14 +268,15 @@ impl<E: Env + 'static> UpdateWithCtx<E> for StreamingServer {
Loadable::Ready(playback_devices.to_owned()),
),
Err(error) => {
let base_url_effects =
eq_update(&mut self.base_url, Loadable::Err(error.to_owned()));
let base_url_effects = eq_update(&mut self.base_url, None);
let remote_url_effects = eq_update(&mut self.remote_url, None);
let playback_devices_effects =
eq_update(&mut self.playback_devices, Loadable::Err(error.to_owned()));
let settings_effects =
eq_update(&mut self.settings, Loadable::Err(error.to_owned()));
let torrent_effects = eq_update(&mut self.torrent, None);
base_url_effects
.join(remote_url_effects)
.join(playback_devices_effects)
.join(settings_effects)
.join(torrent_effects)
Expand All @@ -299,12 +289,13 @@ impl<E: Env + 'static> UpdateWithCtx<E> for StreamingServer {
match result {
Ok(_) => Effects::none().unchanged(),
Err(error) => {
let base_url_effects =
eq_update(&mut self.base_url, Loadable::Err(error.to_owned()));
let base_url_effects = eq_update(&mut self.base_url, None);
let remote_url_effects = eq_update(&mut self.remote_url, None);
let settings_effects =
eq_update(&mut self.settings, Loadable::Err(error.to_owned()));
let torrent_effects = eq_update(&mut self.torrent, None);
base_url_effects
.join(remote_url_effects)
.join(settings_effects)
.join(torrent_effects)
}
Expand All @@ -331,6 +322,17 @@ impl<E: Env + 'static> UpdateWithCtx<E> for StreamingServer {
Err(_) => Effects::none().unchanged(),
}
}
Msg::Internal(Internal::StreamingServerGetHTTPSResult(url, result))
if self.selected.transport_url == *url =>
{
match result {
Ok(GetHTTPSResponse { domain, .. }) => {
let remote_url = Url::parse(&format!("https://{domain}")).ok();
eq_update(&mut self.remote_url, remote_url)
}
Err(_) => Effects::none().unchanged(),
}
}
Msg::Internal(Internal::StreamingServerCreateTorrentResult(
loading_info_hash,
result,
Expand All @@ -357,17 +359,12 @@ impl<E: Env + 'static> UpdateWithCtx<E> for StreamingServer {
}

fn get_settings<E: Env + 'static>(url: &Url) -> Effect {
#[derive(Deserialize)]
struct Resp {
values: Settings,
}
let endpoint = url.join("settings").expect("url builder failed");
let request = Request::get(endpoint.as_str())
.body(())
.expect("request builder failed");
EffectFuture::Concurrent(
E::fetch::<_, Resp>(request)
.map_ok(|resp| resp.values)
E::fetch::<_, Settings>(request)
.map(enclose!((url) move |result| {
Msg::Internal(Internal::StreamingServerSettingsResult(
url, result,
Expand All @@ -378,27 +375,6 @@ fn get_settings<E: Env + 'static>(url: &Url) -> Effect {
.into()
}

fn get_base_url<E: Env + 'static>(url: &Url) -> Effect {
#[derive(Deserialize)]
struct Resp {
#[serde(rename = "baseUrl")]
base_url: Url,
}
let endpoint = url.join("settings").expect("url builder failed");
let request = Request::get(endpoint.as_str())
.body(())
.expect("request builder failed");
EffectFuture::Concurrent(
E::fetch::<_, Resp>(request)
.map_ok(|resp| resp.base_url)
.map(enclose!((url) move |result|
Msg::Internal(Internal::StreamingServerBaseURLResult(url, result))
))
.boxed_env(),
)
.into()
}

fn get_playback_devices<E: Env + 'static>(url: &Url) -> Effect {
let endpoint = url.join("casting").expect("url builder failed");
let request = Request::get(endpoint.as_str())
Expand All @@ -419,6 +395,7 @@ fn set_settings<E: Env + 'static>(url: &Url, settings: &Settings) -> Effect {
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct Body {
remote_https: Option<String>,
cache_size: Option<f64>,
bt_max_connections: u64,
bt_handshake_timeout: u64,
Expand All @@ -428,13 +405,14 @@ fn set_settings<E: Env + 'static>(url: &Url, settings: &Settings) -> Effect {
bt_min_peers_for_stable: u64,
}
let body = Body {
cache_size: settings.cache_size.to_owned(),
bt_max_connections: settings.bt_max_connections.to_owned(),
bt_handshake_timeout: settings.bt_handshake_timeout.to_owned(),
bt_request_timeout: settings.bt_request_timeout.to_owned(),
bt_download_speed_soft_limit: settings.bt_download_speed_soft_limit.to_owned(),
bt_download_speed_hard_limit: settings.bt_download_speed_hard_limit.to_owned(),
bt_min_peers_for_stable: settings.bt_min_peers_for_stable.to_owned(),
remote_https: settings.values.remote_https.to_owned(),
cache_size: settings.values.cache_size.to_owned(),
bt_max_connections: settings.values.bt_max_connections.to_owned(),
bt_handshake_timeout: settings.values.bt_handshake_timeout.to_owned(),
bt_request_timeout: settings.values.bt_request_timeout.to_owned(),
bt_download_speed_soft_limit: settings.values.bt_download_speed_soft_limit.to_owned(),
bt_download_speed_hard_limit: settings.values.bt_download_speed_hard_limit.to_owned(),
bt_min_peers_for_stable: settings.values.bt_min_peers_for_stable.to_owned(),
};
let endpoint = url.join("settings").expect("url builder failed");
let request = Request::post(endpoint.as_str())
Expand Down Expand Up @@ -626,3 +604,46 @@ fn play_on_device<E: Env + 'static>(url: &Url, args: &PlayOnDeviceArgs) -> Effec
)
.into()
}

fn get_https_endpoint<E: Env + 'static>(
url: &Url,
auth_key: &AuthKey,
ip_address: &String,
) -> Effect {
let endpoint = url
.join(&format!(
"/get-https?authKey={:?}&ipAddress={}",
auth_key, ip_address,
))
.expect("url builder failed");
let request = Request::get(endpoint.as_str())
.header(http::header::CONTENT_TYPE, "application/json")
.body(())
.expect("request builder failed");
EffectFuture::Concurrent(
E::fetch::<_, GetHTTPSResponse>(request)
.map(enclose!((url) move |result|
Msg::Internal(Internal::StreamingServerGetHTTPSResult(url, result))
))
.boxed_env(),
)
.into()
}

fn update_remote_url<E: Env + 'static>(
remote_url: &mut Option<Url>,
selected: &Selected,
settings: &Settings,
ctx: &Ctx,
) -> Effects {
match (
settings.values.remote_https.as_ref(),
ctx.profile.auth_key(),
) {
(Some(ip_address), Some(auth_key)) if ip_address.is_empty() => Effects::one(
get_https_endpoint::<E>(&selected.transport_url, auth_key, ip_address),
)
.unchanged(),
_ => eq_update(remote_url, None),
}
}
6 changes: 2 additions & 4 deletions src/runtime/msg/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@ use crate::{
library_with_filters::Selected as LibraryWithFiltersSelected,
meta_details::Selected as MetaDetailsSelected,
player::{Selected as PlayerSelected, VideoParams},
streaming_server::{
Settings as StreamingServerSettings,
StatisticsRequest as StreamingServerStatisticsRequest,
},
streaming_server::StatisticsRequest as StreamingServerStatisticsRequest,
},
types::{
addon::Descriptor,
api::AuthRequest,
library::LibraryItemId,
profile::Settings as ProfileSettings,
resource::{MetaItemId, MetaItemPreview, Video},
streaming_server::Settings as StreamingServerSettings,
},
};

Expand Down
Loading

0 comments on commit 5b71c1d

Please sign in to comment.