Skip to content

Commit

Permalink
etcdserver: CheckInitialHashKV when "InitialCorruptCheck==true"
Browse files Browse the repository at this point in the history
etcdserver: only compare hash values if any

It's possible that peer has higher revision than local node.
In such case, hashes will still be different on requested
revision, but peer's header revision is greater.

etcdserver: count mismatch only when compact revisions are same

Signed-off-by: Gyu-Ho Lee <[email protected]>
  • Loading branch information
gyuho committed Nov 23, 2017
1 parent 1f38f1f commit e0dfc43
Showing 1 changed file with 85 additions and 18 deletions.
103 changes: 85 additions & 18 deletions etcdserver/corrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,61 @@ package etcdserver

import (
"context"
"fmt"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc"
"github.com/coreos/etcd/pkg/types"
)

// CheckInitialHashKV compares initial hash values with its peers
// before serving any peer/client traffic. Only mismatch when hashes
// are different at requested revision, with same compact revision.
func (s *EtcdServer) CheckInitialHashKV() error {
if !s.Cfg.InitialCorruptCheck {
return nil
}

plog.Infof("%s starting initial corruption check with timeout %v...", s.ID(), s.Cfg.ReqTimeout())
h, rev, crev, err := s.kv.HashByRev(0)
if err != nil {
return fmt.Errorf("%s failed to fetch hash (%v)", s.ID(), err)
}
peers := s.getPeerHashKVs(rev)
mismatch := 0
for _, p := range peers {
if p.resp != nil {
peerID := types.ID(p.resp.Header.MemberId)
if h != p.resp.Hash {
if crev == p.resp.CompactRevision {
plog.Errorf("%s's hash %d != %s's hash %d (revision %d, peer revision %d, compact revision %d)", s.ID(), h, peerID, p.resp.Hash, rev, p.resp.Header.Revision, crev)
mismatch++
} else {
plog.Warningf("%s cannot check hash of peer(%s): peer has a different compact revision %d (revision:%d)", s.ID(), peerID, p.resp.CompactRevision, rev)
}
}
continue
}
if p.err != nil {
switch p.err {
case rpctypes.ErrFutureRev:
plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: peer is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
case rpctypes.ErrCompacted:
plog.Warningf("%s cannot check the hash of peer(%q) at revision %d: local node is lagging behind(%q)", s.ID(), p.eps, rev, p.err.Error())
}
}
}
if mismatch > 0 {
return fmt.Errorf("%s found data inconsistency with peers", s.ID())
}

plog.Infof("%s succeeded on initial corruption checking: no corruption", s.ID())
return nil
}

func (s *EtcdServer) monitorKVHash() {
t := s.Cfg.CorruptCheckTime
if t == 0 {
Expand All @@ -50,7 +97,7 @@ func (s *EtcdServer) checkHashKV() error {
if err != nil {
plog.Fatalf("failed to hash kv store (%v)", err)
}
resps := s.getPeerHashKVs(rev)
peers := s.getPeerHashKVs(rev)

ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
err = s.linearizableReadNotify(ctx)
Expand Down Expand Up @@ -86,35 +133,38 @@ func (s *EtcdServer) checkHashKV() error {
mismatch(uint64(s.ID()))
}

for _, resp := range resps {
id := resp.Header.MemberId
for _, p := range peers {
if p.resp == nil {
continue
}
id := p.resp.Header.MemberId

// leader expects follower's latest revision less than or equal to leader's
if resp.Header.Revision > rev2 {
if p.resp.Header.Revision > rev2 {
plog.Warningf(
"revision %d from member %v, expected at most %d",
resp.Header.Revision,
p.resp.Header.Revision,
types.ID(id),
rev2)
mismatch(id)
}

// leader expects follower's latest compact revision less than or equal to leader's
if resp.CompactRevision > crev2 {
if p.resp.CompactRevision > crev2 {
plog.Warningf(
"compact revision %d from member %v, expected at most %d",
resp.CompactRevision,
p.resp.CompactRevision,
types.ID(id),
crev2,
)
mismatch(id)
}

// follower's compact revision is leader's old one, then hashes must match
if resp.CompactRevision == crev && resp.Hash != h {
if p.resp.CompactRevision == crev && p.resp.Hash != h {
plog.Warningf(
"hash %d at revision %d from member %v, expected hash %d",
resp.Hash,
p.resp.Hash,
rev,
types.ID(id),
h,
Expand All @@ -125,36 +175,53 @@ func (s *EtcdServer) checkHashKV() error {
return nil
}

func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*clientv3.HashKVResponse) {
for _, m := range s.cluster.Members() {
type peerHashKVResp struct {
resp *clientv3.HashKVResponse
err error
eps []string
}

func (s *EtcdServer) getPeerHashKVs(rev int64) (resps []*peerHashKVResp) {
// TODO: handle the case when "s.cluster.Members" have not
// been populated (e.g. no snapshot to load from disk)
mbs := s.cluster.Members()
pURLs := make([][]string, len(mbs))
for _, m := range mbs {
if m.ID == s.ID() {
continue
}
pURLs = append(pURLs, m.PeerURLs)
}

for _, purls := range pURLs {
if len(purls) == 0 {
continue
}
cli, cerr := clientv3.New(clientv3.Config{
DialTimeout: s.Cfg.ReqTimeout(),
Endpoints: m.PeerURLs,
Endpoints: purls,
})
if cerr != nil {
plog.Warningf("%s failed to create client to peer %s for hash checking (%q)", s.ID(), types.ID(m.ID), cerr.Error())
plog.Warningf("%s failed to create client to peer %q for hash checking (%q)", s.ID(), purls, cerr.Error())
continue
}

respsLen := len(resps)
for _, c := range cli.Endpoints() {
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
resp, herr := cli.HashKV(ctx, c, rev)
var resp *clientv3.HashKVResponse
resp, cerr = cli.HashKV(ctx, c, rev)
cancel()
if herr == nil {
cerr = herr
resps = append(resps, resp)
if cerr == nil {
resps = append(resps, &peerHashKVResp{resp: resp})
break
}
plog.Warningf("%s hash-kv error %q on peer %q with revision %d", s.ID(), cerr.Error(), c, rev)
}
cli.Close()

if respsLen == len(resps) {
plog.Warningf("%s failed to hash kv for peer %s (%v)", s.ID(), types.ID(m.ID), cerr)
resps = append(resps, &peerHashKVResp{err: cerr, eps: purls})
}
}
return resps
Expand Down

0 comments on commit e0dfc43

Please sign in to comment.