Skip to content

Commit

Permalink
Change: remove non-blocking membership change
Browse files Browse the repository at this point in the history
When changing membership in nonblocking mode, the leader submits a
membership config log but does not wait for the log to be committed.

This is useless because the caller has to assert the log is
committed, by periodically querying the metrics of a raft node, until it
is finally committed. Which actually makes it a blocking routine.

API changes:

- Removes `allow_lagging` paramenter from `Raft::change_membership()`
- Removes error `LearnerIsLagging`

Upgrade tip:

Adjust API calls to make it compile.

Refactor: move `leader_append_entries()` to `LeaderHandler`.
  • Loading branch information
drmingdrmer committed Feb 18, 2023
1 parent 1ae5b35 commit 9906d6e
Show file tree
Hide file tree
Showing 30 changed files with 820 additions and 934 deletions.
2 changes: 1 addition & 1 deletion examples/raft-kv-memstore/src/network/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub async fn change_membership(
app: Data<ExampleApp>,
req: Json<BTreeSet<ExampleNodeId>>,
) -> actix_web::Result<impl Responder> {
let res = app.raft.change_membership(req.0, true, false).await;
let res = app.raft.change_membership(req.0, false).await;
Ok(Json(res))
}

Expand Down
2 changes: 1 addition & 1 deletion examples/raft-kv-rocksdb/src/network/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn add_learner(mut req: Request<Arc<ExampleApp>>) -> tide::Result {
/// Changes specified learners to members, or remove members.
async fn change_membership(mut req: Request<Arc<ExampleApp>>) -> tide::Result {
let body: BTreeSet<ExampleNodeId> = req.body_json().await?;
let res = req.state().raft.change_membership(body, true, false).await;
let res = req.state().raft.change_membership(body, false).await;
Ok(Response::builder(StatusCode::Ok).body(Body::from_json(&res)?).build())
}

Expand Down
2 changes: 0 additions & 2 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@

mod install_snapshot;
mod raft_core;
mod replication_expectation;
mod replication_state;
mod server_state;
mod snapshot_state;
mod streaming_state;
mod tick;

pub use raft_core::RaftCore;
pub(crate) use replication_expectation::Expectation;
pub(crate) use replication_state::replication_lag;
pub use server_state::ServerState;
pub(crate) use snapshot_state::SnapshotResult;
Expand Down
184 changes: 36 additions & 148 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::fmt::Display;
use std::mem::swap;
use std::pin::Pin;
Expand Down Expand Up @@ -29,8 +28,6 @@ use tracing::Span;
use crate::config::Config;
use crate::config::RuntimeConfig;
use crate::config::SnapshotPolicy;
use crate::core::replication_lag;
use crate::core::Expectation;
use crate::core::ServerState;
use crate::core::SnapshotResult;
use crate::core::SnapshotState;
Expand All @@ -39,16 +36,11 @@ use crate::engine::Command;
use crate::engine::Engine;
use crate::engine::SendResult;
use crate::entry::EntryRef;
use crate::error::ChangeMembershipError;
use crate::error::CheckIsLeaderError;
use crate::error::ClientWriteError;
use crate::error::EmptyMembership;
use crate::error::Fatal;
use crate::error::ForwardToLeader;
use crate::error::InProgress;
use crate::error::InitializeError;
use crate::error::LearnerIsLagging;
use crate::error::LearnerNotFound;
use crate::error::QuorumNotEnough;
use crate::error::RPCError;
use crate::error::Timeout;
Expand Down Expand Up @@ -377,16 +369,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
node: C::Node,
tx: ClientWriteTx<C>,
) -> Result<(), Fatal<C::NodeId>> {
if let Some(l) = &self.leader_data {
tracing::debug!(
"add target node {} as learner; current nodes: {:?}",
target,
l.nodes.keys()
);
} else {
unreachable!("it has to be a leader!!!");
}

// TODO: move these logic to Engine?
let curr = &self.engine.state.membership_state.effective().membership;
let new_membership = curr.add_learner(target, node);

Expand All @@ -405,116 +388,19 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
pub(super) async fn change_membership(
&mut self,
changes: ChangeMembers<C::NodeId>,
expectation: Option<Expectation>,
turn_to_learner: bool,
tx: RaftRespTx<ClientWriteResponse<C>, ClientWriteError<C::NodeId, C::Node>>,
) -> Result<(), Fatal<C::NodeId>> {
let last = self.engine.state.membership_state.effective().membership.get_joint_config().last().unwrap();
let members = changes.apply_to(last);

// Ensure cluster will have at least one node.
if members.is_empty() {
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(
ChangeMembershipError::EmptyMembership(EmptyMembership {}),
)));
return Ok(());
}

let res = self.check_membership_committed();
if let Err(e) = res {
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(e)));
return Ok(());
}

let mem = &self.engine.state.membership_state.effective();
let curr = mem.membership.clone();

let old_members = mem.voter_ids().collect::<BTreeSet<_>>();
let only_in_new = members.difference(&old_members);

let new_config = curr.next_safe(members.clone(), turn_to_learner);

tracing::debug!(?new_config, "new_config");

for node_id in only_in_new.clone() {
if !mem.contains(node_id) {
let not_found = LearnerNotFound { node_id: *node_id };
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(
ChangeMembershipError::LearnerNotFound(not_found),
)));
let res = self.engine.state.membership_state.next_membership(changes, turn_to_learner);
let new_membership = match res {
Ok(x) => x,
Err(e) => {
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(e)));
return Ok(());
}
}

if let Err(e) = self.check_replication_states(only_in_new, expectation) {
let _ = tx.send(Err(e.into()));
return Ok(());
}

self.write_entry(EntryPayload::Membership(new_config), Some(tx)).await?;
Ok(())
}

/// Check if the effective membership is committed, so that a new membership is allowed to be
/// proposed.
fn check_membership_committed(&self) -> Result<(), ChangeMembershipError<C::NodeId>> {
let st = &self.engine.state;

if st.is_membership_committed() {
return Ok(());
}

Err(ChangeMembershipError::InProgress(InProgress {
committed: st.committed().copied(),
membership_log_id: st.membership_state.effective().log_id,
}))
}

/// return Ok if all the current replication states satisfy the `expectation` for changing
/// membership.
fn check_replication_states<'n>(
&self,
nodes: impl Iterator<Item = &'n C::NodeId>,
expectation: Option<Expectation>,
) -> Result<(), ChangeMembershipError<C::NodeId>> {
let expectation = match &expectation {
None => {
// No expectation, whatever is OK.
return Ok(());
}
Some(x) => x,
};

let last_log_id = self.engine.state.last_log_id();

for node_id in nodes {
match expectation {
Expectation::AtLineRate => {
// Expect to be at line rate but not.

let matching = if let Some(l) = &self.engine.internal_server_state.leading() {
*l.progress.get(node_id)
} else {
unreachable!("it has to be a leader!!!");
};

let distance = replication_lag(&matching.matching.index(), &last_log_id.index());

if distance <= self.config.replication_lag_threshold {
continue;
}

let lagging = LearnerIsLagging {
node_id: *node_id,
matched: matching.matching,
distance,
};

return Err(ChangeMembershipError::LearnerIsLagging(lagging));
}
}
}

self.write_entry(EntryPayload::Membership(new_membership), Some(tx)).await?;
Ok(())
}

Expand All @@ -530,24 +416,30 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
&mut self,
payload: EntryPayload<C>,
resp_tx: Option<ClientWriteTx<C>>,
) -> Result<(), Fatal<C::NodeId>> {
) -> Result<bool, Fatal<C::NodeId>> {
tracing::debug!(payload = display(payload.summary()), "write_entry");

let (mut lh, tx) = if let Some((lh, tx)) = self.engine.get_leader_handler_or_reject(resp_tx) {
(lh, tx)
} else {
return Ok(false);
};

let mut entry_refs = [EntryRef::new(&payload)];
// TODO: it should returns membership config error etc. currently this is done by the
// caller.
self.engine.leader_append_entries(&mut entry_refs);
// caller.
lh.leader_append_entries(&mut entry_refs);

// Install callback channels.
if let Some(tx) = resp_tx {
if let Some(tx) = tx {
if let Some(l) = &mut self.leader_data {
l.client_resp_channels.insert(entry_refs[0].log_id.index, tx);
}
}

self.run_engine_commands(&entry_refs).await?;

Ok(())
Ok(true)
}

/// Flush cached changes of metrics to notify metrics watchers with updated metrics.
Expand Down Expand Up @@ -1147,12 +1039,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.reject_with_forward_to_leader(tx);
}
}
RaftMsg::ClientWriteRequest { payload: rpc, tx } => {
if self.engine.state.is_leader(&self.engine.config.id) {
self.write_entry(rpc, Some(tx)).await?;
} else {
self.reject_with_forward_to_leader(tx);
}
RaftMsg::ClientWriteRequest { payload, tx } => {
self.write_entry(payload, Some(tx)).await?;
}
RaftMsg::Initialize { members, tx } => {
self.handle_initialize(members, tx).await?;
Expand All @@ -1166,15 +1054,10 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}
RaftMsg::ChangeMembership {
changes,
when,
turn_to_learner,
tx,
} => {
if self.engine.state.is_leader(&self.engine.config.id) {
self.change_membership(changes, when, turn_to_learner, tx).await?;
} else {
self.reject_with_forward_to_leader(tx);
}
self.change_membership(changes, turn_to_learner, tx).await?;
}
RaftMsg::ExternalRequest { req } => {
req(&self.engine.state, &mut self.storage, &mut self.network);
Expand All @@ -1192,13 +1075,16 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}
}
ExternalCommand::Heartbeat => {
// TODO: reject if it is not leader?
self.write_entry(EntryPayload::Blank, None).await?;
let log_id = self.engine.state.last_log_id();
tracing::debug!(
log_id = display(log_id.summary()),
"ExternalCommand: sent heartbeat log"
);
let is_leader = self.write_entry(EntryPayload::Blank, None).await?;
if is_leader {
let log_id = self.engine.state.last_log_id();
tracing::debug!(
log_id = display(log_id.summary()),
"ExternalCommand: sent heartbeat log"
);
} else {
tracing::warn!("ExternalCommand: failed to send heartbeat log, not a leader");
}
}
ExternalCommand::Snapshot => self.trigger_snapshot_if_needed(true).await,
}
Expand Down Expand Up @@ -1239,9 +1125,11 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
if self.runtime_config.enable_heartbeat.load(Ordering::Relaxed) {
// heartbeat by sending a blank log
// TODO: use Engine::append_blank_log
self.write_entry(EntryPayload::Blank, None).await?;
let log_id = self.engine.state.last_log_id();
tracing::debug!(log_id = display(log_id.summary()), "sent heartbeat log");
let is_leader = self.write_entry(EntryPayload::Blank, None).await?;
if is_leader {
let log_id = self.engine.state.last_log_id();
tracing::debug!(log_id = display(log_id.summary()), "sent heartbeat log");
}
}

// Install next heartbeat
Expand Down
7 changes: 0 additions & 7 deletions openraft/src/core/replication_expectation.rs

This file was deleted.

Loading

0 comments on commit 9906d6e

Please sign in to comment.