diff --git a/api/v3rpc/rpctypes/error.go b/api/v3rpc/rpctypes/error.go index b820e696e0f5..7ddfb62b84db 100644 --- a/api/v3rpc/rpctypes/error.go +++ b/api/v3rpc/rpctypes/error.go @@ -47,6 +47,7 @@ var ( ErrGRPCMemberNotLearner = status.Error(codes.FailedPrecondition, "etcdserver: can only promote a learner member") ErrGRPCLearnerNotReady = status.Error(codes.FailedPrecondition, "etcdserver: can only promote a learner member which is in sync with leader") ErrGRPCTooManyLearners = status.Error(codes.FailedPrecondition, "etcdserver: too many learner members in cluster") + ErrGRPCClusterIdMismatch = status.Error(codes.FailedPrecondition, "etcdserver: cluster ID mismatch") ErrGRPCRequestTooLarge = status.Error(codes.InvalidArgument, "etcdserver: request is too large") ErrGRPCRequestTooManyRequests = status.Error(codes.ResourceExhausted, "etcdserver: too many requests") @@ -204,6 +205,7 @@ var ( ErrInvalidAuthToken = Error(ErrGRPCInvalidAuthToken) ErrAuthOldRevision = Error(ErrGRPCAuthOldRevision) ErrInvalidAuthMgmt = Error(ErrGRPCInvalidAuthMgmt) + ErrClusterIdMismatch = Error(ErrGRPCClusterIdMismatch) ErrNoLeader = Error(ErrGRPCNoLeader) ErrNotLeader = Error(ErrGRPCNotLeader) diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index 20e19fbb7060..42abf6517ab6 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -150,6 +150,17 @@ func (cm *corruptionChecker) InitialCheck() error { zap.Strings("remote-peer-endpoints", p.eps), zap.Error(err), ) + case rpctypes.ErrClusterIdMismatch: + cm.lg.Warn( + "cluster ID mismatch", + zap.String("local-member-id", cm.hasher.MemberId().String()), + zap.Int64("local-member-revision", h.Revision), + zap.Int64("local-member-compact-revision", h.CompactRevision), + zap.Uint32("local-member-hash", h.Hash), + zap.String("remote-peer-id", p.id.String()), + zap.Strings("remote-peer-endpoints", p.eps), + zap.Error(err), + ) } } } @@ -466,7 +477,7 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) 
[]*peerHashKVResp { var lastErr error for _, ep := range p.eps { ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) - resp, lastErr := HashByRev(ctx, cc, ep, rev) + resp, lastErr := HashByRev(ctx, s.cluster.ID(), cc, ep, rev) cancel() if lastErr == nil { resps = append(resps, &peerHashKVResp{peerInfo: p, resp: resp, err: nil}) @@ -510,6 +521,11 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "bad path", http.StatusBadRequest) return } + // An empty header means the peer predates this check (e.g. mid rolling upgrade); only reject an explicit mismatch. + if gcid := r.Header.Get("X-Etcd-Cluster-ID"); gcid != "" && gcid != h.server.cluster.ID().String() { + http.Error(w, "cluster ID mismatch", http.StatusPreconditionFailed) + return + } defer r.Body.Close() b, err := io.ReadAll(r.Body) @@ -553,7 +569,7 @@ func (h *hashKVHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // HashByRev fetch hash of kv store at the given rev via http call to the given url -func HashByRev(ctx context.Context, cc *http.Client, url string, rev int64) (*pb.HashKVResponse, error) { +func HashByRev(ctx context.Context, cid types.ID, cc *http.Client, url string, rev int64) (*pb.HashKVResponse, error) { hashReq := &pb.HashKVRequest{Revision: rev} hashReqBytes, err := json.Marshal(hashReq) if err != nil { @@ -566,6 +582,7 @@ func HashByRev(ctx context.Context, cc *http.Client, url string, rev int64) (*pb } req = req.WithContext(ctx) req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Etcd-Cluster-ID", cid.String()) req.Cancel = ctx.Done() resp, err := cc.Do(req) @@ -585,6 +602,10 @@ func HashByRev(ctx context.Context, cc *http.Client, url string, rev int64) (*pb if strings.Contains(string(b), mvcc.ErrFutureRev.Error()) { return nil, rpctypes.ErrFutureRev } + } else if resp.StatusCode == http.StatusPreconditionFailed { + if strings.Contains(string(b), "cluster ID mismatch") { + return nil, rpctypes.ErrClusterIdMismatch + } } if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("unknown error: %s", string(b)) diff --git 
a/server/etcdserver/corrupt_test.go b/server/etcdserver/corrupt_test.go index 3fff8a533f3a..f43b84283205 100644 --- a/server/etcdserver/corrupt_test.go +++ b/server/etcdserver/corrupt_test.go @@ -15,11 +15,23 @@ package etcdserver import ( + "bytes" "context" + "encoding/json" "fmt" + "io" + "net/http" + "net/http/httptest" + "strconv" + "strings" "testing" "time" + "go.uber.org/zap" + + "go.etcd.io/etcd/server/v3/lease" + betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" + "github.com/stretchr/testify/assert" "go.uber.org/zap/zaptest" @@ -86,6 +98,13 @@ func TestInitialCheck(t *testing.T) { hasher: fakeHasher{hashByRevResponses: []hashByRev{{hash: mvcc.KeyValueHash{Hash: 1, CompactRevision: 1}}}, peerHashes: []*peerHashKVResp{{resp: &pb.HashKVResponse{Header: &pb.ResponseHeader{}, Hash: 2, CompactRevision: 2}}}}, expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, }, + { + name: "Cluster ID Mismatch does not fail CorruptionChecker.InitialCheck()", + hasher: fakeHasher{ + peerHashes: []*peerHashKVResp{{err: rpctypes.ErrClusterIdMismatch}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "HashByRev(0)", "PeerHashByRev(0)", "MemberId()", "MemberId()"}, + }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { @@ -203,6 +222,13 @@ func TestPeriodicCheck(t *testing.T) { expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)", "TriggerCorruptAlarm(88)"}, expectCorrupt: true, }, + { + name: "Cluster ID Mismatch does not fail CorruptionChecker.PeriodicCheck()", + hasher: fakeHasher{ + peerHashes: []*peerHashKVResp{{err: rpctypes.ErrClusterIdMismatch}}, + }, + expectActions: []string{"HashByRev(0)", "PeerHashByRev(0)", "ReqTimeout()", "LinearizableReadNotify()", "HashByRev(0)"}, + }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { @@ -388,6 +414,14 @@ func TestCompactHashCheck(t *testing.T) { }, 
expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)", "MemberId()"}, }, + { + name: "Cluster ID Mismatch does not fail CorruptionChecker.CompactHashCheck()", + hasher: fakeHasher{ + hashes: []mvcc.KeyValueHash{{Revision: 1, CompactRevision: 1, Hash: 1}}, + peerHashes: []*peerHashKVResp{{err: rpctypes.ErrClusterIdMismatch}}, + }, + expectActions: []string{"MemberId()", "ReqTimeout()", "Hashes()", "PeerHashByRev(1)", "MemberId()"}, + }, } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { @@ -473,3 +507,88 @@ func (f *fakeHasher) TriggerCorruptAlarm(memberId types.ID) { f.actions = append(f.actions, fmt.Sprintf("TriggerCorruptAlarm(%d)", memberId)) f.alarmTriggered = true } + +func TestHashKVHandler(t *testing.T) { + var remoteClusterID = 111195 + var localClusterID = 111196 + var revision = 1 + + etcdSrv := &EtcdServer{} + etcdSrv.cluster = newTestCluster(t, nil) + etcdSrv.cluster.SetID(types.ID(localClusterID), types.ID(localClusterID)) + be, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, be) + etcdSrv.kv = mvcc.New(zap.NewNop(), be, &lease.FakeLessor{}, mvcc.StoreConfig{}) + ph := &hashKVHandler{ + lg: zap.NewNop(), + server: etcdSrv, + } + srv := httptest.NewServer(ph) + defer srv.Close() + + tests := []struct { + name string + remoteClusterID int + wcode int + wKeyWords string + }{ + { + "success", + localClusterID, + http.StatusOK, + "", + }, + { + "fail", + remoteClusterID, + http.StatusPreconditionFailed, + "cluster ID mismatch", + }, + } + for i, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + hashReq := &pb.HashKVRequest{Revision: int64(revision)} + hashReqBytes, err := json.Marshal(hashReq) + if err != nil { + t.Fatalf("failed to marshal request: %v", err) + } + req, err := http.NewRequest(http.MethodGet, srv.URL+PeerHashKVPath, bytes.NewReader(hashReqBytes)) + if err != nil { + t.Fatalf("failed to create request: %v", err) + } + req.Header.Set("X-Etcd-Cluster-ID", 
strconv.FormatUint(uint64(tt.remoteClusterID), 16)) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("failed to get http response: %v", err) + } + body, err := io.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + t.Fatalf("unexpected io.ReadAll error: %v", err) + } + if resp.StatusCode != tt.wcode { + t.Fatalf("#%d: code = %d, want %d", i, resp.StatusCode, tt.wcode) + } + if resp.StatusCode != http.StatusOK { + if !strings.Contains(string(body), tt.wKeyWords) { + t.Errorf("#%d: body: %s, want body to contain keywords: %s", i, string(body), tt.wKeyWords) + } + return + } + + hashKVResponse := pb.HashKVResponse{} + err = json.Unmarshal(body, &hashKVResponse) + if err != nil { + t.Fatalf("unmarshal response error: %v", err) + } + hashValue, _, err := etcdSrv.KV().HashStorage().HashByRev(int64(revision)) + if err != nil { + t.Fatalf("etcd server hash failed: %v", err) + } + if hashKVResponse.Hash != hashValue.Hash { + t.Fatalf("hash value inconsistent: %d != %d", hashKVResponse.Hash, hashValue.Hash) + } + }) + } +} diff --git a/tests/e2e/corrupt_test.go b/tests/e2e/corrupt_test.go index e9f7a66c413e..9ce1182202b2 100644 --- a/tests/e2e/corrupt_test.go +++ b/tests/e2e/corrupt_test.go @@ -98,6 +98,82 @@ func corruptTest(cx ctlCtx) { e2e.WaitReadyExpectProc(context.TODO(), proc, []string{fmt.Sprintf("etcdmain: %016x found data inconsistency with peers", id0)}) } +func TestInPlaceRecovery(t *testing.T) { + basePort := 20000 + e2e.BeforeTest(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Initialize the cluster. + epcOld, err := e2e.NewEtcdProcessCluster(ctx, t, + e2e.WithInitialClusterToken("old"), + e2e.WithKeepDataDir(false), + e2e.WithCorruptCheckTime(time.Second), + e2e.WithBasePort(basePort), + ) + if err != nil { + t.Fatalf("could not start etcd process cluster (%v)", err) + } + // Register cleanup immediately so a later t.Fatalf cannot leak the old processes. + t.Cleanup(func() { + if errC := epcOld.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }) + + // Put some data into the old cluster, so that after recovering from a blank db, the hash diverges. 
+ oldCc, err := e2e.NewEtcdctl(epcOld.Cfg.Client, epcOld.EndpointsGRPC()) + assert.NoError(t, err) + for i := 0; i < 10; i++ { + err := oldCc.Put(ctx, testutil.PickKey(int64(i)), fmt.Sprint(i), config.PutOptions{}) + assert.NoError(t, err, "error on put") + } + + // Create a new cluster config, but with the same port numbers. In this way the new servers can stay in + // contact with the old ones. + epcNewConfig := e2e.NewConfig( + e2e.WithInitialClusterToken("new"), + e2e.WithKeepDataDir(false), + e2e.WithCorruptCheckTime(time.Second), + e2e.WithBasePort(basePort), + e2e.WithInitialCorruptCheck(true), + ) + epcNew, err := e2e.InitEtcdProcessCluster(t, epcNewConfig) + if err != nil { + t.Fatalf("could not init etcd process cluster (%v)", err) + } + // Register cleanup immediately so a later t.Fatalf cannot leak the new processes. + t.Cleanup(func() { + if errC := epcNew.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }) + newCc, err := e2e.NewEtcdctl(epcNew.Cfg.Client, epcNew.EndpointsGRPC()) + assert.NoError(t, err) + + // Rolling recovery of the servers. + for i, newProc := range epcNew.Procs { + oldProc := epcOld.Procs[i] + err = oldProc.Close() + if err != nil { + t.Fatalf("could not stop etcd process (%v)", err) + } + err = newProc.Start(ctx) + if err != nil { + t.Fatalf("could not start etcd process (%v)", err) + } + time.Sleep(5 * time.Second) // wait for a while so that the periodical check proceeds + } + + alarmResponse, err := newCc.AlarmList(ctx) + assert.NoError(t, err, "error on alarm list") + for _, alarm := range alarmResponse.Alarms { + if alarm.Alarm == etcdserverpb.AlarmType_CORRUPT { + t.Fatalf("There is no corruption after in-place recovery, but corruption reported.") + } + } +} + func TestPeriodicCheckDetectsCorruption(t *testing.T) { checkTime := time.Second e2e.BeforeTest(t) diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 0eafc4579104..bdd286bbcc1f 100644 
--- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -315,6 +315,14 @@ func WithCorruptCheckTime(time time.Duration) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.CorruptCheckTime = time } } +func WithInitialClusterToken(token string) EPClusterOption { + return func(c *EtcdProcessClusterConfig) { c.InitialToken = token } +} + +func WithInitialCorruptCheck(enabled bool) EPClusterOption { + return func(c *EtcdProcessClusterConfig) { c.InitialCorruptCheck = enabled } +} + func WithCompactHashCheckEnabled(enabled bool) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.CompactHashCheckEnabled = enabled } } diff --git a/tests/integration/hashkv_test.go b/tests/integration/hashkv_test.go index 3fc10a604d88..238958559e83 100644 --- a/tests/integration/hashkv_test.go +++ b/tests/integration/hashkv_test.go @@ -47,13 +47,14 @@ func TestCompactionHash(t *testing.T) { }, } - testutil.TestCompactionHash(context.Background(), t, hashTestCase{cc, clus.Members[0].GRPCURL(), client}, 1000) + testutil.TestCompactionHash(context.Background(), t, hashTestCase{cc, clus.Members[0].GRPCURL(), client, clus.Members[0].Server}, 1000) } type hashTestCase struct { *clientv3.Client - url string - http *http.Client + url string + http *http.Client + server *etcdserver.EtcdServer } func (tc hashTestCase) Put(ctx context.Context, key, value string) error { @@ -67,7 +68,11 @@ func (tc hashTestCase) Delete(ctx context.Context, key string) error { } func (tc hashTestCase) HashByRev(ctx context.Context, rev int64) (testutil.KeyValueHash, error) { - resp, err := etcdserver.HashByRev(ctx, tc.http, "http://unix", rev) + resp, err := etcdserver.HashByRev(ctx, tc.server.Cluster().ID(), tc.http, "http://unix", rev) + if err != nil { + // resp is nil on failure; bail out before dereferencing it. + return testutil.KeyValueHash{}, err + } return testutil.KeyValueHash{Hash: resp.Hash, CompactRevision: resp.CompactRevision, Revision: resp.Header.Revision}, err }