From 50974f761f6a41c73be0c4449bd48f08652f9b44 Mon Sep 17 00:00:00 2001
From: Simeon Romanov
Date: Thu, 30 Nov 2023 23:45:08 +0300
Subject: [PATCH] refactoring

---
 backend/src/models/api.rs       | 153 ++++-----
 backend/src/models/mod.rs       |   6 +-
 backend/src/services/api.rs     | 558 +++-----------------------------
 backend/src/services/methods.rs | 108 +++++++
 backend/src/services/mod.rs     |  15 +-
 5 files changed, 246 insertions(+), 594 deletions(-)

diff --git a/backend/src/models/api.rs b/backend/src/models/api.rs
index 9338d132..a7da0084 100644
--- a/backend/src/models/api.rs
+++ b/backend/src/models/api.rs
@@ -9,7 +9,7 @@ pub const DEFAULT_MIN_FREE_SPACE_PERCENTAGE: f64 = 0.1;
 pub use crate::models::shared::{BobConnectionData, Credentials};
 
 /// Physical disk definition
-#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
 pub struct Disk {
     /// Disk name
@@ -32,7 +32,7 @@ pub struct Disk {
 }
 
 /// Defines kind of problem on disk
-#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)]
+#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Serialize, Hash)]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
 pub enum DiskProblem {
     #[serde(rename = "freeSpaceRunningOut")]
@@ -43,7 +43,7 @@ pub enum DiskProblem {
 ///
 /// Variant - Disk Status
 /// Content - List of problems on disk. 'null' if status != 'bad'
-#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Hash)]
 #[serde(tag = "status", content = "problems")]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
 pub enum DiskStatus {
@@ -75,7 +75,7 @@ impl DiskStatus {
 }
 
 /// Defines disk status names
-#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash, EnumIter)]
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Hash, EnumIter)]
 #[serde(rename_all = "camelCase")]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
 pub enum DiskStatusName {
@@ -84,7 +84,7 @@ pub enum DiskStatusName {
     Offline,
 }
 
-#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize)]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
 pub struct Node {
     pub name: String,
@@ -97,7 +97,7 @@ pub struct Node {
     #[serde(rename = "rps")]
     #[serde(skip_serializing_if = "Option::is_none")]
-    pub rps: Option<u64>,
+    pub rps: Option<RPS>,
 
     #[serde(rename = "alienCount")]
     #[serde(skip_serializing_if = "Option::is_none")]
@@ -112,7 +112,7 @@ pub struct Node {
 }
 
 /// Defines kind of problem on Node
-#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)]
+#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Serialize, Hash)]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
 pub enum NodeProblem {
     #[serde(rename = "aliensExists")]
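A note for reviewers on the serialize-only models above: `#[serde(tag = "status", content = "problems")]` produces an adjacently tagged representation, which is what the "'null' if status != 'bad'" doc comments describe. A minimal, self-contained sketch of the resulting wire format (trimmed-down enum, `serde_json` as a dev dependency; the variant renames mirror the ones in this file):

```rust
use serde::Serialize;

#[derive(Serialize)]
#[serde(tag = "status", content = "problems")]
enum DiskStatus {
    #[serde(rename = "good")]
    Good,
    #[serde(rename = "bad")]
    Bad(Vec<String>),
}

fn main() {
    // Unit variant: only the tag appears, no "problems" key at all.
    let good = serde_json::to_string(&DiskStatus::Good).unwrap();
    assert_eq!(good, r#"{"status":"good"}"#);

    // Data-carrying variant: the payload lands under the "problems" key.
    let bad = serde_json::to_string(&DiskStatus::Bad(vec!["freeSpaceRunningOut".into()])).unwrap();
    assert_eq!(bad, r#"{"status":"bad","problems":["freeSpaceRunningOut"]}"#);
}
```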
@@ -177,7 +177,7 @@ impl NodeProblem {
 /// Variants - Node status
 ///
 /// Content - List of problems on node. 'null' if status != 'bad'
-#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Hash)]
 #[serde(tag = "status", content = "problems")]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
 pub enum NodeStatus {
@@ -217,7 +217,7 @@ impl TypedMetrics {
 }
 
 /// Defines node status names
-#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash, EnumIter)]
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Hash, EnumIter)]
 #[serde(rename_all = "camelCase")]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
 pub enum NodeStatusName {
@@ -227,7 +227,7 @@ pub enum NodeStatusName {
 }
 
 /// [`VDisk`]'s replicas
-#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
 pub struct Replica {
     pub node: String,
@@ -241,7 +241,7 @@ pub struct Replica {
 }
 
 /// Reasons why Replica is offline
-#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Serialize, Deserialize)]
+#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Serialize)]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
 pub enum ReplicaProblem {
     #[serde(rename = "nodeUnavailable")]
@@ -255,7 +255,7 @@ pub enum ReplicaProblem {
 ///
 /// Variants - Replica status
 ///
 /// Content - List of problems on replica. 'null' if status != 'offline'
-#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
 #[serde(tag = "status", content = "problems")]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
 pub enum ReplicaStatus {
@@ -266,7 +266,7 @@ pub enum ReplicaStatus {
 }
 
 /// Disk space information in bytes
-#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize)]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
 pub struct SpaceInfo {
     /// Total disk space amount
@@ -282,8 +282,38 @@ pub struct SpaceInfo {
     pub occupied_disk: u64,
 }
 
+impl From<dto::SpaceInfo> for SpaceInfo {
+    fn from(space: dto::SpaceInfo) -> Self {
+        Self {
+            total_disk: space.total_disk_space_bytes,
+            free_disk: space.total_disk_space_bytes - space.used_disk_space_bytes,
+            used_disk: space.used_disk_space_bytes,
+            occupied_disk: space.occupied_disk_space_bytes,
+        }
+    }
+}
+
+impl AddAssign for SpaceInfo {
+    fn add_assign(&mut self, rhs: Self) {
+        self.total_disk += rhs.total_disk;
+        self.free_disk += rhs.free_disk;
+        self.used_disk += rhs.used_disk;
+        self.occupied_disk += rhs.occupied_disk;
+    }
+}
+
+impl Add for SpaceInfo {
+    type Output = Self;
+
+    fn add(mut self, rhs: Self) -> Self::Output {
+        self += rhs;

+        self
+    }
+}
+
 /// Virtual disk Component
-#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
 pub struct VDisk {
     pub id: u64,
@@ -301,11 +331,9 @@ pub struct VDisk {
 ///
 /// Variants - Virtual Disk status
 /// status == 'bad' when at least one of its replicas has problems
-#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Clone, Eq, PartialEq, Serialize)]
 #[serde(tag = "status")]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
-// #[cfg_attr(all(feature = "swagger", debug_assertions),
-// schema(example = json!({"status": "good"})))]
 pub enum VDiskStatus {
     #[serde(rename = "good")]
     Good,
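The `From<dto::SpaceInfo>` conversion and the `Add`/`AddAssign` impls above exist so per-node space reports can be folded into one cluster-wide figure, which is also why `add_assign` must accumulate field-wise rather than overwrite (with plain assignment, folding several nodes would just keep the last node's numbers). A self-contained sketch of that fold, with made-up byte counts and the struct trimmed to the same four fields:

```rust
use std::ops::{Add, AddAssign};

#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct SpaceInfo {
    total_disk: u64,
    free_disk: u64,
    used_disk: u64,
    occupied_disk: u64,
}

impl AddAssign for SpaceInfo {
    // Field-wise accumulation across nodes.
    fn add_assign(&mut self, rhs: Self) {
        self.total_disk += rhs.total_disk;
        self.free_disk += rhs.free_disk;
        self.used_disk += rhs.used_disk;
        self.occupied_disk += rhs.occupied_disk;
    }
}

impl Add for SpaceInfo {
    type Output = Self;
    fn add(mut self, rhs: Self) -> Self {
        self += rhs;
        self
    }
}

fn main() {
    let node_a = SpaceInfo { total_disk: 100, free_disk: 40, used_disk: 60, occupied_disk: 55 };
    let node_b = SpaceInfo { total_disk: 200, free_disk: 150, used_disk: 50, occupied_disk: 42 };

    // Cluster-wide totals are just the sum of per-node reports.
    let cluster = node_a + node_b;
    assert_eq!(cluster.total_disk, 300);
    assert_eq!(cluster.free_disk, 190);
}
```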
@@ -316,7 +344,7 @@
 }
 
 /// Types of operations on BOB cluster
-#[derive(Debug, Clone, Serialize, Deserialize, Hash, Eq, PartialEq, PartialOrd, Ord, EnumIter)]
+#[derive(Debug, Clone, Serialize, Hash, Eq, PartialEq, PartialOrd, Ord, EnumIter)]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
 #[serde(rename_all = "camelCase")]
 pub enum Operation {
@@ -326,7 +354,7 @@ pub enum Operation {
     Delete,
 }
 
-#[derive(Clone, Debug, Serialize, Deserialize, Hash, Eq, PartialEq, PartialOrd, Ord, EnumIter)]
+#[derive(Clone, Debug, Serialize, Hash, Eq, PartialEq, PartialOrd, Ord, EnumIter)]
 #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
 pub enum RawMetricEntry {
     #[serde(rename = "cluster_grinder.get_count_rate")]
@@ -392,6 +420,38 @@ pub type NodeCount = TypedMap<NodeStatusName, u64>;
 // #[cfg(not(all(feature = "swagger", debug_assertions)))]
 pub type DiskCount = TypedMap<DiskStatusName, u64>;
 
+impl RPS {
+    #[must_use]
+    pub fn from_metrics(metrics: &TypedMetrics) -> Self {
+        let mut rps = Self::new();
+        rps[Operation::Get] += metrics[RawMetricEntry::ClusterGrinderGetCountRate].value;
+        rps[Operation::Delete] += metrics[RawMetricEntry::ClusterGrinderDeleteCountRate].value;
+        rps[Operation::Exist] += metrics[RawMetricEntry::ClusterGrinderExistCountRate].value;
+        rps[Operation::Put] += metrics[RawMetricEntry::ClusterGrinderPutCountRate].value;
+
+        rps
+    }
+}
+
+impl AddAssign for RPS {
+    fn add_assign(&mut self, rhs: Self) {
+        self[Operation::Get] += rhs[Operation::Get];
+        self[Operation::Delete] += rhs[Operation::Delete];
+        self[Operation::Exist] += rhs[Operation::Exist];
+        self[Operation::Put] += rhs[Operation::Put];
+    }
+}
+
+impl Add for RPS {
+    type Output = Self;
+
+    fn add(mut self, rhs: Self) -> Self::Output {
+        self += rhs;
+
+        self
+    }
+}
+
 #[derive(Debug, Serialize, Clone)]
 // #[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
 // #[cfg_attr(all(feature = "swagger", debug_assertions),
@@ -525,61 +585,6 @@ impl From<dto::MetricsSnapshotModel> for TypedMetrics {
     }
 }
 
-#[derive(IntoParams, Deserialize, Clone)]
-#[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
-pub struct Pagination {
-    #[serde(default)]
-    pub page: usize,
-    #[serde(default)]
-    pub per_page: usize,
-}
-
-#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
-#[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
-pub struct PaginatedResponse<T> {
-    /// The page of data being returned
-    pub data: Vec<T>,
-    /// The number of rows returned in the current page
-    pub count: usize,
-    /// The total number of rows available
-    pub total: usize,
-    /// The current page being returned
-    pub page: usize,
-    /// The number of pages available
-    pub page_count: usize,
-}
-
-impl<T> PaginatedResponse<T> {
-    /// Create a new `PaginatedResponse`
-    #[must_use]
-    pub fn new(data: Vec<T>, total: usize, page: usize, page_size: usize) -> Self {
-        let count = data.len().try_into().unwrap_or(0);
-        let page_count = total / page_size + usize::from(total % page_size != 0);
-
-        Self {
-            data,
-            count,
-            total,
-            page,
-            page_count,
-        }
-    }
-
-    /// Transform the data contained in the `PaginatedResponse`
-    pub fn map<F, B>(self, func: F) -> PaginatedResponse<B>
-    where
-        F: Fn(T) -> B,
-    {
-        PaginatedResponse::<B> {
-            data: self.data.into_iter().map(func).collect(),
-            count: self.count,
-            total: self.total,
-            page: self.page,
-            page_count: self.page_count,
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::{
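`RPS::from_metrics` plus the `AddAssign` impl replace the per-operation bookkeeping that `get_rps` previously did inline. The real `RPS` is an indexed `TypedMap` defined elsewhere in the crate; the sketch below fakes it with a `HashMap` purely to show the accumulate-then-merge flow (every name here is a stand-in, not a crate API):

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum Operation { Put, Get, Exist, Delete }

// Stand-in for the crate's TypedMap<Operation, u64>.
#[derive(Default, Debug)]
struct Rps(HashMap<Operation, u64>);

impl Rps {
    fn add(&mut self, op: Operation, value: u64) {
        *self.0.entry(op).or_insert(0) += value;
    }

    // Mirrors RPS::AddAssign: per-operation accumulation across nodes.
    fn merge(&mut self, other: &Rps) {
        for (op, v) in &other.0 {
            self.add(*op, *v);
        }
    }
}

fn main() {
    let mut node_a = Rps::default();
    node_a.add(Operation::Get, 120);
    node_a.add(Operation::Put, 30);

    let mut node_b = Rps::default();
    node_b.add(Operation::Get, 80);

    // What get_rps now does per node: rps += RPS::from_metrics(&metrics.into());
    let mut total = Rps::default();
    total.merge(&node_a);
    total.merge(&node_b);

    assert_eq!(total.0[&Operation::Get], 200);
}
```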
a/backend/src/models/mod.rs
+++ b/backend/src/models/mod.rs
@@ -5,7 +5,11 @@ pub mod shared;
 pub mod prelude {
     pub use crate::prelude::*;
     pub use hyper::Uri;
-    pub use std::{net::SocketAddr, time::Duration};
+    pub use std::{
+        net::SocketAddr,
+        ops::{Add, AddAssign},
+        time::Duration,
+    };
     pub use strum::{EnumIter, IntoEnumIterator};
     pub use utoipa::openapi::{Object, ObjectBuilder};
 }
diff --git a/backend/src/services/api.rs b/backend/src/services/api.rs
index e4b2bd43..251aeb4e 100644
--- a/backend/src/services/api.rs
+++ b/backend/src/services/api.rs
@@ -1,16 +1,10 @@
-use super::{
-    auth::HttpClient,
-    methods::{fetch_configuration, fetch_metrics, fetch_nodes, fetch_vdisks},
-    prelude::*,
-};
-use crate::{
-    connector::dto::NodeConfiguration,
-    models::bob::{DiskName, IsActive},
-};
-use axum::extract::{Path, Query};
+use super::prelude::*;
+
+// TODO: For methods that require information from all nodes (/disks/count, /nodes/rps, etc.),
+// think of a better way of returning the info;
+// maybe another thread that periodically updates the info and caches the results?
 
 /// Returns count of Physical Disks per status
-///
 #[cfg_attr(all(feature = "swagger", debug_assertions),
     utoipa::path(
         get,
@@ -40,31 +34,16 @@ pub async fn get_disks_count(Extension(client): Extension<HttpBobClient>) -> Json<DiskCount> {
     let mut count = DiskCount::new();
 
     while let Some(res) = space.next().await {
-        let mut disks_visited = HashSet::new();
-        let (disks, space) = match res {
-            Ok(d) => d,
-            Err(err) => {
-                tracing::warn!("couldn't finish request: tokio task failed. Err: {err}");
-                continue;
-            }
+        let Ok((disks, space)) = res else {
+            tracing::warn!("couldn't finish request: tokio task failed");
+            continue;
         };
-        let space = match space {
-            Ok(GetSpaceInfoResponse::SpaceInfo(space)) => space,
-            Err(err) => {
-                tracing::warn!("couldn't finish getSpace request. Err: {err}");
-                continue;
-            }
+        let Ok(GetSpaceInfoResponse::SpaceInfo(space)) = space else {
+            tracing::warn!("couldn't finish getSpace request");
+            continue;
         };
         let disks = match disks {
-            Ok(GetDisksResponse::AJSONArrayWithDisksAndTheirStates(disks)) => {
-                let mut res = vec![];
-                for disk in disks {
-                    if disks_visited.insert(disk.name.clone()) {
-                        res.push(disk);
-                    }
-                }
-                res
-            }
+            Ok(GetDisksResponse::AJSONArrayWithDisksAndTheirStates(disks)) => disks,
             Ok(GetDisksResponse::PermissionDenied(err)) => {
                 count[DiskStatusName::Offline] += 1;
                 tracing::warn!("Permission Denied. Err: {err:?}");
@@ -76,14 +55,16 @@ pub async fn get_disks_count(Extension(client): Extension<HttpBobClient>) -> Json<DiskCount> {
                 continue;
             }
         };
-        let active_disks = disks.iter().filter(|disk| disk.is_active);
-        for disk in active_disks {
+        let mut active = 0;
+        disks.iter().filter(|disk| disk.is_active).for_each(|disk| {
+            active += 1;
             match DiskStatus::from_space_info(&space, &disk.name) {
                 DiskStatus::Good => count[DiskStatusName::Good] += 1,
                 DiskStatus::Offline => count[DiskStatusName::Offline] += 1,
                 DiskStatus::Bad(_) => count[DiskStatusName::Bad] += 1,
             }
-        }
+        });
+        count[DiskStatusName::Offline] += (disks.len() - active) as u64;
     }
     tracing::info!("total disks count: {count:?}");
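Regarding the TODO at the top of this file: one plausible shape for the cache idea is a background task that refreshes a shared snapshot on an interval, with handlers serving reads from the snapshot instead of fanning out on every request. Everything below (`ClusterSnapshot`, `poll_cluster`, the 30-second period) is invented for illustration and is not part of this codebase:

```rust
use std::{sync::Arc, time::Duration};
use tokio::{sync::RwLock, time};

#[derive(Default, Clone, Debug)]
struct ClusterSnapshot {
    // e.g. disk counts, node counts, total rps...
    good_disks: u64,
    bad_disks: u64,
}

// Hypothetical fetch: in the real service this would fan out to every node
// the way get_disks_count does.
async fn poll_cluster() -> ClusterSnapshot {
    ClusterSnapshot { good_disks: 8, bad_disks: 1 }
}

fn spawn_refresher(period: Duration) -> Arc<RwLock<ClusterSnapshot>> {
    let cache = Arc::new(RwLock::new(ClusterSnapshot::default()));
    let writer = Arc::clone(&cache);
    tokio::spawn(async move {
        let mut tick = time::interval(period);
        loop {
            tick.tick().await;
            let fresh = poll_cluster().await;
            // Replace the published snapshot atomically for readers.
            *writer.write().await = fresh;
        }
    });
    cache
}

#[tokio::main]
async fn main() {
    let cache = spawn_refresher(Duration::from_secs(30));
    // A handler would then respond from the cache instead of re-querying:
    let snapshot = cache.read().await.clone();
    println!("{snapshot:?}");
}
```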
Err: {err:?}"); @@ -76,14 +55,16 @@ pub async fn get_disks_count(Extension(client): Extension) -> Jso continue; } }; - let active_disks = disks.iter().filter(|disk| disk.is_active); - for disk in active_disks { + let mut active = 0; + disks.iter().filter(|disk| disk.is_active).for_each(|disk| { + active += 1; match DiskStatus::from_space_info(&space, &disk.name) { DiskStatus::Good => count[DiskStatusName::Good] += 1, DiskStatus::Offline => count[DiskStatusName::Offline] += 1, DiskStatus::Bad(_) => count[DiskStatusName::Bad] += 1, } - } + }); + count[DiskStatusName::Offline] = (disks.len() - active) as u64; } tracing::info!("total disks count: {count:?}"); @@ -91,7 +72,6 @@ pub async fn get_disks_count(Extension(client): Extension) -> Jso } /// Get Nodes count per Status -/// #[cfg_attr(all(feature = "swagger", debug_assertions), utoipa::path( get, @@ -120,20 +100,18 @@ pub async fn get_nodes_count(Extension(client): Extension) -> Jso let mut count = NodeCount::new(); - let mut counter = 0; while let Some(res) = metrics.next().await { if let Ok(Ok(GetMetricsResponse::Metrics(metrics))) = res { - tracing::trace!("#{counter}: metrics received successfully"); + tracing::trace!("metrics received successfully"); if Into::::into(metrics).is_bad_node() { count[NodeStatusName::Bad] += 1; } else { count[NodeStatusName::Good] += 1; } } else { - tracing::warn!("#{counter}: couldn't receive metrics from node"); + tracing::warn!("couldn't receive metrics from node"); // TODO: Some better message count[NodeStatusName::Offline] += 1; } - counter += 1; } tracing::info!("total nodes per status count: {count:?}"); @@ -141,7 +119,6 @@ pub async fn get_nodes_count(Extension(client): Extension) -> Jso } /// Returns Total RPS on cluster -/// #[cfg_attr(all(feature = "swagger", debug_assertions), utoipa::path( get, @@ -169,20 +146,14 @@ pub async fn get_rps(Extension(client): Extension) -> Json { .collect(); let mut rps = RPS::new(); - let mut counter = 0; while let Some(res) = metrics.next().await { if let Ok(Ok(metrics)) = res { - tracing::info!("#{counter}: metrics received successfully"); + tracing::info!("metrics received successfully"); let GetMetricsResponse::Metrics(metrics) = metrics; - let metrics = Into::::into(metrics); - rps[Operation::Get] += metrics[RawMetricEntry::ClusterGrinderGetCountRate].value; - rps[Operation::Delete] += metrics[RawMetricEntry::ClusterGrinderDeleteCountRate].value; - rps[Operation::Exist] += metrics[RawMetricEntry::ClusterGrinderExistCountRate].value; - rps[Operation::Put] += metrics[RawMetricEntry::ClusterGrinderPutCountRate].value; + rps += RPS::from_metrics(&metrics.into()); } else { - tracing::warn!("#{counter}: couldn't receive metrics from node"); + tracing::warn!("couldn't receive metrics from node"); // TODO: Some better message } - counter += 1; } tracing::info!("total rps: {rps:?}"); @@ -190,7 +161,6 @@ pub async fn get_rps(Extension(client): Extension) -> Json { } /// Return inforamtion about space on cluster -/// #[cfg_attr(all(feature = "swagger", debug_assertions), utoipa::path( get, @@ -213,20 +183,17 @@ pub async fn get_space(Extension(client): Extension) -> Json, ) -> AxumResult> { tracing::info!("get /vdisks/{vdisk_id} : {client:?}"); - get_vdisk_by_id(&client, vdisk_id).await.map(Json) } -pub async fn get_vdisk_by_id(client: &HttpBobClient, vdisk_id: u64) -> AxumResult { - let vdisks = fetch_vdisks(client.api_main()).await?; - let vdisk = vdisks - .iter() - .find(|vdisk| vdisk.id as u64 == vdisk_id) - .ok_or_else(|| StatusCode::NOT_FOUND.into_response())?; 
 
-pub async fn get_vdisk_by_id(client: &HttpBobClient, vdisk_id: u64) -> AxumResult<VDisk> {
-    let vdisks = fetch_vdisks(client.api_main()).await?;
-    let vdisk = vdisks
-        .iter()
-        .find(|vdisk| vdisk.id as u64 == vdisk_id)
-        .ok_or_else(|| StatusCode::NOT_FOUND.into_response())?;
-    let clients = vdisk
-        .replicas
-        .iter()
-        .flatten()
-        .map(|replica| client.api_secondary(&replica.node));
-    let first_client = clients.clone().next();
-    let partition_count = if let Some(Some(handle)) = first_client {
-        handle.get_partitions(vdisk_id as i32).await.map_or_else(
-            |_err| 0,
-            |parts| {
-                if let GetPartitionsResponse::NodeInfoAndJSONArrayWithPartitionsInfo(parts) = parts
-                {
-                    parts.partitions.unwrap_or_default().len()
-                } else {
-                    0
-                }
-            },
-        )
-    } else {
-        0
-    };
-    let mut disks: FuturesUnordered<_> = clients
-        .map(move |node| {
-            let handle = node.cloned();
-            tokio::spawn(async move {
-                if let Some(handle) = handle {
-                    Ok((handle.get_status().await, handle.get_disks().await))
-                } else {
-                    Err(APIError::RequestFailed)
-                }
-            })
-        })
-        .collect();
-    let mut replicas: HashMap<_, _> = vdisk
-        .replicas
-        .clone()
-        .into_iter()
-        .flatten()
-        .map(|replica| {
-            (
-                (replica.disk.clone(), replica.node.clone()),
-                Replica {
-                    node: replica.node,
-                    disk: replica.disk,
-                    path: replica.path,
-                    status: ReplicaStatus::Offline(vec![ReplicaProblem::NodeUnavailable]),
-                },
-            )
-        })
-        .collect();
-    while let Some(res) = disks.next().await {
-        if let Ok(Ok((
-            Ok(GetStatusResponse::AJSONWithNodeInfo(status)),
-            Ok(GetDisksResponse::AJSONArrayWithDisksAndTheirStates(disks)),
-        ))) = res
-        {
-            for disk in disks {
-                replicas.insert(
-                    (disk.name.clone(), status.name.clone()),
-                    Replica {
-                        node: status.name.clone(),
-                        disk: disk.name,
-                        path: disk.path,
-                        status: disk
-                            .is_active
-                            .then_some(ReplicaStatus::Good)
-                            .unwrap_or_else(|| {
-                                ReplicaStatus::Offline(vec![ReplicaProblem::DiskUnavailable])
-                            }),
-                    },
-                );
-            }
-        } else {
-            tracing::warn!("couldn't receive node's space info");
-        }
-    }
-
-    let replicas: Vec<_> = replicas.into_values().collect();
-    let count = replicas
-        .iter()
-        .filter(|replica| matches!(replica.status, ReplicaStatus::Offline(_)))
-        .count();
-    let status = if count == 0 {
-        VDiskStatus::Good
-    } else if count == replicas.len() {
-        VDiskStatus::Offline
-    } else {
-        VDiskStatus::Bad
-    };
-
-    Ok(VDisk {
-        id: vdisk_id,
-        status,
-        partition_count: partition_count as u64,
-        replicas,
-    })
-}
 
 /// Returns node information by node name
 ///
 /// # Errors
@@ -463,7 +326,6 @@ pub async fn get_node_info(
         tokio::spawn(async move { handle.clone().get_space_info().await })
     };
 
-    let client = Arc::new(client);
     let Ok(Ok(GetStatusResponse::AJSONWithNodeInfo(status))) = status.await else {
         return Err(StatusCode::NOT_FOUND.into());
     };
@@ -496,20 +358,10 @@ pub async fn get_node_info(
     {
         let metric = Into::<TypedMetrics>::into(metric);
         node.status = NodeStatus::from_problems(NodeProblem::default_from_metrics(&metric));
-        node.rps = Some(
-            metric[RawMetricEntry::PearlGetCountRate].value
-                + metric[RawMetricEntry::PearlPutCountRate].value
-                + metric[RawMetricEntry::PearlExistCountRate].value
-                + metric[RawMetricEntry::PearlDeleteCountRate].value,
-        );
+        node.rps = Some(RPS::from_metrics(&metric));
         node.alien_count = Some(metric[RawMetricEntry::BackendAlienCount].value);
         node.corrupted_count = Some(metric[RawMetricEntry::BackendCorruptedBlobCount].value);
-        node.space = Some(SpaceInfo {
-            total_disk: space.total_disk_space_bytes,
-            free_disk: space.total_disk_space_bytes - space.used_disk_space_bytes,
-            used_disk: space.used_disk_space_bytes,
-            occupied_disk: space.occupied_disk_space_bytes,
-        });
+        node.space = Some(SpaceInfo::from(space));
     }
 
     while let Some(vdisk) = vdisks.next().await {
@@ -523,313 +375,6 @@ pub async fn get_node_info(
 
     Ok(Json(node))
 }
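`get_disks_count`, `get_node_info`, and the new `get_vdisk_by_id` all share one concurrency idiom: spawn a task per node, collect the `JoinHandle`s into a `FuturesUnordered`, then drain results in completion order so one slow or failed node never blocks the rest. Distilled into a self-contained sketch (plain strings instead of BOB API calls):

```rust
use futures::{stream::FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() {
    let nodes = vec!["node-1", "node-2", "node-3"];

    // One spawned task per node; FuturesUnordered yields them as they finish.
    let mut tasks: FuturesUnordered<_> = nodes
        .into_iter()
        .map(|node| tokio::spawn(async move { format!("{node}: ok") }))
        .collect();

    while let Some(res) = tasks.next().await {
        match res {
            Ok(report) => println!("{report}"),
            // A panicked or cancelled task only costs us that node's answer.
            Err(err) => eprintln!("couldn't finish request: tokio task failed. Err: {err}"),
        }
    }
}
```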
-// /// Returns list of all known nodes
-// ///
-// /// # Errors
-// ///
-// /// This function will return an error if one of the requests to get list of virtual disks or nodes fails
-// #[cfg_attr(feature = "swagger", utoipa::path(
-//         get,
-//         context_path = ApiV1::to_path(),
-//         path = "/nodes",
-//         responses(
-//             (
-//                 status = 200, body = PaginatedResponse,
-//                 content_type = "application/json",
-//                 description = "Node List"),
-//             (status = 401, description = "Unauthorized")
-//         ),
-//         security(("api_key" = []))
-// ))]
-// pub async fn get_nodes(
-//     Extension(client): Extension<HttpBobClient>,
-//     Query(params): Query<Option<Pagination>>,
-// ) -> AxumResult<Json<PaginatedResponse<Node>>> {
-//     tracing::info!("get /nodes : {client:?}");
-//
-//     let len = client.cluster_with_addr().len();
-//     Ok(if let Some(page_params) = params {
-//         Json(PaginatedResponse::new(
-//             batch_get_nodes(client, page_params.clone()).await?,
-//             len,
-//             page_params.page,
-//             page_params.per_page,
-//         ))
-//     } else {
-//         Json(PaginatedResponse::new(
-//             dump_get_nodes(client).await?,
-//             len,
-//             1,
-//             1,
-//         ))
-//     })
-// }
-
-// pub async fn batch_get_nodes(
-//     client: HttpBobClient,
-//     Pagination { page, per_page }: Pagination,
-// ) -> AxumResult<Vec<Node>> {
-//     if page == 0 {
-//         return Err(StatusCode::BAD_REQUEST.into());
-//     }
-//     let len = client.cluster_with_addr().len();
-//     let first_node = (page - 1) * per_page;
-//     if first_node >= len {
-//         return Err(StatusCode::NOT_FOUND.into());
-//     }
-//     let iter = client
-//         .cluster()
-//         .skip(first_node)
-//         .take(len.min(page * per_page));
-//
-//     todo!()
-// }
-//
-// pub async fn dump_get_nodes(client: HttpBobClient) -> AxumResult<Vec<Node>> {
-//     let mut metrics: FuturesUnordered<_> = client
-//         .cluster()
-//         .map(move |node| {
-//             let handle = node.clone();
-//             tokio::spawn(async move {
-//                 (
-//                     handle.get_status().await,
-//                     handle.get_metrics().await,
-//                     handle.get_space_info().await,
-//                 )
-//             })
-//         })
-//         .collect();
-//
-//     let vdisks = get_vdisks(Extension(client.clone())).await.map_err(|err| {
-//         tracing::error!("{err:?}");
-//         APIError::RequestFailed
-//     })?;
-//
-//     let nodes = fetch_nodes(client.api_main()).await?;
-//     let vdisks: HashMap<u64, &VDisk> = vdisks.iter().map(|vdisk| (vdisk.id, vdisk)).collect();
-//
-//     let nodes: HashMap<&NodeName, &dto::Node> =
-//         nodes.iter().map(move |node| (&node.name, node)).collect();
-//
-//     let mut res = nodes
-//         .iter()
-//         .map(|(&name, node)| {
-//             let vdisks = node
-//                 .vdisks
-//                 .as_ref()
-//                 .map_or_else(std::vec::Vec::new, |node_vdisks| {
-//                     node_vdisks
-//                         .iter()
-//                         .filter_map(|vdisk| vdisks.get(&(vdisk.id as u64)))
-//                         .map(|vdisk| (*vdisk).clone())
-//                         .collect()
-//                 });
-//             (
-//                 name,
-//                 Node {
-//                     name: name.clone(),
-//                     hostname: node.address.clone(),
-//                     vdisks,
-//                     status: NodeStatus::Offline,
-//                     rps: None,
-//                     alien_count: None,
-//                     corrupted_count: None,
-//                     space: None,
-//                 },
-//             )
-//         })
-//         .collect::<HashMap<_, _>>();
-//
-//     let mut counter = 0;
-//     while let Some(fut) = metrics.next().await {
-//         let Ok((
-//             Ok(GetStatusResponse::AJSONWithNodeInfo(status)),
-//             Ok(GetMetricsResponse::Metrics(metric)),
-//             Ok(GetSpaceInfoResponse::SpaceInfo(space)),
-//         )) = fut
-//         else {
-//             tracing::warn!("couldn't finish task: tokio task failed.");
-//             continue;
-//         };
-//         if let Some(node) = res.get_mut(&status.name.to_string()) {
-//             let metric = Into::<TypedMetrics>::into(metric);
-//             tracing::info!("#{counter}: received metrics successfully.");
-//             node.status = NodeStatus::from_problems(NodeProblem::default_from_metrics(&metric));
-//             node.rps = Some(
-//                 metric[RawMetricEntry::PearlGetCountRate].value
-//                     + metric[RawMetricEntry::PearlPutCountRate].value
-//                     + metric[RawMetricEntry::PearlExistCountRate].value
-//                     + metric[RawMetricEntry::PearlDeleteCountRate].value,
-//             );
-//             node.alien_count = Some(metric[RawMetricEntry::BackendAlienCount].value);
-//             node.corrupted_count = Some(metric[RawMetricEntry::BackendCorruptedBlobCount].value);
-//             node.space = Some(SpaceInfo {
-//                 total_disk: space.total_disk_space_bytes,
-//                 free_disk: space.total_disk_space_bytes - space.used_disk_space_bytes,
-//                 used_disk: space.used_disk_space_bytes,
-//                 occupied_disk: space.occupied_disk_space_bytes,
-//             });
-//         }
-//         counter += 1;
-//     }
-//     tracing::trace!("send response: {res:?}");
-//
-//     Ok(res.values().cloned().collect())
-// }
-
-// /// Get Virtual Disks
-// ///
-// /// # Errors
-// ///
-// /// This function will return an error if one of the requests to get list of vdisks or nodes fails
-// #[cfg_attr(feature = "swagger", utoipa::path(
-//         get,
-//         context_path = ApiV1::to_path(),
-//         path = "/vdisks",
-//         responses(
-//             (status = 200, body = Vec<VDisk>, content_type = "application/json", description = "Virtual disks list"),
-//             (status = 401, description = "Unauthorized")
-//         ),
-//         security(("api_key" = []))
-// ))]
-// pub async fn get_vdisks(
-//     Extension(client): Extension<HttpBobClient>,
-// ) -> AxumResult<Json<Vec<VDisk>>> {
-//     tracing::info!("get /vdisks : {client:?}");
-//
-//     let mut disks: FuturesUnordered<_> = client
-//         .cluster()
-//         .map(move |node| {
-//             let handle = node.clone();
-//             tokio::spawn(async move { (handle.get_status().await, handle.get_disks().await) })
-//         })
-//         .collect();
-//
-//     let api = client.api_main();
-//     let nodes = fetch_nodes(api).await?;
-//     let virt_disks = fetch_vdisks(api).await?;
-//
-//     let nodes: HashMap<&NodeName, &dto::Node> =
-//         nodes.iter().map(|node| (&node.name, node)).collect();
-//
-//     let mut res_disks = HashMap::new();
-//     while let Some(res) = disks.next().await {
-//         if let Ok((
-//             Ok(GetStatusResponse::AJSONWithNodeInfo(status)),
-//             Ok(GetDisksResponse::AJSONArrayWithDisksAndTheirStates(disks)),
-//         )) = res
-//         {
-//             let mut map = HashMap::new();
-//             for disk in disks {
-//                 map.insert(disk.name, disk.is_active);
-//             }
-//             res_disks.insert(status.name, map);
-//         } else {
-//             tracing::warn!("couldn't receive node's space info");
-//         }
-//     }
-//
-//     let mut res = vec![];
-//
-//     for vdisk in virt_disks {
-//         let replicas = if let Some(replicas) = vdisk.replicas {
-//             let mut res = vec![];
-//             for replica in replicas {
-//                 res.push(if let Some(disks) = res_disks.get(&replica.node) {
-//                     process_replica(&client, replica, disks, &nodes).await
-//                 } else {
-//                     Replica {
-//                         node: replica.node,
-//                         disk: replica.disk,
-//                         path: replica.path,
-//                         status: ReplicaStatus::Offline(vec![ReplicaProblem::NodeUnavailable]),
-//                     }
-//                 });
-//             }
-//             res
-//         } else {
-//             vec![]
-//         };
-//         let count = replicas
-//             .iter()
-//             .filter(|replica| matches!(replica.status, ReplicaStatus::Offline(_)))
-//             .count();
-//         let status = if count == 0 {
-//             VDiskStatus::Good
-//         } else if count == replicas.len() {
-//             VDiskStatus::Offline
-//         } else {
-//             VDiskStatus::Bad
-//         };
-//         let part = client.api_main().get_partitions(vdisk.id).await.ok();
-//         let partition_count =
-//             if let Some(GetPartitionsResponse::NodeInfoAndJSONArrayWithPartitionsInfo(part)) = part
-//             {
-//                 part.partitions.unwrap_or_default().len()
-//             } else {
-//                 0
-//             } as u64;
-//         res.push(VDisk {
-//             id: vdisk.id as u64,
-//             status,
-//             partition_count,
-//             replicas,
-//         });
-//     }
-//     tracing::trace!("send response: {res:?}");
-//
-//     Ok(Json(res))
-// }
-
-async fn process_replica(
-    client: &HttpBobClient,
-    replica: dto::Replica,
-    disks: &HashMap<DiskName, IsActive>,
-    nodes: &HashMap<&NodeName, &dto::Node>,
-) -> Replica {
-    let mut status = ReplicaStatus::Good;
-    if let Some(disk_state) = disks.get(&replica.disk) {
-        if !disk_state {
-            status = ReplicaStatus::Offline(vec![ReplicaProblem::DiskUnavailable]);
-        }
-    } else {
-        status = ReplicaStatus::Offline(vec![ReplicaProblem::DiskUnavailable]);
-    }
-
-    if let Some(node) = nodes.get(&replica.node) {
-        if !is_node_online(client, node).await {
-            status = match status {
-                ReplicaStatus::Good => {
-                    ReplicaStatus::Offline(vec![ReplicaProblem::DiskUnavailable])
-                }
-                ReplicaStatus::Offline(mut problems) => {
-                    problems.push(ReplicaProblem::NodeUnavailable);
-                    ReplicaStatus::Offline(problems)
-                }
-            }
-        }
-    } else {
-        status = match status {
-            ReplicaStatus::Good => ReplicaStatus::Offline(vec![ReplicaProblem::DiskUnavailable]),
-            ReplicaStatus::Offline(mut problems) => {
-                problems.push(ReplicaProblem::NodeUnavailable);
-                ReplicaStatus::Offline(problems)
-            }
-        }
-    }
-
-    Replica {
-        node: replica.node,
-        disk: replica.disk,
-        path: replica.path,
-        status,
-    }
-}
-
-async fn is_node_online(client: &HttpBobClient, node: &dto::Node) -> bool {
-    (client.probe_socket(&node.name).await).map_or(false, |code| code == StatusCode::OK)
-}
-
 /// Get Raw Metrics from Node
 ///
 /// # Errors
 ///
@@ -850,9 +395,16 @@ pub async fn raw_metrics_by_node(
     Extension(client): Extension<HttpBobClient>,
     Path(node_name): Path<NodeName>,
 ) -> AxumResult<Json<TypedMetrics>> {
-    let client = get_client_by_node(&client, node_name).await?;
-
-    Ok(Json(fetch_metrics(client.as_ref()).await?.into()))
+    Ok(Json(
+        fetch_metrics(
+            &client
+                .api_secondary(&node_name)
+                .cloned()
+                .ok_or(StatusCode::NOT_FOUND)?,
+        )
+        .await?
+        .into(),
+    ))
 }
 
 /// Get Configuration from Node
 ///
 /// # Errors
 ///
@@ -874,32 +426,14 @@ pub async fn raw_configuration_by_node(
 pub async fn raw_configuration_by_node(
     Extension(client): Extension<HttpBobClient>,
     Path(node_name): Path<NodeName>,
-) -> AxumResult<Json<NodeConfiguration>> {
-    let client = get_client_by_node(&client, node_name).await?;
-
-    Ok(Json(fetch_configuration(client.as_ref()).await?))
-}
-
-async fn get_client_by_node(
-    client: &HttpBobClient,
-    node_name: NodeName,
-) -> AxumResult<…> {
-    let nodes = fetch_nodes(client.api_main()).await?;
-
-    let node = nodes
-        .iter()
-        .find(|node| node.name == node_name)
-        .ok_or_else(|| {
-            tracing::error!("Couldn't find specified node");
-            APIError::RequestFailed
-        })?;
-
-    client
-        .cluster_with_addr()
-        .get(&node.name)
-        .ok_or_else(|| {
-            tracing::error!("Couldn't find specified node");
-            APIError::RequestFailed.into()
-        })
-        .cloned()
-}
+) -> AxumResult<Json<dto::NodeConfiguration>> {
+    Ok(Json(
+        fetch_configuration(
+            &client
+                .api_secondary(&node_name)
+                .cloned()
+                .ok_or(StatusCode::NOT_FOUND)?,
+        )
+        .await?,
+    ))
+}
diff --git a/backend/src/services/methods.rs b/backend/src/services/methods.rs
index 3a0716d1..0ca0abbf 100644
--- a/backend/src/services/methods.rs
+++ b/backend/src/services/methods.rs
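The rewritten `raw_*` handlers drop `get_client_by_node`'s extra `fetch_nodes` round-trip: the client already owns per-node handles, so resolving a node becomes a map lookup, and an unknown name turns into a 404 before any network call happens. The shape of that lookup, with a plain `HashMap` standing in for whatever `HttpBobClient` uses internally (an assumption on my part):

```rust
use std::collections::HashMap;

#[derive(Clone, Debug)]
struct NodeHandle(/* per-node API client */ String);

struct Client {
    secondary: HashMap<String, NodeHandle>,
}

impl Client {
    // Mirrors HttpBobClient::api_secondary: borrow the handle if the node is known.
    fn api_secondary(&self, node_name: &str) -> Option<&NodeHandle> {
        self.secondary.get(node_name)
    }
}

fn main() {
    let client = Client {
        secondary: HashMap::from([("node-1".to_owned(), NodeHandle("http://node-1".into()))]),
    };

    // `.cloned().ok_or(...)` is the whole error path now; no cluster query needed.
    let handle = client
        .api_secondary("node-1")
        .cloned()
        .ok_or(404u16)
        .expect("known node");
    println!("{handle:?}");
}
```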
@@ -160,3 +160,111 @@ pub async fn fetch_nodes<
 
     Ok(nodes)
 }
+
+/// Return `VDisk` information by id
+///
+/// # Errors
+///
+/// This function will return an error if vdisks information couldn't be fetched or no vdisk with
+/// provided id was found
+pub async fn get_vdisk_by_id(client: &HttpBobClient, vdisk_id: u64) -> AxumResult<VDisk> {
+    let virtual_disks = fetch_vdisks(client.api_main()).await?;
+    let virtual_disk = virtual_disks
+        .iter()
+        .find(|vdisk| vdisk.id as u64 == vdisk_id)
+        .ok_or_else(|| StatusCode::NOT_FOUND.into_response())?;
+    let clients = virtual_disk
+        .replicas
+        .iter()
+        .flatten()
+        .map(|replica| replica.node.clone())
+        .collect::<HashSet<_>>()
+        .iter()
+        .filter_map(|node_name| client.api_secondary(node_name))
+        .collect::<Vec<_>>();
+    let partition_count = if let Some(handle) = clients.first() {
+        handle.get_partitions(vdisk_id as i32).await.map_or_else(
+            |_err| 0,
+            |parts| {
+                if let GetPartitionsResponse::NodeInfoAndJSONArrayWithPartitionsInfo(parts) = parts
+                {
+                    parts.partitions.unwrap_or_default().len()
+                } else {
+                    0
+                }
+            },
+        )
+    } else {
+        0
+    };
+    let mut disks: FuturesUnordered<_> = clients
+        .iter()
+        .map(move |&node| {
+            let handle = node.clone();
+            tokio::spawn(async move { (handle.get_status().await, handle.get_disks().await) })
+        })
+        .collect();
+    let mut replicas: HashMap<_, _> = virtual_disk
+        .replicas
+        .clone()
+        .into_iter()
+        .flatten()
+        .map(|replica| {
+            (
+                (replica.disk.clone(), replica.node.clone()),
+                Replica {
+                    node: replica.node,
+                    disk: replica.disk,
+                    path: replica.path,
+                    status: ReplicaStatus::Offline(vec![ReplicaProblem::NodeUnavailable]),
+                },
+            )
+        })
+        .collect();
+    while let Some(res) = disks.next().await {
+        if let Ok((
+            Ok(GetStatusResponse::AJSONWithNodeInfo(status)),
+            Ok(GetDisksResponse::AJSONArrayWithDisksAndTheirStates(disks)),
+        )) = res
+        {
+            for disk in disks {
+                replicas.insert(
+                    (disk.name.clone(), status.name.clone()),
+                    Replica {
+                        node: status.name.clone(),
+                        disk: disk.name,
+                        path: disk.path,
+                        status: disk
+                            .is_active
+                            .then_some(ReplicaStatus::Good)
+                            .unwrap_or_else(|| {
+                                ReplicaStatus::Offline(vec![ReplicaProblem::DiskUnavailable])
+                            }),
+                    },
+                );
+            }
+        } else {
+            tracing::warn!("couldn't receive node's space info");
+        }
+    }
+
+    let replicas: Vec<_> = replicas.into_values().collect();
+    let count = replicas
+        .iter()
+        .filter(|replica| matches!(replica.status, ReplicaStatus::Offline(_)))
+        .count();
+    let status = if count == 0 {
+        VDiskStatus::Good
+    } else if count == replicas.len() {
+        VDiskStatus::Offline
+    } else {
+        VDiskStatus::Bad
+    };
+
+    Ok(VDisk {
+        id: vdisk_id,
+        status,
+        partition_count: partition_count as u64,
+        replicas,
+    })
+}
diff --git a/backend/src/services/mod.rs b/backend/src/services/mod.rs
index a641dd87..e3af9708 100644
--- a/backend/src/services/mod.rs
+++ b/backend/src/services/mod.rs
@@ -1,4 +1,7 @@
 mod prelude {
+    pub use super::methods::{
+        fetch_configuration, fetch_metrics, fetch_nodes, fetch_vdisks, get_vdisk_by_id,
+    };
     pub use crate::{
         connector::{
             api::{prelude::*, ApiNoContext},
@@ -8,7 +11,7 @@ mod prelude {
         prelude::*,
     };
     pub use axum::{
-        extract::{FromRef, FromRequestParts},
+        extract::{FromRef, FromRequestParts, Path},
         http::request::Parts,
         middleware::{from_fn_with_state, Next},
         Router,
@@ -23,15 +26,13 @@ pub mod auth;
 pub mod methods;
 
 use crate::root;
-use api::{get_disks_count, get_nodes_count, get_rps, get_space};
+use api::{
+    get_disks_count, get_node_info, get_nodes_count, get_nodes_list, get_rps, get_space,
+    get_vdisk_info, get_vdisks_list, raw_configuration_by_node, raw_metrics_by_node,
+};
 use auth::{login, logout, require_auth, AuthState, BobUser, HttpBobClient, InMemorySessionStore};
 use prelude::*;
 
-use self::api::{
-    get_node_info, get_nodes_list, get_vdisk_info, get_vdisks_list, raw_configuration_by_node,
-    raw_metrics_by_node,
-};
-
 type BobAuthState = AuthState<
     BobUser,
     Uuid,