cluster: increase schedule limit during upgrade of tikv #1161

Merged: 7 commits merged on Mar 2, 2021
Changes from 4 commits
91 changes: 72 additions & 19 deletions pkg/cluster/api/pdapi.go
@@ -32,6 +32,7 @@ import (
"github.com/pingcap/tiup/pkg/utils"
pdserverapi "github.com/tikv/pd/server/api"
pdconfig "github.com/tikv/pd/server/config"
pdsp "github.com/tikv/pd/server/schedule/placement"
)

// PDClient is an HTTP client of the PD server
@@ -71,18 +72,21 @@ const (

// nolint (some is unused now)
var (
pdPingURI = "pd/ping"
pdMembersURI = "pd/api/v1/members"
pdStoresURI = "pd/api/v1/stores"
pdStoreURI = "pd/api/v1/store"
pdConfigURI = "pd/api/v1/config"
pdClusterIDURI = "pd/api/v1/cluster"
pdSchedulersURI = "pd/api/v1/schedulers"
pdLeaderURI = "pd/api/v1/leader"
pdLeaderTransferURI = "pd/api/v1/leader/transfer"
pdConfigReplicate = "pd/api/v1/config/replicate"
pdConfigSchedule = "pd/api/v1/config/schedule"
pdRegionsCheckURI = "pd/api/v1/regions/check"
pdPingURI = "pd/ping"
pdConfigURI = "pd/api/v1/config"
pdClusterIDURI = "pd/api/v1/cluster"
pdConfigReplicate = "pd/api/v1/config/replicate"
pdReplicationModeURI = "pd/api/v1/config/replication-mode"
pdRulesURI = "pd/api/v1/config/rules"
pdConfigSchedule = "pd/api/v1/config/schedule"
pdLeaderURI = "pd/api/v1/leader"
pdLeaderTransferURI = "pd/api/v1/leader/transfer"
pdMembersURI = "pd/api/v1/members"
pdSchedulersURI = "pd/api/v1/schedulers"
pdStoreURI = "pd/api/v1/store"
pdStoresURI = "pd/api/v1/stores"
pdStoresLimitURI = "pd/api/v1/stores/limit"
pdRegionsCheckURI = "pd/api/v1/regions/check"
)

func tryURLs(endpoints []string, f func(endpoint string) ([]byte, error)) ([]byte, error) {
@@ -277,8 +281,8 @@ func (pc *PDClient) GetMembers() (*pdpb.GetMembersResponse, error) {
return &members, nil
}

// GetDashboardAddress get the PD node address which runs dashboard
func (pc *PDClient) GetDashboardAddress() (string, error) {
// GetConfig returns all PD configs
func (pc *PDClient) GetConfig() (map[string]interface{}, error) {
endpoints := pc.getEndpoints(pdConfigURI)

// We don't use the `github.com/tikv/pd/server/config` directly because
@@ -294,10 +298,15 @@ func (pc *PDClient) GetDashboardAddress() (string, error) {
return body, json.Unmarshal(body, &pdConfig)
})
if err != nil {
return "", err
return nil, err
}

cfg, err := flatten.Flatten(pdConfig, "", flatten.DotStyle)
return flatten.Flatten(pdConfig, "", flatten.DotStyle)
}

// GetDashboardAddress get the PD node address which runs dashboard
func (pc *PDClient) GetDashboardAddress() (string, error) {
cfg, err := pc.GetConfig()
if err != nil {
return "", perrs.AddStack(err)
}
@@ -639,7 +648,7 @@ func (pc *PDClient) DelStore(host string, retryOpt *utils.RetryOption) error {
return nil
}

func (pc *PDClient) updateConfig(body io.Reader, url string) error {
func (pc *PDClient) updateConfig(url string, body io.Reader) error {
endpoints := pc.getEndpoints(url)
_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
return pc.httpClient.Post(endpoint, body)
@@ -649,7 +658,7 @@ func (pc *PDClient) updateConfig(body io.Reader, url string) error {

// UpdateReplicateConfig updates the PD replication config
func (pc *PDClient) UpdateReplicateConfig(body io.Reader) error {
return pc.updateConfig(body, pdConfigReplicate)
return pc.updateConfig(pdConfigReplicate, body)
}

// GetReplicateConfig gets the PD replication config
@@ -703,7 +712,7 @@ func (pc *PDClient) GetTiKVLabels() (map[string]map[string]string, error) {

// UpdateScheduleConfig updates the PD schedule config
func (pc *PDClient) UpdateScheduleConfig(body io.Reader) error {
return pc.updateConfig(body, pdConfigSchedule)
return pc.updateConfig(pdConfigSchedule, body)
}

// CheckRegion queries for the region with specific status
@@ -722,3 +731,47 @@ func (pc *PDClient) CheckRegion(state string) (*pdserverapi.RegionsInfo, error)
})
return &regionsInfo, err
}

// GetPlacementRules queries for all placement rules
func (pc *PDClient) GetPlacementRules() ([]*pdsp.Rule, error) {
endpoints := pc.getEndpoints(pdRulesURI)
var rules []*pdsp.Rule

_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := pc.httpClient.Get(endpoint)
if err != nil {
return body, err
}
return body, json.Unmarshal(body, &rules)
})

if err != nil {
return nil, err
}

return rules, nil
}

// SetReplicationConfig sets a key-value pair in the PD replication config; it has the
// same effect as `pd-ctl config set key value`
func (pc *PDClient) SetReplicationConfig(key string, value int) error {
data := map[string]interface{}{"set": map[string]interface{}{key: value}}
body, err := json.Marshal(data)
if err != nil {
return err
}
log.Debugf("setting replication config: %s=%d", key, value)
return pc.updateConfig(pdReplicationModeURI, bytes.NewBuffer(body))
}

// SetAllStoreLimits sets the store limit for all stores and types; it has the same
// effect as `pd-ctl store limit all value`
func (pc *PDClient) SetAllStoreLimits(value int) error {
data := map[string]interface{}{"rate": value}
body, err := json.Marshal(data)
if err != nil {
return err
}
log.Debugf("setting store limit: %d", value)
return pc.updateConfig(pdStoresLimitURI, bytes.NewBuffer(body))
}
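
As a usage illustration (not part of this diff), here is a minimal sketch of driving the new PDClient helpers from caller code. The endpoint address, the 10-second timeout and the nil TLS config are placeholder assumptions; NewPDClient and the method signatures follow the code above.

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tiup/pkg/cluster/api"
)

func main() {
	// Placeholder PD endpoint; pass a real *tls.Config instead of nil for a TLS-enabled cluster.
	pc := api.NewPDClient([]string{"127.0.0.1:2379"}, 10*time.Second, nil)

	// GetConfig returns the PD config flattened with dot-style keys,
	// e.g. "schedule.leader-schedule-limit".
	cfg, err := pc.GetConfig()
	if err != nil {
		panic(err)
	}
	fmt.Println("leader-schedule-limit:", cfg["schedule.leader-schedule-limit"])

	// Same effect as `pd-ctl config set leader-schedule-limit 64`.
	if err := pc.SetReplicationConfig("leader-schedule-limit", 64); err != nil {
		panic(err)
	}

	// Same effect as `pd-ctl store limit all 512`.
	if err := pc.SetAllStoreLimits(512); err != nil {
		panic(err)
	}
}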
109 changes: 108 additions & 1 deletion pkg/cluster/operation/upgrade.go
@@ -18,8 +18,11 @@ import (
"crypto/tls"
"reflect"
"strconv"
"time"

perrs "github.com/pingcap/errors"
"github.com/pingcap/tiup/pkg/checkpoint"
"github.com/pingcap/tiup/pkg/cluster/api"
"github.com/pingcap/tiup/pkg/cluster/spec"
"github.com/pingcap/tiup/pkg/logger/log"
"github.com/pingcap/tiup/pkg/set"
@@ -32,6 +35,10 @@ func init() {
checkpoint.Field("operation", reflect.DeepEqual),
checkpoint.Field("instance", reflect.DeepEqual),
)
checkpoint.RegisterField(
checkpoint.Field("operation", reflect.DeepEqual),
checkpoint.Field("func", reflect.DeepEqual),
)
}

// Upgrade the cluster.
@@ -40,7 +47,7 @@ func Upgrade(
topo spec.Topology,
options Options,
tlsCfg *tls.Config,
) error {
) (upgErr error) {
roleFilter := set.NewStringSet(options.Roles...)
nodeFilter := set.NewStringSet(options.Nodes...)
components := topo.ComponentsByUpdateOrder()
@@ -53,6 +60,23 @@
}

log.Infof("Upgrading component %s", component.Name())
// perform pre-upgrade actions for the component
var origLeaderScheduleLimit int
var origRegionScheduleLimit int
var err error
switch component.Name() {
case spec.ComponentTiKV:
pdClient := api.NewPDClient(topo.(*spec.Specification).GetPDList(), 10*time.Second, tlsCfg)
origLeaderScheduleLimit, origRegionScheduleLimit, err = increaseScheduleLimit(ctx, pdClient)
if err != nil {
return err
}
defer func() {
upgErr = decreaseScheduleLimit(pdClient, origLeaderScheduleLimit, origRegionScheduleLimit)
}()
default:
// do nothing, kept for future usage with other components
}

// some instances are upgraded after others
deferInstances := make([]spec.Instance, 0)
@@ -142,3 +166,86 @@ func Addr(ins spec.Instance) string {
}
return ins.GetHost() + ":" + strconv.Itoa(ins.GetPort())
}

var (
leaderScheduleLimitOffset = 32
regionScheduleLimitOffset = 512
//storeLimitOffset = 512
leaderScheduleLimitThreshold = 64
regionScheduleLimitThreshold = 1024
//storeLimitThreshold = 1024
)

// increaseScheduleLimit increases the schedule limit of leader and region for faster
// rebalancing during the rolling restart / upgrade process
func increaseScheduleLimit(ctx context.Context, pc *api.PDClient) (
currLeaderScheduleLimit int,
currRegionScheduleLimit int,
err error) {
// insert checkpoint
point := checkpoint.Acquire(ctx, map[string]interface{}{
"operation": "upgrade",
"func": "increaseScheduleLimit",
})
defer func() {
point.Release(err,
zap.String("operation", "upgrade"),
zap.String("func", "increaseScheduleLimit"),
zap.Int("currLeaderScheduleLimit", currLeaderScheduleLimit),
zap.Int("currRegionScheduleLimit", currRegionScheduleLimit),
)
}()

if data := point.Hit(); data != nil {
currLeaderScheduleLimit = int(data["currLeaderScheduleLimit"].(float64))
currRegionScheduleLimit = int(data["currRegionScheduleLimit"].(float64))
return
}

// query current values
cfg, err := pc.GetConfig()
if err != nil {
return
}
val, ok := cfg["schedule.leader-schedule-limit"].(float64)
if !ok {
return currLeaderScheduleLimit, currRegionScheduleLimit, perrs.New("cannot get current leader-schedule-limit")
}
currLeaderScheduleLimit = int(val)
val, ok = cfg["schedule.region-schedule-limit"].(float64)
if !ok {
return currLeaderScheduleLimit, currRegionScheduleLimit, perrs.New("cannot get current region-schedule-limit")
}
currRegionScheduleLimit = int(val)

// increase values
if currLeaderScheduleLimit < leaderScheduleLimitThreshold {
newLimit := currLeaderScheduleLimit + leaderScheduleLimitOffset
if newLimit > leaderScheduleLimitThreshold {
newLimit = leaderScheduleLimitThreshold
}
if err := pc.SetReplicationConfig("leader-schedule-limit", newLimit); err != nil {
return currLeaderScheduleLimit, currRegionScheduleLimit, err
}
}
if currRegionScheduleLimit < regionScheduleLimitThreshold {
newLimit := currRegionScheduleLimit + regionScheduleLimitOffset
if newLimit > regionScheduleLimitThreshold {
newLimit = regionScheduleLimitThreshold
}
if err := pc.SetReplicationConfig("region-schedule-limit", newLimit); err != nil {
return currLeaderScheduleLimit, currRegionScheduleLimit, err
}
}

return
}

// decreaseScheduleLimit tries to set the schedule limits of leader and region back to
// the original values recorded by increaseScheduleLimit
func decreaseScheduleLimit(pc *api.PDClient, origLeaderScheduleLimit, origRegionScheduleLimit int) error {
if err := pc.SetReplicationConfig("leader-schedule-limit", origLeaderScheduleLimit); err != nil {
return err
}
return pc.SetReplicationConfig("region-schedule-limit", origRegionScheduleLimit)
}
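
For context, the flow that Upgrade adds around TiKV reduces to "record the current limits, raise them, run the upgrade, restore them on the way out". Below is a hypothetical, stripped-down sketch of that pattern using the new PDClient methods; the rollingUpgradeTiKV wrapper, the fixed limit values and the nil TLS config are illustrative assumptions, and the checkpoint bookkeeping from the diff is omitted.

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/tiup/pkg/cluster/api"
)

// rollingUpgradeTiKV is a hypothetical wrapper showing the raise-then-restore
// pattern that Upgrade applies around the TiKV component.
func rollingUpgradeTiKV(pdAddrs []string, doUpgrade func() error) (err error) {
	pc := api.NewPDClient(pdAddrs, 10*time.Second, nil)

	// Record the current limits so they can be restored afterwards
	// (JSON numbers unmarshal as float64, hence the assertions).
	cfg, err := pc.GetConfig()
	if err != nil {
		return err
	}
	origLeader, ok := cfg["schedule.leader-schedule-limit"].(float64)
	if !ok {
		return fmt.Errorf("cannot read leader-schedule-limit")
	}
	origRegion, ok := cfg["schedule.region-schedule-limit"].(float64)
	if !ok {
		return fmt.Errorf("cannot read region-schedule-limit")
	}

	// Raise the limits for faster rebalancing while stores restart.
	if err = pc.SetReplicationConfig("leader-schedule-limit", 64); err != nil {
		return err
	}
	if err = pc.SetReplicationConfig("region-schedule-limit", 1024); err != nil {
		return err
	}

	// Restore the original values whether the upgrade succeeds or fails.
	defer func() {
		if e := pc.SetReplicationConfig("leader-schedule-limit", int(origLeader)); e != nil && err == nil {
			err = e
		}
		if e := pc.SetReplicationConfig("region-schedule-limit", int(origRegion)); e != nil && err == nil {
			err = e
		}
	}()

	return doUpgrade()
}

func main() {
	_ = rollingUpgradeTiKV([]string{"127.0.0.1:2379"}, func() error {
		// Placeholder for the actual rolling restart of TiKV instances.
		return nil
	})
}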