Skip to content

Commit

Permalink
labeler: add region label (#3968)
Browse files Browse the repository at this point in the history
* labeler: add region label support

Signed-off-by: disksing <[email protected]>

* fix comment

Signed-off-by: disksing <[email protected]>

* error doc

Signed-off-by: disksing <[email protected]>

* remove IsMatch

Signed-off-by: disksing <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
disksing and ti-chi-bot authored Aug 17, 2021
1 parent 7a2ab50 commit 7ca4ca9
Show file tree
Hide file tree
Showing 6 changed files with 541 additions and 0 deletions.
10 changes: 10 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,16 @@ error = '''
failed to unmarshal proto
'''

["PD:region:ErrRegionRuleContent"]
error = '''
invalid region rule content, %s
'''

["PD:region:ErrRegionRuleNotFound"]
error = '''
region label rule not found for id %s
'''

["PD:schedule:ErrCreateOperator"]
error = '''
unable to create operator, %s
Expand Down
6 changes: 6 additions & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ var (
ErrBuildRuleList = errors.Normalize("build rule list failed, %s", errors.RFCCodeText("PD:placement:ErrBuildRuleList"))
)

// region label errors
var (
ErrRegionRuleContent = errors.Normalize("invalid region rule content, %s", errors.RFCCodeText("PD:region:ErrRegionRuleContent"))
ErrRegionRuleNotFound = errors.Normalize("region label rule not found for id %s", errors.RFCCodeText("PD:region:ErrRegionRuleNotFound"))
)

// cluster errors
var (
ErrNotBootstrapped = errors.Normalize("TiKV cluster not bootstrapped, please start TiKV first", errors.RFCCodeText("PD:cluster:ErrNotBootstrapped"))
Expand Down
16 changes: 16 additions & 0 deletions server/core/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
gcPath = "gc"
rulesPath = "rules"
ruleGroupPath = "rule_group"
regionLabelPath = "region_label"
replicationPath = "replication_mode"
componentPath = "component"
customScheduleConfigPath = "scheduler_config"
Expand Down Expand Up @@ -272,6 +273,21 @@ func (s *Storage) LoadRules(f func(k, v string)) error {
return s.LoadRangeByPrefix(rulesPath+"/", f)
}

// SaveRegionRule saves a region rule to the storage.
func (s *Storage) SaveRegionRule(ruleKey string, rule interface{}) error {
return s.SaveJSON(regionLabelPath, ruleKey, rule)
}

// DeleteRegionRule removes a region rule from storage.
func (s *Storage) DeleteRegionRule(ruleKey string) error {
return s.Remove(path.Join(regionLabelPath, ruleKey))
}

// LoadRegionRules loads region rules from storage.
func (s *Storage) LoadRegionRules(f func(k, v string)) error {
return s.LoadRangeByPrefix(regionLabelPath+"/", f)
}

// SaveRuleGroup stores a rule group config to storage.
func (s *Storage) SaveRuleGroup(groupID string, group interface{}) error {
return s.SaveJSON(ruleGroupPath, groupID, group)
Expand Down
270 changes: 270 additions & 0 deletions server/schedule/labeler/labeler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
// Copyright 2021 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package labeler

import (
"bytes"
"encoding/hex"
"encoding/json"
"fmt"
"reflect"
"sync"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule/rangelist"
"go.uber.org/zap"
)

// RegionLabeler is utility to label regions.
type RegionLabeler struct {
storage *core.Storage
sync.RWMutex
labelRules map[string]*LabelRule
rangeList rangelist.List // sorted LabelRules of the type `KeyRange`
}

// NewRegionLabeler creates a Labeler instance.
func NewRegionLabeler(storage *core.Storage) (*RegionLabeler, error) {
l := &RegionLabeler{
storage: storage,
labelRules: make(map[string]*LabelRule),
}

if err := l.loadRules(); err != nil {
return nil, err
}
return l, nil
}

func (l *RegionLabeler) loadRules() error {
var toDelete []string
err := l.storage.LoadRegionRules(func(k, v string) {
var r LabelRule
if err := json.Unmarshal([]byte(v), &r); err != nil {
log.Error("failed to unmarshal label rule value", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule))
toDelete = append(toDelete, k)
return
}
if err := l.adjustRule(&r); err != nil {
log.Error("failed to adjust label rule", zap.String("rule-key", k), zap.String("rule-value", v), zap.Error(err))
toDelete = append(toDelete, k)
return
}
l.labelRules[r.ID] = &r
})
if err != nil {
return err
}
for _, d := range toDelete {
if err = l.storage.DeleteRegionRule(d); err != nil {
return err
}
}
l.buildRangeList()
return nil
}

func (l *RegionLabeler) adjustRule(rule *LabelRule) error {
if rule.ID == "" {
return errs.ErrRegionRuleContent.FastGenByArgs("empty rule id")
}
if len(rule.Labels) == 0 {
return errs.ErrRegionRuleContent.FastGenByArgs("no region labels")
}
for _, l := range rule.Labels {
if l.Key == "" {
return errs.ErrRegionRuleContent.FastGenByArgs("empty region label key")
}
if l.Value == "" {
return errs.ErrRegionRuleContent.FastGenByArgs("empty region label value")
}
}

switch rule.RuleType {
case KeyRange:
data, ok := rule.Rule.(map[string]interface{})
if !ok {
return errs.ErrRegionRuleContent.FastGenByArgs(fmt.Sprintf("invalid rule type: %T", reflect.TypeOf(rule.Rule)))
}
startKey, ok := data["start_key"].(string)
if !ok {
return errs.ErrRegionRuleContent.FastGenByArgs(fmt.Sprintf("invalid startKey type: %T", reflect.TypeOf(data["start_key"])))
}
endKey, ok := data["end_key"].(string)
if !ok {
return errs.ErrRegionRuleContent.FastGenByArgs(fmt.Sprintf("invalid endKey type: %T", reflect.TypeOf(data["end_key"])))
}
var r KeyRangeRule
r.StartKeyHex, r.EndKeyHex = startKey, endKey
var err error
r.StartKey, err = hex.DecodeString(r.StartKeyHex)
if err != nil {
return errs.ErrHexDecodingString.FastGenByArgs(r.StartKeyHex)
}
r.EndKey, err = hex.DecodeString(r.EndKeyHex)
if err != nil {
return errs.ErrHexDecodingString.FastGenByArgs(r.EndKeyHex)
}
if len(r.EndKey) > 0 && bytes.Compare(r.EndKey, r.StartKey) <= 0 {
return errs.ErrRegionRuleContent.FastGenByArgs("endKey should be greater than startKey")
}
rule.Rule = &r
return nil
}
log.Error("invalid rule type", zap.String("rule-type", rule.RuleType))
return errs.ErrRegionRuleContent.FastGenByArgs(fmt.Sprintf("invalid rule type: %s", rule.RuleType))
}

func (l *RegionLabeler) buildRangeList() {
builder := rangelist.NewBuilder()
for _, rule := range l.labelRules {
if rule.RuleType == KeyRange {
r := rule.Rule.(*KeyRangeRule)
builder.AddItem(r.StartKey, r.EndKey, rule)
}
}
l.rangeList = builder.Build()
}

// GetAllLabelRules returns all the rules.
func (l *RegionLabeler) GetAllLabelRules() []*LabelRule {
l.RLock()
defer l.RUnlock()
rules := make([]*LabelRule, 0, len(l.labelRules))
for _, rule := range l.labelRules {
rules = append(rules, rule)
}
return rules
}

// GetLabelRules returns the rules that match the given ids.
func (l *RegionLabeler) GetLabelRules(ids []string) ([]*LabelRule, error) {
l.RLock()
defer l.RUnlock()
rules := make([]*LabelRule, 0, len(ids))
for _, id := range ids {
if rule, ok := l.labelRules[id]; ok {
rules = append(rules, rule)
} else {
return nil, errs.ErrRegionRuleNotFound.FastGenByArgs(id)
}
}
return rules, nil
}

// GetLabelRule returns the Rule with the same ID.
func (l *RegionLabeler) GetLabelRule(id string) *LabelRule {
l.RLock()
defer l.RUnlock()
return l.labelRules[id]
}

// SetLabelRule inserts or updates a LabelRule.
func (l *RegionLabeler) SetLabelRule(rule *LabelRule) error {
l.Lock()
defer l.Unlock()
if err := l.adjustRule(rule); err != nil {
return err
}
if err := l.storage.SaveRegionRule(rule.ID, rule); err != nil {
return err
}
l.labelRules[rule.ID] = rule
l.buildRangeList()
return nil
}

// DeleteLabelRule removes a LabelRule.
func (l *RegionLabeler) DeleteLabelRule(id string) error {
l.Lock()
defer l.Unlock()
if err := l.storage.DeleteRegionRule(id); err != nil {
return err
}
delete(l.labelRules, id)
l.buildRangeList()
return nil
}

// Patch updates multiple region rules in a batch.
func (l *RegionLabeler) Patch(patch LabelRulePatch) error {
for _, rule := range patch.SetRules {
if err := l.adjustRule(rule); err != nil {
return err
}
}

// save to storage
for _, key := range patch.DeleteRules {
if err := l.storage.DeleteRegionRule(key); err != nil {
return err
}
}
for _, rule := range patch.SetRules {
if err := l.storage.SaveRegionRule(rule.ID, rule); err != nil {
return err
}
}

// update inmemory states.
l.Lock()
defer l.Unlock()

for _, key := range patch.DeleteRules {
delete(l.labelRules, key)
}
for _, rule := range patch.SetRules {
l.labelRules[rule.ID] = rule
}
l.buildRangeList()
return nil
}

// GetRegionLabel returns the label of the region for a key.
func (l *RegionLabeler) GetRegionLabel(region *core.RegionInfo, key string) string {
l.RLock()
defer l.RUnlock()
// search ranges
if i, data := l.rangeList.GetData(region.GetStartKey(), region.GetEndKey()); i != -1 {
for _, rule := range data {
for _, l := range rule.(*LabelRule).Labels {
if l.Key == key {
return l.Value
}
}
}
}
return ""
}

// GetRegionLabels returns the labels of the region.
func (l *RegionLabeler) GetRegionLabels(region *core.RegionInfo) []*RegionLabel {
l.RLock()
defer l.RUnlock()
var result []*RegionLabel
// search ranges
if i, data := l.rangeList.GetData(region.GetStartKey(), region.GetEndKey()); i != -1 {
for _, rule := range data {
for _, l := range rule.(*LabelRule).Labels {
result = append(result, &RegionLabel{
Key: l.Key,
Value: l.Value,
})
}
}
}
return result
}
Loading

0 comments on commit 7ca4ca9

Please sign in to comment.