Skip to content

Commit

Permalink
Added writers to monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
amigin committed Nov 12, 2024
1 parent 1121bde commit 57aa92f
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 9 deletions.
4 changes: 3 additions & 1 deletion src/app/app_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
settings_reader::SettingsModel,
};

use super::{EventsSync, PrometheusMetrics};
use super::{EventsSync, HttpWriters, PrometheusMetrics};

pub const APP_VERSION: &'static str = env!("CARGO_PKG_VERSION");

Expand All @@ -44,6 +44,7 @@ pub struct AppContext {
pub sync: EventsSync,
pub states: Arc<AppStates>,
pub persist_markers: PersistMarkersByTable,
pub http_writers: HttpWriters,
persist_amount: AtomicUsize,
}

Expand All @@ -66,6 +67,7 @@ impl AppContext {
settings,
persist_amount: AtomicUsize::new(0),
sync: EventsSync::new(),
http_writers: HttpWriters::new(),
}
}

Expand Down
73 changes: 73 additions & 0 deletions src/app/http_writers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use std::collections::BTreeMap;

use my_no_sql_server_core::rust_extensions::date_time::DateTimeAsMicroseconds;
use tokio::sync::Mutex;

#[derive(Debug)]
pub struct WriterInfo {
pub version: String,
pub last_ping: DateTimeAsMicroseconds,
}

pub struct HttpWriters {
data: Mutex<BTreeMap<String, WriterInfo>>,
}

impl HttpWriters {
pub fn new() -> Self {
Self {
data: Mutex::new(BTreeMap::new()),
}
}

pub async fn update(&self, name: &str, version: &str, now: DateTimeAsMicroseconds) {
let mut data = self.data.lock().await;
match data.get_mut(name) {
Some(writer_info) => {
writer_info.last_ping = now;
if writer_info.version != version {
writer_info.version = version.to_string();
}
}
None => {
data.insert(
name.to_string(),
WriterInfo {
version: version.to_string(),
last_ping: now,
},
);
}
}
}

pub async fn get<TResult>(
&self,
convert: impl Fn(&str, &WriterInfo) -> TResult,
) -> Vec<TResult> {
let data = self.data.lock().await;

let mut result = Vec::with_capacity(data.len());

for (key, itm) in data.iter() {
let itm = convert(key, itm);
result.push(itm);
}

result
}

pub async fn gc(&self, now: DateTimeAsMicroseconds) {
let mut data = self.data.lock().await;
let mut to_remove = Vec::new();
for (name, writer_info) in data.iter() {
if now.duration_since(writer_info.last_ping).get_full_minutes() > 1 {
to_remove.push(name.clone());
}
}

for name in to_remove {
data.remove(&name);
}
}
}
2 changes: 2 additions & 0 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ pub use metrics::PrometheusMetrics;
pub use metrics::UpdatePendingToSyncModel;
mod events_sync;
pub use events_sync::*;
mod http_writers;
pub use http_writers::*;
2 changes: 2 additions & 0 deletions src/background/gc_http_sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,7 @@ impl MyTimerTick for GcHttpSessionsTimer {
.remove_pending_to_sync(&data_reader.connection);
}
}

self.app.http_writers.gc(now).await;
}
}
38 changes: 35 additions & 3 deletions src/http/controllers/api/ping_action.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,52 @@
use my_http_server::{macros::http_route, HttpContext, HttpFailResult, HttpOkResult, HttpOutput};
use std::sync::Arc;

use my_http_server::{
macros::{http_route, MyHttpInput},
HttpContext, HttpFailResult, HttpOkResult, HttpOutput,
};
use my_no_sql_server_core::rust_extensions::date_time::DateTimeAsMicroseconds;

use crate::app::AppContext;

#[http_route(
method: "GET",
route: "/api/Ping",
controller: "Monitoring",
description: "Endpoint to ping the service",
summary: "Endpoint to ping the service",
input_data: PingHttpInputModel,
result:[
{status_code: 204, description: "Ok result"},
]
)]
pub struct PingAction;
pub struct PingAction {
app: Arc<AppContext>,
}

impl PingAction {
pub fn new(app: Arc<AppContext>) -> Self {
Self { app }
}
}

async fn handle_request(
_: &PingAction,
action: &PingAction,
input_data: PingHttpInputModel,
_ctx: &mut HttpContext,
) -> Result<HttpOkResult, HttpFailResult> {
let now = DateTimeAsMicroseconds::now();
action
.app
.http_writers
.update(&input_data.name, &input_data.version, now)
.await;
HttpOutput::Empty.into_ok_result(false).into()
}

#[derive(Debug, MyHttpInput)]
pub struct PingHttpInputModel {
#[http_query(name = "clientName", description = "Client Name")]
pub name: String,
#[http_query(name = "clientVersion", description = "Client Version")]
pub version: String,
}
2 changes: 1 addition & 1 deletion src/http/controllers/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub fn build(app: &Arc<AppContext>) -> ControllersMiddleware {
let mut result = ControllersMiddleware::new(None, None);

result.register_get_action(Arc::new(super::api::IsAliveAction));
result.register_get_action(Arc::new(super::api::PingAction));
result.register_get_action(Arc::new(super::api::PingAction::new(app.clone())));

result.register_get_action(Arc::new(super::tables_controller::GetListAction::new(
app.clone(),
Expand Down
36 changes: 33 additions & 3 deletions src/http/controllers/status_controller/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,12 @@ impl StatusModel {

let used_http_connections = app.metrics.get_http_connections_amount();

let writers = WriterApiModel::new(app).await;

if app.states.is_initialized() {
return Self {
not_initialized: None,
initialized: Some(InitializedModel::new(readers, tables_model)),
initialized: Some(InitializedModel::new(readers, tables_model, writers)),
status_bar: StatusBarModel::new(
app,
tcp,
Expand All @@ -129,12 +131,21 @@ impl StatusModel {
#[derive(Serialize, Deserialize, Debug, MyHttpObjectStructure)]
pub struct InitializedModel {
pub readers: Vec<ReaderModel>,
pub writers: Vec<WriterApiModel>,
pub tables: Vec<TableModel>,
}

impl InitializedModel {
pub fn new(readers: Vec<ReaderModel>, tables: Vec<TableModel>) -> Self {
Self { readers, tables }
pub fn new(
readers: Vec<ReaderModel>,
tables: Vec<TableModel>,
writers: Vec<WriterApiModel>,
) -> Self {
Self {
readers,
tables,
writers,
}
}
}

Expand Down Expand Up @@ -172,3 +183,22 @@ async fn get_readers(app: &AppContext) -> (Vec<ReaderModel>, usize, usize) {

(result, tcp_count, http_count)
}

#[derive(Serialize, Deserialize, Debug, MyHttpObjectStructure)]
pub struct WriterApiModel {
pub name: String,
pub version: String,
pub last_update: String,
}

impl WriterApiModel {
pub async fn new(app: &AppContext) -> Vec<Self> {
app.http_writers
.get(|name, itm| Self {
name: name.to_string(),
version: itm.version.to_string(),
last_update: itm.last_ping.to_rfc3339(),
})
.await
}
}
2 changes: 1 addition & 1 deletion src/http/controllers/status_controller/status_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::models::StatusModel;
description: "Monitoring API",
summary: "Returns monitoring metrics",
result:[
{status_code: 200, description: "Monitoring snapshot", model: "Vec<StatusModel>"},
{status_code: 200, description: "Monitoring snapshot", model: "StatusModel"},
]
)]
pub struct StatusAction {
Expand Down

0 comments on commit 57aa92f

Please sign in to comment.