diff --git a/src/http_server.rs b/src/http_server.rs index 4de5eb6..eb6d55b 100644 --- a/src/http_server.rs +++ b/src/http_server.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use futures::future::try_join_all; use log::{error, info}; use rocket::http::{Accept, ContentType, Status}; use rocket::tokio::task; @@ -63,72 +64,100 @@ async fn metrics( ) -> Result { Ok(serve_metrics(Format::Prometheus, unscheduled_tasks).await) } -async fn process_task(task: Task) -> Result { - info!("Requesting data for {:?}", &task,); - match task { - Task::SonarrToday(sonarr) => { - let name = &sonarr.name; - let result = sonarr.get_today_shows().await; - let result = HashMap::from([(name.to_string(), result)]); - Ok(TaskResult::SonarrToday(result)) - } - Task::SonarrMissing(sonarr) => { - let name = &sonarr.name; - let result = sonarr.get_last_week_missing_shows().await; - let result = HashMap::from([(name.to_string(), result)]); - Ok(TaskResult::SonarrMissing(result)) - } - Task::TautulliSession(tautulli) => { - let result = tautulli.get_session_summary().await; - Ok(TaskResult::TautulliSession(result)) - } - Task::TautulliLibrary(tautulli) => { - let result = tautulli.get_libraries().await; - Ok(TaskResult::TautulliLibrary(result)) - } - Task::Radarr(radarr) => { - let name = &radarr.name; - let result = radarr.get_radarr_movies().await; - let result = HashMap::from([(name.to_string(), result)]); - Ok(TaskResult::Radarr(result)) - } - Task::Overseerr(overseerr) => { - let result = overseerr.get_overseerr_requests().await; - Ok(TaskResult::Overseerr(result)) - } - Task::PlexSession(plex) => { - let name = &plex.name; - let result = plex.get_current_sessions().await; - let result = HashMap::from([(name.to_string(), result)]); - Ok(TaskResult::PlexSession(result)) - } - Task::PlexLibrary(plex) => { - let name = &plex.name; - let result = plex.get_all_library_size().await; - let result = HashMap::from([(name.to_string(), result)]); - Ok(TaskResult::PlexLibrary(result)) - } - Task::Default => Ok(TaskResult::Default), - } +async fn process_tasks(tasks: Vec) -> Result, JoinError> { + let task_futures: Vec<_> = tasks + .into_iter() + .map(|task| async { + info!("Requesting data for {:?}", &task,); + match task { + Task::SonarrToday(sonarr) => { + let name = &sonarr.name; + let result = sonarr.get_today_shows().await; + let result = HashMap::from([(name.to_string(), result)]); + Ok(TaskResult::SonarrToday(result)) + } + Task::SonarrMissing(sonarr) => { + let name = &sonarr.name; + let result = sonarr.get_last_week_missing_shows().await; + let result = HashMap::from([(name.to_string(), result)]); + Ok(TaskResult::SonarrMissing(result)) + } + Task::TautulliSession(tautulli) => { + let result = tautulli.get_session_summary().await; + Ok(TaskResult::TautulliSession(result)) + } + Task::TautulliLibrary(tautulli) => { + let result = tautulli.get_libraries().await; + Ok(TaskResult::TautulliLibrary(result)) + } + Task::Radarr(radarr) => { + let name = &radarr.name; + let result = radarr.get_radarr_movies().await; + let result = HashMap::from([(name.to_string(), result)]); + Ok(TaskResult::Radarr(result)) + } + Task::Overseerr(overseerr) => { + let result = overseerr.get_overseerr_requests().await; + Ok(TaskResult::Overseerr(result)) + } + Task::PlexSession(plex) => { + let name = &plex.name; + let result = plex.get_current_sessions().await; + let result = HashMap::from([(name.to_string(), result)]); + let users = plex.get_users().await; + Ok(TaskResult::PlexSession(result, users)) + } + Task::PlexLibrary(plex) => { + let name = &plex.name; + let result = plex.get_all_library_size().await; + let result = HashMap::from([(name.to_string(), result)]); + Ok(TaskResult::PlexLibrary(result)) + } + Task::Default => Ok(TaskResult::Default), + } + }) + .collect(); + try_join_all(task_futures).await } async fn serve_metrics(format: Format, unscheduled_tasks: &State>) -> MetricsResponse { - let mut join_set = JoinSet::new(); - for task in unscheduled_tasks.iter().cloned() { - join_set.spawn(process_task(task)); - } - - wait_for_metrics(format, join_set).await.map_or_else( - |e| { - error!("General error while fetching providers data: {e}"); + match process_tasks(unscheduled_tasks.inner().clone()).await { + Ok(task_results) => match format_metrics(task_results) { + Ok(metrics) => MetricsResponse::new(Status::Ok, format, metrics), + Err(e) => { + error!("Error formatting metrics: {e}"); + MetricsResponse::new( + Status::InternalServerError, + format, + "Error formatting metrics. Check the logs.".into(), + ) + } + }, + Err(e) => { + error!("Error while processing tasks: {e}"); MetricsResponse::new( Status::InternalServerError, format, - "Error while fetching providers data. Check the logs".into(), + "Error while fetching provider data. Check the logs.".into(), ) - }, - |metrics| MetricsResponse::new(Status::Ok, format, metrics), - ) + } + } + //let mut join_set = JoinSet::new(); + //for task in unscheduled_tasks.iter().cloned() { + // join_set.spawn(process_task(task)); + //} + + //wait_for_metrics(format, join_set).await.map_or_else( + // |e| { + // error!("General error while fetching providers data: {e}"); + // MetricsResponse::new( + // Status::InternalServerError, + // format, + // "Error while fetching providers data. Check the logs".into(), + // ) + // }, + // |metrics| MetricsResponse::new(Status::Ok, format, metrics), + //) } async fn wait_for_metrics( diff --git a/src/main.rs.test b/src/main.rs.test index c680ae2..b36a753 100644 --- a/src/main.rs.test +++ b/src/main.rs.test @@ -82,16 +82,21 @@ async fn main() -> Result<()> { .log_level() .expect("Log level cannot be not available"); let config = config::read(args.config.clone(), log_level)?; - let jelly_conf = config.jellyfin.expect("Jellyfin config not found"); - for (name, j) in jelly_conf { - let jellyfin = Jellyfin::new(&name, &j.address, &j.api_key)?; - let session = jellyfin.get_sessions().await?; - println!("{:?}", session); - } + //let jelly_conf = config.jellyfin.expect("Jellyfin config not found"); + //for (name, j) in jelly_conf { + // let jellyfin = Jellyfin::new(&name, &j.address, &j.api_key)?; + // let session = jellyfin.get_sessions().await?; + // println!("{:?}", session); + //} - //let plex = config.plex.expect("plex config not found"); - //for (name, p) in plex { - // let plex = providers::plex::Plex::new(&name, &p.address, &p.token)?; + let plex = config.plex.expect("plex config not found"); + for (name, p) in plex { + let plex = providers::plex::Plex::new(&name, &p.address, &p.token)?; + let users = plex.get_users().await; + for user in users { + println!("{:?}", user); + } + } // let history = match plex.get_history().await { // Ok(history) => history, // Err(e) => { diff --git a/src/prometheus.rs b/src/prometheus.rs index 5ba3a7a..ca73877 100644 --- a/src/prometheus.rs +++ b/src/prometheus.rs @@ -8,9 +8,10 @@ use std::collections::HashMap; use std::sync::atomic::AtomicU64; use crate::providers::overseerr::OverseerrRequest; -use crate::providers::plex::{LibraryInfos, PlexSessions}; +use crate::providers::plex::{LibraryInfos, PlexSessions, User as PlexUser}; use crate::providers::radarr::RadarrMovie; use crate::providers::sonarr::SonarrEpisode; +use crate::providers::structs::plex::BandwidthLocation; use crate::providers::structs::tautulli::Library; use crate::providers::tautulli::SessionSummary; @@ -27,38 +28,18 @@ pub enum TaskResult { TautulliLibrary(Vec), Radarr(HashMap>), Overseerr(Vec), - PlexSession(HashMap>), + PlexSession(HashMap>, Vec), PlexLibrary(HashMap>), Default, } #[derive(Clone, Hash, Eq, PartialEq, EncodeLabelSet, Debug)] struct PlexSessionBandwidth { + pub name: String, pub location: String, } #[derive(Clone, Hash, Eq, PartialEq, EncodeLabelSet, Debug)] struct PlexSessionLabels { - pub name: String, - pub title: String, - pub user: String, - pub decision: String, - pub state: String, - pub platform: String, - pub local: i8, - pub relayed: i8, - pub media_type: String, - pub secure: i8, - pub address: String, - pub public_address: String, - pub season_number: Option, - pub episode_number: Option, - pub quality: String, - pub city: String, - pub longitude: String, - pub latitude: String, -} -#[derive(Clone, Hash, Eq, PartialEq, EncodeLabelSet, Debug)] -struct PlexSessionPercentageLabels { pub name: String, pub title: String, pub user: String, @@ -187,8 +168,8 @@ pub fn format_metrics(task_result: Vec) -> anyhow::Result { } TaskResult::Radarr(movies) => format_radarr_metrics(movies, &mut registry), TaskResult::Overseerr(overseerr) => format_overseerr_metrics(overseerr, &mut registry), - TaskResult::PlexSession(sessions) => { - format_plex_session_metrics(sessions, &mut registry) + TaskResult::PlexSession(sessions, users) => { + format_plex_session_metrics(sessions, users, &mut registry) } TaskResult::PlexLibrary(libraries) => { format_plex_library_metrics(libraries, &mut registry) @@ -259,7 +240,6 @@ fn format_tautulli_session_metrics(sessions: Vec, registry: &mut let tautulli_session = Family::>::default(); let tautulli_session_percentage = Family::>::default(); - let tautulli_total_session = Family::>::default(); registry.register( "tautulli_session", format!("Tautulli session status"), @@ -270,11 +250,6 @@ fn format_tautulli_session_metrics(sessions: Vec, registry: &mut format!("Tautulli session progress"), tautulli_session_percentage.clone(), ); - let total_sessions = sessions.len(); - let labels = EmptyLabel {}; - tautulli_total_session - .get_or_create(&labels) - .inc_by(total_sessions as f64); sessions.into_iter().for_each(|session| { let labels = TautulliSessionPercentageLabels { user: session.user.clone(), @@ -439,12 +414,12 @@ fn format_overseerr_metrics(requests: Vec, registry: &mut Regi fn format_plex_session_metrics( sessions: HashMap>, + users: Vec, registry: &mut Registry, ) { debug!("Formatting {sessions:?} as Prometheus"); let plex_sessions = Family::>::default(); - let plex_sessions_percentage = - Family::>::default(); + let plex_sessions_percentage = Family::>::default(); let plex_session_bandwidth = Family::>::default(); registry.register( "plex_sessions", @@ -461,71 +436,78 @@ fn format_plex_session_metrics( format!("Plex session bandwidth"), plex_session_bandwidth.clone(), ); - if sessions.len() == 0 { - plex_session_bandwidth - .get_or_create(&PlexSessionBandwidth { - location: "WAN".to_string(), - }) - .set(0.0); - plex_session_bandwidth - .get_or_create(&PlexSessionBandwidth { - location: "LAN".to_string(), - }) - .set(0.0); - } - + let mut wan_bandwidth = 0.0; + let mut lan_bandwidth = 0.0; + let mut inactive_users = users; sessions.into_iter().for_each(|(name, sessions)| { sessions.into_iter().for_each(|session| { + match session.bandwidth.location { + BandwidthLocation::Wan => wan_bandwidth += session.bandwidth.bandwidth as f64, + BandwidthLocation::Lan => lan_bandwidth += session.bandwidth.bandwidth as f64, + }; + inactive_users.retain(|user| user.title != session.user); + let session_labels = PlexSessionLabels { + name: name.clone(), + title: session.title, + user: session.user, + decision: session.stream_decision.to_string(), + state: session.state, + platform: session.platform, + local: session.local as i8, + relayed: session.relayed as i8, + secure: session.secure as i8, + address: session.address, + public_address: session.location.ip_address, + season_number: session.season_number, + episode_number: session.episode_number, + media_type: session.media_type, + quality: session.quality, + city: session.location.city, + longitude: session.location.longitude, + latitude: session.location.latitude, + }; + plex_sessions_percentage - .get_or_create(&PlexSessionPercentageLabels { - name: name.clone(), - title: session.title.clone(), - user: session.user.clone(), - decision: session.stream_decision.to_string().clone(), - state: session.state.clone(), - platform: session.platform.clone(), - local: session.local as i8, - relayed: session.relayed as i8, - secure: session.secure as i8, - address: session.address.clone(), - public_address: session.location.ip_address.clone(), - season_number: session.season_number.clone(), - episode_number: session.episode_number.clone(), - media_type: session.media_type.clone(), - quality: session.quality.clone(), - city: session.location.city.clone(), - longitude: session.location.longitude.clone(), - latitude: session.location.latitude.clone(), - }) + .get_or_create(&session_labels) .set(session.progress as f64); + plex_sessions.get_or_create(&session_labels).set(1.0); + }); + inactive_users.iter().for_each(|user| { plex_sessions .get_or_create(&PlexSessionLabels { - name: name.clone(), - title: session.title.clone(), - user: session.user.clone(), - decision: session.stream_decision.to_string().clone(), - state: session.state.clone(), - platform: session.platform.clone(), - local: session.local as i8, - relayed: session.relayed as i8, - secure: session.secure as i8, - address: session.address.clone(), - media_type: session.media_type.clone(), - public_address: session.location.ip_address.clone(), - season_number: session.season_number.clone(), - episode_number: session.episode_number.clone(), - quality: session.quality.clone(), - city: session.location.city.clone(), - longitude: session.location.longitude.clone(), - latitude: session.location.latitude.clone(), - }) - .set(1.0); - plex_session_bandwidth - .get_or_create(&PlexSessionBandwidth { - location: session.bandwidth.location.to_string(), + name: name.to_string(), + title: "".to_string(), + user: user.title.clone(), + decision: "".to_string(), + state: "inactive".to_string(), + platform: "".to_string(), + local: 0, + relayed: 0, + secure: 0, + address: "".to_string(), + media_type: "".to_string(), + public_address: "".to_string(), + season_number: None, + episode_number: None, + quality: "".to_string(), + city: "".to_string(), + longitude: "".to_string(), + latitude: "".to_string(), }) - .set(session.bandwidth.bandwidth as f64); + .set(0.0); }); + plex_session_bandwidth + .get_or_create(&PlexSessionBandwidth { + name: name.clone(), + location: "LAN".to_string(), + }) + .set(lan_bandwidth); + plex_session_bandwidth + .get_or_create(&PlexSessionBandwidth { + name, + location: "WAN".to_string(), + }) + .set(wan_bandwidth); }); } fn format_plex_library_metrics( @@ -574,17 +556,17 @@ fn format_plex_library_metrics( let mut season_count = 0; let mut show_count = 0; libraries.into_iter().for_each(|(name, library)| { - library - .into_iter() - .for_each(|lib| match lib.library_type.as_str() { + library.into_iter().for_each(|lib| { + let library_labels = PlexLibraryLabels { + name: name.clone(), + library_name: lib.library_name.clone(), + library_type: lib.library_type.clone(), + }; + match lib.library_type.as_str() { "movie" => { movie_count += lib.library_size; plex_library - .get_or_create(&PlexLibraryLabels { - name: name.clone(), - library_name: lib.library_name.clone(), - library_type: lib.library_type.clone(), - }) + .get_or_create(&library_labels) .set(lib.library_size as f64); } "show" => { @@ -603,14 +585,11 @@ fn format_plex_library_metrics( } _ => { plex_library - .get_or_create(&PlexLibraryLabels { - name: name.clone(), - library_name: lib.library_name.clone(), - library_type: lib.library_type.clone(), - }) + .get_or_create(&library_labels) .set(lib.library_size as f64); } - }); + }; + }); }); plex_movie_count .get_or_create(&EmptyLabel {}) diff --git a/src/providers/plex.rs b/src/providers/plex.rs index b63268f..48416b9 100644 --- a/src/providers/plex.rs +++ b/src/providers/plex.rs @@ -3,8 +3,8 @@ use reqwest; use reqwest::header; use serde::{Deserialize, Serialize}; -pub use crate::providers::structs::plex::{MediaContainer, PlexSessions}; -use crate::providers::structs::plex::{Metadata, PlexResponse}; +pub use crate::providers::structs::plex::{MediaContainer, PlexSessions, User}; +use crate::providers::structs::plex::{Metadata, PlexResponse, StatUser}; use crate::providers::{Provider, ProviderError, ProviderErrorKind}; #[derive(Debug, Deserialize, Clone, Serialize)] @@ -250,4 +250,50 @@ impl Plex { movies_viewed, } } + pub async fn get_statistics(&self) -> Result { + let url = format!("{}/statistics/bandwidth", self.address); + debug!("Requesting statistics from {}", url); + let response = match self.client.get(&url).send().await { + Ok(response) => response, + Err(e) => { + return Err(ProviderError::new( + Provider::Plex, + ProviderErrorKind::GetError, + &format!("{:?}", e), + )); + } + }; + let statistics = match response.json::().await { + Ok(statistics) => statistics, + Err(e) => { + return Err(ProviderError::new( + Provider::Plex, + ProviderErrorKind::ParseError, + &format!("{:?}", e), + )); + } + }; + Ok(statistics) + } + pub async fn get_users(&self) -> Vec { + let statistics = match self.get_statistics().await { + Ok(statistics) => statistics, + Err(e) => { + error!("Failed to get statistics: {}", e); + return Vec::new(); + } + }; + let statistics_container = match statistics.media_container { + MediaContainer::StatisticsContainer(statistics_container) => statistics_container, + _ => { + error!("Media container received does not match statistics container"); + return Vec::new(); + } + }; + statistics_container + .account + .into_iter() + .map(|item| >::into(item)) + .collect() + } } diff --git a/src/providers/structs/plex.rs b/src/providers/structs/plex.rs index 963eb21..481239f 100644 --- a/src/providers/structs/plex.rs +++ b/src/providers/structs/plex.rs @@ -3,22 +3,30 @@ use serde::{Deserialize, Serialize}; use std::fmt::Display; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(untagged)] -pub enum Metadata { - SessionMetadata(SessionMetadata), - HistoryMetadata(HistoryMetadata), - LibraryMetadata(LibraryMetadata), - Default(serde_json::Value), +#[serde(rename_all = "camelCase")] +pub struct PlexResponse { + //#[serde(rename = "MediaContainer",deserialize_with = "deserialize_media_container")] + #[serde(rename = "MediaContainer")] + pub media_container: MediaContainer, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(untagged)] pub enum MediaContainer { + StatisticsContainer(StatisticsContainer), LibraryContainer(LibraryContainer), LibraryItemsContainer(LibraryItemsContainer), ActivityContainer(ActivityContainer), Default(serde_json::Value), } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum Metadata { + SessionMetadata(SessionMetadata), + HistoryMetadata(HistoryMetadata), + LibraryMetadata(LibraryMetadata), + Default(serde_json::Value), +} #[derive(Debug, Default, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] @@ -42,14 +50,6 @@ impl Display for LibraryType { } } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct PlexResponse { - //#[serde(rename = "MediaContainer",deserialize_with = "deserialize_media_container")] - #[serde(rename = "MediaContainer")] - pub media_container: MediaContainer, -} - #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ActivityContainer { @@ -59,6 +59,14 @@ pub struct ActivityContainer { pub metadata: Vec, } +#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct StatisticsContainer { + pub size: i64, + #[serde(rename = "Account")] + pub account: Vec, +} + #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct LibraryContainer { @@ -223,11 +231,23 @@ pub struct Stream { pub decision: Option, } +#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct StatUser { + pub name: String, +} #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct User { pub title: String, } +impl From for User { + fn from(stat_user: StatUser) -> Self { + User { + title: stat_user.name, + } + } +} #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")]