From ac6ae68a14ca1664ef71036b997eb3b52ec7c39e Mon Sep 17 00:00:00 2001
From: bufferflies <1045931706@qq.com>
Date: Mon, 9 May 2022 19:54:09 +0800
Subject: [PATCH] pkg/ring

Signed-off-by: bufferflies <1045931706@qq.com>
---
 pkg/cache/ring.go                             |  69 +--
 pkg/cache/ring_test.go                        |  85 +++-
 server/cluster/cluster.go                     |   3 -
 server/cluster/cluster_worker.go              |   9 +-
 server/statistics/buckets/hot_bucket_cache.go | 428 ------------------
 .../buckets/hot_bucket_cache_test.go          | 250 ----------
 server/statistics/buckets/hot_bucket_task.go  |  95 ----
 .../buckets/hot_bucket_task_test.go           | 133 ------
 server/statistics/buckets/metric.go           |  51 ---
 9 files changed, 82 insertions(+), 1041 deletions(-)
 delete mode 100644 server/statistics/buckets/hot_bucket_cache.go
 delete mode 100644 server/statistics/buckets/hot_bucket_cache_test.go
 delete mode 100644 server/statistics/buckets/hot_bucket_task.go
 delete mode 100644 server/statistics/buckets/hot_bucket_task_test.go
 delete mode 100644 server/statistics/buckets/metric.go

diff --git a/pkg/cache/ring.go b/pkg/cache/ring.go
index c88bdeaeb8b9..7b71c07c78e1 100644
--- a/pkg/cache/ring.go
+++ b/pkg/cache/ring.go
@@ -16,12 +16,11 @@ package cache
 
 import (
 	"bytes"
-	"fmt"
 
 	"github.com/tikv/pd/pkg/btree"
 )
 
-// Ring is a ring buffer, the key range must be continuous.
+// Ring is a buffer whose key ranges must be continuous.
 type Ring struct {
 	tree *btree.BTree
 }
@@ -37,6 +36,7 @@ func NewRing(degree int) *Ring {
 type RingItem interface {
 	Less(than btree.Item) bool
 	EndKey() []byte
+	// Debris returns the parts of the item that fall outside the given key range.
 	Debris(startKey, endKey []byte) []RingItem
 	StartKey() []byte
 	String() string
@@ -88,68 +88,3 @@ func (r *Ring) Put(item RingItem) {
 	}
 	r.tree.ReplaceOrInsert(item)
 }
-
-type simpleRingItem struct {
-	startKey []byte
-	endKey   []byte
-}
-
-func newSimpleRingItem(startKey, endKey []byte) *simpleRingItem {
-	return &simpleRingItem{
-		startKey: startKey,
-		endKey:   endKey,
-	}
-}
-
-// String
-func (s *simpleRingItem) String() string {
-	return fmt.Sprintf("key-range: [%s, %s]", s.startKey, s.endKey)
-}
-
-// Less returns true if the start key of the item is less than the start key of the argument.
-func (s *simpleRingItem) Less(than btree.Item) bool {
-	return bytes.Compare(s.StartKey(), than.(RingItem).StartKey()) < 0
-}
-
-// Debris returns the debris of the item.
-func (s simpleRingItem) Debris(startKey, endKey []byte) []RingItem {
-	var res []RingItem
-
-	left := maxKey(startKey, s.startKey)
-	right := minKey(endKey, s.endKey)
-	if bytes.Compare(left, right) > 0 {
-		return nil
-	}
-	if !bytes.Equal(s.startKey, left) {
-		res = append(res, newSimpleRingItem(s.startKey, left))
-	}
-
-	if !bytes.Equal(right, s.endKey) {
-		res = append(res, newSimpleRingItem(right, s.endKey))
-	}
-	return res
-}
-
-// EndKey returns the end key of the item.
-func (s *simpleRingItem) EndKey() []byte {
-	return s.endKey
-}
-
-// StartKey returns the start key of the item.
-func (s *simpleRingItem) StartKey() []byte {
-	return s.startKey
-}
-
-func minKey(a, b []byte) []byte {
-	if bytes.Compare(a, b) < 0 {
-		return a
-	}
-	return b
-}
-
-func maxKey(a, b []byte) []byte {
-	if bytes.Compare(a, b) > 0 {
-		return a
-	}
-	return b
-}
diff --git a/pkg/cache/ring_test.go b/pkg/cache/ring_test.go
index 90b40ac2eac8..d3ede8b0d338 100644
--- a/pkg/cache/ring_test.go
+++ b/pkg/cache/ring_test.go
@@ -1,7 +1,11 @@
 package cache
 
 import (
+	"bytes"
+	"fmt"
+
 	. "github.com/pingcap/check"
+	"github.com/tikv/pd/pkg/btree"
 )
 
 var _ = Suite(&testRingSuite{})
@@ -9,27 +13,94 @@
 type testRingSuite struct {
 }
+
+type simpleRingItem struct {
+	startKey []byte
+	endKey   []byte
+}
+
+func newSimpleRingItem(startKey, endKey []byte) *simpleRingItem {
+	return &simpleRingItem{
+		startKey: startKey,
+		endKey:   endKey,
+	}
+}
+
+// String implements fmt.Stringer.
+func (s *simpleRingItem) String() string {
+	return fmt.Sprintf("key-range: [%s, %s]", s.startKey, s.endKey)
+}
+
+// Less returns true if the start key of the item is less than the start key of the argument.
+func (s *simpleRingItem) Less(than btree.Item) bool {
+	return bytes.Compare(s.StartKey(), than.(RingItem).StartKey()) < 0
+}
+
+// Debris returns the parts of the item that fall outside the given key range.
+// details: https://leetcode.cn/problems/interval-list-intersections/
+func (s *simpleRingItem) Debris(startKey, endKey []byte) []RingItem {
+	var res []RingItem
+
+	left := maxKey(startKey, s.startKey)
+	right := minKey(endKey, s.endKey)
+	// they have no intersection.
+	if bytes.Compare(left, right) >= 0 {
+		return nil
+	}
+	if !bytes.Equal(s.startKey, left) {
+		res = append(res, newSimpleRingItem(s.startKey, left))
+	}
+
+	if !bytes.Equal(right, s.endKey) {
+		res = append(res, newSimpleRingItem(right, s.endKey))
+	}
+	return res
+}
+
+// EndKey returns the end key of the item.
+func (s *simpleRingItem) EndKey() []byte {
+	return s.endKey
+}
+
+// StartKey returns the start key of the item.
+func (s *simpleRingItem) StartKey() []byte {
+	return s.startKey
+}
+
+func minKey(a, b []byte) []byte {
+	if bytes.Compare(a, b) < 0 {
+		return a
+	}
+	return b
+}
+
+func maxKey(a, b []byte) []byte {
+	if bytes.Compare(a, b) > 0 {
+		return a
+	}
+	return b
+}
 
 func (r *testRingSuite) TestRingPutItem(c *C) {
-	ring := NewRing(10)
+	ring := NewRing(2)
 	ring.Put(newSimpleRingItem([]byte("002"), []byte("100")))
 	c.Assert(ring.tree.Len(), Equals, 1)
 	ring.Put(newSimpleRingItem([]byte("100"), []byte("200")))
 	c.Assert(ring.tree.Len(), Equals, 2)
 
-	// init key range: [001,100], [100,200]
-	c.Assert(ring.GetRange(newSimpleRingItem([]byte("000"), []byte("001"))), HasLen, 0)
+	// init key range: [002,100], [100,200]
+	c.Assert(ring.GetRange(newSimpleRingItem([]byte("000"), []byte("002"))), HasLen, 0)
 	c.Assert(ring.GetRange(newSimpleRingItem([]byte("000"), []byte("009"))), HasLen, 1)
 	c.Assert(ring.GetRange(newSimpleRingItem([]byte("010"), []byte("090"))), HasLen, 1)
 	c.Assert(ring.GetRange(newSimpleRingItem([]byte("010"), []byte("110"))), HasLen, 2)
 	c.Assert(ring.GetRange(newSimpleRingItem([]byte("200"), []byte("300"))), HasLen, 0)
 
-	// test1: insert one keyrange, the old overlaps will retain like split buckets.
-	// key range: [001,010],[010,090],[090,100],[100,200]
+	// test1: insert one key range; the old overlapping ranges are retained, as in a bucket split.
+	// key range: [002,010],[010,090],[090,100],[100,200]
 	ring.Put(newSimpleRingItem([]byte("010"), []byte("090")))
 	c.Assert(ring.tree.Len(), Equals, 4)
 	c.Assert(ring.GetRange(newSimpleRingItem([]byte("010"), []byte("090"))), HasLen, 1)
 
-	// test2: insert one keyrange, the old overlaps will retain like merge .
+	// test2: insert one key range; the old overlapping ranges are retained, as in a merge.
 	// key range: [001,080], [080,090],[090,100],[100,200]
 	ring.Put(newSimpleRingItem([]byte("001"), []byte("080")))
 	c.Assert(ring.tree.Len(), Equals, 4)
@@ -42,7 +113,7 @@ func (r *testRingSuite) TestRingPutItem(c *C) {
 	c.Assert(ring.GetRange(newSimpleRingItem([]byte("010"), []byte("090"))), HasLen, 1)
 }
 
-func (r *testRingSuite) TestRingItemAdjust(c *C) {
+func (r *testRingSuite) TestDebris(c *C) {
 	ringItem := newSimpleRingItem([]byte("010"), []byte("090"))
 	var overlaps []RingItem
 	overlaps = ringItem.Debris([]byte("000"), []byte("100"))
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index d314a0def611..a1bf7febe72b 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -51,7 +51,6 @@ import (
 	"github.com/tikv/pd/server/schedule/placement"
 	"github.com/tikv/pd/server/schedulers"
 	"github.com/tikv/pd/server/statistics"
-	"github.com/tikv/pd/server/statistics/buckets"
 	"github.com/tikv/pd/server/storage"
 	"github.com/tikv/pd/server/storage/endpoint"
 	"github.com/tikv/pd/server/versioninfo"
@@ -131,7 +130,6 @@ type RaftCluster struct {
 	labelLevelStats *statistics.LabelStatistics
 	regionStats     *statistics.RegionStatistics
 	hotStat         *statistics.HotStat
-	hotBuckets      *buckets.HotBucketCache
 	ruleManager     *placement.RuleManager
 	regionLabeler   *labeler.RegionLabeler
 	replicationMode *replication.ModeManager
@@ -222,7 +220,6 @@ func (c *RaftCluster) InitCluster(
 	c.ctx, c.cancel = context.WithCancel(c.serverCtx)
 	c.labelLevelStats = statistics.NewLabelStatistics()
 	c.hotStat = statistics.NewHotStat(c.ctx)
-	c.hotBuckets = buckets.NewBucketsCache(c.ctx)
 	c.progressManager = progress.NewManager()
 	c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit)
 	c.prevStoreLimit = make(map[uint64]map[storelimit.Type]float64)
diff --git a/server/cluster/cluster_worker.go b/server/cluster/cluster_worker.go
index e90f2212b1e4..5d532bbe1dfb 100644
--- a/server/cluster/cluster_worker.go
+++ b/server/cluster/cluster_worker.go
@@ -26,7 +26,6 @@ import (
 	"github.com/tikv/pd/pkg/logutil"
 	"github.com/tikv/pd/server/core"
 	"github.com/tikv/pd/server/schedule"
-	"github.com/tikv/pd/server/statistics/buckets"
 	"github.com/tikv/pd/server/versioninfo"
 	"go.uber.org/zap"
 )
@@ -229,10 +228,6 @@ func (c *RaftCluster) HandleBatchReportSplit(request *pdpb.ReportBatchSplitReque
 }
 
 // HandleReportBuckets processes buckets reports from client
-func (c *RaftCluster) HandleReportBuckets(b *metapb.Buckets) error {
-	if err := c.processReportBuckets(b); err != nil {
-		return err
-	}
-	c.hotBuckets.CheckAsync(buckets.NewCheckPeerTask(b))
-	return nil
+func (c *RaftCluster) HandleReportBuckets(buckets *metapb.Buckets) error {
+	return c.processReportBuckets(buckets)
 }
diff --git a/server/statistics/buckets/hot_bucket_cache.go b/server/statistics/buckets/hot_bucket_cache.go
deleted file mode 100644
index 1ace2c97fc3d..000000000000
--- a/server/statistics/buckets/hot_bucket_cache.go
+++ /dev/null
@@ -1,428 +0,0 @@
-// Copyright 2022 TiKV Project Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package buckets
-
-import (
-	"bytes"
-	"context"
-	"fmt"
-	"time"
-
-	"github.com/pingcap/kvproto/pkg/metapb"
-	"github.com/pingcap/log"
-	"github.com/tikv/pd/pkg/btree"
-	"github.com/tikv/pd/pkg/cache"
-	"github.com/tikv/pd/pkg/logutil"
-	"github.com/tikv/pd/pkg/slice"
-	"github.com/tikv/pd/server/core"
-	"github.com/tikv/pd/server/statistics"
-	"go.uber.org/zap"
-)
-
-type status int
-
-const (
-	alive status = iota
-	archive
-)
-
-const (
-	// queue is the length of the channel used to send the statistics.
-	queue = 20000
-	// bucketBtreeDegree is the degree of the btree used to store the bucket.
-	bucketBtreeDegree = 10
-
-	// the range of the hot degree should be [-100, 100]
-	minHotDegree = -100
-	maxHotDegree = 100
-)
-
-var minHotThresholds = [statistics.RegionStatCount]uint64{
-	statistics.RegionReadBytes:  8 * 1024,
-	statistics.RegionReadKeys:   128,
-	statistics.RegionReadQuery:  128,
-	statistics.RegionWriteBytes: 1 * 1024,
-	statistics.RegionWriteKeys:  32,
-	statistics.RegionWriteQuery: 32,
-}
-
-// HotBucketCache is the cache of hot stats.
-type HotBucketCache struct {
-	ring            *cache.Ring                // regionId -> BucketsStats
-	bucketsOfRegion map[uint64]*BucketTreeItem // regionId -> BucketsStats
-	taskQueue       chan flowBucketsItemTask
-	ctx             context.Context
-}
-
-// NewBucketsCache creates a new hot spot cache.
-func NewBucketsCache(ctx context.Context) *HotBucketCache {
-	bucketCache := &HotBucketCache{
-		ctx:             ctx,
-		bucketsOfRegion: make(map[uint64]*BucketTreeItem),
-		ring:            cache.NewRing(bucketBtreeDegree),
-		taskQueue:       make(chan flowBucketsItemTask, queue),
-	}
-	go bucketCache.updateItems()
-	return bucketCache
-}
-
-// BucketStats returns the hot stats of the regions that great than degree.
-func (h *HotBucketCache) BucketStats(degree int) map[uint64][]*BucketStat {
-	rst := make(map[uint64][]*BucketStat)
-	for _, item := range h.bucketsOfRegion {
-		stats := make([]*BucketStat, 0)
-		for _, b := range item.stats {
-			if b.HotDegree >= degree {
-				stats = append(stats, b)
-			}
-		}
-		if len(stats) > 0 {
-			rst[item.regionID] = stats
-		}
-	}
-	return rst
-}
-
-// putItem puts the item into the cache.
-func (h *HotBucketCache) putItem(item *BucketTreeItem, overlaps []*BucketTreeItem) {
-	// only update origin if the key range is same.
-	if origin := h.bucketsOfRegion[item.regionID]; item.compareKeyRange(origin) {
-		*origin = *item
-		return
-	}
-	for _, overlap := range overlaps {
-		if overlap.status == alive {
-			log.Info("delete buckets from cache", zap.Uint64("region-id", overlap.regionID))
-			delete(h.bucketsOfRegion, overlap.regionID)
-		}
-	}
-	log.Info("put buckets into cache", zap.Stringer("region-id", item))
-	h.bucketsOfRegion[item.regionID] = item
-	h.ring.Put(item)
-}
-
-// CheckAsync returns true if the task queue is not full.
-func (h *HotBucketCache) CheckAsync(task flowBucketsItemTask) bool {
-	select {
-	case h.taskQueue <- task:
-		return true
-	default:
-		return false
-	}
-}
-
-func (h *HotBucketCache) updateItems() {
-	defer logutil.LogPanic()
-	for {
-		select {
-		case <-h.ctx.Done():
-			return
-		case task := <-h.taskQueue:
-			start := time.Now()
-			task.runTask(h)
-			bucketsHotHandlerDuration.WithLabelValues(task.taskType().String()).Observe(time.Since(start).Seconds())
-		}
-	}
-}
-
-// checkBucketsFlow returns the new item tree and the overlaps.
-func (h *HotBucketCache) checkBucketsFlow(buckets *metapb.Buckets) (newItem *BucketTreeItem, overlaps []*BucketTreeItem) {
-	newItem = convertToBucketTreeItem(buckets)
-	// origin is existed and the version is same.
-	if origin := h.bucketsOfRegion[buckets.GetRegionId()]; newItem.compareKeyRange(origin) {
-		overlaps = []*BucketTreeItem{origin}
-	} else {
-		overlaps = h.getBucketsByKeyRange(newItem.startKey, newItem.endKey)
-	}
-	newItem.inherit(overlaps)
-	newItem.calculateHotDegree()
-	h.collectBucketsMetrics(newItem)
-	return newItem, overlaps
-}
-
-func (b *BucketTreeItem) calculateHotDegree() {
-	for _, stat := range b.stats {
-		// todo: qps should be considered, tikv will report this in next sprint
-		readLoads := stat.Loads[:2]
-		readHot := slice.AllOf(readLoads, func(i int) bool {
-			return readLoads[i] > minHotThresholds[i]
-		})
-		writeLoads := stat.Loads[3:5]
-		writeHot := slice.AllOf(writeLoads, func(i int) bool {
-			return writeLoads[i] > minHotThresholds[3+i]
-		})
-		hot := readHot || writeHot
-		if hot && stat.HotDegree < maxHotDegree {
-			stat.HotDegree++
-		}
-		if !hot && stat.HotDegree > minHotDegree {
-			stat.HotDegree--
-		}
-	}
-}
-
-// getBucketsByKeyRange returns the overlaps with the key range.
-func (h *HotBucketCache) getBucketsByKeyRange(startKey, endKey []byte) (items []*BucketTreeItem) {
-	item := &BucketTreeItem{startKey: startKey, endKey: endKey}
-	ringItems := h.ring.GetRange(item)
-	for _, item := range ringItems {
-		bucketItem := item.(*BucketTreeItem)
-		items = append(items, bucketItem)
-	}
-	return
-}
-
-// collectBucketsMetrics collects the metrics of the hot stats.
-func (h *HotBucketCache) collectBucketsMetrics(stats *BucketTreeItem) {
-	bucketsHeartbeatIntervalHist.Observe(float64(stats.interval))
-	for _, bucket := range stats.stats {
-		log.Info("collect bucket hot degree metrics", zap.Any("bucket", bucket))
-		bucketsHotDegreeHist.Observe(float64(bucket.HotDegree))
-	}
-}
-
-// BucketStat is the record the bucket statistics.
-type BucketStat struct {
-	RegionID  uint64
-	StartKey  []byte
-	EndKey    []byte
-	HotDegree int
-	Interval  uint64
-	// see statistics.RegionStatKind
-	Loads []uint64
-}
-
-func (b *BucketStat) clone() *BucketStat {
-	c := &BucketStat{
-		StartKey:  b.StartKey,
-		EndKey:    b.EndKey,
-		RegionID:  b.RegionID,
-		HotDegree: b.HotDegree,
-		Interval:  b.Interval,
-		Loads:     make([]uint64, len(b.Loads)),
-	}
-	copy(c.Loads, b.Loads)
-	return c
-}
-
-// BucketTreeItem is the item of the bucket btree.
-type BucketTreeItem struct {
-	regionID uint64
-	startKey []byte
-	endKey   []byte
-	stats    []*BucketStat
-	interval uint64
-	version  uint64
-	status   status
-}
-
-// StartKey implements the TreeItem interface.
-func (b *BucketTreeItem) StartKey() []byte {
-	return b.startKey
-}
-
-// EndKey implements the TreeItem interface.
-func (b *BucketTreeItem) EndKey() []byte {
-	return b.endKey
-}
-
-// String implements the fmt.Stringer interface.
-func (b *BucketTreeItem) String() string {
-	return fmt.Sprintf("[region-id:%d][start-key:%s][end-key:%s]",
-		b.regionID, core.HexRegionKeyStr(b.startKey), core.HexRegionKeyStr(b.endKey))
-}
-
-// Debris returns the debris of the item.
-func (b *BucketTreeItem) Debris(startKey, endKey []byte) []cache.RingItem {
-	var res []cache.RingItem
-	left := maxKey(startKey, b.startKey)
-	right := minKey(endKey, b.endKey)
-	// has no intersection
-	if bytes.Compare(left, right) > 0 {
-		return nil
-	}
-	// there will be no debris if the left is equal to the start key.
-	if !bytes.Equal(b.startKey, left) {
-		res = append(res, b.clone(b.startKey, left))
-	}
-
-	// there will be no debris if the right is equal to the end key.
-	if !bytes.Equal(b.endKey, right) {
-		res = append(res, b.clone(right, b.endKey))
-	}
-	return res
-}
-
-// Less returns true if the start key is less than the other.
-func (b *BucketTreeItem) Less(than btree.Item) bool {
-	return bytes.Compare(b.startKey, than.(*BucketTreeItem).startKey) < 0
-}
-
-// compareKeyRange returns whether the key range is overlaps with the item.
-func (b *BucketTreeItem) compareKeyRange(origin *BucketTreeItem) bool {
-	if origin == nil {
-		return false
-	}
-	// key range must be same if the version is same.
-	if b.version == origin.version {
-		return true
-	}
-	return bytes.Equal(b.startKey, origin.startKey) && bytes.Equal(b.endKey, origin.endKey)
-}
-
-// Clone returns a new item with the same key range.
-// item must have some debris for the given key range
-func (b *BucketTreeItem) clone(startKey, endKey []byte) *BucketTreeItem {
-	item := &BucketTreeItem{
-		regionID: b.regionID,
-		startKey: startKey,
-		endKey:   endKey,
-		interval: b.interval,
-		version:  b.version,
-		stats:    make([]*BucketStat, 0, len(b.stats)),
-		status:   archive,
-	}
-
-	for _, stat := range b.stats {
-		// insert if the stat has debris with the key range.
-		left := maxKey(stat.StartKey, startKey)
-		right := minKey(stat.EndKey, endKey)
-		if bytes.Compare(left, right) < 0 {
-			copy := stat.clone()
-			copy.StartKey = left
-			copy.EndKey = right
-			item.stats = append(item.stats, copy)
-		}
-	}
-	return item
-}
-
-func (b *BucketTreeItem) contains(key []byte) bool {
-	return bytes.Compare(b.startKey, key) <= 0 && bytes.Compare(key, b.endKey) < 0
-}
-
-// inherit the hot stats from the old item to the new item.
-// rule1: if one cross buckets are hot , it will inherit the hottest one.
-// rule2: if the cross buckets are not hot, it will inherit the coldest one.
-// rule3: if some cross buckets are hot and the others are cold, it will inherit the hottest one.
-func (b *BucketTreeItem) inherit(origins []*BucketTreeItem) {
-	if len(origins) == 0 || len(b.stats) == 0 || bytes.Compare(b.endKey, origins[0].startKey) < 0 {
-		return
-	}
-
-	newItems := b.stats
-	oldItems := make([]*BucketStat, 0)
-	for _, bucketTree := range origins {
-		oldItems = append(oldItems, bucketTree.stats...)
-	}
-	// details: https://leetcode.cn/problems/interval-list-intersections/solution/jiu-pa-ni-bu-dong-shuang-zhi-zhen-by-hyj8/
-	for p1, p2 := 0, 0; p1 < len(newItems) && p2 < len(oldItems); {
-		newItem, oldItem := newItems[p1], oldItems[p2]
-		left := maxKey(newItem.StartKey, oldItems[p2].StartKey)
-		right := minKey(newItem.EndKey, oldItems[p2].EndKey)
-
-		// bucket should inherit the old bucket hot degree if they have some intersection.
-		// skip if the left is equal to the right key, such as [10 20] [20 30].
-		if bytes.Compare(left, right) < 0 {
-			log.Info("inherit bucket %s from %s", zap.ByteString("left", left), zap.ByteString("right", right))
-			oldDegree := oldItems[p2].HotDegree
-			newDegree := newItems[p1].HotDegree
-			// new bucket should interim old if the hot degree of the new bucket is less than zero.
-			if oldDegree < 0 && newDegree <= 0 && oldDegree < newDegree {
-				newItem.HotDegree = oldDegree
-			}
-			// if oldDegree is greater than zero and the new bucket, the new bucket should inherit the old hot degree.
-			if oldDegree > 0 && oldDegree > newDegree {
-				newItem.HotDegree = oldDegree
-			}
-		}
-		// move the left item to the next, old should move first if they are equal.
-		if bytes.Compare(newItem.EndKey, oldItem.EndKey) > 0 {
-			p2++
-		} else {
-			p1++
-		}
-	}
-}
-
-func (b *BucketStat) String() string {
-	return fmt.Sprintf("[region-id:%d][start-key:%s][end-key-key:%s][hot-degree:%d][Interval:%d(ms)][Loads:%v]",
-		b.RegionID, core.HexRegionKeyStr(b.StartKey), core.HexRegionKeyStr(b.EndKey), b.HotDegree, b.Interval, b.Loads)
-}
-
-// convertToBucketTreeItem converts the bucket stat to bucket tree item.
-func convertToBucketTreeItem(buckets *metapb.Buckets) *BucketTreeItem {
-	items := make([]*BucketStat, len(buckets.Keys)-1)
-	interval := buckets.PeriodInMs
-	// Interval may be zero after the tikv initial.
-	if interval == 0 {
-		interval = 10 * 1000
-	}
-	for i := 0; i < len(buckets.Keys)-1; i++ {
-		loads := []uint64{
-			buckets.Stats.ReadBytes[i] * 1000 / interval,
-			buckets.Stats.ReadKeys[i] * 1000 / interval,
-			buckets.Stats.ReadQps[i] * 1000 / interval,
-			buckets.Stats.WriteBytes[i] * 1000 / interval,
-			buckets.Stats.WriteKeys[i] * 1000 / interval,
-			buckets.Stats.WriteQps[i] * 1000 / interval,
-		}
-		items[i] = &BucketStat{
-			RegionID:  buckets.RegionId,
-			StartKey:  buckets.Keys[i],
-			EndKey:    buckets.Keys[i+1],
-			HotDegree: 0,
-			Loads:     loads,
-			Interval:  interval,
-		}
-	}
-	return &BucketTreeItem{
-		startKey: getStartKey(buckets),
-		endKey:   getEndKey(buckets),
-		regionID: buckets.RegionId,
-		stats:    items,
-		interval: buckets.GetPeriodInMs(),
-		version:  buckets.Version,
-		status:   alive,
-	}
-}
-
-func getEndKey(buckets *metapb.Buckets) []byte {
-	if len(buckets.GetKeys()) == 0 {
-		return nil
-	}
-	return buckets.Keys[len(buckets.Keys)-1]
-}
-
-func getStartKey(buckets *metapb.Buckets) []byte {
-	if len(buckets.GetKeys()) == 0 {
-		return nil
-	}
-	return buckets.Keys[0]
-}
-
-func maxKey(a, b []byte) []byte {
-	if bytes.Compare(a, b) > 0 {
-		return a
-	}
-	return b
-}
-
-func minKey(a, b []byte) []byte {
-	if bytes.Compare(a, b) > 0 {
-		return b
-	}
-	return a
-}
diff --git a/server/statistics/buckets/hot_bucket_cache_test.go b/server/statistics/buckets/hot_bucket_cache_test.go
deleted file mode 100644
index b132e9c90644..000000000000
--- a/server/statistics/buckets/hot_bucket_cache_test.go
+++ /dev/null
@@ -1,250 +0,0 @@
-// Copyright 2022 TiKV Project Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package buckets
-
-import (
-	"context"
-	"fmt"
-	"testing"
-
-	. "github.com/pingcap/check"
-	"github.com/pingcap/kvproto/pkg/metapb"
-)
-
-func Test(t *testing.T) {
-	TestingT(t)
-}
-
-var _ = Suite(&testHotBucketCache{})
-
-type testHotBucketCache struct{}
-
-func (t *testHotBucketCache) TestPutItem(c *C) {
-	// case1: region split
-	// origin: |10|20|30|
-	// new: |10|15|20|30|
-	// when report bucket[15:20], the origin should be truncate into two region
-	cache := NewBucketsCache(context.Background())
-	testdata := []struct {
-		regionID    uint64
-		keys        [][]byte
-		regionCount int
-		treeLen     int
-		version     uint64
-	}{{
-		regionID:    1,
-		keys:        [][]byte{[]byte("10"), []byte("20"), []byte("30")},
-		regionCount: 1,
-		treeLen:     1,
-	}, {
-		regionID:    2,
-		keys:        [][]byte{[]byte("15"), []byte("20")},
-		regionCount: 1,
-		treeLen:     3,
-	}, {
-		regionID:    1,
-		keys:        [][]byte{[]byte("20"), []byte("30")},
-		version:     2,
-		regionCount: 2,
-		treeLen:     3,
-	}, {
-		regionID:    3,
-		keys:        [][]byte{[]byte("10"), []byte("15")},
-		regionCount: 3,
-		treeLen:     3,
-	}, {
-		// region 1,2,3 will be merged.
-		regionID:    4,
-		keys:        [][]byte{[]byte("10"), []byte("30")},
-		regionCount: 1,
-		treeLen:     1,
-	}}
-	for _, v := range testdata {
-		bucket := convertToBucketTreeItem(newTestBuckets(v.regionID, v.version, v.keys, 0))
-		c.Assert(bucket.StartKey(), BytesEquals, v.keys[0])
-		c.Assert(bucket.EndKey(), BytesEquals, v.keys[len(v.keys)-1])
-		cache.putItem(bucket, cache.getBucketsByKeyRange(bucket.StartKey(), bucket.EndKey()))
-		c.Assert(cache.bucketsOfRegion, HasLen, v.regionCount)
-		c.Assert(cache.ring.Len(), Equals, v.treeLen)
-		c.Assert(cache.bucketsOfRegion[v.regionID], NotNil)
-		c.Assert(cache.getBucketsByKeyRange([]byte("10"), nil), NotNil)
-	}
-}
-
-func (t *testHotBucketCache) TestConvertToBucketTreeStat(c *C) {
-	buckets := &metapb.Buckets{
-		RegionId: 1,
-		Version:  0,
-		Keys:     [][]byte{{'1'}, {'2'}, {'3'}, {'4'}, {'5'}},
-		Stats: &metapb.BucketStats{
-			ReadBytes:  []uint64{1, 2, 3, 4},
-			ReadKeys:   []uint64{1, 2, 3, 4},
-			ReadQps:    []uint64{1, 2, 3, 4},
-			WriteBytes: []uint64{1, 2, 3, 4},
-			WriteKeys:  []uint64{1, 2, 3, 4},
-			WriteQps:   []uint64{1, 2, 3, 4},
-		},
-		PeriodInMs: 1000,
-	}
-	item := convertToBucketTreeItem(buckets)
-	c.Assert(item.startKey, BytesEquals, []byte{'1'})
-	c.Assert(item.endKey, BytesEquals, []byte{'5'})
-	c.Assert(item.regionID, Equals, uint64(1))
-	c.Assert(item.version, Equals, uint64(0))
-	c.Assert(item.stats, HasLen, 4)
-}
-
-func (t *testHotBucketCache) TestGetBucketsByKeyRange(c *C) {
-	cache := NewBucketsCache(context.Background())
-	bucket1 := newTestBuckets(1, 1, [][]byte{[]byte("010"), []byte("015")}, 0)
-	bucket2 := newTestBuckets(2, 1, [][]byte{[]byte("015"), []byte("020")}, 0)
-	bucket3 := newTestBuckets(3, 1, [][]byte{[]byte("020"), []byte("030")}, 0)
-	cache.putItem(cache.checkBucketsFlow(bucket1))
-	cache.putItem(cache.checkBucketsFlow(bucket2))
-	cache.putItem(cache.checkBucketsFlow(bucket3))
-	c.Assert(cache.getBucketsByKeyRange([]byte("010"), []byte("100")), NotNil)
-	c.Assert(cache.getBucketsByKeyRange([]byte("030"), []byte("100")), IsNil)
-	c.Assert(cache.getBucketsByKeyRange([]byte("010"), []byte("030")), HasLen, 3)
-	c.Assert(cache.getBucketsByKeyRange([]byte("010"), []byte("020")), HasLen, 2)
-	c.Assert(cache.getBucketsByKeyRange([]byte("001"), []byte("010")), HasLen, 0)
-	c.Assert(cache.bucketsOfRegion, HasLen, 3)
-}
-
-func (t *testHotBucketCache) TestInherit(c *C) {
-	// init: key range |10 20|20-50|50-60|(3 2 10)
-	originBucketItem := convertToBucketTreeItem(newTestBuckets(1, 1, [][]byte{[]byte("10"), []byte("20"), []byte("50"),
-		[]byte("60")}, 0))
-	originBucketItem.stats[0].HotDegree = 3
-	originBucketItem.stats[1].HotDegree = 2
-	originBucketItem.stats[2].HotDegree = 10
-
-	testdata := []struct {
-		buckets *metapb.Buckets
-		expect  []int
-	}{{
-		// case1: one bucket can be inherited by many buckets.
-		buckets: newTestBuckets(1, 1, [][]byte{[]byte("10"), []byte("20"), []byte("30"), []byte("40"), []byte("50")}, 0),
-		expect:  []int{3, 2, 2, 2},
-	}, {
-		// case2: the first start key is less than the end key of old item.
-		buckets: newTestBuckets(1, 1, [][]byte{[]byte("20"), []byte("45"), []byte("50")}, 0),
-		expect:  []int{2, 2},
-	}, {
-		// case3: the first start key is less than the end key of old item.
-		buckets: newTestBuckets(1, 1, [][]byte{[]byte("00"), []byte("05")}, 0),
-		expect:  []int{0},
-	}, {
-		// case4: newItem starKey is greater than old.
-		buckets: newTestBuckets(1, 1, [][]byte{[]byte("80"), []byte("90")}, 0),
-		expect:  []int{0},
-	}}
-
-	for i, v := range testdata {
-		fmt.Printf("case:%d\n", i)
-		buckets := convertToBucketTreeItem(v.buckets)
-		buckets.inherit([]*BucketTreeItem{originBucketItem})
-		c.Assert(buckets.stats, HasLen, len(v.expect))
-		for k, v := range v.expect {
-			fmt.Println(k)
-			c.Assert(buckets.stats[k].HotDegree, Equals, v)
-		}
-	}
-}
-
-func (t *testHotBucketCache) TestBucketTreeItemClone(c *C) {
-	// bucket range: [010,020][020,100]
-	origin := convertToBucketTreeItem(newTestBuckets(1, 1, [][]byte{[]byte("010"), []byte("020"), []byte("100")}, uint64(0)))
-	testdata := []struct {
-		startKey []byte
-		endKey   []byte
-		count    int
-		strict   bool
-	}{{
-		startKey: []byte("010"),
-		endKey:   []byte("100"),
-		count:    2,
-		strict:   true,
-	}, {
-		startKey: []byte("000"),
-		endKey:   []byte("010"),
-		count:    0,
-		strict:   false,
-	}, {
-		startKey: []byte("100"),
-		endKey:   []byte("200"),
-		count:    0,
-		strict:   false,
-	}, {
-		startKey: []byte("000"),
-		endKey:   []byte("020"),
-		count:    1,
-		strict:   false,
-	}, {
-		startKey: []byte("015"),
-		endKey:   []byte("095"),
-		count:    2,
-		strict:   true,
-	}, {
-		startKey: []byte("015"),
-		endKey:   []byte("200"),
-		count:    2,
-		strict:   false,
-	}}
-	for _, v := range testdata {
-		copy := origin.clone(v.startKey, v.endKey)
-		c.Assert(copy.startKey, BytesEquals, v.startKey)
-		c.Assert(copy.endKey, BytesEquals, v.endKey)
-		c.Assert(copy.stats, HasLen, v.count)
-		if v.count > 0 && v.strict {
-			c.Assert(copy.stats[0].StartKey, BytesEquals, v.startKey)
-			c.Assert(copy.stats[len(copy.stats)-1].EndKey, BytesEquals, v.endKey)
-		}
-	}
-}
-
-func (t *testHotBucketCache) TestCalculateHotDegree(c *C) {
-	origin := convertToBucketTreeItem(newTestBuckets(1, 1, [][]byte{[]byte("010"), []byte("100")}, uint64(0)))
-	origin.calculateHotDegree()
-	c.Assert(origin.stats[0].HotDegree, Equals, -1)
-
-	// case1: the dimension of read will be hot
-	origin.stats[0].Loads = []uint64{minHotThresholds[0] + 1, minHotThresholds[1] + 1, 0, 0, 0, 0}
-	origin.calculateHotDegree()
-	c.Assert(origin.stats[0].HotDegree, Equals, 0)
-
-	// case1: the dimension of write will be hot
-	origin.stats[0].Loads = []uint64{0, 0, 0, minHotThresholds[3] + 1, minHotThresholds[4] + 1, 0}
-	origin.calculateHotDegree()
-	c.Assert(origin.stats[0].HotDegree, Equals, 1)
-}
-
-func newTestBuckets(regionID uint64, version uint64, keys [][]byte, flow uint64) *metapb.Buckets {
-	flows := make([]uint64, len(keys)-1)
-	for i := range keys {
-		if i == len(keys)-1 {
-			continue
-		}
-		flows[i] = flow
-	}
-	rst := &metapb.Buckets{RegionId: regionID, Version: version, Keys: keys, PeriodInMs: 1000,
-		Stats: &metapb.BucketStats{
-			ReadBytes:  flows,
-			ReadKeys:   flows,
-			ReadQps:    flows,
-			WriteBytes: flows,
-			WriteKeys:  flows,
-			WriteQps:   flows,
-		}}
-	return rst
-}
diff --git a/server/statistics/buckets/hot_bucket_task.go b/server/statistics/buckets/hot_bucket_task.go
deleted file mode 100644
index 79b4b8c2df60..000000000000
--- a/server/statistics/buckets/hot_bucket_task.go
+++ /dev/null
@@ -1,95 +0,0 @@
-// Copyright 2022 TiKV Project Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package buckets
-
-import (
-	"context"
-	"github.com/pingcap/kvproto/pkg/metapb"
-)
-
-type flowItemTaskKind uint32
-
-const (
-	checkBucketsTaskType flowItemTaskKind = iota
-	collectBucketStatsTaskType
-)
-
-func (kind flowItemTaskKind) String() string {
-	switch kind {
-	case checkBucketsTaskType:
-		return "check_buckets"
-	case collectBucketStatsTaskType:
-		return "collect_bucket_stats"
-	}
-	return "unknown"
-}
-
-// flowBucketsItemTask indicates the task in flowItem queue
-type flowBucketsItemTask interface {
-	taskType() flowItemTaskKind
-	runTask(cache *HotBucketCache)
-}
-
-// checkBucketsTask indicates the task in checkBuckets queue
-type checkBucketsTask struct {
-	Buckets *metapb.Buckets
-}
-
-// NewCheckPeerTask creates task to update peerInfo
-func NewCheckPeerTask(buckets *metapb.Buckets) flowBucketsItemTask {
-	return &checkBucketsTask{
-		Buckets: buckets,
-	}
-}
-
-func (t *checkBucketsTask) taskType() flowItemTaskKind {
-	return checkBucketsTaskType
-}
-
-func (t *checkBucketsTask) runTask(cache *HotBucketCache) {
-	newItems, overlaps := cache.checkBucketsFlow(t.Buckets)
-	cache.putItem(newItems, overlaps)
-}
-
-type collectBucketStatsTask struct {
-	minDegree int
-	ret       chan map[uint64][]*BucketStat // RegionID ==>Buckets
-}
-
-// NewCollectBucketStatsTask creates task to collect bucket stats.
-func NewCollectBucketStatsTask(minDegree int) *collectBucketStatsTask {
-	return &collectBucketStatsTask{
-		minDegree: minDegree,
-		ret:       make(chan map[uint64][]*BucketStat, 1),
-	}
-}
-
-func (t *collectBucketStatsTask) taskType() flowItemTaskKind {
-	return collectBucketStatsTaskType
-}
-
-func (t *collectBucketStatsTask) runTask(cache *HotBucketCache) {
-	t.ret <- cache.BucketStats(t.minDegree)
-}
-
-// WaitRet returns the result of the task.
-func (t *collectBucketStatsTask) WaitRet(ctx context.Context) map[uint64][]*BucketStat {
-	select {
-	case <-ctx.Done():
-		return nil
-	case ret := <-t.ret:
-		return ret
-	}
-}
diff --git a/server/statistics/buckets/hot_bucket_task_test.go b/server/statistics/buckets/hot_bucket_task_test.go
deleted file mode 100644
index 9b30d82622d5..000000000000
--- a/server/statistics/buckets/hot_bucket_task_test.go
+++ /dev/null
@@ -1,133 +0,0 @@
-// Copyright 2022 TiKV Project Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package buckets
-
-import (
-	"context"
-	"math"
-	"strconv"
-	"time"
-
-	. "github.com/pingcap/check"
-	"github.com/pingcap/kvproto/pkg/metapb"
-)
-
-var _ = Suite(&testHotBucketTaskCache{})
-
-type testHotBucketTaskCache struct {
-}
-
-func (s *testHotBucketTaskCache) SetUpSuite(_ *C) {
-}
-
-func (s *testHotBucketTaskCache) TearDownTest(_ *C) {
-}
-
-func getAllBucketStats(ctx context.Context, hotCache *HotBucketCache) map[uint64][]*BucketStat {
-	task := NewCollectBucketStatsTask(-100)
-	hotCache.CheckAsync(task)
-	return task.WaitRet(ctx)
-}
-
-func (s *testHotBucketTaskCache) TestColdHot(c *C) {
-	ctx, cancelFn := context.WithCancel(context.Background())
-	defer cancelFn()
-	hotCache := NewBucketsCache(ctx)
-	testdata := []struct {
-		buckets *metapb.Buckets
-		isHot   bool
-	}{{
-		buckets: newTestBuckets(1, 1, [][]byte{[]byte("10"), []byte("20")}, 0),
-		isHot:   false,
-	}, {
-		buckets: newTestBuckets(2, 1, [][]byte{[]byte("20"), []byte("30")}, math.MaxUint64),
-		isHot:   true,
-	}}
-	for _, v := range testdata {
-		for i := 0; i < 100; i++ {
-			task := NewCheckPeerTask(v.buckets)
-			c.Assert(hotCache.CheckAsync(task), IsTrue)
-			hotBuckets := getAllBucketStats(ctx, hotCache)
-			time.Sleep(time.Millisecond * 10)
-			item := hotBuckets[v.buckets.RegionId]
-			c.Assert(item, NotNil)
-			if v.isHot {
-				c.Assert(item[0].HotDegree, Equals, i+1)
-			} else {
-				c.Assert(item[0].HotDegree, Equals, -i-1)
-			}
-		}
-	}
-}
-
-func (s *testHotBucketTaskCache) TestCheckBucketsTask(c *C) {
-	ctx, cancelFn := context.WithCancel(context.Background())
-	defer cancelFn()
-	hotCache := NewBucketsCache(ctx)
-	// case1: add bucket successfully
-	buckets := newTestBuckets(1, 1, [][]byte{[]byte("10"), []byte("20"), []byte("30")}, 0)
-	task := NewCheckPeerTask(buckets)
-	c.Assert(hotCache.CheckAsync(task), IsTrue)
-	time.Sleep(time.Millisecond * 10)
-
-	hotBuckets := getAllBucketStats(ctx, hotCache)
-	c.Assert(hotBuckets, HasLen, 1)
-	item := hotBuckets[uint64(1)]
-	c.Assert(item, NotNil)
-	c.Assert(item, HasLen, 2)
-	c.Assert(item[0].HotDegree, Equals, -1)
-	c.Assert(item[1].HotDegree, Equals, -1)
-
-	// case2: add bucket successful and the hot degree should inherit from the old one.
-	buckets = newTestBuckets(2, 1, [][]byte{[]byte("20"), []byte("30")}, 0)
-	task = NewCheckPeerTask(buckets)
-	c.Assert(hotCache.CheckAsync(task), IsTrue)
-	hotBuckets = getAllBucketStats(ctx, hotCache)
-	time.Sleep(time.Millisecond * 10)
-	item = hotBuckets[uint64(2)]
-	c.Assert(item, HasLen, 1)
-	c.Assert(item[0].HotDegree, Equals, -2)
-
-	// case3:add bucket successful and the hot degree should inherit from the old one.
-	buckets = newTestBuckets(1, 1, [][]byte{[]byte("10"), []byte("20")}, 0)
-	task = NewCheckPeerTask(buckets)
-	c.Assert(hotCache.CheckAsync(task), IsTrue)
-	hotBuckets = getAllBucketStats(ctx, hotCache)
-	time.Sleep(time.Millisecond * 10)
-	item = hotBuckets[uint64(1)]
-	c.Assert(item, HasLen, 1)
-	c.Assert(item[0].HotDegree, Equals, -2)
-}
-
-func (s *testHotBucketTaskCache) TestCollectBucketStatsTask(c *C) {
-	ctx, cancelFn := context.WithCancel(context.Background())
-	defer cancelFn()
-	hotCache := NewBucketsCache(ctx)
-	// case1: add bucket successfully
-	for i := uint64(0); i < 10; i++ {
-		buckets := convertToBucketTreeItem(newTestBuckets(i, 1, [][]byte{[]byte(strconv.FormatUint(i*10, 10)),
-			[]byte(strconv.FormatUint((i+1)*10, 10))}, 0))
-		hotCache.putItem(buckets, hotCache.getBucketsByKeyRange(buckets.startKey, buckets.endKey))
-	}
-	time.Sleep(time.Millisecond * 10)
-	task := NewCollectBucketStatsTask(-100)
-	c.Assert(hotCache.CheckAsync(task), IsTrue)
-	stats := task.WaitRet(ctx)
-	c.Assert(stats, HasLen, 10)
-	task = NewCollectBucketStatsTask(1)
-	c.Assert(hotCache.CheckAsync(task), IsTrue)
-	stats = task.WaitRet(ctx)
-	c.Assert(stats, HasLen, 0)
-}
diff --git a/server/statistics/buckets/metric.go b/server/statistics/buckets/metric.go
deleted file mode 100644
index 7c074e120103..000000000000
--- a/server/statistics/buckets/metric.go
+++ /dev/null
@@ -1,51 +0,0 @@
-// Copyright 2022 TiKV Project Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package buckets
-
-import "github.com/prometheus/client_golang/prometheus"
-
-var (
-	bucketsHeartbeatIntervalHist = prometheus.NewHistogram(
-		prometheus.HistogramOpts{
-			Namespace: "pd",
-			Subsystem: "scheduler",
-			Name:      "buckets_heartbeat_interval_hist",
-			Help:      "Bucketed histogram of the batch size of handled requests.",
-			Buckets:   prometheus.LinearBuckets(0, 30, 20),
-		})
-	bucketsHotDegreeHist = prometheus.NewHistogram(
-		prometheus.HistogramOpts{
-			Namespace: "pd",
-			Subsystem: "scheduler",
-			Name:      "buckets_hot_degree_hist",
-			Help:      "The distribution of bucket flow bytes",
-			Buckets:   prometheus.LinearBuckets(-100, 10, 20),
-		})
-
-	bucketsHotHandlerDuration = prometheus.NewHistogramVec(
-		prometheus.HistogramOpts{
-			Namespace: "pd",
-			Subsystem: "scheduler",
-			Name:      "bucket_hot_degree_duration",
-			Help:      "Bucketed histogram of processing time (s) of handled buckets.",
-			Buckets:   prometheus.ExponentialBuckets(1, 1.4, 30), // 1s ~ 6.72 hours
-		}, []string{"type"})
-)
-
-func init() {
-	prometheus.MustRegister(bucketsHeartbeatIntervalHist)
-	prometheus.MustRegister(bucketsHotDegreeHist)
-	prometheus.MustRegister(bucketsHotHandlerDuration)
-}
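
---
Usage sketch (not part of the change): it shows how Ring.Put resolves overlaps by asking
each existing item for its Debris outside the new key range and re-inserting those
remnants. The example below is hypothetical; it assumes the pkg/cache API and the
simpleRingItem helper exactly as they appear in this patch, and the expected output
assumes the split behavior exercised by TestRingPutItem.

package cache

import "fmt"

// ExampleRingPut is a hypothetical example (not in this patch). It seeds the
// ring with one item and then inserts an overlapping one; Put keeps the new
// item and re-inserts the old item's debris on both sides of it.
func ExampleRingPut() {
	r := NewRing(2)
	r.Put(newSimpleRingItem([]byte("010"), []byte("090")))
	// [040, 060) overlaps the middle of [010, 090), so the old item is
	// split into [010, 040) and [060, 090) around the new one.
	r.Put(newSimpleRingItem([]byte("040"), []byte("060")))
	for _, item := range r.GetRange(newSimpleRingItem([]byte("000"), []byte("100"))) {
		fmt.Println(item)
	}
	// Assumed output, given the semantics of TestRingPutItem:
	// key-range: [010, 040]
	// key-range: [040, 060]
	// key-range: [060, 090]
}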