Skip to content

Commit

Permalink
etcdserver: add getDowngradeEnabled http handler; attach downgrade mo…
Browse files Browse the repository at this point in the history
…nitor to server to monitor downgrade status.
  • Loading branch information
YoyinZyc committed Jun 30, 2020
1 parent 7f726db commit 236a54d
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 8 deletions.
7 changes: 6 additions & 1 deletion etcdserver/api/etcdhttp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ import (
const (
peerMembersPath = "/members"
peerMemberPromotePrefix = "/members/promote/"
downgradeEnabledPath = "/downgrade/enabled"
)

// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerV2) http.Handler {
return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler())
return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler(), s.DowngradeEnabledHandler())
}

func newPeerHandler(
Expand All @@ -47,6 +48,7 @@ func newPeerHandler(
raftHandler http.Handler,
leaseHandler http.Handler,
hashKVHandler http.Handler,
downgradeEnabledHandler http.Handler,
) http.Handler {
if lg == nil {
lg = zap.NewNop()
Expand All @@ -64,6 +66,9 @@ func newPeerHandler(
mux.Handle(leasehttp.LeasePrefix, leaseHandler)
mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
}
if downgradeEnabledHandler != nil {
mux.Handle(downgradeEnabledPath, downgradeEnabledHandler)
}
if hashKVHandler != nil {
mux.Handle(etcdserver.PeerHashKVPath, hashKVHandler)
}
Expand Down
4 changes: 2 additions & 2 deletions etcdserver/api/etcdhttp/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ var fakeRaftHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Reque
// TestNewPeerHandlerOnRaftPrefix tests that NewPeerHandler returns a handler that
// handles raft-prefix requests well.
func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil)
ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil, nil)
srv := httptest.NewServer(ph)
defer srv.Close()

Expand Down Expand Up @@ -231,7 +231,7 @@ func TestServeMemberPromoteFails(t *testing.T) {

// TestNewPeerHandlerOnMembersPromotePrefix verifies the request with members promote prefix is routed correctly
func TestNewPeerHandlerOnMembersPromotePrefix(t *testing.T) {
ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil)
ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil, nil)
srv := httptest.NewServer(ph)
defer srv.Close()

Expand Down
89 changes: 89 additions & 0 deletions etcdserver/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,98 @@ func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.R

// getDowngradeEnabledFromRemotePeers asks the members of cl, one at a time,
// whether downgrade is enabled and returns the first answer obtained without
// an error. The member whose ID equals local is skipped. Failures are logged
// and the next member is tried; false is returned when no member answered.
func getDowngradeEnabledFromRemotePeers(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
	for _, peer := range cl.Members() {
		if peer.ID == local {
			continue
		}
		enabled, err := getDowngradeEnabled(lg, peer, rt)
		if err == nil {
			// Since the "/downgrade/enabled" serves linearized data,
			// the first non-error response from the endpoint is authoritative.
			return enabled
		}
		lg.Warn("failed to get downgrade enabled status", zap.String("remote-member-id", peer.ID.String()), zap.Error(err))
	}
	return false
}

// getDowngradeEnabled queries the peer URLs of member m in order and returns
// the downgrade-enabled flag decoded from the first response that can be
// fetched, read and unmarshaled as a JSON boolean. Every failed attempt is
// logged and the next URL is tried. When all URLs fail, false is returned
// together with the error of the last attempt. When m has no peer URLs the
// loop never runs and the result is (false, nil).
func getDowngradeEnabled(lg *zap.Logger, m *membership.Member, rt http.RoundTripper) (bool, error) {
	client := &http.Client{Transport: rt}

	// warn logs one failed attempt against a single peer address.
	warn := func(msg, addr string, cause error) {
		lg.Warn(
			msg,
			zap.String("address", addr),
			zap.String("remote-member-id", m.ID.String()),
			zap.Error(cause),
		)
	}

	var lastErr error
	for _, peerURL := range m.PeerURLs {
		endpoint := peerURL + "/downgrade/enabled"

		resp, err := client.Get(endpoint)
		if err != nil {
			lastErr = err
			warn("failed to reach the peer URL", endpoint, err)
			continue
		}

		body, err := ioutil.ReadAll(resp.Body)
		// Close immediately rather than deferring: a defer inside the loop
		// would keep every body open until the function returns.
		resp.Body.Close()
		if err != nil {
			lastErr = err
			warn("failed to read body of response", endpoint, err)
			continue
		}

		var enabled bool
		if err = json.Unmarshal(body, &enabled); err != nil {
			lastErr = err
			warn("failed to unmarshal response", endpoint, err)
			continue
		}
		return enabled, nil
	}
	return false, lastErr
}

// isDowngradeFinished decides the cluster downgrade status based on versions map.
// It returns true only if every entry in vers is non-nil and reports a cluster
// version that parses and equals targetVersion; otherwise it returns false.
// An empty vers map yields true because no entry contradicts the target.
func isDowngradeFinished(lg *zap.Logger, targetVersion *semver.Version, vers map[string]*version.Versions) bool {
	for mid, ver := range vers {
		if ver == nil {
			return false
		}
		// The comparison is made on the cluster version each member reports,
		// not on its server (binary) version.
		v, err := semver.NewVersion(ver.Cluster)
		if err != nil {
			// Log the value that actually failed to parse (the cluster
			// version) alongside the server version for context.
			lg.Warn(
				"failed to parse cluster version of remote member",
				zap.String("remote-member-id", mid),
				zap.String("remote-member-version", ver.Server),
				zap.String("remote-member-cluster-version", ver.Cluster),
				zap.Error(err),
			)
			return false
		}
		if !targetVersion.Equal(*v) {
			return false
		}
	}
	return true
}

func convertToClusterVersion(v string) (*semver.Version, error) {
ver, err := semver.NewVersion(v)
if err != nil {
Expand Down
49 changes: 49 additions & 0 deletions etcdserver/cluster_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,52 @@ func TestDecideAllowedVersionRange(t *testing.T) {
})
}
}

// TestIsDowngradeFinished checks isDowngradeFinished against version maps in
// which all members match the target, one member reports a cluster version the
// test expects to be unparsable, and one member is still on a newer cluster
// version.
func TestIsDowngradeFinished(t *testing.T) {
	type testCase struct {
		name             string
		targetVersion    *semver.Version
		versionMap       map[string]*version.Versions
		expectedFinished bool
	}

	cases := []testCase{
		{
			name:          "When downgrade finished",
			targetVersion: &semver.Version{Major: 3, Minor: 4},
			versionMap: map[string]*version.Versions{
				"mem1": {Server: "3.4.1", Cluster: "3.4.0"},
				"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
				"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
			},
			expectedFinished: true,
		},
		{
			name:          "When cannot parse peer version",
			targetVersion: &semver.Version{Major: 3, Minor: 4},
			versionMap: map[string]*version.Versions{
				// "3.4" lacks a patch component.
				"mem1": {Server: "3.4.1", Cluster: "3.4"},
				"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
				"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
			},
			expectedFinished: false,
		},
		{
			name:          "When downgrade not finished",
			targetVersion: &semver.Version{Major: 3, Minor: 4},
			versionMap: map[string]*version.Versions{
				"mem1": {Server: "3.4.1", Cluster: "3.4.0"},
				"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
				"mem3": {Server: "3.5.2", Cluster: "3.5.0"},
			},
			expectedFinished: false,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := isDowngradeFinished(zap.NewNop(), tc.targetVersion, tc.versionMap)
			if got != tc.expectedFinished {
				t.Errorf("Expected downgrade finished is %v; Got %v", tc.expectedFinished, got)
			}
		})
	}
}
5 changes: 0 additions & 5 deletions etcdserver/corrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,11 +337,6 @@ func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevo
return nil, ErrCorrupt
}

type ServerPeerV2 interface {
ServerPeer
HashKVHandler() http.Handler
}

const PeerHashKVPath = "/members/hashkv"

type hashKVHandler struct {
Expand Down
92 changes: 92 additions & 0 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ const (
recommendedMaxRequestBytes = 10 * 1024 * 1024

readyPercent = 0.9

// TODO: the interval for monitoring the downgrade status still needs to be decided.
monitorDowngradeInterval = time.Second
)

var (
Expand Down Expand Up @@ -705,6 +708,7 @@ func (s *EtcdServer) Start() {
s.goAttach(s.monitorVersions)
s.goAttach(s.linearizableReadLoop)
s.goAttach(s.monitorKVHash)
s.goAttach(s.monitorDowngrade)
}

// start prepares and starts server in a new goroutine. It is no longer safe to
Expand Down Expand Up @@ -814,6 +818,64 @@ func (s *EtcdServer) LeaseHandler() http.Handler {

func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }

// ServerPeerV2 is a ServerPeer that additionally exposes the downgrade HTTP
// handler (via ServerDowngradeHTTP) and an HTTP handler for hash KV requests.
type ServerPeerV2 interface {
ServerPeer
ServerDowngradeHTTP
// HashKVHandler returns the http.Handler that serves hash KV requests.
HashKVHandler() http.Handler
}

// ServerDowngradeHTTP is implemented by servers that can serve their
// downgrade-enabled status over HTTP.
type ServerDowngradeHTTP interface {
// DowngradeEnabledHandler returns the http.Handler that reports whether
// downgrade is enabled.
DowngradeEnabledHandler() http.Handler
}

// DowngradeInfo returns the downgrade information held by the server's cluster.
func (s *EtcdServer) DowngradeInfo() *membership.DowngradeInfo {
	info := s.cluster.DowngradeInfo()
	return info
}

// downgradeEnabledHandler is the http.Handler that reports whether downgrade
// is enabled; see its ServeHTTP method.
type downgradeEnabledHandler struct {
// lg receives warnings raised while serving a request.
lg *zap.Logger
// cluster supplies the cluster ID written to the X-Etcd-Cluster-ID header.
cluster api.Cluster
// server provides the request timeout, the linearized read barrier and the
// downgrade info that is served.
server *EtcdServer
}

// DowngradeEnabledHandler returns an http.Handler, backed by this server, its
// logger and its cluster, that reports whether downgrade is enabled.
func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
	h := new(downgradeEnabledHandler)
	h.lg = s.getLogger()
	h.cluster = s.cluster
	h.server = s
	return h
}

// ServeHTTP answers GET /downgrade/enabled with the downgrade-enabled flag
// encoded as a JSON boolean.
//
// Responses:
//   - 405 with "Allow: GET" for any method other than GET;
//   - 400 when the request path is not exactly "/downgrade/enabled";
//   - 500 when the linearized read barrier fails or times out, or when the
//     flag cannot be encoded;
//   - 200 with body "true" or "false" otherwise.
//
// Every response other than the 405 carries the X-Etcd-Cluster-ID header.
func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		// Allow must advertise the methods the resource supports, not echo
		// back the method that was just rejected.
		w.Header().Set("Allow", http.MethodGet)
		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
		return
	}

	w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())

	if r.URL.Path != "/downgrade/enabled" {
		http.Error(w, "bad path", http.StatusBadRequest)
		return
	}

	// Derive from the request context so the wait is abandoned as soon as the
	// caller goes away, while still bounding it by the request timeout.
	ctx, cancel := context.WithTimeout(r.Context(), h.server.Cfg.ReqTimeout())
	defer cancel()

	// serve with linearized downgrade info
	if err := h.server.linearizableReadNotify(ctx); err != nil {
		http.Error(w, fmt.Sprintf("failed linearized read: %v", err),
			http.StatusInternalServerError)
		return
	}

	enabled := h.server.DowngradeInfo().Enabled
	b, err := json.Marshal(enabled)
	if err != nil {
		// Do not answer 200 with an empty body: the caller would fail to
		// decode it without learning why.
		h.lg.Warn("failed to marshal downgrade.Enabled to json", zap.Error(err))
		http.Error(w, "failed to marshal downgrade enabled status", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	if _, err := w.Write(b); err != nil {
		h.lg.Warn("failed to write downgrade enabled response", zap.Error(err))
	}
}

// Process takes a raft message and applies it to the server's raft state
// machine, respecting any timeout of the given context.
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
Expand Down Expand Up @@ -2314,6 +2376,36 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
}
}

// monitorDowngrade runs until s.stopping is signalled. Every
// monitorDowngradeInterval it checks, on the leader only, whether an enabled
// downgrade has completed, i.e. whether isDowngradeFinished reports true for
// the versions returned by getVersions, and if so cancels the downgrade.
func (s *EtcdServer) monitorDowngrade() {
	lg := s.getLogger()
	for {
		select {
		case <-time.After(monitorDowngradeInterval):
		case <-s.stopping:
			return
		}

		// Only the leader evaluates and cancels the downgrade.
		if s.Leader() != s.ID() {
			continue
		}

		d := s.cluster.DowngradeInfo()
		if !d.Enabled {
			continue
		}

		targetVersion := d.TargetVersion
		v, err := semver.NewVersion(targetVersion)
		if err != nil {
			// A malformed target version must not panic this background
			// goroutine; report it and try again on the next tick.
			lg.Warn(
				"failed to parse downgrade target version",
				zap.String("target-version", targetVersion),
				zap.Error(err),
			)
			continue
		}

		if !isDowngradeFinished(lg, v, getVersions(lg, s.cluster, s.id, s.peerRt)) {
			continue
		}

		lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion))
		if _, err := s.downgradeCancel(context.Background()); err != nil {
			lg.Warn("failed to cancel downgrade", zap.Error(err))
		}
	}
}

func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
switch err {
case context.Canceled:
Expand Down

0 comments on commit 236a54d

Please sign in to comment.