Skip to content

Commit

Permalink
pkg/ring
Browse files Browse the repository at this point in the history
Signed-off-by: bufferflies <[email protected]>
  • Loading branch information
bufferflies committed May 9, 2022
1 parent eea13d5 commit ac6ae68
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 1,041 deletions.
69 changes: 2 additions & 67 deletions pkg/cache/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ package cache

import (
"bytes"
"fmt"

"github.com/tikv/pd/pkg/btree"
)

// Ring is a ring buffer, the key range must be continuous.
// Ring is a buffer, the key range must be continuous.
type Ring struct {
tree *btree.BTree
}
Expand All @@ -37,6 +36,7 @@ func NewRing(degree int) *Ring {
type RingItem interface {
Less(than btree.Item) bool
EndKey() []byte
// Debris returns the debris after replacing the key range.
Debris(startKey, endKey []byte) []RingItem
StartKey() []byte
String() string
Expand Down Expand Up @@ -88,68 +88,3 @@ func (r *Ring) Put(item RingItem) {
}
r.tree.ReplaceOrInsert(item)
}

type simpleRingItem struct {
startKey []byte
endKey []byte
}

func newSimpleRingItem(startKey, endKey []byte) *simpleRingItem {
return &simpleRingItem{
startKey: startKey,
endKey: endKey,
}
}

// String
func (s *simpleRingItem) String() string {
return fmt.Sprintf("key-range: [%s, %s]", s.startKey, s.endKey)
}

// Less returns true if the start key of the item is less than the start key of the argument.
func (s *simpleRingItem) Less(than btree.Item) bool {
return bytes.Compare(s.StartKey(), than.(RingItem).StartKey()) < 0
}

// Debris returns the debris of the item.
func (s simpleRingItem) Debris(startKey, endKey []byte) []RingItem {
var res []RingItem

left := maxKey(startKey, s.startKey)
right := minKey(endKey, s.endKey)
if bytes.Compare(left, right) > 0 {
return nil
}
if !bytes.Equal(s.startKey, left) {
res = append(res, newSimpleRingItem(s.startKey, left))
}

if !bytes.Equal(right, s.endKey) {
res = append(res, newSimpleRingItem(right, s.endKey))
}
return res
}

// EndKey returns the end key of the item.
func (s *simpleRingItem) EndKey() []byte {
return s.endKey
}

// StartKey returns the start key of the item.
func (s *simpleRingItem) StartKey() []byte {
return s.startKey
}

func minKey(a, b []byte) []byte {
if bytes.Compare(a, b) < 0 {
return a
}
return b
}

func maxKey(a, b []byte) []byte {
if bytes.Compare(a, b) > 0 {
return a
}
return b
}
85 changes: 78 additions & 7 deletions pkg/cache/ring_test.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,106 @@
package cache

import (
"bytes"
"fmt"

. "github.com/pingcap/check"
"github.com/tikv/pd/pkg/btree"
)

var _ = Suite(&testRingSuite{})

type testRingSuite struct {
}

type simpleRingItem struct {
startKey []byte
endKey []byte
}

func newSimpleRingItem(startKey, endKey []byte) *simpleRingItem {
return &simpleRingItem{
startKey: startKey,
endKey: endKey,
}
}

// String implements String.
func (s *simpleRingItem) String() string {
return fmt.Sprintf("key-range: [%s, %s]", s.startKey, s.endKey)
}

// Less returns true if the start key of the item is less than the start key of the argument.
func (s *simpleRingItem) Less(than btree.Item) bool {
return bytes.Compare(s.StartKey(), than.(RingItem).StartKey()) < 0
}

// Debris returns the debris of the item.
// details: https://leetcode.cn/problems/interval-list-intersections/
func (s simpleRingItem) Debris(startKey, endKey []byte) []RingItem {
var res []RingItem

left := maxKey(startKey, s.startKey)
right := minKey(endKey, s.endKey)
// they have no intersection.
if bytes.Compare(left, right) >= 0 {
return nil
}
if !bytes.Equal(s.startKey, left) {
res = append(res, newSimpleRingItem(s.startKey, left))
}

if !bytes.Equal(right, s.endKey) {
res = append(res, newSimpleRingItem(right, s.endKey))
}
return res
}

// EndKey returns the end key of the item.
func (s *simpleRingItem) EndKey() []byte {
return s.endKey
}

// StartKey returns the start key of the item.
func (s *simpleRingItem) StartKey() []byte {
return s.startKey
}

func minKey(a, b []byte) []byte {
if bytes.Compare(a, b) < 0 {
return a
}
return b
}

func maxKey(a, b []byte) []byte {
if bytes.Compare(a, b) > 0 {
return a
}
return b
}

func (r *testRingSuite) TestRingPutItem(c *C) {
ring := NewRing(10)
ring := NewRing(2)
ring.Put(newSimpleRingItem([]byte("002"), []byte("100")))
c.Assert(ring.tree.Len(), Equals, 1)
ring.Put(newSimpleRingItem([]byte("100"), []byte("200")))
c.Assert(ring.tree.Len(), Equals, 2)

// init key range: [001,100], [100,200]
c.Assert(ring.GetRange(newSimpleRingItem([]byte("000"), []byte("001"))), HasLen, 0)
// init key range: [002,100], [100,200]
c.Assert(ring.GetRange(newSimpleRingItem([]byte("000"), []byte("002"))), HasLen, 0)
c.Assert(ring.GetRange(newSimpleRingItem([]byte("000"), []byte("009"))), HasLen, 1)
c.Assert(ring.GetRange(newSimpleRingItem([]byte("010"), []byte("090"))), HasLen, 1)
c.Assert(ring.GetRange(newSimpleRingItem([]byte("010"), []byte("110"))), HasLen, 2)
c.Assert(ring.GetRange(newSimpleRingItem([]byte("200"), []byte("300"))), HasLen, 0)

// test1: insert one keyrange, the old overlaps will retain like split buckets.
// key range: [001,010],[010,090],[090,100],[100,200]
// test1: insert one key range, the old overlaps will retain like split buckets.
// key range: [002,010],[010,090],[090,100],[100,200]
ring.Put(newSimpleRingItem([]byte("010"), []byte("090")))
c.Assert(ring.tree.Len(), Equals, 4)
c.Assert(ring.GetRange(newSimpleRingItem([]byte("010"), []byte("090"))), HasLen, 1)

// test2: insert one keyrange, the old overlaps will retain like merge .
// test2: insert one key range, the old overlaps will retain like merge .
// key range: [001,080], [080,090],[090,100],[100,200]
ring.Put(newSimpleRingItem([]byte("001"), []byte("080")))
c.Assert(ring.tree.Len(), Equals, 4)
Expand All @@ -42,7 +113,7 @@ func (r *testRingSuite) TestRingPutItem(c *C) {
c.Assert(ring.GetRange(newSimpleRingItem([]byte("010"), []byte("090"))), HasLen, 1)
}

func (r *testRingSuite) TestRingItemAdjust(c *C) {
func (r *testRingSuite) TestDebris(c *C) {
ringItem := newSimpleRingItem([]byte("010"), []byte("090"))
var overlaps []RingItem
overlaps = ringItem.Debris([]byte("000"), []byte("100"))
Expand Down
3 changes: 0 additions & 3 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"github.com/tikv/pd/server/schedule/placement"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/statistics"
"github.com/tikv/pd/server/statistics/buckets"
"github.com/tikv/pd/server/storage"
"github.com/tikv/pd/server/storage/endpoint"
"github.com/tikv/pd/server/versioninfo"
Expand Down Expand Up @@ -131,7 +130,6 @@ type RaftCluster struct {
labelLevelStats *statistics.LabelStatistics
regionStats *statistics.RegionStatistics
hotStat *statistics.HotStat
hotBuckets *buckets.HotBucketCache
ruleManager *placement.RuleManager
regionLabeler *labeler.RegionLabeler
replicationMode *replication.ModeManager
Expand Down Expand Up @@ -222,7 +220,6 @@ func (c *RaftCluster) InitCluster(
c.ctx, c.cancel = context.WithCancel(c.serverCtx)
c.labelLevelStats = statistics.NewLabelStatistics()
c.hotStat = statistics.NewHotStat(c.ctx)
c.hotBuckets = buckets.NewBucketsCache(c.ctx)
c.progressManager = progress.NewManager()
c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit)
c.prevStoreLimit = make(map[uint64]map[storelimit.Type]float64)
Expand Down
9 changes: 2 additions & 7 deletions server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/tikv/pd/pkg/logutil"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/statistics/buckets"
"github.com/tikv/pd/server/versioninfo"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -229,10 +228,6 @@ func (c *RaftCluster) HandleBatchReportSplit(request *pdpb.ReportBatchSplitReque
}

// HandleReportBuckets processes buckets reports from client
func (c *RaftCluster) HandleReportBuckets(b *metapb.Buckets) error {
if err := c.processReportBuckets(b); err != nil {
return err
}
c.hotBuckets.CheckAsync(buckets.NewCheckPeerTask(b))
return nil
func (c *RaftCluster) HandleReportBuckets(buckets *metapb.Buckets) error {
return c.processReportBuckets(buckets)
}
Loading

0 comments on commit ac6ae68

Please sign in to comment.