Skip to content

Commit

Permalink
schedule: add more dimension for filter metrics (tikv#1746)
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Nov 7, 2019
1 parent 6571002 commit c03de9a
Show file tree
Hide file tree
Showing 17 changed files with 204 additions and 151 deletions.
161 changes: 79 additions & 82 deletions server/schedule/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (

// Filter is an interface to filter source and target store.
type Filter interface {
// Scope is used to indicate where the filter will act on.
Scope() string
Type() string
// Return true if the store should not be used as a source store.
FilterSource(opt Options, store *core.StoreInfo) bool
Expand All @@ -38,7 +40,7 @@ func FilterSource(opt Options, store *core.StoreInfo, filters []Filter) bool {
storeID := fmt.Sprintf("%d", store.GetID())
for _, filter := range filters {
if filter.FilterSource(opt, store) {
filterCounter.WithLabelValues("filter-source", storeAddress, storeID, filter.Type()).Inc()
filterCounter.WithLabelValues("filter-source", storeAddress, storeID, filter.Scope(), filter.Type()).Inc()
return true
}
}
Expand All @@ -51,26 +53,32 @@ func FilterTarget(opt Options, store *core.StoreInfo, filters []Filter) bool {
storeID := fmt.Sprintf("%d", store.GetID())
for _, filter := range filters {
if filter.FilterTarget(opt, store) {
filterCounter.WithLabelValues("filter-target", storeAddress, storeID, filter.Type()).Inc()
filterCounter.WithLabelValues("filter-target", storeAddress, storeID, filter.Scope(), filter.Type()).Inc()
return true
}
}
return false
}

type excludedFilter struct {
scope string
sources map[uint64]struct{}
targets map[uint64]struct{}
}

// NewExcludedFilter creates a Filter that filters all specified stores.
func NewExcludedFilter(sources, targets map[uint64]struct{}) Filter {
func NewExcludedFilter(scope string, sources, targets map[uint64]struct{}) Filter {
return &excludedFilter{
scope: scope,
sources: sources,
targets: targets,
}
}

func (f *excludedFilter) Scope() string {
return f.scope
}

func (f *excludedFilter) Type() string {
return "exclude-filter"
}
Expand All @@ -85,30 +93,15 @@ func (f *excludedFilter) FilterTarget(opt Options, store *core.StoreInfo) bool {
return ok
}

type blockFilter struct{}

// NewBlockFilter creates a Filter that filters all stores that are blocked from balance.
func NewBlockFilter() Filter {
return &blockFilter{}
}

func (f *blockFilter) Type() string {
return "block-filter"
}

func (f *blockFilter) FilterSource(opt Options, store *core.StoreInfo) bool {
return store.IsBlocked()
}
type overloadFilter struct{ scope string }

func (f *blockFilter) FilterTarget(opt Options, store *core.StoreInfo) bool {
return store.IsBlocked()
// NewOverloadFilter creates a Filter that filters all stores that are overloaded from balance.
func NewOverloadFilter(scope string) Filter {
return &overloadFilter{scope: scope}
}

type overloadFilter struct{}

// NewOverloadFilter creates a Filter that filters all stores that are overloaded from balance.
func NewOverloadFilter() Filter {
return &overloadFilter{}
func (f *overloadFilter) Scope() string {
return f.scope
}

func (f *overloadFilter) Type() string {
Expand All @@ -123,11 +116,15 @@ func (f *overloadFilter) FilterTarget(opt Options, store *core.StoreInfo) bool {
return store.IsOverloaded()
}

type stateFilter struct{}
type stateFilter struct{ scope string }

// NewStateFilter creates a Filter that filters all stores that are not UP.
func NewStateFilter() Filter {
return &stateFilter{}
func NewStateFilter(scope string) Filter {
return &stateFilter{scope: scope}
}

func (f *stateFilter) Scope() string {
return f.scope
}

func (f *stateFilter) Type() string {
Expand All @@ -142,11 +139,15 @@ func (f *stateFilter) FilterTarget(opt Options, store *core.StoreInfo) bool {
return !store.IsUp()
}

type healthFilter struct{}
type healthFilter struct{ scope string }

// NewHealthFilter creates a Filter that filters all stores that are Busy or Down.
func NewHealthFilter() Filter {
return &healthFilter{}
func NewHealthFilter(scope string) Filter {
return &healthFilter{scope: scope}
}

func (f *healthFilter) Scope() string {
return f.scope
}

func (f *healthFilter) Type() string {
Expand All @@ -168,31 +169,16 @@ func (f *healthFilter) FilterTarget(opt Options, store *core.StoreInfo) bool {
return f.filter(opt, store)
}

type disconnectFilter struct{}

// NewDisconnectFilter creates a Filter that filters all stores that are disconnected.
func NewDisconnectFilter() Filter {
return &disconnectFilter{}
}

func (f *disconnectFilter) Type() string {
return "disconnect-filter"
}

func (f *disconnectFilter) FilterSource(opt Options, store *core.StoreInfo) bool {
return store.IsDisconnected()
}

func (f *disconnectFilter) FilterTarget(opt Options, store *core.StoreInfo) bool {
return store.IsDisconnected()
}

type pendingPeerCountFilter struct{}
type pendingPeerCountFilter struct{ scope string }

// NewPendingPeerCountFilter creates a Filter that filters all stores that are
// currently handling too many pending peers.
func NewPendingPeerCountFilter() Filter {
return &pendingPeerCountFilter{}
func NewPendingPeerCountFilter(scope string) Filter {
return &pendingPeerCountFilter{scope: scope}
}

func (p *pendingPeerCountFilter) Scope() string {
return p.scope
}

func (p *pendingPeerCountFilter) Type() string {
Expand All @@ -214,12 +200,16 @@ func (p *pendingPeerCountFilter) FilterTarget(opt Options, store *core.StoreInfo
return p.filter(opt, store)
}

type snapshotCountFilter struct{}
type snapshotCountFilter struct{ scope string }

// NewSnapshotCountFilter creates a Filter that filters all stores that are
// currently handling too many snapshots.
func NewSnapshotCountFilter() Filter {
return &snapshotCountFilter{}
func NewSnapshotCountFilter(scope string) Filter {
return &snapshotCountFilter{scope: scope}
}

func (f *snapshotCountFilter) Scope() string {
return f.scope
}

func (f *snapshotCountFilter) Type() string {
Expand All @@ -241,12 +231,17 @@ func (f *snapshotCountFilter) FilterTarget(opt Options, store *core.StoreInfo) b
}

type cacheFilter struct {
scope string
cache *cache.TTLUint64
}

// NewCacheFilter creates a Filter that filters all stores that are in the cache.
func NewCacheFilter(cache *cache.TTLUint64) Filter {
return &cacheFilter{cache: cache}
func NewCacheFilter(scope string, cache *cache.TTLUint64) Filter {
return &cacheFilter{scope: scope, cache: cache}
}

func (f *cacheFilter) Scope() string {
return f.scope
}

func (f *cacheFilter) Type() string {
Expand All @@ -261,12 +256,16 @@ func (f *cacheFilter) FilterTarget(opt Options, store *core.StoreInfo) bool {
return false
}

type storageThresholdFilter struct{}
type storageThresholdFilter struct{ scope string }

// NewStorageThresholdFilter creates a Filter that filters all stores that are
// almost full.
func NewStorageThresholdFilter() Filter {
return &storageThresholdFilter{}
func NewStorageThresholdFilter(scope string) Filter {
return &storageThresholdFilter{scope: scope}
}

func (f *storageThresholdFilter) Scope() string {
return f.scope
}

func (f *storageThresholdFilter) Type() string {
Expand All @@ -283,14 +282,15 @@ func (f *storageThresholdFilter) FilterTarget(opt Options, store *core.StoreInfo

// distinctScoreFilter ensures that distinct score will not decrease.
type distinctScoreFilter struct {
scope string
labels []string
stores []*core.StoreInfo
safeScore float64
}

// NewDistinctScoreFilter creates a filter that filters all stores that have
// lower distinct score than specified store.
func NewDistinctScoreFilter(labels []string, stores []*core.StoreInfo, source *core.StoreInfo) Filter {
func NewDistinctScoreFilter(scope string, labels []string, stores []*core.StoreInfo, source *core.StoreInfo) Filter {
newStores := make([]*core.StoreInfo, 0, len(stores)-1)
for _, s := range stores {
if s.GetID() == source.GetID() {
Expand All @@ -300,12 +300,17 @@ func NewDistinctScoreFilter(labels []string, stores []*core.StoreInfo, source *c
}

return &distinctScoreFilter{
scope: scope,
labels: labels,
stores: newStores,
safeScore: DistinctScore(labels, newStores, source),
}
}

func (f *distinctScoreFilter) Scope() string {
return f.scope
}

func (f *distinctScoreFilter) Type() string {
return "distinct-filter"
}
Expand All @@ -319,19 +324,25 @@ func (f *distinctScoreFilter) FilterTarget(opt Options, store *core.StoreInfo) b
}

type namespaceFilter struct {
scope string
classifier namespace.Classifier
namespace string
}

// NewNamespaceFilter creates a Filter that filters all stores that are not
// belong to a namespace.
func NewNamespaceFilter(classifier namespace.Classifier, namespace string) Filter {
func NewNamespaceFilter(scope string, classifier namespace.Classifier, namespace string) Filter {
return &namespaceFilter{
scope: scope,
classifier: classifier,
namespace: namespace,
}
}

func (f *namespaceFilter) Scope() string {
return f.scope
}

func (f *namespaceFilter) Type() string {
return "namespace-filter"
}
Expand All @@ -348,35 +359,21 @@ func (f *namespaceFilter) FilterTarget(opt Options, store *core.StoreInfo) bool
return f.filter(store)
}

type rejectLeaderFilter struct{}

// NewRejectLeaderFilter creates a Filter that filters stores that marked as
// rejectLeader from being the target of leader transfer.
func NewRejectLeaderFilter() Filter {
return rejectLeaderFilter{}
}

func (f rejectLeaderFilter) Type() string {
return "reject-leader-filter"
}

func (f rejectLeaderFilter) FilterSource(opt Options, store *core.StoreInfo) bool {
return false
}

func (f rejectLeaderFilter) FilterTarget(opt Options, store *core.StoreInfo) bool {
return opt.CheckLabelProperty(RejectLeader, store.GetLabels())
}

// StoreStateFilter is used to determine whether a store can be selected as the
// source or target of the schedule based on the store's state.
type StoreStateFilter struct {
ActionScope string
// Set true if the schedule involves any transfer leader operation.
TransferLeader bool
// Set true if the schedule involves any move region operation.
MoveRegion bool
}

// Scope returns the scheduler or the checker which the filter acts on.
func (f StoreStateFilter) Scope() string {
return f.ActionScope
}

// Type returns the type of the Filter.
func (f StoreStateFilter) Type() string {
return "store-state-filter"
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var _ = Suite(&testFiltersSuite{})
type testFiltersSuite struct{}

func (s *testReplicationSuite) TestPendingPeerFilter(c *C) {
filter := NewPendingPeerCountFilter()
filter := NewPendingPeerCountFilter("")
opt := mockoption.NewScheduleOptions()
tc := mockcluster.NewCluster(opt)
store := core.NewStoreInfo(&metapb.Store{Id: 1})
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var (
Subsystem: "schedule",
Name: "filter",
Help: "Counter of the filter",
}, []string{"action", "address", "store", "type"})
}, []string{"action", "address", "store", "scope", "type"})

operatorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down
10 changes: 7 additions & 3 deletions server/schedule/namespace_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"go.uber.org/zap"
)

const namespaceCheckerName = "namespace-checker"

// NamespaceChecker ensures region to go to the right place.
type NamespaceChecker struct {
name string
cluster Cluster
filters []Filter
classifier namespace.Classifier
Expand All @@ -31,10 +34,11 @@ type NamespaceChecker struct {
// NewNamespaceChecker creates a namespace checker.
func NewNamespaceChecker(cluster Cluster, classifier namespace.Classifier) *NamespaceChecker {
filters := []Filter{
StoreStateFilter{MoveRegion: true},
StoreStateFilter{ActionScope: namespaceCheckerName, MoveRegion: true},
}

return &NamespaceChecker{
name: namespaceCheckerName,
cluster: cluster,
filters: filters,
classifier: classifier,
Expand Down Expand Up @@ -102,7 +106,7 @@ func (n *NamespaceChecker) SelectBestPeerToRelocate(region *core.RegionInfo, tar
// SelectBestStoreToRelocate randomly returns the store to relocate
func (n *NamespaceChecker) SelectBestStoreToRelocate(region *core.RegionInfo, targets []*core.StoreInfo) uint64 {
selector := NewRandomSelector(n.filters)
target := selector.SelectTarget(n.cluster, targets, NewExcludedFilter(nil, region.GetStoreIds()))
target := selector.SelectTarget(n.cluster, targets, NewExcludedFilter(n.name, nil, region.GetStoreIds()))
if target == nil {
return 0
}
Expand All @@ -120,7 +124,7 @@ func (n *NamespaceChecker) isExists(stores []*core.StoreInfo, storeID uint64) bo

func (n *NamespaceChecker) getNamespaceStores(region *core.RegionInfo) []*core.StoreInfo {
ns := n.classifier.GetRegionNamespace(region)
filteredStores := n.filter(n.cluster.GetStores(), NewNamespaceFilter(n.classifier, ns))
filteredStores := n.filter(n.cluster.GetStores(), NewNamespaceFilter(n.name, n.classifier, ns))

return filteredStores
}
Expand Down
Loading

0 comments on commit c03de9a

Please sign in to comment.