Skip to content

Commit

Permalink
management rpc works
Browse files Browse the repository at this point in the history
  • Loading branch information
justinrubek committed Sep 7, 2022
1 parent c4c1d79 commit 6deb2a7
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 108 deletions.
155 changes: 50 additions & 105 deletions crates/raft/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::{error::RpcResult, message::Request};
use crate::error::Result;
use crate::openraft_types::types::Fatal;
use crate::openraft_types::prelude::*;
use crate::rpc::pb::{AddLearnerRequest, AddLearnerResponse};
use crate::rpc::pb::{AddLearnerRequest, AddLearnerResponse, ChangeMembershipRequest};
use crate::rpc::pb::raft_node_client::RaftNodeClient;
use crate::{
error::RpcError,
Expand Down Expand Up @@ -158,148 +158,93 @@ impl ConsensusClient {
}
}

/// Change membership to the specified set of nodes.
///
/// All nodes in `req` have to be already added as learner with [`add_learner`],
/// or an error [`LearnerNotFound`] will be returned.
pub async fn change_membership(
async fn change_membership_rpc(
&self,
req: &BTreeSet<NodeId>,
) -> RpcResult<ClientWriteResponse, ClientWriteError> {
todo!();
// self.send_rpc_to_leader("cluster/change-membership", Some(req))
// .await
}

/// Get the metrics about the cluster.
///
/// Metrics contains various information about the cluster, such as current leader,
/// membership config, replication status etc.
/// See [`RaftMetrics`].
pub async fn metrics(&self) -> RpcResult<RaftMetrics, Infallible> {
req: &ChangeMembershipRequest,
) -> RpcResult<OClientWriteResponse, OClientWriteError> {
let (_leader_id, endpoint) = self.leader.lock().await.clone();

let mut client = RaftNodeClient::connect(endpoint)
.await
.map_err(|e| RpcError::Network(NetworkError::new(&e)))?;

match client.metrics(()).await {
Ok(resp) => Ok(resp.into_inner().try_into().unwrap()),
Err(e) => Err(RpcError::Network(NetworkError::new(&e))),
}
}

// --- Internal methods

// Send RPC to specified node.
//
// It sends out a POST request if `req` is Some. Otherwise a GET request.
// The remote endpoint must respond a reply in form of `Result<T, E>`.
// An `Err` happened on remote will be wrapped in an [`RPCError::RemoteError`].
/*
async fn do_send_rpc_to_leader<Req, Resp, Err>(
&self,
uri: &str,
req: Option<&Req>,
) -> RpcResult<Resp, Err>
where
Req: Serialize + 'static,
Resp: Serialize + DeserializeOwned,
Err: std::error::Error + Serialize + DeserializeOwned,
{
println!("do_send_rpc_to_leader: {:?}", uri);
let (leader_id, url) = {
let t = self.leader.lock().await;
let target_addr = &t.1;
(t.0, format!("http://{}/{}", target_addr, uri))
};
let resp = if let Some(r) = req {
println!(
">>> client send request to {}: {}",
url,
serde_json::to_string_pretty(&r).unwrap()
);
self.inner.post(url.clone()).json(r)
} else {
trace!(">>> client send request to {}", url,);
self.inner.get(url.clone())
}
.send()
.await
.map_err(|e| {
println!(">>> client send request to {} failed: {}", url, e);
RpcError::Network(NetworkError::new(&e))
})?;
println!("<<< client receive response from {}: {:?}", url, resp);
let res: Result<Resp, Err> = resp
.json()
let resp = client
.change_membership(req.clone())
.await
.map(|resp| bincode::deserialize(&resp.into_inner().payload).expect("failed to deserialize"))
.map_err(|e| RpcError::Network(NetworkError::new(&e)))?;
trace!(
"<<< client recv reply from {}: {}",
url,
serde_json::to_string_pretty(&res).unwrap()
);

res.map_err(|e| RpcError::RemoteError(RemoteError::new(leader_id, e)))
Ok(resp)
}

/// Try the best to send a request to the leader.


/// Change membership to the specified set of nodes.
///
/// If the target node is not a leader, a `ForwardToLeader` error will be
/// returned and this client will retry at most 3 times to contact the updated leader.
async fn send_rpc_to_leader<Req, Resp, Err>(
/// All nodes in `req` have to be already added as learner with [`add_learner`],
/// or an error [`LearnerNotFound`] will be returned.
pub async fn change_membership(
&self,
uri: &str,
req: Option<&Req>,
) -> RpcResult<Resp, Err>
where
Req: Serialize + 'static,
Resp: Serialize + DeserializeOwned,
Err: std::error::Error + Serialize + DeserializeOwned + TryInto<ForwardToLeader> + Clone,
{
println!("send_rpc_to_leader");
// Retry at most 3 times to find a valid leader.
req: &BTreeSet<NodeId>,
) -> RpcResult<OClientWriteResponse, OClientWriteError> {
let mut n_retry = 3;

let req = ChangeMembershipRequest {
payload: bincode::serialize(req).expect("failed to serialize"),
};

loop {
let res: RpcResult<Resp, Err> = self.do_send_rpc_to_leader(uri, req).await;
let res = self.change_membership_rpc(&req).await;

let rpc_err = match res {
Ok(x) => return Ok(x),
Ok(res) => return Ok(res),
Err(rpc_err) => rpc_err,
};

if let RpcError::RemoteError(remote_err) = &rpc_err {
println!("{:?}", remote_err);
let forward_err_res =
<Err as TryInto<ForwardToLeader>>::try_into(remote_err.source.clone());
if let RPCError::RemoteError(remote_err) = &rpc_err {
let forward_err_res =
<ClientWriteError as TryInto<OForwardToLeader>>::try_into(remote_err.source.clone());

if let Ok(ForwardToLeader {
leader_id: Some(leader_id),
leader_node: Some(leader_node),
..
}) = forward_err_res
{
// Update target to the new leader.
}) = forward_err_res {
// Update target to the "new" leader
{
let mut t = self.leader.lock().await;
let address = leader_node.address.clone();
*t = (leader_id, address.parse::<SocketAddr>().unwrap());
let url = leader_node.address.clone();
let endpoint = Endpoint::from_shared(url)
.expect("failed to create endpoint");
*t = (leader_id, endpoint);
}

n_retry -= 1;
if n_retry > 0 {
continue;
}

}
}

println!("{}", rpc_err);
return Err(rpc_err);
}
}
*/

/// Get the metrics about the cluster.
///
/// Metrics contains various information about the cluster, such as current leader,
/// membership config, replication status etc.
/// See [`RaftMetrics`].
pub async fn metrics(&self) -> RpcResult<RaftMetrics, Infallible> {
let (_leader_id, endpoint) = self.leader.lock().await.clone();

let mut client = RaftNodeClient::connect(endpoint)
.await
.map_err(|e| RpcError::Network(NetworkError::new(&e)))?;

match client.metrics(()).await {
Ok(resp) => Ok(resp.into_inner().try_into().unwrap()),
Err(e) => Err(RpcError::Network(NetworkError::new(&e))),
}
}
}
1 change: 1 addition & 0 deletions crates/raft/src/openraft_types/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub use crate::openraft_types::types::{
VoteRequest as OVoteRequest,
VoteResponse as OVoteResponse,
ClientWriteResponse as OClientWriteResponse,
ClientWriteError as OClientWriteError,
RaftMetrics as ORaftMetrics,
AddLearnerResponse as OAddLearnerResponse,
AddLearnerError as OAddLearnerError,
Expand Down
10 changes: 7 additions & 3 deletions crates/raft/src/rpc/management.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{sync::Arc, collections::BTreeMap};
use std::{sync::Arc, collections::{BTreeMap, BTreeSet}};

use super::{pb::{
AddLearnerRequest,
Expand All @@ -7,7 +7,7 @@ use super::{pb::{
raft_node_server::RaftNode, ClientWriteResponse, InitResponse, AddLearnerResponse,
}, TonicResult};

use crate::openraft_types::prelude::*;
use crate::{openraft_types::prelude::*, repr::NodeId};
use crate::{server::app::ApplicationState, repr::Node, openraft_types::types::{AddLearnerError}};

#[derive(Clone)]
Expand Down Expand Up @@ -58,7 +58,11 @@ impl RaftNode for ManagementRpcHandler {
&self,
request: tonic::Request<ChangeMembershipRequest>,
) -> TonicResult<ChangeMembershipResponse> {
todo!();
let req: BTreeSet<NodeId> = bincode::deserialize(&request.into_inner().payload).unwrap();
let resp = self.app.raft.change_membership(req, true, false).await;

let data = bincode::serialize(&resp).unwrap();
Ok(tonic::Response::new(ChangeMembershipResponse { payload: data.into() }))
}

async fn metrics(
Expand Down

0 comments on commit 6deb2a7

Please sign in to comment.