Skip to content

Commit

Permalink
Improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
tcheronneau committed Nov 19, 2024
1 parent 7590f86 commit 0a05593
Showing 1 changed file with 88 additions and 60 deletions.
148 changes: 88 additions & 60 deletions src/http_server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::Result;
use futures::future::try_join_all;
use log::{error, info};
use rocket::http::{Accept, ContentType, Status};
use rocket::tokio::task;
Expand Down Expand Up @@ -63,73 +64,100 @@ async fn metrics(
) -> Result<MetricsResponse, MetricsError> {
Ok(serve_metrics(Format::Prometheus, unscheduled_tasks).await)
}
async fn process_task(task: Task) -> Result<TaskResult, JoinError> {
info!("Requesting data for {:?}", &task,);
match task {
Task::SonarrToday(sonarr) => {
let name = &sonarr.name;
let result = sonarr.get_today_shows().await;
let result = HashMap::from([(name.to_string(), result)]);
Ok(TaskResult::SonarrToday(result))
}
Task::SonarrMissing(sonarr) => {
let name = &sonarr.name;
let result = sonarr.get_last_week_missing_shows().await;
let result = HashMap::from([(name.to_string(), result)]);
Ok(TaskResult::SonarrMissing(result))
}
Task::TautulliSession(tautulli) => {
let result = tautulli.get_session_summary().await;
Ok(TaskResult::TautulliSession(result))
}
Task::TautulliLibrary(tautulli) => {
let result = tautulli.get_libraries().await;
Ok(TaskResult::TautulliLibrary(result))
}
Task::Radarr(radarr) => {
let name = &radarr.name;
let result = radarr.get_radarr_movies().await;
let result = HashMap::from([(name.to_string(), result)]);
Ok(TaskResult::Radarr(result))
}
Task::Overseerr(overseerr) => {
let result = overseerr.get_overseerr_requests().await;
Ok(TaskResult::Overseerr(result))
}
Task::PlexSession(plex) => {
let name = &plex.name;
let result = plex.get_current_sessions().await;
let result = HashMap::from([(name.to_string(), result)]);
let users = plex.get_users().await;
Ok(TaskResult::PlexSession(result, users))
}
Task::PlexLibrary(plex) => {
let name = &plex.name;
let result = plex.get_all_library_size().await;
let result = HashMap::from([(name.to_string(), result)]);
Ok(TaskResult::PlexLibrary(result))
}
Task::Default => Ok(TaskResult::Default),
}
async fn process_tasks(tasks: Vec<Task>) -> Result<Vec<TaskResult>, JoinError> {
let task_futures: Vec<_> = tasks
.into_iter()
.map(|task| async {
info!("Requesting data for {:?}", &task,);
match task {
Task::SonarrToday(sonarr) => {
let name = &sonarr.name;
let result = sonarr.get_today_shows().await;
let result = HashMap::from([(name.to_string(), result)]);
Ok(TaskResult::SonarrToday(result))
}
Task::SonarrMissing(sonarr) => {
let name = &sonarr.name;
let result = sonarr.get_last_week_missing_shows().await;
let result = HashMap::from([(name.to_string(), result)]);
Ok(TaskResult::SonarrMissing(result))
}
Task::TautulliSession(tautulli) => {
let result = tautulli.get_session_summary().await;
Ok(TaskResult::TautulliSession(result))
}
Task::TautulliLibrary(tautulli) => {
let result = tautulli.get_libraries().await;
Ok(TaskResult::TautulliLibrary(result))
}
Task::Radarr(radarr) => {
let name = &radarr.name;
let result = radarr.get_radarr_movies().await;
let result = HashMap::from([(name.to_string(), result)]);
Ok(TaskResult::Radarr(result))
}
Task::Overseerr(overseerr) => {
let result = overseerr.get_overseerr_requests().await;
Ok(TaskResult::Overseerr(result))
}
Task::PlexSession(plex) => {
let name = &plex.name;
let result = plex.get_current_sessions().await;
let result = HashMap::from([(name.to_string(), result)]);
let users = plex.get_users().await;
Ok(TaskResult::PlexSession(result, users))
}
Task::PlexLibrary(plex) => {
let name = &plex.name;
let result = plex.get_all_library_size().await;
let result = HashMap::from([(name.to_string(), result)]);
Ok(TaskResult::PlexLibrary(result))
}
Task::Default => Ok(TaskResult::Default),
}
})
.collect();
try_join_all(task_futures).await
}

async fn serve_metrics(format: Format, unscheduled_tasks: &State<Vec<Task>>) -> MetricsResponse {
let mut join_set = JoinSet::new();
for task in unscheduled_tasks.iter().cloned() {
join_set.spawn(process_task(task));
}

wait_for_metrics(format, join_set).await.map_or_else(
|e| {
error!("General error while fetching providers data: {e}");
match process_tasks(unscheduled_tasks.inner().clone()).await {
Ok(task_results) => match format_metrics(task_results) {
Ok(metrics) => MetricsResponse::new(Status::Ok, format, metrics),
Err(e) => {
error!("Error formatting metrics: {e}");
MetricsResponse::new(
Status::InternalServerError,
format,
"Error formatting metrics. Check the logs.".into(),
)
}
},
Err(e) => {
error!("Error while processing tasks: {e}");
MetricsResponse::new(
Status::InternalServerError,
format,
"Error while fetching providers data. Check the logs".into(),
"Error while fetching provider data. Check the logs.".into(),
)
},
|metrics| MetricsResponse::new(Status::Ok, format, metrics),
)
}
}
//let mut join_set = JoinSet::new();
//for task in unscheduled_tasks.iter().cloned() {
// join_set.spawn(process_task(task));
//}

//wait_for_metrics(format, join_set).await.map_or_else(
// |e| {
// error!("General error while fetching providers data: {e}");
// MetricsResponse::new(
// Status::InternalServerError,
// format,
// "Error while fetching providers data. Check the logs".into(),
// )
// },
// |metrics| MetricsResponse::new(Status::Ok, format, metrics),
//)
}

async fn wait_for_metrics(
Expand Down

0 comments on commit 0a05593

Please sign in to comment.