From 6deb2a70c2a142c91e80ab49d446344b68e59792 Mon Sep 17 00:00:00 2001 From: Justin Rubek <25621857+justinrubek@users.noreply.github.com> Date: Wed, 7 Sep 2022 01:17:26 -0500 Subject: [PATCH] management rpc works --- crates/raft/src/client/mod.rs | 155 +++++++--------------- crates/raft/src/openraft_types/prelude.rs | 1 + crates/raft/src/rpc/management.rs | 10 +- 3 files changed, 58 insertions(+), 108 deletions(-) diff --git a/crates/raft/src/client/mod.rs b/crates/raft/src/client/mod.rs index 7d0e30ee71..1bfceb4e07 100644 --- a/crates/raft/src/client/mod.rs +++ b/crates/raft/src/client/mod.rs @@ -11,7 +11,7 @@ use super::{error::RpcResult, message::Request}; use crate::error::Result; use crate::openraft_types::types::Fatal; use crate::openraft_types::prelude::*; -use crate::rpc::pb::{AddLearnerRequest, AddLearnerResponse}; +use crate::rpc::pb::{AddLearnerRequest, AddLearnerResponse, ChangeMembershipRequest}; use crate::rpc::pb::raft_node_client::RaftNodeClient; use crate::{ error::RpcError, @@ -158,148 +158,93 @@ impl ConsensusClient { } } - /// Change membership to the specified set of nodes. - /// - /// All nodes in `req` have to be already added as learner with [`add_learner`], - /// or an error [`LearnerNotFound`] will be returned. - pub async fn change_membership( + async fn change_membership_rpc( &self, - req: &BTreeSet, - ) -> RpcResult { - todo!(); - // self.send_rpc_to_leader("cluster/change-membership", Some(req)) - // .await - } - - /// Get the metrics about the cluster. - /// - /// Metrics contains various information about the cluster, such as current leader, - /// membership config, replication status etc. - /// See [`RaftMetrics`]. - pub async fn metrics(&self) -> RpcResult { + req: &ChangeMembershipRequest, + ) -> RpcResult { let (_leader_id, endpoint) = self.leader.lock().await.clone(); let mut client = RaftNodeClient::connect(endpoint) .await .map_err(|e| RpcError::Network(NetworkError::new(&e)))?; - match client.metrics(()).await { - Ok(resp) => Ok(resp.into_inner().try_into().unwrap()), - Err(e) => Err(RpcError::Network(NetworkError::new(&e))), - } - } - - // --- Internal methods - - // Send RPC to specified node. - // - // It sends out a POST request if `req` is Some. Otherwise a GET request. - // The remote endpoint must respond a reply in form of `Result`. - // An `Err` happened on remote will be wrapped in an [`RPCError::RemoteError`]. - /* - async fn do_send_rpc_to_leader( - &self, - uri: &str, - req: Option<&Req>, - ) -> RpcResult - where - Req: Serialize + 'static, - Resp: Serialize + DeserializeOwned, - Err: std::error::Error + Serialize + DeserializeOwned, - { - println!("do_send_rpc_to_leader: {:?}", uri); - let (leader_id, url) = { - let t = self.leader.lock().await; - let target_addr = &t.1; - (t.0, format!("http://{}/{}", target_addr, uri)) - }; - - let resp = if let Some(r) = req { - println!( - ">>> client send request to {}: {}", - url, - serde_json::to_string_pretty(&r).unwrap() - ); - self.inner.post(url.clone()).json(r) - } else { - trace!(">>> client send request to {}", url,); - self.inner.get(url.clone()) - } - .send() - .await - .map_err(|e| { - println!(">>> client send request to {} failed: {}", url, e); - RpcError::Network(NetworkError::new(&e)) - })?; - println!("<<< client receive response from {}: {:?}", url, resp); - - let res: Result = resp - .json() + let resp = client + .change_membership(req.clone()) .await + .map(|resp| bincode::deserialize(&resp.into_inner().payload).expect("failed to deserialize")) .map_err(|e| RpcError::Network(NetworkError::new(&e)))?; - trace!( - "<<< client recv reply from {}: {}", - url, - serde_json::to_string_pretty(&res).unwrap() - ); - res.map_err(|e| RpcError::RemoteError(RemoteError::new(leader_id, e))) + Ok(resp) } - /// Try the best to send a request to the leader. + + + /// Change membership to the specified set of nodes. /// - /// If the target node is not a leader, a `ForwardToLeader` error will be - /// returned and this client will retry at most 3 times to contact the updated leader. - async fn send_rpc_to_leader( + /// All nodes in `req` have to be already added as learner with [`add_learner`], + /// or an error [`LearnerNotFound`] will be returned. + pub async fn change_membership( &self, - uri: &str, - req: Option<&Req>, - ) -> RpcResult - where - Req: Serialize + 'static, - Resp: Serialize + DeserializeOwned, - Err: std::error::Error + Serialize + DeserializeOwned + TryInto + Clone, - { - println!("send_rpc_to_leader"); - // Retry at most 3 times to find a valid leader. + req: &BTreeSet, + ) -> RpcResult { let mut n_retry = 3; + let req = ChangeMembershipRequest { + payload: bincode::serialize(req).expect("failed to serialize"), + }; + loop { - let res: RpcResult = self.do_send_rpc_to_leader(uri, req).await; + let res = self.change_membership_rpc(&req).await; let rpc_err = match res { - Ok(x) => return Ok(x), + Ok(res) => return Ok(res), Err(rpc_err) => rpc_err, }; - if let RpcError::RemoteError(remote_err) = &rpc_err { - println!("{:?}", remote_err); - let forward_err_res = - >::try_into(remote_err.source.clone()); + if let RPCError::RemoteError(remote_err) = &rpc_err { + let forward_err_res = + >::try_into(remote_err.source.clone()); if let Ok(ForwardToLeader { leader_id: Some(leader_id), leader_node: Some(leader_node), - .. - }) = forward_err_res - { - // Update target to the new leader. + }) = forward_err_res { + // Update target to the "new" leader { let mut t = self.leader.lock().await; - let address = leader_node.address.clone(); - *t = (leader_id, address.parse::().unwrap()); + let url = leader_node.address.clone(); + let endpoint = Endpoint::from_shared(url) + .expect("failed to create endpoint"); + *t = (leader_id, endpoint); } n_retry -= 1; if n_retry > 0 { continue; } + } } - println!("{}", rpc_err); return Err(rpc_err); } } - */ + + /// Get the metrics about the cluster. + /// + /// Metrics contains various information about the cluster, such as current leader, + /// membership config, replication status etc. + /// See [`RaftMetrics`]. + pub async fn metrics(&self) -> RpcResult { + let (_leader_id, endpoint) = self.leader.lock().await.clone(); + + let mut client = RaftNodeClient::connect(endpoint) + .await + .map_err(|e| RpcError::Network(NetworkError::new(&e)))?; + + match client.metrics(()).await { + Ok(resp) => Ok(resp.into_inner().try_into().unwrap()), + Err(e) => Err(RpcError::Network(NetworkError::new(&e))), + } + } } diff --git a/crates/raft/src/openraft_types/prelude.rs b/crates/raft/src/openraft_types/prelude.rs index 55e926d776..934eee7402 100644 --- a/crates/raft/src/openraft_types/prelude.rs +++ b/crates/raft/src/openraft_types/prelude.rs @@ -6,6 +6,7 @@ pub use crate::openraft_types::types::{ VoteRequest as OVoteRequest, VoteResponse as OVoteResponse, ClientWriteResponse as OClientWriteResponse, + ClientWriteError as OClientWriteError, RaftMetrics as ORaftMetrics, AddLearnerResponse as OAddLearnerResponse, AddLearnerError as OAddLearnerError, diff --git a/crates/raft/src/rpc/management.rs b/crates/raft/src/rpc/management.rs index 58fdfca47f..627454d128 100644 --- a/crates/raft/src/rpc/management.rs +++ b/crates/raft/src/rpc/management.rs @@ -1,4 +1,4 @@ -use std::{sync::Arc, collections::BTreeMap}; +use std::{sync::Arc, collections::{BTreeMap, BTreeSet}}; use super::{pb::{ AddLearnerRequest, @@ -7,7 +7,7 @@ use super::{pb::{ raft_node_server::RaftNode, ClientWriteResponse, InitResponse, AddLearnerResponse, }, TonicResult}; -use crate::openraft_types::prelude::*; +use crate::{openraft_types::prelude::*, repr::NodeId}; use crate::{server::app::ApplicationState, repr::Node, openraft_types::types::{AddLearnerError}}; #[derive(Clone)] @@ -58,7 +58,11 @@ impl RaftNode for ManagementRpcHandler { &self, request: tonic::Request, ) -> TonicResult { - todo!(); + let req: BTreeSet = bincode::deserialize(&request.into_inner().payload).unwrap(); + let resp = self.app.raft.change_membership(req, true, false).await; + + let data = bincode::serialize(&resp).unwrap(); + Ok(tonic::Response::new(ChangeMembershipResponse { payload: data.into() })) } async fn metrics(