Skip to content

Commit

Permalink
Merge pull request #12099 from YoyinZyc/downgrade-httphandler
Browse files Browse the repository at this point in the history
[Etcd downgrade] Add HTTP handler to enable downgrade info communication between cluster members
  • Loading branch information
gyuho authored Oct 26, 2020
2 parents 8fc5ef4 + f67b251 commit bc3a77d
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 9 deletions.
5 changes: 5 additions & 0 deletions embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const (
DefaultGRPCKeepAliveMinTime = 5 * time.Second
DefaultGRPCKeepAliveInterval = 2 * time.Hour
DefaultGRPCKeepAliveTimeout = 20 * time.Second
DefaultDowngradeCheckTime = 5 * time.Second

DefaultListenPeerURLs = "http://localhost:2380"
DefaultListenClientURLs = "http://localhost:2379"
Expand Down Expand Up @@ -330,6 +331,8 @@ type Config struct {
// UnsafeNoFsync disables all uses of fsync.
// Setting this is unsafe and will cause data loss.
UnsafeNoFsync bool `json:"unsafe-no-fsync"`

ExperimentalDowngradeCheckTime time.Duration `json:"experimental-downgrade-check-time"`
}

// configYAML holds the config suitable for yaml parsing
Expand Down Expand Up @@ -413,6 +416,8 @@ func NewConfig() *Config {
LogOutputs: []string{DefaultLogOutput},
LogLevel: logutil.DefaultLogLevel,
EnableGRPCGateway: true,

ExperimentalDowngradeCheckTime: DefaultDowngradeCheckTime,
}
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
return cfg
Expand Down
2 changes: 2 additions & 0 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
DowngradeCheckTime: cfg.ExperimentalDowngradeCheckTime,
}
print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
Expand Down Expand Up @@ -303,6 +304,7 @@ func print(lg *zap.Logger, ec Config, sc etcdserver.ServerConfig, memberInitiali
zap.String("auto-compaction-interval", sc.AutoCompactionRetention.String()),
zap.String("discovery-url", sc.DiscoveryURL),
zap.String("discovery-proxy", sc.DiscoveryProxy),
zap.String("downgrade-check-interval", sc.DowngradeCheckTime.String()),
)
}

Expand Down
1 change: 1 addition & 0 deletions etcdmain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ func newConfig() *config {
fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
fs.DurationVar(&cfg.ec.ExperimentalDowngradeCheckTime, "experimental-downgrade-check-time", cfg.ec.ExperimentalDowngradeCheckTime, "Duration of time between two downgrade status check.")

// unsafe
fs.BoolVar(&cfg.ec.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
Expand Down
6 changes: 5 additions & 1 deletion etcdserver/api/etcdhttp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (

// NewPeerHandler generates an http.Handler to handle etcd peer requests.
func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerV2) http.Handler {
	// Collect the per-feature handlers from the server before wiring them
	// into the shared peer mux.
	raftHandler := s.RaftHandler()
	leaseHandler := s.LeaseHandler()
	hashKVHandler := s.HashKVHandler()
	downgradeHandler := s.DowngradeEnabledHandler()
	return newPeerHandler(lg, s, raftHandler, leaseHandler, hashKVHandler, downgradeHandler)
}

func newPeerHandler(
Expand All @@ -47,6 +47,7 @@ func newPeerHandler(
raftHandler http.Handler,
leaseHandler http.Handler,
hashKVHandler http.Handler,
downgradeEnabledHandler http.Handler,
) http.Handler {
if lg == nil {
lg = zap.NewNop()
Expand All @@ -64,6 +65,9 @@ func newPeerHandler(
mux.Handle(leasehttp.LeasePrefix, leaseHandler)
mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
}
if downgradeEnabledHandler != nil {
mux.Handle(etcdserver.DowngradeEnabledPath, downgradeEnabledHandler)
}
if hashKVHandler != nil {
mux.Handle(etcdserver.PeerHashKVPath, hashKVHandler)
}
Expand Down
4 changes: 2 additions & 2 deletions etcdserver/api/etcdhttp/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ var fakeRaftHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Reque
// TestNewPeerHandlerOnRaftPrefix tests that NewPeerHandler returns a handler that
// handles raft-prefix requests well.
func TestNewPeerHandlerOnRaftPrefix(t *testing.T) {
ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil)
ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil, nil)
srv := httptest.NewServer(ph)
defer srv.Close()

Expand Down Expand Up @@ -231,7 +231,7 @@ func TestServeMemberPromoteFails(t *testing.T) {

// TestNewPeerHandlerOnMembersPromotePrefix verifies the request with members promote prefix is routed correctly
func TestNewPeerHandlerOnMembersPromotePrefix(t *testing.T) {
ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil)
ph := newPeerHandler(zap.NewExample(), &fakeServer{cluster: &fakeCluster{}}, fakeRaftHandler, nil, nil, nil)
srv := httptest.NewServer(ph)
defer srv.Close()

Expand Down
95 changes: 95 additions & 0 deletions etcdserver/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io/ioutil"
"net/http"
"sort"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -369,9 +370,103 @@ func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.R

// getDowngradeEnabledFromRemotePeers will get the downgrade enabled status of the cluster.
func getDowngradeEnabledFromRemotePeers(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
	for _, member := range cl.Members() {
		// Skip ourselves; we only want the status as seen by remote peers.
		if member.ID == local {
			continue
		}
		enabled, err := getDowngradeEnabled(lg, member, rt)
		if err != nil {
			lg.Warn("failed to get downgrade enabled status", zap.String("remote-member-id", member.ID.String()), zap.Error(err))
			continue
		}
		// Since the "/downgrade/enabled" serves linearized data,
		// this function can return once it gets a non-error response from the endpoint.
		return enabled
	}
	return false
}

// getDowngradeEnabled returns the downgrade enabled status of the given member
// via its peerURLs. Returns the last error if it fails to get it.
func getDowngradeEnabled(lg *zap.Logger, m *membership.Member, rt http.RoundTripper) (bool, error) {
	client := &http.Client{Transport: rt}
	var lastErr error

	// Try each peer URL in turn; the first URL that yields a parseable
	// response wins. Failures are logged and the next URL is attempted.
	for _, peerURL := range m.PeerURLs {
		addr := peerURL + DowngradeEnabledPath
		resp, err := client.Get(addr)
		if err != nil {
			lastErr = err
			lg.Warn(
				"failed to reach the peer URL",
				zap.String("address", addr),
				zap.String("remote-member-id", m.ID.String()),
				zap.Error(err),
			)
			continue
		}
		body, err := ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			lastErr = err
			lg.Warn(
				"failed to read body of response",
				zap.String("address", addr),
				zap.String("remote-member-id", m.ID.String()),
				zap.Error(err),
			)
			continue
		}
		enabled, err := strconv.ParseBool(string(body))
		if err != nil {
			lastErr = err
			lg.Warn(
				"failed to convert response",
				zap.String("address", addr),
				zap.String("remote-member-id", m.ID.String()),
				zap.Error(err),
			)
			continue
		}
		return enabled, nil
	}
	return false, lastErr
}

// isMatchedVersions returns true if all server versions are equal to target version, otherwise return false.
// It can be used to decide whether the cluster has finished downgrading to the target version.
func isMatchedVersions(lg *zap.Logger, targetVersion *semver.Version, vers map[string]*version.Versions) bool {
	for mid, ver := range vers {
		if ver == nil {
			// A member whose version is unknown cannot be confirmed as downgraded.
			return false
		}
		// The cluster version (not the server binary version) is what the
		// downgrade process converges, so that is what we parse and compare.
		v, err := semver.NewVersion(ver.Cluster)
		if err != nil {
			// Log the cluster version string, since that is the value that
			// failed to parse (the original logged ver.Server here, which
			// mislabeled the failing input).
			lg.Warn(
				"failed to parse cluster version of remote member",
				zap.String("remote-member-id", mid),
				zap.String("remote-member-cluster-version", ver.Cluster),
				zap.Error(err),
			)
			return false
		}
		if !targetVersion.Equal(*v) {
			lg.Warn("remote server has mismatching etcd version",
				zap.String("remote-member-id", mid),
				zap.String("current-server-version", v.String()),
				zap.String("target-version", targetVersion.String()),
			)
			return false
		}
	}
	return true
}

func convertToClusterVersion(v string) (*semver.Version, error) {
ver, err := semver.NewVersion(v)
if err != nil {
Expand Down
49 changes: 49 additions & 0 deletions etcdserver/cluster_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,52 @@ func TestDecideAllowedVersionRange(t *testing.T) {
})
}
}

// TestIsMatchedVersions exercises isMatchedVersions against clusters that
// have, have not, and cannot be confirmed to have finished downgrading.
func TestIsMatchedVersions(t *testing.T) {
	cases := []struct {
		name    string
		target  *semver.Version
		vers    map[string]*version.Versions
		matched bool
	}{
		{
			name:   "When downgrade finished",
			target: &semver.Version{Major: 3, Minor: 4},
			vers: map[string]*version.Versions{
				"mem1": {Server: "3.4.1", Cluster: "3.4.0"},
				"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
				"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
			},
			matched: true,
		},
		{
			name:   "When cannot parse peer version",
			target: &semver.Version{Major: 3, Minor: 4},
			vers: map[string]*version.Versions{
				"mem1": {Server: "3.4.1", Cluster: "3.4"},
				"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
				"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
			},
			matched: false,
		},
		{
			name:   "When downgrade not finished",
			target: &semver.Version{Major: 3, Minor: 4},
			vers: map[string]*version.Versions{
				"mem1": {Server: "3.4.1", Cluster: "3.4.0"},
				"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
				"mem3": {Server: "3.5.2", Cluster: "3.5.0"},
			},
			matched: false,
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			got := isMatchedVersions(zap.NewNop(), tc.target, tc.vers)
			if got != tc.matched {
				t.Errorf("expected downgrade finished is %v; got %v", tc.matched, got)
			}
		})
	}
}
2 changes: 2 additions & 0 deletions etcdserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ type ServerConfig struct {
// UnsafeNoFsync disables all uses of fsync.
// Setting this is unsafe and will cause data loss.
UnsafeNoFsync bool `json:"unsafe-no-fsync"`

DowngradeCheckTime time.Duration
}

// VerifyBootstrap sanity-checks the initial config for bootstrap case
Expand Down
5 changes: 0 additions & 5 deletions etcdserver/corrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,11 +337,6 @@ func (a *applierV3Corrupt) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevo
return nil, ErrCorrupt
}

type ServerPeerV2 interface {
ServerPeer
HashKVHandler() http.Handler
}

const PeerHashKVPath = "/members/hashkv"

type hashKVHandler struct {
Expand Down
Loading

0 comments on commit bc3a77d

Please sign in to comment.