Skip to content

Commit

Permalink
etcdserver: Move version monitor logic to separate module
Browse files Browse the repository at this point in the history
  • Loading branch information
serathius committed Jun 22, 2021
1 parent f992d69 commit 7d09924
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 200 deletions.
67 changes: 0 additions & 67 deletions server/etcdserver/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,44 +161,6 @@ func getVersions(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt
return vers
}

// decideClusterVersion decides the cluster version based on the versions map.
// The returned version is the min server version in the map, or nil if the min
// version in unknown.
func decideClusterVersion(lg *zap.Logger, vers map[string]*version.Versions) *semver.Version {
var cv *semver.Version
lv := semver.Must(semver.NewVersion(version.Version))

for mid, ver := range vers {
if ver == nil {
return nil
}
v, err := semver.NewVersion(ver.Server)
if err != nil {
lg.Warn(
"failed to parse server version of remote member",
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
zap.Error(err),
)
return nil
}
if lv.LessThan(*v) {
lg.Warn(
"leader found higher-versioned member",
zap.String("local-member-version", lv.String()),
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
)
}
if cv == nil {
cv = v
} else if v.LessThan(*cv) {
cv = v
}
}
return cv
}

// allowedVersionRange decides the available version range of the cluster that local server can join in;
// if the downgrade enabled status is true, the version window is [oneMinorHigher, oneMinorHigher]
// if the downgrade is not enabled, the version window is [MinClusterVersion, localVersion]
Expand Down Expand Up @@ -438,35 +400,6 @@ func getDowngradeEnabled(lg *zap.Logger, m *membership.Member, rt http.RoundTrip
return false, err
}

// isMatchedVersions returns true if all server versions are equal to target version, otherwise return false.
// It can be used to decide the whether the cluster finishes downgrading to target version.
func isMatchedVersions(lg *zap.Logger, targetVersion *semver.Version, vers map[string]*version.Versions) bool {
for mid, ver := range vers {
if ver == nil {
return false
}
v, err := semver.NewVersion(ver.Cluster)
if err != nil {
lg.Warn(
"failed to parse server version of remote member",
zap.String("remote-member-id", mid),
zap.String("remote-member-version", ver.Server),
zap.Error(err),
)
return false
}
if !targetVersion.Equal(*v) {
lg.Warn("remotes server has mismatching etcd version",
zap.String("remote-member-id", mid),
zap.String("current-server-version", v.String()),
zap.String("target-version", targetVersion.String()),
)
return false
}
}
return true
}

func convertToClusterVersion(v string) (*semver.Version, error) {
ver, err := semver.NewVersion(v)
if err != nil {
Expand Down
86 changes: 0 additions & 86 deletions server/etcdserver/cluster_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package etcdserver

import (
"reflect"
"testing"

"go.etcd.io/etcd/api/v3/version"
Expand All @@ -27,42 +26,6 @@ import (

var testLogger = zap.NewExample()

func TestDecideClusterVersion(t *testing.T) {
tests := []struct {
vers map[string]*version.Versions
wdver *semver.Version
}{
{
map[string]*version.Versions{"a": {Server: "2.0.0"}},
semver.Must(semver.NewVersion("2.0.0")),
},
// unknown
{
map[string]*version.Versions{"a": nil},
nil,
},
{
map[string]*version.Versions{"a": {Server: "2.0.0"}, "b": {Server: "2.1.0"}, "c": {Server: "2.1.0"}},
semver.Must(semver.NewVersion("2.0.0")),
},
{
map[string]*version.Versions{"a": {Server: "2.1.0"}, "b": {Server: "2.1.0"}, "c": {Server: "2.1.0"}},
semver.Must(semver.NewVersion("2.1.0")),
},
{
map[string]*version.Versions{"a": nil, "b": {Server: "2.1.0"}, "c": {Server: "2.1.0"}},
nil,
},
}

for i, tt := range tests {
dver := decideClusterVersion(testLogger, tt.vers)
if !reflect.DeepEqual(dver, tt.wdver) {
t.Errorf("#%d: ver = %+v, want %+v", i, dver, tt.wdver)
}
}
}

func TestIsCompatibleWithVers(t *testing.T) {
tests := []struct {
vers map[string]*version.Versions
Expand Down Expand Up @@ -215,52 +178,3 @@ func TestDecideAllowedVersionRange(t *testing.T) {
})
}
}

func TestIsMatchedVersions(t *testing.T) {
tests := []struct {
name string
targetVersion *semver.Version
versionMap map[string]*version.Versions
expectedFinished bool
}{
{
"When downgrade finished",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4.0"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
},
true,
},
{
"When cannot parse peer version",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.4.2", Cluster: "3.4.0"},
},
false,
},
{
"When downgrade not finished",
&semver.Version{Major: 3, Minor: 4},
map[string]*version.Versions{
"mem1": {Server: "3.4.1", Cluster: "3.4.0"},
"mem2": {Server: "3.4.2-pre", Cluster: "3.4.0"},
"mem3": {Server: "3.5.2", Cluster: "3.5.0"},
},
false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual := isMatchedVersions(zap.NewNop(), tt.targetVersion, tt.versionMap)
if actual != tt.expectedFinished {
t.Errorf("expected downgrade finished is %v; got %v", tt.expectedFinished, actual)
}
})
}
}
85 changes: 38 additions & 47 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
serverversion "go.etcd.io/etcd/server/v3/etcdserver/version"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/lease/leasehttp"
"go.etcd.io/etcd/server/v3/mvcc"
Expand Down Expand Up @@ -2430,12 +2431,9 @@ func (s *EtcdServer) ClusterVersion() *semver.Version {
return s.cluster.Version()
}

// monitorVersions checks the member's version every monitorVersionInterval.
// It updates the cluster version if all members agrees on a higher one.
// It prints out log if there is a member with a higher version than the
// local version.
// TODO switch to updateClusterVersionV3 in 3.6
// monitorVersions every monitorVersionInterval checks if it's the leader and updates cluster version if needed.
func (s *EtcdServer) monitorVersions() {
monitor := serverversion.NewVersionMonitor(s.Logger(), &serverVersionAdapter{s})
for {
select {
case <-s.FirstCommitInTermNotify():
Expand All @@ -2447,31 +2445,7 @@ func (s *EtcdServer) monitorVersions() {
if s.Leader() != s.ID() {
continue
}

v := decideClusterVersion(s.Logger(), getVersions(s.Logger(), s.cluster, s.id, s.peerRt))
if v != nil {
// only keep major.minor version for comparison
v = &semver.Version{
Major: v.Major,
Minor: v.Minor,
}
}

// if the current version is nil:
// 1. use the decided version if possible
// 2. or use the min cluster version
if s.cluster.Version() == nil {
verStr := version.MinClusterVersion
if v != nil {
verStr = v.String()
}
s.GoAttach(func() { s.updateClusterVersionV2(verStr) })
continue
}

if v != nil && membership.IsValidVersionChange(s.cluster.Version(), v) {
s.GoAttach(func() { s.updateClusterVersionV2(v.String()) })
}
monitor.UpdateClusterVersionIfNeeded()
}
}

Expand Down Expand Up @@ -2551,12 +2525,13 @@ func (s *EtcdServer) updateClusterVersionV3(ver string) {
}
}

// monitorDowngrade every DowngradeCheckTime checks if it's the leader and cancels downgrade if needed.
func (s *EtcdServer) monitorDowngrade() {
monitor := serverversion.NewVersionMonitor(s.Logger(), &serverVersionAdapter{s})
t := s.Cfg.DowngradeCheckTime
if t == 0 {
return
}
lg := s.Logger()
for {
select {
case <-time.After(t):
Expand All @@ -2567,22 +2542,7 @@ func (s *EtcdServer) monitorDowngrade() {
if !s.isLeader() {
continue
}

d := s.cluster.DowngradeInfo()
if !d.Enabled {
continue
}

targetVersion := d.TargetVersion
v := semver.Must(semver.NewVersion(targetVersion))
if isMatchedVersions(s.Logger(), v, getVersions(s.Logger(), s.cluster, s.id, s.peerRt)) {
lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion))
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
if _, err := s.downgradeCancel(ctx); err != nil {
lg.Warn("failed to cancel downgrade", zap.Error(err))
}
cancel()
}
monitor.CancelDowngradeIfNeeded()
}
}

Expand Down Expand Up @@ -2703,3 +2663,34 @@ func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
}
return be.Defrag()
}

type serverVersionAdapter struct {
*EtcdServer
}

var _ serverversion.VersionStorage = (*serverVersionAdapter)(nil)

// TODO switch to updateClusterVersionV3 in 3.6
func (s *serverVersionAdapter) UpdateClusterVersion(version string) {
s.GoAttach(func() { s.updateClusterVersionV2(version) })
}

func (s *serverVersionAdapter) DowngradeCancel() {
ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
if _, err := s.downgradeCancel(ctx); err != nil {
s.lg.Warn("failed to cancel downgrade", zap.Error(err))
}
cancel()
}

func (s *serverVersionAdapter) GetClusterVersion() *semver.Version {
return s.cluster.Version()
}

func (s *serverVersionAdapter) GetDowngradeInfo() *membership.DowngradeInfo {
return s.cluster.DowngradeInfo()
}

func (s *serverVersionAdapter) GetVersions() map[string]*version.Versions {
return getVersions(s.lg, s.cluster, s.id, s.peerRt)
}
Loading

0 comments on commit 7d09924

Please sign in to comment.