diff --git a/errors.toml b/errors.toml index 56527e78003..740f0eadc12 100644 --- a/errors.toml +++ b/errors.toml @@ -476,6 +476,16 @@ error = ''' failed to unmarshal proto ''' +["PD:region:ErrRegionRuleContent"] +error = ''' +invalid region rule content, %s +''' + +["PD:region:ErrRegionRuleNotFound"] +error = ''' +region label rule not found for id %s +''' + ["PD:schedule:ErrCreateOperator"] error = ''' unable to create operator, %s diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index dc3256577a2..de970ff2c5f 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -94,6 +94,12 @@ var ( ErrBuildRuleList = errors.Normalize("build rule list failed, %s", errors.RFCCodeText("PD:placement:ErrBuildRuleList")) ) +// region label errors +var ( + ErrRegionRuleContent = errors.Normalize("invalid region rule content, %s", errors.RFCCodeText("PD:region:ErrRegionRuleContent")) + ErrRegionRuleNotFound = errors.Normalize("region label rule not found for id %s", errors.RFCCodeText("PD:region:ErrRegionRuleNotFound")) +) + // cluster errors var ( ErrNotBootstrapped = errors.Normalize("TiKV cluster not bootstrapped, please start TiKV first", errors.RFCCodeText("PD:cluster:ErrNotBootstrapped")) diff --git a/server/core/storage.go b/server/core/storage.go index 1fc5f6fc212..08650125915 100644 --- a/server/core/storage.go +++ b/server/core/storage.go @@ -42,6 +42,7 @@ const ( gcPath = "gc" rulesPath = "rules" ruleGroupPath = "rule_group" + regionLabelPath = "region_label" replicationPath = "replication_mode" componentPath = "component" customScheduleConfigPath = "scheduler_config" @@ -272,6 +273,21 @@ func (s *Storage) LoadRules(f func(k, v string)) error { return s.LoadRangeByPrefix(rulesPath+"/", f) } +// SaveRegionRule saves a region rule to the storage. +func (s *Storage) SaveRegionRule(ruleKey string, rule interface{}) error { + return s.SaveJSON(regionLabelPath, ruleKey, rule) +} + +// DeleteRegionRule removes a region rule from storage. +func (s *Storage) DeleteRegionRule(ruleKey string) error { + return s.Remove(path.Join(regionLabelPath, ruleKey)) +} + +// LoadRegionRules loads region rules from storage. +func (s *Storage) LoadRegionRules(f func(k, v string)) error { + return s.LoadRangeByPrefix(regionLabelPath+"/", f) +} + // SaveRuleGroup stores a rule group config to storage. func (s *Storage) SaveRuleGroup(groupID string, group interface{}) error { return s.SaveJSON(ruleGroupPath, groupID, group) diff --git a/server/schedule/labeler/labeler.go b/server/schedule/labeler/labeler.go new file mode 100644 index 00000000000..065afceb661 --- /dev/null +++ b/server/schedule/labeler/labeler.go @@ -0,0 +1,270 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package labeler + +import ( + "bytes" + "encoding/hex" + "encoding/json" + "fmt" + "reflect" + "sync" + + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/server/core" + "github.com/tikv/pd/server/schedule/rangelist" + "go.uber.org/zap" +) + +// RegionLabeler is utility to label regions. +type RegionLabeler struct { + storage *core.Storage + sync.RWMutex + labelRules map[string]*LabelRule + rangeList rangelist.List // sorted LabelRules of the type `KeyRange` +} + +// NewRegionLabeler creates a Labeler instance. +func NewRegionLabeler(storage *core.Storage) (*RegionLabeler, error) { + l := &RegionLabeler{ + storage: storage, + labelRules: make(map[string]*LabelRule), + } + + if err := l.loadRules(); err != nil { + return nil, err + } + return l, nil +} + +func (l *RegionLabeler) loadRules() error { + var toDelete []string + err := l.storage.LoadRegionRules(func(k, v string) { + var r LabelRule + if err := json.Unmarshal([]byte(v), &r); err != nil { + log.Error("failed to unmarshal label rule value", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule)) + toDelete = append(toDelete, k) + return + } + if err := l.adjustRule(&r); err != nil { + log.Error("failed to adjust label rule", zap.String("rule-key", k), zap.String("rule-value", v), zap.Error(err)) + toDelete = append(toDelete, k) + return + } + l.labelRules[r.ID] = &r + }) + if err != nil { + return err + } + for _, d := range toDelete { + if err = l.storage.DeleteRegionRule(d); err != nil { + return err + } + } + l.buildRangeList() + return nil +} + +func (l *RegionLabeler) adjustRule(rule *LabelRule) error { + if rule.ID == "" { + return errs.ErrRegionRuleContent.FastGenByArgs("empty rule id") + } + if len(rule.Labels) == 0 { + return errs.ErrRegionRuleContent.FastGenByArgs("no region labels") + } + for _, l := range rule.Labels { + if l.Key == "" { + return errs.ErrRegionRuleContent.FastGenByArgs("empty region label key") + } + if l.Value == "" { + return errs.ErrRegionRuleContent.FastGenByArgs("empty region label value") + } + } + + switch rule.RuleType { + case KeyRange: + data, ok := rule.Rule.(map[string]interface{}) + if !ok { + return errs.ErrRegionRuleContent.FastGenByArgs(fmt.Sprintf("invalid rule type: %T", reflect.TypeOf(rule.Rule))) + } + startKey, ok := data["start_key"].(string) + if !ok { + return errs.ErrRegionRuleContent.FastGenByArgs(fmt.Sprintf("invalid startKey type: %T", reflect.TypeOf(data["start_key"]))) + } + endKey, ok := data["end_key"].(string) + if !ok { + return errs.ErrRegionRuleContent.FastGenByArgs(fmt.Sprintf("invalid endKey type: %T", reflect.TypeOf(data["end_key"]))) + } + var r KeyRangeRule + r.StartKeyHex, r.EndKeyHex = startKey, endKey + var err error + r.StartKey, err = hex.DecodeString(r.StartKeyHex) + if err != nil { + return errs.ErrHexDecodingString.FastGenByArgs(r.StartKeyHex) + } + r.EndKey, err = hex.DecodeString(r.EndKeyHex) + if err != nil { + return errs.ErrHexDecodingString.FastGenByArgs(r.EndKeyHex) + } + if len(r.EndKey) > 0 && bytes.Compare(r.EndKey, r.StartKey) <= 0 { + return errs.ErrRegionRuleContent.FastGenByArgs("endKey should be greater than startKey") + } + rule.Rule = &r + return nil + } + log.Error("invalid rule type", zap.String("rule-type", rule.RuleType)) + return errs.ErrRegionRuleContent.FastGenByArgs(fmt.Sprintf("invalid rule type: %s", rule.RuleType)) +} + +func (l *RegionLabeler) buildRangeList() { + builder := rangelist.NewBuilder() + for _, rule := range l.labelRules { + if rule.RuleType == KeyRange { + r := rule.Rule.(*KeyRangeRule) + builder.AddItem(r.StartKey, r.EndKey, rule) + } + } + l.rangeList = builder.Build() +} + +// GetAllLabelRules returns all the rules. +func (l *RegionLabeler) GetAllLabelRules() []*LabelRule { + l.RLock() + defer l.RUnlock() + rules := make([]*LabelRule, 0, len(l.labelRules)) + for _, rule := range l.labelRules { + rules = append(rules, rule) + } + return rules +} + +// GetLabelRules returns the rules that match the given ids. +func (l *RegionLabeler) GetLabelRules(ids []string) ([]*LabelRule, error) { + l.RLock() + defer l.RUnlock() + rules := make([]*LabelRule, 0, len(ids)) + for _, id := range ids { + if rule, ok := l.labelRules[id]; ok { + rules = append(rules, rule) + } else { + return nil, errs.ErrRegionRuleNotFound.FastGenByArgs(id) + } + } + return rules, nil +} + +// GetLabelRule returns the Rule with the same ID. +func (l *RegionLabeler) GetLabelRule(id string) *LabelRule { + l.RLock() + defer l.RUnlock() + return l.labelRules[id] +} + +// SetLabelRule inserts or updates a LabelRule. +func (l *RegionLabeler) SetLabelRule(rule *LabelRule) error { + l.Lock() + defer l.Unlock() + if err := l.adjustRule(rule); err != nil { + return err + } + if err := l.storage.SaveRegionRule(rule.ID, rule); err != nil { + return err + } + l.labelRules[rule.ID] = rule + l.buildRangeList() + return nil +} + +// DeleteLabelRule removes a LabelRule. +func (l *RegionLabeler) DeleteLabelRule(id string) error { + l.Lock() + defer l.Unlock() + if err := l.storage.DeleteRegionRule(id); err != nil { + return err + } + delete(l.labelRules, id) + l.buildRangeList() + return nil +} + +// Patch updates multiple region rules in a batch. +func (l *RegionLabeler) Patch(patch LabelRulePatch) error { + for _, rule := range patch.SetRules { + if err := l.adjustRule(rule); err != nil { + return err + } + } + + // save to storage + for _, key := range patch.DeleteRules { + if err := l.storage.DeleteRegionRule(key); err != nil { + return err + } + } + for _, rule := range patch.SetRules { + if err := l.storage.SaveRegionRule(rule.ID, rule); err != nil { + return err + } + } + + // update inmemory states. + l.Lock() + defer l.Unlock() + + for _, key := range patch.DeleteRules { + delete(l.labelRules, key) + } + for _, rule := range patch.SetRules { + l.labelRules[rule.ID] = rule + } + l.buildRangeList() + return nil +} + +// GetRegionLabel returns the label of the region for a key. +func (l *RegionLabeler) GetRegionLabel(region *core.RegionInfo, key string) string { + l.RLock() + defer l.RUnlock() + // search ranges + if i, data := l.rangeList.GetData(region.GetStartKey(), region.GetEndKey()); i != -1 { + for _, rule := range data { + for _, l := range rule.(*LabelRule).Labels { + if l.Key == key { + return l.Value + } + } + } + } + return "" +} + +// GetRegionLabels returns the labels of the region. +func (l *RegionLabeler) GetRegionLabels(region *core.RegionInfo) []*RegionLabel { + l.RLock() + defer l.RUnlock() + var result []*RegionLabel + // search ranges + if i, data := l.rangeList.GetData(region.GetStartKey(), region.GetEndKey()); i != -1 { + for _, rule := range data { + for _, l := range rule.(*LabelRule).Labels { + result = append(result, &RegionLabel{ + Key: l.Key, + Value: l.Value, + }) + } + } + } + return result +} diff --git a/server/schedule/labeler/labeler_test.go b/server/schedule/labeler/labeler_test.go new file mode 100644 index 00000000000..e55749ffc2c --- /dev/null +++ b/server/schedule/labeler/labeler_test.go @@ -0,0 +1,192 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package labeler + +import ( + "encoding/hex" + "encoding/json" + "sort" + "testing" + + . "github.com/pingcap/check" + "github.com/tikv/pd/server/core" + "github.com/tikv/pd/server/kv" +) + +func TestT(t *testing.T) { + TestingT(t) +} + +var _ = Suite(&testLabelerSuite{}) + +type testLabelerSuite struct { + store *core.Storage + labeler *RegionLabeler +} + +func (s *testLabelerSuite) SetUpSuite(c *C) { + s.store = core.NewStorage(kv.NewMemoryKV()) + var err error + s.labeler, err = NewRegionLabeler(s.store) + c.Assert(err, IsNil) +} + +func (s *testLabelerSuite) TestAdjustRule(c *C) { + rule := LabelRule{ + ID: "foo", + Labels: []RegionLabel{ + {Key: "k1", Value: "v1"}, + }, + RuleType: "key-range", + Rule: map[string]interface{}{ + "start_key": "12abcd", + "end_key": "34cdef", + }, + } + err := s.labeler.adjustRule(&rule) + c.Assert(err, IsNil) + c.Assert(rule.Rule.(*KeyRangeRule).StartKey, BytesEquals, []byte{0x12, 0xab, 0xcd}) + c.Assert(rule.Rule.(*KeyRangeRule).EndKey, BytesEquals, []byte{0x34, 0xcd, 0xef}) +} + +func (s *testLabelerSuite) TestAdjustRule2(c *C) { + ruleData := `{"id":"id", "labels": [{"key": "k1", "value": "v1"}], "rule_type":"key-range", "rule": {"start_key":"", "end_key":""}}` + var rule LabelRule + err := json.Unmarshal([]byte(ruleData), &rule) + c.Assert(err, IsNil) + err = s.labeler.adjustRule(&rule) + c.Assert(err, IsNil) + + badRuleData := []string{ + // no id + `{"id":"", "labels": [{"key": "k1", "value": "v1"}], "rule_type":"key-range", "rule": {"start_key":"", "end_key":""}}`, + // no labels + `{"id":"id", "labels": [], "rule_type":"key-range", "rule": {"start_key":"", "end_key":""}}`, + // empty label key + `{"id":"id", "labels": [{"key": "", "value": "v1"}], "rule_type":"key-range", "rule": {"start_key":"", "end_key":""}}`, + // empty label value + `{"id":"id", "labels": [{"key": "k1", "value": ""}], "rule_type":"key-range", "rule": {"start_key":"", "end_key":""}}`, + // unknown rule type + `{"id":"id", "labels": [{"key": "k1", "value": "v1"}], "rule_type":"unknown", "rule": {"start_key":"", "end_key":""}}`, + // wrong rule content + `{"id":"id", "labels": [{"key": "k1", "value": "v1"}], "rule_type":"key-range", "rule":123}`, + `{"id":"id", "labels": [{"key": "k1", "value": "v1"}], "rule_type":"key-range", "rule": {"start_key":123, "end_key":""}}`, + `{"id":"id", "labels": [{"key": "k1", "value": "v1"}], "rule_type":"key-range", "rule": {"start_key":"", "end_key":123}}`, + `{"id":"id", "labels": [{"key": "k1", "value": "v1"}], "rule_type":"key-range", "rule": {"start_key":"123", "end_key":"abcd"}}`, + `{"id":"id", "labels": [{"key": "k1", "value": "v1"}], "rule_type":"key-range", "rule": {"start_key":"abcd", "end_key":"123"}}`, + `{"id":"id", "labels": [{"key": "k1", "value": "v1"}], "rule_type":"key-range", "rule": {"start_key":"abcd", "end_key":"1234"}}`, + } + for i, str := range badRuleData { + var rule LabelRule + err := json.Unmarshal([]byte(str), &rule) + c.Assert(err, IsNil, Commentf("#%d", i)) + err = s.labeler.adjustRule(&rule) + c.Assert(err, NotNil, Commentf("#%d", i)) + } +} + +func (s *testLabelerSuite) TestGetSetRule(c *C) { + rules := []*LabelRule{ + {ID: "rule1", Labels: []RegionLabel{{Key: "k1", Value: "v1"}}, RuleType: "key-range", Rule: map[string]interface{}{"start_key": "1234", "end_key": "5678"}}, + {ID: "rule2", Labels: []RegionLabel{{Key: "k2", Value: "v2"}}, RuleType: "key-range", Rule: map[string]interface{}{"start_key": "ab12", "end_key": "cd12"}}, + {ID: "rule3", Labels: []RegionLabel{{Key: "k3", Value: "v3"}}, RuleType: "key-range", Rule: map[string]interface{}{"start_key": "abcd", "end_key": "efef"}}, + } + for _, r := range rules { + err := s.labeler.SetLabelRule(r) + c.Assert(err, IsNil) + } + + allRules := s.labeler.GetAllLabelRules() + sort.Slice(allRules, func(i, j int) bool { return allRules[i].ID < allRules[j].ID }) + c.Assert(allRules, DeepEquals, rules) + + byIDs, err := s.labeler.GetLabelRules([]string{"rule3", "rule1"}) + c.Assert(err, IsNil) + c.Assert(byIDs, DeepEquals, []*LabelRule{rules[2], rules[0]}) + + err = s.labeler.DeleteLabelRule("rule2") + c.Assert(err, IsNil) + c.Assert(s.labeler.GetLabelRule("rule2"), IsNil) + _, err = s.labeler.GetLabelRules([]string{"rule1", "rule2"}) + c.Assert(err, NotNil) + + // patch + patch := LabelRulePatch{ + SetRules: []*LabelRule{ + {ID: "rule2", Labels: []RegionLabel{{Key: "k2", Value: "v2"}}, RuleType: "key-range", Rule: map[string]interface{}{"start_key": "ab12", "end_key": "cd12"}}, + }, + DeleteRules: []string{"rule1"}, + } + err = s.labeler.Patch(patch) + c.Assert(err, IsNil) + allRules = s.labeler.GetAllLabelRules() + sort.Slice(allRules, func(i, j int) bool { return allRules[i].ID < allRules[j].ID }) + c.Assert(allRules, DeepEquals, rules[1:]) +} + +func (s *testLabelerSuite) TestSaveLoadRule(c *C) { + rules := []*LabelRule{ + {ID: "rule1", Labels: []RegionLabel{{Key: "k1", Value: "v1"}}, RuleType: "key-range", Rule: map[string]interface{}{"start_key": "1234", "end_key": "5678"}}, + {ID: "rule2", Labels: []RegionLabel{{Key: "k2", Value: "v2"}}, RuleType: "key-range", Rule: map[string]interface{}{"start_key": "ab12", "end_key": "cd12"}}, + {ID: "rule3", Labels: []RegionLabel{{Key: "k3", Value: "v3"}}, RuleType: "key-range", Rule: map[string]interface{}{"start_key": "abcd", "end_key": "efef"}}, + } + for _, r := range rules { + err := s.labeler.SetLabelRule(r) + c.Assert(err, IsNil) + } + + labeler, err := NewRegionLabeler(s.store) + c.Assert(err, IsNil) + for _, r := range rules { + r2 := labeler.GetLabelRule(r.ID) + c.Assert(r2, DeepEquals, r) + } +} + +func (s *testLabelerSuite) TestKeyRange(c *C) { + rules := []*LabelRule{ + {ID: "rule1", Labels: []RegionLabel{{Key: "k1", Value: "v1"}}, RuleType: "key-range", Rule: map[string]interface{}{"start_key": "1234", "end_key": "5678"}}, + {ID: "rule2", Labels: []RegionLabel{{Key: "k2", Value: "v2"}}, RuleType: "key-range", Rule: map[string]interface{}{"start_key": "ab12", "end_key": "cd12"}}, + {ID: "rule3", Labels: []RegionLabel{{Key: "k3", Value: "v3"}}, RuleType: "key-range", Rule: map[string]interface{}{"start_key": "abcd", "end_key": "efef"}}, + } + for _, r := range rules { + err := s.labeler.SetLabelRule(r) + c.Assert(err, IsNil) + } + + type testCase struct { + start, end string + labels map[string]string + } + testCases := []testCase{ + {"1234", "5678", map[string]string{"k1": "v1"}}, + {"1234", "aaaa", map[string]string{}}, + {"abcd", "abff", map[string]string{"k2": "v2", "k3": "v3"}}, + {"cd12", "dddd", map[string]string{"k3": "v3"}}, + {"ffee", "ffff", map[string]string{}}, + } + for _, tc := range testCases { + start, _ := hex.DecodeString(tc.start) + end, _ := hex.DecodeString(tc.end) + region := core.NewTestRegionInfo(start, end) + labels := s.labeler.GetRegionLabels(region) + c.Assert(labels, HasLen, len(tc.labels)) + for _, l := range labels { + c.Assert(tc.labels[l.Key], Equals, l.Value) + } + for _, k := range []string{"k1", "k2", "k3"} { + c.Assert(s.labeler.GetRegionLabel(region, k), Equals, tc.labels[k]) + } + } +} diff --git a/server/schedule/labeler/rules.go b/server/schedule/labeler/rules.go new file mode 100644 index 00000000000..7fc30695731 --- /dev/null +++ b/server/schedule/labeler/rules.go @@ -0,0 +1,47 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package labeler + +// RegionLabel is the label of a region. +type RegionLabel struct { + Key string `json:"key"` + Value string `json:"value"` +} + +// LabelRule is the rule to assign labels to a region. +type LabelRule struct { + ID string `json:"id"` + Labels []RegionLabel `json:"labels"` + RuleType string `json:"rule_type"` + Rule interface{} `json:"rule"` +} + +const ( + // KeyRange is the rule type that labels regions by key range. + KeyRange = "key-range" +) + +// KeyRangeRule contains the start key and end key of the LabelRule. +type KeyRangeRule struct { + StartKey []byte `json:"-"` // range start key + StartKeyHex string `json:"start_key"` // hex format start key, for marshal/unmarshal + EndKey []byte `json:"-"` // range end key + EndKeyHex string `json:"end_key"` // hex format end key, for marshal/unmarshal +} + +// LabelRulePatch is the patch to update the label rules. +type LabelRulePatch struct { + SetRules []*LabelRule `json:"sets"` + DeleteRules []string `json:"deletes"` +}