Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic/receiving io #6

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2092,6 +2092,10 @@ func (c *RaftCluster) RemoveStoreLimit(storeID uint64) {
for _, limitType := range storelimit.TypeNameValue {
c.core.ResetStoreLimit(storeID, limitType)
}

for _, snapType := range storelimit.SnapTypeNameValue {
c.core.ResetSnapLimit(storeID, snapType)
}
delete(cfg.StoreLimit, storeID)
c.opt.SetScheduleConfig(cfg)
var err error
Expand Down
6 changes: 6 additions & 0 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cluster
import (
"bytes"
"context"
"github.com/tikv/pd/server/core/storelimit"
"net/http"
"strconv"
"sync"
Expand Down Expand Up @@ -150,6 +151,11 @@ func (c *coordinator) patrolRegions() {
continue
}

store := c.cluster.GetStore(region.GetLeader().GetStoreId())
if store == nil || !store.IsAvailableSnap(storelimit.SendSnapShot) {
continue
}

ops := c.checkers.CheckRegion(region)

key = region.GetEndKey()
Expand Down
8 changes: 8 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ const (
defaultLogFormat = "text"

defaultMaxMovableHotPeerSize = int64(512)

defaultSendSnapshotSize = int64(1000)
)

// Special keys for Labels
Expand Down Expand Up @@ -761,6 +763,8 @@ type ScheduleConfig struct {
// MaxMovableHotPeerSize is the threshold of region size for balance hot region and split bucket scheduler.
// Hot region must be split before moved if it's region size is greater than MaxMovableHotPeerSize.
MaxMovableHotPeerSize int64 `toml:"max-movable-hot-peer-size" json:"max-movable-hot-peer-size,omitempty"`

SendSnapshotSize int64 `toml:"send-snapshot-size" json:"send-snapshot-size"`
}

// Clone returns a cloned scheduling configuration.
Expand Down Expand Up @@ -862,6 +866,10 @@ func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error {
if !meta.IsDefined("enable-cross-table-merge") {
c.EnableCrossTableMerge = defaultEnableCrossTableMerge
}

if !meta.IsDefined("send-snapshot-size") {
adjustInt64(&c.SendSnapshotSize, defaultSendSnapshotSize)
}
adjustFloat64(&c.LowSpaceRatio, defaultLowSpaceRatio)
adjustFloat64(&c.HighSpaceRatio, defaultHighSpaceRatio)

Expand Down
9 changes: 9 additions & 0 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,15 @@ func (o *PersistOptions) IsLocationReplacementEnabled() bool {
return o.GetScheduleConfig().EnableLocationReplacement
}

// GetSendSnapshotSize returns the configured send-snapshot size, falling
// back to the default when the configured value is not positive.
func (o *PersistOptions) GetSendSnapshotSize() int64 {
	if size := o.GetScheduleConfig().SendSnapshotSize; size > 0 {
		return size
	}
	return defaultSendSnapshotSize
}

// GetMaxMovableHotPeerSize returns the max movable hot peer size.
func (o *PersistOptions) GetMaxMovableHotPeerSize() int64 {
size := o.GetScheduleConfig().MaxMovableHotPeerSize
Expand Down
7 changes: 7 additions & 0 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ func (bc *BasicCluster) ResetStoreLimit(storeID uint64, limitType storelimit.Typ
bc.Stores.ResetStoreLimit(storeID, limitType, ratePerSec...)
}

// ResetSnapLimit resets the snapshot limit for the given store.
// capacity, when supplied, overrides the default snapshot window size.
// (Renamed from `cap`, which shadowed the Go builtin.)
func (bc *BasicCluster) ResetSnapLimit(storeID uint64, limitType storelimit.SnapType, capacity ...int64) {
	bc.Lock()
	defer bc.Unlock()
	bc.Stores.ResetSnapLimit(storeID, limitType, capacity...)
}

// UpdateStoreStatus updates the information of the store.
func (bc *BasicCluster) UpdateStoreStatus(storeID uint64, leaderCount int, regionCount int, pendingPeerCount int, leaderSize int64, regionSize int64) {
bc.Lock()
Expand Down
37 changes: 36 additions & 1 deletion server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type StoreInfo struct {
leaderWeight float64
regionWeight float64
limiter map[storelimit.Type]*storelimit.StoreLimit
snapLimiter map[storelimit.SnapType]*storelimit.SlidingWindows
minResolvedTS uint64
}

Expand All @@ -69,6 +70,7 @@ func NewStoreInfo(store *metapb.Store, opts ...StoreCreateOption) *StoreInfo {
leaderWeight: 1.0,
regionWeight: 1.0,
limiter: make(map[storelimit.Type]*storelimit.StoreLimit),
snapLimiter: make(map[storelimit.SnapType]*storelimit.SlidingWindows),
minResolvedTS: 0,
}
for _, opt := range opts {
Expand Down Expand Up @@ -100,6 +102,7 @@ func (s *StoreInfo) Clone(opts ...StoreCreateOption) *StoreInfo {
leaderWeight: s.leaderWeight,
regionWeight: s.regionWeight,
limiter: s.limiter,
snapLimiter: s.snapLimiter,
minResolvedTS: s.minResolvedTS,
}

Expand All @@ -125,6 +128,7 @@ func (s *StoreInfo) ShallowClone(opts ...StoreCreateOption) *StoreInfo {
leaderWeight: s.leaderWeight,
regionWeight: s.regionWeight,
limiter: s.limiter,
snapLimiter: s.snapLimiter,
minResolvedTS: s.minResolvedTS,
}

Expand Down Expand Up @@ -155,6 +159,18 @@ func (s *StoreInfo) IsAvailable(limitType storelimit.Type) bool {
return true
}

// IsAvailableSnap returns true if the store has free snapshot capacity for
// the given snapshot type. A store with no limiter configured for the type
// is always considered available.
func (s *StoreInfo) IsAvailableSnap(snapType storelimit.SnapType) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()

	// A lookup on a nil map safely yields the zero value, so a separate
	// nil-map check is unnecessary; one lookup replaces the original two.
	if limiter := s.snapLimiter[snapType]; limiter != nil {
		return limiter.Available(0)
	}
	return true
}

// IsTiFlash returns true if the store is tiflash.
func (s *StoreInfo) IsTiFlash() bool {
return IsStoreContainLabel(s.GetMeta(), EngineKey, EngineTiFlash)
Expand Down Expand Up @@ -301,6 +317,13 @@ func (s *StoreInfo) GetStoreLimit(limitType storelimit.Type) *storelimit.StoreLi
return s.limiter[limitType]
}

// GetSnapLimit returns the snapshot limit of the given store.
func (s *StoreInfo) GetSnapLimit(snapType storelimit.SnapType) *storelimit.SlidingWindows {
	s.mu.RLock()
	limiter := s.snapLimiter[snapType]
	s.mu.RUnlock()
	return limiter
}

const minWeight = 1e-6
const maxScore = 1024 * 1024 * 1024

Expand Down Expand Up @@ -660,10 +683,22 @@ func (s *StoresInfo) SlowStoreRecovered(storeID uint64) {
s.stores[storeID] = store.Clone(SlowStoreRecovered())
}

// defaultSnapSize is the default capacity of a store's snapshot sliding
// window; ResetSnapLimit uses it when no explicit capacity is supplied.
// NOTE(review): the unit (region size? MiB?) is not stated anywhere in
// this change — confirm and document it.
const defaultSnapSize = int64(100 * 10)

// ResetStoreLimit resets the limit for a specific store.
func (s *StoresInfo) ResetStoreLimit(storeID uint64, limitType storelimit.Type, ratePerSec ...float64) {
	store, ok := s.stores[storeID]
	if !ok {
		return
	}
	s.stores[storeID] = store.Clone(ResetStoreLimit(limitType, ratePerSec...))
}

// ResetSnapLimit resets the snapshot limit for the given store.
// capacity, when supplied, overrides the default snapshot window size.
// (Renamed from `cap`, which shadowed the Go builtin.)
func (s *StoresInfo) ResetSnapLimit(storeID uint64, snapType storelimit.SnapType, capacity ...int64) {
	if store, ok := s.stores[storeID]; ok {
		s.stores[storeID] = store.Clone(ResetSnapLimit(snapType, capacity...))
	}
}

Expand Down
16 changes: 16 additions & 0 deletions server/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,19 @@ func ResetStoreLimit(limitType storelimit.Type, ratePerSec ...float64) StoreCrea
store.limiter[limitType] = storelimit.NewStoreLimit(ratePerSec[0], storelimit.RegionInfluence[limitType])
}
}

// ResetSnapLimit returns a StoreCreateOption that resets the snapshot limit
// of the given type: it adjusts the capacity of an existing sliding window,
// or creates one on first use. When no capacity is supplied,
// defaultSnapSize is used. (The exported function previously had no doc
// comment, and the local `cap` shadowed the Go builtin.)
func ResetSnapLimit(snapType storelimit.SnapType, capacity ...int64) StoreCreateOption {
	return func(store *StoreInfo) {
		store.mu.Lock()
		defer store.mu.Unlock()
		size := defaultSnapSize
		if len(capacity) > 0 {
			size = capacity[0]
		}
		if limiter := store.snapLimiter[snapType]; limiter != nil {
			limiter.Adjust(size)
		} else {
			store.snapLimiter[snapType] = storelimit.NewSlidingWindows(size)
		}
	}
}
6 changes: 6 additions & 0 deletions server/core/storelimit/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package storelimit

// Limiter is the admission-control interface shared by store limiters
// (implemented by SlidingWindows and, for Take, by StoreLimit).
// Exported identifiers require doc comments; none were present.
type Limiter interface {
	// Available reports whether n more units can currently be admitted.
	Available(n int64) bool
	// Take tries to consume count units, reporting whether it succeeded.
	Take(count int64) bool
}
127 changes: 127 additions & 0 deletions server/core/storelimit/sliding_window.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storelimit

import (
"github.com/tikv/pd/pkg/syncutil"
)

const snapSize = 10

// SnapType identifies the direction of snapshot traffic a store limit
// applies to.
type SnapType int

const (
	// RecvSnapShot is the limit applied to snapshots a store receives.
	RecvSnapShot SnapType = iota
	// SendSnapShot is the limit applied to snapshots a store sends.
	SendSnapShot
)

// SnapTypeNameValue maps the snapshot limit name to its enum value.
var SnapTypeNameValue = map[string]SnapType{
	"recv-snapshot": RecvSnapShot,
	"send-snapshot": SendSnapShot,
}

// String returns the name of the snapshot type, or "" for an unknown value.
func (t SnapType) String() string {
	for name, value := range SnapTypeNameValue {
		if value == t {
			return name
		}
	}
	return ""
}

// SlidingWindows limits the total size of in-flight snapshot work for a
// store. A nil *SlidingWindows is treated by its methods as "no limit".
type SlidingWindows struct {
	// mu guards capacity and used.
	mu syncutil.Mutex
	// capacity is the admitted size budget of the window.
	capacity int64
	// used is the size taken via Take and not yet returned via Ack.
	used int64
}

// NewSlidingWindows creates a sliding-windows limiter with the given
// capacity and nothing in use.
func NewSlidingWindows(capacity int64) *SlidingWindows {
	// used starts at its zero value.
	return &SlidingWindows{capacity: capacity}
}

// Adjust resets the capacity of the sliding window. A nil receiver is a
// no-op.
func (s *SlidingWindows) Adjust(capacity int64) {
	if s == nil {
		return
	}
	s.mu.Lock()
	s.capacity = capacity
	s.mu.Unlock()
}

// Ack returns token to the window after an in-flight operator finishes,
// clamping the used size at zero. A nil receiver is a no-op.
func (s *SlidingWindows) Ack(token int64) {
	if s == nil {
		return
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.used <= token {
		s.used = 0
		return
	}
	s.used -= token
}

// Available reports whether the window still has at least snapSize free
// capacity. The size argument is currently ignored; a nil window is always
// available.
func (s *SlidingWindows) Available(_ int64) bool {
	if s == nil {
		return true
	}
	s.mu.Lock()
	free := s.used+snapSize <= s.capacity
	s.mu.Unlock()
	return free
}

// GetUsed returns the used size in the sliding window; zero for a nil
// window.
func (s *SlidingWindows) GetUsed() int64 {
	if s == nil {
		return 0
	}
	s.mu.Lock()
	used := s.used
	s.mu.Unlock()
	return used
}

// GetCapacity returns the capacity of the sliding window; zero for a nil
// window.
func (s *SlidingWindows) GetCapacity() int64 {
	if s == nil {
		return 0
	}
	s.mu.Lock()
	capacity := s.capacity
	s.mu.Unlock()
	return capacity
}

// Take consumes token from the window when admission succeeds, reporting
// whether it did. A nil window always admits.
//
// NOTE(review): the admission check requires only snapSize headroom, not
// token headroom, so a successful Take can push used past capacity (the
// test relies on this: Take(capacity) succeeds on an empty window). The
// original comment ("free size more than token") contradicts the code —
// confirm which behavior is intended before changing either.
func (s *SlidingWindows) Take(token int64) bool {
	if s == nil {
		return true
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.used+snapSize <= s.capacity {
		s.used += token
		return true
	}
	return false
}
20 changes: 20 additions & 0 deletions server/core/storelimit/sliding_window_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package storelimit

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestSlidingWindows exercises the Available/Take/Ack cycle of a window.
// (Renamed from Test_SlidingWindows: Go test names use MixedCaps, not
// underscores; `go test` still discovers it by the Test prefix.)
func TestSlidingWindows(t *testing.T) {
	t.Parallel()
	capacity := int64(100 * 10)
	re := assert.New(t)
	s := NewSlidingWindows(capacity)
	// A fresh window is available.
	re.True(s.Available(capacity))
	// Admission only requires snapSize headroom, so taking the full
	// capacity succeeds once and then the window is exhausted.
	re.True(s.Take(capacity))
	re.False(s.Take(capacity))
	// Acking the same size frees the window for another take.
	s.Ack(capacity)
	re.True(s.Take(capacity))
	re.Equal(capacity, s.used)
}
4 changes: 2 additions & 2 deletions server/core/storelimit/store_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,6 @@ func (l *StoreLimit) Rate() float64 {
}

// Take takes count tokens from the bucket without blocking.
func (l *StoreLimit) Take(count int64) {
l.limiter.AllowN(int(count))
func (l *StoreLimit) Take(count int64) bool {
return l.limiter.AllowN(int(count))
}
11 changes: 10 additions & 1 deletion server/schedule/filter/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,15 @@ func (f *StoreStateFilter) isBusy(opt *config.PersistOptions, store *core.StoreI
return statusOK
}

// exceedRecvSnapLimit rejects a store whose receive-snapshot limit is
// exhausted, unless temporary states are allowed.
func (f *StoreStateFilter) exceedRecvSnapLimit(_ *config.PersistOptions, store *core.StoreInfo) plan.Status {
	if f.AllowTemporaryStates || store.IsAvailableSnap(storelimit.RecvSnapShot) {
		f.Reason = ""
		return statusOK
	}
	f.Reason = "exceed-recv-snapshot-limit"
	return statusStoreSnapRemoveLimit
}

func (f *StoreStateFilter) exceedRemoveLimit(opt *config.PersistOptions, store *core.StoreInfo) plan.Status {
if !f.AllowTemporaryStates && !store.IsAvailable(storelimit.RemovePeer) {
f.Reason = "exceed-remove-limit"
Expand Down Expand Up @@ -463,7 +472,7 @@ func (f *StoreStateFilter) anyConditionMatch(typ int, opt *config.PersistOptions
f.slowStoreEvicted, f.isDisconnected, f.isBusy, f.hasRejectLeaderProperty}
case regionTarget:
funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.isDisconnected, f.isBusy,
f.exceedAddLimit, f.tooManySnapshots, f.tooManyPendingPeers}
f.exceedAddLimit, f.tooManySnapshots, f.tooManyPendingPeers, f.exceedRecvSnapLimit}
case scatterRegionTarget:
funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.isDisconnected, f.isBusy}
}
Expand Down
1 change: 1 addition & 0 deletions server/schedule/filter/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
statusStoreTooManyPendingPeer = plan.NewStatus(plan.StatusStoreThrottled, "store has too many pending peers, the related setting is 'max-pending-peer-count'")
statusStoreAddLimit = plan.NewStatus(plan.StatusStoreThrottled, "store's add limit is exhausted, please check the setting of 'store limit'")
statusStoreRemoveLimit = plan.NewStatus(plan.StatusStoreThrottled, "store's remove limit is exhausted, please check the setting of 'store limit'")
statusStoreSnapRemoveLimit = plan.NewStatus(plan.StatusStoreThrottled, "store's snapshot remove limit is exhausted'")
statusStoreLabel = plan.NewStatus(plan.StatusLabelNotMatch)
statusStoreRule = plan.NewStatus(plan.StatusRuleNotMatch)
statusStorePauseLeader = plan.NewStatus(plan.StatusStoreBlocked, "the store is not allowed to transfer leader, there might be an evict-leader-scheduler")
Expand Down
Loading