From 236a54d30c89e7d823fa539321ec764df3fa9428 Mon Sep 17 00:00:00 2001 From: yoyinzyc Date: Tue, 30 Jun 2020 15:43:23 -0700 Subject: [PATCH] etcdserver: add getDowngradeEnabled http handler; attach downgrade monitor to server to monitor downgrade status. --- etcdserver/api/etcdhttp/peer.go | 7 ++- etcdserver/api/etcdhttp/peer_test.go | 4 +- etcdserver/cluster_util.go | 89 +++++++++++++++++++++++++++ etcdserver/cluster_util_test.go | 49 +++++++++++++++ etcdserver/corrupt.go | 5 -- etcdserver/server.go | 92 ++++++++++++++++++++++++++++ 6 files changed, 238 insertions(+), 8 deletions(-) diff --git a/etcdserver/api/etcdhttp/peer.go b/etcdserver/api/etcdhttp/peer.go index 0b97c05fbe7f..adf1feda44ba 100644 --- a/etcdserver/api/etcdhttp/peer.go +++ b/etcdserver/api/etcdhttp/peer.go @@ -34,11 +34,12 @@ import ( const ( peerMembersPath = "/members" peerMemberPromotePrefix = "/members/promote/" + downgradeEnabledPath = "/downgrade/enabled" ) // NewPeerHandler generates an http.Handler to handle etcd peer requests. func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerV2) http.Handler { - return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler()) + return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler(), s.DowngradeEnabledHandler()) } func newPeerHandler( @@ -47,6 +48,7 @@ func newPeerHandler( raftHandler http.Handler, leaseHandler http.Handler, hashKVHandler http.Handler, + downgradeEnabledHandler http.Handler, ) http.Handler { if lg == nil { lg = zap.NewNop() @@ -64,6 +66,9 @@ func newPeerHandler( mux.Handle(leasehttp.LeasePrefix, leaseHandler) mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler) } + if downgradeEnabledHandler != nil { + mux.Handle(downgradeEnabledPath, downgradeEnabledHandler) + } if hashKVHandler != nil { mux.Handle(etcdserver.PeerHashKVPath, hashKVHandler) } diff --git a/etcdserver/api/etcdhttp/peer_test.go b/etcdserver/api/etcdhttp/peer_test.go index bc2f206ebf7e..832868b68047 100644 --- a/etcdserver/api/etcdhttp/peer_test.go +++ b/etcdserver/api/etcdhttp/peer_test.go @@ -83,7 +83,7 @@ var fakeRaftHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Reque // TestNewPeerHandlerOnRaftPrefix tests that NewPeerHandler returns a handler that // handles raft-prefix requests well. func TestNewPeerHandlerOnRaftPrefix(t *testing.T) { - ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil) + ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil, nil) srv := httptest.NewServer(ph) defer srv.Close() @@ -231,7 +231,7 @@ func TestServeMemberPromoteFails(t *testing.T) { // TestNewPeerHandlerOnMembersPromotePrefix verifies the request with members promote prefix is routed correctly func TestNewPeerHandlerOnMembersPromotePrefix(t *testing.T) { - ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil) + ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil, nil) srv := httptest.NewServer(ph) defer srv.Close() diff --git a/etcdserver/cluster_util.go b/etcdserver/cluster_util.go index fc09e1a398f2..0d599fc79af0 100644 --- a/etcdserver/cluster_util.go +++ b/etcdserver/cluster_util.go @@ -369,9 +369,98 @@ func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.R // getDowngradeEnabledFromRemotePeers will get the downgrade enabled status of the cluster. func getDowngradeEnabledFromRemotePeers(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool { + members := cl.Members() + + for _, m := range members { + if m.ID == local { + continue + } + enable, err := getDowngradeEnabled(lg, m, rt) + if err != nil { + lg.Warn("failed to get downgrade enabled status", zap.String("remote-member-id", m.ID.String()), zap.Error(err)) + } else { + // Since the "/downgrade/enabled" serves linearized data, + // this function can return once it gets a non-error response from the endpoint. + return enable + } + } return false } +// getDowngradeEnabled returns the downgrade enabled status of the given member +// via its peerURLs. Returns the last error if it fails to get it. +func getDowngradeEnabled(lg *zap.Logger, m *membership.Member, rt http.RoundTripper) (bool, error) { + cc := &http.Client{ + Transport: rt, + } + var ( + err error + resp *http.Response + ) + + for _, u := range m.PeerURLs { + addr := u + "/downgrade/enabled" + resp, err = cc.Get(addr) + if err != nil { + lg.Warn( + "failed to reach the peer URL", + zap.String("address", addr), + zap.String("remote-member-id", m.ID.String()), + zap.Error(err), + ) + continue + } + var b []byte + b, err = ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + lg.Warn( + "failed to read body of response", + zap.String("address", addr), + zap.String("remote-member-id", m.ID.String()), + zap.Error(err), + ) + continue + } + var enable bool + if err = json.Unmarshal(b, &enable); err != nil { + lg.Warn( + "failed to unmarshal response", + zap.String("address", addr), + zap.String("remote-member-id", m.ID.String()), + zap.Error(err), + ) + continue + } + return enable, nil + } + return false, err +} + +// isDowngradeFinished decides the cluster downgrade status based on versions map. +// Return true if all servers are downgraded to target version, otherwise return false. +func isDowngradeFinished(lg *zap.Logger, targetVersion *semver.Version, vers map[string]*version.Versions) bool { + for mid, ver := range vers { + if ver == nil { + return false + } + v, err := semver.NewVersion(ver.Cluster) + if err != nil { + lg.Warn( + "failed to parse server version of remote member", + zap.String("remote-member-id", mid), + zap.String("remote-member-version", ver.Server), + zap.Error(err), + ) + return false + } + if !targetVersion.Equal(*v) { + return false + } + } + return true +} + func convertToClusterVersion(v string) (*semver.Version, error) { ver, err := semver.NewVersion(v) if err != nil { diff --git a/etcdserver/cluster_util_test.go b/etcdserver/cluster_util_test.go index 29ba9f90dc8a..2829dff61d05 100644 --- a/etcdserver/cluster_util_test.go +++ b/etcdserver/cluster_util_test.go @@ -215,3 +215,52 @@ func TestDecideAllowedVersionRange(t *testing.T) { }) } } + +func TestIsDowngradeFinished(t *testing.T) { + tests := []struct { + name string + targetVersion *semver.Version + versionMap map[string]*version.Versions + expectedFinished bool + }{ + { + "When downgrade finished", + &semver.Version{Major: 3, Minor: 4}, + map[string]*version.Versions{ + "mem1": {Server: "3.4.1", Cluster: "3.4.0"}, + "mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"}, + "mem3": {Server: "3.4.2", Cluster: "3.4.0"}, + }, + true, + }, + { + "When cannot parse peer version", + &semver.Version{Major: 3, Minor: 4}, + map[string]*version.Versions{ + "mem1": {Server: "3.4.1", Cluster: "3.4"}, + "mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"}, + "mem3": {Server: "3.4.2", Cluster: "3.4.0"}, + }, + false, + }, + { + "When downgrade not finished", + &semver.Version{Major: 3, Minor: 4}, + map[string]*version.Versions{ + "mem1": {Server: "3.4.1", Cluster: "3.4.0"}, + "mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"}, + "mem3": {Server: "3.5.2", Cluster: "3.5.0"}, + }, + false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actual := isDowngradeFinished(zap.NewNop(), tt.targetVersion, tt.versionMap) + if actual != tt.expectedFinished { + t.Errorf("Expected downgrade finished is %v; Got %v", tt.expectedFinished, actual) + } + }) + } +} diff --git a/etcdserver/corrupt.go b/etcdserver/corrupt.go index 45aa4535713b..56fc2fc72ae2 100644 --- a/etcdserver/corrupt.go +++ b/etcdserver/corrupt.go @@ -337,11 +337,6 @@ func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevo return nil, ErrCorrupt } -type ServerPeerV2 interface { - ServerPeer - HashKVHandler() http.Handler -} - const PeerHashKVPath = "/members/hashkv" type hashKVHandler struct { diff --git a/etcdserver/server.go b/etcdserver/server.go index 40f556dd0b97..b0876724cfc8 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -101,6 +101,9 @@ const ( recommendedMaxRequestBytes = 10 * 1024 * 1024 readyPercent = 0.9 + + // Todo: need to be decided + monitorDowngradeInterval = time.Second ) var ( @@ -705,6 +708,7 @@ func (s *EtcdServer) Start() { s.goAttach(s.monitorVersions) s.goAttach(s.linearizableReadLoop) s.goAttach(s.monitorKVHash) + s.goAttach(s.monitorDowngrade) } // start prepares and starts server in a new goroutine. It is no longer safe to @@ -814,6 +818,64 @@ func (s *EtcdServer) LeaseHandler() http.Handler { func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() } +type ServerPeerV2 interface { + ServerPeer + ServerDowngradeHTTP + HashKVHandler() http.Handler +} + +type ServerDowngradeHTTP interface { + DowngradeEnabledHandler() http.Handler +} + +func (s *EtcdServer) DowngradeInfo() *membership.DowngradeInfo { return s.cluster.DowngradeInfo() } + +type downgradeEnabledHandler struct { + lg *zap.Logger + cluster api.Cluster + server *EtcdServer +} + +func (s *EtcdServer) DowngradeEnabledHandler() http.Handler { + return &downgradeEnabledHandler{ + lg: s.getLogger(), + cluster: s.cluster, + server: s, + } +} + +func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != "GET" { + w.Header().Set("Allow", r.Method) + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return + } + + w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String()) + + if r.URL.Path != "/downgrade/enabled" { + http.Error(w, "bad path", http.StatusBadRequest) + return + } + + ctx, cancel := context.WithTimeout(context.Background(), h.server.Cfg.ReqTimeout()) + defer cancel() + + // serve with linearized downgrade info + if err := h.server.linearizableReadNotify(ctx); err != nil { + http.Error(w, fmt.Sprintf("failed linearized read: %v", err), + http.StatusInternalServerError) + return + } + enabled := h.server.DowngradeInfo().Enabled + w.Header().Set("Content-Type", "application/json") + b, err := json.Marshal(enabled) + if err != nil { + h.lg.Warn("failed to marshal downgrade.Enabled to json", zap.Error(err)) + } + w.Write(b) +} + // Process takes a raft message and applies it to the server's raft state // machine, respecting any timeout of the given context. func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { @@ -2314,6 +2376,36 @@ func (s *EtcdServer) updateClusterVersion(ver string) { } } +func (s *EtcdServer) monitorDowngrade() { + lg := s.getLogger() + for { + select { + case <-time.After(monitorDowngradeInterval): + case <-s.stopping: + return + } + + if s.Leader() != s.ID() { + continue + } + + d := s.cluster.DowngradeInfo() + if !d.Enabled { + continue + } + + targetVersion := d.TargetVersion + v := semver.Must(semver.NewVersion(targetVersion)) + if isDowngradeFinished(s.getLogger(), v, getVersions(s.getLogger(), s.cluster, s.id, s.peerRt)) { + lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion)) + if _, err := s.downgradeCancel(context.Background()); err != nil { + lg.Warn("failed to cancel downgrade", zap.Error(err)) + } + continue + } + } +} + func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error { switch err { case context.Canceled: