Skip to content

Commit

Permalink
*: fix the issue that loadcluster does not remove overlap regions
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Dec 15, 2019
1 parent f7f643c commit 0828315
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 18 deletions.
35 changes: 17 additions & 18 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ const (
defaultChangedRegionsLimit = 10000
)

// ErrRegionIsStale is error info for region is stale.
var ErrRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error {
return errors.Errorf("region is stale: region %v origin %v", region, origin)
}

// Server is the interface for cluster.
type Server interface {
GetAllocator() *id.AllocatorImpl
Expand Down Expand Up @@ -242,7 +237,17 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {

start = time.Now()

if err := c.storage.LoadRegions(c.core.PutRegion); err != nil {
// used to load region from kv storage to cache storage.
putRegionToCache := func(region *core.RegionInfo) []*core.RegionInfo {
origin, err := c.core.PreCheckPutRegion(region)
if err != nil {
log.Warn("region is stale", zap.Error(err), zap.Stringer("origin", origin.GetMeta()))
// return the state region to delete.
return []*core.RegionInfo{region}
}
return c.core.PutRegion(region)
}
if err := c.storage.LoadRegions(putRegionToCache); err != nil {
return nil, err
}
log.Info("load regions",
Expand Down Expand Up @@ -394,14 +399,10 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
// processRegionHeartbeat updates the region information.
func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
c.RLock()
origin := c.GetRegion(region.GetID())
if origin == nil {
for _, item := range c.core.GetOverlaps(region) {
if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() {
c.RUnlock()
return ErrRegionIsStale(region.GetMeta(), item.GetMeta())
}
}
origin, err := c.core.PreCheckPutRegion(region)
if err != nil {
c.RUnlock()
return err
}
writeItems := c.CheckWriteStatus(region)
readItems := c.CheckReadStatus(region)
Expand All @@ -420,10 +421,6 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
} else {
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
// Region meta is stale, return an error.
if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() {
return ErrRegionIsStale(region.GetMeta(), origin.GetMeta())
}
if r.GetVersion() > o.GetVersion() {
log.Info("region Version changed",
zap.Uint64("region-id", region.GetID()),
Expand Down Expand Up @@ -1454,6 +1451,8 @@ func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) []*statistics.Hot
return c.hotSpotCache.CheckRead(region, c.storesStats)
}

// TODO: remove me.
// only used in test.
func (c *RaftCluster) putRegion(region *core.RegionInfo) error {
c.Lock()
defer c.Unlock()
Expand Down
23 changes: 23 additions & 0 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,29 @@ func (bc *BasicCluster) TakeStore(storeID uint64) *StoreInfo {
return bc.Stores.TakeStore(storeID)
}

// PreCheckPutRegion checks if the region is valid to put.
func (bc *BasicCluster) PreCheckPutRegion(region *RegionInfo) (*RegionInfo, error) {
bc.RLock()
for _, item := range bc.Regions.GetOverlaps(region) {
if region.GetRegionEpoch().GetVersion() < item.GetRegionEpoch().GetVersion() {
bc.RUnlock()
return nil, ErrRegionIsStale(region.GetMeta(), item.GetMeta())
}
}
origin := bc.Regions.GetRegion(region.GetID())
bc.RUnlock()
if origin == nil {
return nil, nil
}
r := region.GetRegionEpoch()
o := origin.GetRegionEpoch()
// Region meta is stale, return an error.
if r.GetVersion() < o.GetVersion() || r.GetConfVer() < o.GetConfVer() {
return origin, ErrRegionIsStale(region.GetMeta(), origin.GetMeta())
}
return origin, nil
}

// PutRegion put a region.
func (bc *BasicCluster) PutRegion(region *RegionInfo) []*RegionInfo {
bc.Lock()
Expand Down
7 changes: 7 additions & 0 deletions server/core/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"net/http"

"github.com/pingcap/errcode"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
)

var (
Expand Down Expand Up @@ -61,3 +63,8 @@ func (e StoreBlockedErr) Error() string {

// Code returns StoreBlockedCode
func (e StoreBlockedErr) Code() errcode.Code { return StoreBlockedCode }

// ErrRegionIsStale is error info for region is stale.
var ErrRegionIsStale = func(region *metapb.Region, origin *metapb.Region) error {
return errors.Errorf("region is stale: region %v origin %v", region, origin)
}
3 changes: 3 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ func (r *RegionInfo) GetID() uint64 {

// GetMeta returns the meta information of the region.
func (r *RegionInfo) GetMeta() *metapb.Region {
if r == nil {
return nil
}
return r.meta
}

Expand Down
7 changes: 7 additions & 0 deletions tests/server/region_syncer/region_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) {
err = rc.HandleRegionHeartbeat(region)
c.Assert(err, IsNil)
}
// merge case
// region2-> region 1 -> region 0
regions[2] = regions[0].Clone(core.WithEndKey(regions[2].GetEndKey()), core.WithIncVersion(), core.WithIncVersion())
err = rc.HandleRegionHeartbeat(regions[2])
c.Assert(err, IsNil)
regionLen -= 2

// ensure flush to region storage, we use a duration larger than the
// region storage flush rate limit (3s).
time.Sleep(4 * time.Second)
Expand Down

0 comments on commit 0828315

Please sign in to comment.