From 495feb7f73e30e37d4c5dfc89a682b59b09769e2 Mon Sep 17 00:00:00 2001
From: Allen Zhong
Date: Fri, 26 Feb 2021 15:24:26 +0800
Subject: [PATCH 1/5] cluster: implement new pd api methods

---
 pkg/cluster/api/pdapi.go | 91 +++++++++++++++++++++++++++++++---------
 1 file changed, 72 insertions(+), 19 deletions(-)

diff --git a/pkg/cluster/api/pdapi.go b/pkg/cluster/api/pdapi.go
index 7f6ed8fb73..248b85ba99 100644
--- a/pkg/cluster/api/pdapi.go
+++ b/pkg/cluster/api/pdapi.go
@@ -32,6 +32,7 @@ import (
 	"github.com/pingcap/tiup/pkg/utils"
 	pdserverapi "github.com/tikv/pd/server/api"
 	pdconfig "github.com/tikv/pd/server/config"
+	pdsp "github.com/tikv/pd/server/schedule/placement"
 )
 
 // PDClient is an HTTP client of the PD server
@@ -71,18 +72,21 @@ const (
 
 // nolint (some is unused now)
 var (
-	pdPingURI           = "pd/ping"
-	pdMembersURI        = "pd/api/v1/members"
-	pdStoresURI         = "pd/api/v1/stores"
-	pdStoreURI          = "pd/api/v1/store"
-	pdConfigURI         = "pd/api/v1/config"
-	pdClusterIDURI      = "pd/api/v1/cluster"
-	pdSchedulersURI     = "pd/api/v1/schedulers"
-	pdLeaderURI         = "pd/api/v1/leader"
-	pdLeaderTransferURI = "pd/api/v1/leader/transfer"
-	pdConfigReplicate   = "pd/api/v1/config/replicate"
-	pdConfigSchedule    = "pd/api/v1/config/schedule"
-	pdRegionsCheckURI   = "pd/api/v1/regions/check"
+	pdPingURI            = "pd/ping"
+	pdConfigURI          = "pd/api/v1/config"
+	pdClusterIDURI       = "pd/api/v1/cluster"
+	pdConfigReplicate    = "pd/api/v1/config/replicate"
+	pdReplicationModeURI = "pd/api/v1/config/replication-mode"
+	pdRulesURI           = "pd/api/v1/config/rules"
+	pdConfigSchedule     = "pd/api/v1/config/schedule"
+	pdLeaderURI          = "pd/api/v1/leader"
+	pdLeaderTransferURI  = "pd/api/v1/leader/transfer"
+	pdMembersURI         = "pd/api/v1/members"
+	pdSchedulersURI      = "pd/api/v1/schedulers"
+	pdStoreURI           = "pd/api/v1/store"
+	pdStoresURI          = "pd/api/v1/stores"
+	pdStoresLimitURI     = "pd/api/v1/stores/limit"
+	pdRegionsCheckURI    = "pd/api/v1/regions/check"
 )
 
 func tryURLs(endpoints []string, f func(endpoint string) ([]byte, error)) ([]byte, error) {
@@ -277,8 +281,8 @@ func (pc *PDClient) GetMembers() (*pdpb.GetMembersResponse, error) {
 	return &members, nil
 }
 
-// GetDashboardAddress get the PD node address which runs dashboard
-func (pc *PDClient) GetDashboardAddress() (string, error) {
+// GetConfig returns all PD configs
+func (pc *PDClient) GetConfig() (map[string]interface{}, error) {
 	endpoints := pc.getEndpoints(pdConfigURI)
 
 	// We don't use the `github.com/tikv/pd/server/config` directly because
@@ -294,10 +298,15 @@
 		return body, json.Unmarshal(body, &pdConfig)
 	})
 	if err != nil {
-		return "", err
+		return nil, err
 	}
 
-	cfg, err := flatten.Flatten(pdConfig, "", flatten.DotStyle)
+	return flatten.Flatten(pdConfig, "", flatten.DotStyle)
+}
+
+// GetDashboardAddress gets the PD node address which runs dashboard
+func (pc *PDClient) GetDashboardAddress() (string, error) {
+	cfg, err := pc.GetConfig()
 	if err != nil {
 		return "", perrs.AddStack(err)
 	}
@@ -639,7 +648,7 @@ func (pc *PDClient) DelStore(host string, retryOpt *utils.RetryOption) error {
 	return nil
 }
 
-func (pc *PDClient) updateConfig(body io.Reader, url string) error {
+func (pc *PDClient) updateConfig(url string, body io.Reader) error {
 	endpoints := pc.getEndpoints(url)
 	_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
 		return pc.httpClient.Post(endpoint, body)
@@ -649,7 +658,7 @@
 
 // UpdateReplicateConfig updates the PD replication config
 func (pc *PDClient) UpdateReplicateConfig(body io.Reader) error {
-	return pc.updateConfig(body, pdConfigReplicate)
+	return pc.updateConfig(pdConfigReplicate, body)
 }
 
 // GetReplicateConfig gets the PD replication config
@@ -703,7 +712,7 @@ func (pc *PDClient) GetTiKVLabels() (map[string]map[string]string, error) {
 
 // UpdateScheduleConfig updates the PD schedule config
 func (pc *PDClient) UpdateScheduleConfig(body io.Reader) error {
-	return pc.updateConfig(body, pdConfigSchedule)
+	return pc.updateConfig(pdConfigSchedule, body)
 }
 
 // CheckRegion queries for the region with specific status
@@ -722,3 +731,47 @@ func (pc *PDClient) CheckRegion(state string) (*pdserverapi.RegionsInfo, error)
 	})
 	return &regionsInfo, err
 }
+
+// GetPlacementRules queries for all placement rules
+func (pc *PDClient) GetPlacementRules() ([]*pdsp.Rule, error) {
+	endpoints := pc.getEndpoints(pdRulesURI)
+	var rules []*pdsp.Rule
+
+	_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
+		body, err := pc.httpClient.Get(endpoint)
+		if err != nil {
+			return body, err
+		}
+		return body, json.Unmarshal(body, &rules)
+	})
+
+	if err != nil {
+		return nil, err
+	}
+
+	return rules, nil
+}
+
+// SetReplicationConfig sets a config key value of PD replication; it has the
+// same effect as `pd-ctl config set key value`
+func (pc *PDClient) SetReplicationConfig(key string, value int) error {
+	data := map[string]interface{}{"set": map[string]interface{}{key: value}}
+	body, err := json.Marshal(data)
+	if err != nil {
+		return err
+	}
+	log.Debugf("setting replication config: %s=%d", key, value)
+	return pc.updateConfig(pdReplicationModeURI, bytes.NewBuffer(body))
+}
+
+// SetAllStoreLimits sets the store limit for all stores and types; it has the
+// same effect as `pd-ctl store limit all value`
+func (pc *PDClient) SetAllStoreLimits(value int) error {
+	data := map[string]interface{}{"rate": value}
+	body, err := json.Marshal(data)
+	if err != nil {
+		return err
+	}
+	log.Debugf("setting store limit: %d", value)
+	return pc.updateConfig(pdStoresLimitURI, bytes.NewBuffer(body))
+}

From dd27c57c40c9648634339925ac9dcbc0b9d9eed7 Mon Sep 17 00:00:00 2001
From: Allen Zhong
Date: Fri, 26 Feb 2021 18:21:20 +0800
Subject: [PATCH 2/5] cluster: increase store schedule limits during upgrade process

---
 pkg/cluster/operation/upgrade.go | 120 +++++++++++++++++++++++++++++++
 1 file changed, 120 insertions(+)

diff --git a/pkg/cluster/operation/upgrade.go b/pkg/cluster/operation/upgrade.go
index 12a4115414..f87b60706a 100644
--- a/pkg/cluster/operation/upgrade.go
+++ b/pkg/cluster/operation/upgrade.go
@@ -18,8 +18,11 @@ import (
 	"crypto/tls"
 	"reflect"
 	"strconv"
+	"time"
 
+	perrs "github.com/pingcap/errors"
 	"github.com/pingcap/tiup/pkg/checkpoint"
+	"github.com/pingcap/tiup/pkg/cluster/api"
 	"github.com/pingcap/tiup/pkg/cluster/spec"
 	"github.com/pingcap/tiup/pkg/logger/log"
 	"github.com/pingcap/tiup/pkg/set"
@@ -32,6 +35,10 @@ func init() {
 		checkpoint.Field("operation", reflect.DeepEqual),
 		checkpoint.Field("instance", reflect.DeepEqual),
 	)
+	checkpoint.RegisterField(
+		checkpoint.Field("operation", reflect.DeepEqual),
+		checkpoint.Field("func", reflect.DeepEqual),
+	)
 }
 
 // Upgrade the cluster.
@@ -46,6 +53,13 @@ func Upgrade(
 	components := topo.ComponentsByUpdateOrder()
 	components = FilterComponent(components, roleFilter)
 
+	pdList := make([]string, 0)
+	tidbTopo, isTidbTopo := topo.(*spec.Specification)
+	if isTidbTopo {
+		pdList = tidbTopo.GetPDList()
+	}
+	pdClient := api.NewPDClient(pdList, 10*time.Second, tlsCfg)
+
 	for _, component := range components {
 		instances := FilterInstance(component.Instances(), nodeFilter)
 		if len(instances) < 1 {
@@ -53,6 +67,19 @@
 		}
 
 		log.Infof("Upgrading component %s", component.Name())
+		// perform pre-upgrade actions of component
+		var origLeaderScheduleLimit int
+		var origRegionScheduleLimit int
+		var err error
+		switch component.Name() {
+		case spec.ComponentTiKV:
+			origLeaderScheduleLimit, origRegionScheduleLimit, err = increaseScheduleLimit(ctx, pdClient)
+			if err != nil {
+				return err
+			}
+		default:
+			// do nothing, kept for future usage with other components
+		}
 
 		// some instances are upgraded after others
 		deferInstances := make([]spec.Instance, 0)
@@ -86,6 +113,16 @@
 				return err
 			}
 		}
+
+		// perform post-upgrade actions for component
+		switch component.Name() {
+		case spec.ComponentTiKV:
+			if err := decreaseScheduleLimit(pdClient, origLeaderScheduleLimit, origRegionScheduleLimit); err != nil {
+				return err
+			}
+		default:
+			// do nothing, kept for future usage with other components
+		}
 	}
 
 	return nil
@@ -142,3 +179,86 @@ func Addr(ins spec.Instance) string {
 	}
 	return ins.GetHost() + ":" + strconv.Itoa(ins.GetPort())
 }
+
+var (
+	leaderScheduleLimitOffset = 32
+	regionScheduleLimitOffset = 512
+	//storeLimitOffset = 512
+	leaderScheduleLimitThreshold = 64
+	regionScheduleLimitThreshold = 1024
+	//storeLimitThreshold = 1024
+)
+
+// increaseScheduleLimit increases the schedule limit of leader and region for faster
+// rebalancing during the rolling restart / upgrade process
+func increaseScheduleLimit(ctx context.Context, pc *api.PDClient) (
+	currLeaderScheduleLimit int,
+	currRegionScheduleLimit int,
+	err error) {
+	// insert checkpoint
+	point := checkpoint.Acquire(ctx, map[string]interface{}{
+		"operation": "upgrade",
+		"func":      "increaseScheduleLimit",
+	})
+	defer func() {
+		point.Release(err,
+			zap.String("operation", "upgrade"),
+			zap.String("func", "increaseScheduleLimit"),
+			zap.Int("currLeaderScheduleLimit", currLeaderScheduleLimit),
+			zap.Int("currRegionScheduleLimit", currRegionScheduleLimit),
+		)
+	}()
+
+	if data := point.Hit(); data != nil {
+		currLeaderScheduleLimit = data["currLeaderScheduleLimit"].(int)
+		currRegionScheduleLimit = data["currRegionScheduleLimit"].(int)
+		return
+	}
+
+	// query current values
+	cfg, err := pc.GetConfig()
+	if err != nil {
+		return
+	}
+	val, ok := cfg["schedule.leader-schedule-limit"].(float64)
+	if !ok {
+		return currLeaderScheduleLimit, currRegionScheduleLimit, perrs.New("cannot get current leader-schedule-limit")
+	}
+	currLeaderScheduleLimit = int(val)
+	val, ok = cfg["schedule.region-schedule-limit"].(float64)
+	if !ok {
+		return currLeaderScheduleLimit, currRegionScheduleLimit, perrs.New("cannot get current region-schedule-limit")
+	}
+	currRegionScheduleLimit = int(val)
+
+	// increase values
+	if currLeaderScheduleLimit < leaderScheduleLimitThreshold {
+		newLimit := currLeaderScheduleLimit + leaderScheduleLimitOffset
+		if newLimit > leaderScheduleLimitThreshold {
+			newLimit = leaderScheduleLimitThreshold
+		}
+		if err := pc.SetReplicationConfig("leader-schedule-limit", newLimit); err != nil {
+			return currLeaderScheduleLimit, currRegionScheduleLimit, err
+		}
+	}
+
+	if currRegionScheduleLimit < regionScheduleLimitThreshold {
+		newLimit := currRegionScheduleLimit + regionScheduleLimitOffset
+		if newLimit > regionScheduleLimitThreshold {
+			newLimit = regionScheduleLimitThreshold
+		}
+		if err := pc.SetReplicationConfig("region-schedule-limit", newLimit); err != nil {
+			return currLeaderScheduleLimit, currRegionScheduleLimit, err
+		}
+	}
+
+	return
+}
+
+// decreaseScheduleLimit tries to set the schedule limits back to the original
+// values recorded before increaseScheduleLimit raised them
+func decreaseScheduleLimit(pc *api.PDClient, origLeaderScheduleLimit, origRegionScheduleLimit int) error {
+	if err := pc.SetReplicationConfig("leader-schedule-limit", origLeaderScheduleLimit); err != nil {
+		return err
+	}
+	return pc.SetReplicationConfig("region-schedule-limit", origRegionScheduleLimit)
+}

From db389d4c5a7ab478d6c550a3b01e0e5065b6f32c Mon Sep 17 00:00:00 2001
From: Allen Zhong
Date: Mon, 1 Mar 2021 11:35:53 +0800
Subject: [PATCH 3/5] cluster: fix type assertion

---
 pkg/cluster/operation/upgrade.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/cluster/operation/upgrade.go b/pkg/cluster/operation/upgrade.go
index f87b60706a..7625c69d9a 100644
--- a/pkg/cluster/operation/upgrade.go
+++ b/pkg/cluster/operation/upgrade.go
@@ -210,8 +210,8 @@ func increaseScheduleLimit(ctx context.Context, pc *api.PDClient) (
 	}()
 
 	if data := point.Hit(); data != nil {
-		currLeaderScheduleLimit = data["currLeaderScheduleLimit"].(int)
-		currRegionScheduleLimit = data["currRegionScheduleLimit"].(int)
+		currLeaderScheduleLimit = int(data["currLeaderScheduleLimit"].(float64))
+		currRegionScheduleLimit = int(data["currRegionScheduleLimit"].(float64))
 		return
 	}
 

From d54a46993c83fca239b2354780a2690425d4f70b Mon Sep 17 00:00:00 2001
From: Allen Zhong
Date: Mon, 1 Mar 2021 14:54:09 +0800
Subject: [PATCH 4/5] cluster: refine tikv upgrade

---
 pkg/cluster/operation/upgrade.go | 23 +++++------------------
 1 file changed, 5 insertions(+), 18 deletions(-)

diff --git a/pkg/cluster/operation/upgrade.go b/pkg/cluster/operation/upgrade.go
index 7625c69d9a..7f804c3dfa 100644
--- a/pkg/cluster/operation/upgrade.go
+++ b/pkg/cluster/operation/upgrade.go
@@ -47,19 +47,12 @@ func Upgrade(
 	topo spec.Topology,
 	options Options,
 	tlsCfg *tls.Config,
-) error {
+) (upgErr error) {
 	roleFilter := set.NewStringSet(options.Roles...)
 	nodeFilter := set.NewStringSet(options.Nodes...)
 	components := topo.ComponentsByUpdateOrder()
 	components = FilterComponent(components, roleFilter)
 
-	pdList := make([]string, 0)
-	tidbTopo, isTidbTopo := topo.(*spec.Specification)
-	if isTidbTopo {
-		pdList = tidbTopo.GetPDList()
-	}
-	pdClient := api.NewPDClient(pdList, 10*time.Second, tlsCfg)
-
 	for _, component := range components {
 		instances := FilterInstance(component.Instances(), nodeFilter)
 		if len(instances) < 1 {
@@ -73,10 +66,14 @@
 		var err error
 		switch component.Name() {
 		case spec.ComponentTiKV:
+			pdClient := api.NewPDClient(topo.(*spec.Specification).GetPDList(), 10*time.Second, tlsCfg)
 			origLeaderScheduleLimit, origRegionScheduleLimit, err = increaseScheduleLimit(ctx, pdClient)
 			if err != nil {
 				return err
 			}
+			defer func() {
+				upgErr = decreaseScheduleLimit(pdClient, origLeaderScheduleLimit, origRegionScheduleLimit)
+			}()
 		default:
 			// do nothing, kept for future usage with other components
 		}
@@ -113,16 +110,6 @@ func Upgrade(
 				return err
 			}
 		}
-
-		// perform post-upgrade actions for component
-		switch component.Name() {
-		case spec.ComponentTiKV:
-			if err := decreaseScheduleLimit(pdClient, origLeaderScheduleLimit, origRegionScheduleLimit); err != nil {
-				return err
-			}
-		default:
-			// do nothing, kept for future usage with other components
-		}
 	}
 
 	return nil

From 77608734ec6e0b9f7b6225a6bb2c75da9bcba361 Mon Sep 17 00:00:00 2001
From: Allen Zhong
Date: Mon, 1 Mar 2021 19:38:17 +0800
Subject: [PATCH 5/5] cluster: try to handle error

---
 pkg/cluster/operation/upgrade.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/pkg/cluster/operation/upgrade.go b/pkg/cluster/operation/upgrade.go
index 7f804c3dfa..d2801adf8c 100644
--- a/pkg/cluster/operation/upgrade.go
+++ b/pkg/cluster/operation/upgrade.go
@@ -234,6 +234,9 @@ func increaseScheduleLimit(ctx context.Context, pc *api.PDClient) (
 			newLimit = regionScheduleLimitThreshold
 		}
 		if err := pc.SetReplicationConfig("region-schedule-limit", newLimit); err != nil {
+			// try to revert the leader schedule limit on a best-effort basis; it does
+			// not make sense to handle this error again
+			_ = pc.SetReplicationConfig("leader-schedule-limit", currLeaderScheduleLimit)
 			return currLeaderScheduleLimit, currRegionScheduleLimit, err
 		}
 	}
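
The client methods added in PATCH 1/5 are thin wrappers over PD's HTTP API: SetReplicationConfig POSTs a {"set": {key: value}} document and SetAllStoreLimits POSTs a {"rate": value} document through the shared updateConfig helper. A minimal caller, as a sketch only (the endpoint address, timeout, and limit values below are illustrative, and TLS is skipped by passing nil; they are not taken from the patches):

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tiup/pkg/cluster/api"
)

func main() {
	// Hypothetical PD endpoint; replace with a real cluster member.
	pc := api.NewPDClient([]string{"127.0.0.1:2379"}, 10*time.Second, nil)

	// Same effect as `pd-ctl config set leader-schedule-limit 64`.
	if err := pc.SetReplicationConfig("leader-schedule-limit", 64); err != nil {
		fmt.Println(err)
	}

	// Same effect as `pd-ctl store limit all 1024`.
	if err := pc.SetAllStoreLimits(1024); err != nil {
		fmt.Println(err)
	}
}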
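
The fix in PATCH 3/5 is about Go's encoding/json rather than PD: the checkpoint stores the limits as JSON numbers, and numbers decoded back into a map[string]interface{} always come out as float64, so the original .(int) assertions would panic when a checkpoint is replayed. A standalone demonstration of the round trip (not part of the series):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Write side: the values are ints when the checkpoint is recorded.
	raw, _ := json.Marshal(map[string]interface{}{"currLeaderScheduleLimit": 32})

	// Read side: every JSON number decodes into a generic map as float64.
	var data map[string]interface{}
	_ = json.Unmarshal(raw, &data)

	fmt.Printf("%T\n", data["currLeaderScheduleLimit"]) // float64, not int

	// The two-step conversion from PATCH 3/5: assert float64, then truncate.
	limit := int(data["currLeaderScheduleLimit"].(float64))
	fmt.Println(limit) // 32
}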
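
PATCH 4/5 swaps the explicit post-upgrade switch for a defer with a named return value (upgErr), so the limits are restored on every exit path of Upgrade, including early error returns inside the component loop. The shape of that pattern, reduced to a runnable toy (all names here are hypothetical stand-ins, not the patch's identifiers); note that the patch's defer assigns upgErr unconditionally, which can also overwrite an earlier upgrade error with the restore result, while the variant below keeps the first failure:

package main

import "fmt"

// Toy stand-ins for increaseScheduleLimit / decreaseScheduleLimit.
func raiseLimit() (orig int, err error) { fmt.Println("raising limit"); return 4, nil }

func restoreLimit(orig int) error { fmt.Printf("restoring limit to %d\n", orig); return nil }

// runWithRaisedLimit mirrors the defer-based shape of PATCH 4/5: the restore
// runs on every exit path and reports through the named return value.
func runWithRaisedLimit(upgrade func() error) (err error) {
	orig, err := raiseLimit()
	if err != nil {
		return err
	}
	defer func() {
		// Only surface the restore error when the upgrade itself succeeded,
		// so an upgrade failure is not masked by the restore result.
		if rerr := restoreLimit(orig); rerr != nil && err == nil {
			err = rerr
		}
	}()
	return upgrade()
}

func main() {
	_ = runWithRaisedLimit(func() error { fmt.Println("upgrading"); return nil })
}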