Skip to content

Commit

Permalink
Merge pull request #92 from 4t145/fix-typo-and-add-test
Browse files Browse the repository at this point in the history
Fix typo and add tests
  • Loading branch information
4t145 authored Dec 5, 2023
2 parents 4ced2b2 + d8c3f2f commit eeba49e
Show file tree
Hide file tree
Showing 12 changed files with 289 additions and 130 deletions.
4 changes: 2 additions & 2 deletions tardis/src/cluster/cluster_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
ident: ident.into(),
local_broadcast_channel: sender,
});

tracing::trace!("[Tardis.Cluster] create broadcast channel: {}", cluster_chan.event_name());
let subscriber = BroadcastChannelSubscriber {
channel: Arc::downgrade(&cluster_chan),
event_name: cluster_chan.event_name(),
Expand Down Expand Up @@ -97,7 +97,7 @@ where
async fn subscribe(&self, message_req: TardisClusterMessageReq) -> TardisResult<Option<Value>> {
if let Ok(message) = serde_json::from_value(message_req.msg) {
if let Some(chan) = self.channel.upgrade() {
let _ = chan.send(message);
let _ = chan.local_broadcast_channel.send(message);
} else {
unsubscribe(&self.event_name()).await;
}
Expand Down
123 changes: 109 additions & 14 deletions tardis/src/cluster/cluster_hashmap.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
borrow::Cow,
collections::HashMap,
fmt,
sync::Arc,
time::{Duration, Instant},
};
Expand All @@ -18,30 +19,90 @@ use super::{
};

// Cshm = ClusterStaticHashMap
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct ClusterStaticHashMap<K, V> {
pub map: Arc<RwLock<HashMap<K, V>>>,
pub ident: &'static str,
pub cluster_sync: bool,
pub modify_handler: Arc<HashMap<String, Box<dyn Fn(&mut V, &Value) + Send + Sync>>>,
}

impl<K, V> fmt::Debug for ClusterStaticHashMap<K, V> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ClusterStaticHashMap").field("ident", &self.ident).field("cluster_sync", &self.cluster_sync).finish()
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
enum CshmEvent<K, V> {
Insert(Vec<(K, V)>),
Remove { keys: Vec<K> },
Get { key: K },
Modify { key: K, mapper: String, modify: Value },
}

pub struct ClusterStaticHashMapBuilder<K, V> {
ident: &'static str,
cluster_sync: bool,
modify_handler: HashMap<String, Box<dyn Fn(&mut V, &Value) + Send + Sync>>,
_phantom: std::marker::PhantomData<(K, V)>,
}

impl<K, V> ClusterStaticHashMapBuilder<K, V> {
pub fn new(ident: &'static str) -> Self {
Self {
ident,
cluster_sync: true,
modify_handler: HashMap::new(),
_phantom: std::marker::PhantomData,
}
}
pub fn sync(mut self, cluster_sync: bool) -> Self {
self.cluster_sync = cluster_sync;
self
}
pub fn modify_handler(mut self, mapper: &'static str, handler: impl Fn(&mut V, &Value) + Send + Sync + 'static) -> Self {
self.modify_handler.insert(mapper.to_string(), Box::new(handler));
self
}
pub fn build(self) -> ClusterStaticHashMap<K, V> {
ClusterStaticHashMap {
map: Arc::new(RwLock::new(HashMap::new())),
ident: self.ident,
cluster_sync: self.cluster_sync,
modify_handler: Arc::new(self.modify_handler),
}
}
}

impl<K, V> ClusterStaticHashMap<K, V>
where
K: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned + Hash + Eq,
V: Send + Sync + 'static + Clone + serde::Serialize + serde::de::DeserializeOwned,
{
pub fn builder(ident: &'static str) -> ClusterStaticHashMapBuilder<K, V> {
ClusterStaticHashMapBuilder::new(ident)
}

pub fn new(ident: &'static str) -> Self {
Self {
map: Arc::new(RwLock::new(HashMap::new())),
ident,
cluster_sync: true,
modify_handler: Arc::new(HashMap::new()),
}
}
pub fn new_standalone(ident: &'static str) -> Self {
Self {
map: Arc::new(RwLock::new(HashMap::new())),
ident,
cluster_sync: false,
modify_handler: Arc::new(HashMap::new()),
}
}
pub fn is_cluster(&self) -> bool {
self.cluster_sync
}
pub fn event_name(&self) -> String {
format!("tardis/hashmap/{ident}", ident = self.ident)
}
Expand All @@ -50,10 +111,11 @@ where
}
pub async fn insert(&self, key: K, value: V) -> TardisResult<()> {
self.map.write().await.insert(key.clone(), value.clone());
let event = CshmEvent::<K, V>::Insert(vec![(key, value)]);
let json = TardisJson.obj_to_json(&event)?;
dbg!(&json);
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
if self.is_cluster() {
let event = CshmEvent::<K, V>::Insert(vec![(key, value)]);
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
}
Ok(())
}
pub async fn batch_insert(&self, pairs: Vec<(K, V)>) -> TardisResult<()> {
Expand All @@ -63,16 +125,20 @@ where
wg.insert(key.clone(), value.clone());
}
}
let event = CshmEvent::<K, V>::Insert(pairs);
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
if self.is_cluster() {
let event = CshmEvent::<K, V>::Insert(pairs);
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
}
Ok(())
}
pub async fn remove(&self, key: K) -> TardisResult<()> {
self.map.write().await.remove(&key);
let event = CshmEvent::<K, V>::Remove { keys: vec![key] };
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
if self.is_cluster() {
let event = CshmEvent::<K, V>::Remove { keys: vec![key] };
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
}
Ok(())
}
pub async fn batch_remove(&self, keys: Vec<K>) -> TardisResult<()> {
Expand All @@ -82,9 +148,11 @@ where
wg.remove(key);
}
}
let event = CshmEvent::<K, V>::Remove { keys };
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
if self.is_cluster() {
let event = CshmEvent::<K, V>::Remove { keys };
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
}
Ok(())
}
pub async fn get(&self, key: K) -> TardisResult<Option<V>> {
Expand All @@ -95,6 +163,9 @@ where
}
}
async fn get_remote(&self, key: K) -> TardisResult<Option<V>> {
if !self.is_cluster() {
return Ok(None);
}
let peer_count = peer_count().await;
if peer_count == 0 {
return Ok(None);
Expand Down Expand Up @@ -126,6 +197,21 @@ where
}
Ok(None)
}
pub async fn modify(&self, key: K, mapper: &'static str, modify: Value) -> TardisResult<()> {
let mapper = mapper.to_string();
let mut wg = self.map.write().await;
if let Some(v) = wg.get_mut(&key) {
if let Some(handler) = self.modify_handler.get(&mapper) {
handler(v, &modify);
}
}
if self.is_cluster() {
let event = CshmEvent::<K, V>::Modify { key, mapper, modify };
let json = TardisJson.obj_to_json(&event)?;
let _result = publish_event_no_response(self.event_name(), json, ClusterEventTarget::Broadcast).await;
}
Ok(())
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -156,6 +242,15 @@ where
let value = rg.get(&key);
Ok(Some(TardisJson.obj_to_json(&value)?))
}
CshmEvent::Modify { key, mapper, modify } => {
let mut wg = self.map.write().await;
if let Some(v) = wg.get_mut(&key) {
if let Some(handler) = self.modify_handler.get(&mapper) {
handler(v, &modify);
}
}
Ok(None)
}
}
}
fn event_name(&self) -> Cow<'static, str> {
Expand Down
24 changes: 13 additions & 11 deletions tardis/src/cluster/cluster_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@ use crate::config::config_dto::FrameworkConfig;
use crate::tardis_static;
use crate::web::web_server::TardisWebServer;
use crate::web::ws_client::TardisWSClient;
use crate::web::ws_processor::cluster_protocol::Avatar;
use crate::web::ws_processor::ws_insts_mapping_avatars;
// use crate::web::ws_processor::cluster_protocol::Avatar;
use crate::{basic::result::TardisResult, TardisFuns};
use async_trait::async_trait;

pub const CLUSTER_NODE_WHOAMI: &str = "__cluster_node_who_am_i__";
pub const EVENT_PING: &str = "tardis/ping";
pub const CLUSTER_MESSAGE_CACHE_SIZE: usize = 10000;
pub const WHOIAM_TIMEOUT: Duration = Duration::from_secs(30);
pub const WHOAMI_TIMEOUT: Duration = Duration::from_secs(30);

type StaticCowStr = Cow<'static, str>;
// static LOCAL_NODE_ID_SETTER: OnceLock<String> = OnceLock::new();
// static LOCAL_SOCKET_ADDR: OnceLock<SocketAddr> = OnceLock::new();

tardis_static! {
pub async set local_socket_addr: SocketAddr;
pub async set local_node_id: String;
pub async set responsor_dispatcher: mpsc::Sender<TardisClusterMessageResp>;
pub async set responser_dispatcher: mpsc::Sender<TardisClusterMessageResp>;
pub(crate) cache_nodes: Arc<RwLock<HashMap<ClusterRemoteNodeKey, TardisClusterNodeRemote>>>;
subscribers: Arc<RwLock<HashMap<StaticCowStr, Box<dyn TardisClusterSubscriber>>>>;
}
Expand Down Expand Up @@ -190,16 +190,16 @@ async fn init_node(cluster_server: &TardisWebServer, access_addr: SocketAddr) ->
info!("[Tardis.Cluster] Initializing node");
set_local_node_id(TardisFuns::field.nanoid());
set_local_socket_addr(access_addr);
debug!("[Tardis.Cluster] Initializing response dispathcer");
set_responsor_dispatcher(init_response_dispatcher());
debug!("[Tardis.Cluster] Initializing response dispatcher");
set_responser_dispatcher(init_response_dispatcher());
debug!("[Tardis.Cluster] Register exchange route");
cluster_server.add_route(ClusterAPI).await;

debug!("[Tardis.Cluster] Register default events");
subscribe(EventPing).await;
#[cfg(feature = "web-server")]
{
subscribe(Avatar).await;
subscribe(ws_insts_mapping_avatars().clone()).await;
}

info!("[Tardis.Cluster] Initialized node");
Expand Down Expand Up @@ -240,7 +240,9 @@ pub async fn refresh_nodes(active_nodes: &HashSet<SocketAddr>) -> TardisResult<(
let mut table = String::new();
for (k, v) in cache_nodes.iter() {
use std::fmt::Write;
writeln!(&mut table, "{k:20} | {v:40} ").expect("shouldn't fail");
if matches!(k, ClusterRemoteNodeKey::NodeId(_)) {
writeln!(&mut table, "{k:20} | {v:40} ").expect("shouldn't fail");
}
}
trace!("[Tardis.Cluster] cache nodes table \n{table}");
Ok(())
Expand All @@ -259,7 +261,7 @@ async fn add_remote_node(socket_addr: SocketAddr) -> TardisResult<TardisClusterN
if let tokio_tungstenite::tungstenite::Message::Text(message) = message {
match TardisFuns::json.str_to_obj::<TardisClusterMessageResp>(&message) {
Ok(message_resp) => {
if let Err(error) = responsor_dispatcher().await.send(message_resp).await {
if let Err(error) = responser_dispatcher().await.send(message_resp).await {
error!("[Tardis.Cluster] [Client] response message {message}: {error}");
}
}
Expand All @@ -270,7 +272,7 @@ async fn add_remote_node(socket_addr: SocketAddr) -> TardisResult<TardisClusterN
})
.await?;
let client = Arc::new(client);
let resp = ClusterEvent::new(EVENT_PING).target(client.clone()).one_response(Some(WHOIAM_TIMEOUT)).publish_one_response().await?;
let resp = ClusterEvent::new(EVENT_PING).target(client.clone()).one_response(Some(WHOAMI_TIMEOUT)).publish_one_response().await?;
let resp_node_id = resp.resp_node_id;
let remote = TardisClusterNodeRemote { node_id: resp_node_id, client };
Ok(remote)
Expand Down
1 change: 0 additions & 1 deletion tardis/src/cluster/cluster_publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ pub async fn publish_event_with_listener<S: Listener>(
listener: S,
) -> TardisResult<S::Reply> {
let node_id = local_node_id().await.to_string();
dbg!(&node_id);
let event = event.into();
let target = target.into();
let target_debug = format!("{target:?}");
Expand Down
22 changes: 11 additions & 11 deletions tardis/src/cluster/cluster_receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ enum ResponseFn {
Multitime(Box<dyn Fn(TardisClusterMessageResp) -> bool + Send + Sync>),
}
tardis_static! {
responsor_subscribers: RwLock<HashMap::<String, ResponseFn>>;
responser_subscribers: RwLock<HashMap::<String, ResponseFn>>;
}

pub async fn listen_reply<S: Listener>(strategy: S, id: String) -> S::Reply {
Expand All @@ -33,11 +33,11 @@ pub(crate) fn init_response_dispatcher() -> mpsc::Sender<TardisClusterMessageRes
node_id = resp.resp_node_id,
resp = resp
);
if let Some(subscriber) = responsor_subscribers().read().await.get(&id) {
if let Some(subscriber) = responser_subscribers().read().await.get(&id) {
match subscriber {
ResponseFn::Once(_) => {
tokio::spawn(async move {
if let Some(ResponseFn::Once(f)) = responsor_subscribers().write().await.remove(&id) {
if let Some(ResponseFn::Once(f)) = responser_subscribers().write().await.remove(&id) {
f(resp)
}
});
Expand All @@ -46,7 +46,7 @@ pub(crate) fn init_response_dispatcher() -> mpsc::Sender<TardisClusterMessageRes
let drop_me = f(resp);
if drop_me {
tokio::spawn(async move {
responsor_subscribers().write().await.remove(&id);
responser_subscribers().write().await.remove(&id);
});
}
}
Expand Down Expand Up @@ -99,16 +99,16 @@ pub mod listen {
tokio::spawn(async move {
tokio::time::sleep(timeout).await;

// super::responsor_subscribers().write().await.remove(&id);
// super::responser_subscribers().write().await.remove(&id);
// tracing::trace!("[Tardis.Cluster] message {id} timeout");

if let Some(_task) = super::responsor_subscribers().write().await.remove(&id) {
if let Some(_task) = super::responser_subscribers().write().await.remove(&id) {
tracing::trace!("[Tardis.Cluster] message {id} timeout");
}
})
})
};
super::responsor_subscribers().write().await.insert(
super::responser_subscribers().write().await.insert(
id,
ResponseFn::Once(Box::new(move |resp| {
tracing::trace!("[Tardis.Cluster] Once listener receive resp {resp:?}");
Expand Down Expand Up @@ -140,10 +140,10 @@ pub mod listen {
let id = id.clone();
tokio::spawn(async move {
tx.closed().await;
super::responsor_subscribers().write().await.remove(&id);
super::responser_subscribers().write().await.remove(&id);
});
}
super::responsor_subscribers().write().await.insert(
super::responser_subscribers().write().await.insert(
id,
ResponseFn::Multitime(Box::new(move |resp| {
if tx.is_closed() {
Expand Down Expand Up @@ -186,13 +186,13 @@ pub mod listen {
let id = id.clone();
tokio::spawn(async move {
if tx.receiver_count() == 0 {
super::responsor_subscribers().write().await.remove(&id);
super::responser_subscribers().write().await.remove(&id);
} else {
tokio::task::yield_now().await;
}
});
}
super::responsor_subscribers().write().await.insert(
super::responser_subscribers().write().await.insert(
id,
ResponseFn::Multitime(Box::new(move |resp| {
let _ = tx.send(resp);
Expand Down
2 changes: 1 addition & 1 deletion tardis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
#![doc(html_logo_url = "https://raw.githubusercontent.com/ideal-world/tardis/main/logo.png")]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(clippy::unwrap_used, clippy::undocumented_unsafe_blocks)]
#![warn(clippy::unwrap_used, clippy::undocumented_unsafe_blocks, clippy::dbg_macro)]

extern crate core;
#[macro_use]
Expand Down
Loading

0 comments on commit eeba49e

Please sign in to comment.