diff --git a/backend/src/connector/api.rs b/backend/src/connector/api.rs index 58c3233e..8c92737e 100644 --- a/backend/src/connector/api.rs +++ b/backend/src/connector/api.rs @@ -6,7 +6,6 @@ //! //! -use super::dto::{self}; use super::prelude::*; pub type ServiceError = Box; @@ -22,6 +21,17 @@ pub enum APIError { ResponseError, } +impl IntoResponse for APIError { + fn into_response(self) -> axum::response::Response { + match self { + Self::RequestFailed => StatusCode::NOT_FOUND, + Self::InvalidStatusCode(code) => code, + Self::ResponseError => StatusCode::INTERNAL_SERVER_ERROR, + } + .into_response() + } +} + #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub enum GetAlienResponse { /// Alien Node name diff --git a/backend/src/connector/client.rs b/backend/src/connector/client.rs index 3793f24b..758d4a52 100644 --- a/backend/src/connector/client.rs +++ b/backend/src/connector/client.rs @@ -306,19 +306,68 @@ where /// Return directory of alien #[must_use] async fn get_alien_dir(&self, context: &C) -> Result { - todo!() + let request = self + .form_request("/alien/dir", Method::GET, context) + .change_context(APIError::RequestFailed)?; + let response = self.call(request, context).await?; + + match response.status().as_u16() { + 200 => Ok(self + .handle_response_json(response, |body: Dir| GetAlienDirResponse::Directory(body)) + .await?), + 403 => Ok(self + .handle_response_json(response, |body: StatusExt| { + GetAlienDirResponse::PermissionDenied(body) + }) + .await?), + 406 => Ok(self + .handle_response_json(response, |body: StatusExt| { + GetAlienDirResponse::NotAcceptableBackend(body) + }) + .await?), + _ => Err(APIError::from(response))?, + } } /// Returns the list of disks with their states #[must_use] async fn get_disks(&self, context: &C) -> Result { - todo!() + let request = self + .form_request("/disks/list", Method::GET, context) + .change_context(APIError::RequestFailed)?; + let response = self.call(request, context).await?; + + match response.status().as_u16() { + 200 => Ok(self + .handle_response_json(response, |body: Vec| { + GetDisksResponse::AJSONArrayWithDisksAndTheirStates(body) + }) + .await?), + 403 => Ok(self + .handle_response_json(response, |body: StatusExt| { + GetDisksResponse::PermissionDenied(body) + }) + .await?), + _ => Err(APIError::from(response))?, + } } /// Get metrics #[must_use] async fn get_metrics(&self, context: &C) -> Result { - todo!() + let request = self + .form_request("/metrics", Method::GET, context) + .change_context(APIError::RequestFailed)?; + let response = self.call(request, context).await?; + + match response.status().as_u16() { + 200 => Ok(self + .handle_response_json(response, |body: MetricsSnapshotModel| { + GetMetricsResponse::Metrics(body) + }) + .await?), + _ => Err(APIError::from(response))?, + } } /// Returns a list of known nodes @@ -333,7 +382,7 @@ where match response.status().as_u16() { 200 => Ok(self - .handle_response_json(response, |body: Vec| { + .handle_response_json(response, |body: Vec| { GetNodesResponse::AJSONArrayOfNodesInfoAndVdisksOnThem(body) }) .await?), @@ -346,77 +395,269 @@ where #[must_use] async fn get_partition( &self, - v_disk_id: i32, - partition_id: String, + param_v_disk_id: i32, + param_partition_id: String, context: &C, ) -> Result { - todo!() + let request = self + .form_request( + &format!("/vdisks/{param_v_disk_id}/partitions/{param_partition_id}"), + Method::GET, + context, + ) + .change_context(APIError::RequestFailed)?; + let response = self.call(request, context).await?; + + match response.status().as_u16() { + 200 => Ok(self + .handle_response_json(response, |body: Partition| { + GetPartitionResponse::AJSONWithPartitionInfo(body) + }) + .await?), + 403 => Ok(self + .handle_response_json(response, |body: StatusExt| { + GetPartitionResponse::PermissionDenied(body) + }) + .await?), + 404 => Ok(self + .handle_response_json(response, |body: StatusExt| { + GetPartitionResponse::NotFound(body) + }) + .await?), + _ => Err(APIError::from(response))?, + } } /// Returns a list of partitions #[must_use] async fn get_partitions( &self, - v_disk_id: i32, + param_v_disk_id: i32, context: &C, ) -> Result { - todo!() + let request = self + .form_request( + &format!("/vdisks/{param_v_disk_id}/partitions"), + Method::GET, + context, + ) + .change_context(APIError::RequestFailed)?; + let response = self.call(request, context).await?; + + match response.status().as_u16() { + 200 => Ok(self + .handle_response_json(response, |body: VDiskPartitions| { + GetPartitionsResponse::NodeInfoAndJSONArrayWithPartitionsInfo(body) + }) + .await?), + 403 => Ok(self + .handle_response_json(response, |body: StatusExt| { + GetPartitionsResponse::PermissionDenied(body) + }) + .await?), + + 404 => Ok(self + .handle_response_json(response, |body: StatusExt| { + GetPartitionsResponse::NotFound(body) + }) + .await?), + _ => Err(APIError::from(response))?, + } } /// Returns count of records of this on node #[must_use] async fn get_records( &self, - v_disk_id: i32, + param_v_disk_id: i32, context: &C, ) -> Result { - todo!() + let request = self + .form_request( + &format!("/vdisks/{param_v_disk_id}/records/count"), + Method::GET, + context, + ) + .change_context(APIError::RequestFailed)?; + let response = self.call(request, context).await?; + + match response.status().as_u16() { + 200 => Ok(self + .handle_response_json(response, GetRecordsResponse::RecordsCount) + .await?), + 403 => Ok(self + .handle_response_json(response, |body: StatusExt| { + GetRecordsResponse::PermissionDenied(body) + }) + .await?), + 404 => Ok(self + .handle_response_json(response, |body: StatusExt| { + GetRecordsResponse::NotFound(body) + }) + .await?), + _ => Err(APIError::from(response))?, + } } /// Returns directories of local replicas of vdisk #[must_use] async fn get_replicas_local_dirs( &self, - v_disk_id: i32, + param_v_disk_id: i32, context: &C, ) -> Result { - todo!() + let request = self + .form_request( + &format!("/vdisks/{param_v_disk_id}/replicas/local/dirs"), + Method::GET, + context, + ) + .change_context(APIError::RequestFailed)?; + let response = self.call(request, context).await?; + + match response.status().as_u16() { + 200 => Ok(self + .handle_response_json(response, |body: Vec| { + GetReplicasLocalDirsResponse::AJSONArrayWithDirs(body) + }) + .await?), + 403 => Ok(self + .handle_response_json(response, |body: StatusExt| { + GetReplicasLocalDirsResponse::PermissionDenied(body) + }) + .await?), + 404 => Ok(self + .handle_response_json(response, |body: StatusExt| { + GetReplicasLocalDirsResponse::NotFound(body) + }) + .await?), + _ => Err(APIError::from(response))?, + } } /// Get space info #[must_use] async fn get_space_info(&self, context: &C) -> Result { - todo!() + let request = self + .form_request("/status/space", Method::GET, context) + .change_context(APIError::RequestFailed)?; + let response = self.call(request, context).await?; + + match response.status().as_u16() { + 200 => Ok(self + .handle_response_json(response, |body: SpaceInfo| { + GetSpaceInfoResponse::SpaceInfo(body) + }) + .await?), + _ => Err(APIError::from(response))?, + } } /// Returns information about self #[must_use] async fn get_status(&self, context: &C) -> Result { - todo!() + let request = self + .form_request("/status", Method::GET, context) + .change_context(APIError::RequestFailed)?; + let response = self.call(request, context).await?; + + match response.status().as_u16() { + 200 => Ok(self + .handle_response_json(response, |body: Node| { + GetStatusResponse::AJSONWithNodeInfo(body) + }) + .await?), + _ => Err(APIError::from(response))?, + } } /// Returns a vdisk info by ID #[must_use] - async fn get_v_disk(&self, v_disk_id: i32, context: &C) -> Result { - todo!() + async fn get_v_disk( + &self, + param_v_disk_id: i32, + context: &C, + ) -> Result { + let request = self + .form_request(&format!("/vdisks/{param_v_disk_id}"), Method::GET, context) + .change_context(APIError::RequestFailed)?; + let response = self.call(request, context).await?; + + match response.status().as_u16() { + 200 => Ok(self + .handle_response_json(response, |body: VDisk| { + GetVDiskResponse::AJSONWithVdiskInfo(body) + }) + .await?), + 403 => Ok(self + .handle_response_json(response, |body: StatusExt| { + GetVDiskResponse::PermissionDenied(body) + }) + .await?), + 404 => Ok(self + .handle_response_json(response, |body: StatusExt| GetVDiskResponse::NotFound(body)) + .await?), + _ => Err(APIError::from(response))?, + } } /// Returns a list of vdisks #[must_use] async fn get_v_disks(&self, context: &C) -> Result { - todo!() + let request = self + .form_request("/vdisks", Method::GET, context) + .change_context(APIError::RequestFailed)?; + let response = self.call(request, context).await?; + + match response.status().as_u16() { + 200 => Ok(self + .handle_response_json(response, |body: Vec| { + GetVDisksResponse::AJSONArrayOfVdisksInfo(body) + }) + .await?), + 403 => Ok(GetVDisksResponse::PermissionDenied), + + _ => Err(APIError::from(response))?, + } } /// Returns server version #[must_use] async fn get_version(&self, context: &C) -> Result { - todo!() + let request = self + .form_request("/version", Method::GET, context) + .change_context(APIError::RequestFailed)?; + let response = self.call(request, context).await?; + + match response.status().as_u16() { + 200 => Ok(self + .handle_response_json(response, |body: VersionInfo| { + GetVersionResponse::VersionInfo(body) + }) + .await?), + + _ => Err(APIError::from(response))?, + } } /// Returns configuration of the node #[must_use] async fn get_configuration(&self, context: &C) -> Result { - todo!() + let request = self + .form_request("/configuration", Method::GET, context) + .change_context(APIError::RequestFailed)?; + let response = self.call(request, context).await?; + + match response.status().as_u16() { + 200 => Ok(self + .handle_response_json(response, |body: NodeConfiguration| { + GetConfigurationResponse::ConfigurationObject(body) + }) + .await?), + 403 => Ok(GetConfigurationResponse::PermissionDenied), + + _ => Err(APIError::from(response))?, + } } } diff --git a/backend/src/connector/dto.rs b/backend/src/connector/dto.rs index a4755746..b617c454 100644 --- a/backend/src/connector/dto.rs +++ b/backend/src/connector/dto.rs @@ -12,7 +12,7 @@ //! use std::collections::HashMap; - +use utoipa::ToSchema; type StdError = dyn std::error::Error; /// Function, used for parsing strings into DTOs @@ -360,7 +360,7 @@ impl std::str::FromStr for Error { } } -#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)] +#[derive(ToSchema, Debug, Default, Clone, serde::Serialize, serde::Deserialize)] #[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] pub struct MetricsEntryModel { #[serde(rename = "value")] @@ -370,7 +370,13 @@ pub struct MetricsEntryModel { pub timestamp: u64, } -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +impl utoipa::PartialSchema for MetricsEntryModel { + fn schema() -> utoipa::openapi::RefOr { + ::schema().1 + } +} + +#[derive(ToSchema, Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] pub struct MetricsSnapshotModel { #[serde(rename = "metrics")] @@ -486,7 +492,7 @@ impl std::str::FromStr for Node { } } -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[derive(ToSchema, Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] pub struct NodeConfiguration { #[serde(rename = "blob_file_name_prefix")] diff --git a/backend/src/connector/mod.rs b/backend/src/connector/mod.rs index 34ed58ff..e9f62340 100644 --- a/backend/src/connector/mod.rs +++ b/backend/src/connector/mod.rs @@ -3,6 +3,7 @@ mod prelude { context::{BodyExt, ContextWrapper, DropContextService, Has}, ClientError, Connector, }; + pub use crate::connector::dto::*; pub use crate::{models::shared::XSpanIdString, prelude::*, services::auth::HttpClient}; pub use axum::{ headers::{authorization::Credentials, Authorization, HeaderMapExt}, @@ -27,7 +28,7 @@ pub mod client; pub mod context; pub mod dto; -pub type ApiInterface = dyn ApiNoContext + Send + Sync; +// pub type ApiInterface = dyn ApiNoContext + Send + Sync; #[derive(Debug, Error)] pub enum ClientError { diff --git a/backend/src/lib.rs b/backend/src/lib.rs index ab452228..f11f5401 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -12,16 +12,58 @@ pub mod models; pub mod router; pub mod services; +struct SecurityAddon; + +impl Modify for SecurityAddon { + fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) { + if let Some(components) = openapi.components.as_mut() { + components.add_security_scheme( + "api_key", + SecurityScheme::ApiKey(ApiKey::Header(ApiKeyValue::new("bob_apikey"))), + ); + } + } +} + #[derive(OpenApi)] #[cfg_attr(not(all(feature = "swagger", debug_assertions)), openapi())] #[cfg_attr(all(feature = "swagger", debug_assertions), openapi( - paths(root, services::auth::login, services::auth::logout), + paths( + root, + services::auth::login, + services::auth::logout, + services::api::get_disks_count, + services::api::get_nodes_count, + services::api::get_rps, + services::api::get_space, + ), components( - schemas(models::shared::Credentials, models::shared::Hostname, models::shared::BobConnectionData) + schemas(models::shared::Credentials, models::shared::Hostname, models::shared::BobConnectionData, + models::api::DiskProblem, + models::api::DiskStatus, + models::api::DiskStatusName, + models::api::DiskCount, + models::api::NodeProblem, + models::api::NodeStatus, + models::api::NodeStatusName, + models::api::NodeCount, + models::api::ReplicaProblem, + models::api::ReplicaStatus, + models::api::SpaceInfo, + models::api::VDiskStatus, + models::api::Operation, + models::api::RPS, + models::api::RawMetricEntry, + models::api::TypedMetrics, + connector::dto::MetricsEntryModel, + connector::dto::MetricsSnapshotModel, + connector::dto::NodeConfiguration + ) ), tags( (name = "bob", description = "BOB management API") - ) + ), + modifiers(&SecurityAddon) ))] pub struct ApiDoc; @@ -79,7 +121,7 @@ pub mod prelude { connector::{ client::Client, context::{ClientContext, ContextWrapper, DropContextService}, - BobClient, + dto, BobClient, }, error::AppError, models::{ @@ -100,9 +142,18 @@ pub mod prelude { pub use error_stack::{Context, Report, Result, ResultExt}; pub use hyper::{client::HttpConnector, Body, Method, Request, StatusCode}; pub use serde::{Deserialize, Serialize}; - pub use std::{collections::HashMap, hash::Hash, marker::PhantomData, str::FromStr}; + pub use std::{ + collections::{HashMap, HashSet}, + hash::Hash, + marker::PhantomData, + str::FromStr, + sync::Arc, + }; pub use thiserror::Error; - pub use utoipa::{IntoParams, OpenApi, ToSchema}; + pub use utoipa::{ + openapi::security::{ApiKey, ApiKeyValue, SecurityScheme}, + IntoParams, Modify, OpenApi, PartialSchema, ToSchema, + }; pub use uuid::Uuid; } @@ -120,9 +171,10 @@ pub mod main { }, ApiDoc, }; - pub use axum::error_handling::HandleErrorLayer; - pub use axum::middleware::from_fn_with_state; - pub use axum::{BoxError, Extension, Router}; + pub use axum::{ + error_handling::HandleErrorLayer, middleware::from_fn_with_state, BoxError, Extension, + Router, + }; pub use cli::Parser; pub use error_stack::{Result, ResultExt}; pub use hyper::{Method, StatusCode}; diff --git a/backend/src/models/api.rs b/backend/src/models/api.rs index 7c51c244..0882880b 100644 --- a/backend/src/models/api.rs +++ b/backend/src/models/api.rs @@ -1,8 +1,6 @@ #![allow(unused_qualifications)] -use crate::connector::dto::{MetricsEntryModel, MetricsSnapshotModel}; -use crate::prelude::*; -use strum::{EnumIter, IntoEnumIterator}; +use super::prelude::*; pub const DEFAULT_MAX_CPU: u64 = 90; pub const DEFAULT_MIN_FREE_SPACE: f64 = 0.1; @@ -10,28 +8,6 @@ pub const DEFAULT_MIN_FREE_SPACE: f64 = 0.1; /// Connection Data pub use crate::models::shared::{BobConnectionData, Credentials}; -/// Physical disk definition -#[derive(ToSchema, Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -pub struct Disk { - /// Disk name - pub name: String, - - /// Disk path - pub path: String, - - /// Disk status - #[serde(flatten)] - pub status: DiskStatus, - - #[serde(rename = "totalSpace")] - pub total_space: u64, - - #[serde(rename = "usedSpace")] - pub used_space: u64, - - pub iops: u64, -} - /// Defines kind of problem on disk #[derive(ToSchema, Debug, Clone, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)] pub enum DiskProblem { @@ -63,34 +39,6 @@ pub enum DiskStatusName { Offline, } -pub type DiskCount = TypedMap; - -#[derive(ToSchema, Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -pub struct Node { - pub name: String, - - pub hostname: String, - - pub vdisks: Vec, - #[serde(flatten)] - pub status: NodeStatus, - - #[serde(rename = "rps")] - #[serde(skip_serializing_if = "Option::is_none")] - pub rps: Option, - - #[serde(rename = "alienCount")] - #[serde(skip_serializing_if = "Option::is_none")] - pub alien_count: Option, - - #[serde(rename = "corruptedCount")] - #[serde(skip_serializing_if = "Option::is_none")] - pub corrupted_count: Option, - - #[serde(skip_serializing_if = "Option::is_none")] - pub space: Option, -} - /// Defines kind of problem on Node #[derive(ToSchema, Debug, Clone, Eq, PartialEq, PartialOrd, Serialize, Deserialize, Hash)] pub enum NodeProblem { @@ -183,21 +131,6 @@ pub enum NodeStatusName { Offline, } -pub type NodeCount = TypedMap; - -/// [`VDisk`]'s replicas -#[derive(ToSchema, Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -pub struct Replica { - pub node: String, - - pub disk: String, - - pub path: String, - - #[serde(flatten)] - pub status: ReplicaStatus, -} - /// Reasons why Replica is offline #[derive(ToSchema, Debug, Clone, Eq, PartialEq, PartialOrd, Serialize, Deserialize)] pub enum ReplicaProblem { @@ -237,20 +170,6 @@ pub struct SpaceInfo { pub occupied_disk: u64, } -/// Virtual disk Component -#[derive(ToSchema, Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -pub struct VDisk { - pub id: u64, - - #[serde(flatten)] - pub status: VDiskStatus, - - #[serde(rename = "partitionCount")] - pub partition_count: u64, - - pub replicas: Vec, -} - /// Virtual disk status. /// /// Variants - Virtual Disk status @@ -266,42 +185,6 @@ pub enum VDiskStatus { Offline, } -#[derive(ToSchema, Debug, Clone, Serialize, Deserialize)] -pub struct DetailedNode { - pub name: String, - - pub hostname: String, - - pub vdisks: Vec, - - #[serde(flatten)] - pub status: NodeStatus, - - pub metrics: DetailedNodeMetrics, - - pub disks: Vec, -} - -#[derive(ToSchema, Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct DetailedNodeMetrics { - pub rps: RPS, - - pub alien_count: u64, - - pub corrupted_count: u64, - - pub space: SpaceInfo, - - pub cpu_load: u64, - - pub total_ram: u64, - - pub used_ram: u64, - - pub descr_amount: u64, -} - #[derive( ToSchema, Debug, Clone, Serialize, Deserialize, Hash, Eq, PartialEq, PartialOrd, Ord, EnumIter, )] @@ -313,8 +196,6 @@ pub enum Operation { Delete, } -pub type RPS = TypedMap; - #[derive( ToSchema, Clone, Debug, Serialize, Deserialize, Hash, Eq, PartialEq, PartialOrd, Ord, EnumIter, )] @@ -355,46 +236,77 @@ pub enum RawMetricEntry { HardwareDescrAmount, } -pub type TypedMetrics = TypedMap; +#[allow(dead_code, clippy::expect_used)] +fn get_map_schema() -> Object { + let mut res = ObjectBuilder::new(); + for key in Id::iter() { + let key = serde_json::to_string(&key).expect("infallible"); + let key = key.trim_matches('"'); + res = res.required(key).property(key, V::schema()); + } + res.build() +} + +#[derive(ToSchema, Debug, Serialize, Clone)] +#[aliases(RPS = TypedMap, TypedMetrics = TypedMap, NodeCount = TypedMap, DiskCount = TypedMap)] +pub struct TypedMap { + // FIXME: Bugged + // See -> https://github.com/juhaku/utoipa/issues/644 + // #[schema(schema_with = get_map_schema::)] + #[serde(flatten)] + map: HashMap, +} -#[derive(ToSchema, Debug, Serialize, Deserialize, Clone)] -pub struct TypedMap(HashMap); +// pub type TypedMetrics = TypedMap; -impl std::ops::Index for TypedMap { +impl std::ops::Index for TypedMap { type Output = V; fn index(&self, index: Id) -> &Self::Output { - self.0.index(&index) + self.map.index(&index) } } #[allow(clippy::expect_used)] -impl std::ops::IndexMut for TypedMap { +impl std::ops::IndexMut + for TypedMap +{ fn index_mut(&mut self, index: Id) -> &mut Self::Output { - self.0.get_mut(&index).expect("infallible") + self.map.get_mut(&index).expect("infallible") } } -impl Default for TypedMap { +impl Default for TypedMap { fn default() -> Self { let mut map = HashMap::new(); for key in Id::iter() { map.insert(key, V::default()); } - Self(map) + Self { map } } } -impl TypedMap { +impl TypedMap { + #[must_use] pub fn new() -> Self { Self::default() } } +pub trait Util { + fn key_iter() -> Id::Iterator; +} + +impl Util for TypedMap { + fn key_iter() -> Id::Iterator { + Id::iter() + } +} + #[allow(clippy::expect_used)] -impl From for TypedMetrics { - fn from(value: MetricsSnapshotModel) -> Self { +impl From for TypedMetrics { + fn from(value: dto::MetricsSnapshotModel) -> Self { let mut map = HashMap::new(); let mut value = value.metrics; for key in RawMetricEntry::iter() { @@ -404,7 +316,7 @@ impl From for TypedMetrics { map.insert(key, value); } - Self(map) + Self { map } } } diff --git a/backend/src/models/mod.rs b/backend/src/models/mod.rs index 39107f58..b6b7cbfb 100644 --- a/backend/src/models/mod.rs +++ b/backend/src/models/mod.rs @@ -6,4 +6,6 @@ pub mod prelude { pub use crate::prelude::*; pub use hyper::Uri; pub use std::{net::SocketAddr, time::Duration}; + pub use strum::{EnumIter, IntoEnumIterator}; + pub use utoipa::openapi::{Object, ObjectBuilder}; } diff --git a/backend/src/models/shared.rs b/backend/src/models/shared.rs index eeca6367..030780bb 100644 --- a/backend/src/models/shared.rs +++ b/backend/src/models/shared.rs @@ -2,6 +2,7 @@ use super::prelude::*; use std::result::Result; #[derive(ToSchema, Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[schema(value_type = String)] pub struct Hostname( #[serde( deserialize_with = "hyper_serde::deserialize", diff --git a/backend/src/services/api.rs b/backend/src/services/api.rs index c24defba..c49c48e3 100644 --- a/backend/src/services/api.rs +++ b/backend/src/services/api.rs @@ -1,15 +1,18 @@ -use std::collections::HashSet; - -use futures::StreamExt; - -use crate::{ - connector::api::{prelude::*, ApiNoContext}, - models::api::*, -}; - use super::prelude::*; + /// Returns count of Physical Disks per status /// +#[cfg_attr(feature = "swagger", + utoipa::path( + get, + context_path = ApiV1::to_path(), + path = "/disks/count", + responses( + (status = 200, body = DiskCount, content_type = "application/json", description = "Returns a list with count of physical disks per status"), + (status = 401, description = "Unauthorized") + ), + security(("api_key" = [])) +))] pub async fn get_disks_count(Extension(client): Extension) -> Json { tracing::info!("get /disks/count : {:?}", client); @@ -80,6 +83,16 @@ pub async fn get_disks_count(Extension(client): Extension) -> Jso /// Get Nodes count per Status /// +#[cfg_attr(feature = "swagger", utoipa::path( + get, + context_path = ApiV1::to_path(), + path = "/nodes/count", + responses( + (status = 200, body = NodeCount, content_type = "application/json", description = "Node count list per status"), + (status = 401, description = "Unauthorized") + ), + security(("api_key" = [])) + ))] pub async fn get_nodes_count(Extension(client): Extension) -> Json { tracing::info!("get /nodes/count : {:?}", client); @@ -116,6 +129,16 @@ pub async fn get_nodes_count(Extension(client): Extension) -> Jso /// Returns Total RPS on cluster /// +#[cfg_attr(feature = "swagger", utoipa::path( + get, + context_path = ApiV1::to_path(), + path = "/nodes/rps", + responses( + (status = 200, body = RPS, content_type = "application/json", description = "RPS list per operation on all nodes"), + (status = 401, description = "Unauthorized") + ), + security(("api_key" = [])) + ))] pub async fn get_rps(Extension(client): Extension) -> Json { tracing::info!("get /nodes/rps : {:?}", client); @@ -151,6 +174,16 @@ pub async fn get_rps(Extension(client): Extension) -> Json { /// Return inforamtion about space on cluster /// +#[cfg_attr(feature = "swagger", utoipa::path( + get, + context_path = ApiV1::to_path(), + path = "/nodes/space", + responses( + (status = 200, body = SpaceInfo, content_type = "application/json", description = "Cluster Space Information"), + (status = 401, description = "Unauthorized") + ), + security(("api_key" = [])) + ))] pub async fn get_space(Extension(client): Extension) -> Json { tracing::info!("get /space : {:?}", client); let mut spaces: FuturesUnordered<_> = client @@ -196,519 +229,14 @@ fn is_bad_node(node_metrics: &TypedMetrics) -> bool { > node_metrics[RawMetricEntry::HardwareTotalRam] } -/// Returns list of all known nodes -/// -/// # Errors -/// -/// This function will return an error if one of the requests to get list of virtual disks or nodes fails -pub async fn get_nodes(Extension(client): Extension) -> AxumResult>> { - tracing::info!("get /nodes : {client:?}"); - - let mut metrics: FuturesUnordered<_> = client - .cluster() - .map(move |node| { - let handle = node.clone(); - tokio::spawn(async move { - ( - handle.get_status().await, - handle.get_metrics().await, - handle.get_space_info().await, - ) - }) - }) - .collect(); - - let vdisks = get_vdisks(Extension(client.clone())).await.map_err(|err| { - tracing::error!("{err:?}"); - APIError::RequestFailed - })?; - - let nodes = request_nodes(client.api()).await?; - let vdisks: HashMap = vdisks.iter().map(|vdisk| (vdisk.id, vdisk)).collect(); - - let nodes: HashMap<&NodeName, &crate::bob_client::dto::Node> = - nodes.iter().map(move |node| (&node.name, node)).collect(); - - let mut res = nodes - .iter() - .map(|(&name, node)| { - let vdisks = node - .vdisks - .as_ref() - .map_or_else(std::vec::Vec::new, |node_vdisks| { - node_vdisks - .iter() - .filter_map(|vdisk| vdisks.get(&(vdisk.id as u64))) - .map(|vdisk| (*vdisk).clone()) - .collect() - }); - ( - name, - Node { - name: name.clone(), - hostname: node.address.clone(), - vdisks, - status: NodeStatus::Offline, - rps: None, - alien_count: None, - corrupted_count: None, - space: None, - }, - ) - }) - .collect::>(); - - let mut counter = 0; - while let Some(fut) = metrics.next().await { - let Ok(( - Ok(client::GetStatusResponse::AJSONWithNodeInfo(status)), - Ok(client::GetMetricsResponse::Metrics(metric)), - Ok(client::GetSpaceInfoResponse::SpaceInfo(space)), - )) = fut - else { - tracing::warn!("couldn't finish task: tokio task failed."); - continue; - }; - if let Some(node) = res.get_mut(&status.name.to_string()) { - let metric = Into::::into(metric); - tracing::info!("#{counter}: received metrics successfully."); - node.status = NodeStatus::from_problems(NodeProblem::from_metrics(&metric)); - node.rps = Some( - metric[PearlGetCountRate].value - + metric[PearlPutCountRate].value - + metric[PearlExistCountRate].value - + metric[PearlDeleteCountRate].value, - ); - node.alien_count = Some(metric[BackendAlienCount].value); - node.corrupted_count = Some(metric[BackendCorruptedBlobCount].value); - node.space = Some(SpaceInfo { - total_disk: space.total_disk_space_bytes, - free_disk: space.total_disk_space_bytes - space.used_disk_space_bytes, - used_disk: space.used_disk_space_bytes, - occupied_disk: space.occupied_disk_space_bytes, - }); - } - counter += 1; - } - tracing::trace!("send response: {res:?}"); - - Ok(Json(res.values().cloned().collect())) -} - -/// Get Virtual Disks -/// -/// # Errors -/// -/// This function will return an error if one of the requests to get list of vdisks or nodes fails -pub async fn get_vdisks( - Extension(client): Extension, -) -> AxumResult>> { - tracing::info!("get /vdisks : {client:?}"); - - let mut disks: FuturesUnordered<_> = client - .cluster() - .map(move |node| { - let handle = node.clone(); - tokio::spawn(async move { (handle.get_status().await, handle.get_disks().await) }) - }) - .collect(); - - let api = client.api(); - let nodes = request_nodes(api).await?; - let virt_disks = request_vdisks(api).await?; - - let nodes: HashMap<&NodeName, &bob_client::dto::Node> = - nodes.iter().map(|node| (&node.name, node)).collect(); - - let mut res_disks = HashMap::new(); - while let Some(res) = disks.next().await { - if let Ok(( - Ok(client::GetStatusResponse::AJSONWithNodeInfo(status)), - Ok(client::GetDisksResponse::AJSONArrayWithDisksAndTheirStates(disks)), - )) = res - { - let mut map = HashMap::new(); - for disk in disks { - map.insert(disk.name, disk.is_active); - } - res_disks.insert(status.name, map); - } else { - tracing::warn!("couldn't receive node's space info"); - } - } - - let mut res = vec![]; - - for vdisk in virt_disks { - let replicas = if let Some(replicas) = vdisk.replicas { - let mut res = vec![]; - for replica in replicas { - res.push(if let Some(disks) = res_disks.get(&replica.node) { - process_replica(&client, replica, disks, &nodes).await - } else { - Replica { - node: replica.node, - disk: replica.disk, - path: replica.path, - status: ReplicaStatus::Offline(vec![ReplicaProblem::NodeUnavailable]), - } - }); - } - res - } else { - vec![] - }; - let count = replicas - .iter() - .filter(|replica| matches!(replica.status, ReplicaStatus::Offline(_))) - .count(); - let status = if count == 0 { - VDiskStatus::Good - } else if count == replicas.len() { - VDiskStatus::Offline - } else { - VDiskStatus::Bad - }; - let part = client.api().get_partitions(vdisk.id).await.ok(); - let partition_count = if let Some( - client::GetPartitionsResponse::NodeInfoAndJSONArrayWithPartitionsInfo(part), - ) = part - { - part.partitions.unwrap_or_default().len() - } else { - 0 - } as u64; - res.push(VDisk { - id: vdisk.id as u64, - status, - partition_count, - replicas, - }); - } - tracing::trace!("send response: {res:?}"); - - Ok(Json(res)) -} - -async fn process_replica( - client: &BobClient, - replica: bob_client::dto::Replica, - disks: &HashMap, - nodes: &HashMap<&NodeName, &bob_client::dto::Node>, -) -> Replica { - let mut status = ReplicaStatus::Good; - if let Some(disk_state) = disks.get(&replica.disk) { - if !disk_state { - status = ReplicaStatus::Offline(vec![ReplicaProblem::DiskUnavailable]); - } - } else { - status = ReplicaStatus::Offline(vec![ReplicaProblem::DiskUnavailable]); - } - - if let Some(node) = nodes.get(&replica.node) { - if !is_node_online(client, node).await { - status = match status { - ReplicaStatus::Good => { - ReplicaStatus::Offline(vec![ReplicaProblem::DiskUnavailable]) - } - ReplicaStatus::Offline(mut problems) => { - problems.push(ReplicaProblem::NodeUnavailable); - ReplicaStatus::Offline(problems) - } - } - } - } else { - status = match status { - ReplicaStatus::Good => ReplicaStatus::Offline(vec![ReplicaProblem::DiskUnavailable]), - ReplicaStatus::Offline(mut problems) => { - problems.push(ReplicaProblem::NodeUnavailable); - ReplicaStatus::Offline(problems) - } - } - } - - Replica { - node: replica.node, - disk: replica.disk, - path: replica.path, - status, - } -} - -async fn is_node_online(client: &BobClient, node: &bob_client::dto::Node) -> bool { - (client.probe_socket(&node.name).await).map_or(false, |code| code == StatusCode::OK) -} - -fn proccess_disks( - disks: &[bob_client::dto::DiskState], - space: &bob_client::dto::SpaceInfo, - metrics: &bob_client::dto::MetricsSnapshotModel, -) -> Vec { - let mut res_disks = vec![]; - let mut visited_disks = HashSet::new(); - for disk in disks { - if !visited_disks.insert(disk.name.clone()) { - tracing::warn!( - "disk {} with path {} duplicated, skipping...", - disk.name, - disk.path - ); - continue; - } - let status = - if let Some(&occupied_space) = space.occupied_disk_space_by_disk.get(&disk.name) { - disk_status_from_space(space, occupied_space) - } else { - DiskStatus::Offline - }; - res_disks.push(Disk { - name: disk.name.clone(), - path: disk.path.clone(), - status, - total_space: space.total_disk_space_bytes, - used_space: space - .occupied_disk_space_by_disk - .get(&disk.name.clone()) - .copied() - .unwrap_or_default(), - iops: metrics - .metrics - .get(&format!("hardware.disks.{:?}_iops", disk.name)) - .cloned() - .unwrap_or_default() - .value, - }); - } - - res_disks -} - #[allow(clippy::cast_precision_loss)] -fn disk_status_from_space(space: &bob_client::dto::SpaceInfo, occupied_space: u64) -> DiskStatus { +fn disk_status_from_space(space: &dto::SpaceInfo, occupied_space: u64) -> DiskStatus { if ((space.total_disk_space_bytes - occupied_space) as f64 / space.total_disk_space_bytes as f64) - < MIN_FREE_SPACE + < DEFAULT_MIN_FREE_SPACE { DiskStatus::Bad(vec![DiskProblem::FreeSpaceRunningOut]) } else { DiskStatus::Good } } - -// Bad function, 6 args :( -async fn process_vdisks_for_node( - client: BobClient, - virt_disks: &[bob_client::dto::VDisk], - node_name: NodeName, - all_disks: &HashMap, - nodes: &HashMap<&NodeName, &bob_client::dto::Node>, - partitions_count_on_vdisk: &HashMap, -) -> Vec { - let mut res_replicas = vec![]; - let mut res_vdisks = vec![]; - for (vdisk, replicas) in virt_disks - .iter() - .filter_map(|vdisk| vdisk.replicas.as_ref().map(|repl| (vdisk, repl))) - .filter(|(_, replicas)| replicas.iter().any(|replica| replica.node == node_name)) - { - for replica in replicas { - res_replicas.push(( - vdisk.id, - process_replica(&client, replica.clone(), all_disks, nodes).await, - )); - } - res_vdisks.push(VDisk { - id: vdisk.id as u64, - status: if res_replicas - .iter() - .any(|(_, replica)| matches!(replica.status, ReplicaStatus::Offline(_))) - { - VDiskStatus::Bad - } else { - VDiskStatus::Good - }, - partition_count: partitions_count_on_vdisk - .get(&vdisk.id) - .copied() - .unwrap_or_default() as u64, - replicas: res_replicas - .iter() - .filter(|(id, _)| id == &vdisk.id) - .map(|(_, replica)| replica.clone()) - .collect(), - }); - } - - res_vdisks -} - -/// Get Detailed Information on Node -/// -/// # Errors -/// -/// This function will return an error if the server was unable to get node'a client -/// or one of the requests to get information from the node fails -pub async fn get_detailed_node_info( - Extension(client): Extension, - Path(node_name): Path, -) -> AxumResult> { - let mut all_disks: FuturesUnordered<_> = client - .cluster() - .map(move |node| { - let handle = node.clone(); - tokio::spawn(async move { handle.get_disks().await }) - }) - .collect(); - - let node_client = get_client_by_node(&client, node_name.clone()).await?; - - let virt_disks = request_vdisks(&node_client).await?; - - let mut all_partitions: FuturesUnordered<_> = virt_disks - .iter() - .map(|vdisk| { - let id = vdisk.id; - let handle = client.api().clone(); - tokio::spawn(async move { (id, handle.get_partitions(id).await) }) - }) - .collect(); - - let metrics = request_metrics(&node_client).await?; - let typed_metrics: TypedMetrics = metrics.clone().into(); - - let space = request_space(&node_client).await?; - let node_status = request_status(&node_client).await?; - let disks = request_disks(&node_client).await?; - - let res_disks = proccess_disks(&disks, &space, &metrics); - let nodes = request_nodes(&node_client).await?; - - let nodes = nodes - .iter() - .map(|node| (&node.name, node)) - .collect::>(); - - let mut proc_disks = HashMap::new(); - while let Some(disks) = all_disks.next().await { - let Ok(Ok(client::GetDisksResponse::AJSONArrayWithDisksAndTheirStates(disks))) = disks - else { - tracing::error!("couldn't get disk inforamtion from node"); - continue; - }; - for disk in disks { - proc_disks.insert(disk.name, disk.is_active); - } - } - - let mut res_partitions = HashMap::new(); - while let Some(partitions) = all_partitions.next().await { - let Ok(( - id, - Ok(client::GetPartitionsResponse::NodeInfoAndJSONArrayWithPartitionsInfo(partitions)), - )) = partitions - else { - // tracing::error!("couldn't get Partition inforamtion from node"); // Too noisy - continue; - }; - if let Some(partitions) = partitions.partitions { - res_partitions.insert(id, partitions.len()); - } - } - - let vdisks = process_vdisks_for_node( - client, - &virt_disks, - node_name, - &proc_disks, - &nodes, - &res_partitions, - ) - .await; - - let mut rps = RPS::new(); - - let status = NodeStatus::from_problems(NodeProblem::from_metrics(&typed_metrics)); - - rps[Get] = typed_metrics[PearlGetCountRate].value; - rps[Put] = typed_metrics[PearlPutCountRate].value; - rps[Exist] = typed_metrics[PearlExistCountRate].value; - rps[Delete] = typed_metrics[PearlDeleteCountRate].value; - - let res = Json(DetailedNode { - name: node_status.name, - hostname: node_status.address, - vdisks, - status, - metrics: DetailedNodeMetrics { - rps, - alien_count: typed_metrics[BackendAlienCount].value, - corrupted_count: typed_metrics[BackendCorruptedBlobCount].value, - space: SpaceInfo { - total_disk: space.total_disk_space_bytes, - free_disk: space.total_disk_space_bytes - space.used_disk_space_bytes, - used_disk: space.used_disk_space_bytes, - occupied_disk: space.occupied_disk_space_bytes, - }, - cpu_load: typed_metrics[HardwareBobCpuLoad].value, - total_ram: typed_metrics[HardwareTotalRam].value, - used_ram: typed_metrics[HardwareUsedRam].value, - descr_amount: typed_metrics[HardwareDescrAmount].value, - }, - disks: res_disks, - }); - tracing::trace!("send response: {res:?}"); - - Ok(res) -} - -/// Get Raw Metrics from Node -/// -/// # Errors -/// -/// This function will return an error if the server was unable to get node'a client or the request to get metrics fails -pub async fn raw_metrics_by_node( - Extension(client): Extension, - Path(node_name): Path, -) -> AxumResult> { - let client = get_client_by_node(&client, node_name).await?; - - Ok(Json(request_metrics(&client).await?)) -} - -/// Get Configuration from Node -/// -/// # Errors -/// -/// This function will return an error if the server was unable to get node'a client or the request to get configuration fails -pub async fn raw_configuration_by_node( - Extension(client): Extension, - Path(node_name): Path, -) -> AxumResult> { - let client = get_client_by_node(&client, node_name).await?; - - Ok(Json(request_configuration(&client).await?)) -} - -async fn get_client_by_node( - client: &BobClient, - node_name: NodeName, -) -> AxumResult> { - let nodes = request_nodes(client.api()).await?; - - let node = nodes - .iter() - .find(|node| node.name == node_name) - .ok_or_else(|| { - tracing::error!("Couldn't find specified node"); - APIError::RequestFailed - })?; - - client - .cluster_with_addr() - .get(&node.name) - .ok_or_else(|| { - tracing::error!("Couldn't find specified node"); - APIError::RequestFailed.into() - }) - .cloned() -} diff --git a/backend/src/services/methods.rs b/backend/src/services/methods.rs index 3b996e3b..795144c7 100644 --- a/backend/src/services/methods.rs +++ b/backend/src/services/methods.rs @@ -1,12 +1,12 @@ -use hyper::StatusCode; +use super::prelude::*; -use crate::bob_client::api; -use crate::bob_client::dto; -use crate::{bob_client::ApiInterface, prelude::*}; -use std::sync::Arc; - -pub async fn request_metrics(client: &Arc) -> AxumResult { - let api::GetMetricsResponse::Metrics(metrics) = client.get_metrics().await.map_err(|err| { +pub async fn request_metrics< + Context: Send + Sync, + ApiInterface: ApiNoContext + Send + Sync, +>( + client: &ApiInterface, +) -> AxumResult { + let GetMetricsResponse::Metrics(metrics) = client.get_metrics().await.map_err(|err| { tracing::error!("{err}"); APIError::RequestFailed })?; @@ -14,8 +14,13 @@ pub async fn request_metrics(client: &Arc) -> AxumResult) -> AxumResult> { - let api::GetVDisksResponse::AJSONArrayOfVdisksInfo(virt_disks) = +pub async fn request_vdisks< + Context: Send + Sync, + ApiInterface: ApiNoContext + Send + Sync, +>( + client: &ApiInterface, +) -> AxumResult> { + let GetVDisksResponse::AJSONArrayOfVdisksInfo(virt_disks) = client.get_v_disks().await.map_err(|err| { tracing::error!("{err}"); APIError::RequestFailed @@ -27,18 +32,27 @@ pub async fn request_vdisks(client: &Arc) -> AxumResult) -> AxumResult { - let api::GetSpaceInfoResponse::SpaceInfo(space) = - client.get_space_info().await.map_err(|err| { - tracing::error!("{err}"); - APIError::RequestFailed - })?; +pub async fn request_space< + Context: Send + Sync, + ApiInterface: ApiNoContext + Send + Sync, +>( + client: &ApiInterface, +) -> AxumResult { + let GetSpaceInfoResponse::SpaceInfo(space) = client.get_space_info().await.map_err(|err| { + tracing::error!("{err}"); + APIError::RequestFailed + })?; Ok(space) } -pub async fn request_status(client: &Arc) -> AxumResult { - let api::GetStatusResponse::AJSONWithNodeInfo(node_status) = +pub async fn request_status< + Context: Send + Sync, + ApiInterface: ApiNoContext + Send + Sync, +>( + client: &ApiInterface, +) -> AxumResult { + let GetStatusResponse::AJSONWithNodeInfo(node_status) = client.get_status().await.map_err(|err| { tracing::error!("{err}"); APIError::RequestFailed @@ -47,8 +61,13 @@ pub async fn request_status(client: &Arc) -> AxumResult Ok(node_status) } -pub async fn request_disks(client: &Arc) -> AxumResult> { - let api::GetDisksResponse::AJSONArrayWithDisksAndTheirStates(disks) = +pub async fn request_disks< + Context: Send + Sync, + ApiInterface: ApiNoContext + Send + Sync, +>( + client: &ApiInterface, +) -> AxumResult> { + let GetDisksResponse::AJSONArrayWithDisksAndTheirStates(disks) = client.get_disks().await.map_err(|err| { tracing::error!("{err}"); APIError::RequestFailed @@ -64,10 +83,13 @@ pub async fn request_disks(client: &Arc) -> AxumResult, +pub async fn request_configuration< + Context: Send + Sync, + ApiInterface: ApiNoContext + Send + Sync, +>( + client: &ApiInterface, ) -> AxumResult { - let api::GetConfigurationResponse::ConfigurationObject(configuration) = + let GetConfigurationResponse::ConfigurationObject(configuration) = client.get_configuration().await.map_err(|err| { tracing::error!("couldn't get node's configuration: {err}"); APIError::RequestFailed @@ -80,8 +102,13 @@ pub async fn request_configuration( Ok(configuration) } -pub async fn request_nodes(client: &Arc) -> AxumResult> { - let api::GetNodesResponse::AJSONArrayOfNodesInfoAndVdisksOnThem(nodes) = +pub async fn request_nodes< + Context: Send + Sync, + ApiInterface: ApiNoContext + Send + Sync, +>( + client: &ApiInterface, +) -> AxumResult> { + let GetNodesResponse::AJSONArrayOfNodesInfoAndVdisksOnThem(nodes) = client.get_nodes().await.map_err(|err| { tracing::error!("couldn't get nodes list from bob: {err}"); APIError::RequestFailed diff --git a/backend/src/services/mod.rs b/backend/src/services/mod.rs index 513f6b3b..084310a8 100644 --- a/backend/src/services/mod.rs +++ b/backend/src/services/mod.rs @@ -1,23 +1,29 @@ mod prelude { - pub use crate::connector::ClientError; - pub use crate::prelude::*; - pub use axum::middleware::from_fn_with_state; + pub use crate::{ + connector::{ + api::{prelude::*, ApiNoContext}, + ClientError, + }, + models::api::*, + prelude::*, + }; pub use axum::{ extract::{FromRef, FromRequestParts}, http::request::Parts, - middleware::Next, + middleware::{from_fn_with_state, Next}, Router, }; - pub use futures::stream::FuturesUnordered; - pub use std::sync::Arc; + pub use futures::{stream::FuturesUnordered, StreamExt}; pub use tokio::sync::Mutex; pub use tower_sessions::Session; } pub mod api; pub mod auth; +pub mod methods; use crate::root; +use api::{get_disks_count, get_nodes_count, get_rps, get_space}; use auth::{login, logout, require_auth, AuthState, BobUser, HttpBobClient, InMemorySessionStore}; use prelude::*; @@ -38,6 +44,10 @@ pub fn api_router_v1(auth_state: BobAuthState) -> Result, R Router::new() .with_context::() .api_route("/root", &Method::GET, root) + .api_route("/disks/count", &Method::GET, get_disks_count) + .api_route("/nodes/count", &Method::GET, get_nodes_count) + .api_route("/nodes/rps", &Method::GET, get_rps) + .api_route("/nodes/space", &Method::GET, get_space) .unwrap()? .route_layer(from_fn_with_state(auth_state, require_auth)) .with_context::()