From 8b98fee9ce9394bda2659d6fc70a189c415eef78 Mon Sep 17 00:00:00 2001
From: Benjamin Wang
Date: Wed, 23 Nov 2022 13:11:21 +0800
Subject: [PATCH 1/7] etcdserver: detect corrupted member based on quorum

When the leader detects data inconsistency by comparing hashes,
currently it assumes that the follower is the corrupted member. That
isn't necessarily correct: the leader might be the corrupted member as
well.

We should depend on quorum to identify the corrupted member. For
example, for a 3-member cluster, if 2 members have the same hash, the
member with a different hash is the corrupted one. For a 5-member
cluster, if 3 members have the same hash, the corrupted member is one
of the remaining two members; it's also possible that both remaining
members are corrupted.

Signed-off-by: Benjamin Wang
---
 client/pkg/types/id.go            |  19 +++-
 server/etcdserver/corrupt.go      | 167 ++++++++++++++++++++++++------
 server/etcdserver/corrupt_test.go |  33 ++++--
 3 files changed, 178 insertions(+), 41 deletions(-)

diff --git a/client/pkg/types/id.go b/client/pkg/types/id.go
index ae00388dde0..9a8429391ed 100644
--- a/client/pkg/types/id.go
+++ b/client/pkg/types/id.go
@@ -14,7 +14,10 @@
 
 package types
 
-import "strconv"
+import (
+	"bytes"
+	"strconv"
+)
 
 // ID represents a generic identifier which is canonically
 // stored as a uint64 but is typically represented as a
@@ -37,3 +40,17 @@ type IDSlice []ID
 func (p IDSlice) Len() int           { return len(p) }
 func (p IDSlice) Less(i, j int) bool { return uint64(p[i]) < uint64(p[j]) }
 func (p IDSlice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
+
+func (p IDSlice) String() string {
+	var b bytes.Buffer
+	if p.Len() > 0 {
+		b.WriteString(p[0].String())
+	}
+
+	for i := 1; i < p.Len(); i++ {
+		b.WriteString(",")
+		b.WriteString(p[i].String())
+	}
+
+	return b.String()
+}
diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go
index 7b72124e670..dd627796da1 100644
--- a/server/etcdserver/corrupt.go
+++ b/server/etcdserver/corrupt.go
@@ -20,6 +20,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"math"
 	"net/http"
 	"sort"
 	"strings"
@@ -258,57 +259,157 @@ func (cm *corruptionChecker) CompactHashCheck() {
 	)
 	hashes := cm.uncheckedRevisions()
 	// Assume that revisions are ordered from largest to smallest
-	for i, hash := range hashes {
+	for _, hash := range hashes {
 		peers := cm.hasher.PeerHashByRev(hash.Revision)
 		if len(peers) == 0 {
 			continue
 		}
-		peersChecked := 0
-		for _, p := range peers {
-			if p.resp == nil || p.resp.CompactRevision != hash.CompactRevision {
-				continue
-			}
+		if cm.checkPeerHashes(hash, peers) {
+			return
+		}
+	}
+	cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", len(hashes)))
+	return
+}
 
-			// follower's compact revision is leader's old one, then hashes must match
-			if p.resp.Hash != hash.Hash {
-				cm.hasher.TriggerCorruptAlarm(p.id)
-				cm.lg.Error("failed compaction hash check",
-					zap.Int64("revision", hash.Revision),
-					zap.Int64("leader-compact-revision", hash.CompactRevision),
-					zap.Uint32("leader-hash", hash.Hash),
-					zap.Int64("follower-compact-revision", p.resp.CompactRevision),
-					zap.Uint32("follower-hash", p.resp.Hash),
-					zap.String("follower-peer-id", p.id.String()),
-				)
-				return
-			}
-			peersChecked++
-			cm.lg.Info("successfully checked hash on follower",
-				zap.Int64("revision", hash.Revision),
-				zap.String("peer-id", p.id.String()),
-			)
+// checkPeerHashes checks the peers' hashes and raises alarms if corruption is detected.
+// It returns a bool indicating whether to check the next hash:
+//
+// true: successfully checked hash on whole cluster or raised alarms, so no need to check next hash
+// false: skipped some members, so need to check next hash
+func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers []*peerHashKVResp) bool {
+	leaderId := cm.hasher.MemberId()
+	hashMap := map[uint32]types.IDSlice{leaderHash.Hash: {leaderId}}
+
+	peersChecked := 0
+	// group all peers by hash
+	for _, peer := range peers {
+		skipped := false
+		reason := ""
+
+		if peer.resp == nil {
+			skipped = true
+			reason = "no response"
+		} else if peer.resp.CompactRevision != leaderHash.CompactRevision {
+			skipped = true
+			reason = fmt.Sprintf("the peer's CompactRevision %d doesn't match leader's CompactRevision %d",
+				peer.resp.CompactRevision, leaderHash.CompactRevision)
+		}
+		if skipped {
+			cm.lg.Warn("Skipped peer's hash", zap.Int("number-of-peers", len(peers)),
+				zap.String("leader-id", leaderId.String()),
+				zap.String("peer-id", peer.id.String()),
+				zap.String("reason", reason))
+			continue
+		}
+
+		peersChecked++
+		if ids, ok := hashMap[peer.resp.Hash]; !ok {
+			hashMap[peer.resp.Hash] = []types.ID{peer.id}
+		} else {
+			ids = append(ids, peer.id)
+			hashMap[peer.resp.Hash] = ids
 		}
-		if len(peers) == peersChecked {
+	}
+
+	// All members have the same CompactRevision and Hash.
+	if len(hashMap) == 1 {
+		if peersChecked == len(peers) {
 			cm.lg.Info("successfully checked hash on whole cluster",
 				zap.Int("number-of-peers-checked", peersChecked),
-				zap.Int64("revision", hash.Revision),
+				zap.Int64("revision", leaderHash.Revision),
+				zap.Int64("compactRevision", leaderHash.CompactRevision),
 			)
 			cm.mux.Lock()
-			if hash.Revision > cm.latestRevisionChecked {
-				cm.latestRevisionChecked = hash.Revision
+			if leaderHash.Revision > cm.latestRevisionChecked {
+				cm.latestRevisionChecked = leaderHash.Revision
 			}
 			cm.mux.Unlock()
-			cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", i+1))
-			return
+			return true
 		}
 		cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers",
 			zap.Int("number-of-peers-checked", peersChecked),
 			zap.Int("number-of-peers", len(peers)),
-			zap.Int64("revision", hash.Revision),
+			zap.Int64("revision", leaderHash.Revision),
+			zap.Int64("compactRevision", leaderHash.CompactRevision),
 		)
+		// This is the only case that requires checking the next hash
+		return false
 	}
-	cm.lg.Info("finished compaction hash check", zap.Int("number-of-hashes-checked", len(hashes)))
-	return
+
+	// Detected hashes mismatch
+	// The first step is to figure out the majority with the same hash.
+	memberCnt := len(peers) + 1
+	quorum := memberCnt/2 + 1
+	quorumExist := false
+	for k, v := range hashMap {
+		if len(v) >= quorum {
+			quorumExist = true
+			// remove the majority, and we might raise alarms for the remaining members.
+			delete(hashMap, k)
+			break
+		}
+	}
+
+	if !quorumExist {
+		// If quorum doesn't exist, then only raise alarm for the least minority
+		cm.lg.Error("Detected compaction hash mismatch but can't identify the corrupted members, so only raise alarm for the least minority",
+			zap.String("leader-id", leaderId.String()),
+			zap.Int64("leader-revision", leaderHash.Revision),
+			zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
+			zap.Uint32("leader-hash", leaderHash.Hash),
+		)
+
+		// find out the members which are the least minority
+		var (
+			minCnt    int = math.MaxInt
+			hashVal   uint32
+			memberIDs types.IDSlice
+		)
+
+		for k, v := range hashMap {
+			if v.Len() < minCnt {
+				minCnt = v.Len()
+				hashVal = k
+				memberIDs = v
+			}
+		}
+
+		for _, pid := range memberIDs {
+			cm.hasher.TriggerCorruptAlarm(pid)
+		}
+		delete(hashMap, hashVal)
+
+		cm.lg.Error("Detected compaction hash mismatch but can't identify the corrupted members, so only raise alarm for the least minority",
+			zap.String("leader-id", leaderId.String()),
+			zap.Int64("leader-revision", leaderHash.Revision),
+			zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
+			zap.Uint32("leader-hash", leaderHash.Hash),
+			zap.Uint32("peer-hash", hashVal),
+			zap.String("peer-ids", memberIDs.String()),
+		)
+	}
+
+	// Raise alarm for the remaining members if the quorum is present.
+	// But we should always generate error log for debugging.
+	for k, v := range hashMap {
+		for _, pid := range v {
+			if quorumExist {
+				cm.hasher.TriggerCorruptAlarm(pid)
+			}
+		}
+		cm.lg.Error("Detected compaction hash mismatch",
+			zap.String("leader-id", leaderId.String()),
+			zap.Int64("leader-revision", leaderHash.Revision),
+			zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
+			zap.Uint32("leader-hash", leaderHash.Hash),
+			zap.Uint32("peer-hash", k),
+			zap.String("peer-ids", v.String()),
+			zap.Bool("alarm-raised", quorumExist),
+		)
+	}
+
+	return true
 }
 
 func (cm *corruptionChecker) uncheckedRevisions() []mvcc.KeyValueHash {
diff --git a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go
index 93942051c50..f8274fe1698 100644
--- a/server/etcdserver/corrupt_test.go
+++ b/server/etcdserver/corrupt_test.go
@@ -251,7 +251,7 @@ func TestCompactHashCheck(t *testing.T) {
 				hashes:     []mvcc.KeyValueHash{{Revision: 1}, {Revision: 2}},
 				peerHashes: []*peerHashKVResp{{err: fmt.Errorf("failed getting hash")}},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
 		},
 		{
 			name: "Peer returned different compaction revision is skipped",
@@ -259,15 +259,34 @@ func TestCompactHashCheck(t *testing.T) {
 				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1}, {Revision: 2, CompactRevision: 2}},
 				peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{CompactRevision: 3}}},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "PeerHashByRev(1)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
 		},
 		{
 			name: "Peer returned same compaction revision but different hash triggers alarm",
 			hasher: fakeHasher{
-				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
-				peerHashes: []*peerHashKVResp{{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}}},
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(42)"},
+			expectCorrupt: true,
+		},
+		{
+			name: "Peer returned same compaction revision but different hash triggers alarm for the least minority",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 44}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 46}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 4}},
+					{peerInfo: peerInfo{id: 47}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+				},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "TriggerCorruptAlarm(42)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(46)"},
 			expectCorrupt: true,
 		},
 		{
@@ -276,7 +295,7 @@ func TestCompactHashCheck(t *testing.T) {
 				hashes:     []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 1}},
 				peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{MemberId: 42}, CompactRevision: 1, Hash: 1}}},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()"},
 			expectLastRevisionChecked: 2,
 		},
 		{
@@ -288,7 +307,7 @@ func TestCompactHashCheck(t *testing.T) {
 				{err: fmt.Errorf("failed getting hash")},
 				},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)", "MemberId()"},
 		},
 	}
 	for _, tc := range tcs {
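The quorum rule that PATCH 1/7 describes can be illustrated with a short, self-contained Go sketch. This is an illustration only, not etcd's implementation: `findCorrupted` and `memberHash` are hypothetical names standing in for `checkPeerHashes` and the hash-to-members grouping it builds from `mvcc.KeyValueHash` and `types.IDSlice`.

```go
package main

import "fmt"

// findCorrupted models the quorum rule: group members by the hash they
// reported at a common compact revision; if one group reaches quorum
// (n/2 + 1), every member outside that group is treated as corrupted.
// If no group reaches quorum, the corrupted members cannot be identified.
func findCorrupted(memberHash map[uint64]uint32) (corrupted []uint64, identified bool) {
	// Group member IDs by the hash they reported.
	groups := map[uint32][]uint64{}
	for id, h := range memberHash {
		groups[h] = append(groups[h], id)
	}

	quorum := len(memberHash)/2 + 1
	for h, ids := range groups {
		if len(ids) < quorum {
			continue
		}
		// The majority agrees on hash h; everyone who reported a
		// different hash is corrupted.
		for id, mh := range memberHash {
			if mh != h {
				corrupted = append(corrupted, id)
			}
		}
		return corrupted, true
	}
	return nil, false // no quorum: cannot tell who is corrupted
}

func main() {
	// 5-member cluster: members 1-3 agree, so both remaining members
	// (4 and 5) are reported as corrupted, matching the commit message.
	ids, ok := findCorrupted(map[uint64]uint32{1: 7, 2: 7, 3: 7, 4: 9, 5: 8})
	fmt.Println(ids, ok) // IDs 4 and 5 (map iteration order may vary), true
}
```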
From a3197102e9b8b42fcf745ae5e4166b93f3b00095 Mon Sep 17 00:00:00 2001
From: Benjamin Wang
Date: Wed, 23 Nov 2022 13:21:12 +0800
Subject: [PATCH 2/7] test: roll back the change in PR pull/14824

The change made in https://github.com/etcd-io/etcd/pull/14824 fixed
the test instead of the product code, which wasn't correct. Now that
this PR fixes the product code, we can revert the change made in that
PR.

Signed-off-by: Benjamin Wang
---
 tests/integration/corrupt_test.go | 38 ++-----------------------------
 1 file changed, 2 insertions(+), 36 deletions(-)

diff --git a/tests/integration/corrupt_test.go b/tests/integration/corrupt_test.go
index ee2474a2371..ee98a24fb83 100644
--- a/tests/integration/corrupt_test.go
+++ b/tests/integration/corrupt_test.go
@@ -93,30 +93,13 @@ func TestPeriodicCheckDetectsCorruption(t *testing.T) {
 	time.Sleep(50 * time.Millisecond)
 	leader := clus.WaitLeader(t)
 
-	// Get sorted member IDs
-	members, err := cc.MemberList(ctx)
-	assert.NoError(t, err, "error on member list %v")
-
-	// NOTE: If the corrupted member has been elected as leader, the
-	// alarm will show the smaller member.
-	var expectedID = uint64(clus.Members[0].ID())
-	if leader == 0 {
-		for _, m := range members.Members {
-			if m.Name != clus.Members[0].Name {
-				expectedID = m.ID
-				break
-			}
-		}
-
-	}
-
 	err = clus.Members[leader].Server.CorruptionChecker().PeriodicCheck()
 	assert.NoError(t, err, "error on periodic check")
 	time.Sleep(50 * time.Millisecond)
 
 	alarmResponse, err := cc.AlarmList(ctx)
 	assert.NoError(t, err, "error on alarm list")
-	assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: expectedID}}, alarmResponse.Alarms)
+	assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: uint64(clus.Members[0].ID())}}, alarmResponse.Alarms)
 }
 
 func TestCompactHashCheck(t *testing.T) {
@@ -186,26 +169,9 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
 	time.Sleep(50 * time.Millisecond)
 	leader := clus.WaitLeader(t)
 
-	// Get sorted member IDs
-	members, err := cc.MemberList(ctx)
-	assert.NoError(t, err, "error on member list %v")
-
-	// NOTE: If the corrupted member has been elected as leader, the
-	// alarm will show the smaller member.
-	var expectedID = uint64(clus.Members[0].ID())
-	if leader == 0 {
-		for _, m := range members.Members {
-			if m.Name != clus.Members[0].Name {
-				expectedID = m.ID
-				break
-			}
-		}
-
-	}
-
 	clus.Members[leader].Server.CorruptionChecker().CompactHashCheck()
 	time.Sleep(50 * time.Millisecond)
 
 	alarmResponse, err := cc.AlarmList(ctx)
 	assert.NoError(t, err, "error on alarm list")
-	assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: expectedID}}, alarmResponse.Alarms)
+	assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: uint64(clus.Members[0].ID())}}, alarmResponse.Alarms)
 }

From 7b19ee6396de1c1ff6dda4b9ee444cb42010fa2f Mon Sep 17 00:00:00 2001
From: Benjamin Wang
Date: Wed, 23 Nov 2022 13:42:45 +0800
Subject: [PATCH 3/7] test: add integration test to cover the multiple member
 corruption case

Signed-off-by: Benjamin Wang
---
 tests/integration/corrupt_test.go | 55 +++++++++++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/tests/integration/corrupt_test.go b/tests/integration/corrupt_test.go
index ee98a24fb83..cd695c5782e 100644
--- a/tests/integration/corrupt_test.go
+++ b/tests/integration/corrupt_test.go
@@ -175,3 +175,58 @@ func TestCompactHashCheckDetectCorruption(t *testing.T) {
 	assert.NoError(t, err, "error on alarm list")
 	assert.Equal(t, []*etcdserverpb.AlarmMember{{Alarm: etcdserverpb.AlarmType_CORRUPT, MemberID: uint64(clus.Members[0].ID())}}, alarmResponse.Alarms)
 }
+
+func TestCompactHashCheckDetectMultipleCorruption(t *testing.T) {
+	integration.BeforeTest(t)
+
+	clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 5})
+	defer clus.Terminate(t)
+
+	cc, err := clus.ClusterClient(t)
+	require.NoError(t, err)
+
+	ctx := context.Background()
+
+	for i := 0; i < 10; i++ {
+		_, err := cc.Put(ctx, testutil.PickKey(int64(i)), fmt.Sprint(i))
+		assert.NoError(t, err, "error on put")
+	}
+
+	clus.Members[0].Server.CorruptionChecker().CompactHashCheck()
+	clus.Members[0].Stop(t)
+	clus.Members[1].Server.CorruptionChecker().CompactHashCheck()
+	clus.Members[1].Stop(t)
+	clus.WaitLeader(t)
+
+	err = testutil.CorruptBBolt(clus.Members[0].BackendPath())
+	require.NoError(t, err)
+	err = testutil.CorruptBBolt(clus.Members[1].BackendPath())
+	require.NoError(t, err)
+
+	err = clus.Members[0].Restart(t)
+	require.NoError(t, err)
+	err = clus.Members[1].Restart(t)
+	require.NoError(t, err)
+
+	_, err = cc.Compact(ctx, 5)
+	require.NoError(t, err)
+	time.Sleep(50 * time.Millisecond)
+	leader := clus.WaitLeader(t)
+
+	clus.Members[leader].Server.CorruptionChecker().CompactHashCheck()
+	time.Sleep(50 * time.Millisecond)
+	alarmResponse, err := cc.AlarmList(ctx)
+	assert.NoError(t, err, "error on alarm list")
+
+	expectedAlarmMap := map[uint64]etcdserverpb.AlarmType{
+		uint64(clus.Members[0].ID()): etcdserverpb.AlarmType_CORRUPT,
+		uint64(clus.Members[1].ID()): etcdserverpb.AlarmType_CORRUPT,
+	}
+
+	actualAlarmMap := make(map[uint64]etcdserverpb.AlarmType)
+	for _, alarm := range alarmResponse.Alarms {
+		actualAlarmMap[alarm.MemberID] = alarm.Alarm
+	}
+
+	require.Equal(t, expectedAlarmMap, actualAlarmMap)
+}
From 85fc09d09bb2ebbc0d2eed40f16fc20eb0f1b630 Mon Sep 17 00:00:00 2001
From: Benjamin Wang
Date: Wed, 23 Nov 2022 15:43:46 +0800
Subject: [PATCH 4/7] etcdserver: resolve review comments in PR 14828

Signed-off-by: Benjamin Wang
---
 server/etcdserver/corrupt.go      | 37 +++++++++++++------------------
 server/etcdserver/corrupt_test.go | 18 +++++++++++++++
 2 files changed, 34 insertions(+), 21 deletions(-)

diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go
index dd627796da1..1713de3b86b 100644
--- a/server/etcdserver/corrupt.go
+++ b/server/etcdserver/corrupt.go
@@ -279,7 +279,7 @@ func (cm *corruptionChecker) CompactHashCheck() {
 // false: skipped some members, so need to check next hash
 func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers []*peerHashKVResp) bool {
 	leaderId := cm.hasher.MemberId()
-	hashMap := map[uint32]types.IDSlice{leaderHash.Hash: {leaderId}}
+	hash2members := map[uint32]types.IDSlice{leaderHash.Hash: {leaderId}}
 
 	peersChecked := 0
 	// group all peers by hash
@@ -304,16 +304,16 @@ func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers
 		}
 
 		peersChecked++
-		if ids, ok := hashMap[peer.resp.Hash]; !ok {
-			hashMap[peer.resp.Hash] = []types.ID{peer.id}
+		if ids, ok := hash2members[peer.resp.Hash]; !ok {
+			hash2members[peer.resp.Hash] = []types.ID{peer.id}
 		} else {
 			ids = append(ids, peer.id)
-			hashMap[peer.resp.Hash] = ids
+			hash2members[peer.resp.Hash] = ids
 		}
 	}
 
 	// All members have the same CompactRevision and Hash.
-	if len(hashMap) == 1 {
+	if len(hash2members) == 1 {
 		if peersChecked == len(peers) {
 			cm.lg.Info("successfully checked hash on whole cluster",
 				zap.Int("number-of-peers-checked", peersChecked),
@@ -342,32 +342,25 @@ func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers
 	memberCnt := len(peers) + 1
 	quorum := memberCnt/2 + 1
 	quorumExist := false
-	for k, v := range hashMap {
+	for k, v := range hash2members {
 		if len(v) >= quorum {
 			quorumExist = true
 			// remove the majority, and we might raise alarms for the remaining members.
-			delete(hashMap, k)
+			delete(hash2members, k)
 			break
 		}
 	}
 
 	if !quorumExist {
-		// If quorum doesn't exist, then only raise alarm for the least minority
-		cm.lg.Error("Detected compaction hash mismatch but can't identify the corrupted members, so only raise alarm for the least minority",
-			zap.String("leader-id", leaderId.String()),
-			zap.Int64("leader-revision", leaderHash.Revision),
-			zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
-			zap.Uint32("leader-hash", leaderHash.Hash),
-		)
-
-		// find out the members which are the least minority
+		// If quorum doesn't exist, then only raise alarm for the least minority.
+		// The first step is to find out the members which are the least minority.
 		var (
 			minCnt    int = math.MaxInt
 			hashVal   uint32
 			memberIDs types.IDSlice
 		)
 
-		for k, v := range hashMap {
+		for k, v := range hash2members {
 			if v.Len() < minCnt {
 				minCnt = v.Len()
 				hashVal = k
@@ -375,10 +368,11 @@ func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers
 			}
 		}
 
+		// raise alarms
 		for _, pid := range memberIDs {
 			cm.hasher.TriggerCorruptAlarm(pid)
 		}
-		delete(hashMap, hashVal)
+		delete(hash2members, hashVal)
 
 		cm.lg.Error("Detected compaction hash mismatch but can't identify the corrupted members, so only raise alarm for the least minority",
 			zap.String("leader-id", leaderId.String()),
@@ -392,12 +386,13 @@ func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers
 
 	// Raise alarm for the remaining members if the quorum is present.
 	// But we should always generate error log for debugging.
-	for k, v := range hashMap {
-		for _, pid := range v {
-			if quorumExist {
+	for k, v := range hash2members {
+		if quorumExist {
+			for _, pid := range v {
 				cm.hasher.TriggerCorruptAlarm(pid)
 			}
 		}
+
 		cm.lg.Error("Detected compaction hash mismatch",
 			zap.String("leader-id", leaderId.String()),
 			zap.Int64("leader-revision", leaderHash.Revision),
diff --git a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go
index f8274fe1698..e8fd19fcbb7 100644
--- a/server/etcdserver/corrupt_test.go
+++ b/server/etcdserver/corrupt_test.go
@@ -289,6 +289,24 @@ func TestCompactHashCheck(t *testing.T) {
 			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(46)"},
 			expectCorrupt: true,
 		},
+		{
+			name: "Peer returned same compaction revision but all members have different hash",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				// Each member is the least minority, and etcd may trigger alarm for any one.
+				// So intentionally set the same member id for all members.
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 4}},
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 5}},
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 6}},
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 7}},
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 8}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(42)"},
+			expectCorrupt: true,
+		},
 		{
 			name: "Peer returned same hash bumps last revision checked",
 			hasher: fakeHasher{

From e95e82f0b92ae7d679969f88f3d2feea8c28c5ee Mon Sep 17 00:00:00 2001
From: Benjamin Wang
Date: Sat, 26 Nov 2022 05:52:45 +0800
Subject: [PATCH 5/7] etcdserver: added a summary for the CompactHashCheck
 method

Signed-off-by: Benjamin Wang
---
 server/etcdserver/corrupt.go | 57 +++++++++++++++++++++++-------------
 1 file changed, 36 insertions(+), 21 deletions(-)

diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go
index 1713de3b86b..8f2d3f4521e 100644
--- a/server/etcdserver/corrupt.go
+++ b/server/etcdserver/corrupt.go
@@ -252,6 +252,17 @@ func (cm *corruptionChecker) PeriodicCheck() error {
 	return nil
 }
 
+// CompactHashCheck is based on the fact that 'compactions' are coordinated
+// between raft members and performed at the same revision. For each compacted
+// revision, a KV store hash is computed and saved for some time.
+//
+// This method communicates with peers to find a recent common revision across
+// members, and raises an alarm if 2 or more members at the same compact
+// revision have different hashes.
+//
+// We might miss the opportunity to perform the check if the compaction is
+// still ongoing on one of the members, or a member was unresponsive. In such
+// a situation the method still passes without raising an alarm.
 func (cm *corruptionChecker) CompactHashCheck() {
 	cm.lg.Info("starting compact hash check",
 		zap.String("local-member-id", cm.hasher.MemberId().String()),
@@ -314,27 +325,7 @@ func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers
 
 	// All members have the same CompactRevision and Hash.
 	if len(hash2members) == 1 {
-		if peersChecked == len(peers) {
-			cm.lg.Info("successfully checked hash on whole cluster",
-				zap.Int("number-of-peers-checked", peersChecked),
-				zap.Int64("revision", leaderHash.Revision),
-				zap.Int64("compactRevision", leaderHash.CompactRevision),
-			)
-			cm.mux.Lock()
-			if leaderHash.Revision > cm.latestRevisionChecked {
-				cm.latestRevisionChecked = leaderHash.Revision
-			}
-			cm.mux.Unlock()
-			return true
-		}
-		cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers",
-			zap.Int("number-of-peers-checked", peersChecked),
-			zap.Int("number-of-peers", len(peers)),
-			zap.Int64("revision", leaderHash.Revision),
-			zap.Int64("compactRevision", leaderHash.CompactRevision),
-		)
-		// This is the only case that requires checking the next hash
-		return false
+		return cm.handleConsistentHash(leaderHash, peersChecked, len(peers))
 	}
 
 	// Detected hashes mismatch
@@ -407,6 +398,30 @@ func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers
 	return true
 }
 
+func (cm *corruptionChecker) handleConsistentHash(hash mvcc.KeyValueHash, peersChecked, peerCnt int) bool {
+	if peersChecked == peerCnt {
+		cm.lg.Info("successfully checked hash on whole cluster",
+			zap.Int("number-of-peers-checked", peersChecked),
+			zap.Int64("revision", hash.Revision),
+			zap.Int64("compactRevision", hash.CompactRevision),
+		)
+		cm.mux.Lock()
+		if hash.Revision > cm.latestRevisionChecked {
+			cm.latestRevisionChecked = hash.Revision
+		}
+		cm.mux.Unlock()
+		return true
+	}
+	cm.lg.Warn("skipped revision in compaction hash check; was not able to check all peers",
+		zap.Int("number-of-peers-checked", peersChecked),
+		zap.Int("number-of-peers", peerCnt),
+		zap.Int64("revision", hash.Revision),
+		zap.Int64("compactRevision", hash.CompactRevision),
+	)
+	// This is the only case that requires checking the next hash
+	return false
+}
+
 func (cm *corruptionChecker) uncheckedRevisions() []mvcc.KeyValueHash {
 	cm.mux.RLock()
 	lastRevisionChecked := cm.latestRevisionChecked
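The summary that PATCH 5/7 adds describes a loop that walks compaction hashes from the newest revision to the oldest and stops at the first conclusive check. A minimal sketch of that control flow, assuming a `conclusiveAt` callback that stands in for `checkPeerHashes` (both names here are illustrative, not etcd's API):

```go
package main

import "fmt"

// compactHashCheck walks revisions from newest to oldest and stops at the
// first revision whose check is conclusive: the hash either matched on the
// whole cluster or alarms were raised. An inconclusive check (some peers
// skipped) falls through to an older revision.
func compactHashCheck(revisions []int64, conclusiveAt func(rev int64) bool) {
	for _, rev := range revisions {
		if conclusiveAt(rev) {
			fmt.Println("finished at revision", rev)
			return
		}
		fmt.Println("skipped revision", rev, "- not all peers could be checked")
	}
	fmt.Println("no conclusive revision; retry on the next check period")
}

func main() {
	// The newest revision (10) is inconclusive because a peer is still
	// compacting or was unresponsive; revision 5 can be fully checked.
	compactHashCheck([]int64{10, 5}, func(rev int64) bool { return rev != 10 })
}
```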
From 6049af072cf005f9bc9bda566063b9ff7c6fce10 Mon Sep 17 00:00:00 2001
From: Benjamin Wang
Date: Sat, 26 Nov 2022 06:08:58 +0800
Subject: [PATCH 6/7] etcdserver: intentionally set memberID as 0 when unable
 to identify the corrupted member

If quorum doesn't exist, we don't know which members' data are
corrupted. In such a situation, we intentionally set the memberID as 0,
which means the alarm affects the whole cluster. This aligns with what
we did for 3.4 and 3.5 in https://github.com/etcd-io/etcd/issues/14849

Signed-off-by: Benjamin Wang
---
 server/etcdserver/corrupt.go | 33 ++++++---------------------------
 1 file changed, 6 insertions(+), 27 deletions(-)

diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go
index 8f2d3f4521e..20e19fbb706 100644
--- a/server/etcdserver/corrupt.go
+++ b/server/etcdserver/corrupt.go
@@ -20,7 +20,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
-	"math"
 	"net/http"
 	"sort"
 	"strings"
@@ -343,36 +342,16 @@ func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers
 	}
 
 	if !quorumExist {
-		// If quorum doesn't exist, then only raise alarm for the least minority.
-		// The first step is to find out the members which are the least minority.
-		var (
-			minCnt    int = math.MaxInt
-			hashVal   uint32
-			memberIDs types.IDSlice
-		)
-
-		for k, v := range hash2members {
-			if v.Len() < minCnt {
-				minCnt = v.Len()
-				hashVal = k
-				memberIDs = v
-			}
-		}
-
-		// raise alarms
-		for _, pid := range memberIDs {
-			cm.hasher.TriggerCorruptAlarm(pid)
-		}
-		delete(hash2members, hashVal)
-
-		cm.lg.Error("Detected compaction hash mismatch but can't identify the corrupted members, so only raise alarm for the least minority",
-			zap.String("leader-id", leaderId.String()),
-			zap.Int64("leader-revision", leaderHash.Revision),
-			zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
-			zap.Uint32("leader-hash", leaderHash.Hash),
-			zap.Uint32("peer-hash", hashVal),
-			zap.String("peer-ids", memberIDs.String()),
-		)
+		// If quorum doesn't exist, we don't know which members' data are
+		// corrupted. In such a situation, we intentionally set the memberID
+		// as 0, which means it affects the whole cluster.
+		cm.lg.Error("Detected compaction hash mismatch but cannot identify the corrupted members, so intentionally set the memberID as 0",
+			zap.String("leader-id", leaderId.String()),
+			zap.Int64("leader-revision", leaderHash.Revision),
+			zap.Int64("leader-compact-revision", leaderHash.CompactRevision),
+			zap.Uint32("leader-hash", leaderHash.Hash),
+		)
+		cm.hasher.TriggerCorruptAlarm(0)
 	}
 
 	// Raise alarm for the remaining members if the quorum is present.
 	// But we should always generate error log for debugging.
@@ -391,7 +370,7 @@ func (cm *corruptionChecker) checkPeerHashes(leaderHash mvcc.KeyValueHash, peers
 			zap.Uint32("leader-hash", leaderHash.Hash),
 			zap.Uint32("peer-hash", k),
 			zap.String("peer-ids", v.String()),
-			zap.Bool("alarm-raised", quorumExist),
+			zap.Bool("quorum-exist", quorumExist),
 		)
 	}
 
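The decision that PATCH 6/7 settles on can be summarized in a small sketch. This is illustrative only: `alarmTargets` is not etcd's API, and member ID 1 stands for the leader, as suggested by the `TriggerCorruptAlarm(1)` expectation in the unit tests of the next patch.

```go
package main

import "fmt"

// alarmTargets models the decision after grouping members by hash: if one
// hash reaches quorum, alarm every member outside that group; if no hash
// reaches quorum, alarm member ID 0, which stands for the whole cluster.
func alarmTargets(hashToMembers map[uint32][]uint64, clusterSize int) []uint64 {
	quorum := clusterSize/2 + 1
	for h, members := range hashToMembers {
		if len(members) < quorum {
			continue
		}
		var out []uint64
		for otherHash, ids := range hashToMembers {
			if otherHash != h {
				out = append(out, ids...)
			}
		}
		return out
	}
	return []uint64{0} // no quorum: flag the whole cluster
}

func main() {
	// 3-member cluster: leader (1) and peer 42 report hash 2, peer 43
	// reports hash 3 -> quorum of 2 exists, so member 43 gets the alarm.
	fmt.Println(alarmTargets(map[uint32][]uint64{2: {1, 42}, 3: {43}}, 3)) // [43]

	// 7-member cluster split 3/3/1 -> quorum of 4 is never reached, so
	// the alarm is raised with member ID 0.
	fmt.Println(alarmTargets(map[uint32][]uint64{2: {1, 42, 47}, 3: {43, 44, 45}, 4: {46}}, 7)) // [0]
}
```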
From d545d603e9ba5615339e0268fa2b79738232ea50 Mon Sep 17 00:00:00 2001
From: Benjamin Wang
Date: Sat, 26 Nov 2022 06:24:56 +0800
Subject: [PATCH 7/7] test: update both unit test and e2e/integration test for
 CompactHashCheck

Signed-off-by: Benjamin Wang
---
 server/etcdserver/corrupt_test.go | 90 +++++++++++++++++++++++++------
 1 file changed, 75 insertions(+), 15 deletions(-)

diff --git a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go
index e8fd19fcbb7..dd876ca2333 100644
--- a/server/etcdserver/corrupt_test.go
+++ b/server/etcdserver/corrupt_test.go
@@ -262,19 +262,71 @@ func TestCompactHashCheck(t *testing.T) {
 			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
 		},
 		{
-			name: "Peer returned same compaction revision but different hash triggers alarm",
+			name: "Etcd can identify two corrupted members in 5 member cluster",
 			hasher: fakeHasher{
 				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
 				peerHashes: []*peerHashKVResp{
-					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
 					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 44}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 7}},
+					{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 8}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(44)", "TriggerCorruptAlarm(45)"},
+			expectCorrupt: true,
+		},
+		{
+			name: "Etcd checks next hash when one member is unresponsive in 3 member cluster",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{err: fmt.Errorf("failed getting hash")},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "PeerHashByRev(1)", "MemberId()"},
+			expectCorrupt: false,
+		},
+		{
+			name: "Etcd can identify single corrupted member in 3 member cluster",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(43)"},
+			expectCorrupt: true,
+		},
+		{
+			name: "Etcd can identify single corrupted member in 5 member cluster",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{peerInfo: peerInfo{id: 44}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(44)"},
+			expectCorrupt: true,
+		},
+		{
+			name: "Etcd triggers corrupted alarm on whole cluster if in 3 member cluster one member is down and one member corrupted",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{err: fmt.Errorf("failed getting hash")},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(0)"},
+			expectCorrupt: true,
+		},
+		{
+			name: "Etcd triggers corrupted alarm on whole cluster if no quorum in 5 member cluster",
 			hasher: fakeHasher{
 				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
 				peerHashes: []*peerHashKVResp{
@@ -286,25 +338,33 @@ func TestCompactHashCheck(t *testing.T) {
 					{peerInfo: peerInfo{id: 47}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
 				},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(46)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(0)"},
 			expectCorrupt: true,
 		},
 		{
-			name: "Peer returned same compaction revision but all members have different hash",
+			name: "Etcd can identify corrupted member in 5 member cluster even if one member is down",
 			hasher: fakeHasher{
-				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}, {Revision: 2, CompactRevision: 1, Hash: 2}},
-				// Each member is the least minority, and etcd may trigger alarm for any one.
-				// So intentionally set the same member id for all members.
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
+				peerHashes: []*peerHashKVResp{
+					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+					{err: fmt.Errorf("failed getting hash")},
+					{peerInfo: peerInfo{id: 44}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
+					{peerInfo: peerInfo{id: 45}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 2}},
+				},
+			},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(44)"},
+			expectCorrupt: true,
+		},
+		{
+			name: "Etcd can identify that leader is corrupted",
+			hasher: fakeHasher{
+				hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 2}, {Revision: 2, CompactRevision: 1, Hash: 2}},
 				peerHashes: []*peerHashKVResp{
 					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
-					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 4}},
-					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 5}},
-					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 6}},
-					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 7}},
-					{peerInfo: peerInfo{id: 42}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 8}},
+					{peerInfo: peerInfo{id: 43}, resp: &pb.HashKVResponse{CompactRevision: 1, Hash: 3}},
 				},
 			},
-			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(42)"},
+			expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(2)", "MemberId()", "TriggerCorruptAlarm(1)"},
 			expectCorrupt: true,
 		},
 		{