
Commit

Fix: discard blank log heartbeat, revert to the standard heartbeat
- #698

The blank log heartbeat design has two problems:

- The heartbeat that sends a blank log introduces additional I/O, as a follower has to persist every log to maintain correctness.

- Although `(term, log_index)` serves as a pseudo time in Raft, measuring whether a node has caught up with the leader and is capable of becoming a new leader, leadership is not solely determined by this pseudo time.
  Wall clock time is also taken into account.

  There may be a case where a follower's pseudo time is not up to date but its clock time is, i.e., it has recently heard from the leader; in that case no other node should become the leader.
  For example, in a cluster of three nodes, suppose the leader (node-1) is busy sending a snapshot to node-2: it has not yet replicated the latest logs to a quorum, but node-2 has received messages from node-1 and thus knows there is an active leader. Node-3 should then not seize leadership from node-1.
  This is why two types of time are needed to protect leadership: pseudo time `(term, log_index)` and wall clock time.

  In the following graph:
  - node-1 is the leader, has 4 log entries, and is sending a snapshot to
      node-2.
  - node-2 has received several chunks of the snapshot; it perceives an active
      leader and has thus extended the leader lease.
  - node-3 tries to send a vote request to node-2. Although node-2 does not have
      as many logs as node-3, it should still reject node-3's vote request
      because the leader lease has not yet expired.

  In the obsolete design, which extends the pseudo time `(term, index)` with a
  `tick`, node-3 would seize the leadership from node-1 in this case.

  ```text
  Ni: Node i
  Ei: log entry i

  N1 E1 E2 E3 E4
        |
        v
  N2    snapshot
        +-----------------+
                 ^        |
                 |        leader lease
                 |
  N3 E1 E2 E3    | vote-request
  ---------------+----------------------------> clock time
                 now

  ```
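
The two-clock rule described above can be sketched in a few lines of Rust. This is a simplified illustration, not Openraft's actual implementation: `Follower`, `PseudoTime`, `on_leader_message`, `grant_vote` and `snapshot_scenario` are hypothetical names, the comparison of votes (terms) that a real vote handler performs is omitted, and node-2's last log position `(1, 1)` is made up for the example. The 150 ms lease matches the default `leader_lease` added in this commit.

```rust
use std::time::{Duration, Instant};

/// Pseudo time `(term, log_index)`; Rust tuples compare lexicographically.
type PseudoTime = (u64, u64);

/// Hypothetical, simplified follower state (not an Openraft type).
struct Follower {
    last_log: PseudoTime,
    /// Wall clock instant until which the current leader is considered active.
    leader_expire_at: Instant,
}

impl Follower {
    /// Any message from the leader (AppendEntries, a snapshot chunk, ...)
    /// extends the lease, regardless of how far the local log lags behind.
    fn on_leader_message(&mut self, now: Instant, lease: Duration) {
        self.leader_expire_at = now + lease;
    }

    /// Returns true if a vote request would be granted.
    fn grant_vote(&self, now: Instant, candidate_last_log: PseudoTime) -> bool {
        // Wall clock check: an unexpired lease rejects every candidate.
        if now <= self.leader_expire_at {
            return false;
        }
        // Pseudo time check: the candidate must be at least as up to date.
        candidate_last_log >= self.last_log
    }
}

/// Replays the scenario from the graph: node-2 lags behind node-3 but has
/// just heard from node-1. Returns (granted during lease, granted after lease).
fn snapshot_scenario() -> (bool, bool) {
    let start = Instant::now();
    let lease = Duration::from_millis(150);

    let mut node2 = Follower { last_log: (1, 1), leader_expire_at: start };
    node2.on_leader_message(start, lease);

    let node3_last_log: PseudoTime = (1, 3);
    let during_lease = node2.grant_vote(start + Duration::from_millis(50), node3_last_log);
    let after_lease = node2.grant_vote(start + Duration::from_millis(200), node3_last_log);
    (during_lease, after_lease)
}
```

With pseudo time alone, node-3's request would be granted in both cases, because `(1, 3) >= (1, 1)`. The lease check is what rejects it while node-1 is still actively sending the snapshot.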

The original document is presented below for reference.
drmingdrmer committed Mar 4, 2023
1 parent ed76ee9 commit 97fa158
Showing 21 changed files with 513 additions and 70 deletions.
5 changes: 4 additions & 1 deletion guide/src/SUMMARY.md
@@ -19,7 +19,6 @@
- [Architecture](./architecture.md)
- [Threads](./threading.md)
- [Log data layout](./log-data-layout.md)
- [Heartbeat](./heartbeat.md)
- [Vote](./vote.md)
- [Replication](./replication.md)
- [Delete-conflicting-logs](./delete_log.md)
@@ -29,3 +28,7 @@
- [Upgrade Tips](./upgrade-tips.md)
- [Upgrade from 0.6 to 0.7](./upgrade-v06-v07.md)
- [Upgrade from 0.7 to 0.8](./upgrade-v07-v08.md)

- [Obsolete design](./obsolete-design.md)
- [Heartbeat](./heartbeat.md)

49 changes: 48 additions & 1 deletion guide/src/heartbeat.md
@@ -1,3 +1,50 @@
# Obsolete: blank log heartbeat

https://github.com/datafuselabs/openraft/issues/698

This design has two problems:

- The heartbeat that sends a blank log introduces additional I/O, as a follower has to persist every log to maintain correctness.

- Although `(term, log_index)` serves as a pseudo time in Raft, measuring whether a node has caught up with the leader and is capable of becoming a new leader, leadership is not solely determined by this pseudo time.
Wall clock time is also taken into account.

There may be a case where a follower's pseudo time is not up to date but its clock time is, i.e., it has recently heard from the leader; in that case no other node should become the leader.
For example, in a cluster of three nodes, suppose the leader (node-1) is busy sending a snapshot to node-2: it has not yet replicated the latest logs to a quorum, but node-2 has received messages from node-1 and thus knows there is an active leader. Node-3 should then not seize leadership from node-1.
This is why two types of time are needed to protect leadership: pseudo time `(term, log_index)` and wall clock time.

In the following graph:
- node-1 is the leader, has 4 log entries, and is sending a snapshot to
node-2.
- node-2 has received several chunks of the snapshot; it perceives an active
leader and has thus extended the leader lease.
- node-3 tries to send a vote request to node-2. Although node-2 does not have
as many logs as node-3, it should still reject node-3's vote request
because the leader lease has not yet expired.

In the obsolete design, which extends the pseudo time `(term, index)` with a
`tick`, node-3 would seize the leadership from node-1 in this case.

```text
Ni: Node i
Ei: log entry i

N1 E1 E2 E3 E4
      |
      v
N2    snapshot
      +-----------------+
               ^        |
               |        leader lease
               |
N3 E1 E2 E3    | vote-request
---------------+----------------------------> clock time
               now
```

The original document is presented below for reference.

# Heartbeat in openraft

## Heartbeat in standard raft
@@ -25,7 +72,7 @@ Because the leader must have the greatest **pseudo time**,
thus by comparing the **pseudo time**, a follower automatically refuse election request from a node unreachable to the leader.

And comparing the **pseudo time** is already done by `handle_vote_request()`,
there is no need to add another timer for the active leader.
there is no need to add another timer for the active leader.

Thus making heartbeat request a blank log is the simplest way.

5 changes: 5 additions & 0 deletions guide/src/obsolete-design.md
@@ -0,0 +1,5 @@
# Obsolete Designs

Several designs in Openraft have been discarded due to the problems they caused for applications.
These designs were attempts at optimization or simplification, but ultimately proved to be inappropriate.
They are included in this chapter as an archive to explain why they were discarded.
56 changes: 30 additions & 26 deletions openraft/src/core/raft_core.rs
@@ -431,29 +431,26 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
///
/// Currently heartbeat is a blank log
#[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))]
pub async fn send_heartbeat(
&mut self,
tick: usize,
resp_tx: Option<ClientWriteTx<C>>,
emitter: impl Display,
) -> Result<(), Fatal<C::NodeId>> {
tracing::debug!(tick = display(tick), "send_heartbeat");
pub async fn send_heartbeat(&mut self, emitter: impl Display) -> Result<bool, Fatal<C::NodeId>> {
tracing::debug!(now = debug(&self.engine.state.now), "send_heartbeat");

let _ = tick;

let is_leader = self.write_entry(EntryPayload::Blank, resp_tx).await?;
if is_leader {
let log_id = self.engine.state.last_log_id();
let mut lh = if let Some((lh, _)) =
self.engine.get_leader_handler_or_reject::<(), ClientWriteError<C::NodeId, C::Node>>(None)
{
lh
} else {
tracing::debug!(
log_id = display(log_id.summary()),
tick = display(tick),
"{} sent heartbeat",
now = debug(&self.engine.state.now),
"{} failed to send heartbeat",
emitter
);
} else {
tracing::debug!(tick = display(tick), "{} failed to send heartbeat", emitter);
}
Ok(())
return Ok(false);
};

lh.send_heartbeat();

tracing::debug!("{} sent heartbeat", emitter);
Ok(true)
}

/// Flush cached changes of metrics to notify metrics watchers with updated metrics.
@@ -1033,6 +1030,16 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.handle_append_entries_request(rpc, tx).await?;
}
RaftMsg::RequestVote { rpc, tx } => {
// Vote request needs to check if the lease of the last leader expired.
// Thus it is time sensitive. Update the cached time for it.
let now = Instant::now();
self.engine.state.update_now(now);
tracing::debug!(
vote_request = display(rpc.summary()),
"handle vote request: now: {:?}",
now
);

self.handle_vote_request(rpc, tx).await?;
}
RaftMsg::VoteResponse { target, resp, vote } => {
@@ -1081,8 +1088,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}
}
ExternalCommand::Heartbeat => {
// TODO: use the last tick
self.send_heartbeat(0, None, "ExternalCommand").await?;
self.send_heartbeat("ExternalCommand").await?;
}
ExternalCommand::Snapshot => self.trigger_snapshot_if_needed(true).await,
}
@@ -1091,6 +1097,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// check every timer

let now = Instant::now();
self.engine.state.update_now(now);
tracing::debug!("received tick: {}, now: {:?}", i, now);

let current_vote = self.engine.state.get_vote();
@@ -1121,7 +1128,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
if let Some(t) = heartbeat_at {
if now >= t {
if self.runtime_config.enable_heartbeat.load(Ordering::Relaxed) {
self.send_heartbeat(i, None, "tick").await?;
self.send_heartbeat("tick").await?;
}

// Install next heartbeat
@@ -1352,15 +1359,12 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
}
Command::Replicate { req, target } => {
if let Some(l) = &self.leader_data {
// TODO(2): consider remove the returned error from new_client().
// Node may not exist because `RaftNetworkFactory::new_client()` returns an
// error.
let node = &l.nodes.get(&target);

if let Some(node) = node {
match req {
Inflight::None => {
unreachable!("Inflight::None");
let _ = node.tx_repl.send(Replicate::Heartbeat);
}
Inflight::Logs { id, log_id_range } => {
let _ = node.tx_repl.send(Replicate::Logs { id, log_id_range });
36 changes: 27 additions & 9 deletions openraft/src/engine/engine_impl.rs
@@ -1,8 +1,11 @@
use std::time::Duration;

use crate::core::ServerState;
use crate::engine::handler::following_handler::FollowingHandler;
use crate::engine::handler::leader_handler::LeaderHandler;
use crate::engine::handler::log_handler::LogHandler;
use crate::engine::handler::replication_handler::ReplicationHandler;
use crate::engine::handler::replication_handler::SendNone;
use crate::engine::handler::server_state_handler::ServerStateHandler;
use crate::engine::handler::snapshot_handler::SnapshotHandler;
use crate::engine::handler::vote_handler::VoteHandler;
@@ -47,6 +50,12 @@ pub(crate) struct EngineConfig<NID: NodeId> {

/// The maximum number of entries per payload allowed to be transmitted during replication
pub(crate) max_payload_entries: u64,

/// The duration of an active leader's lease.
///
/// When a follower or learner perceives an active leader, such as by receiving an AppendEntries
/// message, it should not grant another candidate to become the leader during this period.
pub(crate) leader_lease: Duration,
}

impl<NID: NodeId> Default for EngineConfig<NID> {
@@ -56,6 +65,7 @@ impl<NID: NodeId> Default for EngineConfig<NID> {
max_in_snapshot_log_to_keep: 1000,
purge_batch_size: 256,
max_payload_entries: 300,
leader_lease: Duration::from_millis(150),
}
}
}
@@ -139,7 +149,7 @@ where

let mut rh = self.replication_handler();
rh.rebuild_replication_streams();
rh.initiate_replication();
rh.initiate_replication(SendNone::False);

return;
}
@@ -272,6 +282,21 @@ where
"Engine::handle_vote_req"
);

// Current leader lease has not yet expired, reject voting request
if self.state.now <= self.state.leader_expire_at {
tracing::debug!(
"reject: leader lease has not yet expire; now; {:?}, leader lease will expire at: {:?}: after {:?}",
self.state.now,
self.state.leader_expire_at,
self.state.leader_expire_at - self.state.now
);
return VoteResponse {
vote: *self.state.get_vote(),
vote_granted: false,
last_log_id: self.state.last_log_id().copied(),
};
}

// The first step is to check log. If the candidate has less log, nothing needs to be done.

if req.last_log_id.as_ref() >= self.state.last_log_id() {
@@ -366,13 +391,6 @@ where
} else {
self.output.push_command(Command::InstallElectionTimer { can_be_leader: true });
}

debug_assert!(self.state.is_voter(&self.config.id));

// When vote is rejected, it does not need to leave candidate state.
// Candidate loop, follower loop and learner loop are totally the same.
//
// The only thing that needs to do is update election timer.
}

/// Append entries to follower/learner.
@@ -486,7 +504,7 @@ where

rh.rebuild_replication_streams();
rh.append_blank_log();
rh.initiate_replication();
rh.initiate_replication(SendNone::False);
}

/// Check if a raft node is in a state that allows to initialize.
39 changes: 39 additions & 0 deletions openraft/src/engine/handle_vote_req_test.rs
@@ -1,4 +1,5 @@
use std::sync::Arc;
use std::time::Duration;

use maplit::btreeset;

@@ -33,6 +34,44 @@ fn eng() -> Engine<u64, ()> {
eng
}

#[test]
fn test_handle_vote_req_rejected_by_leader_lease() -> anyhow::Result<()> {
let mut eng = eng();
// Non expired leader lease
eng.state.now = eng.state.leader_expire_at - Duration::from_millis(500);

let resp = eng.handle_vote_req(VoteRequest {
vote: Vote::new(1, 2),
last_log_id: Some(log_id(2, 3)),
});

assert_eq!(
VoteResponse {
vote: Vote::new(2, 1),
vote_granted: false,
last_log_id: None
},
resp
);

assert_eq!(Vote::new(2, 1), *eng.state.get_vote());
assert!(eng.internal_server_state.is_leading());

assert_eq!(ServerState::Candidate, eng.state.server_state);
assert_eq!(
MetricsChangeFlags {
replication: false,
local_data: false,
cluster: false,
},
eng.output.metrics_flags
);

assert_eq!(0, eng.output.commands.len());

Ok(())
}

#[test]
fn test_handle_vote_req_reject_smaller_vote() -> anyhow::Result<()> {
let mut eng = eng();