Skip to content

Commit

Permalink
Merge branch 'release-6.5' into disksing/timeout2
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored Jul 14, 2023
2 parents 379a9e0 + 086871d commit 5edf9fd
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 28 deletions.
27 changes: 22 additions & 5 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,11 +494,28 @@ func (c *coordinator) getHotRegionsByType(typ statistics.RWType) *statistics.Sto
default:
}
// update params `IsLearner` and `LastUpdateTime`
for _, stores := range []statistics.StoreHotPeersStat{infos.AsLeader, infos.AsPeer} {
for _, store := range stores {
for _, hotPeer := range store.Stats {
region := c.cluster.GetRegion(hotPeer.RegionID)
hotPeer.UpdateHotPeerStatShow(region)
s := []statistics.StoreHotPeersStat{infos.AsLeader, infos.AsPeer}
for i, stores := range s {
for j, store := range stores {
for k := range store.Stats {
h := &s[i][j].Stats[k]
region := c.cluster.GetRegion(h.RegionID)
if region != nil {
h.IsLearner = core.IsLearner(region.GetPeer(h.StoreID))
}
switch typ {
case statistics.Write:
if region != nil {
h.LastUpdateTime = time.Unix(int64(region.GetInterval().GetEndTimestamp()), 0)
}
case statistics.Read:
store := c.cluster.GetStore(h.StoreID)
if store != nil {
ts := store.GetMeta().GetLastHeartbeat()
h.LastUpdateTime = time.Unix(ts/1e9, ts%1e9)
}
default:
}
}
}
}
Expand Down
3 changes: 0 additions & 3 deletions server/cluster/unsafe_recovery_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,9 +706,6 @@ func (u *unsafeRecoveryController) getFailedPeers(region *metapb.Region) []*meta

var failedPeers []*metapb.Peer
for _, peer := range region.Peers {
if peer.Role == metapb.PeerRole_Learner || peer.Role == metapb.PeerRole_DemotingVoter {
continue
}
if u.isFailed(peer) {
failedPeers = append(failedPeers, peer)
}
Expand Down
42 changes: 42 additions & 0 deletions server/cluster/unsafe_recovery_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,48 @@ func TestAutoDetectMode(t *testing.T) {
}
}

// Failed learner replica/ store should be considered by auto-recover.
func TestAutoDetectModeWithOneLearner(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_, opt, _ := newTestScheduleConfig()
cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster.coordinator = newCoordinator(ctx, cluster, hbstream.NewTestHeartbeatStreams(ctx, cluster.meta.GetId(), cluster, true))
cluster.coordinator.run()
for _, store := range newTestStores(1, "6.0.0") {
re.NoError(cluster.PutStore(store.GetMeta()))
}
recoveryController := newUnsafeRecoveryController(cluster)
re.NoError(recoveryController.RemoveFailedStores(nil, 60, true))

storeReport := pdpb.StoreReport{
PeerReports: []*pdpb.PeerReport{
{
RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10, HardState: &eraftpb.HardState{Term: 1, Commit: 10}},
RegionState: &raft_serverpb.RegionLocalState{
Region: &metapb.Region{
Id: 1001,
RegionEpoch: &metapb.RegionEpoch{ConfVer: 7, Version: 10},
Peers: []*metapb.Peer{
{Id: 11, StoreId: 1}, {Id: 12, StoreId: 2}, {Id: 13, StoreId: 3, Role: metapb.PeerRole_Learner}}}}},
},
}
req := newStoreHeartbeat(1, &storeReport)
req.StoreReport.Step = 1
resp := &pdpb.StoreHeartbeatResponse{}
recoveryController.HandleStoreHeartbeat(req, resp)
hasStore3AsFailedStore := false
for _, failedStore := range resp.RecoveryPlan.ForceLeader.FailedStores {
if failedStore == 3 {
hasStore3AsFailedStore = true
break
}
}
re.True(hasStore3AsFailedStore)
}

func TestOneLearner(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down
10 changes: 8 additions & 2 deletions server/election/leadership.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package election

import (
"context"
"sync"
"sync/atomic"

"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -54,8 +55,9 @@ type Leadership struct {
leaderKey string
leaderValue string

keepAliveCtx context.Context
keepAliveCancelFunc context.CancelFunc
keepAliveCtx context.Context
keepAliveCancelFunc context.CancelFunc
keepAliveCancelFuncLock sync.Mutex
}

// NewLeadership creates a new Leadership.
Expand Down Expand Up @@ -137,7 +139,9 @@ func (ls *Leadership) Keep(ctx context.Context) {
if ls == nil {
return
}
ls.keepAliveCancelFuncLock.Lock()
ls.keepAliveCtx, ls.keepAliveCancelFunc = context.WithCancel(ctx)
ls.keepAliveCancelFuncLock.Unlock()
go ls.getLease().KeepAlive(ls.keepAliveCtx)
}

Expand Down Expand Up @@ -230,8 +234,10 @@ func (ls *Leadership) Reset() {
if ls == nil || ls.getLease() == nil {
return
}
ls.keepAliveCancelFuncLock.Lock()
if ls.keepAliveCancelFunc != nil {
ls.keepAliveCancelFunc()
}
ls.keepAliveCancelFuncLock.Unlock()
ls.getLease().Close()
}
7 changes: 4 additions & 3 deletions server/statistics/hot_peer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,17 @@ func (f *hotPeerCache) checkPeerFlow(peer *core.PeerInfo, region *core.RegionInf
}
}

peers := region.GetPeers()
newItem := &HotPeerStat{
StoreID: storeID,
RegionID: regionID,
Loads: f.kind.GetLoadRatesFromPeer(peer),
isLeader: region.GetLeader().GetStoreId() == storeID,
actionType: Update,
stores: make([]uint64, len(region.GetPeers())),
stores: make([]uint64, len(peers)),
}
for _, peer := range region.GetPeers() {
newItem.stores = append(newItem.stores, peer.GetStoreId())
for i, peer := range peers {
newItem.stores[i] = peer.GetStoreId()
}

if oldItem == nil {
Expand Down
17 changes: 2 additions & 15 deletions server/statistics/hot_regions_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@

package statistics

import (
"time"

"github.com/tikv/pd/server/core"
)
import "time"

// HotPeersStat records all hot regions statistics
type HotPeersStat struct {
Expand All @@ -44,14 +40,5 @@ type HotPeerStatShow struct {
KeyRate float64 `json:"flow_keys"`
QueryRate float64 `json:"flow_query"`
AntiCount int `json:"anti_count"`
LastUpdateTime time.Time `json:"last_update_time"`
}

// UpdateHotPeerStatShow updates the region information, such as `IsLearner` and `LastUpdateTime`.
func (h *HotPeerStatShow) UpdateHotPeerStatShow(region *core.RegionInfo) {
if region == nil {
return
}
h.IsLearner = core.IsLearner(region.GetPeer(h.StoreID))
h.LastUpdateTime = time.Unix(int64(region.GetInterval().GetEndTimestamp()), 0)
LastUpdateTime time.Time `json:"last_update_time,omitempty"`
}

0 comments on commit 5edf9fd

Please sign in to comment.