diff --git a/openraft/src/core/install_snapshot.rs b/openraft/src/core/install_snapshot.rs index b2cd4b9de..70ff2a958 100644 --- a/openraft/src/core/install_snapshot.rs +++ b/openraft/src/core/install_snapshot.rs @@ -254,9 +254,6 @@ impl, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore { return; } @@ -870,8 +868,7 @@ impl, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore Result<(), StorageError> { + pub(crate) async fn apply_to_state_machine( + &mut self, + since: u64, + upto_index: u64, + ) -> Result<(), StorageError> { tracing::debug!(upto_index = display(upto_index), "apply_to_state_machine"); - let since = self.engine.state.last_applied.next_index(); let end = upto_index + 1; debug_assert!( @@ -978,8 +978,6 @@ impl, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore Result<(), StorageError> { - self.leader_step_down(); - - self.apply_to_state_machine(log_index).await?; - - Ok(()) - } - /// Spawn a new replication stream returning its replication state handle. #[tracing::instrument(level = "debug", skip(self))] #[allow(clippy::type_complexity)] @@ -1656,13 +1644,19 @@ impl, S: RaftStorage> RaftRuntime unreachable!("it has to be a leader!!!"); } } - Command::LeaderCommit { ref upto, .. } => { - for i in self.engine.state.last_applied.next_index()..(upto.index + 1) { - self.leader_commit(i).await?; - } + Command::LeaderCommit { + already_committed: ref committed, + ref upto, + } => { + self.apply_to_state_machine(committed.next_index(), upto.index).await?; + // Stepping down should be controlled by Engine. + self.leader_step_down(); } - Command::FollowerCommit { upto, .. } => { - self.apply_to_state_machine(upto.index).await?; + Command::FollowerCommit { + already_committed: ref committed, + ref upto, + } => { + self.apply_to_state_machine(committed.next_index(), upto.index).await?; } Command::ReplicateEntries { upto } => { if let Some(l) = &self.leader_data { diff --git a/openraft/src/engine/calc_purge_upto_test.rs b/openraft/src/engine/calc_purge_upto_test.rs index 782fe8997..0d7d3d13d 100644 --- a/openraft/src/engine/calc_purge_upto_test.rs +++ b/openraft/src/engine/calc_purge_upto_test.rs @@ -24,7 +24,7 @@ fn eng() -> Engine { #[test] fn test_calc_purge_upto() -> anyhow::Result<()> { - // last_purged_log_id, last_applied, max_keep, want + // last_purged_log_id, committed, max_keep, want let cases = vec![ // (None, None, 0, None), diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index 75753851e..19c7b5c71 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -36,13 +36,15 @@ where /// Commit entries that are already in the store, upto `upto`, inclusive. /// And send applied result to the client that proposed the entry. LeaderCommit { - since: Option>, + // TODO: pass the log id list? + // TODO: merge LeaderCommit and FollowerCommit + already_committed: Option>, upto: LogId, }, /// Commit entries that are already in the store, upto `upto`, inclusive. FollowerCommit { - since: Option>, + already_committed: Option>, upto: LogId, }, diff --git a/openraft/src/engine/elect_test.rs b/openraft/src/engine/elect_test.rs index 3296c7282..6da79b357 100644 --- a/openraft/src/engine/elect_test.rs +++ b/openraft/src/engine/elect_test.rs @@ -83,7 +83,7 @@ fn test_elect() -> anyhow::Result<()> { },), }, Command::LeaderCommit { - since: None, + already_committed: None, upto: LogId { leader_id: LeaderId { term: 1, node_id: 1 }, index: 0, @@ -152,7 +152,7 @@ fn test_elect() -> anyhow::Result<()> { },), }, Command::LeaderCommit { - since: None, + already_committed: None, upto: LogId { leader_id: LeaderId { term: 2, node_id: 1 }, index: 0, diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index e2537b46f..5968c0a7c 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -365,7 +365,7 @@ where tracing::debug!( my_vote = display(self.state.vote), my_last_log_id = display(self.state.last_log_id().summary()), - my_last_applied = display(self.state.last_applied.summary()), + my_committed = display(self.state.committed.summary()), "local state" ); @@ -431,7 +431,7 @@ where if let Some(prev_committed) = self.state.update_committed(&committed) { self.push_command(Command::FollowerCommit { // TODO(xp): when restart, commit is reset to None. Use last_applied instead. - since: prev_committed, + already_committed: prev_committed, upto: committed.unwrap(), }); self.purge_applied_log(); @@ -741,7 +741,7 @@ where committed: self.state.committed, }); self.push_command(Command::LeaderCommit { - since: prev_committed, + already_committed: prev_committed, upto: self.state.committed.unwrap(), }); self.purge_applied_log(); diff --git a/openraft/src/engine/follower_commit_entries_test.rs b/openraft/src/engine/follower_commit_entries_test.rs index f70a07e7b..5ab97dd13 100644 --- a/openraft/src/engine/follower_commit_entries_test.rs +++ b/openraft/src/engine/follower_commit_entries_test.rs @@ -132,7 +132,7 @@ fn test_follower_commit_entries_lt_last_entry() -> anyhow::Result<()> { assert_eq!( vec![Command::FollowerCommit { - since: Some(log_id(1, 1)), + already_committed: Some(log_id(1, 1)), upto: log_id(2, 3) }], eng.commands @@ -167,7 +167,7 @@ fn test_follower_commit_entries_gt_last_entry() -> anyhow::Result<()> { assert_eq!( vec![Command::FollowerCommit { - since: Some(log_id(1, 1)), + already_committed: Some(log_id(1, 1)), upto: log_id(2, 3) }], eng.commands @@ -191,7 +191,7 @@ fn test_follower_commit_entries_purge_to_committed() -> anyhow::Result<()> { assert_eq!( vec![ Command::FollowerCommit { - since: Some(log_id(1, 1)), + already_committed: Some(log_id(1, 1)), upto: log_id(2, 3) }, Command::PurgeLog { upto: log_id(2, 3) }, @@ -217,7 +217,7 @@ fn test_follower_commit_entries_purge_to_committed_minus_1() -> anyhow::Result<( assert_eq!( vec![ Command::FollowerCommit { - since: Some(log_id(1, 1)), + already_committed: Some(log_id(1, 1)), upto: log_id(2, 3) }, Command::PurgeLog { upto: log_id(1, 2) }, diff --git a/openraft/src/engine/handle_append_entries_req_test.rs b/openraft/src/engine/handle_append_entries_req_test.rs index 9d6918003..29fd644a8 100644 --- a/openraft/src/engine/handle_append_entries_req_test.rs +++ b/openraft/src/engine/handle_append_entries_req_test.rs @@ -52,7 +52,6 @@ fn eng() -> Engine { id: 2, // make it a member ..Default::default() }; - eng.state.last_applied = Some(log_id(0, 0)); eng.state.vote = Vote::new(2, 1); eng.state.server_state = ServerState::Candidate; eng.state.log_ids.append(log_id(1, 1)); @@ -103,6 +102,65 @@ fn test_handle_append_entries_req_vote_is_rejected() -> anyhow::Result<()> { Ok(()) } +#[test] +fn test_handle_append_entries_req_prev_log_id_is_applied() -> anyhow::Result<()> { + // An applied log id has to be committed thus + let mut eng = eng(); + eng.state.vote = Vote::new(1, 2); + eng.enter_leading(); + + let resp = eng.handle_append_entries_req( + &Vote::new_committed(2, 1), + Some(log_id(0, 0)), + &Vec::>::new(), + None, + ); + + assert_eq!(AppendEntriesResponse::Success, resp); + assert_eq!( + &[ + log_id(1, 1), // + log_id(2, 3), // + ], + eng.state.log_ids.key_log_ids() + ); + assert_eq!(Vote::new_committed(2, 1), eng.state.vote); + assert_eq!(Some(log_id(2, 3)), eng.state.last_log_id()); + assert_eq!(Some(log_id(0, 0)), eng.state.committed); + assert_eq!( + MembershipState { + committed: Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), + effective: Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m23())) + }, + eng.state.membership_state + ); + assert_eq!(ServerState::Follower, eng.state.server_state); + + assert_eq!( + MetricsChangeFlags { + replication: false, + local_data: true, + cluster: true, + }, + eng.metrics_flags + ); + + assert_eq!( + vec![ + Command::SaveVote { + vote: Vote::new_committed(2, 1) + }, + Command::InstallElectionTimer { can_be_leader: false }, + Command::UpdateServerState { + server_state: ServerState::Follower + }, + ], + eng.commands + ); + + Ok(()) +} + #[test] fn test_handle_append_entries_req_prev_log_id_conflict() -> anyhow::Result<()> { let mut eng = eng(); @@ -218,7 +276,7 @@ fn test_handle_append_entries_req_prev_log_id_is_committed() -> anyhow::Result<( Command::AppendInputEntries { range: 1..2 }, Command::MoveInputCursorBy { n: 2 }, Command::FollowerCommit { - since: Some(log_id(0, 0)), + already_committed: Some(log_id(0, 0)), upto: log_id(1, 1) }, ], @@ -353,7 +411,7 @@ fn test_handle_append_entries_req_entries_conflict() -> anyhow::Result<()> { }, Command::MoveInputCursorBy { n: 2 }, Command::FollowerCommit { - since: Some(log_id(0, 0)), + already_committed: Some(log_id(0, 0)), upto: log_id(3, 3) }, ], diff --git a/openraft/src/engine/initialize_test.rs b/openraft/src/engine/initialize_test.rs index d082a384a..3e6be61af 100644 --- a/openraft/src/engine/initialize_test.rs +++ b/openraft/src/engine/initialize_test.rs @@ -107,7 +107,7 @@ fn test_initialize_single_node() -> anyhow::Result<()> { },), }, Command::LeaderCommit { - since: None, + already_committed: None, upto: LogId { leader_id: LeaderId { term: 1, node_id: 1 }, index: 1, diff --git a/openraft/src/engine/leader_append_entries_test.rs b/openraft/src/engine/leader_append_entries_test.rs index f4aa987f4..08cc6ac2c 100644 --- a/openraft/src/engine/leader_append_entries_test.rs +++ b/openraft/src/engine/leader_append_entries_test.rs @@ -65,7 +65,7 @@ fn eng() -> Engine { id: 1, // make it a member ..Default::default() }; - eng.state.last_applied = Some(log_id(0, 0)); + eng.state.committed = Some(log_id(0, 0)); eng.state.vote = Vote::new_committed(3, 1); eng.state.log_ids.append(log_id(1, 1)); eng.state.log_ids.append(log_id(2, 3)); @@ -210,7 +210,7 @@ fn test_leader_append_entries_fast_commit() -> anyhow::Result<()> { committed: Some(log_id(3, 6)) }, Command::LeaderCommit { - since: None, + already_committed: Some(log_id(0, 0)), upto: LogId::new(LeaderId::new(3, 1), 6) }, Command::ReplicateEntries { @@ -282,7 +282,7 @@ fn test_leader_append_entries_fast_commit_upto_membership_entry() -> anyhow::Res committed: Some(log_id(3, 4)) }, Command::LeaderCommit { - since: None, + already_committed: Some(log_id(0, 0)), upto: LogId::new(LeaderId::new(3, 1), 4) }, Command::UpdateMembership { @@ -364,7 +364,7 @@ fn test_leader_append_entries_fast_commit_membership_no_voter_change() -> anyhow committed: Some(log_id(3, 4)) }, Command::LeaderCommit { - since: None, + already_committed: Some(log_id(0, 0)), upto: LogId::new(LeaderId::new(3, 1), 4) }, Command::UpdateMembership { @@ -381,7 +381,7 @@ fn test_leader_append_entries_fast_commit_membership_no_voter_change() -> anyhow committed: Some(log_id(3, 6)) }, Command::LeaderCommit { - since: Some(LogId::new(LeaderId::new(3, 1), 4)), + already_committed: Some(LogId::new(LeaderId::new(3, 1), 4)), upto: LogId::new(LeaderId::new(3, 1), 6) }, Command::ReplicateEntries { @@ -465,7 +465,7 @@ fn test_leader_append_entries_fast_commit_if_membership_voter_change_to_1() -> a committed: Some(log_id(3, 6)) }, Command::LeaderCommit { - since: None, + already_committed: Some(log_id(0, 0)), upto: LogId::new(LeaderId::new(3, 1), 6) }, Command::ReplicateEntries { diff --git a/openraft/src/engine/update_progress_test.rs b/openraft/src/engine/update_progress_test.rs index 3ed951e8f..ca8fb6e98 100644 --- a/openraft/src/engine/update_progress_test.rs +++ b/openraft/src/engine/update_progress_test.rs @@ -76,7 +76,7 @@ fn test_update_progress_update_leader_progress() -> anyhow::Result<()> { committed: Some(log_id(2, 1)) }, Command::LeaderCommit { - since: None, + already_committed: None, upto: log_id(2, 1) } ], @@ -93,7 +93,7 @@ fn test_update_progress_update_leader_progress() -> anyhow::Result<()> { committed: Some(log_id(2, 3)) }, Command::LeaderCommit { - since: Some(log_id(2, 1)), + already_committed: Some(log_id(2, 1)), upto: log_id(2, 3) } ], @@ -123,7 +123,7 @@ fn test_update_progress_purge_upto_committed() -> anyhow::Result<()> { committed: Some(log_id(2, 1)) }, Command::LeaderCommit { - since: None, + already_committed: None, upto: log_id(2, 1) }, Command::PurgeLog { upto: log_id(2, 1) }, @@ -154,7 +154,7 @@ fn test_update_progress_purge_upto_committed_minus_1() -> anyhow::Result<()> { committed: Some(log_id(2, 2)) }, Command::LeaderCommit { - since: None, + already_committed: None, upto: log_id(2, 2) }, Command::PurgeLog { upto: log_id(2, 1) }, diff --git a/openraft/src/raft_state.rs b/openraft/src/raft_state.rs index c686c19f8..3f2c4f3d5 100644 --- a/openraft/src/raft_state.rs +++ b/openraft/src/raft_state.rs @@ -20,8 +20,12 @@ where /// The vote state of this node. pub vote: Vote, - /// The LogId of the last log applied to the state machine. - pub last_applied: Option>, + /// The LogId of the last log committed(AKA applied) to the state machine. + /// + /// - Committed means: a log that is replicated to a quorum of the cluster and it is of the term of the leader. + /// + /// - A quorum could be a uniform quorum or joint quorum. + pub committed: Option>, /// All log ids this node has. pub log_ids: LogIdList, @@ -35,15 +39,6 @@ where /// The internal server state used by Engine. pub(crate) internal_server_state: InternalServerState, - /// The log id of the last known committed entry. - /// - /// - Committed means: a log that is replicated to a quorum of the cluster and it is of the term of the leader. - /// - /// - A quorum could be a uniform quorum or joint quorum. - /// - /// - `committed` in raft is volatile and will not be persisted. - pub committed: Option>, - pub server_state: ServerState, } diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index f433b6a9a..8cda6adf3 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -57,7 +57,7 @@ where let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self).await?; Ok(RaftState { - last_applied, + committed: last_applied, // The initial value for `vote` is the minimal possible value. // See: [Conditions for initialization](https://datafuselabs.github.io/openraft/cluster-formation.html#conditions-for-initialization) vote: vote.unwrap_or_default(), @@ -66,7 +66,6 @@ where // -- volatile fields: they are not persisted. internal_server_state: InternalServerState::default(), - committed: None, server_state: Default::default(), }) } diff --git a/openraft/src/testing/suite.rs b/openraft/src/testing/suite.rs index fff9b2172..12a81ec6d 100644 --- a/openraft/src/testing/suite.rs +++ b/openraft/src/testing/suite.rs @@ -435,7 +435,7 @@ where "state machine has higher log" ); assert_eq!( - initial.last_applied, + initial.committed, Some(LogId::new(LeaderId::new(3, NODE_ID.into()), 1)), "unexpected value for last applied log" );