Skip to content

Commit

Permalink
Merge branch 'feature/ring' of github.com:bufferflies/pd into feature…
Browse files Browse the repository at this point in the history
…/ring
  • Loading branch information
bufferflies committed May 16, 2022
2 parents 8aea8e5 + bbaa78e commit ef25fc9
Show file tree
Hide file tree
Showing 22 changed files with 169 additions and 31 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
steps:
- uses: actions/setup-go@v2
with:
go-version: 1.17
go-version: 1.18
- name: Checkout code
uses: actions/checkout@v2
- name: Restore cache
Expand All @@ -18,7 +18,6 @@ jobs:
path: |
~/go/pkg/mod
~/.cache/go-build
**/.tools
**/.dashboard_download_cache
key: ${{ runner.os }}-golang-${{ hashFiles('**/go.sum') }}
restore-keys: |
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ SHELL := env PATH='$(PATH)' GOBIN='$(GO_TOOLS_BIN_PATH)' $(shell which bash)

install-tools:
@mkdir -p $(GO_TOOLS_BIN_PATH)
@which golangci-lint >/dev/null 2>&1 || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GO_TOOLS_BIN_PATH) v1.43.0
@which golangci-lint >/dev/null 2>&1 || curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GO_TOOLS_BIN_PATH) v1.46.0
@grep '_' tools.go | sed 's/"//g' | awk '{print $$2}' | xargs go install

.PHONY: install-tools
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ require (
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/goleak v1.1.12
go.uber.org/zap v1.19.1
golang.org/x/text v0.3.3
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65
golang.org/x/tools v0.1.5
google.golang.org/grpc v1.26.0
Expand Down
89 changes: 89 additions & 0 deletions metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -3277,6 +3277,95 @@
"align": false,
"alignLevel": null
}
},{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "99% operator region size",
"fill": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 42
},
"id": 1455,
"legend": {
"alignAsTable": true,
"avg": true,
"current": true,
"hideEmpty": true,
"hideZero": true,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 2,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "histogram_quantile(0.99, sum(rate(pd_schedule_operator_region_size_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\"}[5m])) by (type,le))",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{type,le}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "99% operator region size",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "decmbytes",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"repeat": null,
Expand Down
2 changes: 1 addition & 1 deletion pkg/autoscaling/calculation.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func filterTiKVInstances(informer core.StoreSetInformer) []instance {
var instances []instance
stores := informer.GetStores()
for _, store := range stores {
if store.IsPreparing() || store.IsServing() {
if store.IsUp() {
instances = append(instances, instance{id: store.GetID(), address: store.GetAddress()})
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/autoscaling/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (prom *PrometheusQuerier) queryMetricsFromPrometheus(query string, timestam
return nil, errs.ErrPrometheusQuery.Wrap(err).FastGenWithCause()
}

if warnings != nil && len(warnings) > 0 {
if len(warnings) > 0 {
log.Warn("prometheus query returns with warnings", zap.Strings("warnings", warnings))
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ func (mc *Cluster) newMockRegionInfo(regionID uint64, leaderStoreID uint64, othe
var followerStoreIDs []uint64
var learnerStoreIDs []uint64
for _, storeID := range otherPeerStoreIDs {
if store := mc.GetStore(storeID); store != nil && core.IsStoreContainLabel(store.GetMeta(), core.EngineKey, core.EngineTiFlash) {
if store := mc.GetStore(storeID); store != nil && store.IsTiFlash() {
learnerStoreIDs = append(learnerStoreIDs, storeID)
} else {
followerStoreIDs = append(followerStoreIDs, storeID)
Expand Down
3 changes: 1 addition & 2 deletions server/api/hot_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"strconv"

"github.com/tikv/pd/server"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/statistics"
"github.com/tikv/pd/server/storage"
"github.com/unrolled/render"
Expand Down Expand Up @@ -154,7 +153,7 @@ func (h *hotStatusHandler) GetHotStores(w http.ResponseWriter, r *http.Request)
for _, store := range stores {
id := store.GetID()
if loads, ok := storesLoads[id]; ok {
if core.IsStoreContainLabel(store.GetMeta(), core.EngineKey, core.EngineTiFlash) {
if store.IsTiFlash() {
stats.BytesWriteStats[id] = loads[statistics.StoreRegionsWriteBytes]
stats.KeysWriteStats[id] = loads[statistics.StoreRegionsWriteKeys]
} else {
Expand Down
8 changes: 4 additions & 4 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1233,7 +1233,7 @@ func (c *RaftCluster) getEvictLeaderStores() (evictStores []uint64) {
func (c *RaftCluster) getUpStores() []uint64 {
upStores := make([]uint64, 0)
for _, store := range c.GetStores() {
if store.IsPreparing() || store.IsServing() {
if store.IsUp() {
upStores = append(upStores, store.GetID())
}
}
Expand All @@ -1256,7 +1256,7 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error {
return nil
}

if store.IsPreparing() || store.IsServing() {
if store.IsUp() {
if !forceBury {
return errs.ErrStoreIsUp.FastGenByArgs()
} else if !store.IsDisconnected() {
Expand Down Expand Up @@ -1324,7 +1324,7 @@ func (c *RaftCluster) UpStore(storeID uint64) error {
return errs.ErrStoreDestroyed.FastGenByArgs(storeID)
}

if store.IsPreparing() || store.IsServing() {
if store.IsUp() {
return nil
}

Expand Down Expand Up @@ -1446,7 +1446,7 @@ func (c *RaftCluster) checkStores() {
}
}

if store.IsPreparing() || store.IsServing() {
if store.IsUp() {
if !store.IsLowSpace(c.opt.GetLowSpaceRatio()) {
upStoreCount++
}
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (s *testClusterInfoSuite) TestSetOfflineStore(c *C) {
// test bury store
for storeID := uint64(0); storeID <= 4; storeID++ {
store := cluster.GetStore(storeID)
if store == nil || store.IsPreparing() || store.IsServing() {
if store == nil || store.IsUp() {
c.Assert(cluster.BuryStore(storeID, false), NotNil)
} else {
c.Assert(cluster.BuryStore(storeID, false), IsNil)
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/unsafe_recovery_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (u *unsafeRecoveryController) RemoveFailedStores(failedStores map[uint64]st
}
u.failedStores = failedStores
for _, s := range u.cluster.GetStores() {
if s.IsRemoved() || s.IsPhysicallyDestroyed() || core.IsStoreContainLabel(s.GetMeta(), core.EngineKey, core.EngineTiFlash) {
if s.IsRemoved() || s.IsPhysicallyDestroyed() || s.IsTiFlash() {
continue
}
if _, exists := failedStores[s.GetID()]; exists {
Expand Down
12 changes: 11 additions & 1 deletion server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ func (s *StoreInfo) IsAvailable(limitType storelimit.Type) bool {
return true
}

// IsTiFlash returns true if the store is tiflash.
func (s *StoreInfo) IsTiFlash() bool {
return IsStoreContainLabel(s.GetMeta(), EngineKey, EngineTiFlash)
}

// IsUp returns true if store is serving or preparing.
func (s *StoreInfo) IsUp() bool {
return s.IsServing() || s.IsPreparing()
}

// IsPreparing checks if the store's state is preparing.
func (s *StoreInfo) IsPreparing() bool {
return s.GetNodeState() == metapb.NodeState_Preparing
Expand Down Expand Up @@ -742,5 +752,5 @@ func IsStoreContainLabel(store *metapb.Store, key, value string) bool {
func IsAvailableForMinResolvedTS(s *StoreInfo) bool {
// If a store is tombstone or no leader, it is not meaningful for min resolved ts.
// And we will skip tiflash, because it does not report min resolved ts.
return !s.IsRemoved() && !IsStoreContainLabel(s.GetMeta(), EngineKey, EngineTiFlash) && s.GetLeaderCount() != 0
return !s.IsRemoved() && !s.IsTiFlash() && s.GetLeaderCount() != 0
}
2 changes: 2 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/tikv/pd/pkg/tsoutil"
"github.com/tikv/pd/server/cluster"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/storage/endpoint"
"github.com/tikv/pd/server/storage/kv"
"github.com/tikv/pd/server/tso"
Expand Down Expand Up @@ -1646,6 +1647,7 @@ func scatterRegions(cluster *cluster.RaftCluster, regionsID []uint64, group stri
return 0, err
}
for _, op := range ops {
op.AttachKind(operator.OpAdmin)
if ok := cluster.GetOperatorController().AddOperator(op); !ok {
failures[op.RegionID()] = fmt.Errorf("region %v failed to add operator", op.RegionID())
}
Expand Down
2 changes: 1 addition & 1 deletion server/schedule/checker/replica_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Ope
log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
return nil
}
if store.IsPreparing() || store.IsServing() {
if store.IsUp() {
continue
}

Expand Down
39 changes: 28 additions & 11 deletions server/schedule/labeler/labeler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/storage"
"github.com/tikv/pd/server/storage/endpoint"
Expand Down Expand Up @@ -273,7 +274,7 @@ func (s *testLabelerSuite) TestLabelerRuleTTL(c *C) {
{
ID: "rule2",
Labels: []RegionLabel{
{Key: "k2", Value: "v2", TTL: "5ms"}, // would expired first.},
{Key: "k2", Value: "v2", TTL: "1s"}, // would expire first.
},
RuleType: "key-range",

Expand All @@ -297,22 +298,38 @@ func (s *testLabelerSuite) TestLabelerRuleTTL(c *C) {
err := s.labeler.SetLabelRule(r)
c.Assert(err, IsNil)
}
// get rule with "rule2".
c.Assert(s.labeler.GetLabelRule("rule2"), NotNil)
c.Assert(failpoint.Enable("github.com/tikv/pd/server/schedule/labeler/regionLabelExpireSub1Minute", "return(true)"), IsNil)

// get rule with "rule2" and wait until it expired.
for s.labeler.GetLabelRule("rule2") != nil {
labels := s.labeler.GetRegionLabels(region)
if len(labels) == 2 {
break
}
time.Sleep(time.Millisecond * 5)
}
c.Assert(s.labeler.GetRegionLabel(region, "k2"), Equals, "")
// rule2 should be timeout first.
// rule2 should expire and only 2 labels left.
labels := s.labeler.GetRegionLabels(region)
c.Assert(labels, HasLen, 2)

c.Assert(failpoint.Disable("github.com/tikv/pd/server/schedule/labeler/regionLabelExpireSub1Minute"), IsNil)
// rule2 should be exist since `GetRegionLabels` won't clear it physically.
s.checkRuleInMemoryAndStoage(c, "rule2", true)
c.Assert(s.labeler.GetLabelRule("rule2"), IsNil)
// rule2 should be physically clear.
s.checkRuleInMemoryAndStoage(c, "rule2", false)

c.Assert(s.labeler.GetRegionLabel(region, "k2"), Equals, "")

c.Assert(s.labeler.GetLabelRule("rule3"), NotNil)
c.Assert(s.labeler.GetLabelRule("rule1"), NotNil)
}

func (s *testLabelerSuite) checkRuleInMemoryAndStoage(c *C, ruleID string, exist bool) {
c.Assert(s.labeler.labelRules[ruleID] != nil, Equals, exist)
existInStorage := false
s.labeler.storage.LoadRegionRules(func(k, v string) {
if k == ruleID {
existInStorage = true
}
})
c.Assert(existInStorage, Equals, exist)
}

func (s *testLabelerSuite) TestGC(c *C) {
// set gcInterval to 1 hour.
store := storage.NewStorageWithMemoryBackend()
Expand Down
6 changes: 6 additions & 0 deletions server/schedule/labeler/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"reflect"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"go.uber.org/zap"
Expand Down Expand Up @@ -74,6 +75,11 @@ type LabelRulePatch struct {
}

func (l *RegionLabel) expireBefore(t time.Time) bool {
failpoint.Inject("regionLabelExpireSub1Minute", func() {
if l.expire != nil {
*l.expire = l.expire.Add(-time.Minute)
}
})
if l.expire == nil {
return false
}
Expand Down
10 changes: 10 additions & 0 deletions server/schedule/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ var (
Buckets: []float64{0.5, 1, 2, 4, 8, 16, 20, 40, 60, 90, 120, 180, 240, 300, 480, 600, 720, 900, 1200, 1800, 3600},
}, []string{"type"})

operatorSizeHist = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "operator_region_size",
Help: "Bucketed histogram of the operator region size.",
Buckets: prometheus.ExponentialBuckets(1, 2, 20), // 1MB~1TB
}, []string{"type"})

operatorWaitCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Expand Down Expand Up @@ -84,4 +93,5 @@ func init() {
prometheus.MustRegister(operatorWaitCounter)
prometheus.MustRegister(scatterCounter)
prometheus.MustRegister(scatterDistributionCounter)
prometheus.MustRegister(operatorSizeHist)
}
1 change: 1 addition & 0 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool {
}
oc.operators[regionID] = op
operatorCounter.WithLabelValues(op.Desc(), "start").Inc()
operatorSizeHist.WithLabelValues(op.Desc()).Observe(float64(op.ApproximateSize))
operatorWaitDuration.WithLabelValues(op.Desc()).Observe(op.ElapsedTime().Seconds())
opInfluence := NewTotalOpInfluence([]*operator.Operator{op}, oc.cluster)
for storeID := range opInfluence.StoresInfluence {
Expand Down
Loading

0 comments on commit ef25fc9

Please sign in to comment.