diff --git a/src/app/app_ctx.rs b/src/app/app_ctx.rs index c1b6ddf..c97c4bd 100644 --- a/src/app/app_ctx.rs +++ b/src/app/app_ctx.rs @@ -18,7 +18,7 @@ use crate::{ settings_reader::SettingsModel, }; -use super::{EventsSync, PrometheusMetrics}; +use super::{EventsSync, HttpWriters, PrometheusMetrics}; pub const APP_VERSION: &'static str = env!("CARGO_PKG_VERSION"); @@ -44,6 +44,7 @@ pub struct AppContext { pub sync: EventsSync, pub states: Arc, pub persist_markers: PersistMarkersByTable, + pub http_writers: HttpWriters, persist_amount: AtomicUsize, } @@ -66,6 +67,7 @@ impl AppContext { settings, persist_amount: AtomicUsize::new(0), sync: EventsSync::new(), + http_writers: HttpWriters::new(), } } diff --git a/src/app/http_writers.rs b/src/app/http_writers.rs new file mode 100644 index 0000000..78f0d37 --- /dev/null +++ b/src/app/http_writers.rs @@ -0,0 +1,73 @@ +use std::collections::BTreeMap; + +use my_no_sql_server_core::rust_extensions::date_time::DateTimeAsMicroseconds; +use tokio::sync::Mutex; + +#[derive(Debug)] +pub struct WriterInfo { + pub version: String, + pub last_ping: DateTimeAsMicroseconds, +} + +pub struct HttpWriters { + data: Mutex>, +} + +impl HttpWriters { + pub fn new() -> Self { + Self { + data: Mutex::new(BTreeMap::new()), + } + } + + pub async fn update(&self, name: &str, version: &str, now: DateTimeAsMicroseconds) { + let mut data = self.data.lock().await; + match data.get_mut(name) { + Some(writer_info) => { + writer_info.last_ping = now; + if writer_info.version != version { + writer_info.version = version.to_string(); + } + } + None => { + data.insert( + name.to_string(), + WriterInfo { + version: version.to_string(), + last_ping: now, + }, + ); + } + } + } + + pub async fn get( + &self, + convert: impl Fn(&str, &WriterInfo) -> TResult, + ) -> Vec { + let data = self.data.lock().await; + + let mut result = Vec::with_capacity(data.len()); + + for (key, itm) in data.iter() { + let itm = convert(key, itm); + result.push(itm); + } + + result + } + + pub async fn gc(&self, now: DateTimeAsMicroseconds) { + let mut data = self.data.lock().await; + let mut to_remove = Vec::new(); + for (name, writer_info) in data.iter() { + if now.duration_since(writer_info.last_ping).get_full_minutes() > 1 { + to_remove.push(name.clone()); + } + } + + for name in to_remove { + data.remove(&name); + } + } +} diff --git a/src/app/mod.rs b/src/app/mod.rs index 87a2df5..e11fa96 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -7,3 +7,5 @@ pub use metrics::PrometheusMetrics; pub use metrics::UpdatePendingToSyncModel; mod events_sync; pub use events_sync::*; +mod http_writers; +pub use http_writers::*; diff --git a/src/background/gc_http_sessions.rs b/src/background/gc_http_sessions.rs index eb93c50..e874ae1 100644 --- a/src/background/gc_http_sessions.rs +++ b/src/background/gc_http_sessions.rs @@ -29,5 +29,7 @@ impl MyTimerTick for GcHttpSessionsTimer { .remove_pending_to_sync(&data_reader.connection); } } + + self.app.http_writers.gc(now).await; } } diff --git a/src/http/controllers/api/ping_action.rs b/src/http/controllers/api/ping_action.rs index 8cb298c..3cfaafc 100644 --- a/src/http/controllers/api/ping_action.rs +++ b/src/http/controllers/api/ping_action.rs @@ -1,4 +1,12 @@ -use my_http_server::{macros::http_route, HttpContext, HttpFailResult, HttpOkResult, HttpOutput}; +use std::sync::Arc; + +use my_http_server::{ + macros::{http_route, MyHttpInput}, + HttpContext, HttpFailResult, HttpOkResult, HttpOutput, +}; +use my_no_sql_server_core::rust_extensions::date_time::DateTimeAsMicroseconds; + +use crate::app::AppContext; #[http_route( method: "GET", @@ -6,15 +14,39 @@ use my_http_server::{macros::http_route, HttpContext, HttpFailResult, HttpOkResu controller: "Monitoring", description: "Endpoint to ping the service", summary: "Endpoint to ping the service", + input_data: PingHttpInputModel, result:[ {status_code: 204, description: "Ok result"}, ] )] -pub struct PingAction; +pub struct PingAction { + app: Arc, +} + +impl PingAction { + pub fn new(app: Arc) -> Self { + Self { app } + } +} async fn handle_request( - _: &PingAction, + action: &PingAction, + input_data: PingHttpInputModel, _ctx: &mut HttpContext, ) -> Result { + let now = DateTimeAsMicroseconds::now(); + action + .app + .http_writers + .update(&input_data.name, &input_data.version, now) + .await; HttpOutput::Empty.into_ok_result(false).into() } + +#[derive(Debug, MyHttpInput)] +pub struct PingHttpInputModel { + #[http_query(name = "clientName", description = "Client Name")] + pub name: String, + #[http_query(name = "clientVersion", description = "Client Version")] + pub version: String, +} diff --git a/src/http/controllers/builder.rs b/src/http/controllers/builder.rs index c9f91fb..52f80bd 100644 --- a/src/http/controllers/builder.rs +++ b/src/http/controllers/builder.rs @@ -8,7 +8,7 @@ pub fn build(app: &Arc) -> ControllersMiddleware { let mut result = ControllersMiddleware::new(None, None); result.register_get_action(Arc::new(super::api::IsAliveAction)); - result.register_get_action(Arc::new(super::api::PingAction)); + result.register_get_action(Arc::new(super::api::PingAction::new(app.clone()))); result.register_get_action(Arc::new(super::tables_controller::GetListAction::new( app.clone(), diff --git a/src/http/controllers/status_controller/models.rs b/src/http/controllers/status_controller/models.rs index 8b5864a..bbb87e7 100644 --- a/src/http/controllers/status_controller/models.rs +++ b/src/http/controllers/status_controller/models.rs @@ -105,10 +105,12 @@ impl StatusModel { let used_http_connections = app.metrics.get_http_connections_amount(); + let writers = WriterApiModel::new(app).await; + if app.states.is_initialized() { return Self { not_initialized: None, - initialized: Some(InitializedModel::new(readers, tables_model)), + initialized: Some(InitializedModel::new(readers, tables_model, writers)), status_bar: StatusBarModel::new( app, tcp, @@ -129,12 +131,21 @@ impl StatusModel { #[derive(Serialize, Deserialize, Debug, MyHttpObjectStructure)] pub struct InitializedModel { pub readers: Vec, + pub writers: Vec, pub tables: Vec, } impl InitializedModel { - pub fn new(readers: Vec, tables: Vec) -> Self { - Self { readers, tables } + pub fn new( + readers: Vec, + tables: Vec, + writers: Vec, + ) -> Self { + Self { + readers, + tables, + writers, + } } } @@ -172,3 +183,22 @@ async fn get_readers(app: &AppContext) -> (Vec, usize, usize) { (result, tcp_count, http_count) } + +#[derive(Serialize, Deserialize, Debug, MyHttpObjectStructure)] +pub struct WriterApiModel { + pub name: String, + pub version: String, + pub last_update: String, +} + +impl WriterApiModel { + pub async fn new(app: &AppContext) -> Vec { + app.http_writers + .get(|name, itm| Self { + name: name.to_string(), + version: itm.version.to_string(), + last_update: itm.last_ping.to_rfc3339(), + }) + .await + } +} diff --git a/src/http/controllers/status_controller/status_action.rs b/src/http/controllers/status_controller/status_action.rs index 0b0648a..566046b 100644 --- a/src/http/controllers/status_controller/status_action.rs +++ b/src/http/controllers/status_controller/status_action.rs @@ -11,7 +11,7 @@ use super::models::StatusModel; description: "Monitoring API", summary: "Returns monitoring metrics", result:[ - {status_code: 200, description: "Monitoring snapshot", model: "Vec"}, + {status_code: 200, description: "Monitoring snapshot", model: "StatusModel"}, ] )] pub struct StatusAction {