From 132a0260603c674d1b37c035c78b0c95fa5ae572 Mon Sep 17 00:00:00 2001 From: 4t145 Date: Thu, 22 Aug 2024 17:25:25 +0800 Subject: [PATCH 1/5] remove cluster api --- examples/websocket/Cargo.toml | 10 - examples/websocket/config/conf-default.toml | 101 --- examples/websocket/src/main.rs | 27 - examples/websocket/src/processor.rs | 156 ----- tardis/Cargo.toml | 8 - tardis/src/cluster.rs | 13 - tardis/src/cluster/cluster_broadcast.rs | 110 ---- tardis/src/cluster/cluster_hashmap.rs | 276 -------- tardis/src/cluster/cluster_processor.rs | 497 --------------- tardis/src/cluster/cluster_publish.rs | 213 ------- tardis/src/cluster/cluster_receive.rs | 205 ------ tardis/src/cluster/cluster_watch_by_cache.rs | 67 -- tardis/src/cluster/cluster_watch_by_k8s.rs | 93 --- tardis/src/config/config_dto.rs | 40 +- tardis/src/lib.rs | 35 +- tardis/src/web.rs | 7 +- tardis/src/web/cluster_id_mw.rs | 35 -- tardis/src/web/web_server.rs | 13 +- tardis/src/web/web_server/status_api.rs | 73 --- tardis/src/web/ws_processor.rs | 590 ------------------ .../src/web/ws_processor/cluster_protocol.rs | 25 - tardis/tests/test_cluster.rs | 318 ---------- tardis/tests/test_websocket.rs | 466 -------------- 23 files changed, 5 insertions(+), 3373 deletions(-) delete mode 100644 examples/websocket/Cargo.toml delete mode 100644 examples/websocket/config/conf-default.toml delete mode 100644 examples/websocket/src/main.rs delete mode 100644 examples/websocket/src/processor.rs delete mode 100644 tardis/src/cluster.rs delete mode 100644 tardis/src/cluster/cluster_broadcast.rs delete mode 100644 tardis/src/cluster/cluster_hashmap.rs delete mode 100644 tardis/src/cluster/cluster_processor.rs delete mode 100644 tardis/src/cluster/cluster_publish.rs delete mode 100644 tardis/src/cluster/cluster_receive.rs delete mode 100644 tardis/src/cluster/cluster_watch_by_cache.rs delete mode 100644 tardis/src/cluster/cluster_watch_by_k8s.rs delete mode 100644 tardis/src/web/cluster_id_mw.rs delete mode 100644 tardis/src/web/web_server/status_api.rs delete mode 100644 tardis/src/web/ws_processor.rs delete mode 100644 tardis/src/web/ws_processor/cluster_protocol.rs delete mode 100644 tardis/tests/test_cluster.rs delete mode 100644 tardis/tests/test_websocket.rs diff --git a/examples/websocket/Cargo.toml b/examples/websocket/Cargo.toml deleted file mode 100644 index d57dfdfd..00000000 --- a/examples/websocket/Cargo.toml +++ /dev/null @@ -1,10 +0,0 @@ -[package] -name = "tardis-example-websocket" -version = "0.1.0" -authors.workspace = true -edition.workspace = true -publish = false - -[dependencies] -serde = { version = "1", features = ["derive"] } -tardis = { path = "../../tardis", features = ["web-server", "tracing-appender"] } diff --git a/examples/websocket/config/conf-default.toml b/examples/websocket/config/conf-default.toml deleted file mode 100644 index 92c78449..00000000 --- a/examples/websocket/config/conf-default.toml +++ /dev/null @@ -1,101 +0,0 @@ -[app] -id = "websocket" -name = "ws基础示例" -desc = "ws基础示例" -version = "1.0.0" - -[fw.web_server] -port = 8089 -doc_urls = [["test env", "http://localhost:8089/"]] -tls_key = """ ------BEGIN PRIVATE KEY----- -MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQCeZuT0jXfIbCyI -srXkPaJ6SeqOlJyokAbGL1xcuPyqFQo2KGzStG2B7vI9EX5vPA8PaNToUSUjC/92 -T+Y4Swl3cEY70F8Lw6toY2eA1m01sxgeSOyFDXzgmhdr7su+cVgLrBGh8Dus8BYo -sMcx6ytahFJmD7MjjeRbDhERi6WlC5N4KqFcPKh7kd+u0X3WmbQj/Ge+mSh2xJJB -uXofMKhFBVZTyAa4Vc4pkoVqj2nCMgBGYsDkoW48flbzFTURvB6j9MCY8X338GL8 -Q1o0wvB0ghEL+VZ5SlGa56VJ/1yEZSIQkbzXsTNP19RNjwA+rKvYWG7pLaNgwVd3 -5KdNYa9lkrud0JdSZiTXpRU+shQZLzZQng0+ArK85L2co45mJImPChLloMZ14tfJ -lPag2yrlaHcVpWLxgBJCSqaJnDdesBZ88YIYaS0iMfEHxeJKDBlhwCGB5k21RgdR -WDWrvxWDJ+bAUuDjNN4XWix02gNhhRCWcDAiTY2HTmvOuRRze6f8yBkXxIA5YT8Q -xQM0SVUL+Wi5Sp/rIZdRiDSyHN3w519kPd9gcf/OD6/m5GeZiwPvA6RCPc3gpNfb -NSlF0qmOm+BtP6s8oBmYV2HtIyryL0O0VaVTAWGxwAfZo82nGcZ7DCg3ov29HQFR -O0UDrbJCYWTzjddfaNSOyFr39SFMyQIDAQABAoICAGzFhVU1S9qyBYa9CVA4zrig -x1OvU0Ag3rX60bbORJpapk94RPoPcd82gNtUhvjFSiYvu3voNNMzKvJIfXOMe2ys -CV6LgKjobWzaTbnYBNhKKjapptja4TSuO4UfToNw+5/QQa9s8t2XY+//LFnn/jZD -BP4Wq9GvXu7z4t6kgVOrdlGfHHzlHmD0U6CNNJdHpr7LVUY5j+K2qyQ8xFJkrDfx -KU2Q3qR4L4C2n60D3ADBKxUqPQECpVse+oAjwPRztKqrqBGeSv0V8jL94RS4BDxN -EH2jczEIyq7PgN4mX5Z+h/zOxtlqUizyrVOw7d8b3UlLuKWgdD89pN2sgwINIU5O -vEi2qQKSb96qJlT6G1s1IWznZO2fDQyZzWXySImYsGhOh16nsfTShuZa5iEB8znh -C+9QCNemjp24IiSQQlnK/0aFSU95nKyguKLhCOXTyyd4JavBD29UsbvBHgeCpNEX -vVkH+myXSZ68kpARH33DQYnAPWmvIJ1doJFicw2Nbk6gU/peN9lBsbpOynji7H2F -aho2/uxq6VKvbcj/JChCkcaFEPJPKlpHCoISnTCgorKV+hTVj9TW3fZiirllHPrd -AEpkuFleWoZq1LqyCwVa2QfUXOiq1RF7MZgClgkGtCz8RTFn7rMG2q2LAqVv8ice -f0e1nJ5bWBM8+xzQeUyxAoIBAQDSEUrg4YaQN8KFpMghF8N6w0V98HA/ufYcN78l -U5Vpst4WQu9xdV4AQaSaSJg0Y8+ddbxxiQmZPn7Mvjo6n52kd8y9/QcCnlHrOxOP -7aPjMJoloQ0q9mfXOyeAP1DlNBtg6GVWliv5CEOrFUGzqi9GfxF9mpuiUD9C9UNc -EKPnLuXEE+z6RsVkZ6nbRUDV2Dw4+vScGkXeJkBijSCCc7vvMWDqS5i7wA20/ymp -qK3upijmp9ELIsfNma8NZvxjGL2Q202WV1sROY9RmZ2Pw0xybNeUHSMBTJ5ZNAcX -somveEfhNyQHyLIG5WWkQ6yKm9eZ8Js1cLEEUoK1Pens9+RNAoIBAQDBCZNUoyL4 -Pgvm0N8M5vvos0/iK9mK6dQ+PyK9ulLwQ914lYFnsS4/5Bm3lMKhk7l75uOfCICS -xD5qj8adpCSzTi0FYZn5+v2YJV2dcq85LbS0X1C2tC0x/h+CKHeTAWedaHbolweR -iYJ8LUAh/OO0YDG4Y8NuNKYuu3GMfRtgaZ3ozwC9RrF3Stg4Mvz7mZ2YvIt9KAh4 -1G4zffXouksMUR3Ef0m435Q0GFZxbwG9d0S9xhKN9Z6y5m0AXv6JP0cRf5P2hDor -cj+RU5Lt1jOKpEa4JEklHyjkMLzGvilCnZcp3CCtgaxnVkPaxAQE1SzbImsCiuT6 -0ZeLk7rLVXhtAoIBAEpXVlF5Xp7BLPUQ2MZlW4ehfYFRStgynndj/yBdkR3j8Bae -v5A+gfLDSFZVZI/91qaZ2QOxBJWX+VXJ6F+Ax1BCGVIPLqDtmH4P3R9tcTdgpty0 -Py26IAuaYqNPRPioVRwZgmt/H7a1BwWPsz2695oSNAVoZrWtgoXWuGzncqnI/gVD -K/VygD7FNOZlfslvCfj8Tv7DiYBRty1pVlz0SkkfB3ZDJJ0lHXS1bHAfQtaXNRhT -wXYYa+9F+R5cGOuVANk30yBmb9MLtL2jCJCtA+vmuvAu04mNLOjSscPgKjvP7Fg2 -pGigzOgF3M+iJ3iyKD3RWYwzAmJONPWhIJz9KW0CggEBALlw53YRWhC8GlbcJMqS -Q0GisIUHtu55/wjlQgydp/tkVqf2h+ADF/kMuZ/garEp5rAeeGBYnJthpsxb3mOm -QhN2cE2RG4hIE8JLcwlEGQyG2efFARbjM9+ozsAM/AHfAZj3t/Ns0zp+rTl5hvif -pmerWVhXjnCaZ2LRcx2ROfF+xNHXulAaT3XYEqq6YMmbIwiNaDsNcEYTXOJeiRPP -kzT73+ralQnizs7U6oYCK9vFbeYZF84EPTaDogcPOtrCTRWER7I0CGWgVWzYqneM -pOr1MXLIePQOYQwFvfP2zlBOb98otsO3pNWIy5qnX2z9FSBWihJhN+Oz4S+W9YVM -UGkCggEAJrHCoLJC0BzQv169vujovOH65jSndngxN/KhDQPTElmKUz7XcUh68yiR -qWbYtNzcpuc0ocV0P2To2tFaUWhGDghjel+8uJG+OUSegkPRXSRyLOtEwx9+FLpw -aNlB3Yi80Lprm/s0crQ8f+zm/gV1LXqGhuflo/lnygMdgUU1RBvrZvohMVY87eck -07ib4myXmoG/U5Lf+Lvk399NySyy3equoboHu4Izci34CHnfKGZA8nHMUNDqYqGI -RP0xpZzlp0QZRgpOQOt2xXPsAO3h5nOPlYJa7Tb92i4lQEartEcDpZJPgC+Gr9Kt -9Q5v1aEquucco2QmPFOQPhHZaCJArQ== ------END PRIVATE KEY----- -""" -tls_cert = """ ------BEGIN CERTIFICATE----- -MIIFazCCA1OgAwIBAgIULRrJkdzV+cTqUpqdYYsFolVIJ2EwDQYJKoZIhvcNAQEL -BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM -GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMzAyMTcxMDUyMDlaFw0yODAy -MTYxMDUyMDlaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw -HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggIiMA0GCSqGSIb3DQEB -AQUAA4ICDwAwggIKAoICAQCeZuT0jXfIbCyIsrXkPaJ6SeqOlJyokAbGL1xcuPyq -FQo2KGzStG2B7vI9EX5vPA8PaNToUSUjC/92T+Y4Swl3cEY70F8Lw6toY2eA1m01 -sxgeSOyFDXzgmhdr7su+cVgLrBGh8Dus8BYosMcx6ytahFJmD7MjjeRbDhERi6Wl -C5N4KqFcPKh7kd+u0X3WmbQj/Ge+mSh2xJJBuXofMKhFBVZTyAa4Vc4pkoVqj2nC -MgBGYsDkoW48flbzFTURvB6j9MCY8X338GL8Q1o0wvB0ghEL+VZ5SlGa56VJ/1yE -ZSIQkbzXsTNP19RNjwA+rKvYWG7pLaNgwVd35KdNYa9lkrud0JdSZiTXpRU+shQZ -LzZQng0+ArK85L2co45mJImPChLloMZ14tfJlPag2yrlaHcVpWLxgBJCSqaJnDde -sBZ88YIYaS0iMfEHxeJKDBlhwCGB5k21RgdRWDWrvxWDJ+bAUuDjNN4XWix02gNh -hRCWcDAiTY2HTmvOuRRze6f8yBkXxIA5YT8QxQM0SVUL+Wi5Sp/rIZdRiDSyHN3w -519kPd9gcf/OD6/m5GeZiwPvA6RCPc3gpNfbNSlF0qmOm+BtP6s8oBmYV2HtIyry -L0O0VaVTAWGxwAfZo82nGcZ7DCg3ov29HQFRO0UDrbJCYWTzjddfaNSOyFr39SFM -yQIDAQABo1MwUTAdBgNVHQ4EFgQU2yd0UFYG6B+4d2hFm5VdxY+UvfkwHwYDVR0j -BBgwFoAU2yd0UFYG6B+4d2hFm5VdxY+UvfkwDwYDVR0TAQH/BAUwAwEB/zANBgkq -hkiG9w0BAQsFAAOCAgEAYdqK8lw5WwLWp3y0KIlR96WRaS5PxVxu6IljBOjFkMyo -mck3S2jpIWyP++D5njjPyvJ5Q9bfRni6mQcTXHbW3++NKDhDewpRMUpzMoR+0qYL -Q5/WvM/BcMf1zxMKawOAK7H4cNqA/DA9anZrF9Q607Fg8lhjF8V+/hvAM4xbEYKu -98JkTiYiWivJi36NgruF02RrpukD1xyOUpFwuzV31n7sTPikZcOKh3GJ43Ce7jtU -R6B11XyQKIkE3XDSBT61MQXV3x8CntYSchczOAmp+Oj+AxNA5XiZ0oc8oZvC9I7l -1rsA2szO21wQ5g78/hGKcPtNDfD9SBH7OTrsqyUzxmNVN/Tm4mrSPtThn5uf5T4h -ZNYEVhEyhfdQD7mmzurv9+w/f0VjwpN72/uqnS0XaRZNA86LYqwixo3W2R2Hu0f7 -jyKjqs2Z2r8hOa6NwM1RikuEuO0Id5ph/XHV8Pd6nwLHD36RZBgHJHo9Lp6DPOcS -+ZYdnEwcDCscOfr6Ec4GGw+I1C8q6ycFTMq66dHv1Bc8VSdacGHluBj10t6pSSxO -dCIt7ke/LD5PS3uzBkeMdEfPCN2EiU4Vc6/BIh3lj8GbmX9zU+/x34DvPhUsUUVi -kBAtL3ogKBt46B88cy8+bfBkZkMcURvotE+rOw4/eZQX07SbQQHksK3tlCyeHrQ= ------END CERTIFICATE----- -""" - -[log] -level = "debug" -directives = ["tungstenite=debug"] -tracing_appender = { rotation = "minutely", dir = "./", filename = "app.log" } diff --git a/examples/websocket/src/main.rs b/examples/websocket/src/main.rs deleted file mode 100644 index 2899abdd..00000000 --- a/examples/websocket/src/main.rs +++ /dev/null @@ -1,27 +0,0 @@ -use std::env; - -use tardis::basic::result::TardisResult; -use tardis::tokio; -use tardis::web::web_server::WebServerModule; -use tardis::web::ws_processor::TardisWebsocketMgrMessage; -use tardis::TardisFuns; - -use crate::processor::Page; - -mod processor; - -/// -/// Visit: https://127.0.0.1:8089/echo -/// Visit: https://127.0.0.1:8089/broadcast -/// -#[tokio::main] -async fn main() -> TardisResult<()> { - env::set_var("RUST_LOG", "info,tardis=trace"); - env::set_var("PROFILE", "default"); - // Initial configuration - TardisFuns::init(Some("config")).await?; - - TardisFuns::web_server().add_route(WebServerModule::from(Page).with_ws::(100)).await.start().await?; - TardisFuns::web_server().await; - Ok(()) -} diff --git a/examples/websocket/src/processor.rs b/examples/websocket/src/processor.rs deleted file mode 100644 index 5fb6ad01..00000000 --- a/examples/websocket/src/processor.rs +++ /dev/null @@ -1,156 +0,0 @@ -use std::collections::HashMap; -use std::sync::Arc; - -use serde::{Deserialize, Serialize}; -use tardis::basic::result::TardisResult; -use tardis::tokio::sync::broadcast::Sender; -use tardis::web::poem::web::websocket::BoxWebSocketUpgraded; -use tardis::web::poem::web::{websocket::WebSocket, Data, Path}; -use tardis::web::poem_openapi::payload::Html; -use tardis::web::poem_openapi::{self}; -use tardis::web::ws_processor::{ws_echo, TardisWebsocketMgrMessage, TardisWebsocketResp, WsBroadcast, WsBroadcastContext, WsHooks}; -use tardis::TardisFuns; -#[derive(Debug, Clone)] -pub struct Page; - -#[poem_openapi::OpenApi] -impl Page { - #[oai(path = "/echo", method = "get")] - async fn echo(&self) -> Html<&'static str> { - Html( - r##" - -
- Name: - -
- - - - - - - "##, - ) - } - - #[oai(path = "/broadcast", method = "get")] - async fn broadcast(&self) -> Html<&'static str> { - Html( - r##" - -
- Name: - -
- - - - - - - "##, - ) - } - - #[oai(path = "/ws/echo/:name", method = "get")] - async fn ws_echo(&self, name: Path, websocket: WebSocket) -> BoxWebSocketUpgraded { - ws_echo( - name.0, - HashMap::new(), - websocket, - |req_session, msg, _| async move { - let resp = format!("echo:{msg} by {req_session}"); - Some(resp) - }, - |_, _| async move {}, - ) - } - - #[oai(path = "/ws/broadcast/:name", method = "get")] - async fn ws_broadcast(&self, name: Path, websocket: WebSocket, sender: Data<&Sender>>) -> BoxWebSocketUpgraded { - pub struct Hooks { - ext: HashMap, - } - impl WsHooks for Hooks { - async fn on_process(&self, req: tardis::web::ws_processor::TardisWebsocketReq, _context: &WsBroadcastContext) -> Option { - let example_msg = TardisFuns::json.json_to_obj::(req.msg).unwrap(); - Some(TardisWebsocketResp { - msg: TardisFuns::json.obj_to_json(&TardisResult::Ok(format!("echo:{}, ext info:{}", example_msg.msg, self.ext.get("some_key").unwrap()))).unwrap(), - to_avatars: if example_msg.to.is_empty() { vec![] } else { vec![example_msg.to] }, - ignore_avatars: vec![], - }) - } - } - let hooks = Hooks { - ext: HashMap::from([("some_key".to_string(), "ext_value".to_string())]), - }; - WsBroadcast::new(sender.clone(), hooks, WsBroadcastContext::new(false, true)).run(vec![name.0], websocket).await - } -} - -#[derive(Deserialize, Serialize)] -pub struct WebsocketExample { - pub msg: String, - pub to: String, -} diff --git a/tardis/Cargo.toml b/tardis/Cargo.toml index 3fcde467..7da328a5 100644 --- a/tardis/Cargo.toml +++ b/tardis/Cargo.toml @@ -305,10 +305,6 @@ required-features = [ name = "test_web_client" required-features = ["test", "web-client"] -[[test]] -name = "test_websocket" -required-features = ["test", "web-server", "ws-client"] - [[test]] name = "test_cache_client" required-features = ["test", "cache"] @@ -333,10 +329,6 @@ required-features = ["test", "os"] name = "test_basic_tracing" required-features = ["test", "tracing"] -[[test]] -name = "test_cluster" -required-features = ["test", "cluster", "k8s"] - [[bench]] name = "json_benchmark" harness = false diff --git a/tardis/src/cluster.rs b/tardis/src/cluster.rs deleted file mode 100644 index c786469e..00000000 --- a/tardis/src/cluster.rs +++ /dev/null @@ -1,13 +0,0 @@ -/// Broadcast channel between cluster nodes. -pub mod cluster_broadcast; -/// Sync map between cluster nodes. -pub mod cluster_hashmap; -/// Cluster processor. -pub mod cluster_processor; -/// Event publish -pub mod cluster_publish; -/// Event receive -pub mod cluster_receive; -mod cluster_watch_by_cache; -#[cfg(feature = "k8s")] -mod cluster_watch_by_k8s; diff --git a/tardis/src/cluster/cluster_broadcast.rs b/tardis/src/cluster/cluster_broadcast.rs deleted file mode 100644 index 1718cec7..00000000 --- a/tardis/src/cluster/cluster_broadcast.rs +++ /dev/null @@ -1,110 +0,0 @@ -use std::sync::{Arc, Weak}; - -use serde_json::Value; -use tokio::sync::broadcast; - -use crate::basic::{error::TardisError, result::TardisResult}; - -use super::{ - cluster_processor::{peer_count, subscribe_if_not_exist, unsubscribe, ClusterEventTarget, ClusterHandler, TardisClusterMessageReq}, - cluster_publish::publish_event_no_response, -}; - -pub struct ClusterBroadcastChannel -where - T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug, -{ - pub ident: String, - pub local_broadcast_channel: broadcast::Sender>, -} - -impl ClusterBroadcastChannel -where - T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug, -{ - pub fn event_name(&self) -> String { - format!("tardis/broadcast/{}", self.ident) - } - pub async fn send(&self, message: T) -> TardisResult<()> { - match self.local_broadcast_channel.send(message.clone().into()) { - Ok(size) => { - tracing::trace!("[Tardis.Cluster] broadcast channel send to {size} local subscribers"); - } - Err(result) => { - tracing::error!("[Tardis.Cluster] broadcast channel send error: {:?}", result); - } - } - let event = format!("tardis/broadcast/{}", self.ident); - let json = serde_json::to_value(message).map_err(|e| TardisError::internal_error(&e.to_string(), ""))?; - if peer_count().await != 0 { - let _ = publish_event_no_response(event, json, ClusterEventTarget::Broadcast).await?; - } - Ok(()) - } - pub fn new(ident: impl Into, capacity: usize) -> Arc { - let sender = broadcast::Sender::new(capacity); - let cluster_chan = Arc::new(Self { - ident: ident.into(), - local_broadcast_channel: sender, - }); - tracing::trace!("[Tardis.Cluster] create broadcast channel: {}", cluster_chan.event_name()); - let subscriber = BroadcastChannelSubscriber { - channel: Arc::downgrade(&cluster_chan), - event_name: cluster_chan.event_name(), - }; - tokio::spawn(async { - subscribe_if_not_exist(subscriber).await; - }); - cluster_chan - } -} - -impl Drop for ClusterBroadcastChannel -where - T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug, -{ - fn drop(&mut self) { - let event_name = self.event_name(); - tokio::spawn(async move { - unsubscribe(&event_name).await; - }); - } -} - -impl std::ops::Deref for ClusterBroadcastChannel -where - T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug, -{ - type Target = broadcast::Sender>; - - fn deref(&self) -> &Self::Target { - &self.local_broadcast_channel - } -} - -pub struct BroadcastChannelSubscriber -where - T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug, -{ - event_name: String, - channel: Weak>, -} - -impl ClusterHandler for BroadcastChannelSubscriber -where - T: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug, -{ - fn event_name(&self) -> String { - self.event_name.to_string() - } - async fn handle(self: Arc, message_req: TardisClusterMessageReq) -> TardisResult> { - if let Ok(message) = serde_json::from_value(message_req.msg) { - if let Some(chan) = self.channel.upgrade() { - let _ = chan.local_broadcast_channel.send(Arc::new(message)); - } else { - unsubscribe(&self.event_name()).await; - } - } - Ok(None) - } -} diff --git a/tardis/src/cluster/cluster_hashmap.rs b/tardis/src/cluster/cluster_hashmap.rs deleted file mode 100644 index 5ba1d55a..00000000 --- a/tardis/src/cluster/cluster_hashmap.rs +++ /dev/null @@ -1,276 +0,0 @@ -use std::{ - collections::HashMap, - fmt, - sync::Arc, - time::{Duration, Instant}, -}; - -use crate::{ - basic::{json::TardisJson, result::TardisResult}, - TardisFuns, -}; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use std::hash::Hash; -use tokio::sync::RwLock; - -use super::{ - cluster_processor::{peer_count, ClusterEventTarget, ClusterHandler, TardisClusterMessageReq}, - cluster_publish::{publish_event_no_response, ClusterEvent}, - cluster_receive::listen::Stream, -}; - -// Cshm = ClusterStaticHashMap -#[derive(Clone)] -pub struct ClusterStaticHashMap { - pub map: Arc>>, - pub ident: &'static str, - pub cluster_sync: bool, - pub modify_handler: Arc>>, -} - -impl fmt::Debug for ClusterStaticHashMap { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ClusterStaticHashMap").field("ident", &self.ident).field("cluster_sync", &self.cluster_sync).finish() - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -enum CshmEvent { - Insert(Vec<(K, V)>), - Remove { keys: Vec }, - Get { key: K }, - Modify { key: K, mapper: String, modify: Value }, -} - -pub struct ClusterStaticHashMapBuilder { - ident: &'static str, - cluster_sync: bool, - modify_handler: HashMap>, - _phantom: std::marker::PhantomData<(K, V)>, -} - -impl ClusterStaticHashMapBuilder { - pub fn new(ident: &'static str) -> Self { - Self { - ident, - cluster_sync: true, - modify_handler: HashMap::new(), - _phantom: std::marker::PhantomData, - } - } - pub fn sync(mut self, cluster_sync: bool) -> Self { - self.cluster_sync = cluster_sync; - self - } - pub fn modify_handler(mut self, mapper: &'static str, handler: impl Fn(&mut V, &Value) + Send + Sync + 'static) -> Self { - self.modify_handler.insert(mapper.to_string(), Box::new(handler)); - self - } - pub fn build(self) -> ClusterStaticHashMap { - ClusterStaticHashMap { - map: Arc::new(RwLock::new(HashMap::new())), - ident: self.ident, - cluster_sync: self.cluster_sync, - modify_handler: Arc::new(self.modify_handler), - } - } -} - -impl ClusterStaticHashMap -where - K: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned + Hash + Eq, - V: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned, -{ - pub fn builder(ident: &'static str) -> ClusterStaticHashMapBuilder { - ClusterStaticHashMapBuilder::new(ident) - } - - pub fn new(ident: &'static str) -> Self { - Self { - map: Arc::new(RwLock::new(HashMap::new())), - ident, - cluster_sync: true, - modify_handler: Arc::new(HashMap::new()), - } - } - pub fn new_standalone(ident: &'static str) -> Self { - Self { - map: Arc::new(RwLock::new(HashMap::new())), - ident, - cluster_sync: false, - modify_handler: Arc::new(HashMap::new()), - } - } - pub fn is_cluster(&self) -> bool { - self.cluster_sync && TardisFuns::fw_config().cluster.is_some() - } - pub fn event_name(&self) -> String { - format!("tardis/hashmap/{ident}", ident = self.ident) - } - pub fn local(&self) -> &RwLock> { - &self.map - } - pub async fn insert(&self, key: K, value: V) -> TardisResult<()> { - self.map.write().await.insert(key.clone(), value.clone()); - if self.is_cluster() { - let event = CshmEvent::::Insert(vec![(key, value)]); - let json = TardisJson.obj_to_json(&event)?; - let event_name = self.event_name(); - tokio::spawn(async move { - let _result = publish_event_no_response(event_name, json, ClusterEventTarget::Broadcast).await; - }); - } - Ok(()) - } - pub async fn batch_insert(&self, pairs: Vec<(K, V)>) -> TardisResult<()> { - { - let mut wg = self.map.write().await; - for (key, value) in pairs.iter() { - wg.insert(key.clone(), value.clone()); - } - } - if self.is_cluster() { - let event = CshmEvent::::Insert(pairs); - let json = TardisJson.obj_to_json(&event)?; - let event_name = self.event_name(); - tokio::spawn(async move { - let _result = publish_event_no_response(event_name, json, ClusterEventTarget::Broadcast).await; - }); - } - Ok(()) - } - pub async fn remove(&self, key: K) -> TardisResult<()> { - self.map.write().await.remove(&key); - if self.is_cluster() { - let event = CshmEvent::::Remove { keys: vec![key] }; - let json = TardisJson.obj_to_json(&event)?; - let event_name = self.event_name(); - tokio::spawn(async move { - let _result = publish_event_no_response(event_name, json, ClusterEventTarget::Broadcast).await; - }); - } - Ok(()) - } - pub async fn batch_remove(&self, keys: Vec) -> TardisResult<()> { - { - let mut wg = self.map.write().await; - for key in keys.iter() { - wg.remove(key); - } - } - if self.is_cluster() { - let event = CshmEvent::::Remove { keys }; - let json = TardisJson.obj_to_json(&event)?; - let event_name = self.event_name(); - tokio::spawn(async move { - let _result = publish_event_no_response(event_name, json, ClusterEventTarget::Broadcast).await; - }); - } - Ok(()) - } - pub async fn get(&self, key: K) -> TardisResult> { - if let Some(v) = self.map.read().await.get(&key) { - Ok(Some(v.clone())) - } else { - self.get_remote(key.clone()).await - } - } - async fn get_remote(&self, key: K) -> TardisResult> { - if !self.is_cluster() { - return Ok(None); - } - let peer_count = peer_count().await; - if peer_count == 0 { - return Ok(None); - } - let Ok(mut receiver) = ClusterEvent::new(self.event_name()) - .message(&CshmEvent::::Get { key }) - .expect("not valid json value") - .listener(Stream) - .target(ClusterEventTarget::Broadcast) - .publish() - .await - else { - return Ok(None); - }; - - let create_time = Instant::now(); - let mut count = 0; - while let Some(resp) = receiver.recv().await { - if let Ok(Some(v)) = TardisJson.json_to_obj::>(resp.msg) { - return Ok(Some(v)); - } - count += 1; - if count >= peer_count { - return Ok(None); - } - if create_time.elapsed() > Duration::from_secs(1) { - return Ok(None); - } - } - Ok(None) - } - pub async fn modify(&self, key: K, mapper: &'static str, modify: Value) -> TardisResult<()> { - let mapper = mapper.to_string(); - let mut wg = self.map.write().await; - if let Some(v) = wg.get_mut(&key) { - if let Some(handler) = self.modify_handler.get(&mapper) { - handler(v, &modify); - } - } - drop(wg); - if self.is_cluster() { - let event = CshmEvent::::Modify { key, mapper, modify }; - let json = TardisJson.obj_to_json(&event)?; - let event_name = self.event_name(); - tokio::spawn(async move { - let _result = publish_event_no_response(event_name, json, ClusterEventTarget::Broadcast).await; - }); - } - Ok(()) - } -} - -impl ClusterHandler for ClusterStaticHashMap -where - K: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned + Hash + Eq, - V: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned, -{ - async fn handle(self: Arc, message: TardisClusterMessageReq) -> TardisResult> { - let event: CshmEvent = TardisJson.json_to_obj(message.msg)?; - match event { - CshmEvent::Insert(pairs) => { - let mut wg = self.map.write().await; - for (key, value) in pairs { - wg.insert(key, value); - } - Ok(None) - } - CshmEvent::Remove { keys } => { - let mut wg = self.map.write().await; - for key in keys { - wg.remove(&key); - } - Ok(None) - } - CshmEvent::Get { key } => { - let rg = self.map.read().await; - let value = rg.get(&key); - Ok(Some(TardisJson.obj_to_json(&value)?)) - } - CshmEvent::Modify { key, mapper, modify } => { - let mut wg = self.map.write().await; - if let Some(v) = wg.get_mut(&key) { - if let Some(handler) = self.modify_handler.get(&mapper) { - handler(v, &modify); - } - } - Ok(None) - } - } - } - fn event_name(&self) -> String { - ClusterStaticHashMap::event_name(self) - } -} diff --git a/tardis/src/cluster/cluster_processor.rs b/tardis/src/cluster/cluster_processor.rs deleted file mode 100644 index bcfa14f3..00000000 --- a/tardis/src/cluster/cluster_processor.rs +++ /dev/null @@ -1,497 +0,0 @@ -use std::collections::{HashMap, HashSet}; -use std::net::SocketAddr; -use std::pin::Pin; -use std::sync::{Arc, OnceLock}; -use std::time::Duration; - -use futures::Future; -use futures_util::{SinkExt, StreamExt}; -use poem::web::websocket::{BoxWebSocketUpgraded, Message, WebSocket}; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use tokio::sync::{mpsc, RwLock}; -use tracing::{debug, error, info, instrument, trace, warn}; - -use crate::basic::error::TardisError; -use crate::cluster::cluster_publish::ClusterEvent; -use crate::cluster::cluster_receive::init_response_dispatcher; -use crate::cluster::cluster_watch_by_cache; -#[cfg(feature = "k8s")] -use crate::cluster::cluster_watch_by_k8s; -use crate::config::config_dto::FrameworkConfig; -use crate::tardis_static; -use crate::web::web_server::status_api::TardisStatus; -use crate::web::web_server::TardisWebServer; -use crate::web::ws_client::TardisWSClient; -use crate::web::ws_processor::ws_insts_mapping_avatars; -// use crate::web::ws_processor::cluster_protocol::Avatar; -use crate::{basic::result::TardisResult, TardisFuns}; - -pub const CLUSTER_NODE_WHOAMI: &str = "__cluster_node_who_am_i__"; -/// cluster ping event -pub const EVENT_PING: &str = "tardis/ping"; -/// cluster status check event -pub const EVENT_STATUS: &str = "tardis/status"; -pub const CLUSTER_MESSAGE_CACHE_SIZE: usize = 10000; -pub const WHOAMI_TIMEOUT: Duration = Duration::from_secs(30); - -tardis_static! { - pub async set local_socket_addr: SocketAddr; - pub async set local_node_id: String; - pub async set responser_dispatcher: mpsc::Sender; - pub(crate) cache_nodes: Arc>>; - pub(crate) subscribers: Arc>>; -} - -/// clone the cache_nodes_info at current time -pub async fn load_cache_nodes_info() -> HashMap { - cache_nodes().read().await.clone() -} - -pub async fn peer_count() -> usize { - let local_node_id = local_node_id().await; - cache_nodes().read().await.keys().filter(|k| if let ClusterRemoteNodeKey::NodeId(id) = k { id != local_node_id } else { false }).count() -} - -#[derive(Debug, Clone, Eq, Hash, PartialEq)] -pub enum ClusterRemoteNodeKey { - SocketAddr(SocketAddr), - NodeId(String), -} - -impl From for ClusterRemoteNodeKey { - fn from(val: SocketAddr) -> Self { - ClusterRemoteNodeKey::SocketAddr(val) - } -} - -impl From for ClusterRemoteNodeKey { - fn from(val: String) -> Self { - ClusterRemoteNodeKey::NodeId(val) - } -} - -impl ClusterRemoteNodeKey { - pub fn as_socket_addr(&self) -> Option { - match self { - ClusterRemoteNodeKey::SocketAddr(socket_addr) => Some(*socket_addr), - _ => None, - } - } - pub fn as_node_id(&self) -> Option { - match self { - ClusterRemoteNodeKey::NodeId(node_id) => Some(node_id.clone()), - _ => None, - } - } -} - -impl std::fmt::Display for ClusterRemoteNodeKey { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - ClusterRemoteNodeKey::SocketAddr(socket_addr) => write!(f, "{}", socket_addr), - ClusterRemoteNodeKey::NodeId(node_id) => write!(f, "[id]{}", node_id), - } - } -} - -pub type ClusterMessageId = String; - -/// Cluster event subscriber trait, a subscriber object can be registered to the cluster event system and respond to the event -/// -/// # Register -/// see [`subscribe`], [`subscribe_boxed`] and [`subscribe_if_not_exist`] -pub trait ClusterHandler: Send + Sync + 'static { - fn event_name(&self) -> String; - fn handle(self: Arc, message_req: TardisClusterMessageReq) -> impl Future>> + Send; -} - -pub struct ClusterHandlerObj { - pub event_name: String, - pub handle: Box Pin>> + Send>> + Send + Sync>, -} -impl ClusterHandlerObj { - pub fn new(handler: H) -> Self { - let acred = Arc::new(handler); - Self { - event_name: acred.event_name(), - handle: Box::new(move |message_req| { - let cloned = acred.clone(); - let fut = cloned.handle(message_req); - Box::pin(fut) - }), - } - } -} - -#[derive(Debug, Clone, Default)] -pub enum ClusterEventTarget { - #[default] - /// broadcast to all known nodes that id is known - Broadcast, - /// to single remote node - Single(ClusterRemoteNodeKey), - /// to multi nodes - Multi(Vec), - /// raw client - Client(Arc), -} - -impl ClusterEventTarget { - pub fn multi, I: IntoIterator>(iter: I) -> Self { - ClusterEventTarget::Multi(iter.into_iter().map(|v| v.into()).collect()) - } -} - -impl From for ClusterEventTarget { - fn from(val: SocketAddr) -> Self { - ClusterEventTarget::Single(ClusterRemoteNodeKey::SocketAddr(val)) - } -} - -impl From for ClusterEventTarget { - fn from(val: String) -> Self { - ClusterEventTarget::Single(ClusterRemoteNodeKey::NodeId(val)) - } -} - -impl<'s> From<&'s str> for ClusterEventTarget { - fn from(val: &'s str) -> Self { - ClusterEventTarget::Single(ClusterRemoteNodeKey::NodeId(val.to_string())) - } -} - -impl> From> for ClusterEventTarget { - fn from(val: Vec) -> Self { - ClusterEventTarget::Multi(val.into_iter().map(|id| ClusterRemoteNodeKey::NodeId(id.into())).collect::>()) - } -} - -impl From> for ClusterEventTarget { - fn from(val: Arc) -> Self { - ClusterEventTarget::Client(val) - } -} - -struct EventPing; - -impl ClusterHandler for EventPing { - fn event_name(&self) -> String { - EVENT_PING.to_string() - } - async fn handle(self: Arc, _message_req: TardisClusterMessageReq) -> TardisResult> { - Ok(Some(serde_json::to_value(local_node_id().await).expect("spec always be a valid json value"))) - } -} - -pub(crate) struct EventStatus; - -impl ClusterHandler for EventStatus { - fn event_name(&self) -> String { - EVENT_STATUS.to_string() - } - - async fn handle(self: Arc, _message_req: TardisClusterMessageReq) -> TardisResult> { - Ok(Some(serde_json::to_value(TardisStatus::fetch().await).expect("status always be a valid json value"))) - } -} - -impl EventStatus { - pub async fn get_by_id(cluster_id: &str) -> TardisResult { - if cluster_id == *local_node_id().await { - Ok(TardisStatus::fetch().await) - } else { - let resp = publish_event_one_response( - EventStatus.event_name(), - Default::default(), - ClusterEventTarget::Single(ClusterRemoteNodeKey::NodeId(cluster_id.to_string())), - None, - ) - .await?; - serde_json::from_value(resp.msg).map_err(|e| { - let error_info = format!("[Tardis.Cluster] [Client] receive message error: {e}"); - TardisError::wrap(&error_info, "-1-tardis-cluster-receive-message-error") - }) - } - } -} - -pub async fn init_by_conf(conf: &FrameworkConfig, cluster_server: &TardisWebServer) -> TardisResult<()> { - if let Some(cluster_config) = &conf.cluster { - let web_server_config = conf.web_server.as_ref().expect("missing web server config"); - let access_host = web_server_config.access_host.unwrap_or(web_server_config.host); - let access_port = web_server_config.access_port.unwrap_or(web_server_config.port); - let access_addr = SocketAddr::new(access_host, access_port); - info!("[Tardis.Cluster] Initializing cluster"); - init_node(cluster_server, access_addr).await?; - match cluster_config.watch_kind.to_lowercase().as_str() { - #[cfg(feature = "k8s")] - "k8s" => { - info!("[Tardis.Cluster] Initializing cluster by k8s watch"); - cluster_watch_by_k8s::init(cluster_config, web_server_config).await?; - } - "cache" => { - info!("[Tardis.Cluster] Initializing cluster by default watch"); - cluster_watch_by_cache::init(cluster_config, web_server_config).await?; - } - _ => panic!("[Tardis.Cluster] Unsupported cluster watch kind: {}", cluster_config.watch_kind), - } - info!("[Tardis.Cluster] Initialized cluster"); - } - Ok(()) -} - -async fn init_node(cluster_server: &TardisWebServer, access_addr: SocketAddr) -> TardisResult<()> { - info!("[Tardis.Cluster] Initializing node"); - set_local_node_id(TardisFuns::field.nanoid()); - set_local_socket_addr(access_addr); - debug!("[Tardis.Cluster] Initializing response dispatcher"); - set_responser_dispatcher(init_response_dispatcher()); - debug!("[Tardis.Cluster] Register exchange route"); - cluster_server.add_route(ClusterAPI).await; - - debug!("[Tardis.Cluster] Register default events"); - subscribe(EventPing).await; - #[cfg(feature = "web-server")] - { - subscribe(EventStatus).await; - subscribe(ws_insts_mapping_avatars().clone()).await; - } - - info!("[Tardis.Cluster] Initialized node"); - Ok(()) -} - -#[instrument] -pub async fn refresh_nodes(active_nodes: &HashSet) -> TardisResult<()> { - trace!("[Tardis.Cluster] Refreshing nodes"); - trace!("[Tardis.Cluster] Find all active nodes: {:?}", active_nodes); - let mut cache_nodes = cache_nodes().write().await; - let socket_set = cache_nodes.keys().filter_map(ClusterRemoteNodeKey::as_socket_addr).collect::>(); - // remove inactive nodes - for inactive_node in socket_set.difference(active_nodes) { - if let Some(remote) = cache_nodes.remove(&ClusterRemoteNodeKey::SocketAddr(*inactive_node)) { - // load_cache_nodes_info() - info!("[Tardis.Cluster] remove inactive node {remote:?} from cache"); - cache_nodes.remove(&ClusterRemoteNodeKey::NodeId(remote.node_id)); - // TODO - // be nice to the server, close the connection - // remote.client - } - } - // add new nodes - for new_nodes_addr in active_nodes.difference(&socket_set) { - if local_socket_addr().await == new_nodes_addr { - // skip local node - continue; - } - let remote = add_remote_node(*new_nodes_addr).await?; - info!("[Tardis.Cluster] New remote nodes: {remote:?}"); - - cache_nodes.insert(ClusterRemoteNodeKey::SocketAddr(*new_nodes_addr), remote.clone()); - cache_nodes.insert(ClusterRemoteNodeKey::NodeId(remote.node_id.clone()), remote); - } - let mut table = String::new(); - for (k, v) in cache_nodes.iter() { - use std::fmt::Write; - if matches!(k, ClusterRemoteNodeKey::NodeId(_)) { - writeln!(&mut table, "{k:20} | {v:40} ").expect("shouldn't fail"); - } - } - Ok(()) -} - -async fn add_remote_node(socket_addr: SocketAddr) -> TardisResult { - if *local_socket_addr().await == socket_addr { - return Err(TardisError::wrap( - &format!("[Tardis.Cluster] [Client] add remote node {socket_addr}: can't add local node"), - "-1-tardis-cluster-add-remote-node-error", - )); - } - debug!("[Tardis.Cluster] Connect node: {socket_addr}"); - // is this node - let client = TardisFuns::ws_client(&format!("ws://{socket_addr}/tardis/cluster/ws/exchange"), move |message| async move { - if let tokio_tungstenite::tungstenite::Message::Text(message) = message { - match TardisFuns::json.str_to_obj::(&message) { - Ok(message_resp) => { - if let Err(error) = responser_dispatcher().await.send(message_resp).await { - error!("[Tardis.Cluster] [Client] response message {message}: {error}"); - } - } - Err(error) => error!("[Tardis.Cluster] [Client] response message {message}: {error}"), - } - } - None - }) - .await?; - let client = Arc::new(client); - let resp = ClusterEvent::new(EVENT_PING).target(client.clone()).one_response(Some(WHOAMI_TIMEOUT)).publish_one_response().await?; - let resp_node_id = resp.resp_node_id; - let remote = TardisClusterNodeRemote { node_id: resp_node_id, client }; - Ok(remote) -} - -/// subscribe a boxed cluster event -pub async fn subscribe_boxed(handler: ClusterHandlerObj) { - let event_name = handler.event_name.clone(); - info!("[Tardis.Cluster] [Server] subscribe event {event_name}"); - subscribers().write().await.insert(event_name, handler); -} - -/// subscribe a cluster event -pub async fn subscribe(handler: H) { - subscribe_boxed(ClusterHandlerObj::new(handler)).await; -} - -/// subscribe a cluster event if not exist -pub async fn subscribe_if_not_exist(handler: H) { - let mut wg = subscribers().write().await; - let event_name = handler.event_name(); - #[allow(clippy::map_entry)] - if !wg.contains_key(&event_name) { - info!("[Tardis.Cluster] [Server] subscribe event {event_name}"); - wg.insert(event_name, ClusterHandlerObj::new(handler)); - } -} - -/// unsubscribe a cluster event -pub async fn unsubscribe(event_name: &str) { - info!("[Tardis.Cluster] [Server] unsubscribe event {event_name}"); - subscribers().write().await.remove(event_name); -} - -/// a request message for cluster -#[derive(Deserialize, Serialize, Clone, Debug)] -pub struct TardisClusterMessageReq { - pub(crate) msg_id: String, - pub req_node_id: String, - pub msg: Value, - pub event: String, -} - -impl TardisClusterMessageReq { - pub fn new(msg: Value, event: String, req_node_id: String) -> Self { - Self { - msg_id: TardisFuns::field.nanoid(), - req_node_id, - msg, - event, - } - } - - pub fn msg_id(&self) -> String { - self.msg_id.to_string() - } -} - -/// a response message for cluster - -#[derive(Deserialize, Serialize, Clone, Debug)] -pub struct TardisClusterMessageResp { - pub(crate) msg_id: String, - pub resp_node_id: String, - pub msg: Value, -} - -impl TardisClusterMessageResp { - pub fn new(msg: Value, msg_id: String, resp_node_id: String) -> Self { - Self { msg_id, msg, resp_node_id } - } - - pub fn msg_id(&self) -> String { - self.msg_id.to_string() - } -} -#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] -pub struct TardisClusterNodeSpecifier { - pub id: String, - pub socket_addr: SocketAddr, -} - -pub struct TardisClusterNodeLocal { - pub spec: TardisClusterNodeSpecifier, -} - -#[derive(Debug, Clone)] -pub struct TardisClusterNodeRemote { - pub node_id: String, - pub client: Arc, -} - -impl std::fmt::Display for TardisClusterNodeRemote { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{is_online} / {node_id} / {url}", - is_online = if self.client.is_connected() { "online" } else { "offline" }, - node_id = self.node_id, - url = self.client.url - ) - } -} -pub enum TardisClusterNode { - Local(TardisClusterNodeLocal), - Remote(TardisClusterNodeRemote), -} - -impl TardisClusterNode {} - -use std::hash::Hash; - -use super::cluster_publish::publish_event_one_response; - -#[derive(Debug, Clone)] -struct ClusterAPI; - -#[poem_openapi::OpenApi] -impl ClusterAPI { - #[oai(path = "/tardis/cluster/ws/exchange", method = "get")] - async fn exchange(&self, websocket: WebSocket) -> BoxWebSocketUpgraded { - websocket - .on_upgrade(|mut socket| async move { - while let Some(Ok(ws_message)) = socket.next().await { - match ws_message { - Message::Text(ws_message) => { - trace!("[Tardis.Cluster] [Server] receive message {ws_message}"); - match TardisFuns::json.str_to_obj::(&ws_message) { - Ok(message_req) => { - if let Some(subscriber) = subscribers().read().await.get(message_req.event.as_str()) { - let msg_id = message_req.msg_id(); - match (subscriber.handle)(message_req).await { - Ok(Some(message_resp)) => { - if let Err(error) = socket - .send(Message::Text( - TardisFuns::json - .obj_to_string(&TardisClusterMessageResp::new(message_resp.clone(), msg_id, local_node_id().await.to_string())) - .expect("ignore"), - )) - .await - { - error!("[Tardis.Cluster] [Server] response message {message_resp:?}: {error}"); - break; - } - } - Ok(None) => {} - Err(error) => { - warn!("[Tardis.Cluster] [Server] subscribe function by message {ws_message:?}: {error}"); - } - } - } else { - warn!("[Tardis.Cluster] [Server] receive message {ws_message}: subscribe not found"); - } - } - Err(error) => error!("[Tardis.Cluster] [Server] send message {ws_message}: {error}"), - } - } - Message::Close(ws_message) => { - trace!("[Tardis.Cluster] [Server] message receive: close {:?}", ws_message); - } - _ => { - warn!("[Tardis.Cluster] [Server] message receive: the type is not implemented"); - } - } - } - }) - .boxed() - } -} diff --git a/tardis/src/cluster/cluster_publish.rs b/tardis/src/cluster/cluster_publish.rs deleted file mode 100644 index 4ac834fe..00000000 --- a/tardis/src/cluster/cluster_publish.rs +++ /dev/null @@ -1,213 +0,0 @@ -use std::borrow::Cow; -use std::sync::Arc; -use std::time::Duration; - -use super::cluster_processor::{TardisClusterMessageReq, TardisClusterMessageResp}; -use super::cluster_receive::{listen::*, listen_reply}; -use crate::cluster::cluster_processor::{cache_nodes, local_node_id, ClusterEventTarget, ClusterRemoteNodeKey}; -use crate::{ - basic::{error::TardisError, result::TardisResult}, - web::ws_client::TardisWSClient, - TardisFuns, -}; -use futures::future::join_all; -use serde::Serialize; -use serde_json::Value; -use tracing::{error, trace}; - -/// Cluster-wide event -/// -/// `` is the listener type, default is [`Once`], which implies that the response message will be received only once. -/// -/// # Example -/// ``` -/// # use tardis::cluster::cluster_publish::ClusterEvent; -/// let event = ClusterEvent::new("hello").no_response().message(&("hello", "world")); -/// ``` -#[derive(Debug, Clone)] -pub struct ClusterEvent { - event: Cow<'static, str>, - message: Value, - target: ClusterEventTarget, - listener: L, -} - -impl ClusterEvent { - pub fn new(event: impl Into>) -> Self { - Self { - event: event.into(), - message: Value::Null, - target: ClusterEventTarget::Broadcast, - listener: Once { timeout: None }, - } - } -} - -impl ClusterEvent { - pub fn listener(self, listener: L2) -> ClusterEvent { - ClusterEvent { - event: self.event, - message: self.message, - target: self.target, - listener, - } - } - /// Set the listener to receive only one response. - pub fn one_response(self, timeout: Option) -> ClusterEvent { - ClusterEvent { - event: self.event, - message: self.message, - target: self.target, - listener: Once { timeout }, - } - } - /// Don't expect any response. - pub fn no_response(self) -> ClusterEvent { - ClusterEvent { - event: self.event, - message: self.message, - target: self.target, - listener: Never, - } - } - /// Set the message of the event. - pub fn message(self, message: &T) -> TardisResult { - Ok(Self { - message: crate::TardisFuns::json.obj_to_json(message)?, - ..self - }) - } - pub fn json_message(self, message: Value) -> Self { - Self { message, ..self } - } - /// Set the target of the event. - /// - /// see [`ClusterEventTarget`] - pub fn target(self, target: impl Into) -> Self { - Self { target: target.into(), ..self } - } -} - -impl ClusterEvent { - /// Publish the event and receive only one response. - pub async fn publish_one_response(self) -> TardisResult { - publish_event_with_listener(self.event, self.message, self.target, self.listener).await?.await.map_err(|e| { - let error_info = format!("[Tardis.Cluster] [Client] Oneshot receive error: {e}, this may caused by timeout"); - tracing::error!("{error_info}"); - TardisError::wrap(&error_info, "-1-tardis-cluster-receive-message-error") - }) - } -} - -impl ClusterEvent { - /// Publish the event. - pub async fn publish(self) -> TardisResult { - publish_event_with_listener(self.event, self.message, self.target, self.listener).await - } -} - -/// Publish an event with no response. -pub async fn publish_event_no_response(event: impl Into>, message: Value, target: impl Into) -> TardisResult { - publish_event_with_listener(event, message, target, Never).await -} - -/// Publish an event and receive only one response. -pub async fn publish_event_one_response( - event: impl Into>, - message: Value, - target: impl Into, - timeout: Option, -) -> TardisResult { - publish_event_with_listener(event, message, target, Once { timeout }).await?.await.map_err(|e| { - let error_info = format!("[Tardis.Cluster] [Client] Oneshot receive error: {e}, this may caused by timeout"); - tracing::error!("{error_info}"); - TardisError::wrap(&error_info, "-1-tardis-cluster-receive-message-error") - }) -} - -/// Publish an event -pub async fn publish_event_with_listener( - event: impl Into>, - message: Value, - target: impl Into, - listener: S, -) -> TardisResult { - let node_id = local_node_id().await.to_string(); - let event = event.into(); - let target = target.into(); - let target_debug = format!("{target:?}"); - trace!("[Tardis.Cluster] [Client] publish event {event} , message {message} , to {target_debug}"); - - let nodes: Vec<_> = match target { - ClusterEventTarget::Broadcast => cache_nodes() - .read() - .await - .iter() - .filter(|(key, _)| match key { - // just filter out my self - ClusterRemoteNodeKey::NodeId(peer_node_id) => peer_node_id != &node_id, - _ => false, - }) - .map(|(_, val)| val.client.clone()) - .collect(), - ClusterEventTarget::Single(ref addr) => cache_nodes().read().await.get(addr).map(|node| node.client.clone()).into_iter().collect(), - ClusterEventTarget::Multi(ref multi) => { - let cache_nodes = cache_nodes().read().await; - multi.iter().filter_map(|addr| cache_nodes.get(addr).map(|node| node.client.clone())).collect() - } - ClusterEventTarget::Client(client) => vec![client], - }; - if nodes.is_empty() { - return Err(TardisError::wrap( - &format!( - "[Tardis.Cluster] [Client] publish event {event} , message {message} , to {target} error: can't find any target node", - event = event, - message = message, - target = target_debug - ), - "-1-tardis-cluster-publish-message-error", - )); - } - let message_req = TardisClusterMessageReq::new(message.clone(), event.to_string(), node_id); - let message_id = message_req.msg_id.clone(); - let reply = listen_reply(listener, message_id).await; - do_publish_event(message_req, nodes).await?; - Ok(reply) -} - -pub(crate) async fn do_publish_event(message_req: TardisClusterMessageReq, clients: impl IntoIterator>) -> TardisResult<()> { - let ws_message = tokio_tungstenite::tungstenite::Message::Text(TardisFuns::json.obj_to_string(&message_req)?); - let publish_result = join_all(clients.into_iter().map(|client| { - let ws_message = ws_message.clone(); - async move { client.send_raw_with_retry(ws_message).await } - })) - .await; - if publish_result - .iter() - .filter(|result| { - if let Err(error) = result { - error!( - "[Tardis.Cluster] [Client] publish event {event} , message {message}: {error}", - event = message_req.event, - message = message_req.msg - ); - true - } else { - false - } - }) - .count() - != 0 - { - Err(TardisError::wrap( - &format!( - "[Tardis.Cluster] [Client] publish event {event} , message {message} error", - event = message_req.event, - message = message_req.msg - ), - "-1-tardis-cluster-publish-message-error", - )) - } else { - Ok(()) - } -} diff --git a/tardis/src/cluster/cluster_receive.rs b/tardis/src/cluster/cluster_receive.rs deleted file mode 100644 index 286c3c6b..00000000 --- a/tardis/src/cluster/cluster_receive.rs +++ /dev/null @@ -1,205 +0,0 @@ -use std::collections::HashMap; - -use tokio::sync::{mpsc, RwLock}; - -use crate::tardis_static; - -use self::listen::Listener; - -use super::cluster_processor::{TardisClusterMessageResp, CLUSTER_MESSAGE_CACHE_SIZE}; - -enum ResponseFn { - Once(Box), - Multitime(Box bool + Send + Sync>), -} -tardis_static! { - responser_subscribers: RwLock>; -} - -pub(crate) async fn listen_reply(strategy: S, id: String) -> S::Reply { - strategy.subscribe(id).await -} - -pub(crate) fn init_response_dispatcher() -> mpsc::Sender { - let (tx, mut rx) = mpsc::channel::(CLUSTER_MESSAGE_CACHE_SIZE); - // rx is for ws connections - // tx is for response dispatcher - let dispatch_task = async move { - while let Some(resp) = rx.recv().await { - let id = resp.msg_id.clone(); - tracing::trace!( - "[Tardis.Cluster] dispatching received response: {id} from {node_id}, message: {resp:?}", - id = id, - node_id = resp.resp_node_id, - resp = resp - ); - if let Some(subscriber) = responser_subscribers().read().await.get(&id) { - match subscriber { - ResponseFn::Once(_) => { - tokio::spawn(async move { - if let Some(ResponseFn::Once(f)) = responser_subscribers().write().await.remove(&id) { - f(resp) - } - }); - } - ResponseFn::Multitime(f) => { - let drop_me = f(resp); - if drop_me { - tokio::spawn(async move { - responser_subscribers().write().await.remove(&id); - }); - } - } - } - } else { - tracing::trace!("[Tardis.Cluster] no subscriber found for message_id: {id}.", id = id); - } - } - }; - tokio::spawn(dispatch_task); - tx -} - -pub mod listen { - use std::time::Duration; - - use async_trait::async_trait; - use tokio::sync::{broadcast, mpsc, oneshot}; - - use crate::cluster::cluster_processor::TardisClusterMessageResp; - - use super::ResponseFn; - #[async_trait] - pub trait Listener { - type Reply; - async fn subscribe(self, id: String) -> Self::Reply; - } - - /// The message will be received only once. - #[derive(Debug, Default, Clone, Copy)] - pub struct Once { - pub(crate) timeout: Option, - } - - impl Once { - pub fn with_timeout(timeout: Duration) -> Self { - Self { timeout: Some(timeout) } - } - } - - #[async_trait] - impl Listener for Once { - type Reply = oneshot::Receiver; - - async fn subscribe(self, id: String) -> Self::Reply { - let (tx, rx) = oneshot::channel(); - let timeout_handle = { - let id = id.clone(); - self.timeout.map(|timeout| { - tokio::spawn(async move { - tokio::time::sleep(timeout).await; - - // super::responser_subscribers().write().await.remove(&id); - // tracing::trace!("[Tardis.Cluster] message {id} timeout"); - - if let Some(_task) = super::responser_subscribers().write().await.remove(&id) { - tracing::trace!("[Tardis.Cluster] message {id} timeout"); - } - }) - }) - }; - super::responser_subscribers().write().await.insert( - id, - ResponseFn::Once(Box::new(move |resp| { - tracing::trace!("[Tardis.Cluster] Once listener receive resp {resp:?}"); - // cleanup timeout callback - if let Some(ref timeout_handle) = timeout_handle { - timeout_handle.abort(); - } - if let Err(e) = tx.send(resp) { - tracing::debug!("[Tardis.Cluster] message {e:?} missing receiver"); - } - })), - ); - rx - } - } - - #[derive(Debug, Default, Clone, Copy)] - /// send a message and receive all the responses until the receiver is dropped. - pub struct Stream; - - #[async_trait] - impl Listener for Stream { - type Reply = mpsc::Receiver; - - async fn subscribe(self, id: String) -> Self::Reply { - let (tx, rx) = mpsc::channel(100); - { - let tx = tx.clone(); - let id = id.clone(); - tokio::spawn(async move { - tx.closed().await; - super::responser_subscribers().write().await.remove(&id); - }); - } - super::responser_subscribers().write().await.insert( - id, - ResponseFn::Multitime(Box::new(move |resp| { - if tx.is_closed() { - true - } else { - let tx = tx.clone(); - tokio::spawn(async move { tx.send(resp).await }); - false - } - })), - ); - rx - } - } - - /// Send a message and ignore the response. - #[derive(Debug, Default, Clone, Copy)] - pub struct Never; - - #[async_trait] - impl Listener for Never { - type Reply = String; - - async fn subscribe(self, id: String) -> Self::Reply { - id - } - } - - #[derive(Debug, Default, Clone, Copy)] - pub struct Broadcast {} - - #[async_trait] - impl Listener for Broadcast { - type Reply = broadcast::Receiver; - - async fn subscribe(self, id: String) -> Self::Reply { - let (tx, rx) = broadcast::channel(100); - { - let tx = tx.clone(); - let id = id.clone(); - tokio::spawn(async move { - if tx.receiver_count() == 0 { - super::responser_subscribers().write().await.remove(&id); - } else { - tokio::task::yield_now().await; - } - }); - } - super::responser_subscribers().write().await.insert( - id, - ResponseFn::Multitime(Box::new(move |resp| { - let _ = tx.send(resp); - tx.receiver_count() == 0 - })), - ); - rx - } - } -} diff --git a/tardis/src/cluster/cluster_watch_by_cache.rs b/tardis/src/cluster/cluster_watch_by_cache.rs deleted file mode 100644 index f8ea8fa7..00000000 --- a/tardis/src/cluster/cluster_watch_by_cache.rs +++ /dev/null @@ -1,67 +0,0 @@ -use std::{collections::HashSet, net::SocketAddr, time::Duration}; - -use chrono::Utc; -use tokio::time; -use tracing::{error, trace}; - -use crate::{ - basic::result::TardisResult, - cache::cache_client::TardisCacheClient, - cluster::cluster_processor, - config::config_dto::{component::WebServerConfig, ClusterConfig}, - TardisFuns, -}; - -pub const CACHE_NODE_INFO_KEY: &str = "tardis:cluster:node"; -pub const CACHE_NODE_ALIVE_CHECK_DELAYED_TIMES: i8 = 3; - -pub async fn init(cluster_config: &ClusterConfig, web_server_config: &WebServerConfig) -> TardisResult<()> { - let access_host = web_server_config.access_host.unwrap_or(web_server_config.host); - let access_port = web_server_config.access_port.unwrap_or(web_server_config.port); - let cache_check_interval_sec = cluster_config.cache_check_interval_sec.unwrap_or(10); - let access_addr = SocketAddr::new(access_host, access_port); - // heart beat - tokio::spawn(async move { - let client = TardisFuns::cache(); - let mut interval = time::interval(Duration::from_secs(cache_check_interval_sec as u64)); - loop { - { - trace!("[Tardis.Cluster] [Client] heartbeat..."); - if let Err(error) = client.hset(CACHE_NODE_INFO_KEY, &access_addr.to_string(), &Utc::now().timestamp().to_string()).await { - error!("[Tardis.Cluster] [Client] heartbeat error: {}", error); - } - } - interval.tick().await; - } - }); - tokio::spawn(async move { - let client = TardisFuns::cache(); - let mut interval = time::interval(Duration::from_secs(cache_check_interval_sec as u64)); - loop { - { - if let Err(error) = watch(&client, cache_check_interval_sec).await { - error!("[Tardis.Cluster] [Client] watch error: {}", error); - } - } - interval.tick().await; - } - }); - Ok(()) -} - -async fn watch(client: &TardisCacheClient, cache_check_interval_sec: i32) -> TardisResult<()> { - trace!("[Tardis.Cluster] [Client] watching"); - let all_nodes = client.hgetall(CACHE_NODE_INFO_KEY).await?; - let active_ts = Utc::now().timestamp() - cache_check_interval_sec as i64 * CACHE_NODE_ALIVE_CHECK_DELAYED_TIMES as i64 - 1; - let active_nodes = all_nodes - .iter() - .filter_map(|(active_node_key, active_node_ts)| (active_node_ts.parse::().unwrap_or(i64::MIN) > active_ts).then_some(active_node_key)) - .filter_map(|active_node_key| active_node_key.parse::().ok()) - .collect::>(); - cluster_processor::refresh_nodes(&active_nodes).await?; - let inactive_nodes = all_nodes.iter().filter(|(_, active_node_ts)| active_node_ts.parse::().unwrap_or(i64::MIN) <= active_ts).collect::>(); - for (inactive_node_key, _) in inactive_nodes { - client.hdel(CACHE_NODE_INFO_KEY, inactive_node_key).await?; - } - Ok(()) -} diff --git a/tardis/src/cluster/cluster_watch_by_k8s.rs b/tardis/src/cluster/cluster_watch_by_k8s.rs deleted file mode 100644 index 7b3b7617..00000000 --- a/tardis/src/cluster/cluster_watch_by_k8s.rs +++ /dev/null @@ -1,93 +0,0 @@ -use std::{ - collections::HashSet, - net::{IpAddr, SocketAddr}, - time::Duration, -}; - -use k8s_openapi::api::core::v1::{Endpoints, Service}; -use kube::{api::WatchParams, Api, Client}; -use tracing::{error, trace}; - -use crate::{ - basic::{error::TardisError, result::TardisResult}, - config::config_dto::{ClusterConfig, WebServerConfig}, -}; -use futures::StreamExt; - -use super::cluster_processor; - -pub async fn init(cluster_config: &ClusterConfig, webserver_config: &WebServerConfig) -> TardisResult<()> { - let k8s_svc = cluster_config.k8s_svc.as_ref().expect("[Tardis.Cluster] [Client] need k8s_svc config in k8s mode").to_string(); - let k8s_ns = cluster_config.k8s_ns.as_ref().expect("[Tardis.Cluster] [Client] need k8s_ns config in k8s mode").to_string(); - let web_server_port = webserver_config.port; - - tokio::spawn(async move { watch(&k8s_svc, &k8s_ns, web_server_port).await }); - Ok(()) -} - -async fn watch(k8s_svc: &str, k8s_ns: &str, web_server_port: u16) -> TardisResult<()> { - const RETRY_PERIOD: Duration = Duration::from_secs(10); - let mut force_refresh_interval = tokio::time::interval(RETRY_PERIOD); - let endpoint_api: Api = Api::namespaced(get_client().await?, k8s_ns); - let mut endpoint_watcher = endpoint_api.watch(&WatchParams::default().fields(&format!("metadata.name={k8s_svc}")), "0").await?.boxed(); - loop { - tokio::select! { - _ = force_refresh_interval.tick() => {} - Some(_) = endpoint_watcher.next() => {} - } - if let Err(e) = refresh(k8s_svc, k8s_ns, web_server_port).await { - error!("[Tardis.Cluster] [Client] watch error: {}", e); - } - } -} - -async fn refresh(k8s_svc: &str, k8s_ns: &str, web_server_port: u16) -> TardisResult<()> { - trace!("[Tardis.Cluster] [Client] watching"); - let service_api: Api = Api::namespaced(get_client().await?, k8s_ns); - let service = service_api.get(k8s_svc).await?; - let port_mapping = service - .spec - .as_ref() - .and_then(|spec| spec.ports.as_ref()) - .and_then(|ports| { - ports.iter().find(|port_obj| { - port_obj - .target_port - .as_ref() - .map(|target_port| match target_port { - k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::Int(target_port) => target_port == &(web_server_port as i32), - // TODO - k8s_openapi::apimachinery::pkg::util::intstr::IntOrString::String(_) => true, - }) - .unwrap_or(false) - }) - }) - .map(|port_obj| port_obj.port) - .ok_or_else(|| { - TardisError::wrap( - &format!("[Tardis.Cluster] [Client] kubernetes error: can not find node target_port for service {}", k8s_svc), - "", - ) - })? as u16; - - let endpoint_api: Api = Api::namespaced(get_client().await?, k8s_ns); - let endpoint = endpoint_api.get(k8s_svc).await?; - // fetch all addresses from all subsets - let active_nodes = endpoint - .subsets - .iter() - .flat_map(|subsets| { - subsets - .iter() - .flat_map(|subset| subset.addresses.as_ref().map(|addresses| addresses.iter().map(|address| address.ip.to_string()).collect::>()).unwrap_or_default()) - }) - .map(|ip: String| (ip, port_mapping)) - .filter_map(|(ip, port)| ip.parse::().map(|ip_addr| SocketAddr::new(ip_addr, port)).ok()) - .collect::>(); - cluster_processor::refresh_nodes(&active_nodes).await?; - Ok(()) -} - -async fn get_client() -> TardisResult { - Client::try_default().await.map_err(|error| TardisError::wrap(&format!("[Tardis.Cluster] [Client] kubernetes error: {error:?}"), "")) -} diff --git a/tardis/src/config/config_dto.rs b/tardis/src/config/config_dto.rs index 77f8f927..6a4b80ab 100644 --- a/tardis/src/config/config_dto.rs +++ b/tardis/src/config/config_dto.rs @@ -56,8 +56,6 @@ pub struct FrameworkConfig { #[builder(!default, default = Some(LogConfig::default()))] /// log configuration / 日志配置 pub log: Option, - /// Cluster configuration / 集群配置 - pub cluster: Option, } impl Default for FrameworkConfig { @@ -121,12 +119,7 @@ impl FrameworkConfig { pub fn log(&self) -> &LogConfig { self.log.as_ref().expect("missing component config of log") } - /// Get cluster config - /// # Panic - /// If the config of cluster is none, this will be panic. - pub fn cluster(&self) -> &ClusterConfig { - self.cluster.as_ref().expect("missing component config of cluster") - } + } /// Application configuration / 应用配置 @@ -230,34 +223,3 @@ impl Default for ConfCenterConfig { } } -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] -#[serde(default)] -pub struct ClusterConfig { - pub watch_kind: String, - #[cfg(feature = "k8s")] - pub k8s_svc: Option, - #[cfg(feature = "k8s")] - pub k8s_ns: Option, - pub cache_check_interval_sec: Option, -} - -impl Default for ClusterConfig { - fn default() -> Self { - #[cfg(feature = "k8s")] - { - ClusterConfig { - watch_kind: "k8s".to_string(), - k8s_svc: None, - k8s_ns: None, - cache_check_interval_sec: None, - } - } - #[cfg(not(feature = "k8s"))] - { - ClusterConfig { - watch_kind: "cache".to_string(), - cache_check_interval_sec: Some(10), - } - } - } -} diff --git a/tardis/src/lib.rs b/tardis/src/lib.rs index b9eb5dce..9b58b2b5 100644 --- a/tardis/src/lib.rs +++ b/tardis/src/lib.rs @@ -203,7 +203,7 @@ impl TardisFuns { /// # Arguments /// /// * `relative_path` - the directory where the configuration file is located, without the - /// configuration file name / 配置文件所在目录,不包含配置文件名 + /// configuration file name / 配置文件所在目录,不包含配置文件名 /// /// # Examples /// @@ -838,35 +838,6 @@ impl TardisFuns { tardis_instance().os.get(code).unwrap_or_else(Self::os) } - #[cfg(feature = "cluster")] - pub async fn cluster_subscribe_event_boxed(subscriber: cluster::cluster_processor::ClusterHandlerObj) { - cluster::cluster_processor::subscribe_boxed(subscriber).await; - } - - #[cfg(feature = "cluster")] - pub async fn cluster_subscribe_event(subscriber: S) { - cluster::cluster_processor::subscribe(subscriber).await; - } - - #[cfg(feature = "cluster")] - pub async fn cluster_publish_event( - event: impl Into>, - message: serde_json::Value, - target: impl Into, - ) -> TardisResult { - use cluster::cluster_publish::ClusterEvent; - ClusterEvent::new(event).json_message(message).target(target).no_response().publish().await - } - - #[cfg(feature = "cluster")] - pub async fn cluster_publish_event_one_resp( - event: impl Into>, - message: serde_json::Value, - node_id: &str, - ) -> TardisResult { - cluster::cluster_publish::publish_event_one_response(event, message, node_id, None).await - } - /// # Parameters /// - `clean: bool`: if use clean mode, it will cleanup all user setted configs like webserver modules async fn shutdown_internal(#[allow(unused_variables)] clean: bool) -> TardisResult<()> { @@ -1141,10 +1112,6 @@ pub mod basic; #[cfg_attr(docsrs, doc(cfg(feature = "cache")))] pub mod cache; -#[cfg(feature = "cluster")] -#[cfg_attr(docsrs, doc(cfg(feature = "cluster")))] -pub mod cluster; - pub mod config; #[cfg(any(feature = "crypto", feature = "base64"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "crypto", feature = "base64"))))] diff --git a/tardis/src/web.rs b/tardis/src/web.rs index d204d551..fe06d592 100644 --- a/tardis/src/web.rs +++ b/tardis/src/web.rs @@ -13,9 +13,7 @@ pub use reqwest; #[cfg(feature = "ws-client")] #[cfg_attr(docsrs, doc(cfg(feature = "ws-client")))] pub use tokio_tungstenite; -#[cfg(feature = "web-server")] -#[cfg_attr(docsrs, doc(cfg(feature = "web-server")))] -pub mod cluster_id_mw; + #[cfg(feature = "web-server")] #[cfg_attr(docsrs, doc(cfg(feature = "web-server")))] pub mod context_extractor; @@ -41,6 +39,3 @@ pub mod web_validation; #[cfg(feature = "ws-client")] #[cfg_attr(docsrs, doc(cfg(feature = "ws-client")))] pub mod ws_client; -#[cfg(feature = "web-server")] -#[cfg_attr(docsrs, doc(cfg(feature = "web-server")))] -pub mod ws_processor; diff --git a/tardis/src/web/cluster_id_mw.rs b/tardis/src/web/cluster_id_mw.rs deleted file mode 100644 index 76f367a3..00000000 --- a/tardis/src/web/cluster_id_mw.rs +++ /dev/null @@ -1,35 +0,0 @@ -use crate::TardisFuns; -#[cfg(feature = "cluster")] -use poem::http::HeaderValue; -use poem::{Endpoint, IntoResponse, Middleware, Request, Response}; - -pub struct AddClusterIdHeader; - -impl Middleware for AddClusterIdHeader { - type Output = UniformErrorImpl; - - fn transform(&self, ep: E) -> Self::Output { - UniformErrorImpl(ep) - } -} - -pub struct UniformErrorImpl(E); -pub const TARDIS_CLUSTER_ID_HEADER: &str = "Tardis-Cluster-Id"; -impl Endpoint for UniformErrorImpl { - type Output = Response; - - async fn call(&self, req: Request) -> poem::Result { - #[allow(unused_mut)] - let mut resp = self.0.call(req).await?.into_response(); - if TardisFuns::fw_config_opt().is_some_and(|cfg| cfg.cluster.is_some()) { - #[cfg(feature = "cluster")] - { - let cluster_id = crate::cluster::cluster_processor::local_node_id().await; - if let Ok(header_value) = HeaderValue::from_str(cluster_id) { - resp.headers_mut().insert(TARDIS_CLUSTER_ID_HEADER, header_value); - } - } - } - Ok(resp) - } -} diff --git a/tardis/src/web/web_server.rs b/tardis/src/web/web_server.rs index 4dfda94c..de691367 100644 --- a/tardis/src/web/web_server.rs +++ b/tardis/src/web/web_server.rs @@ -23,13 +23,11 @@ use crate::config::config_dto::{ FrameworkConfig, }; use crate::utils::initializer::InitBy; -use crate::web::cluster_id_mw::AddClusterIdHeader; use crate::web::uniform_error_mw::UniformError; mod initializer; use initializer::*; mod module; pub use module::*; -pub mod status_api; pub type BoxMiddleware<'a, T = BoxEndpoint<'a>> = Box + Send>; type ServerTaskInner = JoinHandle>; struct ServerTask { @@ -326,9 +324,9 @@ impl TardisWebServer { route.with(poem::middleware::OpenTelemetryTracing::new(tracer)) }; if module_options.uniform_error || module_config.uniform_error { - self.state.lock().await.add_route(code, route.with(UniformError).with(AddClusterIdHeader).with(cors), data); + self.state.lock().await.add_route(code, route.with(UniformError).with(cors), data); } else { - self.state.lock().await.add_route(code, route.with(cors).with(AddClusterIdHeader), data); + self.state.lock().await.add_route(code, route.with(cors), data); }; self } @@ -376,13 +374,6 @@ impl TardisWebServer { /// /// to shutdown it by calling `TardisWebServer::shutdown()` pub async fn start(&self) -> TardisResult<()> { - #[cfg(feature = "cluster")] - { - if let Some(fw) = crate::TardisFuns::fw_config_opt() { - crate::cluster::cluster_processor::init_by_conf(&fw, self).await?; - } - } - let output_info = format!( r#" ================= diff --git a/tardis/src/web/web_server/status_api.rs b/tardis/src/web/web_server/status_api.rs deleted file mode 100644 index a74d1496..00000000 --- a/tardis/src/web/web_server/status_api.rs +++ /dev/null @@ -1,73 +0,0 @@ -//! # Status Api -//! For debug usage, get the current status of the tardis server. -//! -#[cfg(feature = "cluster")] -use std::collections::HashMap; - -use poem_openapi::{param::Query, Object}; -use serde::{Deserialize, Serialize}; - -use crate::web::web_resp::{TardisApiResult, TardisResp}; -#[derive(Debug, Clone)] -pub struct TardisStatusApi; -#[derive(Debug, Serialize, Deserialize, Object)] -pub struct TardisStatus { - pub version: String, - #[cfg(feature = "cluster")] - pub cluster: TardisClusterStatus, - pub fw_config: serde_json::Value, -} - -impl TardisStatus { - pub async fn fetch() -> TardisStatus { - TardisStatus { - version: env!("CARGO_PKG_VERSION").to_string(), - #[cfg(feature = "cluster")] - cluster: TardisClusterStatus::fetch().await, - fw_config: serde_json::to_value(crate::TardisFuns::fw_config().as_ref().clone()).unwrap_or_default(), - } - } -} - -#[cfg(feature = "cluster")] -#[derive(Debug, Serialize, Deserialize, Object)] -pub struct TardisClusterStatus { - pub cluster_id: String, - pub peer_nodes: HashMap, - pub subscribed: Vec, -} - -#[cfg(feature = "cluster")] -impl TardisClusterStatus { - pub async fn fetch() -> TardisClusterStatus { - TardisClusterStatus { - cluster_id: crate::cluster::cluster_processor::local_node_id().await.to_string(), - peer_nodes: crate::cluster::cluster_processor::cache_nodes().read().await.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect(), - subscribed: crate::cluster::cluster_processor::subscribers().read().await.iter().map(|(k, _)| k.to_string()).collect(), - } - } -} - -#[poem_openapi::OpenApi] -impl TardisStatusApi { - #[allow(unused_variables)] - #[oai(path = "/status", method = "get")] - pub async fn status(&self, cluster_id: Query>) -> TardisApiResult { - let cluster_id = cluster_id.0; - if let Some(id) = cluster_id { - #[cfg(feature = "cluster")] - { - if id == *crate::cluster::cluster_processor::local_node_id().await { - return TardisResp::ok(TardisStatus::fetch().await); - } - let status = crate::cluster::cluster_processor::EventStatus::get_by_id(&id).await?; - return TardisResp::ok(status); - } - #[cfg(not(feature = "cluster"))] - { - return TardisResp::err(crate::basic::error::TardisError::internal_error("cluster features not enabled", "")); - } - } - TardisResp::ok(TardisStatus::fetch().await) - } -} diff --git a/tardis/src/web/ws_processor.rs b/tardis/src/web/ws_processor.rs deleted file mode 100644 index 9c1b76d9..00000000 --- a/tardis/src/web/ws_processor.rs +++ /dev/null @@ -1,590 +0,0 @@ -#[cfg(feature = "cluster")] -pub mod cluster_protocol; -// pub mod connection_avatar_router; -use crate::basic::error::TardisError; -use crate::basic::result::TardisResult; -#[cfg(feature = "cluster")] -use crate::cluster::cluster_hashmap::ClusterStaticHashMap; - -use std::sync::Arc; -use std::{collections::HashMap, num::NonZeroUsize}; - -use futures::{Future, SinkExt, StreamExt}; -use lru::LruCache; -use poem::web::websocket::{BoxWebSocketUpgraded, CloseCode, Message, WebSocket}; -use serde::{Deserialize, Serialize}; -use serde_json::{json, Value}; -use tokio::sync::Mutex; -use tracing::warn; -use tracing::{debug, trace}; - -use crate::{tardis_static, TardisFuns}; - -pub const WS_SYSTEM_EVENT_INFO: &str = "__sys_info__"; -pub const WS_SYSTEM_EVENT_AVATAR_ADD: &str = "__sys_avatar_add__"; -pub const WS_SYSTEM_EVENT_AVATAR_DEL: &str = "__sys_avatar_del__"; -pub const WS_SYSTEM_EVENT_ERROR: &str = "__sys_error__"; -#[derive(Debug, Clone, Copy)] -enum MessageSendState { - Success, - Sending, - Fail, -} - -/// # Safety: -/// It's safe for we set the cache size manually -#[allow(clippy::undocumented_unsafe_blocks)] -pub const WS_SENDER_CACHE_SIZE: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(1000000) }; - -tardis_static! { - // Websocket instance Id -> Avatars - #[cfg(not(feature = "cluster"))] - pub ws_insts_mapping_avatars: Arc>>>; - #[cfg(feature = "cluster")] - pub ws_insts_mapping_avatars: ClusterStaticHashMap> = ClusterStaticHashMap::>::builder("tardis/avatar") - .modify_handler("del_avatar", |v, modify| { - if let Some(del) = modify.as_str() { - v.retain(|value| *value != del); - } - }) - .modify_handler("add_avatar", |v, modify| { - if let Some(add) = modify.as_str() { - v.push(add.to_string() ); - } - }) - .build(); -} -lazy_static! { - // Single instance reply guard - static ref REPLY_ONCE_GUARD: Arc>> = Arc::new(Mutex::new(LruCache::new(WS_SENDER_CACHE_SIZE))); -} - -pub fn ws_echo(avatars: String, ext: HashMap, websocket: WebSocket, process_fun: PF, close_fun: CF) -> BoxWebSocketUpgraded -where - PF: Fn(String, String, HashMap) -> PT + Send + Sync + 'static, - PT: Future> + Send + 'static, - CF: Fn(Option<(CloseCode, String)>, HashMap) -> CT + Send + Sync + 'static, - CT: Future + Send + 'static, -{ - websocket - .on_upgrade(|mut socket| async move { - while let Some(Ok(message)) = socket.next().await { - match message { - Message::Text(text) => { - trace!("[Tardis.WebServer] WS message receive: {} by {}", text, &avatars); - if let Some(msg) = process_fun(avatars.clone(), text, ext.clone()).await { - trace!("[Tardis.WebServer] WS message send: {} to {}", msg, &avatars); - if let Err(error) = socket.send(Message::Text(msg.clone())).await { - warn!("[Tardis.WebServer] WS message send failed, message {msg} to {}: {error}", &avatars); - break; - } - } - } - Message::Close(msg) => { - trace!("[Tardis.WebServer] WS message receive: close {:?}", msg); - close_fun(msg, ext.clone()).await - } - Message::Binary(_) => { - warn!("[Tardis.WebServer] WS message receive: the binary type is not implemented"); - } - Message::Ping(_) => { - warn!("[Tardis.WebServer] WS message receive: the ping type is not implemented"); - } - Message::Pong(_) => { - warn!("[Tardis.WebServer] WS message receive: the pong type is not implemented"); - } - } - } - }) - .boxed() -} - -pub trait WsBroadcastSender: Send + Sync + 'static { - fn subscribe(&self) -> tokio::sync::broadcast::Receiver>; - fn send(&self, msg: TardisWebsocketMgrMessage) -> impl Future> + Send; -} - -impl WsBroadcastSender for tokio::sync::broadcast::Sender> { - fn subscribe(&self) -> tokio::sync::broadcast::Receiver> { - self.subscribe() - } - - async fn send(&self, msg: TardisWebsocketMgrMessage) -> TardisResult<()> { - let _ = self.send(msg.into()).map_err(|_| TardisError::internal_error("tokio channel send error", ""))?; - Ok(()) - } -} - -pub trait WsHooks: Sync + Send + 'static { - fn on_process(&self, req: TardisWebsocketReq, context: &WsBroadcastContext) -> impl Future> + Send; - fn on_close(&self, message: Option<(CloseCode, String)>, _context: &WsBroadcastContext) -> impl Future + Send { - if let Some((code, reason)) = message { - tracing::debug!("[Tardis.Ws] connection closed {code:?} for reason: {reason}"); - } - async {} - } - fn on_fail(&self, id: String, error: TardisError, _context: &WsBroadcastContext) -> impl Future + Send { - tracing::warn!("[Tardis.Ws] fail to send out message [{id}], reason: {error}"); - async {} - } - fn on_success(&self, id: String, _context: &WsBroadcastContext) -> impl Future + Send { - tracing::debug!("[Tardis.Ws] success to send out message [{id}]"); - async {} - } -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct WsBroadcastContext { - pub inst_id: String, - pub mgr_node: bool, - pub subscribe_mode: bool, -} - -impl WsBroadcastContext { - pub fn new(mgr_node: bool, subscribe_mode: bool) -> Self { - Self { - inst_id: TardisFuns::field.nanoid(), - mgr_node, - subscribe_mode, - } - } -} - -pub struct WsBroadcast { - inner_sender: Arc, - hooks: Arc, - context: Arc, -} - -impl std::fmt::Debug for WsBroadcast { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("WsBroadcast").field("context", &self.context).finish_non_exhaustive() - } -} - -impl WsBroadcast -where - S: WsBroadcastSender, - H: WsHooks, -{ - pub fn new(sender: S, hooks: H, context: WsBroadcastContext) -> Self { - Self { - inner_sender: Arc::new(sender), - hooks: Arc::new(hooks), - context: Arc::new(context), - } - } - pub fn send_to_channel(&self, send_msg: TardisWebsocketMgrMessage) { - let inner_sender = self.inner_sender.clone(); - let hook = self.hooks.clone(); - let context = self.context.clone(); - let task = async move { - let id = send_msg.msg_id.clone(); - if let Err(e) = inner_sender.send(send_msg).await { - warn!("[Tardis.Ws] send message encounter an error"); - hook.on_fail(id, e, &context).await; - } else { - hook.on_success(id, &context).await; - } - }; - tokio::spawn(task); - } - pub fn send_error_to_channel(&self, error_message: &str, from_avatar: &str, msg_id: Option) { - let send_msg = TardisWebsocketMgrMessage { - msg_id: msg_id.unwrap_or_default(), - msg: json!(error_message), - from_avatar: from_avatar.to_string(), - to_avatars: vec![from_avatar.to_string()], - event: Some(WS_SYSTEM_EVENT_ERROR.to_string()), - ignore_self: false, - ignore_avatars: vec![], - from_inst_id: self.context.inst_id.clone(), - echo: true, - }; - warn!("[Tardis.WebServer] WS message receive by {:?} failed: {error_message}", from_avatar); - self.send_to_channel(send_msg); - } - pub async fn handle_req(&self, req_msg: TardisWebsocketReq) -> Result<(), String> { - let insts_in_send = ws_insts_mapping_avatars().clone(); - let inst_id = self.context.inst_id.clone(); - #[cfg(feature = "cluster")] - let Ok(Some(current_avatars)) = insts_in_send.get(inst_id.clone()).await - else { - warn!("[Tardis.WebServer] insts_in_send of inst_id {inst_id} not found"); - return Ok(()); - }; - #[cfg(not(feature = "cluster"))] - let Some(current_avatars) = insts_in_send.read().await.get(&inst_id).cloned() else { - warn!("[Tardis.WebServer] insts_in_send of inst_id {inst_id} not found"); - return Ok(()); - }; - let msg_id = req_msg.msg_id.as_ref().unwrap_or(&TardisFuns::field.nanoid()).to_string(); - // Security check - if !self.context.mgr_node && req_msg.spec_inst_id.is_some() { - return Err("spec_inst_id can only be specified on the management node".to_string()); - } - if !self.context.mgr_node && !current_avatars.contains(&req_msg.from_avatar) { - return Err("from_avatar is illegal".to_string()); - } - // System process - if req_msg.event.as_deref() == Some(WS_SYSTEM_EVENT_INFO) { - let msg = TardisFuns::json - .obj_to_json(&TardisWebsocketInstInfo { - inst_id: self.context.inst_id.clone(), - avatars: current_avatars.clone(), - mgr_node: self.context.mgr_node, - subscribe_mode: self.context.subscribe_mode, - }) - .map_err(|error| { - crate::log::error!( - "[Tardis.WebServer] can't serialize {struct_name}, error: {error}", - struct_name = stringify!(TardisWebsocketInstInfo) - ); - "message illegal" - })?; - let send_msg = TardisWebsocketMgrMessage { - msg_id, - msg, - from_avatar: req_msg.from_avatar.clone(), - to_avatars: vec![req_msg.from_avatar], - event: req_msg.event, - ignore_self: false, - ignore_avatars: vec![], - from_inst_id: if let Some(spec_inst_id) = req_msg.spec_inst_id { - spec_inst_id - } else { - self.context.inst_id.clone() - }, - echo: true, - }; - self.send_to_channel(send_msg); - return Ok(()); - // For security reasons, adding an avatar needs to be handled by the management node - } else if req_msg.event.as_deref() == Some(WS_SYSTEM_EVENT_AVATAR_ADD) { - if self.context.mgr_node { - let Some(new_avatar) = req_msg.msg.as_str() else { - return Err("msg is not a string".to_string()); - }; - let Some(ref spec_inst_id) = req_msg.spec_inst_id else { - return Err("spec_inst_id is not specified".to_string()); - }; - #[cfg(feature = "cluster")] - { - let Ok(Some(_)) = insts_in_send.get(spec_inst_id.clone()).await else { - return Err("spec_inst_id not found".to_string()); - }; - trace!("[Tardis.WebServer] WS message add avatar {}:{} to {}", &msg_id, &new_avatar, &spec_inst_id); - let _ = insts_in_send.modify(spec_inst_id.clone(), "add_avatar", json!(new_avatar)).await; - // return Ok(()); - } - #[cfg(not(feature = "cluster"))] - { - let mut write_locked = insts_in_send.write().await; - let Some(inst) = write_locked.get_mut(spec_inst_id) else { - return Err("spec_inst_id not found".to_string()); - }; - inst.push(new_avatar.to_string()); - drop(write_locked); - trace!("[Tardis.WebServer] WS message add avatar {}:{} to {}", msg_id, new_avatar, spec_inst_id); - // return Ok(()); - } - } else { - // ignore this message - // return Ok(()) - } - } else if req_msg.event.as_deref() == Some(WS_SYSTEM_EVENT_AVATAR_DEL) { - #[cfg(feature = "cluster")] - { - let Ok(Some(_)) = insts_in_send.get(self.context.inst_id.clone()).await else { - return Err("spec_inst_id not found".to_string()); - }; - let Some(del_avatar) = req_msg.msg.as_str() else { - return Err("msg is not a string".to_string()); - }; - let _ = insts_in_send.modify(self.context.inst_id.clone(), "del_avatar", json!(del_avatar)).await; - trace!("[Tardis.WebServer] WS message delete avatar {},{} to {}", msg_id, del_avatar, &self.context.inst_id); - } - #[cfg(not(feature = "cluster"))] - { - let Some(del_avatar) = req_msg.msg.as_str() else { - return Err("msg is not a string".to_string()); - }; - let mut write_locked = insts_in_send.write().await; - let Some(inst) = write_locked.get_mut(&inst_id) else { - return Err("spec_inst_id not found".to_string()); - }; - inst.retain(|value| *value != del_avatar); - drop(write_locked); - trace!("[Tardis.WebServer] WS message delete avatar {},{} to {}", msg_id, del_avatar, &inst_id); - } - return Ok(()); - } - if let Some(resp_msg) = self.hooks.on_process(req_msg.clone(), &self.context).await { - trace!( - "[Tardis.WebServer] WS message send to channel: {},{} to {:?} ignore {:?}", - msg_id, - resp_msg.msg, - resp_msg.to_avatars, - resp_msg.ignore_avatars - ); - let send_msg = TardisWebsocketMgrMessage { - msg_id, - msg: resp_msg.msg, - from_avatar: req_msg.from_avatar, - to_avatars: resp_msg.to_avatars, - event: req_msg.event, - ignore_self: req_msg.ignore_self.unwrap_or(true), - ignore_avatars: resp_msg.ignore_avatars, - from_inst_id: if let Some(spec_inst_id) = req_msg.spec_inst_id { - spec_inst_id - } else { - self.context.inst_id.clone() - }, - echo: false, - }; - self.send_to_channel(send_msg); - }; - Ok(()) - } - #[tracing::instrument(skip(websocket, self))] - pub async fn run(self, avatars: Vec, websocket: WebSocket) -> BoxWebSocketUpgraded { - websocket - .on_upgrade(move |socket| async move { - let mut inner_receiver = self.inner_sender.subscribe(); - // corresponded to the current ws connection - let inst_id = self.context.inst_id.clone(); - let current_receive_inst_id = inst_id.clone(); - #[cfg(feature = "cluster")] - let _ = ws_insts_mapping_avatars().insert(inst_id.clone(), avatars).await; - #[cfg(feature = "cluster")] - let insts_in_send = ws_insts_mapping_avatars().clone(); - #[cfg(not(feature = "cluster"))] - let _ = ws_insts_mapping_avatars().write().await.insert(inst_id.clone(), avatars); - #[cfg(not(feature = "cluster"))] - let insts_in_send = ws_insts_mapping_avatars().clone(); - let (mut ws_sink, mut ws_stream) = socket.split(); - let ws_closed = tokio_util::sync::CancellationToken::new(); - let ws_closed_notifier = ws_closed.clone(); - let context = self.context.clone(); - debug!("[Tardis.WebServer] WS new connection {inst_id}"); - tokio::spawn(async move { - // message inbound - while let Some(Ok(message)) = ws_stream.next().await { - match message { - Message::Text(text) => { - #[cfg(feature = "cluster")] - let Ok(Some(current_avatars)) = insts_in_send.get(inst_id.clone()).await - else { - warn!("[Tardis.WebServer] insts_in_send of inst_id {inst_id} not found"); - continue; - }; - #[cfg(not(feature = "cluster"))] - let Some(current_avatars) = insts_in_send.read().await.get(&inst_id).cloned() else { - warn!("[Tardis.WebServer] insts_in_send of inst_id {inst_id} not found"); - continue; - }; - trace!( - "[Tardis.WebServer] WS message receive text: {} by {:?} {}", - text, - current_avatars, - if self.context.mgr_node { "[MGR]" } else { "" } - ); - let Some(avatar_self) = current_avatars.first().cloned() else { - warn!("[Tardis.WebServer] current_avatars is empty"); - continue; - }; - match TardisFuns::json.str_to_obj::(&text) { - Err(_) => { - self.send_error_to_channel("message illegal", &avatar_self, None); - continue; - } - Ok(req_msg) => { - let msg_id = req_msg.msg_id.clone(); - if let Err(e) = self.handle_req(req_msg).await { - self.send_error_to_channel(&e, &avatar_self, msg_id); - } - } - } - } - Message::Close(msg) => { - trace!("[Tardis.WebServer] WS message receive: close {:?}", msg); - self.hooks.on_close(msg, &self.context).await - } - _ => {} - } - } - ws_closed_notifier.cancel(); - }); - - let reply_once_guard = REPLY_ONCE_GUARD.clone(); - - tokio::spawn(async move { - debug!("[Tardis.WebServer] WS tx side: new connection {current_receive_inst_id}"); - let mut hb = tokio::time::interval(tokio::time::Duration::from_secs(1)); - 'poll_next_message: loop { - let mgr_message = tokio::select! { - _ = ws_closed.cancelled() => { - trace!("[Tardis.WebServer] WS message receive: connection closed"); - return - } - _ = hb.tick() => { - if let Err(error) = ws_sink.send(Message::ping([])).await { - warn!("[Tardis.WebServer] WS fail to ping client: {error}"); - return; - } else { - continue 'poll_next_message; - } - } - next = inner_receiver.recv() => { - match next { - Ok(message) => message, - Err(e) => { - warn!("[Tardis.WebServer] WS message receive from channel failed: {e}"); - return - } - } - } - }; - - tracing::trace!(inst_id = current_receive_inst_id, "[Tardis.WebServer] inner receiver receive message {mgr_message:?}"); - - #[cfg(feature = "cluster")] - let Ok(Some(current_avatars)) = ({ ws_insts_mapping_avatars().get(current_receive_inst_id.clone()).await }) else { - warn!("[Tardis.WebServer] Instance id {current_receive_inst_id} not found"); - continue; - }; - #[cfg(not(feature = "cluster"))] - let Some(current_avatars) = ({ ws_insts_mapping_avatars().read().await.get(¤t_receive_inst_id).cloned() }) else { - warn!("[Tardis.WebServer] Instance id {current_receive_inst_id} not found"); - continue; - }; - // only self - if mgr_message.echo && current_receive_inst_id != mgr_message.from_inst_id { - continue; - } - // except self - if mgr_message.ignore_self && current_receive_inst_id == mgr_message.from_inst_id { - continue; - } - - tracing::trace!("[Tardis.WebServer] inner receiver receive message {mgr_message:?}, current_avatars: {current_avatars:?}"); - if - // send to all - mgr_message.to_avatars.is_empty() && mgr_message.ignore_avatars.is_empty() - // send to targets that match the current avatars - || !mgr_message.to_avatars.is_empty() && mgr_message.to_avatars.iter().any(|avatar| current_avatars.contains(avatar)) - // send to targets that NOT match the current avatars - || !mgr_message.ignore_avatars.is_empty() && mgr_message.ignore_avatars.iter().all(|avatar| current_avatars.contains(avatar)) - { - let Ok(resp_msg) = (if context.mgr_node { - TardisFuns::json.obj_to_string(mgr_message.as_ref()) - } else { - TardisFuns::json.obj_to_string(&TardisWebsocketMessage { - msg_id: mgr_message.msg_id.clone(), - msg: mgr_message.msg.clone(), - event: mgr_message.event.clone(), - }) - }) else { - warn!("[Tardis.WebServer] Cannot serialize {:?} into json", mgr_message); - continue; - }; - let cache_id = if !context.subscribe_mode { - let id = format!("{}{:?}", mgr_message.msg_id, ¤t_avatars); - 'poll_sending_state: loop { - let mut lock = reply_once_guard.lock().await; - match lock.get(&id) { - Some(MessageSendState::Success) => continue 'poll_next_message, - Some(MessageSendState::Sending) => tokio::task::yield_now().await, - _ => { - lock.put(id.clone(), MessageSendState::Sending); - break 'poll_sending_state; - } - } - } - Some(id) - } else { - None - }; - if let Err(error) = ws_sink.send(Message::Text(resp_msg)).await { - if let Some(cache_id) = cache_id { - let mut lock = reply_once_guard.lock().await; - lock.put(cache_id, MessageSendState::Fail); - } - if error.to_string() != "Connection closed normally" { - warn!( - "[Tardis.WebServer] WS message send: {}:{} to {:?} ignore {:?} failed: {error}", - mgr_message.msg_id, mgr_message.msg, mgr_message.to_avatars, mgr_message.ignore_avatars - ); - } - break; - } else if let Some(cache_id) = cache_id { - let mut lock = reply_once_guard.lock().await; - lock.put(cache_id, MessageSendState::Success); - } - } - } - }); - }) - .boxed() - } -} - -#[derive(Deserialize, Serialize, Clone, Debug, Default)] -pub struct TardisWebsocketReq { - pub msg_id: Option, - pub msg: Value, - pub from_avatar: String, - pub to_avatars: Option>, - pub event: Option, - pub ignore_self: Option, - pub spec_inst_id: Option, -} - -#[derive(Deserialize, Serialize, Clone, Debug)] -pub struct TardisWebsocketResp { - pub msg: Value, - pub to_avatars: Vec, - pub ignore_avatars: Vec, -} - -#[derive(Deserialize, Serialize, Clone, Debug)] -pub struct TardisWebsocketMgrMessage { - pub msg_id: String, - pub msg: Value, - pub from_inst_id: String, - pub from_avatar: String, - pub to_avatars: Vec, - pub event: Option, - pub ignore_self: bool, - pub echo: bool, - pub ignore_avatars: Vec, -} - -impl TardisWebsocketMgrMessage { - pub fn into_req(self, msg_id: String, msg: Value, current_avatar: String, to_avatars: Option>) -> TardisWebsocketReq { - TardisWebsocketReq { - msg_id: Some(msg_id), - msg, - from_avatar: current_avatar, - to_avatars, - event: self.event, - ignore_self: Some(self.ignore_self), - spec_inst_id: Some(self.from_inst_id), - } - } -} - -#[derive(Deserialize, Serialize, Clone, Debug)] -pub struct TardisWebsocketMessage { - pub msg_id: String, - pub msg: Value, - pub event: Option, -} - -#[derive(Deserialize, Serialize, Clone, Debug, Default)] -pub struct TardisWebsocketInstInfo { - pub inst_id: String, - pub avatars: Vec, - pub mgr_node: bool, - pub subscribe_mode: bool, -} diff --git a/tardis/src/web/ws_processor/cluster_protocol.rs b/tardis/src/web/ws_processor/cluster_protocol.rs deleted file mode 100644 index aec3203e..00000000 --- a/tardis/src/web/ws_processor/cluster_protocol.rs +++ /dev/null @@ -1,25 +0,0 @@ -use std::sync::Arc; - -use crate::{basic::result::TardisResult, cluster::cluster_broadcast::ClusterBroadcastChannel}; - -use super::{TardisWebsocketMgrMessage, WsBroadcastSender}; - -impl WsBroadcastSender for ClusterBroadcastChannel { - fn subscribe(&self) -> tokio::sync::broadcast::Receiver> { - self.local_broadcast_channel.subscribe() - } - - async fn send(&self, msg: TardisWebsocketMgrMessage) -> TardisResult<()> { - self.send(msg).await - } -} - -impl WsBroadcastSender for Arc> { - fn subscribe(&self) -> tokio::sync::broadcast::Receiver> { - self.as_ref().subscribe() - } - - async fn send(&self, msg: TardisWebsocketMgrMessage) -> TardisResult<()> { - ClusterBroadcastChannel::send(self, msg).await - } -} diff --git a/tardis/tests/test_cluster.rs b/tardis/tests/test_cluster.rs deleted file mode 100644 index af36833a..00000000 --- a/tardis/tests/test_cluster.rs +++ /dev/null @@ -1,318 +0,0 @@ -use std::{ - env, - path::Path, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - time::Duration, -}; - -use futures_util::future::join_all; -use serde_json::{json, Value}; -use tardis::{ - basic::{result::TardisResult, tracing::TardisTracing}, - cluster::{ - cluster_broadcast::ClusterBroadcastChannel, - cluster_hashmap::ClusterStaticHashMap, - cluster_processor::{self, subscribe, ClusterEventTarget, ClusterHandler, TardisClusterMessageReq}, - cluster_publish::publish_event_one_response, - }, - config::config_dto::{CacheModuleConfig, ClusterConfig, FrameworkConfig, LogConfig, TardisConfig, WebServerCommonConfig, WebServerConfig, WebServerModuleConfig}, - consts::IP_LOCALHOST, - tardis_static, - test::test_container::TardisTestContainer, - TardisFuns, -}; -use testcontainers::clients; -use tokio::{io::AsyncReadExt, process::Command, time::sleep}; -use tracing::info; -use tracing_subscriber::filter::Directive; - -#[tokio::test(flavor = "multi_thread")] -async fn test_cluster() -> TardisResult<()> { - env::set_var("RUST_LOG", "info,tardis=debug"); - let cluster_url = env::var("cluster_url"); - if let Ok(cluster_url) = cluster_url { - start_node(cluster_url, &env::var("node_id").unwrap()).await?; - } else { - // let program = env::args().next().as_ref().map(Path::new).and_then(Path::file_name).and_then(OsStr::to_str).map(String::from).unwrap(); - let program = env::current_exe()?; - - let docker = clients::Cli::default(); - let redis_container = TardisTestContainer::redis_custom(&docker); - let cluster_url = format!("redis://127.0.0.1:{}/0", redis_container.get_host_port_ipv4(6379)); - - let results = join_all(vec![ - invoke_node(&cluster_url, "1", &program), - invoke_node(&cluster_url, "2", &program), - invoke_node(&cluster_url, "3", &program), - ]) - .await; - assert!(results.into_iter().all(|r| r.unwrap())); - } - - Ok(()) -} - -async fn invoke_node(cluster_url: &str, node_id: &str, program: &Path) -> TardisResult { - let mut child = if cfg!(target_os = "windows") { - Command::new("cmd") - .env("cluster_url", cluster_url) - .env("node_id", node_id) - .env("LS_COLORS", "rs=0:di=38;5;27:mh=44;38;5;15") - .arg("/C") - .arg(program) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .spawn()? - } else { - Command::new("sh") - .env("cluster_url", cluster_url) - .env("node_id", node_id) - .env("LS_COLORS", "rs=0:di=38;5;27:mh=44;38;5;15") - .arg("-c") - .arg(program) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .spawn()? - }; - let mut buf = [0; 1024]; - let mut err_buf = [0; 1024]; - let mut stdout = child.stdout.take().unwrap(); - let mut stderr = child.stderr.take().unwrap(); - loop { - tokio::select! { - result = stdout.read(&mut buf) => { - let size = result?; - if size != 0 { - println!("node[{node_id}]/stdout:"); - println!("{}", String::from_utf8_lossy(&buf[..size])); - } - } - result = stderr.read(&mut err_buf) => { - let size = result?; - if size != 0 { - println!("node[{node_id}]/stdout:"); - println!("{}", String::from_utf8_lossy(&err_buf[..size])); - } - } - exit_code = child.wait() => { - if let Ok(exit_code) = exit_code { - return Ok(exit_code.success()) - } else { - return Ok(false) - } - } - - }; - } -} - -async fn start_node(cluster_url: String, node_id: &str) -> TardisResult<()> { - subscribe(map().clone()).await; - // subscribe - broadcast(); - cluster_processor::set_local_node_id(format!("node_{node_id}")); - let port = portpicker::pick_unused_port().unwrap(); - TardisTracing::initializer().with_fmt_layer().with_env_layer().init(); - TardisFuns::init_conf(TardisConfig { - cs: Default::default(), - fw: FrameworkConfig::builder() - .web_server( - WebServerConfig::builder().common(WebServerCommonConfig::builder().access_host(IP_LOCALHOST).port(port).build()).default(WebServerModuleConfig::default()).build(), - ) - .cache(CacheModuleConfig::builder().url(cluster_url.parse().unwrap()).build()) - .cluster(ClusterConfig { - watch_kind: "cache".to_string(), - k8s_ns: None, - k8s_svc: None, - cache_check_interval_sec: Some(1), - }) - .log( - LogConfig::builder() - .level("info".parse::().unwrap_or_default()) - .directives(if node_id == "2" { - ["tardis=trace".parse::().unwrap()] - } else { - ["tardis=debug".parse::().unwrap()] - }) - .build(), - ) - .build(), - }) - .await - .unwrap(); - - if node_id == "2" { - sleep(Duration::from_secs(2)).await; - } else if node_id == "3" { - sleep(Duration::from_secs(4)).await; - } - TardisFuns::web_server().start().await?; - sleep(Duration::from_secs(1)).await; - let task = { - let node_id = node_id.to_string(); - { - let node_id = node_id.to_string(); - tokio::spawn(async move { - let mut receiver = broadcast().subscribe(); - while let Ok(msg) = receiver.recv().await { - println!("node[{node_id}]/broadcast: {msg}"); - bc_recv_count().fetch_add(1, Ordering::SeqCst); - } - }); - } - tokio::spawn(async move { - test_broadcast(&node_id).await; - }) - }; - test_ping(node_id).await?; - test_echo(node_id).await?; - test_hash_map(node_id).await?; - - if node_id == "1" { - sleep(Duration::from_secs(1)).await; - } else if node_id == "2" { - sleep(Duration::from_secs(7)).await; - } else { - sleep(Duration::from_secs(10)).await; - } - let result = tokio::join!(task); - result.0.unwrap(); - Ok(()) -} - -static PING_COUNTER: AtomicUsize = AtomicUsize::new(0); - -async fn test_ping(node_id: &str) -> TardisResult<()> { - TardisFuns::cluster_subscribe_event(ClusterSubscriberPingTest).await; - if node_id == "1" { - // expect hit 0 times - let result = TardisFuns::cluster_publish_event("ping", json!(1000), ClusterEventTarget::Broadcast).await; - assert!(result.is_err()); - sleep(Duration::from_secs(5)).await; - // expect hit 2 times (to node_2, node_3) - TardisFuns::cluster_publish_event("ping", json!(400), ClusterEventTarget::Broadcast).await?; - sleep(Duration::from_secs(5)).await; - assert_eq!(PING_COUNTER.load(Ordering::SeqCst), 50 + 4); - } else if node_id == "2" { - // expect hit 1 times (to node_1) - TardisFuns::cluster_publish_event("ping", json!(50), ClusterEventTarget::Broadcast).await?; - sleep(Duration::from_secs(5)).await; - assert_eq!(PING_COUNTER.load(Ordering::SeqCst), 400 + 4); - } else { - // expect hit 2 times (to node_1, node_2) - TardisFuns::cluster_publish_event("ping", json!(4), ClusterEventTarget::Broadcast).await?; - sleep(Duration::from_secs(5)).await; - assert_eq!(PING_COUNTER.load(Ordering::SeqCst), 400); - } - Ok(()) -} - -struct ClusterSubscriberPingTest; - -impl ClusterHandler for ClusterSubscriberPingTest { - fn event_name(&self) -> String { - "ping".into() - } - async fn handle(self: Arc, message_req: TardisClusterMessageReq) -> TardisResult> { - info!("message_req:{message_req:?}"); - PING_COUNTER.fetch_add(message_req.msg.as_i64().unwrap() as usize, Ordering::SeqCst); - Ok(None) - } -} - -struct ClusterSubscriberEchoTest; - -impl ClusterHandler for ClusterSubscriberEchoTest { - fn event_name(&self) -> String { - "echo".into() - } - async fn handle(self: Arc, message_req: TardisClusterMessageReq) -> TardisResult> { - info!("message_req:{message_req:?}"); - Ok(Some(serde_json::Value::String(format!("echo {}", message_req.req_node_id)))) - } -} - -async fn test_echo(node_id: &str) -> TardisResult<()> { - TardisFuns::cluster_subscribe_event(ClusterSubscriberEchoTest).await; - if node_id == "1" { - let resp = TardisFuns::cluster_publish_event_one_resp("echo", serde_json::Value::String("hi".to_string()), "node_3").await?; - assert_eq!(resp.msg.as_str().unwrap(), &format!("echo node_{node_id}")); - assert_eq!(&resp.resp_node_id, "node_3"); - } else if node_id == "2" { - let resp = publish_event_one_response("echo", serde_json::Value::String("hi".to_string()), "node_3", Some(Duration::from_secs(1))).await; - assert!(resp.is_ok()); - } else { - let resp = TardisFuns::cluster_publish_event_one_resp("echo", serde_json::Value::String("hi".to_string()), "node_3").await; - assert!(resp.is_err()); - } - Ok(()) -} - -tardis_static! { - pub map: ClusterStaticHashMap = ClusterStaticHashMap::new("test"); - broadcast: Arc> = ClusterBroadcastChannel::new("test_channel", 100); - bc_recv_count: AtomicUsize = AtomicUsize::new(0); -} -async fn test_hash_map(node_id: &str) -> TardisResult<()> { - match node_id { - "1" => { - map().insert("item1".to_string(), "from_node1".to_string()).await?; - let value = map().get("item1".to_string()).await?; - assert_eq!(value, Some("from_node1".to_string())); - } - "2" => { - map().insert("item2".to_string(), "from_node2".to_string()).await?; - loop { - tokio::time::sleep(Duration::from_secs(1)).await; - let value = map().get("item1".to_string()).await?; - if value.is_some() { - assert_eq!(value, Some("from_node1".to_string())); - break; - } - } - let value = map().get("item2".to_string()).await?; - assert_eq!(value, Some("from_node2".to_string())); - tokio::time::sleep(Duration::from_secs(5)).await; - map().remove("item2".to_string()).await?; - let value = map().get("item2".to_string()).await?; - assert_eq!(value, None); - } - "3" => {} - _ => {} - } - Ok(()) -} - -async fn test_broadcast(node_id: &str) { - TardisFuns::cluster_subscribe_event(ClusterSubscriberEchoTest).await; - tokio::time::sleep(Duration::from_secs(6)).await; - match node_id { - "1" => { - broadcast().send("message1-1".to_string()).await.expect("send failed"); - broadcast().send("message1-2".to_string()).await.expect("send failed"); - } - "2" => { - broadcast().send("message2-1".to_string()).await.expect("send failed"); - broadcast().send("message2-2".to_string()).await.expect("send failed"); - } - "3" => { - broadcast().send("message3-1".to_string()).await.expect("send failed"); - broadcast().send("message3-2".to_string()).await.expect("send failed"); - } - _ => {} - } - let result = tokio::time::timeout(Duration::from_secs(20), async move { - loop { - if bc_recv_count().load(Ordering::SeqCst) == 6 { - break; - } else { - tokio::task::yield_now().await; - } - } - }) - .await; - assert!(result.is_ok(), "bc_recv_count={}", bc_recv_count().load(Ordering::SeqCst)); -} diff --git a/tardis/tests/test_websocket.rs b/tardis/tests/test_websocket.rs deleted file mode 100644 index 864a08be..00000000 --- a/tardis/tests/test_websocket.rs +++ /dev/null @@ -1,466 +0,0 @@ -#![allow(unreachable_code)] -use std::collections::HashMap; -use std::env; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use std::time::Duration; - -use lazy_static::lazy_static; -use poem::web::websocket::{BoxWebSocketUpgraded, WebSocket}; -use poem_openapi::param::Path; -use serde_json::json; -use tardis::basic::result::TardisResult; -#[cfg(feature = "cluster")] -use tardis::cluster::cluster_processor::set_local_node_id; -use tardis::consts::IP_LOCALHOST; -use tardis::web::web_server::{TardisWebServer, WebServerModule}; -use tardis::web::ws_client::TardisWebSocketMessageExt; -use tardis::web::ws_processor::{ - ws_echo, TardisWebsocketInstInfo, TardisWebsocketMessage, TardisWebsocketMgrMessage, TardisWebsocketReq, TardisWebsocketResp, WsBroadcast, WsBroadcastContext, WsHooks, - WS_SYSTEM_EVENT_AVATAR_ADD, WS_SYSTEM_EVENT_AVATAR_DEL, WS_SYSTEM_EVENT_INFO, -}; -use tardis::TardisFuns; -use tokio::sync::broadcast::Sender; -use tokio::sync::RwLock; -use tokio::time::sleep; -use tokio_tungstenite::tungstenite::Message; - -lazy_static! { - static ref SENDERS: Arc>>>> = Arc::new(RwLock::new(HashMap::new())); -} - -#[tokio::test(flavor = "multi_thread")] -async fn test_websocket() -> TardisResult<()> { - env::set_var("RUST_LOG", "info,tardis=trace"); - TardisFuns::init_log(); - #[cfg(feature = "cluster")] - set_local_node_id("test".into()); - let serv = TardisWebServer::init_simple(IP_LOCALHOST, 8080).unwrap(); - serv.add_route(WebServerModule::from(Api).with_ws::(100)).await; - serv.start().await?; - sleep(Duration::from_millis(500)).await; - - test_normal().await?; - - test_dyn_avatar().await?; - - Ok(()) -} - -async fn test_normal() -> TardisResult<()> { - static ERROR_COUNTER: AtomicUsize = AtomicUsize::new(0); - static SUB_COUNTER: AtomicUsize = AtomicUsize::new(0); - static NON_SUB_COUNTER: AtomicUsize = AtomicUsize::new(0); - - // message illegal test - let error_client_a = TardisFuns::ws_client("ws://127.0.0.1:8080/ws/broadcast/gerror/a", move |msg| async move { - if msg.is_ping() || msg.is_pong() { - return None; - } - if let Message::Text(msg) = msg { - println!("client_not_found recv:{}", msg); - assert!(msg.contains("message illegal")); - ERROR_COUNTER.fetch_add(1, Ordering::SeqCst); - } - None - }) - .await?; - error_client_a.send_text("hi".to_string()).await?; - // not found test - let error_client_b = TardisFuns::ws_client("ws://127.0.0.1:8080/ws/broadcast/gxxx/a", move |msg| async move { - if msg.is_ping() || msg.is_pong() { - return None; - } - if let Message::Text(msg) = msg { - println!("client_not_found recv:{}", msg); - assert_eq!(msg, "Websocket connection error: group not found"); - ERROR_COUNTER.fetch_add(1, Ordering::SeqCst); - } - None - }) - .await?; - error_client_b - .send_obj(&TardisWebsocketReq { - msg: json! {"hi"}, - from_avatar: "a".to_string(), - ..Default::default() - }) - .await?; - - // subscribe mode test - let sub_client_a = TardisFuns::ws_client("ws://127.0.0.1:8080/ws/broadcast/g1/a", move |msg| async move { - if msg.is_ping() || msg.is_pong() { - return None; - } - if let Message::Text(msg) = msg { - println!("client_a recv:{}", msg); - assert!(msg.contains(r#"service send:\"hi\""#)); - SUB_COUNTER.fetch_add(1, Ordering::SeqCst); - } - None - }) - .await?; - let sub_client_b1 = TardisFuns::ws_client("ws://127.0.0.1:8080/ws/broadcast/g1/b", move |msg| async move { - if msg.is_ping() || msg.is_pong() { - return None; - } - if let Message::Text(msg) = msg { - println!("client_b1 recv:{}", msg); - assert!(msg.contains(r#"service send:\"hi\""#)); - SUB_COUNTER.fetch_add(1, Ordering::SeqCst); - Some(Message::Text( - TardisFuns::json - .obj_to_string(&TardisWebsocketReq { - msg: json! {"client_b send:hi again"}, - from_avatar: "b".to_string(), - ..Default::default() - }) - .unwrap(), - )) - } else { - None - } - }) - .await?; - let sub_client_b2 = TardisFuns::ws_client("ws://127.0.0.1:8080/ws/broadcast/g1/b", move |msg| async move { - if msg.is_ping() || msg.is_pong() { - return None; - } - if let Message::Text(msg) = msg { - println!("client_b2 recv:{}", msg); - assert!(msg.contains(r#"service send:\"hi\""#)); - SUB_COUNTER.fetch_add(1, Ordering::SeqCst); - Some(Message::Text( - TardisFuns::json - .obj_to_string(&TardisWebsocketReq { - msg: json! {"client_b send:hi again"}, - from_avatar: "b".to_string(), - ..Default::default() - }) - .unwrap(), - )) - } else { - None - } - }) - .await?; - sub_client_a - .send_obj(&TardisWebsocketReq { - msg: json! {"hi"}, - from_avatar: "a".to_string(), - ..Default::default() - }) - .await?; - sub_client_b1 - .send_obj(&TardisWebsocketReq { - msg: json! {"hi"}, - from_avatar: "b".to_string(), - ..Default::default() - }) - .await?; - sub_client_b2 - .send_obj(&TardisWebsocketReq { - msg: json! {"hi"}, - from_avatar: "b".to_string(), - ..Default::default() - }) - .await?; - - // non-subscribe mode test - let non_sub_client_a = TardisFuns::ws_client("ws://127.0.0.1:8080/ws/broadcast/g2/a", move |msg| async move { - if msg.is_ping() || msg.is_pong() { - return None; - } - if let Message::Text(msg) = msg { - println!("client_a recv:{}", msg); - assert!(msg.contains(r#"service send:\"hi\""#)); - NON_SUB_COUNTER.fetch_add(1, Ordering::SeqCst); - } - None - }) - .await?; - let non_sub_client_b1 = TardisFuns::ws_client("ws://127.0.0.1:8080/ws/broadcast/g2/b", move |msg| async move { - if msg.is_ping() || msg.is_pong() { - return None; - } - if let Message::Text(msg) = msg { - println!("client_b1 recv:{}", msg); - assert!(msg.contains(r#"service send:\"hi\""#)); - NON_SUB_COUNTER.fetch_add(1, Ordering::SeqCst); - Some(Message::Text( - TardisFuns::json - .obj_to_string(&TardisWebsocketReq { - msg: json! {"client_b send:hi again"}, - from_avatar: "b".to_string(), - ..Default::default() - }) - .unwrap(), - )) - } else { - None - } - }) - .await?; - let non_sub_client_b2 = TardisFuns::ws_client("ws://127.0.0.1:8080/ws/broadcast/g2/b", move |msg| async move { - if msg.is_ping() || msg.is_pong() { - return None; - } - if let Message::Text(msg) = msg { - println!("client_b2 recv:{}", msg); - assert!(msg.contains(r#"service send:\"hi\""#)); - NON_SUB_COUNTER.fetch_add(1, Ordering::SeqCst); - Some(Message::Text( - TardisFuns::json - .obj_to_string(&TardisWebsocketReq { - msg: json! {"client_b send:hi again"}, - from_avatar: "b".to_string(), - ..Default::default() - }) - .unwrap(), - )) - } else { - None - } - }) - .await?; - - non_sub_client_a - .send_obj(&TardisWebsocketReq { - msg: json! {"hi"}, - from_avatar: "a".to_string(), - ..Default::default() - }) - .await?; - non_sub_client_b1 - .send_obj(&TardisWebsocketReq { - msg: json! {"hi"}, - from_avatar: "b".to_string(), - ..Default::default() - }) - .await?; - non_sub_client_b2 - .send_obj(&TardisWebsocketReq { - msg: json! {"hi"}, - from_avatar: "b".to_string(), - ..Default::default() - }) - .await?; - - sleep(Duration::from_millis(500)).await; - assert_eq!(ERROR_COUNTER.load(Ordering::SeqCst), 2); - assert_eq!(SUB_COUNTER.load(Ordering::SeqCst), 6); - assert_eq!(NON_SUB_COUNTER.load(Ordering::SeqCst), 5); - - Ok(()) -} - -async fn test_dyn_avatar() -> TardisResult<()> { - static INFO_COUNTER: AtomicUsize = AtomicUsize::new(0); - static ADD_COUNTER: AtomicUsize = AtomicUsize::new(0); - static DEL_COUNTER: AtomicUsize = AtomicUsize::new(0); - - TardisFuns::ws_client("ws://127.0.0.1:8080/ws/dyn/_/true", move |msg| async move { - if msg.is_ping() || msg.is_pong() { - return None; - } - let receive_msg = msg.str_to_obj::().unwrap(); - if receive_msg.event == Some(WS_SYSTEM_EVENT_AVATAR_ADD.to_string()) && receive_msg.msg.as_str().unwrap() == "c" { - ADD_COUNTER.fetch_add(1, Ordering::SeqCst); - let from_avatar = receive_msg.from_avatar.clone(); - return Some(Message::Text( - TardisFuns::json.obj_to_string(&receive_msg.into_req(String::new(), json! {"c"}, from_avatar.clone(), Some(vec![from_avatar]))).unwrap(), - )); - } - if receive_msg.event == Some(WS_SYSTEM_EVENT_AVATAR_DEL.to_string()) && receive_msg.msg.as_str().unwrap() == "c" { - panic!(); - DEL_COUNTER.fetch_add(1, Ordering::SeqCst); - } - None - }) - .await?; - - TardisFuns::ws_client("ws://127.0.0.1:8080/ws/dyn/a/false", move |msg| async move { - if msg.is_ping() || msg.is_pong() { - return None; - } - let receive_msg = TardisFuns::json.str_to_obj::(msg.to_text().unwrap()).unwrap(); - if receive_msg.event == Some(WS_SYSTEM_EVENT_AVATAR_ADD.to_string()) && receive_msg.msg.as_str().unwrap() == "c" { - // panic!(); - // ADD_COUNTER.fetch_add(1, Ordering::SeqCst); - } - if receive_msg.event == Some(WS_SYSTEM_EVENT_AVATAR_DEL.to_string()) && receive_msg.msg.as_str().unwrap() == "c" { - panic!(); - DEL_COUNTER.fetch_add(1, Ordering::SeqCst); - } - None - }) - .await?; - - let a_client = TardisFuns::ws_client("ws://127.0.0.1:8080/ws/dyn/a/false", move |msg| async move { - if msg.is_ping() || msg.is_pong() { - return None; - } - let receive_msg = TardisFuns::json.str_to_obj::(msg.to_text().unwrap()).unwrap(); - if receive_msg.event == Some(WS_SYSTEM_EVENT_AVATAR_ADD.to_string()) && receive_msg.msg.as_str().unwrap() == "c" { - // panic!(); - // ADD_COUNTER.fetch_add(1, Ordering::SeqCst); - } - if receive_msg.event == Some(WS_SYSTEM_EVENT_AVATAR_DEL.to_string()) && receive_msg.msg.as_str().unwrap() == "c" { - panic!(); - DEL_COUNTER.fetch_add(1, Ordering::SeqCst); - } - if receive_msg.event == Some(WS_SYSTEM_EVENT_INFO.to_string()) { - let info_msg = TardisFuns::json.json_to_obj::(receive_msg.msg).unwrap(); - assert_eq!(info_msg.avatars, vec!["c"]); - INFO_COUNTER.fetch_add(1, Ordering::SeqCst); - } - None - }) - .await?; - - TardisFuns::ws_client("ws://127.0.0.1:8080/ws/dyn/a/false", move |msg| async move { - if msg.is_ping() || msg.is_pong() { - return None; - } - let receive_msg = TardisFuns::json.str_to_obj::(msg.to_text().unwrap()).unwrap(); - if receive_msg.msg.as_str().unwrap() == "a" { - ADD_COUNTER.fetch_add(1, Ordering::SeqCst); - panic!(); - } - None - }) - .await?; - - // add avatar - a_client - .send_obj(&TardisWebsocketReq { - msg: json! {"c"}, - from_avatar: "a".to_string(), - to_avatars: Some(vec!["_".to_string()]), - event: Some(WS_SYSTEM_EVENT_AVATAR_ADD.to_string()), - ..Default::default() - }) - .await?; - - sleep(Duration::from_millis(100)).await; - - // del avatar - a_client - .send_obj(&TardisWebsocketReq { - msg: json! {"a"}, - from_avatar: "a".to_string(), - event: Some(WS_SYSTEM_EVENT_AVATAR_DEL.to_string()), - ..Default::default() - }) - .await?; - - sleep(Duration::from_millis(100)).await; - - // fetch info - a_client - .send_obj(&TardisWebsocketReq { - msg: json! {""}, - from_avatar: "c".to_string(), - event: Some(WS_SYSTEM_EVENT_INFO.to_string()), - ..Default::default() - }) - .await?; - - sleep(Duration::from_millis(200)).await; - assert_eq!(ADD_COUNTER.load(Ordering::SeqCst), 1); - assert_eq!(INFO_COUNTER.load(Ordering::SeqCst), 1); - assert_eq!(DEL_COUNTER.load(Ordering::SeqCst), 0); - - Ok(()) -} - -#[derive(Debug, Clone)] -struct Api; - -pub struct FilterHook { - service_name: &'static str, - expect: serde_json::Value, -} -impl WsHooks for FilterHook { - async fn on_process(&self, req: tardis::web::ws_processor::TardisWebsocketReq, _context: &WsBroadcastContext) -> Option { - println!("service {} recv:{}:{}", self.service_name, req.from_avatar, req.msg); - if req.msg == self.expect { - return None; - } - Some(TardisWebsocketResp { - msg: json! { format!("service send:{}", TardisFuns::json.json_to_string(req.msg).unwrap())}, - to_avatars: vec![], - ignore_avatars: vec![], - }) - } -} -pub struct ErrorHook; -impl WsHooks for ErrorHook { - async fn on_process(&self, req: tardis::web::ws_processor::TardisWebsocketReq, _context: &WsBroadcastContext) -> Option { - println!("service gerror recv:{}:{}", req.from_avatar, req.msg); - None - } -} - -pub struct PassThroughHook; -impl WsHooks for PassThroughHook { - async fn on_process(&self, req: tardis::web::ws_processor::TardisWebsocketReq, _context: &WsBroadcastContext) -> Option { - Some(TardisWebsocketResp { - msg: req.msg, - to_avatars: req.to_avatars.unwrap_or_default(), - ignore_avatars: vec![], - }) - } -} -#[poem_openapi::OpenApi] -impl Api { - #[oai(path = "/ws/broadcast/:group/:name", method = "get")] - async fn ws_broadcast(&self, group: Path, name: Path, websocket: WebSocket) -> BoxWebSocketUpgraded { - if !SENDERS.read().await.contains_key(&group.0) { - SENDERS.write().await.insert(group.0.clone(), tokio::sync::broadcast::channel::<_>(100).0); - } - let sender = SENDERS.read().await.get(&group.0).unwrap().clone(); - if group.0 == "g1" { - WsBroadcast::new( - sender, - FilterHook { - service_name: "g1", - expect: json! {"client_b send:hi again"}, - }, - WsBroadcastContext::new(false, true), - ) - .run(vec![name.0], websocket) - .await - } else if group.0 == "g2" { - WsBroadcast::new( - sender, - FilterHook { - service_name: "g2", - expect: json! {"client_b send:hi again"}, - }, - WsBroadcastContext::new(false, false), - ) - .run(vec![name.0], websocket) - .await - } else if group.0 == "gerror" { - WsBroadcast::new(sender, ErrorHook, WsBroadcastContext::new(false, false)).run(vec![name.0], websocket).await - } else { - ws_echo( - name.0, - HashMap::new(), - websocket, - |_, _, _| async move { Some("Websocket connection error: group not found".to_string()) }, - |_, _| async move {}, - ) - } - } - - #[oai(path = "/ws/dyn/:name/:mgr", method = "get")] - async fn ws_dyn_broadcast(&self, name: Path, mgr: Path, websocket: WebSocket) -> BoxWebSocketUpgraded { - if !SENDERS.read().await.contains_key("dyn") { - SENDERS.write().await.insert("dyn".to_string(), tokio::sync::broadcast::channel::<_>(100).0); - } - let sender = SENDERS.read().await.get("dyn").unwrap().clone(); - WsBroadcast::new(sender, PassThroughHook, WsBroadcastContext::new(mgr.0, true)).run(vec![name.0], websocket).await - } -} From 1157c593586848232f13cba0dc9fee5568b01682 Mon Sep 17 00:00:00 2001 From: 4t145 Date: Thu, 22 Aug 2024 17:45:35 +0800 Subject: [PATCH 2/5] clippy and fmt --- tardis/src/config/config_dto.rs | 2 -- tardis/tests/test_web_server.rs | 12 +----------- 2 files changed, 1 insertion(+), 13 deletions(-) diff --git a/tardis/src/config/config_dto.rs b/tardis/src/config/config_dto.rs index 6a4b80ab..0290eb7e 100644 --- a/tardis/src/config/config_dto.rs +++ b/tardis/src/config/config_dto.rs @@ -119,7 +119,6 @@ impl FrameworkConfig { pub fn log(&self) -> &LogConfig { self.log.as_ref().expect("missing component config of log") } - } /// Application configuration / 应用配置 @@ -222,4 +221,3 @@ impl Default for ConfCenterConfig { } } } - diff --git a/tardis/tests/test_web_server.rs b/tardis/tests/test_web_server.rs index 5fdaa4d7..f3ce26ef 100644 --- a/tardis/tests/test_web_server.rs +++ b/tardis/tests/test_web_server.rs @@ -10,7 +10,6 @@ use poem::endpoint::{BoxEndpoint, ToDynEndpoint}; use poem::http::Method; use poem::{IntoResponse, Middleware, Response}; use serde_json::json; -use tardis::web::web_server::status_api::{TardisStatus, TardisStatusApi}; use tardis::web::web_server::WebServerModule; use testcontainers::clients; use tokio::time::sleep; @@ -98,7 +97,6 @@ async fn test_web_server() -> TardisResult<()> { let redis_container = TardisTestContainer::redis_custom(&docker); let redis_port = redis_container.get_host_port_ipv4(6379); let redis_url = format!("redis://127.0.0.1:{redis_port}/0"); - tardis::cluster::cluster_processor::set_local_node_id("test".into()); start_serv(web_url, &redis_url).await?; sleep(Duration::from_millis(500)).await; @@ -145,8 +143,6 @@ async fn start_serv(web_url: &str, redis_url: &str) -> TardisResult<()> { }) .await?; TardisFuns::web_server() - .add_module("_tardis", TardisStatusApi) - .await .add_module("todo", TodosApi) .await .add_module("other", OtherApi) @@ -160,12 +156,6 @@ async fn start_serv(web_url: &str, redis_url: &str) -> TardisResult<()> { } async fn test_basic(url: &str) -> TardisResult<()> { - // Status - let response = TardisFuns::web_client().get::>(format!("{url}/_tardis/status").as_str(), None).await?; - assert_eq!(response.code, 200); - assert_eq!(response.body.as_ref().unwrap().code, TARDIS_RESULT_SUCCESS_CODE); - tracing::info!("status: {:?}", response.body); - // Normal let response = TardisFuns::web_client().get::>(format!("{url}/todo/todos/1").as_str(), None).await?; assert_eq!(response.code, 200); @@ -477,7 +467,7 @@ async fn test_context(url: &str) -> TardisResult<()> { format!("{url}/other/context_in_header").as_str(), [( web_server_config.context_conf.context_header_name.clone(), - TardisFuns::crypto.base64.encode(&TardisFuns::json.obj_to_string(&context).unwrap()), + TardisFuns::crypto.base64.encode(TardisFuns::json.obj_to_string(&context).unwrap()), )], ) .await? From 6c55d800cb45780a3c065772448302b299d806e5 Mon Sep 17 00:00:00 2001 From: 4t145 Date: Fri, 23 Aug 2024 09:55:46 +0800 Subject: [PATCH 3/5] trace version conflict --- tardis/Cargo.toml | 10 +++++----- tardis/src/basic/tracing.rs | 5 ++++- tardis/src/config/config_dto/log/tracing.rs | 10 ++++++---- tardis/src/lib.rs | 4 ++++ tardis/src/test/test_container/nacos_server.rs | 9 +++++---- 5 files changed, 24 insertions(+), 14 deletions(-) diff --git a/tardis/Cargo.toml b/tardis/Cargo.toml index 7da328a5..b1f9d352 100644 --- a/tardis/Cargo.toml +++ b/tardis/Cargo.toml @@ -106,15 +106,15 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing-appender = { version = "0.2", optional = true } console-subscriber = { version = "0.2", optional = true } # Tracing -tracing-opentelemetry = { version = "0.23", optional = true } -opentelemetry = { version = "0.22", default-features = false, features = [ +tracing-opentelemetry = { version = "0.25", optional = true } +opentelemetry = { version = "0.24", default-features = false, features = [ "trace", ], optional = true } -opentelemetry_sdk = { version = "0.22", default-features = false, features = [ +opentelemetry_sdk = { version = "0.24", default-features = false, features = [ "rt-tokio", "trace", ], optional = true } -opentelemetry-otlp = { version = "0.15", features = [ +opentelemetry-otlp = { version = "0.17", features = [ "reqwest-client", "reqwest-rustls", "http-proto", @@ -258,7 +258,7 @@ tokio = { version = "1", features = [ ] } criterion = { version = "0.5" } poem-grpc-build = "0.4" -prost = "0.12" +prost = "0.13.1" strip-ansi-escapes = "0.2.0" portpicker = "0.1.1" # macros diff --git a/tardis/src/basic/tracing.rs b/tardis/src/basic/tracing.rs index da8805a8..48aec2e2 100644 --- a/tardis/src/basic/tracing.rs +++ b/tardis/src/basic/tracing.rs @@ -270,8 +270,11 @@ impl TardisTracing { } }; tracing::debug!("[Tardis.Tracing] Batch installing tracer. If you are blocked here, try running tokio in multithread."); - let tracer = tracer.install_batch(opentelemetry_sdk::runtime::Tokio).expect("fail to install otlp tracer"); + let provider = tracer.install_batch(opentelemetry_sdk::runtime::Tokio).expect("fail to install otlp tracer"); + use opentelemetry::trace::TracerProvider; + let tracer = provider.tracer(""); tracing::debug!("[Tardis.Tracing] Initialized otlp tracer"); + opentelemetry::global::set_tracer_provider(provider); tracer } diff --git a/tardis/src/config/config_dto/log/tracing.rs b/tardis/src/config/config_dto/log/tracing.rs index 52d99c7f..f5f02b98 100644 --- a/tardis/src/config/config_dto/log/tracing.rs +++ b/tardis/src/config/config_dto/log/tracing.rs @@ -13,15 +13,17 @@ pub enum OtlpProtocol { HttpProtobuf, } -impl ToString for OtlpProtocol { - fn to_string(&self) -> String { + +impl std::fmt::Display for OtlpProtocol { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - OtlpProtocol::Grpc => "grpc".to_string(), - OtlpProtocol::HttpProtobuf => "http/protobuf".to_string(), + OtlpProtocol::Grpc => write!(f, "grpc"), + OtlpProtocol::HttpProtobuf => write!(f, "http/protobuf"), } } } + impl FromStr for OtlpProtocol { type Err = TardisError; diff --git a/tardis/src/lib.rs b/tardis/src/lib.rs index 9b58b2b5..271071f1 100644 --- a/tardis/src/lib.rs +++ b/tardis/src/lib.rs @@ -601,6 +601,7 @@ impl TardisFuns { /// /// 1. Initialize the cache configuration / 初始化缓存配置 @see [init](Self::init) /// 2. Call this function to complete various cache processing operations / 调用本函数完成各种缓存处理操作 + /// /// E.g. /// ```ignore /// use std::collections::HashMap; @@ -698,6 +699,7 @@ impl TardisFuns { /// /// 1. Initialize the cache configuration / 初始化缓存配置 @see [init](Self::init) /// 2. Call this function to complete various cache processing operations / 调用本函数完成各种缓存处理操作 + /// /// E.g. /// ```ignore /// use tardis::TardisFuns; @@ -736,6 +738,7 @@ impl TardisFuns { /// /// 1. Initialize the mq configuration / 初始化队列配置 @see [init](Self::init) /// 2. Call this function to complete various mq processing operations / 调用本函数完成各种队列处理操作 + /// /// E.g. /// ```ignore /// use tardis::TardisFuns; @@ -773,6 +776,7 @@ impl TardisFuns { /// /// 1. Initialize the web client configuration / 初始化web客户端配置 @see [init](Self::init) /// 2. Call this function to complete various search processing operations / 调用本函数完成各种搜索处理操作 + /// /// E.g. /// ```ignore /// use tardis::TardisFuns; diff --git a/tardis/src/test/test_container/nacos_server.rs b/tardis/src/test/test_container/nacos_server.rs index 25cc1a08..e588b935 100644 --- a/tardis/src/test/test_container/nacos_server.rs +++ b/tardis/src/test/test_container/nacos_server.rs @@ -36,11 +36,12 @@ pub enum NacosServerMode { Cluster, } -impl ToString for NacosServerMode { - fn to_string(&self) -> String { + +impl std::fmt::Display for NacosServerMode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - NacosServerMode::Standalone => "standalone".to_string(), - NacosServerMode::Cluster => "cluster".to_string(), + NacosServerMode::Standalone => write!(f, "standalone"), + NacosServerMode::Cluster => write!(f, "cluster"), } } } From 99bfe0db7f03c1e52034d8d87824c6ed8e4ef276 Mon Sep 17 00:00:00 2001 From: 4t145 Date: Fri, 23 Aug 2024 09:59:36 +0800 Subject: [PATCH 4/5] fmt --- tardis/src/config/config_dto/log/tracing.rs | 2 -- tardis/src/lib.rs | 8 ++++---- tardis/src/test/test_container/nacos_server.rs | 1 - 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/tardis/src/config/config_dto/log/tracing.rs b/tardis/src/config/config_dto/log/tracing.rs index f5f02b98..07f6918e 100644 --- a/tardis/src/config/config_dto/log/tracing.rs +++ b/tardis/src/config/config_dto/log/tracing.rs @@ -13,7 +13,6 @@ pub enum OtlpProtocol { HttpProtobuf, } - impl std::fmt::Display for OtlpProtocol { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -23,7 +22,6 @@ impl std::fmt::Display for OtlpProtocol { } } - impl FromStr for OtlpProtocol { type Err = TardisError; diff --git a/tardis/src/lib.rs b/tardis/src/lib.rs index 271071f1..eb7d9395 100644 --- a/tardis/src/lib.rs +++ b/tardis/src/lib.rs @@ -601,7 +601,7 @@ impl TardisFuns { /// /// 1. Initialize the cache configuration / 初始化缓存配置 @see [init](Self::init) /// 2. Call this function to complete various cache processing operations / 调用本函数完成各种缓存处理操作 - /// + /// /// E.g. /// ```ignore /// use std::collections::HashMap; @@ -699,7 +699,7 @@ impl TardisFuns { /// /// 1. Initialize the cache configuration / 初始化缓存配置 @see [init](Self::init) /// 2. Call this function to complete various cache processing operations / 调用本函数完成各种缓存处理操作 - /// + /// /// E.g. /// ```ignore /// use tardis::TardisFuns; @@ -738,7 +738,7 @@ impl TardisFuns { /// /// 1. Initialize the mq configuration / 初始化队列配置 @see [init](Self::init) /// 2. Call this function to complete various mq processing operations / 调用本函数完成各种队列处理操作 - /// + /// /// E.g. /// ```ignore /// use tardis::TardisFuns; @@ -776,7 +776,7 @@ impl TardisFuns { /// /// 1. Initialize the web client configuration / 初始化web客户端配置 @see [init](Self::init) /// 2. Call this function to complete various search processing operations / 调用本函数完成各种搜索处理操作 - /// + /// /// E.g. /// ```ignore /// use tardis::TardisFuns; diff --git a/tardis/src/test/test_container/nacos_server.rs b/tardis/src/test/test_container/nacos_server.rs index e588b935..01ce4351 100644 --- a/tardis/src/test/test_container/nacos_server.rs +++ b/tardis/src/test/test_container/nacos_server.rs @@ -36,7 +36,6 @@ pub enum NacosServerMode { Cluster, } - impl std::fmt::Display for NacosServerMode { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { From ed01b2f4d111e3c04c0cc0394d39f835dd54c767 Mon Sep 17 00:00:00 2001 From: 4t145 Date: Fri, 23 Aug 2024 10:37:34 +0800 Subject: [PATCH 5/5] fix opentelemetry reload issue --- tardis/src/basic/tracing.rs | 58 ++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/tardis/src/basic/tracing.rs b/tardis/src/basic/tracing.rs index 48aec2e2..da3e8748 100644 --- a/tardis/src/basic/tracing.rs +++ b/tardis/src/basic/tracing.rs @@ -249,33 +249,39 @@ impl TardisTracing { #[cfg(feature = "tracing")] fn create_otlp_tracer() -> opentelemetry_sdk::trace::Tracer { use crate::config::config_dto::OtlpProtocol; - tracing::debug!("[Tardis.Tracing] Initializing otlp tracer"); - let protocol = std::env::var(OTEL_EXPORTER_OTLP_PROTOCOL).ok().map(|s| s.parse::().unwrap_or_default()).unwrap_or_default(); - let tracer = opentelemetry_otlp::new_pipeline().tracing(); - let tracer = match protocol { - OtlpProtocol::Grpc => { - let mut exporter = opentelemetry_otlp::new_exporter().tonic(); - // Check if we need TLS - if let Ok(endpoint) = std::env::var(OTEL_EXPORTER_OTLP_ENDPOINT) { - if endpoint.to_lowercase().starts_with("https") { - exporter = exporter.with_tls_config(Default::default()); - } - } - tracer.with_exporter(exporter) - } - OtlpProtocol::HttpProtobuf => { - let headers = Self::parse_otlp_headers_from_env(); - let exporter = opentelemetry_otlp::new_exporter().http().with_headers(headers.into_iter().collect()); - tracer.with_exporter(exporter) - } - }; - tracing::debug!("[Tardis.Tracing] Batch installing tracer. If you are blocked here, try running tokio in multithread."); - let provider = tracer.install_batch(opentelemetry_sdk::runtime::Tokio).expect("fail to install otlp tracer"); + use std::sync::OnceLock; + static INITIALIZED: OnceLock = OnceLock::new(); use opentelemetry::trace::TracerProvider; - let tracer = provider.tracer(""); - tracing::debug!("[Tardis.Tracing] Initialized otlp tracer"); - opentelemetry::global::set_tracer_provider(provider); - tracer + INITIALIZED + .get_or_init(|| { + tracing::debug!("[Tardis.Tracing] Initializing otlp tracer"); + let protocol = std::env::var(OTEL_EXPORTER_OTLP_PROTOCOL).ok().map(|s| s.parse::().unwrap_or_default()).unwrap_or_default(); + let tracer = opentelemetry_otlp::new_pipeline().tracing(); + let tracer = match protocol { + OtlpProtocol::Grpc => { + let mut exporter = opentelemetry_otlp::new_exporter().tonic(); + // Check if we need TLS + if let Ok(endpoint) = std::env::var(OTEL_EXPORTER_OTLP_ENDPOINT) { + if endpoint.to_lowercase().starts_with("https") { + exporter = exporter.with_tls_config(Default::default()); + } + } + tracer.with_exporter(exporter) + } + OtlpProtocol::HttpProtobuf => { + let headers = Self::parse_otlp_headers_from_env(); + let exporter = opentelemetry_otlp::new_exporter().http().with_headers(headers.into_iter().collect()); + tracer.with_exporter(exporter) + } + }; + tracing::debug!("[Tardis.Tracing] Batch installing tracer. If you are blocked here, try running tokio in multithread."); + let provider = tracer.install_batch(opentelemetry_sdk::runtime::Tokio).expect("fail to install otlp tracer"); + tracing::debug!("[Tardis.Tracing] Initialized otlp tracer"); + opentelemetry::global::set_tracer_provider(provider.clone()); + provider + }) + .clone() + .tracer("") } #[cfg(feature = "tracing")]