diff --git a/src/general/network/m_p2p.rs b/src/general/network/m_p2p.rs index 312d1e9..cee8e6d 100644 --- a/src/general/network/m_p2p.rs +++ b/src/general/network/m_p2p.rs @@ -2,7 +2,10 @@ use std::{ collections::HashMap, marker::PhantomData, net::SocketAddr, - sync::atomic::{AtomicU32, Ordering}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, time::Duration, }; @@ -26,9 +29,20 @@ use ws_derive::LogicalModule; pub type TaskId = u32; pub type MsgId = u32; +/* The core function and interface of P2P module are defined. It is mainly used to manage the communication between P2P nodes, +including the sending and receiving of messages. */ #[async_trait] pub trait P2PKernel: LogicalModule { + + /* This function is used to send messages to other nodes and wait for a response from the other node. The 'nodeid' parameter indicates + the ID of the target node, and 'req_data' indicates the message content to be sent. The 'WSResult>' function returns a Future of + type WSResult> containing the content of the response message from the other node. */ async fn send_for_response(&self, nodeid: NodeID, req_data: Vec) -> WSResult>; + + /* This function is used to send a message to a specified node without waiting for a response from the other node. Parameter 'node' + indicates the ID of the target node, 'task_id' indicates the task ID of the message, 'msg_id' indicates the ID of the message, and 'req_data' + indicates the content of the message to be sent. The function returns a Future of type 'WSResult<()>', representing the result of the + message being sent. */ async fn send( &self, node: NodeID, @@ -38,32 +52,23 @@ pub trait P2PKernel: LogicalModule { ) -> WSResult<()>; } +// For message sending #[derive(Default)] pub struct MsgSender { _phantom: std::marker::PhantomData, } -#[derive(Default)] -pub struct MsgHandler { - _phantom: std::marker::PhantomData, -} - -#[derive(Default)] -pub struct RPCCaller { - _phantom: std::marker::PhantomData, -} - -#[derive(Default)] -pub struct RPCHandler { - _phantom: std::marker::PhantomData, -} - impl MsgSender { pub fn new() -> Self { Self { _phantom: std::marker::PhantomData, } } + + /* This function is used to send a message to the specified node. The 'p2p' parameter is a reference to the P2PModule instance, which is used + to obtain the P2P core functionality. 'node_id' Indicates the ID of the target node. 'msg' is the message to be sent and is of type generic M, + which is the message type that implements the MsgPack trait. The function sends the message to the specified node by calling the send method + of p2p_kernel, and returns a Future of type WSResult<()>, representing the result of the message. */ pub async fn send(&self, p2p: &P2PModule, node_id: NodeID, msg: M) -> WSResult<()> { p2p.p2p_kernel .send(node_id, 0, msg.msg_id(), msg.encode_to_vec()) @@ -71,12 +76,25 @@ impl MsgSender { } } + +/// For message processing +#[derive(Default)] +pub struct MsgHandler { + _phantom: std::marker::PhantomData, +} + impl MsgHandler { pub fn new() -> Self { Self { _phantom: std::marker::PhantomData, } } + + /* This function is used for message processing. The p2p parameter is a reference to the P2PModule instance, which is used to obtain + the P2P core functionality. msg_handler is a closure for processing received messages that takes a Responser argument and a generic + M message and returns a Future of type WSResult<()> representing the result of processing the message. At the same time, the function + records the id of the message being processed and the function handling it by calling the p2p regist_dispatch method (calling the + callback function of the record), which uses the default value of generic M to identify the message type. */ pub fn regist( &self, p2p: &P2PModule, @@ -88,15 +106,28 @@ impl MsgHandler { } } +/// Used to send RPC requests to other nodes +#[derive(Default)] +pub struct RPCCaller { + _phantom: std::marker::PhantomData, +} + impl RPCCaller { pub fn new() -> Self { Self { _phantom: std::marker::PhantomData, } } + + /* Used to register the request type of the RPC call and call p2p's regist_dispatch method in the function (call record + callback function) to record the message id being processed and the function that handled it */ pub fn regist(&self, p2p: &P2PModule) { p2p.regist_rpc_send::(); } + + /* This function is used to make an RPC call request to the specified node, accepting the p2p instance, node ID, RPC request, + and an optional timeout time as parameters. In the internal, it calls the call_rpc() method to send the RPC request, and returns an + asynchronous result of type WSResult, containing the response of the RPC request. */ pub async fn call( &self, p2p: &P2PModule, @@ -108,18 +139,31 @@ impl RPCCaller { } } +// Used to process received RPC requests +#[derive(Default)] +pub struct RPCHandler { + _phantom: std::marker::PhantomData, +} + impl RPCHandler { pub fn new() -> Self { Self { _phantom: std::marker::PhantomData, } } + + /* Handlers for logging RPC requests. Accepts a handler function req_handler that takes an RPCResponsor parameter and an + R parameter and returns a WSResult<()>. Internally, it calls the p2p regist_rpc_recv() method to record the received RPC request and its handlers. */ pub fn regist(&self, p2p: &P2PModule, req_handler: F) where F: Fn(RPCResponsor, R) -> WSResult<()> + Send + Sync + 'static, { p2p.regist_rpc_recv::(req_handler); } + + /* It is used to initiate an RPC call request to a specified node. Accept the p2p instance, node ID, RPC request, + and optional timeout as parameters. Internally, it invokes p2p's call_rpc() method to send the RPC request and returns + an asynchronous result of type WSResult containing the result of the RPC response. */ pub async fn call( &self, p2p: &P2PModule, @@ -131,9 +175,11 @@ impl RPCHandler { } } +/// The core function of P2P module includes various data structures and methods to deal with P2P communication. #[derive(LogicalModule)] pub struct P2PModule { // pub logical_modules_view: P2PModuleLMView, + /// A read/write lock that stores the mapping of message distribution between nodes and is used to distribute messages to the corresponding handlers based on message type and task ID. dispatch_map: RwLock< HashMap< u32, @@ -145,14 +191,27 @@ pub struct P2PModule { >, >, >, + + /// A skip table that stores tasks waiting for a response, including task ids, node ids, and response senders. waiting_tasks: crossbeam_skiplist::SkipMap< (TaskId, NodeID), Mutex>>>, >, + + /// A hash table of read/write locks to store the status information of an ongoing RPC call. + rpc_holder: RwLock>>>, + + /// P2PQuicNode instance, which handles the specific logic of P2P communication. pub p2p_kernel: P2PQuicNode, // pub state_trans_tx: tokio::sync::broadcast::Sender, + + /// Node configuration information. pub nodes_config: NodesConfig, + + /// An atomic U32 type used to generate a unique task ID. pub next_task_id: AtomicU32, + + /// P2PView example, which is used to provide view information about the P2P module. view: P2PView, } @@ -165,6 +224,11 @@ logical_module_view_impl!(P2PView, p2p, P2PModule); #[async_trait] impl LogicalModule for P2PModule { + /* LogicalModule trait 中定义的一个关联函数,用于创建 P2PModule 的新实例。接受一个 LogicalModuleNewArgs 类型的参数, + 其中包含了创建实例所需的各种参数。 */ + + /* An associated function defined in the LogicalModule trait to create a new instance of P2PModule. + Accepts a parameter of type LogicalModuleNewArgs, which contains the various parameters required to create the instance. */ fn inner_new(args: LogicalModuleNewArgs) -> Self where Self: Sized, @@ -176,34 +240,52 @@ impl LogicalModule for P2PModule { p2p_kernel: P2PQuicNode::new(args.clone()), dispatch_map: HashMap::new().into(), waiting_tasks: Default::default(), + rpc_holder: HashMap::new().into(), nodes_config, next_task_id: AtomicU32::new(0), view: P2PView::new(args.logical_modules_ref.clone()), } } - + // LogicalModule trait 中定义的一个关联函数,用于启动 P2PModule。在内部调用了 P2PQuicNode 的start方法,用于启动quic协议的网络连接 + + /* An associated function defined in the LogicalModule trait to start P2PModule. Internally, the start method + of P2PQuicNode is called to start the network connection of the quic protocol */ async fn start(&self) -> WSResult> { let sub = self.p2p_kernel.start().await?; Ok(sub) } } +/// RPC调用的响应方,用于发送RPC调用的响应结果。 pub struct RPCResponsor { _p: PhantomData, responsor: Responser, } + impl RPCResponsor { + /* 一个异步函数,用于发送RPC调用的响应结果。resp 参数,表示要发送的响应结果,类型为 R::Resp, 调用 + responsor 的 send_resp() 方法,并等待其完成,将结果返回 */ + + /* An asynchronous function that sends the result of an RPC call in response. The resp parameter, + which represents the response result to be sent, is of type R::Resp, calls responsor's send_resp() + method, waits for it to complete, and returns the result */ pub async fn send_resp(&self, resp: R::Resp) -> WSResult<()> { self.responsor.send_resp(resp).await } + // 获取相应节点的 id + // Gets the id of the corresponding node pub fn node_id(&self) -> NodeID { self.responsor.node_id } + // 获取响应的任务id + // Gets the task id of the response pub fn task_id(&self) -> TaskId { self.responsor.task_id } } +/// 代表了RPC调用的响应方的信息,包括任务ID、节点ID以及P2P视图。 +/// Information that represents the responder to the RPC call, including the task ID, node ID, and P2P view. pub struct Responser { task_id: TaskId, pub node_id: NodeID, @@ -211,6 +293,17 @@ pub struct Responser { } impl Responser { + + /* 一个异步函数,用于发送RPC调用的响应结果。接受 resp 参数,表示要发送的响应结果,类型为实现了 MsgPack + Default trait 的 RESP 类型。 + 如果当前节点是响应方所在的节点(即本地节点),则调用本地的消息分发函数来处理响应。如果当前节点不是响应方所在的节点,则通过 P2P + 视图发送响应给目标节点。最终返回一个 WSResult<()> 类型的结果,表示响应发送的结果。*/ + + /* An asynchronous function that sends the result of an RPC call in response. Accepts the resp parameter, representing + the response result to be sent, of the RESP type that implements the MsgPack + Default trait. If the current node is + the node where the responder resides (that is, the local node), the local message distribution function is invoked to + process the response. If the current node is not the node where the responder resides, the response is sent to the + target node through the P2P view. Finally, a result of type WSResult<()> is returned, representing the result sent + in response. */ pub async fn send_resp(&self, resp: RESP) -> WSResult<()> where RESP: MsgPack + Default, @@ -231,6 +324,9 @@ impl Responser { } } +/// 枚举,定义了用于消息分发的载荷类型,它可以是远程数据(Remote(Bytes))或者是本地数据(Local(Box))。 +/* Enumeration, which defines the type of payload used for message distribution, which can be either Remote + data (Remote(Bytes)) or Local data (Local(Box)) */ pub enum DispatchPayload { Remote(Bytes), /// zero copy of sub big memory ptr like vector,string,etc. @@ -244,6 +340,9 @@ impl From for DispatchPayload { } impl P2PModule { + /* 根据节点的地址查找对应的节点ID。实现:遍历节点配置信息,查找与指定地址匹配的节点ID,并返回结果。 */ + /* Search for the node ID based on the node address. Implementation: Traverse the node configuration + information, find the node ID that matches the specified address, and return the result. */ pub fn find_peer_id(&self, addr: &SocketAddr) -> Option { self.nodes_config.peers.iter().find_map( |(id, peer)| { @@ -260,12 +359,16 @@ impl P2PModule { // } // 消息回来时,调用记录的回调函数 + /* 将消息类型 M 对应的处理函数 f 注册到分发映射表中,以便接收到相应消息时能够调用对应的处理函数进行处理。 */ + /* The handler f corresponding to message type M is registered in the distribution mapping table so that the + corresponding handler function can be invoked for processing when receiving the corresponding message. */ fn regist_dispatch(&self, m: M, f: F) where M: MsgPack + Default, F: Fn(Responser, M) -> WSResult<()> + Send + Sync + 'static, { // self.p2p.regist_dispatch(); + // map 这里是一个获得了写权限的 hashmap,key是消息id,value是处理消息的函数。 let mut map = self.dispatch_map.write(); let old = map.insert( m.msg_id(), @@ -294,6 +397,9 @@ impl P2PModule { assert!(old.is_none()); } + /* 注册了一个用于发送RPC请求的处理函数,当接收到RPC响应时,调用相应的回调函数发送响应给等待的任务。 */ + /* A handler for sending RPC requests is registered, and when an RPC response is received, the corresponding + callback function is called to send the response to the waiting task. */ fn regist_rpc_send(&self) where REQ: RPCReq, @@ -322,6 +428,9 @@ impl P2PModule { } // 自动完成response的匹配 + /* 注册了一个用于接收RPC请求的处理函数,当接收到RPC请求时,调用注册的处理函数进行处理。 */ + /* A handler for receiving RPC requests is registered, and when an RPC request is received, the + registered handler is invoked for processing. */ fn regist_rpc_recv(&self, req_handler: F) where REQ: RPCReq, @@ -348,7 +457,9 @@ impl P2PModule { // self.regist_rpc_recv::(req_handler); // self.regist_rpc_send::(); // } - + + /* 通过 P2P 核心节点向指定节点发送响应消息,并等待响应发送完成。 */ + /* The P2P core node sends a response message to the specified node, and waits for the response to complete. */ async fn send_resp(&self, node_id: NodeID, task_id: TaskId, resp: RESP) -> WSResult<()> where RESP: MsgPack + Default, @@ -363,6 +474,8 @@ impl P2PModule { .await } + /* 调用指定节点的RPC服务,并等待返回响应消息。 */ + /* Calls the RPC service of the specified node and waits for a response message to be returned. */ #[inline] async fn call_rpc(&self, node_id: NodeID, req: R, dur: Option) -> WSResult where @@ -372,6 +485,17 @@ impl P2PModule { self.call_rpc_inner::(node_id, req, dur).await } + + /* 用于实际执行RPC调用的内部逻辑。它首先生成一个唯一的任务ID,然后创建一个单次通道(oneshot channel)用于接收响应消息。 + 接着根据节点ID判断是本地调用还是远程调用,如果是本地调用,则直接调用分发函数进行处理;如果是远程调用,则通过P2P核心节点 + 发送RPC请求消息,并等待一定时长(由dur参数指定)来接收响应消息。如果超时未收到响应,则返回错误。最后,将收到的响应消息 + 反序列化为指定的响应类型并返回。 */ + /* The internal logic used to actually execute the RPC call. It first generates a unique task ID and then creates + a oneshot channel for receiving response messages. Then according to the node ID to determine whether the local + call or remote call, if the local call, directly call the distribution function for processing; In the case of a + remote call, an RPC request message is sent through the P2P core node and a certain amount of time (specified by + the dur parameter) is waited to receive the response message. If no response is received, an error is returned. + Finally, the received response message is deserialized to the specified response type and returned. */ async fn call_rpc_inner( &self, node_id: NodeID, @@ -470,6 +594,8 @@ impl P2PModule { Ok(*resp) } + /* 根据消息ID查找注册的处理函数,并调用相应的处理函数进行消息处理。 */ + /* The registered handler is found based on the message ID and the corresponding handler is called for message processing. */ pub fn dispatch( &self, nid: NodeID, @@ -487,6 +613,9 @@ impl P2PModule { Err(WsNetworkLogicErr::MsgIdNotDispatchable(id).into()) } } + + /* 根据节点ID在节点配置信息中查找对应的节点地址,并返回结果。 */ + /* The system searches for the node address in the node configuration information based on the node ID and returns the result. */ pub fn get_addr_by_id(&self, id: NodeID) -> WSResult { self.nodes_config.peers.get(&id).map_or_else( || Err(WsNetworkLogicErr::InvaidNodeID(id).into()), @@ -494,3 +623,19 @@ impl P2PModule { ) } } + +// #[async_trait] +// impl P2P for P2PModule { +// async fn send_for_response(&self, nodeid: NodeID, req_data: Vec) -> WSResult> { +// // let (tx, rx) = tokio::sync::oneshot::channel(); +// // self.p2p_kernel.send(nodeid, req_data, tx).await?; +// // let res = rx.await?; + +// Ok(vec![]) +// } + +// async fn send(&self, nodeid: NodeID, msg_id:u64,req_data: Vec) -> WSResult<()> { +// self.p2p_kernel.send(nodeid, req_data).await?; +// Ok(()) +// } +// } diff --git a/src/general/network/m_p2p_quic.rs b/src/general/network/m_p2p_quic.rs index 2f48b0a..a6cb4ba 100644 --- a/src/general/network/m_p2p_quic.rs +++ b/src/general/network/m_p2p_quic.rs @@ -12,6 +12,7 @@ //! //! We then proceed to listening for new connections/messages. + use async_trait::async_trait; use parking_lot::{Mutex, RwLock}; @@ -45,10 +46,24 @@ struct P2PQuicNodeLocked { sub_tasks: Vec>, } +/* 用于存储P2PQuicNode节点的共享数据,包括对等连接、广播发送器等。它提供了一些函数来管理对等节点之间的连接。 */ +/* Used to store shared data of P2PQuicNode nodes, including peer connections and broadcast transmitters. + It provides functions to manage connections between peers. */ struct P2PQuicNodeShared { + locked: Mutex, + + /* 广播发送器,用于向所有连接的对等节点发送消息。 */ + /* A broadcast transmitter that sends messages to all connected peers. */ btx: BroadcastSender, // shared_connection_map: tokio::sync::Mutex>, + + /* 保存与每个对等节点(peer)的连接信息的哈希映射,其中SocketAddr作为键,值包括一个读写锁,保护对等节点的连接向量, + 每个连接对应一个Connection实例。还有一个原子整数,用于维护轮询索引和活动连接数,避免了对连接向量的频繁锁定。 */ + /* Saves a hash map of the Connection information to each peer, where SocketAddr is the key and the value + includes a read/write lock that protects the peer's connection vector, with each connection corresponding + to a connection instance. There is also an atomic integer that maintains the polling index and the number + of active connections, avoiding frequent locking of the join vector. */ peer_connections: RwLock< HashMap< SocketAddr, @@ -65,6 +80,12 @@ struct P2PQuicNodeShared { } impl P2PQuicNodeShared { + + /* 预留给定对等节点的连接资源。首先检查给定对等节点是否已经存在于连接映射中,如果不存在,则创建一个新的连接信息并插入 + 到映射中。以确保每个对等节点都有一个对应的连接信息,便于后续进行连接管理。 */ + /* Reserve connection resources for the specified peer node. First check whether the given peer already exists + in the connection map, and if not, create a new connection information and insert it into the map. To ensure + that each peer node has a corresponding connection information, facilitate subsequent connection management. */ async fn reserve_peer_conn(&self, peer: SocketAddr) { if !self.peer_connections.read().contains_key(&peer) { let _ = self.peer_connections.write().insert( @@ -82,13 +103,23 @@ impl P2PQuicNodeShared { logical_module_view_impl!(View); logical_module_view_impl!(View, p2p, P2PModule); + +/// 用于表示基于QUIC协议的P2P节点,其中包含了P2P节点的视图和共享资源。 +/// It is used to represent a P2P node based on the QUIC protocol, which contains the view and shared resources of the P2P node. #[derive(LogicalModule)] pub struct P2PQuicNode { + /* P2P节点的视图,包含了P2P模块的信息,如节点配置、对等节点等。 */ + /* The view of a P2P node contains information about the P2P module, such as node configuration and peer nodes. */ pub logical_modules_view: View, + /* Arcs pointing to shared resources are wrapped with Arc smart Pointers to ensure secure access in multithreaded environments. */ shared: Arc, } + impl P2PQuicNode { + /* 返回P2P节点中的P2P模块的引用,通过视图获取到P2P模块,以便后续进行基于P2P模块的操作。 */ + /* Returns the reference of the P2P module in the P2P node, and obtains the P2P module from the view to facilitate + subsequent operations based on the P2P module. */ fn p2p_base(&self) -> &P2PModule { self.logical_modules_view.p2p() } @@ -96,6 +127,10 @@ impl P2PQuicNode { #[async_trait] impl LogicalModule for P2PQuicNode { + + /* 用于创建新的 P2PQuicNode 实例。初始化节点,包括创建视图和共享资源,并返回初始化后的节点实例。 */ + /* Used to create a new P2PQuicNode instance. Initializes the node, including creating views and + shared resources, and returns the initialized node instance. */ fn inner_new(args: LogicalModuleNewArgs) -> Self where Self: Sized, @@ -111,6 +146,9 @@ impl LogicalModule for P2PQuicNode { // name: format!("{}::{}", args.parent_name, Self::self_name()), } } + + /* 用于启动节点,包括建立连接、监听新连接等操作 */ + /* It is used to start a node, including establishing connections and listening for new connections */ async fn start(&self) -> WSResult> { let this_addr = self.p2p_base().nodes_config.this.1.addr; // create an endpoint for us to listen on and send from. @@ -131,10 +169,12 @@ impl LogicalModule for P2PQuicNode { let shared = self.shared.clone(); let mut net_tasks: Vec = vec![]; - + + /* 遍历配置文件中的每个节点,并为它们创建一个主动连接任务。对于每个节点,节点将尝试连接,如果成功建立连接, + 则发送本节点的地址信息,并处理连接后续的消息。 */ for (n, n_config) in &self.p2p_base().nodes_config.peers { let endpoint = endpoint.clone(); - let addr = n_config.addr; + let addr = n_config.addr; // 配置文件中给出的地址 let shared = shared.clone(); let view = self.logical_modules_view.clone(); let n = n.clone(); @@ -174,6 +214,7 @@ impl LogicalModule for P2PQuicNode { ); } + let view = self.logical_modules_view.clone(); net_tasks.push( tokio::spawn(async move { @@ -181,6 +222,9 @@ impl LogicalModule for P2PQuicNode { // fn select_handle_next_inco tracing::info!("start listening for new connections on node {}", this_addr); loop { + /* 使用 tokio::select! 宏,同时监听新连接和节点内部的消息通道,以便处理新连接和系统消息(如系统关闭)。 */ + /* Use tokio::select! Macro that listens for both new connections and message channels inside the node + to process new connections and system messages (such as system shutdown). */ tokio::select! { next_incoming= incoming_conns.next() => { if let Some((connection, mut incoming)) = next_incoming { @@ -219,7 +263,11 @@ impl LogicalModule for P2PQuicNode { panic!("tx should live longer than rx, error: {:?}",e); } }; + match msg{ + /* 当系统接收到系统关闭消息时,停止监听新的连接请求,结束循环,完成节点的启动过程。 */ + /* When the system receives the system shutdown message, the system stops listening for + new connection requests, ends the loop, and completes the node startup process. */ BroadcastMsg::SysEnd => { // system shutdown break; @@ -294,6 +342,10 @@ impl LogicalModule for P2PQuicNode { // let res=endpoint.connect_to(&addr).await // } +/* 创建一个新的异步任务来处理连接。接受参数包括远程地址 remote_addr、节点视图 view、共享节点信息 shared、 + 端点 endpoint、连接 connection 以及传入连接 incoming。 */ +/* Create a new asynchronous task to handle the connection. Accepted parameters include remote address remote_addr, + node view, shared node information shared, endpoint endpoint, connection connection, and incoming connection. */ fn new_handle_connection_task( remote_addr: SocketAddr, view: &View, @@ -313,6 +365,8 @@ fn new_handle_connection_task( })); } +/* 处理与其他节点的连接。 */ +/* Handles connections to other nodes */ async fn handle_connection( remote_addr: SocketAddr, view: &View, @@ -325,7 +379,8 @@ async fn handle_connection( println!("Listening on: {:?}", remote_addr); println!("---\n"); - + // 根据远程地址在节点视图中查找对应的节点ID。 + // Locate the node ID in the node view based on the remote address. let remote_id = view.p2p().find_peer_id(&remote_addr).unwrap_or_else(|| { panic!( "remote_addr {:?} not found in peer_id_map {:?}", @@ -334,6 +389,11 @@ async fn handle_connection( ); }); + /* 调用共享节点信息中的 reserve_peer_conn 函数来为远程节点保留连接。从共享节点信息中获取与远程地址对应的连接信息, + 并将当前连接添加到连接列表中。 */ + /* Call the reserve_peer_conn function in the shared node information to reserve the connection for the + remote node. Obtain the connection information corresponding to the remote address from the shared node + information and add the current connection to the connection list. */ shared.reserve_peer_conn(remote_addr).await; let peer_conns = shared .peer_connections @@ -345,6 +405,12 @@ async fn handle_connection( peer_conns.0.write().await.push(connection); let _ = peer_conns.2.fetch_add(1, Ordering::Relaxed); + + /* 使用循环不断接收传入的消息,解析消息头中的消息ID和任务ID,并调用节点视图中的 dispatch 函数来分发消息。处理完毕后, + 将当前连接从连接列表中移除,并更新连接数量。 */ + /* Use a loop to continuously receive incoming messages, parse the message ID and task ID in the message header, + and call the dispatch function in the node view to distribute the message. After processing is complete, remove + the current connection from the connection list and update the number of connections. */ loop { let res = incoming.next().await; match res { @@ -387,11 +453,15 @@ async fn handle_connection( // shared.locked.lock().sub_tasks.push(handle); } +/// 将字节流解析为消息ID和任务ID。 +/// Parse the byte stream into a message ID and a task ID. fn deserialize_msg_id_task_id(head: &[u8]) -> WSResult<(MsgId, TaskId)> { let (msg_id, task_id) = bincode::deserialize::<(MsgId, TaskId)>(head) .map_err(|err| WsSerialErr::BincodeErr(err))?; Ok((msg_id, task_id)) } +/// 将消息ID和任务ID序列化为字节流。 +/// Serialize the message ID and task ID to a byte stream. fn serialize_msg_id_task_id(msg_id: MsgId, task_id: TaskId) -> Vec { let mut head: Vec = bincode::serialize(&(msg_id, task_id)).unwrap(); head.insert(0, head.len() as u8); @@ -400,9 +470,13 @@ fn serialize_msg_id_task_id(msg_id: MsgId, task_id: TaskId) -> Vec { #[async_trait] impl P2PKernel for P2PQuicNode { + async fn send_for_response(&self, _nodeid: NodeID, _req_data: Vec) -> WSResult> { Ok(Vec::new()) } + + /* 发送请求给指定节点。 */ + /* Sends the request to the specified node. */ async fn send( &self, node: NodeID, @@ -410,14 +484,23 @@ impl P2PKernel for P2PQuicNode { msg_id: MsgId, req_data: Vec, ) -> WSResult<()> { + // 获取目标节点的Socket地址。 + // Gets the Socket address of the target node. let addr = self.p2p_base().get_addr_by_id(node)?; + // 从 peer_connections 中获取与目标节点建立的连接信息。 + // Gets information about connections made to the target node from peer_connections. let peerconns = { let hold = self.shared.peer_connections.read(); hold.get(&addr).map(|v| v.clone()) }; if let Some(peer_conns) = peerconns { // round robin + /* 选择一个连接,并将请求数据发送给目标节点。如果发送失败,记录错误信息并继续尝试其他连接,直到所有连接尝试完毕。 + 如果所有连接都失败,则返回相应的错误信息。 */ + /* Select a connection and send the request data to the target node. If sending fails, record an error + message and continue trying other connections until all connection attempts are complete. If all connections + fail, the corresponding error message is returned. */ let all_count = peer_conns.2.load(Ordering::Relaxed); for _ in 0..all_count { // length might change