diff --git a/backend/Cargo.toml b/backend/Cargo.toml index b421d7a1..0f91f1c5 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -36,8 +36,16 @@ thiserror = "1.0" ## General tokio = { version = "1.32", features = ["rt", "macros", "rt-multi-thread" ] } -hyper = "0.14" +hyper = { version = "0.14", features = ["http2", "client"] } +nutype = { version = "0.3", features = ["serde", "regex"] } lazy_static = "1.4" +regex = "1.10" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +frunk = "0.4" +uuid = { version = "1.4", features = ["v4"] } +futures = "0.3" +rand = { version = "0.8", features = ["min_const_gen"] } ## OpenAPI + Swagger utoipa = { version = "4.0", features = ["yaml", "axum_extras", "chrono", "openapi_extensions"], optional = true } @@ -49,10 +57,11 @@ utoipa-rapidoc = { version = "1.0", features = ["axum"], optional = true } cli = { path = "../cli" } ## Frontend -frontend = { path = "../frontend", optional = true } +# frontend = { path = "../frontend", optional = true } [features] default = [ "swagger", "frontend-bundle" ] swagger = [ "dep:utoipa", "dep:utoipa-swagger-ui" , "dep:utoipa-redoc", "dep:utoipa-rapidoc" ] -frontend-bundle = [ "dep:frontend" ] +# frontend-bundle = [ "dep:frontend" ] +frontend-bundle = [ ] gen_api = [ "dep:utoipa" ] diff --git a/backend/src/connector/api.rs b/backend/src/connector/api.rs new file mode 100644 index 00000000..e6431393 --- /dev/null +++ b/backend/src/connector/api.rs @@ -0,0 +1,404 @@ +use super::context::ContextWrapper; +use super::dto::{self}; +use crate::prelude::*; +use axum::async_trait; +use hyper::StatusCode; +use serde::{Deserialize, Serialize}; +use std::task::{Context, Poll}; +use thiserror::Error; + +pub type ServiceError = Box; + +/// Errors that happend during API request proccessing +#[derive(Debug, Error)] +pub enum APIError { + #[error("the request to the specified resource failed")] + RequestFailed, + #[error("server received invalid status code from client: `{0}`")] + InvalidStatusCode(StatusCode), + #[error("can't read hyper response")] + ResponseError, +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum GetAlienResponse { + /// Alien Node name + AlienNodeName(String), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetAlienDirResponse { + /// Directory + Directory(dto::Dir), + /// Permission denied + PermissionDenied(dto::StatusExt), + /// Not acceptable backend + NotAcceptableBackend(dto::StatusExt), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetDisksResponse { + /// A JSON array with disks and their states + AJSONArrayWithDisksAndTheirStates(Vec), + /// Permission denied + PermissionDenied(dto::StatusExt), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum GetMetricsResponse { + /// Metrics + Metrics(dto::MetricsSnapshotModel), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetNodesResponse { + /// A JSON array of nodes info and vdisks on them + AJSONArrayOfNodesInfoAndVdisksOnThem(Vec), + /// Permission denied + PermissionDenied, +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetPartitionResponse { + /// A JSON with partition info + AJSONWithPartitionInfo(dto::Partition), + /// Permission denied + PermissionDenied(dto::StatusExt), + /// Not found + NotFound(dto::StatusExt), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetPartitionsResponse { + /// Node info and JSON array with partitions info + NodeInfoAndJSONArrayWithPartitionsInfo(dto::VDiskPartitions), + /// Permission denied + PermissionDenied(dto::StatusExt), + /// Not found + NotFound(dto::StatusExt), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetRecordsResponse { + /// Records count + RecordsCount(i32), + /// Permission denied + PermissionDenied(dto::StatusExt), + /// Not found + NotFound(dto::StatusExt), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetReplicasLocalDirsResponse { + /// A JSON array with dirs + AJSONArrayWithDirs(Vec), + /// Permission denied + PermissionDenied(dto::StatusExt), + /// Not found + NotFound(dto::StatusExt), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum GetSpaceInfoResponse { + /// Space info + SpaceInfo(dto::SpaceInfo), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum GetStatusResponse { + /// A JSON with node info + AJSONWithNodeInfo(dto::Node), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetVDiskResponse { + /// A JSON with vdisk info + AJSONWithVdiskInfo(dto::VDisk), + /// Permission denied + PermissionDenied(dto::StatusExt), + /// Not found + NotFound(dto::StatusExt), +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetVDisksResponse { + /// A JSON array of vdisks info + AJSONArrayOfVdisksInfo(Vec), + /// Permission denied + PermissionDenied, +} + +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum GetVersionResponse { + /// Version info + VersionInfo(dto::VersionInfo), +} + +/// Returns configuration of the node +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[must_use] +pub enum GetConfigurationResponse { + /// Configuration object + ConfigurationObject(dto::NodeConfiguration), + /// Permission denied + PermissionDenied, +} + +/// API +#[async_trait] +pub trait Api { + fn poll_ready( + &self, + _cx: &mut Context, + ) -> Poll>> { + Poll::Ready(Ok(())) + } + + /// Return directory of alien + async fn get_alien_dir(&self, context: &C) -> Result; + + /// Returns the list of disks with their states + async fn get_disks(&self, context: &C) -> Result; + + /// Get metrics + async fn get_metrics(&self, context: &C) -> Result; + + /// Returns a list of known nodes + async fn get_nodes(&self, context: &C) -> Result; + + /// Returns a partition info by ID + async fn get_partition( + &self, + v_disk_id: i32, + partition_id: String, + context: &C, + ) -> Result; + + /// Returns a list of partitions + async fn get_partitions( + &self, + v_disk_id: i32, + context: &C, + ) -> Result; + + /// Returns count of records of this on node + async fn get_records( + &self, + v_disk_id: i32, + context: &C, + ) -> Result; + + /// Returns directories of local replicas of vdisk + async fn get_replicas_local_dirs( + &self, + v_disk_id: i32, + context: &C, + ) -> Result; + + /// Get space info + async fn get_space_info(&self, context: &C) -> Result; + + /// Returns information about self + async fn get_status(&self, context: &C) -> Result; + + /// Returns a vdisk info by ID + async fn get_v_disk(&self, v_disk_id: i32, context: &C) -> Result; + + /// Returns a list of vdisks + async fn get_v_disks(&self, context: &C) -> Result; + + /// Returns server version + async fn get_version(&self, context: &C) -> Result; + + /// Returns configuration of the node + async fn get_configuration(&self, context: &C) -> Result; +} + +/// API where `Context` isn't passed on every API call +#[async_trait] +pub trait ApiNoContext { + fn poll_ready( + &self, + _cx: &mut Context, + ) -> Poll>>; + + fn context(&self) -> &C; + + /// Return directory of alien + async fn get_alien_dir(&self) -> Result; + + /// Returns the list of disks with their states + async fn get_disks(&self) -> Result; + + /// Get metrics + async fn get_metrics(&self) -> Result; + + /// Returns a list of known nodes + async fn get_nodes(&self) -> Result; + + /// Returns a partition info by ID + async fn get_partition( + &self, + v_disk_id: i32, + partition_id: String, + ) -> Result; + + /// Returns a list of partitions + async fn get_partitions(&self, v_disk_id: i32) -> Result; + + /// Returns count of records of this on node + async fn get_records(&self, v_disk_id: i32) -> Result; + + /// Returns directories of local replicas of vdisk + async fn get_replicas_local_dirs( + &self, + v_disk_id: i32, + ) -> Result; + + /// Get space info + async fn get_space_info(&self) -> Result; + + /// Returns information about self + async fn get_status(&self) -> Result; + + /// Returns a vdisk info by ID + async fn get_v_disk(&self, v_disk_id: i32) -> Result; + + /// Returns a list of vdisks + async fn get_v_disks(&self) -> Result; + + /// Returns server version + async fn get_version(&self) -> Result; + + /// Returns configuration of the node + async fn get_configuration(&self) -> Result; +} + +/// Trait to extend an API to make it easy to bind it to a context. +pub trait ContextWrapperExt +where + Self: Sized, +{ + /// Binds this API to a context. + fn with_context(self, context: C) -> ContextWrapper; +} + +impl + Send + Sync, C: Clone + Send + Sync> ContextWrapperExt for T { + fn with_context(self: T, context: C) -> ContextWrapper { + ContextWrapper::::new(self, context) + } +} + +#[async_trait] +impl + Send + Sync, C: Clone + Send + Sync> ApiNoContext for ContextWrapper { + fn poll_ready(&self, cx: &mut Context) -> Poll> { + self.api().poll_ready(cx) + } + + fn context(&self) -> &C { + Self::context(self) + } + + /// Return directory of alien + async fn get_alien_dir(&self) -> Result { + let context = self.context().clone(); + self.api().get_alien_dir(&context).await + } + /// Returns the list of disks with their states + async fn get_disks(&self) -> Result { + let context = self.context().clone(); + self.api().get_disks(&context).await + } + + /// Get metrics + async fn get_metrics(&self) -> Result { + let context = self.context().clone(); + self.api().get_metrics(&context).await + } + + /// Returns a list of known nodes + async fn get_nodes(&self) -> Result { + let context = self.context().clone(); + self.api().get_nodes(&context).await + } + + /// Returns a partition info by ID + async fn get_partition( + &self, + v_disk_id: i32, + partition_id: String, + ) -> Result { + let context = self.context().clone(); + self.api() + .get_partition(v_disk_id, partition_id, &context) + .await + } + + /// Returns a list of partitions + async fn get_partitions(&self, v_disk_id: i32) -> Result { + let context = self.context().clone(); + self.api().get_partitions(v_disk_id, &context).await + } + + /// Returns count of records of this on node + async fn get_records(&self, v_disk_id: i32) -> Result { + let context = self.context().clone(); + self.api().get_records(v_disk_id, &context).await + } + + /// Returns directories of local replicas of vdisk + async fn get_replicas_local_dirs( + &self, + v_disk_id: i32, + ) -> Result { + let context = self.context().clone(); + self.api() + .get_replicas_local_dirs(v_disk_id, &context) + .await + } + + /// Get space info + async fn get_space_info(&self) -> Result { + let context = self.context().clone(); + self.api().get_space_info(&context).await + } + + /// Returns information about self + async fn get_status(&self) -> Result { + let context = self.context().clone(); + self.api().get_status(&context).await + } + + /// Returns a vdisk info by ID + async fn get_v_disk(&self, v_disk_id: i32) -> Result { + let context = self.context().clone(); + self.api().get_v_disk(v_disk_id, &context).await + } + + /// Returns a list of vdisks + async fn get_v_disks(&self) -> Result { + let context = self.context().clone(); + self.api().get_v_disks(&context).await + } + + /// Returns server version + async fn get_version(&self) -> Result { + let context = self.context().clone(); + self.api().get_version(&context).await + } + + /// Returns configuration of the node + async fn get_configuration(&self) -> Result { + let context = self.context().clone(); + self.api().get_configuration(&context).await + } +} diff --git a/backend/src/connector/client.rs b/backend/src/connector/client.rs new file mode 100644 index 00000000..8fb3173b --- /dev/null +++ b/backend/src/connector/client.rs @@ -0,0 +1,442 @@ +#![allow( + missing_docs, + clippy::module_name_repetitions, + dead_code, + unused_variables +)] + +use super::{ + api::{ + APIError, Api, GetAlienDirResponse, GetConfigurationResponse, GetDisksResponse, + GetMetricsResponse, GetNodesResponse, GetPartitionResponse, GetPartitionsResponse, + GetRecordsResponse, GetReplicasLocalDirsResponse, GetSpaceInfoResponse, GetStatusResponse, + GetVDiskResponse, GetVDisksResponse, GetVersionResponse, + }, + context::{BodyExt, DropContextService}, + ClientError, Connector, +}; +use crate::{ + models::shared::{RequestTimeout, XSpanIdString}, + prelude::*, +}; +use axum::{ + async_trait, + headers::{authorization::Credentials, Authorization, HeaderMapExt}, + http::{HeaderName, HeaderValue}, +}; +use error_stack::ResultExt; +use frunk::hlist::Selector; +use hyper::{service::Service, Body, Request, Response, Uri}; +use serde::de::DeserializeOwned; +use std::{ + marker::PhantomData, + str::FromStr, + task::{Context, Poll}, +}; +use thiserror::Error; + +/// Error type for failing to create a Client +#[derive(Debug, Error)] +pub enum ClientInitError { + #[error("invlaid URL scheme")] + InvalidScheme, + + #[error("invlaid URI scheme")] + InvalidUri, + + #[error("no hostname specified")] + MissingHost, +} + +/// A client that implements the API by making HTTP calls out to a server. +#[derive(Clone)] +pub struct Client +where + S: Service<(Request, C), Response = Response> + Clone + Sync + Send + 'static, + S::Future: Send + 'static, + S::Error: Into + std::fmt::Display, + C: Clone + Send + Sync + 'static, +{ + /// Inner service + client_service: S, + + /// Base path of the API + base_path: String, + + /// Context Marker + con_marker: PhantomData, + + /// Credentials Marker + cred_marker: PhantomData, + + /// Indices Marker + indices_marker: PhantomData, +} + +#[derive(Debug, Clone)] +pub enum HyperClient { + Http(hyper::client::Client), +} + +impl Service> for HyperClient { + type Response = Response; + type Error = hyper::Error; + type Future = hyper::client::ResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + match self { + Self::Http(client) => client.poll_ready(cx), + } + } + + fn call(&mut self, req: Request) -> Self::Future { + match self { + Self::Http(client) => client.call(req), + } + } +} + +impl Client, C, Cr, I> +where + C: Clone + Send + Sync + 'static, +{ + /// Create an HTTP client. + /// + /// # Arguments + /// * `base_path` - base path of the client API, i.e. + /// + /// # Errors + /// + /// This function will return an error if base path isn't valid URL + pub fn try_new(base_path: &str) -> Result { + let uri = Uri::from_str(base_path).change_context(ClientInitError::InvalidUri)?; + + let scheme = uri.scheme_str().ok_or(ClientInitError::InvalidScheme)?; + let scheme = scheme.to_ascii_lowercase(); + + let connector = Connector::builder(); + + let client_service = match scheme.as_str() { + "http" => HyperClient::Http(hyper::client::Client::builder().build(connector.build())), + + _ => { + return Err(ClientInitError::InvalidScheme.into()); + } + }; + + let client_service = DropContextService::new(client_service); + + Ok(Self { + client_service, + base_path: into_base_path(base_path, None)?, + con_marker: PhantomData, + cred_marker: PhantomData, + indices_marker: PhantomData, + }) + } +} + +/// Convert input into a base path, e.g. . Also checks the scheme as it goes. +fn into_base_path( + input: impl TryInto, + _correct_scheme: Option<&'static str>, +) -> Result { + // First convert to Uri, since a base path is a subset of Uri. + let uri = input + .try_into() + .change_context(ClientInitError::InvalidUri)?; + + let scheme = uri.scheme_str().ok_or(ClientInitError::InvalidScheme)?; + + // Check the scheme if necessary + // if let Some(correct_scheme) = correct_scheme { + // if scheme != correct_scheme { + // return Err(ClientInitError::InvalidScheme); + // } + // } + + let host = uri.host().ok_or(ClientInitError::MissingHost)?; + let port = uri.port_u16().map(|x| format!(":{x}")).unwrap_or_default(); + Ok(format!( + "{}://{}{}{}", + scheme, + host, + port, + uri.path().trim_end_matches('/') + )) +} + +impl + Client< + DropContextService, C>, + C, + Cr, + I, + > +where + C: Clone + Send + Sync + 'static, +{ + /// Create an HTTP client. + /// + /// # Arguments + /// * `base_path` - base path of the client API, i.e. + /// + /// # Errors + /// + /// This function will return an error if base path isn't valid URL + pub fn try_new_http(base_path: &str) -> Result { + let http_connector = Connector::builder().build(); + + Self::try_new_with_connector(base_path, Some("http"), http_connector) + } +} + +impl + Client, C>, C, Cr, I> +where + Connector: hyper::client::connect::Connect + Clone + Send + Sync + 'static, + C: Clone + Send + Sync + 'static, +{ + /// Create a client with a custom implementation of [`hyper::client::Connect`]. + /// + /// Intended for use with custom implementations of connect for e.g. protocol logging + /// or similar functionality which requires wrapping the transport layer. When wrapping a TCP connection, + /// this function should be used in conjunction with `swagger::Connector::builder()`. + /// + /// # Arguments + /// + /// * `base_path` - base path of the client API, i.e. + /// * `protocol` - Which protocol to use when constructing the request url, e.g. `Some("http")` + /// * `connector` - Implementation of `hyper::client::Connect` to use for the client + /// + /// # Errors + /// + /// The function will fail if base path isn't a valid URL + /// + pub fn try_new_with_connector( + base_path: &str, + protocol: Option<&'static str>, + connector: Connector, + ) -> Result { + let client_service = hyper::client::Client::builder().build(connector); + let client_service = DropContextService::new(client_service); + + Ok(Self { + client_service, + base_path: into_base_path(base_path, protocol)?, + con_marker: PhantomData, + cred_marker: PhantomData, + indices_marker: PhantomData, + }) + } +} + +impl Client +where + Cr: Credentials + Clone, + S: Service<(Request, C), Response = Response> + Clone + Sync + Send + 'static, + S::Future: Send + 'static, + S::Error: Into + std::fmt::Display + error_stack::Context, + C: Selector + + Selector + + Selector>, I2> + + Clone + + Send + + Sync + + 'static, +{ + fn form_get_request(&self, endpoint: &str, context: &C) -> Result, ClientError> { + let uri = format!("{}{endpoint}", self.base_path); + + let uri = Uri::from_str(&uri).change_context(ClientError::BadAddress)?; + let mut request = Request::builder() + .method("GET") + .uri(uri) + .body(Body::empty()) + .change_context(ClientError::CantFormRequest)?; + + let header = HeaderValue::from_str(Selector::::get(context).0.as_str()) + .change_context(ClientError::CantFormRequest)?; + + request + .headers_mut() + .insert(HeaderName::from_static("x-span-id"), header); + + if let Some(auth) = Selector::>, _>::get(context) { + request + .headers_mut() + .typed_insert::>(auth.clone()); + } + + Ok(request) + } + + async fn handle_response_json( + &self, + response: Response, + body_handler: impl Fn(R) -> T + Send, + ) -> Result { + let body = response.into_body(); + let body = body + .into_raw() + .await + .change_context(APIError::ResponseError)?; + let body = std::str::from_utf8(&body).change_context(APIError::ResponseError)?; + + let body = serde_json::from_str::(body) + .change_context(APIError::ResponseError) + .attach_printable("Response body did not match the schema")?; + + Ok(body_handler(body)) + } + + async fn call(&self, req: Request, cx: &C) -> Result, APIError> { + let duration = Selector::::get(cx).clone().into_inner(); + tokio::time::timeout( + duration, + self.client_service.clone().call((req, cx.clone())), + ) + .await + .change_context(APIError::RequestFailed) + .attach_printable("No Response received")? + .change_context(APIError::RequestFailed) + } +} + +#[async_trait] +impl Api for Client +where + Cr: Credentials + Clone, + S: Service<(Request, C), Response = Response> + Clone + Sync + Send + 'static, + S::Future: Send + 'static, + S::Error: Into + std::fmt::Display + error_stack::Context, + C: Selector + + Selector + + Selector>, I2> + + Clone + + Send + + Sync + + 'static, +{ + /// Return directory of alien + #[must_use] + async fn get_alien_dir(&self, context: &C) -> Result { + todo!() + } + + /// Returns the list of disks with their states + #[must_use] + async fn get_disks(&self, context: &C) -> Result { + todo!() + } + + /// Get metrics + #[must_use] + async fn get_metrics(&self, context: &C) -> Result { + todo!() + } + + /// Returns a list of known nodes + #[must_use] + async fn get_nodes(&self, context: &C) -> Result { + let request = self + .form_get_request("/nodes", context) + .change_context(APIError::RequestFailed)?; + let response = self.call(request, context).await?; + + match response.status().as_u16() { + 200 => Ok(self + .handle_response_json(response, |body: Vec| { + GetNodesResponse::AJSONArrayOfNodesInfoAndVdisksOnThem(body) + }) + .await?), + 403 => Ok(GetNodesResponse::PermissionDenied), + _ => Err(APIError::from(response))?, + } + } + + /// Returns a partition info by ID + #[must_use] + async fn get_partition( + &self, + v_disk_id: i32, + partition_id: String, + context: &C, + ) -> Result { + todo!() + } + + /// Returns a list of partitions + #[must_use] + async fn get_partitions( + &self, + v_disk_id: i32, + context: &C, + ) -> Result { + todo!() + } + + /// Returns count of records of this on node + #[must_use] + async fn get_records( + &self, + v_disk_id: i32, + context: &C, + ) -> Result { + todo!() + } + + /// Returns directories of local replicas of vdisk + #[must_use] + async fn get_replicas_local_dirs( + &self, + v_disk_id: i32, + context: &C, + ) -> Result { + todo!() + } + + /// Get space info + #[must_use] + async fn get_space_info(&self, context: &C) -> Result { + todo!() + } + + /// Returns information about self + #[must_use] + async fn get_status(&self, context: &C) -> Result { + todo!() + } + + /// Returns a vdisk info by ID + #[must_use] + async fn get_v_disk(&self, v_disk_id: i32, context: &C) -> Result { + todo!() + } + + /// Returns a list of vdisks + #[must_use] + async fn get_v_disks(&self, context: &C) -> Result { + todo!() + } + + /// Returns server version + #[must_use] + async fn get_version(&self, context: &C) -> Result { + todo!() + } + + /// Returns configuration of the node + #[must_use] + async fn get_configuration(&self, context: &C) -> Result { + todo!() + } +} + +impl From> for APIError { + fn from(response: Response) -> Self { + let code = response.status(); + + Self::InvalidStatusCode(code) + } +} diff --git a/backend/src/connector/context.rs b/backend/src/connector/context.rs new file mode 100644 index 00000000..b8d83a97 --- /dev/null +++ b/backend/src/connector/context.rs @@ -0,0 +1,137 @@ +use crate::models::shared::{RequestTimeout, XSpanIdString}; +use axum::headers::{authorization::Basic, Authorization}; +use frunk::HList; +use futures::{Stream, StreamExt}; +use hyper::{body::Bytes, Request}; +use std::{marker::PhantomData, task::Poll}; + +pub type ClientContext = HList![RequestTimeout, Option>, XSpanIdString]; + +/// Context wrapper, to bind an API with a context. +#[derive(Debug)] +pub struct ContextWrapper { + api: T, + context: C, +} + +pub trait Select { + type Index; + fn get(&self) -> &A; +} +// +// impl Select for ClientContext +// where +// ClientContext: frunk::hlist::Selector, +// Self::Index: Any, +// { +// fn get(&self) -> &A { +// self.get::() +// } +// } + +impl ContextWrapper { + /// Create a new `ContextWrapper`, binding the API and context. + pub const fn new(api: T, context: C) -> Self { + Self { api, context } + } + + /// Borrows the API. + pub const fn api(&self) -> &T { + &self.api + } + + /// Borrows the context. + pub const fn context(&self) -> &C { + &self.context + } +} + +/// Swagger Middleware that wraps a `hyper::service::Service` and drops any contextual information +/// on the request. Servers will normally want to use `DropContextMakeService`, which will create a +/// `DropContextService` to handle each connection, while clients can simply wrap a `hyper::Client` +/// in the middleware. +/// +/// ## Client Usage +/// +/// ``` +/// # use swagger::DropContextService; +/// # use hyper::service::Service as _; +/// +/// let client = hyper::Client::new(); +/// let mut client = DropContextService::new(client); +/// let request = (hyper::Request::get("http://www.google.com").body(hyper::Body::empty()).unwrap()); +/// let context = "Some Context".to_string(); +/// +/// let response = client.call((request, context)); +/// ``` +#[derive(Debug, Clone)] +pub struct DropContextService +where + C: Send + 'static, +{ + inner: T, + marker: PhantomData, +} + +impl DropContextService +where + C: Send + 'static, +{ + /// Create a new `DropContextService` struct wrapping a value + pub const fn new(inner: T) -> Self { + Self { + inner, + marker: PhantomData, + } + } +} + +impl hyper::service::Service<(Request, Context)> + for DropContextService +where + Context: Send + 'static, + Inner: hyper::service::Service>, +{ + type Response = Inner::Response; + type Error = Inner::Error; + type Future = Inner::Future; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, (req, _): (Request, Context)) -> Self::Future { + self.inner.call(req) + } +} + +/// Additional function for `hyper::Body` +pub trait BodyExt { + /// Raw body type + type Raw; + + /// Error if we can't gather up the raw body + type Error; + + /// Collect the body into a raw form + fn into_raw(self) -> futures::future::BoxFuture<'static, Result>; +} + +impl BodyExt for T +where + T: Stream> + Unpin + Send + 'static, +{ + type Raw = Vec; + type Error = E; + + fn into_raw(mut self) -> futures::future::BoxFuture<'static, Result> { + Box::pin(async { + let mut raw = Vec::new(); + while let (Some(chunk), rest) = self.into_future().await { + raw.extend_from_slice(&chunk?); + self = rest; + } + Ok(raw) + }) + } +} diff --git a/backend/src/connector/dto.rs b/backend/src/connector/dto.rs new file mode 100644 index 00000000..a8156774 --- /dev/null +++ b/backend/src/connector/dto.rs @@ -0,0 +1,1209 @@ +/* +* Note: This file is mostly autogenerated +*/ + +use std::collections::HashMap; + +type StdError = dyn std::error::Error; + +/// Function, used for parsing strings into DTOs +/// Accpets closures, that decides what to do with keys and values +fn parse(s: &str, mut matcher: F) -> Result<(), Box> +where + F: FnMut(&str, &str) -> Result<(), Box>, +{ + let mut string_iter = s.split(','); + let mut key_result = string_iter.next(); + + while key_result.is_some() { + let Some(val) = string_iter.next() else { + return Err("Missing value while parsing".into()); + }; + + if let Some(key) = key_result { + matcher(key, val)?; + } + + // Get the next key + key_result = string_iter.next(); + } + Ok(()) +} + +#[derive(Default, Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct Dir { + #[serde(rename = "name")] + #[serde(skip_serializing_if = "Option::is_none")] + pub name: Option, + + #[serde(rename = "path")] + #[serde(skip_serializing_if = "Option::is_none")] + pub path: Option, + + #[serde(rename = "children")] + #[serde(skip_serializing_if = "Option::is_none")] + pub children: Option>, +} + +impl Dir { + #[must_use] + pub const fn new() -> Self { + Self { + name: None, + path: None, + children: None, + } + } +} + +/// Converts the Dir value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for Dir { + fn to_string(&self) -> String { + let params: Vec> = vec![ + self.name + .as_ref() + .map(|name| ["name".to_string(), name.to_string()].join(",")), + self.path + .as_ref() + .map(|path| ["path".to_string(), path.to_string()].join(",")), + // Skipping children in query parameter serialization + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a Dir value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for Dir { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub name: Vec, + pub path: Vec, + pub children: Vec>, + } + let mut intermediate_rep = IntermediateRep::default(); + + // Parse into intermediate representation + parse(s, |key, val| { + match key { + "name" => intermediate_rep + .name + .push(::from_str(val).map_err(|x| x.to_string())?), + "path" => intermediate_rep + .path + .push(::from_str(val).map_err(|x| x.to_string())?), + "children" => { + return Err("Parsing a container in this style is not supported in Dir".into()) + } + _ => return Err("Unexpected key while parsing Dir".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + name: intermediate_rep.name.into_iter().next(), + path: intermediate_rep.path.into_iter().next(), + children: intermediate_rep.children.into_iter().next(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct DiskState { + #[serde(rename = "name")] + pub name: String, + + #[serde(rename = "path")] + pub path: String, + + #[serde(rename = "is_active")] + pub is_active: bool, +} + +impl DiskState { + /// Creates a new [`DiskState`]. + #[must_use] + pub const fn new(name: String, path: String, is_active: bool) -> Self { + Self { + name, + path, + is_active, + } + } +} + +/// Converts the [`DiskState`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for DiskState { + fn to_string(&self) -> String { + let params: Vec> = vec![ + Some("name".to_string()), + Some(self.name.to_string()), + Some("path".to_string()), + Some(self.path.to_string()), + Some("is_active".to_string()), + Some(self.is_active.to_string()), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`DiskState`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for DiskState { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub name: Vec, + pub path: Vec, + pub is_active: Vec, + } + let mut intermediate_rep = IntermediateRep::default(); + + parse(s, |key, val| { + match key { + "name" => intermediate_rep + .name + .push(::from_str(val).map_err(|x| x.to_string())?), + "path" => intermediate_rep + .path + .push(::from_str(val).map_err(|x| x.to_string())?), + "is_active" => intermediate_rep + .is_active + .push(::from_str(val).map_err(|x| x.to_string())?), + _ => return Err("Unexpected key while parsing DiskState".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + name: intermediate_rep + .name + .into_iter() + .next() + .ok_or_else(|| "name missing in DiskState".to_string())?, + path: intermediate_rep + .path + .into_iter() + .next() + .ok_or_else(|| "path missing in DiskState".to_string())?, + is_active: intermediate_rep + .is_active + .into_iter() + .next() + .ok_or_else(|| "is_active missing in DiskState".to_string())?, + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct DistrFunc { + // Note: inline enums are not fully supported by openapi-generator + #[serde(rename = "func")] + #[serde(skip_serializing_if = "Option::is_none")] + pub func: Option, +} + +impl DistrFunc { + #[must_use] + pub const fn new() -> Self { + Self { func: None } + } +} + +/// Converts the [`DistrFunc`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for DistrFunc { + fn to_string(&self) -> String { + let params: Vec> = vec![self + .func + .as_ref() + .map(|func| ["func".to_string(), func.to_string()].join(","))]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`DistrFunc`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for DistrFunc { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub func: Vec, + } + let mut intermediate_rep = IntermediateRep::default(); + // Parse into intermediate representation + parse(s, |key, val| { + match key { + "func" => intermediate_rep + .func + .push(::from_str(val).map_err(|x| x.to_string())?), + _ => return Err("Unexpected key while parsing DistrFunc".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + func: intermediate_rep.func.into_iter().next(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct Error { + #[serde(rename = "code")] + pub code: String, + + #[serde(rename = "message")] + pub message: String, +} + +impl Error { + #[must_use] + pub const fn new(code: String, message: String) -> Self { + Self { code, message } + } +} + +/// Converts the [`Error`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for Error { + fn to_string(&self) -> String { + let params: Vec> = vec![ + Some("code".to_string()), + Some(self.code.to_string()), + Some("message".to_string()), + Some(self.message.to_string()), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`Error`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for Error { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub code: Vec, + pub message: Vec, + } + let mut intermediate_rep = IntermediateRep::default(); + parse(s, |key, val| { + match key { + "code" => intermediate_rep + .code + .push(::from_str(val).map_err(|x| x.to_string())?), + "message" => intermediate_rep + .message + .push(::from_str(val).map_err(|x| x.to_string())?), + _ => return Err("Unexpected key while parsing Error".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + code: intermediate_rep + .code + .into_iter() + .next() + .ok_or_else(|| "code missing in Error".to_string())?, + message: intermediate_rep + .message + .into_iter() + .next() + .ok_or_else(|| "message missing in Error".to_string())?, + }) + } +} + +#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct MetricsEntryModel { + #[serde(rename = "value")] + pub value: u64, + + #[serde(rename = "timestamp")] + pub timestamp: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct MetricsSnapshotModel { + #[serde(rename = "metrics")] + pub metrics: HashMap, +} + +impl PartialEq for MetricsEntryModel { + fn eq(&self, other: &Self) -> bool { + self.value == other.value + } +} + +impl PartialOrd for MetricsEntryModel { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for MetricsEntryModel { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.value.cmp(&other.value) + } +} + +impl Eq for MetricsEntryModel {} +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct Node { + #[serde(rename = "name")] + pub name: String, + + #[serde(rename = "address")] + pub address: String, + + #[serde(rename = "vdisks")] + #[serde(skip_serializing_if = "Option::is_none")] + pub vdisks: Option>, +} + +impl Node { + #[must_use] + pub const fn new(name: String, address: String) -> Self { + Self { + name, + address, + vdisks: None, + } + } +} + +/// Converts the [`Node`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for Node { + fn to_string(&self) -> String { + let params: Vec> = vec![ + Some("name".to_string()), + Some(self.name.to_string()), + Some("address".to_string()), + Some(self.address.to_string()), + // Skipping vdisks in query parameter serialization + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`Node`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for Node { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub name: Vec, + pub address: Vec, + pub vdisks: Vec>, + } + + let mut intermediate_rep = IntermediateRep::default(); + parse(s, |key, val| { + match key { + "name" => intermediate_rep + .name + .push(::from_str(val).map_err(|x| x.to_string())?), + "address" => intermediate_rep + .address + .push(::from_str(val).map_err(|x| x.to_string())?), + "vdisks" => { + return Err("Parsing a container in this style is not supported in Node".into()) + } + _ => return Err("Unexpected key while parsing Node".into()), + } + Ok(()) + })?; + + Ok(Self { + name: intermediate_rep + .name + .into_iter() + .next() + .ok_or_else(|| "name missing in Node".to_string())?, + address: intermediate_rep + .address + .into_iter() + .next() + .ok_or_else(|| "address missing in Node".to_string())?, + vdisks: intermediate_rep.vdisks.into_iter().next(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct NodeConfiguration { + #[serde(rename = "blob_file_name_prefix")] + #[serde(skip_serializing_if = "Option::is_none")] + pub blob_file_name_prefix: Option, + + #[serde(rename = "root_dir_name")] + #[serde(skip_serializing_if = "Option::is_none")] + pub root_dir_name: Option, +} + +impl NodeConfiguration { + #[must_use] + pub const fn new() -> Self { + Self { + blob_file_name_prefix: None, + root_dir_name: None, + } + } +} + +/// Converts the [`NodeConfiguration`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for NodeConfiguration { + fn to_string(&self) -> String { + let params: Vec> = vec![ + self.blob_file_name_prefix + .as_ref() + .map(|blob_file_name_prefix| { + [ + "blob_file_name_prefix".to_string(), + blob_file_name_prefix.to_string(), + ] + .join(",") + }), + self.root_dir_name.as_ref().map(|root_dir_name| { + ["root_dir_name".to_string(), root_dir_name.to_string()].join(",") + }), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`NodeConfiguration`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for NodeConfiguration { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub blob_file_name_prefix: Vec, + pub root_dir_name: Vec, + } + let mut intermediate_rep = IntermediateRep::default(); + + parse(s, |key, val| { + match key { + "blob_file_name_prefix" => intermediate_rep + .blob_file_name_prefix + .push(::from_str(val).map_err(|x| x.to_string())?), + "root_dir_name" => intermediate_rep + .root_dir_name + .push(::from_str(val).map_err(|x| x.to_string())?), + _ => return Err("Unexpected key while parsing NodeConfiguration".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + blob_file_name_prefix: intermediate_rep.blob_file_name_prefix.into_iter().next(), + root_dir_name: intermediate_rep.root_dir_name.into_iter().next(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct Partition { + #[serde(rename = "vdisk_id")] + #[serde(skip_serializing_if = "Option::is_none")] + pub vdisk_id: Option, + + #[serde(rename = "node_name")] + #[serde(skip_serializing_if = "Option::is_none")] + pub node_name: Option, + + #[serde(rename = "disk_name")] + #[serde(skip_serializing_if = "Option::is_none")] + pub disk_name: Option, + + #[serde(rename = "timestamp")] + #[serde(skip_serializing_if = "Option::is_none")] + pub timestamp: Option, + + #[serde(rename = "records_count")] + #[serde(skip_serializing_if = "Option::is_none")] + pub records_count: Option, +} + +impl Partition { + #[must_use] + pub const fn new() -> Self { + Self { + vdisk_id: None, + node_name: None, + disk_name: None, + timestamp: None, + records_count: None, + } + } +} + +/// Converts the [`Partition`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for Partition { + fn to_string(&self) -> String { + let params: Vec> = vec![ + self.vdisk_id + .as_ref() + .map(|vdisk_id| ["vdisk_id".to_string(), vdisk_id.to_string()].join(",")), + self.node_name + .as_ref() + .map(|node_name| ["node_name".to_string(), node_name.to_string()].join(",")), + self.disk_name + .as_ref() + .map(|disk_name| ["disk_name".to_string(), disk_name.to_string()].join(",")), + self.timestamp + .as_ref() + .map(|timestamp| ["timestamp".to_string(), timestamp.to_string()].join(",")), + self.records_count.as_ref().map(|records_count| { + ["records_count".to_string(), records_count.to_string()].join(",") + }), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`Partition`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for Partition { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub vdisk_id: Vec, + pub node_name: Vec, + pub disk_name: Vec, + pub timestamp: Vec, + pub records_count: Vec, + } + let mut intermediate_rep = IntermediateRep::default(); + + parse(s, |key, val| { + match key { + "vdisk_id" => intermediate_rep + .vdisk_id + .push(::from_str(val).map_err(|x| x.to_string())?), + "node_name" => intermediate_rep + .node_name + .push(::from_str(val).map_err(|x| x.to_string())?), + "disk_name" => intermediate_rep + .disk_name + .push(::from_str(val).map_err(|x| x.to_string())?), + "timestamp" => intermediate_rep + .timestamp + .push(::from_str(val).map_err(|x| x.to_string())?), + "records_count" => intermediate_rep + .records_count + .push(::from_str(val).map_err(|x| x.to_string())?), + _ => return Err("Unexpected key while parsing Partition".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + vdisk_id: intermediate_rep.vdisk_id.into_iter().next(), + node_name: intermediate_rep.node_name.into_iter().next(), + disk_name: intermediate_rep.disk_name.into_iter().next(), + timestamp: intermediate_rep.timestamp.into_iter().next(), + records_count: intermediate_rep.records_count.into_iter().next(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct Replica { + #[serde(rename = "node")] + pub node: String, + + #[serde(rename = "disk")] + pub disk: String, + + #[serde(rename = "path")] + pub path: String, +} + +impl Replica { + #[must_use] + pub const fn new(node: String, disk: String, path: String) -> Self { + Self { node, disk, path } + } +} + +/// Converts the [`Replica`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for Replica { + fn to_string(&self) -> String { + let params: Vec> = vec![ + Some("node".to_string()), + Some(self.node.to_string()), + Some("disk".to_string()), + Some(self.disk.to_string()), + Some("path".to_string()), + Some(self.path.to_string()), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`Replica`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for Replica { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub node: Vec, + pub disk: Vec, + pub path: Vec, + } + let mut intermediate_rep = IntermediateRep::default(); + + parse(s, |key, val| { + match key { + "node" => intermediate_rep + .node + .push(::from_str(val).map_err(|x| x.to_string())?), + "disk" => intermediate_rep + .disk + .push(::from_str(val).map_err(|x| x.to_string())?), + "path" => intermediate_rep + .path + .push(::from_str(val).map_err(|x| x.to_string())?), + _ => return Err("Unexpected key while parsing Replica".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + node: intermediate_rep + .node + .into_iter() + .next() + .ok_or_else(|| "node missing in Replica".to_string())?, + disk: intermediate_rep + .disk + .into_iter() + .next() + .ok_or_else(|| "disk missing in Replica".to_string())?, + path: intermediate_rep + .path + .into_iter() + .next() + .ok_or_else(|| "path missing in Replica".to_string())?, + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct SpaceInfo { + #[serde(rename = "total_disk_space_bytes")] + pub total_disk_space_bytes: u64, + + #[serde(rename = "free_disk_space_bytes")] + pub free_disk_space_bytes: u64, + + #[serde(rename = "used_disk_space_bytes")] + pub used_disk_space_bytes: u64, + + #[serde(rename = "occupied_disk_space_bytes")] + pub occupied_disk_space_bytes: u64, + + #[serde(rename = "occupied_disk_space_by_disk")] + pub occupied_disk_space_by_disk: std::collections::HashMap, +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct StatusExt { + #[serde(rename = "status")] + #[serde(skip_serializing_if = "Option::is_none")] + pub status: Option, + + #[serde(rename = "ok")] + #[serde(skip_serializing_if = "Option::is_none")] + pub ok: Option, + + #[serde(rename = "msg")] + #[serde(skip_serializing_if = "Option::is_none")] + pub msg: Option, +} + +impl StatusExt { + #[must_use] + pub const fn new() -> Self { + Self { + status: None, + ok: None, + msg: None, + } + } +} + +/// Converts the [`StatusExt`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for StatusExt { + fn to_string(&self) -> String { + let params: Vec> = vec![ + self.status + .as_ref() + .map(|status| ["status".to_string(), status.to_string()].join(",")), + self.ok + .as_ref() + .map(|ok| ["ok".to_string(), ok.to_string()].join(",")), + self.msg + .as_ref() + .map(|msg| ["msg".to_string(), msg.to_string()].join(",")), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`StatusExt`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for StatusExt { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub status: Vec, + pub ok: Vec, + pub msg: Vec, + } + let mut intermediate_rep = IntermediateRep::default(); + + parse(s, |key, val| { + match key { + "status" => intermediate_rep + .status + .push(::from_str(val).map_err(|x| x.to_string())?), + "ok" => intermediate_rep + .ok + .push(::from_str(val).map_err(|x| x.to_string())?), + "msg" => intermediate_rep + .msg + .push(::from_str(val).map_err(|x| x.to_string())?), + _ => return Err("Unexpected key while parsing StatusExt".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + status: intermediate_rep.status.into_iter().next(), + ok: intermediate_rep.ok.into_iter().next(), + msg: intermediate_rep.msg.into_iter().next(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct VDisk { + #[serde(rename = "id")] + pub id: i32, + + #[serde(rename = "replicas")] + #[serde(skip_serializing_if = "Option::is_none")] + pub replicas: Option>, +} + +impl VDisk { + #[must_use] + pub const fn new(id: i32) -> Self { + Self { id, replicas: None } + } +} + +/// Converts the [`VDisk`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for VDisk { + fn to_string(&self) -> String { + let params: Vec> = vec![ + Some("id".to_string()), + Some(self.id.to_string()), + // Skipping replicas in query parameter serialization + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`VDisk`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for VDisk { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub id: Vec, + pub replicas: Vec>, + } + let mut intermediate_rep = IntermediateRep::default(); + parse(s, |key, val| { + match key { + "id" => intermediate_rep + .id + .push(::from_str(val).map_err(|x| x.to_string())?), + "replicas" => { + return Err( + "Parsing a container in this style is not supported in VDisk".into(), + ) + } + _ => return Err("Unexpected key while parsing VDisk".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + id: intermediate_rep + .id + .into_iter() + .next() + .ok_or_else(|| "id missing in VDisk".to_string())?, + replicas: intermediate_rep.replicas.into_iter().next(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct VDiskPartitions { + #[serde(rename = "vdisk")] + #[serde(skip_serializing_if = "Option::is_none")] + pub vdisk: Option, + + #[serde(rename = "node")] + #[serde(skip_serializing_if = "Option::is_none")] + pub node: Option, + + #[serde(rename = "disk")] + #[serde(skip_serializing_if = "Option::is_none")] + pub disk: Option, + + #[serde(rename = "partitions")] + #[serde(skip_serializing_if = "Option::is_none")] + pub partitions: Option>, +} + +impl VDiskPartitions { + #[must_use] + pub const fn new() -> Self { + Self { + vdisk: None, + node: None, + disk: None, + partitions: None, + } + } +} + +/// Converts the [`VDiskPartitions`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for VDiskPartitions { + fn to_string(&self) -> String { + let params: Vec> = vec![ + self.vdisk + .as_ref() + .map(|vdisk| ["vdisk".to_string(), vdisk.to_string()].join(",")), + self.node + .as_ref() + .map(|node| ["node".to_string(), node.to_string()].join(",")), + self.disk + .as_ref() + .map(|disk| ["disk".to_string(), disk.to_string()].join(",")), + self.partitions.as_ref().map(|partitions| { + [ + "partitions".to_string(), + partitions + .iter() + .map(std::string::ToString::to_string) + .collect::>() + .join(","), + ] + .join(",") + }), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`VDiskPartitions`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for VDiskPartitions { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub vdisk: Vec, + pub node: Vec, + pub disk: Vec, + pub partitions: Vec>, + } + let mut intermediate_rep = IntermediateRep::default(); + + parse(s, |key, val| { + match key { + "vdisk" => intermediate_rep + .vdisk + .push(::from_str(val).map_err(|x| x.to_string())?), + "node" => intermediate_rep + .node + .push(::from_str(val).map_err(|x| x.to_string())?), + "disk" => intermediate_rep + .disk + .push(::from_str(val).map_err(|x| x.to_string())?), + "partitions" => { + return Err( + "Parsing a container in this style is not supported in VDiskPartitions" + .into(), + ) + } + _ => return Err("Unexpected key while parsing VDiskPartitions".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + vdisk: intermediate_rep.vdisk.into_iter().next(), + node: intermediate_rep.node.into_iter().next(), + disk: intermediate_rep.disk.into_iter().next(), + partitions: intermediate_rep.partitions.into_iter().next(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct Version { + #[serde(rename = "version")] + #[serde(skip_serializing_if = "Option::is_none")] + pub version: Option, + + #[serde(rename = "build_time")] + #[serde(skip_serializing_if = "Option::is_none")] + pub build_time: Option, +} + +impl Version { + #[must_use] + pub const fn new() -> Self { + Self { + version: None, + build_time: None, + } + } +} + +/// Converts the [`Version`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for Version { + fn to_string(&self) -> String { + let params: Vec> = vec![ + self.version + .as_ref() + .map(|version| ["version".to_string(), version.to_string()].join(",")), + self.build_time + .as_ref() + .map(|build_time| ["build_time".to_string(), build_time.to_string()].join(",")), + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`Version`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for Version { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub version: Vec, + pub build_time: Vec, + } + let mut intermediate_rep = IntermediateRep::default(); + parse(s, |key, val| { + match key { + "version" => intermediate_rep + .version + .push(::from_str(val).map_err(|x| x.to_string())?), + "build_time" => intermediate_rep + .build_time + .push(::from_str(val).map_err(|x| x.to_string())?), + _ => return Err("Unexpected key while parsing Version".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + version: intermediate_rep.version.into_iter().next(), + build_time: intermediate_rep.build_time.into_iter().next(), + }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[cfg_attr(feature = "conversion", derive(frunk::LabelledGeneric))] +pub struct VersionInfo { + #[serde(rename = "bobversion")] + #[serde(skip_serializing_if = "Option::is_none")] + pub bobversion: Option, + + #[serde(rename = "pearlversion")] + #[serde(skip_serializing_if = "Option::is_none")] + pub pearlversion: Option, +} + +impl VersionInfo { + #[must_use] + pub const fn new() -> Self { + Self { + bobversion: None, + pearlversion: None, + } + } +} + +/// Converts the [`VersionInfo`] value to the Query Parameters representation (style=form, explode=false) +/// specified in +/// Should be implemented in a serde serializer +impl std::string::ToString for VersionInfo { + fn to_string(&self) -> String { + let params: Vec> = vec![ + // Skipping bobversion in query parameter serialization + + // Skipping pearlversion in query parameter serialization + + ]; + + params.into_iter().flatten().collect::>().join(",") + } +} + +/// Converts Query Parameters representation (style=form, explode=false) to a [`VersionInfo`] value +/// as specified in +/// Should be implemented in a serde deserializer +impl std::str::FromStr for VersionInfo { + type Err = Box; + + fn from_str(s: &str) -> std::result::Result { + /// An intermediate representation of the struct to use for parsing. + #[derive(Default)] + struct IntermediateRep { + pub bobversion: Vec, + pub pearlversion: Vec, + } + let mut intermediate_rep = IntermediateRep::default(); + parse(s, |key, val| { + match key { + "bobversion" => intermediate_rep.bobversion.push( + ::from_str(val).map_err(|x| x.to_string())?, + ), + "pearlversion" => intermediate_rep.pearlversion.push( + ::from_str(val).map_err(|x| x.to_string())?, + ), + _ => return Err("Unexpected key while parsing VersionInfo".into()), + } + Ok(()) + })?; + + // Use the intermediate representation to return the struct + Ok(Self { + bobversion: intermediate_rep.bobversion.into_iter().next(), + pearlversion: intermediate_rep.pearlversion.into_iter().next(), + }) + } +} diff --git a/backend/src/connector/mod.rs b/backend/src/connector/mod.rs index 8b137891..8daa1a50 100644 --- a/backend/src/connector/mod.rs +++ b/backend/src/connector/mod.rs @@ -1 +1,265 @@ +use self::{ + api::{APIError, ApiNoContext}, + client::Client, + context::ClientContext, +}; +use crate::{ + models::{ + bob::NodeName, + shared::{BobConnectionData, Hostname, RequestTimeout, XSpanIdString}, + }, + prelude::*, +}; +use api::ContextWrapperExt; +use axum::headers::{authorization::Basic, Authorization}; +use error_stack::ResultExt; +use frunk::hlist; +use hyper::StatusCode; +use std::{collections::HashMap, sync::Arc}; +use thiserror::Error; +pub mod api; +pub mod client; +pub mod context; +pub mod dto; + +pub type ApiInterface = dyn ApiNoContext + Send + Sync; + +#[derive(Debug, Error)] +pub enum ClientError { + #[error("couldn't init http client")] + InitClient, + #[error("couldn't probe connection to the node")] + Inaccessible, + #[error("permission denied")] + PermissionDenied, + #[error("no client found found for requested resource")] + NoClient, + #[error("bad address: no port")] + NoPort, + #[error("bad address: couldn't parse hostname")] + BadAddress, + #[error("Can't form hyper request")] + CantFormRequest, +} + +/// HTTP Connector construction +#[derive(Debug)] +pub struct Connector; + +impl Connector { + /// Alows building a HTTP(S) connector. Used for instantiating clients with custom + /// connectors. + #[must_use] + pub const fn builder() -> Builder { + Builder {} + } +} + +/// Builder for HTTP(S) connectors +#[derive(Debug)] +pub struct Builder {} + +impl Builder { + /// [Stub] Use HTTPS instead of HTTP + #[must_use] + pub const fn https(self) -> HttpsBuilder { + HttpsBuilder {} + } + + /// Build a HTTP connector + #[must_use] + pub fn build(self) -> hyper::client::connect::HttpConnector { + hyper::client::connect::HttpConnector::new() + } +} + +/// [Stub] Builder for HTTPS connectors +#[derive(Debug)] +pub struct HttpsBuilder {} + +impl HttpsBuilder { + pub fn build(self) { + unimplemented!() + } +} + +#[derive(Clone)] +pub struct BobClient { + /// Bob's hostname + hostname: Hostname, + + // NOTE: Can (and should) the connection mutate?.. + /// Connection, + main: Arc, + + /// Clients for all known nodes + cluster: HashMap>, +} + +#[allow(clippy::missing_fields_in_debug)] +impl std::fmt::Debug for BobClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let user = &self + .context() + .get::>, _>() + .as_ref() + .map_or("Unknown", |cred| cred.username()); + f.debug_struct("BobClient") + .field("hostname", &self.hostname) + .field("user", &user) + .finish() + } +} + +impl BobClient { + /// Creates new [`BobClient`] from [`BobConnectionData`] + /// + /// # Errors + /// The function will fail if a hostname isn't a valid url + pub async fn try_new( + bob_data: BobConnectionData, + timeout: RequestTimeout, + ) -> Result { + let auth = if let Some(creds) = bob_data.credentials { + Some(Authorization::basic(&creds.login, &creds.password)) + } else { + None + }; + let hostname = bob_data.hostname.clone(); + + let context: ClientContext = hlist![timeout, auth, XSpanIdString::default()]; + let client = Box::new( + Client::try_new_http(&hostname.to_string()).change_context(ClientError::InitClient)?, + ); + let nodes = client + .with_context(context.clone()) + .get_nodes() + .await + .change_context(ClientError::InitClient)?; + let api::GetNodesResponse::AJSONArrayOfNodesInfoAndVdisksOnThem(nodes) = nodes else { + Err(APIError::InvalidStatusCode(StatusCode::FORBIDDEN)) + .change_context(ClientError::PermissionDenied)? + }; + + let cluster: HashMap> = nodes + .iter() + .map(|node| { + ( + &node.name, + Self::change_port(&node.address, &bob_data.hostname), + ) + }) + .filter_map(|(name, hostname)| { + if hostname.is_err() { + tracing::warn!("couldn't change port for {name}. Client won't be created"); + } + hostname + .ok() + .map(|hostname| (name, Client::try_new_http(&hostname.to_string()))) + }) + .filter_map(|(name, client)| { + if client.is_err() { + tracing::warn!("couldn't create client for {hostname}"); + } + client.ok().map(|client| { + ( + name.clone(), + Arc::new(client.with_context(context.clone())) as Arc, + ) + }) + }) + .collect(); + let client = Box::new( + Client::try_new_http(&hostname.to_string()).change_context(ClientError::InitClient)?, + ); + Ok(Self { + hostname: bob_data.hostname, + main: Arc::new(client.with_context(context)), + cluster, + }) + } + + /// Probes connection to the Bob cluster + /// + /// Returns [`StatusCode`], that it received from Bob + /// + /// # Errors + /// + /// The function fails if there was an error during creation of request + /// It shouldn't happen on normal behaviour + /// + pub async fn probe(&self) -> Result { + match self + .main + .get_nodes() + .await + .change_context(ClientError::Inaccessible)? + { + api::GetNodesResponse::AJSONArrayOfNodesInfoAndVdisksOnThem(_) => Ok(StatusCode::OK), + api::GetNodesResponse::PermissionDenied => Err(ClientError::PermissionDenied.into()), + } + } + + /// Probes connection to the `hostname` + /// + /// # Errors + /// + /// This function will return an error if no client present for the specified `hostname` or + /// the client was unable to send request + pub async fn probe_socket(&self, name: &NodeName) -> Result { + if let Some(client) = self.cluster.get(name) { + match client + .get_nodes() + .await + .change_context(ClientError::Inaccessible)? + { + api::GetNodesResponse::AJSONArrayOfNodesInfoAndVdisksOnThem(_) => { + Ok(StatusCode::OK) + } + api::GetNodesResponse::PermissionDenied => { + Err(ClientError::PermissionDenied.into()) + } + } + } else { + Err(ClientError::NoClient.into()) + } + } + + #[must_use] + pub fn context(&self) -> &ClientContext { + self.main.context() + } + + #[must_use] + pub fn api(&self) -> &Arc { + &self.main + } + + pub fn cluster(&self) -> impl Iterator> { + self.cluster.values() + } + + #[must_use] + pub fn cluster_with_addr(&self) -> &HashMap> { + &self.cluster + } + + /// Changes port of the address with specified [`Hostname`] + /// + /// # Errors + /// + /// This function will return an error if address doesn't have a port or is an invalid [`Hostname`] + pub fn change_port(address: &str, src_hostname: &Hostname) -> Result { + let (body, _) = address.rsplit_once(':').ok_or(ClientError::NoPort)?; + let mut body = body.to_string(); + body.push_str(&format!(":{:?}", src_hostname.port())); + + Hostname::new(body).change_context(ClientError::BadAddress) + } + + #[must_use] + pub const fn hostname(&self) -> &Hostname { + &self.hostname + } +} diff --git a/backend/src/error.rs b/backend/src/error.rs index 3ac148d7..3d5abaf1 100644 --- a/backend/src/error.rs +++ b/backend/src/error.rs @@ -1,5 +1,3 @@ -#![allow(clippy::module_name_repetitions)] - use axum::response::{IntoResponse, Response}; use hyper::StatusCode; use thiserror::Error; diff --git a/backend/src/lib.rs b/backend/src/lib.rs index 279af0ca..a6e3b85b 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -1,4 +1,4 @@ -#![allow(clippy::multiple_crate_versions)] +#![allow(clippy::multiple_crate_versions, clippy::module_name_repetitions)] #[cfg(feature = "swagger")] use axum::{routing::get, Router}; @@ -8,13 +8,13 @@ use utoipa::OpenApi; pub mod config; pub mod connector; pub mod error; -pub mod macros; pub mod models; +pub mod router; pub mod services; #[cfg_attr(feature = "swagger", derive(OpenApi))] #[cfg_attr(feature = "swagger", openapi( - paths(root), + paths(root, services::auth::login, services::auth::logout), tags( (name = "bob", description = "BOB management API") ) @@ -34,8 +34,14 @@ pub struct ApiDoc; pub async fn root() -> &'static str { "Hello Bob!" } + /// Generate openapi documentation for the project +/// +/// # Panics +/// +/// Panics if `OpenAPI` couldn't be converted into YAML format #[cfg(feature = "swagger")] +#[allow(clippy::expect_used)] pub fn openapi_doc() -> Router { use utoipa_rapidoc::RapiDoc; use utoipa_redoc::{Redoc, Servable}; @@ -67,7 +73,9 @@ pub fn openapi_doc() -> Router { pub mod prelude { #![allow(unused_imports)] pub use crate::error::AppError; - pub use crate::macros::RouteError; + pub use crate::router::RouteError; + #[cfg(feature = "swagger")] + pub use crate::ApiDoc; pub use axum::response::Result as AxumResult; pub use error_stack::{Context, Report, Result, ResultExt}; #[cfg(feature = "swagger")] diff --git a/backend/src/macros.rs b/backend/src/macros.rs deleted file mode 100644 index 553d1501..00000000 --- a/backend/src/macros.rs +++ /dev/null @@ -1,84 +0,0 @@ -// NOTE: might be better to declare API as global constant, but the downside of this is -// that we need to pass around this variable alongside with macro call -// lazy_static! { -// pub static ref API: OpenApi = ApiDoc::openapi(); -// } - -use thiserror::Error; - -#[derive(Debug, Error)] -pub enum RouteError { - #[error("No route found in OpenAPI scheme")] - NoRoute, - #[error("No method found for specified route in OpenAPI scheme")] - NoMethod, - #[error("No `operation_id` found")] - NoOperation, - #[error("OpenAPI's `operation_id` doesn't match handler's name")] - NoMatch, -} - -// TODO: add all axum's routing options (wildcards, any, on, service routing) support -/// Check if the following route corresponds with `OpenAPI` declaration -/// Relies on `operation_id` field, should NOT be changed on new route declaration -#[macro_export] -macro_rules! new_api_route { - ($router:expr, $route:literal, get($func:ident)) => {{ - new_api_route!($route, $func, Get).map(|_| $router.route($route, get($func))) - }}; - ($router:expr, $route:literal, post($func:ident)) => {{ - new_api_route!($route, $func, Post).map(|_| $router.route($route, post($func))) - }}; - ($router:expr, $route:literal, delete($func:ident)) => {{ - new_api_route!($route, $func, Delete).map(|_| $router.route($route, delete($func))) - }}; - ($router:expr, $route:literal, put($func:ident)) => {{ - new_api_route!($route, $func, Put).map(|_| $router.route($route, put($func))) - }}; - ($router:expr, $route:literal, head($func:ident)) => {{ - new_api_route!($route, $func, Head).map(|_| $router.route($route, head($func))) - }}; - ($router:expr, $route:literal, options($func:ident)) => {{ - new_api_route!($route, $func, Options).map(|_| $router.route($route, options($func))) - }}; - ($router:expr, $route:literal, patch($func:ident)) => {{ - new_api_route!($route, $func, Patch).map(|_| $router.route($route, patch($func))) - }}; - ($router:expr, $route:literal, trace($func:ident)) => {{ - new_api_route!($route, $func, Trace).map(|_| $router.route($route, trace($func))) - }}; - ($router:expr, $route:literal, connect($func:ident)) => {{ - new_api_route!($route, $func, Connect).map(|_| $router.route($route, connect($func))) - }}; - ($route:literal, $func:ident, $method:ident) => {{ - || -> Result<(), RouteError> { - #[cfg(not(feature = "swagger"))] - return Ok(()); - #[cfg(feature = "swagger")] - return Ok(ApiDoc::openapi() - .paths - .get_path_item( - $route - .split('/') - .map(|arg| { - arg.starts_with(':') - .then(|| ["{", &arg[1..], "}"].concat()) - .unwrap_or_else(|| arg.to_string()) - }) - .collect::>() - .join("/"), - ) - .ok_or(RouteError::NoRoute)? - .operations - .get(&utoipa::openapi::PathItemType::$method) - .ok_or(RouteError::NoMethod)? - .operation_id - .clone() - .ok_or(RouteError::NoOperation)? - .eq(&stringify!($func)) - .then_some(()) - .ok_or(RouteError::NoMatch)?); - } - } - ()}; -} diff --git a/backend/src/main.rs b/backend/src/main.rs index c4558fd8..c55a4428 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -1,13 +1,20 @@ #![allow(clippy::multiple_crate_versions)] -#[cfg(feature = "swagger")] -use backend::ApiDoc; - -use axum::{routing::get, Router}; -use backend::{config::ConfigExt, new_api_route, prelude::*, root, services::api_router}; +use axum::Router; +use axum_login::{memory_store::MemoryStore as AuthMemoryStore, AuthLayer}; +use axum_sessions::async_session::MemoryStore as SessionMemoryStore; +use axum_sessions::SessionLayer; +use backend::router::{NoApi, RouterApiExt}; +use backend::services::auth::SocketBobMemoryStore; +use backend::{config::ConfigExt, prelude::*, root, services::api_router}; use cli::Parser; use error_stack::{Result, ResultExt}; +use hyper::Method; +use rand::Rng; +use std::collections::HashMap; +use std::sync::Arc; use std::{env, path::PathBuf}; +use tokio::sync::RwLock; use tower::ServiceBuilder; use tower_http::{cors::CorsLayer, services::ServeDir}; use tracing::Level; @@ -52,17 +59,38 @@ fn init_tracer(_log_file: &Option, trace_level: Level) { #[allow(clippy::unwrap_used, clippy::expect_used)] fn router(cors: CorsLayer) -> Router { + let secret = rand::thread_rng().gen::<[u8; 64]>(); + + let session_store = SessionMemoryStore::new(); + let session_layer = SessionLayer::new(session_store, &secret); + + // We can use in-memory database since we don't need to actually store users + let store = Arc::new(RwLock::new(HashMap::default())); + + let bob_store: SocketBobMemoryStore = AuthMemoryStore::new(&store); + let auth_layer = AuthLayer::new(bob_store, &secret); + let mut frontend = env::current_exe().expect("Couldn't get current executable path."); frontend.pop(); frontend.push(FRONTEND_FOLDER); tracing::info!("serving frontend at: {frontend:?}"); - let mut router = Router::new() + let router = Router::new() // Frontend .nest_service("/", ServeDir::new(frontend)); // Add API - router = new_api_route!(router, "/root", get(root)).expect("Couldn't register new API route"); + let router = router + .api_route::<_, _, NoApi>("/root", Method::GET, root) + .expect("Couldn't register new API route"); router - .nest("/api", api_router()) - .layer(ServiceBuilder::new().layer(cors)) + .nest( + "/api", + api_router().expect("Couldn't get API routes").layer( + ServiceBuilder::new() + .layer(cors) + .layer(session_layer) + .layer(auth_layer), + ), + ) + .with_state(store) } diff --git a/backend/src/models/api.rs b/backend/src/models/api.rs new file mode 100644 index 00000000..e69de29b diff --git a/backend/src/models/bob.rs b/backend/src/models/bob.rs new file mode 100644 index 00000000..3a4109f9 --- /dev/null +++ b/backend/src/models/bob.rs @@ -0,0 +1,4 @@ +pub type NodeName = String; +pub type DiskName = String; +pub type NodeAddress = String; +pub type IsActive = bool; diff --git a/backend/src/models/mod.rs b/backend/src/models/mod.rs index 8b137891..f97d3a2a 100644 --- a/backend/src/models/mod.rs +++ b/backend/src/models/mod.rs @@ -1 +1,3 @@ - +pub mod api; +pub mod bob; +pub mod shared; diff --git a/backend/src/models/shared.rs b/backend/src/models/shared.rs new file mode 100644 index 00000000..f3e38c8d --- /dev/null +++ b/backend/src/models/shared.rs @@ -0,0 +1,100 @@ +use nutype::nutype; +use serde::{Deserialize, Serialize}; +use std::{net::SocketAddr, time::Duration}; + +#[nutype(sanitize(trim, lowercase) validate(not_empty, regex = r"(https?)?(?::[\/]{2})?([^\:]+):(([1-9][0-9]{0,3}|[1-5][0-9]{4}|6[0-4][0-9]{3}|65[0-4][0-9]{2}|655[0-2][0-9]|6553[0-5])$)"))] +#[derive(Display, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub struct Hostname(String); + +impl Hostname { + /// Can be safely unwraped thanks to regex + #[must_use] + #[allow(clippy::pedantic, clippy::unwrap_used)] + pub fn port(&self) -> u16 { + self.clone() + .into_inner() + .rsplit(':') + .next() + .unwrap() + .parse::() + .unwrap() + } +} + +impl TryFrom for Hostname { + type Error = HostnameError; + + fn try_from(value: SocketAddr) -> Result { + Self::new(value.to_string()) + } +} + +impl TryFrom for SocketAddr { + type Error = std::net::AddrParseError; + + fn try_from(value: Hostname) -> Result { + value.to_string().parse() + } +} + +/// Data needed to connect to a BOB cluster +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub struct BobConnectionData { + /// Address to connect to + pub hostname: Hostname, + + /// [Optional] Credentials used for BOB authentication + #[serde(skip_serializing_if = "Option::is_none")] + pub credentials: Option, +} + +/// Optional auth credentials for a BOB cluster +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Serialize, Deserialize)] +pub struct Credentials { + /// Login used during auth + pub login: String, + + /// Password used during auth + pub password: String, +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub struct RequestTimeout(Duration); + +impl RequestTimeout { + #[must_use] + pub const fn from_millis(millis: u64) -> Self { + Self(Duration::from_millis(millis)) + } + + #[must_use] + pub const fn into_inner(self) -> Duration { + self.0 + } +} + +/// Header - `X-Span-ID` - used to track a request through a chain of microservices. +pub const X_SPAN_ID: &str = "X-Span-ID"; + +/// Wrapper for a string being used as an X-Span-ID. +#[derive(Debug, Clone)] +pub struct XSpanIdString(pub String); + +impl XSpanIdString { + /// Extract an X-Span-ID from a request header if present, and if not + /// generate a new one. + pub fn get_or_generate(req: &hyper::Request) -> Self { + let x_span_id = req.headers().get(X_SPAN_ID); + + x_span_id + .and_then(|x| x.to_str().ok()) + .map(|x| Self(x.to_string())) + .unwrap_or_default() + } +} + +impl Default for XSpanIdString { + fn default() -> Self { + Self(uuid::Uuid::new_v4().to_string()) + } +} diff --git a/backend/src/router.rs b/backend/src/router.rs new file mode 100644 index 00000000..58aaeeb4 --- /dev/null +++ b/backend/src/router.rs @@ -0,0 +1,208 @@ +// NOTE: might be better to declare API as global constant, but the downside of this is +// that we need to pass around this variable alongside with macro call +// lazy_static! { +// pub static ref API: OpenApi = ApiDoc::openapi(); +// } + +use crate::prelude::*; +use crate::ApiDoc; +use axum::body::HttpBody; +use axum::routing::on; +use axum::{handler::Handler, routing::MethodFilter, Router}; +use hyper::{Body, Method}; +use std::convert::Infallible; +use std::ops::Deref; +use thiserror::Error; +use utoipa::{openapi::PathItemType, OpenApi}; + +#[derive(Debug, Error)] +pub enum RouteError { + #[error("No route found in OpenAPI scheme")] + NoRoute, + #[error("No method found for specified route in OpenAPI scheme")] + NoMethod, + #[error("No `operation_id` found")] + NoOperation, + #[error("OpenAPI's `operation_id` doesn't match handler's name")] + NoMatch, + #[error("Unexpected Hyper method - was it `Method::CONNECT`?")] + UnexpectedMethod, + #[error("Error occured during handler processing")] + InvalidHandler, +} + +pub struct NoApi; +impl<'a> ApiVersion<'a> for NoApi {} + +pub struct ApiV1; + +pub trait ApiVersion<'a> { + #[must_use] + fn to_path() -> &'a str { + "" + } +} + +impl<'a> ApiVersion<'a> for ApiV1 { + fn to_path() -> &'a str { + "/api/v1" + } +} + +pub trait RouterApiExt { + /// Add API Route + /// + fn api_route<'a, H, T, Version>( + self, + path: &str, + method: Method, + handler: H, + ) -> Result + where + H: Handler, + T: 'static, + S: Send + Sync + 'static, + Version: ApiVersion<'a>, + Self: Sized; +} + +impl RouterApiExt for Router +where + B: HttpBody + Send + 'static, + S: Clone + Send + Sync + 'static, +{ + fn api_route<'a, H, T, Version>( + self, + path: &str, + method: Method, + handler: H, + ) -> Result + where + H: Handler, + T: 'static, + S: Send + Sync + 'static, + Version: ApiVersion<'a>, + Self: Sized, + { + check_api::<_, _, _, H, Version>( + path, + &*TryInto::>::try_into(method.clone())?, + )?; + Ok(self.route( + path, + on( + *TryInto::>::try_into(method)?, + handler, + ), + )) + } +} + +/// Check if the following route corresponds with `OpenAPI` declaration +/// Relies on `operation_id` field, should NOT be changed on new route declaration +fn check_api<'a, T, S, B, H, Version>(path: &str, method: &PathItemType) -> Result<(), RouteError> +where + H: Handler, + T: 'static, + S: Send + Sync + 'static, + Version: ApiVersion<'a>, +{ + #[cfg(not(feature = "swagger"))] + return Ok(()); + #[cfg(feature = "swagger")] + { + let ver = Version::to_path(); + let route = [ + ver, + &path + .split('/') + .map(|arg| { + arg.starts_with(':') + .then(|| ["{", &arg[1..], "}"].concat()) + .unwrap_or_else(|| arg.to_string()) + }) + .collect::>() + .join("/"), + ] + .concat(); + let operation_id = ApiDoc::openapi() + .paths + .get_path_item(&route) + .ok_or(RouteError::NoRoute) + .attach_printable(format!("route: {route}"))? + .operations + .get(method) + .ok_or(RouteError::NoMethod)? + .operation_id + .clone() + .ok_or(RouteError::NoOperation)?; + let handler_name = &[std::any::type_name::() + .rsplit_once(':') + .ok_or(RouteError::InvalidHandler)? + .1] + .concat(); + + operation_id + .eq(handler_name) + .then_some(()) + .ok_or(RouteError::NoMatch) + .attach_printable(format!("left: {operation_id}, right: {handler_name}")) + } +} + +// TODO: Restrict input types by some trait? +pub struct MethodWrapper(T); + +impl Deref for MethodWrapper { + type Target = PathItemType; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Deref for MethodWrapper { + type Target = MethodFilter; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl TryFrom for MethodWrapper { + type Error = RouteError; + + fn try_from(value: Method) -> std::result::Result { + Ok(match value { + Method::GET => Self(PathItemType::Get), + Method::PUT => Self(PathItemType::Put), + Method::POST => Self(PathItemType::Post), + Method::HEAD => Self(PathItemType::Head), + Method::PATCH => Self(PathItemType::Patch), + Method::TRACE => Self(PathItemType::Trace), + Method::DELETE => Self(PathItemType::Delete), + Method::OPTIONS => Self(PathItemType::Options), + Method::CONNECT => Self(PathItemType::Connect), + _ => Err(RouteError::UnexpectedMethod)?, + }) + } +} + +impl TryFrom for MethodWrapper { + fn try_from(value: Method) -> std::result::Result { + Ok(match value { + Method::GET => Self(MethodFilter::GET), + Method::PUT => Self(MethodFilter::PUT), + Method::POST => Self(MethodFilter::POST), + Method::HEAD => Self(MethodFilter::HEAD), + Method::PATCH => Self(MethodFilter::PATCH), + Method::TRACE => Self(MethodFilter::TRACE), + Method::DELETE => Self(MethodFilter::DELETE), + Method::OPTIONS => Self(MethodFilter::OPTIONS), + Method::CONNECT => Err(RouteError::UnexpectedMethod)?, + _ => Err(RouteError::UnexpectedMethod)?, + }) + } + + type Error = RouteError; +} diff --git a/backend/src/services/auth.rs b/backend/src/services/auth.rs new file mode 100644 index 00000000..ec363602 --- /dev/null +++ b/backend/src/services/auth.rs @@ -0,0 +1,159 @@ +use crate::router::ApiV1; +use crate::router::ApiVersion; +use crate::{ + connector::BobClient, + models::shared::{BobConnectionData, Hostname, RequestTimeout}, + prelude::*, +}; +use axum::{ + async_trait, + extract::State, + headers::{authorization::Basic, Authorization}, + Extension, Json, +}; +use axum_login::{ + extractors::AuthContext, memory_store::MemoryStore, secrecy::SecretVec, AuthUser, UserStore, +}; +use hyper::StatusCode; +use serde::Deserialize; +use std::{collections::HashMap, fmt::Display, hash::Hash, sync::Arc}; +use thiserror::Error; +use tokio::sync::RwLock; + +pub type SocketBobMemoryStore = MemoryStore; +pub type BobStore = HashMap; +pub type SharedBobStore = Arc>>; + +#[derive(Debug, Error)] +pub enum AuthError { + #[error("couldn't load user")] + LoadError, + #[error("couldn't lock user store")] + PoisonError, +} + +/// Optional credentials for a BOB cluster +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Deserialize)] +pub struct Credentials { + pub login: String, + pub password: String, +} + +/// Data needed to login to a BOB cluster +impl AuthUser for BobClient { + fn get_id(&self) -> Hostname { + self.hostname().clone() + } + + fn get_password_hash(&self) -> SecretVec { + self.context() + .get::>, _>() + .as_ref() + .map_or_else( + || SecretVec::new("".into()), + |cred: &Authorization| SecretVec::new(cred.password().as_bytes().to_vec()), + ) + } +} + +/// Login to a BOB cluster +/// +/// # Errors +/// This function can return the following errors +/// +/// 1. [`StatusCode::BAD_REQUEST`] +/// The function failed to parse hostname of the request +/// +/// 2. [`StatusCode::NOT_FOUND`] +/// The client was unbale to reach the host +/// +/// 3. [`StatusCode::UNAUTHORIZED`] +/// The client could.t authorize on the host +/// +#[cfg_attr(feature = "swagger", utoipa::path( + post, + context_path = ApiV1::to_path(), + path = "/login", + responses( + (status = 200, description = "Hello Bob!") + ) + ))] +pub async fn login( + State(store): State>>>, + mut auth: AuthContext, + Extension(request_timeout): Extension, + Json(bob): Json, +) -> AxumResult { + tracing::info!("post /login : {:?}", &bob); + let hostname = bob.hostname.clone(); + + let Ok(bob_client) = BobClient::try_new(bob, request_timeout).await else { + tracing::warn!("Couldn't create client"); + return Err(StatusCode::UNAUTHORIZED.into()); + }; + let Ok(res) = bob_client.probe().await else { + return Err(StatusCode::UNAUTHORIZED.into()); + }; + tracing::info!("received {res} from BOB"); + + if res == StatusCode::OK { + if let Err(err) = auth.login(&bob_client).await { + tracing::warn!("Couldn't login the user. Err: {err}, User: {bob_client:?}"); + return Err(StatusCode::UNAUTHORIZED.into()); + }; + store.write().await.insert(hostname, bob_client); + tracing::info!("AUTHORIZATION SUCCESSFUL"); + tracing::info!("Logged in as {:?}", &auth.current_user); + } + + Ok(res) +} + +/// Logout from a BOB cluster +#[cfg_attr(feature = "swagger", utoipa::path( + post, + context_path = ApiV1::to_path(), + path = "/logout", + responses( + (status = 200, description = "Hello Bob!") + ) + ))] +pub async fn logout(mut auth: AuthContext) { + tracing::info!("get /logout : {:?}", &auth.current_user); + auth.logout().await; +} + +/// An ephemeral in-memory store, since we don't need to store any users. +#[derive(Clone, Debug, Default)] +pub struct BobMemoryStore { + inner: Arc>>, +} + +impl BobMemoryStore { + /// Creates a new in-memory store. + pub fn new(inner: &Arc>>) -> Self { + Self { + inner: inner.clone(), + } + } +} + +#[async_trait] +impl UserStore for BobMemoryStore +where + Role: PartialOrd + PartialEq + Clone + Send + Sync + 'static, + UserId: Display + Eq + Clone + Send + Sync + Hash + 'static, + BobClient: AuthUser, +{ + type User = BobClient; + type Error = AuthError; + + async fn load_user( + &self, + user_id: &UserId, + ) -> std::result::Result, Self::Error> { + tracing::debug!("load_user: {}", user_id); + + Ok(self.inner.read().await.get(user_id).cloned()) + } +} diff --git a/backend/src/services/mod.rs b/backend/src/services/mod.rs index d32e834c..e6a7c683 100644 --- a/backend/src/services/mod.rs +++ b/backend/src/services/mod.rs @@ -1,14 +1,33 @@ +use self::auth::{login, logout, SharedBobStore}; +use crate::{ + connector::BobClient, + models::shared::Hostname, + prelude::*, + router::{ApiV1, RouterApiExt}, +}; use axum::{ response::{IntoResponse, Response}, Router, }; -use hyper::{Body, StatusCode}; +use axum_login::RequireAuthorizationLayer; +use hyper::{Body, Method, StatusCode}; use thiserror::Error; +pub mod auth; + +type RequireAuth = RequireAuthorizationLayer; + /// Export all secured routes -#[allow(dead_code)] -pub fn api_router() -> Router<(), Body> { - Router::new() +/// +/// # Errors +/// +/// This function will return an error if route pat +pub fn api_router() -> Result, Body>, RouteError> { + let router = Router::new().api_route::<_, _, ApiV1>("/logout", Method::POST, logout)?; + let router = router.route_layer(RequireAuth::login()); + let router = router.api_route::<_, _, ApiV1>("/login", Method::POST, login)?; + + Ok(router) } /// Errors that happend during API request proccessing