
Commit

Merge branch 'feature/ring' of github.com:bufferflies/pd into feature/ring
bufferflies committed May 16, 2022
2 parents b8aed05 + b3f72b0 commit 9bd855a
Showing 18 changed files with 319 additions and 91 deletions.
10 changes: 10 additions & 0 deletions errors.toml
@@ -501,6 +501,16 @@ error = '''
failed to lookup plugin function
'''

["PD:progress:ErrProgressNotFound"]
error = '''
no progress found for %s
'''

["PD:progress:ErrProgressWrongStatus"]
error = '''
progress status is wrong
'''

["PD:prometheus:ErrPrometheusCreateClient"]
error = '''
create client error
6 changes: 6 additions & 0 deletions pkg/errs/errno.go
@@ -332,3 +332,9 @@ var (
ErrUnsafeRecoveryIsRunning = errors.Normalize("unsafe recovery is running", errors.RFCCodeText("PD:unsaferecovery:ErrUnsafeRecoveryIsRunning"))
ErrUnsafeRecoveryInvalidInput = errors.Normalize("invalid input %s", errors.RFCCodeText("PD:unsaferecovery:ErrUnsafeRecoveryInvalidInput"))
)

// progress errors
var (
ErrProgressWrongStatus = errors.Normalize("progress status is wrong", errors.RFCCodeText("PD:progress:ErrProgressWrongStatus"))
ErrProgressNotFound = errors.Normalize("no progress found for %s", errors.RFCCodeText("PD:progress:ErrProgressNotFound"))
)
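The two errors above are raised with FastGenByArgs, which fills the %s placeholder declared in errors.toml. A minimal sketch of that flow, with a hypothetical lookup helper (the helper and its map argument are illustrative, not part of PD):

```go
package main

import (
	"fmt"

	"github.com/tikv/pd/pkg/errs"
)

// lookupRemaining is a hypothetical helper used only to show the error flow.
func lookupRemaining(progresses map[string]float64, name string) (float64, error) {
	remaining, ok := progresses[name]
	if !ok {
		// FastGenByArgs fills the %s placeholder declared above and in errors.toml.
		return 0, errs.ErrProgressNotFound.FastGenByArgs(fmt.Sprintf("the progress: %s", name))
	}
	return remaining, nil
}

func main() {
	if _, err := lookupRemaining(nil, "removing-1"); err != nil {
		fmt.Println(err) // the RFC-coded "no progress found for ..." message
	}
}
```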
15 changes: 13 additions & 2 deletions pkg/progress/progress.go
@@ -15,9 +15,11 @@
package progress

import (
"fmt"
"math"
"time"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/syncutil"
)

@@ -81,6 +83,9 @@ func (m *Manager) UpdateProgressRemaining(progress string, remaining float64) {
if p.total < remaining {
p.total = remaining
}
if p.lastTimeRemaining < remaining {
p.lastTimeRemaining = remaining
}
// calculate the average speed for every `speedStatisticalInterval`
if time.Since(p.lastTime) >= speedStatisticalInterval {
if (p.lastTimeRemaining - remaining) <= 0 {
@@ -131,12 +136,17 @@ func (m *Manager) GetProgresses(filter func(p string) bool) []string {
}

// Status returns the current progress status of a given name.
func (m *Manager) Status(progress string) (process, leftSeconds, currentSpeed float64) {
func (m *Manager) Status(progress string) (process, leftSeconds, currentSpeed float64, err error) {
m.RLock()
defer m.RUnlock()

if p, exist := m.progesses[progress]; exist {
process = 1 - p.remaining/p.total
if process < 0 {
process = 0
err = errs.ErrProgressWrongStatus.FastGenByArgs(fmt.Sprintf("the remaining: %v is larger than the total: %v", p.remaining, p.total))
return
}
currentSpeed = 0
// when the progress is newly added
if p.lastSpeed == 0 && time.Since(p.lastTime) < speedStatisticalInterval {
@@ -150,5 +160,6 @@ func (m *Manager) Status(progress string) (process, leftSeconds, currentSpeed fl
}
return
}
return 0, 0, 0
err = errs.ErrProgressNotFound.FastGenByArgs(fmt.Sprintf("the progress: %s", progress))
return
}
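With the extra return value, every caller of Status now has to branch on the error. A minimal usage sketch against the Manager API exercised in the tests below (NewManager, AddProgress, UpdateProgressRemaining); the progress name is illustrative:

```go
package main

import (
	"log"

	"github.com/tikv/pd/pkg/progress"
)

func main() {
	m := progress.NewManager()
	m.AddProgress("removing-1", 100)
	m.UpdateProgressRemaining("removing-1", 70)

	process, leftSeconds, currentSpeed, err := m.Status("removing-1")
	if err != nil {
		// Either the name is unknown (ErrProgressNotFound) or the bookkeeping is
		// inconsistent, i.e. remaining > total (ErrProgressWrongStatus).
		log.Fatalf("get progress status failed: %v", err)
	}
	// process is 1 - remaining/total; leftSeconds is math.MaxFloat64 while the
	// measured speed is still zero.
	log.Printf("process=%.2f eta=%.0fs speed=%.2f", process, leftSeconds, currentSpeed)
}
```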
34 changes: 31 additions & 3 deletions pkg/progress/progress_test.go
@@ -35,7 +35,8 @@ func (s *testProgressSuite) Test(c *C) {
n := "test"
m := NewManager()
c.Assert(m.AddProgress(n, 100), IsFalse)
p, ls, cs := m.Status(n)
p, ls, cs, err := m.Status(n)
c.Assert(err, IsNil)
c.Assert(p, Equals, 0.0)
c.Assert(ls, Equals, math.MaxFloat64)
c.Assert(cs, Equals, 0.0)
@@ -47,7 +48,8 @@ func (s *testProgressSuite) Test(c *C) {
}()
time.Sleep(time.Millisecond)
m.UpdateProgressRemaining(n, 30)
p, ls, cs = m.Status(n)
p, ls, cs, err = m.Status(n)
c.Assert(err, IsNil)
c.Assert(p, Equals, 0.7)
// 30/(70/1s+) > 30/70
c.Assert(ls, Greater, 30.0/70.0)
@@ -56,7 +58,8 @@ func (s *testProgressSuite) Test(c *C) {
// there is no scheduling
time.Sleep(time.Millisecond)
m.UpdateProgressRemaining(n, 30)
p, ls, cs = m.Status(n)
p, ls, cs, err = m.Status(n)
c.Assert(err, IsNil)
c.Assert(p, Equals, 0.7)
// the speed in previous `speedStatisticalInterval` is zero
c.Assert(ls, Equals, math.MaxFloat64)
@@ -74,3 +77,28 @@ func (s *testProgressSuite) Test(c *C) {
c.Assert(m.RemoveProgress(n), IsTrue)
c.Assert(m.RemoveProgress(n), IsFalse)
}

func (s *testProgressSuite) TestAbnormal(c *C) {
n := "test"
m := NewManager()
c.Assert(m.AddProgress(n, 100), IsFalse)
p, ls, cs, err := m.Status(n)
c.Assert(err, IsNil)
c.Assert(p, Equals, 0.0)
c.Assert(ls, Equals, math.MaxFloat64)
c.Assert(cs, Equals, 0.0)
// When a store is being taken offline, operators may still move regions to it, e.g., merge operations
m.UpdateProgressRemaining(n, 110)
p, ls, cs, err = m.Status(n)
c.Assert(err, IsNil)
c.Assert(p, Equals, 0.0)
c.Assert(ls, Equals, math.MaxFloat64)
c.Assert(cs, Equals, 0.0)
// It usually won't happen
m.UpdateProgressTotal(n, 10)
p, ls, cs, err = m.Status(n)
c.Assert(err, NotNil)
c.Assert(p, Equals, 0.0)
c.Assert(ls, Equals, 0.0)
c.Assert(cs, Equals, 0.0)
}
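The arithmetic behind the abnormal case above, as a standalone sketch: once UpdateProgressTotal lowers the total below what is still remaining, 1 - remaining/total goes negative, which is why Status now clamps the value and reports ErrProgressWrongStatus.

```go
package main

import "fmt"

func main() {
	total, remaining := 10.0, 110.0 // UpdateProgressTotal(n, 10) while 110 is still remaining
	process := 1 - remaining/total  // -10 without any guard
	if process < 0 {
		// Status reports 0 together with ErrProgressWrongStatus instead of a
		// negative percentage.
		process = 0
	}
	fmt.Println(process) // 0
}
```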
12 changes: 6 additions & 6 deletions server/api/store.go
@@ -633,9 +633,9 @@ func (h *storesHandler) GetStoresProgress(w http.ResponseWriter, r *http.Request
return
}

action, progress, leftSeconds, currentSpeed := h.Handler.GetProgressByID(v)
if progress == 0 && leftSeconds == 0 && currentSpeed == 0 {
h.rd.JSON(w, http.StatusNotFound, "no progress found for the given store ID")
action, progress, leftSeconds, currentSpeed, err := h.Handler.GetProgressByID(v)
if err != nil {
h.rd.JSON(w, http.StatusNotFound, err.Error())
return
}
sp := &Progress{
@@ -650,9 +650,9 @@ func (h *storesHandler) GetStoresProgress(w http.ResponseWriter, r *http.Request
return
}
if v := r.URL.Query().Get("action"); v != "" {
progress, leftSeconds, currentSpeed := h.Handler.GetProgressByAction(v)
if progress == 0 && leftSeconds == 0 && currentSpeed == 0 {
h.rd.JSON(w, http.StatusNotFound, "no progress found for the action")
progress, leftSeconds, currentSpeed, err := h.Handler.GetProgressByAction(v)
if err != nil {
h.rd.JSON(w, http.StatusNotFound, err.Error())
return
}
sp := &Progress{
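For API consumers, a missing progress now comes back as a 404 whose body carries the RFC-coded message instead of a fixed string. A hypothetical client-side check; the route and address below are assumptions, the handler hunks only show the query parameters:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumed endpoint; only the "action" query parameter is visible in this diff.
	resp, err := http.Get("http://127.0.0.1:2379/pd/api/v1/stores/progress?action=removing")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	if resp.StatusCode == http.StatusNotFound {
		fmt.Println("not found:", string(body)) // e.g. the ErrProgressNotFound message
		return
	}
	fmt.Println(string(body)) // the Progress JSON on success
}
```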
43 changes: 32 additions & 11 deletions server/cluster/cluster.go
@@ -319,9 +319,11 @@ func syncConfig(manager *config.StoreConfigManager, stores []*core.StoreInfo) bo
// it will try the next store if the current one fails.
address := netutil.ResolveLoopBackAddr(stores[index].GetStatusAddress(), stores[index].GetAddress())
if err := manager.ObserveConfig(address); err != nil {
log.Warn("sync store config failed, it will try next store", zap.Error(err))
storeSyncConfigEvent.WithLabelValues(address, "fail").Inc()
log.Debug("sync store config failed, it will try next store", zap.Error(err))
continue
}
storeSyncConfigEvent.WithLabelValues(address, "succ").Inc()
// it will only try one store.
return true
}
@@ -339,6 +341,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
return nil, nil
}

c.core.ResetStores()
start := time.Now()
if err := c.storage.LoadStores(c.core.PutStore); err != nil {
return nil, err
@@ -614,6 +617,11 @@ func (c *RaftCluster) GetSuspectRegions() []uint64 {
return c.coordinator.checkers.GetSuspectRegions()
}

// GetHotStat gets hot stat for test.
func (c *RaftCluster) GetHotStat() *statistics.HotStat {
return c.hotStat
}

// RemoveSuspectRegion removes region from suspect list.
func (c *RaftCluster) RemoveSuspectRegion(id uint64) {
c.coordinator.checkers.RemoveSuspectRegion(id)
@@ -1432,7 +1440,8 @@ func (c *RaftCluster) checkStores() {
threshold := c.getThreshold(stores, store)
log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold))
regionSize := float64(store.GetRegionSize())
if store.GetUptime() > c.opt.GetMaxStorePreparingTime() || regionSize >= threshold {
if store.GetUptime() > c.opt.GetMaxStorePreparingTime() || regionSize >= threshold ||
c.GetRegionCount() < core.InitClusterRegionThreshold {
if err := c.ReadyToServe(storeID); err != nil {
log.Error("change store to serving failed",
zap.Stringer("store", store.GetMeta()),
@@ -1457,8 +1466,9 @@
id := offlineStore.GetId()
regionSize := c.core.GetStoreRegionSize(id)
c.updateProgress(id, store.GetAddress(), removingAction, float64(regionSize))
regionCount := c.core.GetStoreRegionCount(id)
// If the store is empty, it can be buried.
if regionSize == 0 {
if regionCount == 0 {
if err := c.BuryStore(id, false); err != nil {
log.Error("bury store failed",
zap.Stringer("store", offlineStore),
@@ -1624,7 +1634,11 @@ func (c *RaftCluster) updateProgress(storeID uint64, storeAddress string, action
return
}
c.progressManager.UpdateProgressRemaining(progress, remaining)
process, ls, _ := c.progressManager.Status(progress)
process, ls, _, err := c.progressManager.Status(progress)
if err != nil {
log.Error("get progress status failed", zap.String("progress", progress), zap.Float64("remaining", remaining), errs.ZapError(err))
return
}
storesProgressGauge.WithLabelValues(storeAddress, storeLabel, action).Set(process)
storesETAGauge.WithLabelValues(storeAddress, storeLabel, action).Set(ls)
}
@@ -2153,39 +2167,46 @@ func (c *RaftCluster) GetEtcdClient() *clientv3.Client {
}

// GetProgressByID returns the progress details for a given store ID.
func (c *RaftCluster) GetProgressByID(storeID string) (action string, process, ls, cs float64) {
func (c *RaftCluster) GetProgressByID(storeID string) (action string, process, ls, cs float64, err error) {
filter := func(progress string) bool {
s := strings.Split(progress, "-")
return len(s) == 2 && s[1] == storeID
}
progress := c.progressManager.GetProgresses(filter)
if len(progress) != 0 {
process, ls, cs = c.progressManager.Status(progress[0])
process, ls, cs, err = c.progressManager.Status(progress[0])
if err != nil {
return
}
if strings.HasPrefix(progress[0], removingAction) {
action = removingAction
} else if strings.HasPrefix(progress[0], preparingAction) {
action = preparingAction
}
return
}
return "", 0, 0, 0
return "", 0, 0, 0, errs.ErrProgressNotFound.FastGenByArgs(fmt.Sprintf("the given store ID: %s", storeID))
}

// GetProgressByAction returns the progress details for a given action.
func (c *RaftCluster) GetProgressByAction(action string) (process, ls, cs float64) {
func (c *RaftCluster) GetProgressByAction(action string) (process, ls, cs float64, err error) {
filter := func(progress string) bool {
return strings.HasPrefix(progress, action)
}

progresses := c.progressManager.GetProgresses(filter)
if len(progresses) == 0 {
return 0, 0, 0
return 0, 0, 0, errs.ErrProgressNotFound.FastGenByArgs(fmt.Sprintf("the action: %s", action))
}
var p, l, s float64
for _, progress := range progresses {
p, l, c := c.progressManager.Status(progress)
p, l, s, err = c.progressManager.Status(progress)
if err != nil {
return
}
process += p
ls += l
cs += c
cs += s
}
num := float64(len(progresses))
process /= num
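As a reading aid for the two lookups above: progress entries are keyed "<action>-<storeID>" (the tests use "removing-1"), so GetProgressByID matches the suffix and GetProgressByAction matches the prefix. A self-contained sketch of that matching, assuming the action constants are the strings "removing" and "preparing":

```go
package main

import (
	"fmt"
	"strings"
)

// Assumed values; the real constants are defined outside the hunks shown here.
const (
	removingAction  = "removing"
	preparingAction = "preparing"
)

// filterByID mirrors the closure inside GetProgressByID.
func filterByID(storeID string) func(string) bool {
	return func(progress string) bool {
		s := strings.Split(progress, "-")
		return len(s) == 2 && s[1] == storeID
	}
}

// filterByAction mirrors the closure inside GetProgressByAction.
func filterByAction(action string) func(string) bool {
	return func(progress string) bool {
		return strings.HasPrefix(progress, action)
	}
}

func main() {
	fmt.Println(filterByID("1")("removing-1"))                  // true
	fmt.Println(filterByAction(preparingAction)("removing-1"))  // false
	fmt.Println(filterByAction(removingAction)("removing-1"))   // true
}
```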
6 changes: 4 additions & 2 deletions server/cluster/cluster_test.go
@@ -452,7 +452,8 @@ func (s *testClusterInfoSuite) TestRemovingProcess(c *C) {
cluster.checkStores()
process := "removing-1"
// no region moving
p, l, cs := cluster.progressManager.Status(process)
p, l, cs, err := cluster.progressManager.Status(process)
c.Assert(err, IsNil)
c.Assert(p, Equals, 0.0)
c.Assert(l, Equals, math.MaxFloat64)
c.Assert(cs, Equals, 0.0)
@@ -467,7 +468,8 @@ }
}
time.Sleep(time.Second)
cluster.checkStores()
p, l, cs = cluster.progressManager.Status(process)
p, l, cs, err = cluster.progressManager.Status(process)
c.Assert(err, IsNil)
// Above we deleted 5 regions from store 1; the total region count in store 1 is 20.
// process = 5 / 20 = 0.25
c.Assert(p, Equals, 0.25)
9 changes: 9 additions & 0 deletions server/cluster/metrics.go
@@ -96,6 +96,14 @@
Help: "The current progress of corresponding action",
}, []string{"address", "store", "action"})

storeSyncConfigEvent = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "cluster",
Name: "store_sync",
Help: "The state of store sync config",
}, []string{"address", "state"})

storesETAGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd",
@@ -117,4 +125,5 @@ func init() {
prometheus.MustRegister(bucketEventCounter)
prometheus.MustRegister(storesProgressGauge)
prometheus.MustRegister(storesETAGauge)
prometheus.MustRegister(storeSyncConfigEvent)
}
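The counter above is driven from syncConfig, which labels every attempt with "succ" or "fail" (see the server/cluster/cluster.go hunk in this commit). A standalone sketch of the same shape; the address label value is illustrative:

```go
package main

import "github.com/prometheus/client_golang/prometheus"

// Same shape as the pd_cluster_store_sync counter registered above.
var storeSyncConfigEvent = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "pd",
		Subsystem: "cluster",
		Name:      "store_sync",
		Help:      "The state of store sync config",
	}, []string{"address", "state"})

func main() {
	prometheus.MustRegister(storeSyncConfigEvent)
	// syncConfig increments one of these per attempt.
	storeSyncConfigEvent.WithLabelValues("127.0.0.1:20180", "succ").Inc()
	storeSyncConfigEvent.WithLabelValues("127.0.0.1:20180", "fail").Inc()
}
```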
7 changes: 7 additions & 0 deletions server/core/basic_cluster.go
@@ -338,6 +338,13 @@ func (bc *BasicCluster) PutStore(store *StoreInfo) {
bc.Stores.SetStore(store)
}

// ResetStores resets the store cache.
func (bc *BasicCluster) ResetStores() {
bc.Lock()
defer bc.Unlock()
bc.Stores = NewStoresInfo()
}

// DeleteStore deletes a store.
func (bc *BasicCluster) DeleteStore(store *StoreInfo) {
bc.Lock()
2 changes: 2 additions & 0 deletions server/core/region.go
@@ -120,6 +120,8 @@ const (
// Only statistics within this interval limit are valid.
statsReportMinInterval = 3 // 3s
statsReportMaxInterval = 5 * 60 // 5min
// InitClusterRegionThreshold is a threshold below which the cluster is considered newly created.
InitClusterRegionThreshold = 50
)

// RegionFromHeartbeat constructs a Region from region heartbeat.
4 changes: 2 additions & 2 deletions server/handler.go
@@ -938,12 +938,12 @@ func (h *Handler) GetStoreLimitScene(limitType storelimit.Type) *storelimit.Scen
}

// GetProgressByID returns the progress details for a given store ID.
func (h *Handler) GetProgressByID(storeID string) (action string, p, ls, cs float64) {
func (h *Handler) GetProgressByID(storeID string) (action string, p, ls, cs float64, err error) {
return h.s.GetRaftCluster().GetProgressByID(storeID)
}

// GetProgressByAction returns the progress details for a given action.
func (h *Handler) GetProgressByAction(action string) (p, ls, cs float64) {
func (h *Handler) GetProgressByAction(action string) (p, ls, cs float64, err error) {
return h.s.GetRaftCluster().GetProgressByAction(action)
}

4 changes: 1 addition & 3 deletions server/schedulers/balance_region.go
@@ -62,8 +62,6 @@ const (
BalanceRegionName = "balance-region-scheduler"
// BalanceRegionType is balance region scheduler type.
BalanceRegionType = "balance-region"
// BalanceEmptyRegionThreshold is a threshold which allow balance the empty region if the region number is less than this threshold.
balanceEmptyRegionThreshold = 50
)

type balanceRegionSchedulerConfig struct {
@@ -265,7 +263,7 @@ func (s *balanceRegionScheduler) transferPeer(plan *balancePlan) *operator.Opera

// isEmptyRegionAllowBalance checks if a region is an empty region and can be balanced.
func isEmptyRegionAllowBalance(cluster schedule.Cluster, region *core.RegionInfo) bool {
return region.GetApproximateSize() > core.EmptyRegionApproximateSize || cluster.GetRegionCount() < balanceEmptyRegionThreshold
return region.GetApproximateSize() > core.EmptyRegionApproximateSize || cluster.GetRegionCount() < core.InitClusterRegionThreshold
}

// isAllowBalanceEmptyRegion returns a function that checks if a region is an empty region and can be balanced.
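The constant that used to live here now comes from core.InitClusterRegionThreshold, so "fewer than 50 regions" means "newly bootstrapped" both for this scheduler and for checkStores. A standalone sketch of the predicate; the 1 MB empty-region size is an assumed value of core.EmptyRegionApproximateSize:

```go
package main

import "fmt"

const (
	emptyRegionApproximateSize = 1  // MB, assumed value of core.EmptyRegionApproximateSize
	initClusterRegionThreshold = 50 // core.InitClusterRegionThreshold introduced in this commit
)

// isEmptyRegionAllowBalance mirrors the predicate above: empty regions are only
// balanced while the cluster still looks newly bootstrapped.
func isEmptyRegionAllowBalance(regionSizeMB int64, clusterRegionCount int) bool {
	return regionSizeMB > emptyRegionApproximateSize || clusterRegionCount < initClusterRegionThreshold
}

func main() {
	fmt.Println(isEmptyRegionAllowBalance(0, 20))   // true: tiny cluster, still balance empty regions
	fmt.Println(isEmptyRegionAllowBalance(0, 200))  // false: mature cluster, skip empty regions
	fmt.Println(isEmptyRegionAllowBalance(96, 200)) // true: not an empty region
}
```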