From 0a05593db1e67a04ab523682d86c7499d4962c40 Mon Sep 17 00:00:00 2001 From: Thomas Date: Tue, 19 Nov 2024 08:27:50 +0100 Subject: [PATCH] Improvements --- src/http_server.rs | 148 +++++++++++++++++++++++++++------------------ 1 file changed, 88 insertions(+), 60 deletions(-) diff --git a/src/http_server.rs b/src/http_server.rs index 37ca1b0..eb6d55b 100644 --- a/src/http_server.rs +++ b/src/http_server.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use futures::future::try_join_all; use log::{error, info}; use rocket::http::{Accept, ContentType, Status}; use rocket::tokio::task; @@ -63,73 +64,100 @@ async fn metrics( ) -> Result { Ok(serve_metrics(Format::Prometheus, unscheduled_tasks).await) } -async fn process_task(task: Task) -> Result { - info!("Requesting data for {:?}", &task,); - match task { - Task::SonarrToday(sonarr) => { - let name = &sonarr.name; - let result = sonarr.get_today_shows().await; - let result = HashMap::from([(name.to_string(), result)]); - Ok(TaskResult::SonarrToday(result)) - } - Task::SonarrMissing(sonarr) => { - let name = &sonarr.name; - let result = sonarr.get_last_week_missing_shows().await; - let result = HashMap::from([(name.to_string(), result)]); - Ok(TaskResult::SonarrMissing(result)) - } - Task::TautulliSession(tautulli) => { - let result = tautulli.get_session_summary().await; - Ok(TaskResult::TautulliSession(result)) - } - Task::TautulliLibrary(tautulli) => { - let result = tautulli.get_libraries().await; - Ok(TaskResult::TautulliLibrary(result)) - } - Task::Radarr(radarr) => { - let name = &radarr.name; - let result = radarr.get_radarr_movies().await; - let result = HashMap::from([(name.to_string(), result)]); - Ok(TaskResult::Radarr(result)) - } - Task::Overseerr(overseerr) => { - let result = overseerr.get_overseerr_requests().await; - Ok(TaskResult::Overseerr(result)) - } - Task::PlexSession(plex) => { - let name = &plex.name; - let result = plex.get_current_sessions().await; - let result = HashMap::from([(name.to_string(), result)]); - let users = plex.get_users().await; - Ok(TaskResult::PlexSession(result, users)) - } - Task::PlexLibrary(plex) => { - let name = &plex.name; - let result = plex.get_all_library_size().await; - let result = HashMap::from([(name.to_string(), result)]); - Ok(TaskResult::PlexLibrary(result)) - } - Task::Default => Ok(TaskResult::Default), - } +async fn process_tasks(tasks: Vec) -> Result, JoinError> { + let task_futures: Vec<_> = tasks + .into_iter() + .map(|task| async { + info!("Requesting data for {:?}", &task,); + match task { + Task::SonarrToday(sonarr) => { + let name = &sonarr.name; + let result = sonarr.get_today_shows().await; + let result = HashMap::from([(name.to_string(), result)]); + Ok(TaskResult::SonarrToday(result)) + } + Task::SonarrMissing(sonarr) => { + let name = &sonarr.name; + let result = sonarr.get_last_week_missing_shows().await; + let result = HashMap::from([(name.to_string(), result)]); + Ok(TaskResult::SonarrMissing(result)) + } + Task::TautulliSession(tautulli) => { + let result = tautulli.get_session_summary().await; + Ok(TaskResult::TautulliSession(result)) + } + Task::TautulliLibrary(tautulli) => { + let result = tautulli.get_libraries().await; + Ok(TaskResult::TautulliLibrary(result)) + } + Task::Radarr(radarr) => { + let name = &radarr.name; + let result = radarr.get_radarr_movies().await; + let result = HashMap::from([(name.to_string(), result)]); + Ok(TaskResult::Radarr(result)) + } + Task::Overseerr(overseerr) => { + let result = overseerr.get_overseerr_requests().await; + Ok(TaskResult::Overseerr(result)) + } + Task::PlexSession(plex) => { + let name = &plex.name; + let result = plex.get_current_sessions().await; + let result = HashMap::from([(name.to_string(), result)]); + let users = plex.get_users().await; + Ok(TaskResult::PlexSession(result, users)) + } + Task::PlexLibrary(plex) => { + let name = &plex.name; + let result = plex.get_all_library_size().await; + let result = HashMap::from([(name.to_string(), result)]); + Ok(TaskResult::PlexLibrary(result)) + } + Task::Default => Ok(TaskResult::Default), + } + }) + .collect(); + try_join_all(task_futures).await } async fn serve_metrics(format: Format, unscheduled_tasks: &State>) -> MetricsResponse { - let mut join_set = JoinSet::new(); - for task in unscheduled_tasks.iter().cloned() { - join_set.spawn(process_task(task)); - } - - wait_for_metrics(format, join_set).await.map_or_else( - |e| { - error!("General error while fetching providers data: {e}"); + match process_tasks(unscheduled_tasks.inner().clone()).await { + Ok(task_results) => match format_metrics(task_results) { + Ok(metrics) => MetricsResponse::new(Status::Ok, format, metrics), + Err(e) => { + error!("Error formatting metrics: {e}"); + MetricsResponse::new( + Status::InternalServerError, + format, + "Error formatting metrics. Check the logs.".into(), + ) + } + }, + Err(e) => { + error!("Error while processing tasks: {e}"); MetricsResponse::new( Status::InternalServerError, format, - "Error while fetching providers data. Check the logs".into(), + "Error while fetching provider data. Check the logs.".into(), ) - }, - |metrics| MetricsResponse::new(Status::Ok, format, metrics), - ) + } + } + //let mut join_set = JoinSet::new(); + //for task in unscheduled_tasks.iter().cloned() { + // join_set.spawn(process_task(task)); + //} + + //wait_for_metrics(format, join_set).await.map_or_else( + // |e| { + // error!("General error while fetching providers data: {e}"); + // MetricsResponse::new( + // Status::InternalServerError, + // format, + // "Error while fetching providers data. Check the logs".into(), + // ) + // }, + // |metrics| MetricsResponse::new(Status::Ok, format, metrics), + //) } async fn wait_for_metrics(