Skip to content

Commit

Permalink
Cherry pick: raftstore: set term when catch up logs for merge
Browse files Browse the repository at this point in the history
Cherry pick to fix pingcap/tiflash#4437.

* add test case

Signed-off-by: Jay Lee <[email protected]>

* raftstore: set term when catch up logs for merge

Prepare merge may include logs from different terms, which may be even
larger than the term of a follower that is lagging behind. So when catching
up logs through commit merge, the follower's term should also be set to get
correct metadata.

Close tikv#11526.

Signed-off-by: Jay Lee <[email protected]>

* add log for becoming follower

Signed-off-by: Jay Lee <[email protected]>

* correct commit log check

Signed-off-by: Jay Lee <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
Signed-off-by: CalvinNeo <[email protected]>

Co-authored-by: Jay <[email protected]>
Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
3 people authored Mar 28, 2022
1 parent a1ffca1 commit 5874ff9
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 2 deletions.
2 changes: 1 addition & 1 deletion components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3458,7 +3458,7 @@ where
ctx: &mut ApplyContext<EK, W>,
catch_up_logs: CatchUpLogs,
) {
fail_point!("after_handle_catch_up_logs_for_merge");
fail_point!("after_handle_catch_up_logs_for_merge", |_| {});
fail_point!(
"after_handle_catch_up_logs_for_merge_1003",
self.delegate.id() == 1003,
Expand Down
20 changes: 19 additions & 1 deletion components/raftstore/src/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,14 +741,32 @@ where
// There may be some logs not included in CommitMergeRequest's entries, like CompactLog,
// so the commit index may exceed the last index of the entries from CommitMergeRequest.
// If so, there is no need to append.
if self.raft_group.raft.raft_log.committed - log_idx > entries.len() as u64 {
if self.raft_group.raft.raft_log.committed - log_idx >= entries.len() as u64 {
return None;
}
entries = &entries[(self.raft_group.raft.raft_log.committed - log_idx) as usize..];
log_idx = self.raft_group.raft.raft_log.committed;
}
let log_term = self.get_index_term(log_idx);

let last_log = entries.last().unwrap();
if last_log.term > self.term() {
// Hack: In normal flow, when leader sends the entries, it will use a term that's not less
// than the last log term. And follower will update its states correctly. For merge, we append
// the log without raft, so we have to take care of term explicitly to get correct metadata.
info!(
"become follower for new logs";
"new_log_term" => last_log.term,
"new_log_index" => last_log.index,
"term" => self.term(),
"region_id" => self.region_id,
"peer_id" => self.peer.get_id(),
);
self.raft_group
.raft
.become_follower(last_log.term, INVALID_ID);
}

self.raft_group
.raft
.raft_log
Expand Down
58 changes: 58 additions & 0 deletions tests/failpoints/cases/test_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1364,3 +1364,61 @@ fn test_prewrite_before_max_ts_is_synced() {
let resp = do_prewrite(&mut cluster);
assert!(!resp.get_region_error().has_max_timestamp_not_synced());
}

/// Regression test: when the term changes while a follower is catching up
/// logs for a merge, the follower has to adopt the new term as well;
/// otherwise it is left with corrupted states.
#[test]
fn test_merge_election_and_restart() {
    let mut cluster = new_node_cluster(0, 3);
    configure_for_merge(&mut cluster);

    let pd_client = cluster.pd_client.clone();
    pd_client.disable_default_operator();

    // NOTE(review): presumably this makes the raft log GC tick a no-op so the
    // logs needed later in the test are kept — confirm against the fail point.
    fail::cfg("on_raft_gc_log_tick", "return()").unwrap();

    cluster.run();

    let whole = pd_client.get_region(b"k1").unwrap();
    cluster.must_split(&whole, b"k2");

    // The region that now covers `k1` is the one merged away later on.
    let source = pd_client.get_region(b"k1").unwrap();
    let source_id = source.get_id();
    let peer_on_store1 = find_peer(&source, 1).unwrap().to_owned();
    let peer_on_store2 = find_peer(&source, 2).unwrap().to_owned();

    // Write once while store 1 leads ...
    cluster.must_transfer_leader(source_id, peer_on_store1.clone());
    cluster.must_put(b"k11", b"v11");
    must_get_equal(&cluster.get_engine(2), b"k11", b"v11");

    // ... and once more while store 2 leads.
    cluster.must_transfer_leader(source_id, peer_on_store2);
    cluster.must_put(b"k12", b"v12");
    must_get_equal(&cluster.get_engine(1), b"k12", b"v12");

    // Install a packet filter for the source region on store 2.
    let store2_filter = RegionPacketFilter::new(source_id, 2);
    cluster.add_send_filter(CloneFilterFactory(store2_filter));

    // Wait until a new leader is elected; store 2 must not see the next write.
    cluster.must_transfer_leader(source_id, peer_on_store1);
    cluster.must_put(b"k13", b"v13");
    must_get_equal(&cluster.get_engine(1), b"k13", b"v13");
    must_get_none(&cluster.get_engine(2), b"k13");

    // Do not actually execute the commit merge.
    fail::cfg("after_handle_catch_up_logs_for_merge", "return()").unwrap();
    // The merge can still be proposed because the leader has a committed
    // index cache.
    let target = pd_client.get_region(b"k3").unwrap();
    cluster.must_try_merge(source_id, target.get_id());
    // The source peer on store 2 should be able to apply every committed log.
    must_get_equal(&cluster.get_engine(2), b"k13", b"v13");

    // Restart the whole cluster without the filter and without the fail point.
    cluster.shutdown();
    cluster.clear_send_filters();
    fail::remove("after_handle_catch_up_logs_for_merge");
    cluster.start().unwrap();

    // Wait for the target region to elect a leader to avoid timeout and backoff.
    cluster.leader_of_region(target.get_id());
    // The put can only succeed if the merge is resumed correctly.
    cluster.must_put(b"k14", b"v14");
    // Store 2 only has the latest update if the logs from different terms
    // were processed correctly.
    must_get_equal(&cluster.get_engine(2), b"k14", b"v14");
}

0 comments on commit 5874ff9

Please sign in to comment.