Skip to content

Commit

Permalink
cluster: pd support the configuration of bucket switch (#5013)
Browse files Browse the repository at this point in the history
close #4720

Signed-off-by: bufferflies <[email protected]>

Co-authored-by: ShuNing <[email protected]>
Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
3 people authored May 27, 2022
1 parent 80128d3 commit f3cefe1
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 44 deletions.
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
if err != nil {
return err
}
region.Inherit(origin)
region.Inherit(origin, c.storeConfigManager.GetStoreConfig().IsEnableRegionBucket())

c.hotStat.CheckWriteAsync(statistics.NewCheckExpiredItemTask(region))
c.hotStat.CheckReadAsync(statistics.NewCheckExpiredItemTask(region))
Expand Down
35 changes: 25 additions & 10 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,36 +578,51 @@ func (s *testClusterInfoSuite) TestBucketHeartbeat(c *C) {

// case1: region is not exist
buckets := &metapb.Buckets{
RegionId: 0,
RegionId: 1,
Version: 1,
Keys: [][]byte{{'1'}, {'2'}},
}
c.Assert(cluster.processReportBuckets(buckets), NotNil)

// case2: bucket can be processed after the region update.
stores := newTestStores(3, "2.0.0")
n, np := uint64(1), uint64(1)
n, np := uint64(2), uint64(2)
regions := newTestRegions(n, n, np)
for _, store := range stores {
c.Assert(cluster.putStoreLocked(store), IsNil)
}

c.Assert(cluster.processRegionHeartbeat(regions[0]), IsNil)
c.Assert(cluster.GetRegion(uint64(0)).GetBuckets(), IsNil)
c.Assert(cluster.processRegionHeartbeat(regions[1]), IsNil)
c.Assert(cluster.GetRegion(uint64(1)).GetBuckets(), IsNil)
c.Assert(cluster.processReportBuckets(buckets), IsNil)
c.Assert(cluster.GetRegion(uint64(0)).GetBuckets(), DeepEquals, buckets)
c.Assert(cluster.GetRegion(uint64(1)).GetBuckets(), DeepEquals, buckets)

// case3: the bucket version is same.
c.Assert(cluster.processReportBuckets(buckets), IsNil)
// case4: the bucket version is changed.
buckets.Version = 3
c.Assert(cluster.processReportBuckets(buckets), IsNil)
c.Assert(cluster.GetRegion(uint64(0)).GetBuckets(), DeepEquals, buckets)
newBuckets := &metapb.Buckets{
RegionId: 1,
Version: 3,
Keys: [][]byte{{'1'}, {'2'}},
}
c.Assert(cluster.processReportBuckets(newBuckets), IsNil)
c.Assert(cluster.GetRegion(uint64(1)).GetBuckets(), DeepEquals, newBuckets)

//case5: region update should inherit buckets.
newRegion := regions[0].Clone(core.WithIncConfVer())
// case5: region update should inherit buckets.
newRegion := regions[1].Clone(core.WithIncConfVer(), core.SetBuckets(nil))
cluster.storeConfigManager = config.NewTestStoreConfigManager(nil)
config := cluster.storeConfigManager.GetStoreConfig()
config.Coprocessor.EnableRegionBucket = true
c.Assert(cluster.processRegionHeartbeat(newRegion), IsNil)
c.Assert(cluster.GetRegion(uint64(0)).GetBuckets(), NotNil)
c.Assert(cluster.GetRegion(uint64(1)).GetBuckets().GetKeys(), HasLen, 2)

// case6: disable region bucket in
config.Coprocessor.EnableRegionBucket = false
newRegion2 := regions[1].Clone(core.WithIncConfVer(), core.SetBuckets(nil))
c.Assert(cluster.processRegionHeartbeat(newRegion2), IsNil)
c.Assert(cluster.GetRegion(uint64(1)).GetBuckets(), IsNil)
c.Assert(cluster.GetRegion(uint64(1)).GetBuckets().GetKeys(), HasLen, 0)
}

func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
Expand Down
25 changes: 17 additions & 8 deletions server/config/store_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Coprocessor struct {
RegionMaxKeys int `json:"region-max-keys"`
RegionSplitKeys int `json:"region-split-keys"`
EnableRegionBucket bool `json:"enable-region-bucket"`
RegionBucketSize int `json:"region-bucket-size"`
}

// String implements fmt.Stringer interface.
Expand All @@ -69,14 +70,6 @@ func (c *StoreConfig) String() string {
return string(data)
}

// EnableRegionBucket return ture if the region bucket is enabled.
func (c *StoreConfig) EnableRegionBucket() bool {
if c == nil {
return false
}
return c.Coprocessor.EnableRegionBucket
}

// GetRegionMaxSize returns the max region size in MB
func (c *StoreConfig) GetRegionMaxSize() uint64 {
if c == nil || len(c.Coprocessor.RegionMaxSize) == 0 {
Expand Down Expand Up @@ -109,6 +102,22 @@ func (c *StoreConfig) GetRegionMaxKeys() uint64 {
return uint64(c.Coprocessor.RegionMaxKeys)
}

// IsEnableRegionBucket return ture if the region bucket is enabled.
func (c *StoreConfig) IsEnableRegionBucket() bool {
if c == nil {
return false
}
return c.Coprocessor.EnableRegionBucket
}

// GetRegionBucketSize returns region bucket size if enable region buckets.
func (c *StoreConfig) GetRegionBucketSize() int {
if c == nil || !c.Coprocessor.EnableRegionBucket {
return 0
}
return c.Coprocessor.RegionBucketSize
}

// CheckRegionSize return error if the smallest region's size is less than mergeSize
func (c *StoreConfig) CheckRegionSize(size, mergeSize uint64) error {
// the merged region will not be split if it's size less than region max size.
Expand Down
12 changes: 8 additions & 4 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,10 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
return region
}

// Inherit inherits the buckets and region size from the parent region.
// Inherit inherits the buckets and region size from the parent region if bucket enabled.
// correct approximate size and buckets by the previous size if here exists a reported RegionInfo.
// See https://github.com/tikv/tikv/issues/11114
func (r *RegionInfo) Inherit(origin *RegionInfo) {
func (r *RegionInfo) Inherit(origin *RegionInfo, bucketEnable bool) {
// regionSize should not be zero if region is not empty.
if r.GetApproximateSize() == 0 {
if origin != nil {
Expand All @@ -184,7 +184,7 @@ func (r *RegionInfo) Inherit(origin *RegionInfo) {
r.approximateSize = EmptyRegionApproximateSize
}
}
if origin != nil && r.buckets == nil {
if bucketEnable && origin != nil && r.buckets == nil {
r.buckets = origin.buckets
}
}
Expand Down Expand Up @@ -423,8 +423,8 @@ func (r *RegionInfo) GetStat() *pdpb.RegionStat {

// UpdateBuckets sets the buckets of the region.
func (r *RegionInfo) UpdateBuckets(buckets, old *metapb.Buckets) bool {
// the bucket can't be nil except in the test cases.
if buckets == nil {
atomic.StorePointer(&r.buckets, nil)
return true
}
// only need to update bucket keys, versions.
Expand Down Expand Up @@ -617,6 +617,10 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
if len(region.GetPeers()) != len(origin.GetPeers()) {
saveKV, saveCache = true, true
}
if len(region.GetBuckets().GetKeys()) != len(origin.GetBuckets().GetKeys()) {
debug("bucket key changed", zap.Uint64("region-id", region.GetID()))
saveKV, saveCache = true, true
}

if region.GetApproximateSize() != origin.GetApproximateSize() ||
region.GetApproximateKeys() != origin.GetApproximateKeys() {
Expand Down
7 changes: 7 additions & 0 deletions server/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ func WithRemoveStorePeer(storeID uint64) RegionCreateOption {
}
}

// SetBuckets sets the buckets for the region, only use test.
func SetBuckets(buckets *metapb.Buckets) RegionCreateOption {
return func(region *RegionInfo) {
region.UpdateBuckets(buckets, region.GetBuckets())
}
}

// SetReadBytes sets the read bytes for the region.
func SetReadBytes(v uint64) RegionCreateOption {
return func(region *RegionInfo) {
Expand Down
30 changes: 13 additions & 17 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,34 +216,30 @@ func (s *testRegionInfoSuite) TestInherit(c *C) {
}
r := NewRegionInfo(&metapb.Region{Id: 100}, nil)
r.approximateSize = int64(t.size)
r.Inherit(origin)
r.Inherit(origin, false)
c.Assert(r.approximateSize, Equals, int64(t.expect))
}

// bucket
data := []struct {
originBuckets *metapb.Buckets
buckets *metapb.Buckets
same bool
}{
{nil, nil, true},
{nil, &metapb.Buckets{RegionId: 1, Version: 2}, false},
{&metapb.Buckets{RegionId: 1, Version: 2}, &metapb.Buckets{RegionId: 1, Version: 3}, false},
{&metapb.Buckets{RegionId: 1, Version: 2}, nil, true},
{nil, nil},
{nil, &metapb.Buckets{RegionId: 100, Version: 2}},
{&metapb.Buckets{RegionId: 100, Version: 2}, &metapb.Buckets{RegionId: 100, Version: 3}},
{&metapb.Buckets{RegionId: 100, Version: 2}, nil},
}
for _, d := range data {
var origin *RegionInfo
if d.originBuckets != nil {
origin = NewRegionInfo(&metapb.Region{Id: 100}, nil)
origin.UpdateBuckets(d.originBuckets, origin.GetBuckets())
}
origin := NewRegionInfo(&metapb.Region{Id: 100}, nil, SetBuckets(d.originBuckets))
r := NewRegionInfo(&metapb.Region{Id: 100}, nil)
r.UpdateBuckets(d.buckets, r.GetBuckets())
r.Inherit(origin)
if d.same {
c.Assert(r.GetBuckets(), DeepEquals, d.originBuckets)
} else {
c.Assert(r.GetBuckets(), Not(DeepEquals), d.originBuckets)
r.Inherit(origin, true)
c.Assert(r.GetBuckets(), DeepEquals, d.originBuckets)
// region will not inherit bucket keys.
if origin.GetBuckets() != nil {
newRegion := NewRegionInfo(&metapb.Region{Id: 100}, nil)
newRegion.Inherit(origin, false)
c.Assert(newRegion.GetBuckets(), Not(DeepEquals), d.originBuckets)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ func (s *GrpcServer) GetRegion(ctx context.Context, request *pdpb.GetRegionReque
return &pdpb.GetRegionResponse{Header: s.header()}, nil
}
var buckets *metapb.Buckets
if request.GetNeedBuckets() {
if rc.GetStoreConfig().IsEnableRegionBucket() && request.GetNeedBuckets() {
buckets = region.GetBuckets()
}
return &pdpb.GetRegionResponse{
Expand Down Expand Up @@ -967,7 +967,7 @@ func (s *GrpcServer) GetPrevRegion(ctx context.Context, request *pdpb.GetRegionR
return &pdpb.GetRegionResponse{Header: s.header()}, nil
}
var buckets *metapb.Buckets
if request.GetNeedBuckets() {
if rc.GetStoreConfig().IsEnableRegionBucket() && request.GetNeedBuckets() {
buckets = region.GetBuckets()
}
return &pdpb.GetRegionResponse{
Expand Down Expand Up @@ -1000,7 +1000,7 @@ func (s *GrpcServer) GetRegionByID(ctx context.Context, request *pdpb.GetRegionB
return &pdpb.GetRegionResponse{Header: s.header()}, nil
}
var buckets *metapb.Buckets
if request.GetNeedBuckets() {
if rc.GetStoreConfig().IsEnableRegionBucket() && request.GetNeedBuckets() {
buckets = region.GetBuckets()
}
return &pdpb.GetRegionResponse{
Expand Down
2 changes: 1 addition & 1 deletion server/schedulers/split_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (s *splitBucketScheduler) ServeHTTP(w http.ResponseWriter, r *http.Request)

// IsScheduleAllowed return true if the sum of executing opSplit operator is less .
func (s *splitBucketScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {
if !cluster.GetStoreConfig().EnableRegionBucket() {
if !cluster.GetStoreConfig().IsEnableRegionBucket() {
schedulerCounter.WithLabelValues(s.GetName(), "bucket-disable").Inc()
return false
}
Expand Down
13 changes: 13 additions & 0 deletions tests/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,8 @@ func (s *testClientSuite) SetUpSuite(c *C) {
},
})
}
config := cluster.GetStoreConfig()
config.EnableRegionBucket = true
}

func (s *testClientSuite) TearDownSuite(c *C) {
Expand Down Expand Up @@ -797,6 +799,17 @@ func (s *testClientSuite) TestGetRegion(c *C) {
}
return c.Check(r.Buckets, NotNil)
})
config := s.srv.GetRaftCluster().GetStoreConfig()
config.EnableRegionBucket = false
testutil.WaitUntil(c, func() bool {
r, err := s.client.GetRegion(context.Background(), []byte("a"), pd.WithBuckets())
c.Assert(err, IsNil)
if r == nil {
return false
}
return c.Check(r.Buckets, IsNil)
})
config.EnableRegionBucket = true
c.Succeed()
}

Expand Down

0 comments on commit f3cefe1

Please sign in to comment.