diff --git a/backend/src/connector/mod.rs b/backend/src/connector/mod.rs
index e9f62340..83338e4e 100644
--- a/backend/src/connector/mod.rs
+++ b/backend/src/connector/mod.rs
@@ -11,6 +11,7 @@ mod prelude {
     };
     pub use futures::{Stream, StreamExt};
     pub use hyper::{body::Bytes, service::Service, Response, Uri};
+    pub use std::collections::BTreeMap;
     pub use std::{
         str::FromStr,
         sync::Arc,
@@ -107,7 +108,7 @@ pub struct BobClient + Send
     main: Arc,
 
     /// Clients for all known nodes
-    cluster: HashMap>,
+    cluster: BTreeMap>,
 
     context_marker: PhantomData,
 }
@@ -168,7 +169,7 @@ impl + Send + Sync>
                 .change_context(ClientError::PermissionDenied)?
         };
 
-        let cluster: HashMap> = nodes
+        let cluster: BTreeMap> = nodes
            .iter()
            .filter_map(|node| HttpClient::from_node(node, &bob_data.hostname, context.clone()))
            .collect();
@@ -300,7 +301,7 @@
     }
 
     #[must_use]
-    pub const fn cluster_with_addr(&self) -> &HashMap> {
+    pub const fn cluster_with_addr(&self) -> &BTreeMap> {
         &self.cluster
     }
 
diff --git a/backend/src/models/api.rs b/backend/src/models/api.rs
index 789a7aca..9338d132 100644
--- a/backend/src/models/api.rs
+++ b/backend/src/models/api.rs
@@ -525,6 +525,61 @@ impl From for TypedMetrics {
     }
 }
 
+#[derive(IntoParams, Deserialize, Clone)]
+#[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
+pub struct Pagination {
+    #[serde(default)]
+    pub page: usize,
+    #[serde(default)]
+    pub per_page: usize,
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
+#[cfg_attr(all(feature = "swagger", debug_assertions), derive(ToSchema))]
+pub struct PaginatedResponse<T> {
+    /// The page of data being returned
+    pub data: Vec<T>,
+    /// The number of rows returned in the current page
+    pub count: usize,
+    /// The total number of rows available
+    pub total: usize,
+    /// The current page being returned
+    pub page: usize,
+    /// The number of pages available
+    pub page_count: usize,
+}
+
+impl<T> PaginatedResponse<T> {
+    /// Create a new `PaginatedResponse`
+    #[must_use]
+    pub fn new(data: Vec<T>, total: usize, page: usize, page_size: usize) -> Self {
+        let count = data.len();
+        // Ceiling division; `max(1)` guards against `per_page == 0`,
+        // which `#[serde(default)]` makes reachable.
+        let page_count = total.div_ceil(page_size.max(1));
+
+        Self {
+            data,
+            count,
+            total,
+            page,
+            page_count,
+        }
+    }
+
+    /// Transform the data contained in the `PaginatedResponse`
+    pub fn map<F, B>(self, func: F) -> PaginatedResponse<B>
+    where
+        F: Fn(T) -> B,
+    {
+        PaginatedResponse::<B> {
+            data: self.data.into_iter().map(func).collect(),
+            count: self.count,
+            total: self.total,
+            page: self.page,
+            page_count: self.page_count,
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::{
diff --git a/backend/src/services/api.rs b/backend/src/services/api.rs
index ce60f96e..9b34cbfc 100644
--- a/backend/src/services/api.rs
+++ b/backend/src/services/api.rs
@@ -1,15 +1,13 @@
-use axum::extract::Path;
-
-use crate::{
-    connector::dto::NodeConfiguration,
-    models::bob::{DiskName, IsActive},
-};
-
 use super::{
     auth::HttpClient,
     methods::{fetch_configuration, fetch_metrics, fetch_nodes, fetch_vdisks},
     prelude::*,
 };
+use crate::{
+    connector::dto::NodeConfiguration,
+    models::bob::{DiskName, IsActive},
+};
+use axum::extract::{Path, Query};
 
 /// Returns count of Physical Disks per status
 ///
@@ -234,7 +232,7 @@ pub async fn get_space(Extension(client): Extension) -> Json
     responses(
-        (status = 200, body = Vec<Node>, content_type = "application/json", description = "Node List"),
+        (status = 200, body = PaginatedResponse<Node>, content_type = "application/json", description = "Node List"),
         (status = 401, description = "Unauthorized")
     ),
     security(("api_key" = []))
 ))]
-pub async fn get_nodes(Extension(client): Extension<HttpBobClient>) -> AxumResult<Json<Vec<Node>>> {
+pub async fn get_nodes(
+    Extension(client): Extension<HttpBobClient>,
+    Query(params): Query<Option<Pagination>>,
+) -> AxumResult<Json<PaginatedResponse<Node>>> {
     tracing::info!("get /nodes : {client:?}");
+    let len = client.cluster_with_addr().len();
+    Ok(if let Some(page_params) = params {
+        Json(PaginatedResponse::new(
+            batch_get_nodes(client, page_params.clone()).await?,
+            len,
+            page_params.page,
+            page_params.per_page,
+        ))
+    } else {
+        Json(PaginatedResponse::new(
+            dump_get_nodes(client).await?,
+            len,
+            1,
+            1,
+        ))
+    })
+}
+
+pub async fn batch_get_nodes(
+    client: BobClient<
+        ClientContext,
+        ContextWrapper<
+            Client<DropContextService, ClientContext>,
+            ClientContext,
+            Basic,
+        >,
+        ClientContext,
+    >,
+    page_params: Pagination,
+) -> AxumResult<Vec<Node>> {
+    todo!()
+}
+
+pub async fn dump_get_nodes(client: HttpBobClient) -> AxumResult<Vec<Node>> {
     let mut metrics: FuturesUnordered<_> = client
         .cluster()
         .map(move |node| {
@@ -341,7 +378,7 @@
     }
 
     tracing::trace!("send response: {res:?}");
-    Ok(Json(res.values().cloned().collect()))
+    Ok(res.values().cloned().collect())
 }
 
 /// Get Virtual Disks
@@ -497,44 +534,6 @@ async fn is_node_online(client: &HttpBobClient, node: &dto::Node) -> bool {
     (client.probe_socket(&node.name).await).map_or(false, |code| code == StatusCode::OK)
 }
 
-fn proccess_disks(
-    disks: &[dto::DiskState],
-    space: &dto::SpaceInfo,
-    metrics: &dto::MetricsSnapshotModel,
-) -> Vec<Disk> {
-    let mut res_disks = vec![];
-    let mut visited_disks = HashSet::new();
-    for disk in disks {
-        if !visited_disks.insert(disk.name.clone()) {
-            tracing::warn!(
-                "disk {} with path {} duplicated, skipping...",
-                disk.name,
-                disk.path
-            );
-            continue;
-        }
-        res_disks.push(Disk {
-            name: disk.name.clone(),
-            path: disk.path.clone(),
-            status: DiskStatus::from_space_info(space, &disk.name),
-            total_space: space.total_disk_space_bytes,
-            used_space: space
-                .occupied_disk_space_by_disk
-                .get(&disk.name.clone())
-                .copied()
-                .unwrap_or_default(),
-            iops: metrics
-                .metrics
-                .get(&format!("hardware.disks.{:?}_iops", disk.name))
-                .cloned()
-                .unwrap_or_default()
-                .value,
-        });
-    }
-
-    res_disks
-}
-
 /// Get Raw Metrics from Node
 ///
 /// # Errors
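
To check the page arithmetic in `PaginatedResponse::new`, here is a minimal standalone sketch. The type is mirrored locally with the serde/utoipa derives stripped so it compiles on its own, the values are illustrative, and `usize::div_ceil` assumes Rust 1.73+:

```rust
/// Local mirror of `PaginatedResponse` from `models/api.rs`, with the
/// serde/utoipa derives stripped so the sketch runs without the crate.
#[derive(Debug)]
struct PaginatedResponse<T> {
    data: Vec<T>,
    count: usize,
    total: usize,
    page: usize,
    page_count: usize,
}

impl<T> PaginatedResponse<T> {
    fn new(data: Vec<T>, total: usize, page: usize, page_size: usize) -> Self {
        let count = data.len();
        // Ceiling division: a partially filled last page still counts as a page.
        // `max(1)` avoids dividing by zero when `per_page` defaults to 0.
        let page_count = total.div_ceil(page_size.max(1));
        Self { data, count, total, page, page_count }
    }
}

fn main() {
    // 10 nodes total, 3 per page, requesting one middle page: pages split 3+3+3+1.
    let resp = PaginatedResponse::new(vec!["node-4", "node-5", "node-6"], 10, 2, 3);
    assert_eq!(resp.count, 3);
    assert_eq!(resp.page_count, 4); // ceil(10 / 3)
    println!("{resp:?}");
}
```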
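`batch_get_nodes` is left as `todo!()` in this diff. One possible shape, sketched under assumptions the diff does not confirm: that a page is served by slicing the sorted node map with `skip`/`take`, and that `page` is zero-based. The `HashMap` → `BTreeMap` swap in `connector/mod.rs` is what makes such slicing meaningful: a `BTreeMap` iterates in sorted key order, so page boundaries stay stable across requests. `paginate_keys` and the `String` keys below are illustrative, not the crate's API:

```rust
use std::collections::BTreeMap;

// Illustrative only: select the keys that fall on the requested page.
// BTreeMap iteration is in sorted key order, so the same page always
// yields the same slice of the cluster.
fn paginate_keys<V>(cluster: &BTreeMap<String, V>, page: usize, per_page: usize) -> Vec<String> {
    cluster
        .keys()
        .skip(page.saturating_mul(per_page))
        .take(per_page)
        .cloned()
        .collect()
}

fn main() {
    let mut cluster = BTreeMap::new();
    for name in ["node-3", "node-1", "node-4", "node-2"] {
        cluster.insert(name.to_string(), ());
    }
    // Sorted order: node-1, node-2, node-3, node-4.
    // Page 1 (zero-based) with 2 per page:
    assert_eq!(paginate_keys(&cluster, 1, 2), vec!["node-3", "node-4"]);
}
```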