Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix typo and add tests #92

Merged
merged 4 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tardis/src/cluster/cluster_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
ident: ident.into(),
local_broadcast_channel: sender,
});

tracing::trace!("[Tardis.Cluster] create broadcast channel: {}", cluster_chan.event_name());
let subscriber = BroadcastChannelSubscriber {
channel: Arc::downgrade(&cluster_chan),
event_name: cluster_chan.event_name(),
Expand Down Expand Up @@ -97,7 +97,7 @@ where
async fn subscribe(&self, message_req: TardisClusterMessageReq) -> TardisResult<Option<Value>> {
if let Ok(message) = serde_json::from_value(message_req.msg) {
if let Some(chan) = self.channel.upgrade() {
let _ = chan.send(message);
let _ = chan.local_broadcast_channel.send(message);
} else {
unsubscribe(&self.event_name()).await;
}
Expand Down
123 changes: 109 additions & 14 deletions tardis/src/cluster/cluster_hashmap.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
borrow::Cow,
collections::HashMap,
fmt,
sync::Arc,
time::{Duration, Instant},
};
Expand All @@ -18,30 +19,90 @@ use super::{
};

// Cshm = ClusterStaticHashMap
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct ClusterStaticHashMap<K, V> {
pub map: Arc<RwLock<HashMap<K, V>>>,
pub ident: &'static str,
pub cluster_sync: bool,
pub modify_handler: Arc<HashMap<String, Box<dyn Fn(&mut V, &Value) + Send + Sync>>>,
}

impl<K, V> fmt::Debug for ClusterStaticHashMap<K, V> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ClusterStaticHashMap").field("ident", &self.ident).field("cluster_sync", &self.cluster_sync).finish()
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
enum CshmEvent<K, V> {
Insert(Vec<(K, V)>),
Remove { keys: Vec<K> },
Get { key: K },
Modify { key: K, mapper: String, modify: Value },
}

pub struct ClusterStaticHashMapBuilder<K, V> {
ident: &'static str,
cluster_sync: bool,
modify_handler: HashMap<String, Box<dyn Fn(&mut V, &Value) + Send + Sync>>,
_phantom: std::marker::PhantomData<(K, V)>,
}

impl<K, V> ClusterStaticHashMapBuilder<K, V> {
pub fn new(ident: &'static str) -> Self {
Self {
ident,
cluster_sync: true,
modify_handler: HashMap::new(),
_phantom: std::marker::PhantomData,
}
}
pub fn sync(mut self, cluster_sync: bool) -> Self {
self.cluster_sync = cluster_sync;
self
}
pub fn modify_handler(mut self, mapper: &'static str, handler: impl Fn(&mut V, &Value) + Send + Sync + 'static) -> Self {
self.modify_handler.insert(mapper.to_string(), Box::new(handler));
self
}
pub fn build(self) -> ClusterStaticHashMap<K, V> {
ClusterStaticHashMap {
map: Arc::new(RwLock::new(HashMap::new())),
ident: self.ident,
cluster_sync: self.cluster_sync,
modify_handler: Arc::new(self.modify_handler),
}
}
}

impl<K, V> ClusterStaticHashMap<K, V>
where
K: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned + Hash + Eq,
V: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned,
{
pub fn builder(ident: &'static str) -> ClusterStaticHashMapBuilder<K, V> {
ClusterStaticHashMapBuilder::new(ident)
}

pub fn new(ident: &'static str) -> Self {
Self {
map: Arc::new(RwLock::new(HashMap::new())),
ident,
cluster_sync: true,
modify_handler: Arc::new(HashMap::new()),
}
}
pub fn new_standalone(ident: &'static str) -> Self {
Self {
map: Arc::new(RwLock::new(HashMap::new())),
ident,
cluster_sync: false,
modify_handler: Arc::new(HashMap::new()),
}
}
pub fn is_cluster(&self) -> bool {
self.cluster_sync
}
pub fn event_name(&self) -> String {
format!("tardis/hashmap/{ident}", ident = self.ident)
}
Expand All @@ -50,10 +111,11 @@ where
}
pub async fn insert(&self, key: K, value: V) -> TardisResult<()> {
self.map.write().await.insert(key.clone(), value.clone());
let event = CshmEvent::<K, V>::Insert(vec![(key, value)]);
let json = TardisJson.obj_to_json(&event)?;
dbg!(&json);
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
if self.is_cluster() {
let event = CshmEvent::<K, V>::Insert(vec![(key, value)]);
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
}
Ok(())
}
pub async fn batch_insert(&self, pairs: Vec<(K, V)>) -> TardisResult<()> {
Expand All @@ -63,16 +125,20 @@ where
wg.insert(key.clone(), value.clone());
}
}
let event = CshmEvent::<K, V>::Insert(pairs);
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
if self.is_cluster() {
let event = CshmEvent::<K, V>::Insert(pairs);
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
}
Ok(())
}
pub async fn remove(&self, key: K) -> TardisResult<()> {
self.map.write().await.remove(&key);
let event = CshmEvent::<K, V>::Remove { keys: vec![key] };
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
if self.is_cluster() {
let event = CshmEvent::<K, V>::Remove { keys: vec![key] };
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
}
Ok(())
}
pub async fn batch_remove(&self, keys: Vec<K>) -> TardisResult<()> {
Expand All @@ -82,9 +148,11 @@ where
wg.remove(key);
}
}
let event = CshmEvent::<K, V>::Remove { keys };
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
if self.is_cluster() {
let event = CshmEvent::<K, V>::Remove { keys };
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
}
Ok(())
}
pub async fn get(&self, key: K) -> TardisResult<Option<V>> {
Expand All @@ -95,6 +163,9 @@ where
}
}
async fn get_remote(&self, key: K) -> TardisResult<Option<V>> {
if !self.is_cluster() {
return Ok(None);
}
let peer_count = peer_count().await;
if peer_count == 0 {
return Ok(None);
Expand Down Expand Up @@ -126,6 +197,21 @@ where
}
Ok(None)
}
pub async fn modify(&self, key: K, mapper: &'static str, modify: Value) -> TardisResult<()> {
let mapper = mapper.to_string();
let mut wg = self.map.write().await;
if let Some(v) = wg.get_mut(&key) {
if let Some(handler) = self.modify_handler.get(&mapper) {
handler(v, &modify);
}
}
if self.is_cluster() {
let event = CshmEvent::<K, V>::Modify { key, mapper, modify };
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
}
Ok(())
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -156,6 +242,15 @@ where
let value = rg.get(&key);
Ok(Some(TardisJson.obj_to_json(&value)?))
}
CshmEvent::Modify { key, mapper, modify } => {
let mut wg = self.map.write().await;
if let Some(v) = wg.get_mut(&key) {
if let Some(handler) = self.modify_handler.get(&mapper) {
handler(v, &modify);
}
}
Ok(None)
}
}
}
fn event_name(&self) -> Cow<'static, str> {
Expand Down
24 changes: 13 additions & 11 deletions tardis/src/cluster/cluster_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@ use crate::config::config_dto::FrameworkConfig;
use crate::tardis_static;
use crate::web::web_server::TardisWebServer;
use crate::web::ws_client::TardisWSClient;
use crate::web::ws_processor::cluster_protocol::Avatar;
use crate::web::ws_processor::ws_insts_mapping_avatars;
// use crate::web::ws_processor::cluster_protocol::Avatar;
use crate::{basic::result::TardisResult, TardisFuns};
use async_trait::async_trait;

pub const CLUSTER_NODE_WHOAMI: &str = "__cluster_node_who_am_i__";
pub const EVENT_PING: &str = "tardis/ping";
pub const CLUSTER_MESSAGE_CACHE_SIZE: usize = 10000;
pub const WHOIAM_TIMEOUT: Duration = Duration::from_secs(30);
pub const WHOAMI_TIMEOUT: Duration = Duration::from_secs(30);

type StaticCowStr = Cow<'static, str>;
// static LOCAL_NODE_ID_SETTER: OnceLock<String> = OnceLock::new();
// static LOCAL_SOCKET_ADDR: OnceLock<SocketAddr> = OnceLock::new();

tardis_static! {
pub async set local_socket_addr: SocketAddr;
pub async set local_node_id: String;
pub async set responsor_dispatcher: mpsc::Sender<TardisClusterMessageResp>;
pub async set responser_dispatcher: mpsc::Sender<TardisClusterMessageResp>;
pub(crate) cache_nodes: Arc<RwLock<HashMap<ClusterRemoteNodeKey, TardisClusterNodeRemote>>>;
subscribers: Arc<RwLock<HashMap<StaticCowStr, Box<dyn TardisClusterSubscriber>>>>;
}
Expand Down Expand Up @@ -190,16 +190,16 @@ async fn init_node(cluster_server: &TardisWebServer, access_addr: SocketAddr) ->
info!("[Tardis.Cluster] Initializing node");
set_local_node_id(TardisFuns::field.nanoid());
set_local_socket_addr(access_addr);
debug!("[Tardis.Cluster] Initializing response dispathcer");
set_responsor_dispatcher(init_response_dispatcher());
debug!("[Tardis.Cluster] Initializing response dispatcher");
set_responser_dispatcher(init_response_dispatcher());
debug!("[Tardis.Cluster] Register exchange route");
cluster_server.add_route(ClusterAPI).await;

debug!("[Tardis.Cluster] Register default events");
subscribe(EventPing).await;
#[cfg(feature = "web-server")]
{
subscribe(Avatar).await;
subscribe(ws_insts_mapping_avatars().clone()).await;
}

info!("[Tardis.Cluster] Initialized node");
Expand Down Expand Up @@ -240,7 +240,9 @@ pub async fn refresh_nodes(active_nodes: &HashSet<SocketAddr>) -> TardisResult<(
let mut table = String::new();
for (k, v) in cache_nodes.iter() {
use std::fmt::Write;
writeln!(&mut table, "{k:20} | {v:40} ").expect("shouldn't fail");
if matches!(k, ClusterRemoteNodeKey::NodeId(_)) {
writeln!(&mut table, "{k:20} | {v:40} ").expect("shouldn't fail");
}
}
trace!("[Tardis.Cluster] cache nodes table \n{table}");
Ok(())
Expand All @@ -259,7 +261,7 @@ async fn add_remote_node(socket_addr: SocketAddr) -> TardisResult<TardisClusterN
if let tokio_tungstenite::tungstenite::Message::Text(message) = message {
match TardisFuns::json.str_to_obj::<TardisClusterMessageResp>(&message) {
Ok(message_resp) => {
if let Err(error) = responsor_dispatcher().await.send(message_resp).await {
if let Err(error) = responser_dispatcher().await.send(message_resp).await {
error!("[Tardis.Cluster] [Client] response message {message}: {error}");
}
}
Expand All @@ -270,7 +272,7 @@ async fn add_remote_node(socket_addr: SocketAddr) -> TardisResult<TardisClusterN
})
.await?;
let client = Arc::new(client);
let resp = ClusterEvent::new(EVENT_PING).target(client.clone()).one_response(Some(WHOIAM_TIMEOUT)).publish_one_response().await?;
let resp = ClusterEvent::new(EVENT_PING).target(client.clone()).one_response(Some(WHOAMI_TIMEOUT)).publish_one_response().await?;
let resp_node_id = resp.resp_node_id;
let remote = TardisClusterNodeRemote { node_id: resp_node_id, client };
Ok(remote)
Expand Down
1 change: 0 additions & 1 deletion tardis/src/cluster/cluster_publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ pub async fn publish_event_with_listener<S: Listener>(
listener: S,
) -> TardisResult<S::Reply> {
let node_id = local_node_id().await.to_string();
dbg!(&node_id);
let event = event.into();
let target = target.into();
let target_debug = format!("{target:?}");
Expand Down
22 changes: 11 additions & 11 deletions tardis/src/cluster/cluster_receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ enum ResponseFn {
Multitime(Box<dyn Fn(TardisClusterMessageResp) -> bool + Send + Sync>),
}
tardis_static! {
responsor_subscribers: RwLock<HashMap::<String, ResponseFn>>;
responser_subscribers: RwLock<HashMap::<String, ResponseFn>>;
}

pub async fn listen_reply<S: Listener>(strategy: S, id: String) -> S::Reply {
Expand All @@ -33,11 +33,11 @@ pub(crate) fn init_response_dispatcher() -> mpsc::Sender<TardisClusterMessageRes
node_id = resp.resp_node_id,
resp = resp
);
if let Some(subscriber) = responsor_subscribers().read().await.get(&id) {
if let Some(subscriber) = responser_subscribers().read().await.get(&id) {
match subscriber {
ResponseFn::Once(_) => {
tokio::spawn(async move {
if let Some(ResponseFn::Once(f)) = responsor_subscribers().write().await.remove(&id) {
if let Some(ResponseFn::Once(f)) = responser_subscribers().write().await.remove(&id) {
f(resp)
}
});
Expand All @@ -46,7 +46,7 @@ pub(crate) fn init_response_dispatcher() -> mpsc::Sender<TardisClusterMessageRes
let drop_me = f(resp);
if drop_me {
tokio::spawn(async move {
responsor_subscribers().write().await.remove(&id);
responser_subscribers().write().await.remove(&id);
});
}
}
Expand Down Expand Up @@ -99,16 +99,16 @@ pub mod listen {
tokio::spawn(async move {
tokio::time::sleep(timeout).await;

// super::responsor_subscribers().write().await.remove(&id);
// super::responser_subscribers().write().await.remove(&id);
// tracing::trace!("[Tardis.Cluster] message {id} timeout");

if let Some(_task) = super::responsor_subscribers().write().await.remove(&id) {
if let Some(_task) = super::responser_subscribers().write().await.remove(&id) {
tracing::trace!("[Tardis.Cluster] message {id} timeout");
}
})
})
};
super::responsor_subscribers().write().await.insert(
super::responser_subscribers().write().await.insert(
id,
ResponseFn::Once(Box::new(move |resp| {
tracing::trace!("[Tardis.Cluster] Once listener receive resp {resp:?}");
Expand Down Expand Up @@ -140,10 +140,10 @@ pub mod listen {
let id = id.clone();
tokio::spawn(async move {
tx.closed().await;
super::responsor_subscribers().write().await.remove(&id);
super::responser_subscribers().write().await.remove(&id);
});
}
super::responsor_subscribers().write().await.insert(
super::responser_subscribers().write().await.insert(
id,
ResponseFn::Multitime(Box::new(move |resp| {
if tx.is_closed() {
Expand Down Expand Up @@ -186,13 +186,13 @@ pub mod listen {
let id = id.clone();
tokio::spawn(async move {
if tx.receiver_count() == 0 {
super::responsor_subscribers().write().await.remove(&id);
super::responser_subscribers().write().await.remove(&id);
} else {
tokio::task::yield_now().await;
}
});
}
super::responsor_subscribers().write().await.insert(
super::responser_subscribers().write().await.insert(
id,
ResponseFn::Multitime(Box::new(move |resp| {
let _ = tx.send(resp);
Expand Down
2 changes: 1 addition & 1 deletion tardis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@

#![doc(html_logo_url = "https://raw.githubusercontent.com/ideal-world/tardis/main/logo.png")]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(clippy::unwrap_used, clippy::undocumented_unsafe_blocks)]
#![warn(clippy::unwrap_used, clippy::undocumented_unsafe_blocks, clippy::dbg_macro)]

extern crate core;
#[macro_use]
Expand Down
Loading
Loading