
Commit

optimize for metrics and some code
hackersean committed Dec 21, 2022
1 parent ec3410b commit 59b45b8
Showing 8 changed files with 51 additions and 51 deletions.
2 changes: 1 addition & 1 deletion kv/mpp.go
@@ -80,7 +80,7 @@ type MPPDispatchRequest struct {
type MPPClient interface {
// ConstructMPPTasks schedules task for a plan fragment.
// TODO:: This interface will be refined after we support more executors.
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, bool, time.Duration) ([]MPPTaskMeta, error)
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, time.Duration) ([]MPPTaskMeta, error)

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64) Response
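A minimal caller sketch for the new ConstructMPPTasks signature: the bool flag is gone, so only the context, the build request, and the MPP store fail TTL are passed. The import path and the 60-second TTL below are assumptions, not part of this commit.

```go
package example

import (
	"context"
	"time"

	"github.com/pingcap/tidb/kv"
)

// buildTasks is a hypothetical caller of the trimmed-down interface: context,
// build request, and fail TTL only.
func buildTasks(ctx context.Context, c kv.MPPClient, req *kv.MPPBuildTasksRequest) ([]kv.MPPTaskMeta, error) {
	ttl := 60 * time.Second // assumed TTL; the real value comes from a session variable
	return c.ConstructMPPTasks(ctx, req, ttl)
}
```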
2 changes: 1 addition & 1 deletion metrics/server.go
@@ -283,7 +283,7 @@ var (
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "tiflash_failed_mpp_store",
Name: "tiflash_failed_store",
Help: "Statues of failed tiflash mpp store,-1 means detector heartbeat,0 means reachable,1 means abnormal.",
}, []string{LblAddress})

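The renamed gauge encodes store state as -1 (detector heartbeat), 0 (reachable), and 1 (abnormal). Below is a standalone sketch with the Prometheus Go client showing that encoding; it is not the TiDB wiring, and the address label value is a placeholder.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var tiflashFailedStore = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Namespace: "tidb",
	Subsystem: "server",
	Name:      "tiflash_failed_store",
	Help:      "Status of failed TiFlash MPP store: -1 detector heartbeat, 0 reachable, 1 abnormal.",
}, []string{"address"})

func main() {
	prometheus.MustRegister(tiflashFailedStore)
	addr := "tiflash-0:3930" // hypothetical store address
	tiflashFailedStore.WithLabelValues(addr).Set(1) // mark the store abnormal
	tiflashFailedStore.WithLabelValues(addr).Set(0) // mark it reachable again
	tiflashFailedStore.DeleteLabelValues(addr)      // drop the series when the store is removed
	fmt.Println("gauge updated")
}
```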
2 changes: 1 addition & 1 deletion planner/core/fragment.go
@@ -378,7 +378,7 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic
logutil.BgLogger().Warn("MPP store fail ttl is invalid", zap.Error(err))
ttl = 30 * time.Second
}
metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, true, ttl)
metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, ttl)
if err != nil {
return nil, errors.Trace(err)
}
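The hunk above falls back to a 30-second TTL when the configured value cannot be parsed. A small sketch of that fallback, assuming the TTL arrives as a duration string (e.g. from the tidb_mpp_store_fail_ttl session variable):

```go
package example

import "time"

// parseFailTTL mirrors the fallback above: an unparsable TTL degrades to a
// 30-second default (the real code also logs a warning).
func parseFailTTL(raw string) time.Duration {
	ttl, err := time.ParseDuration(raw)
	if err != nil {
		return 30 * time.Second
	}
	return ttl
}
```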
8 changes: 4 additions & 4 deletions store/copr/batch_coprocessor.go
@@ -335,15 +335,15 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
s := stores[idx]

// check if store is failed already.
ok := GlobalMPPFailedStoreProbe.IsRecovery(ctx, s.GetAddr(), ttl)
ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl)
if !ok {
return
}

tikvClient := kvStore.GetTiKVClient()
err := detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit)
if err != nil {
GlobalMPPFailedStoreProbe.Add(ctx, s.GetAddr(), tikvClient)
ok = detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit)
if !ok {
GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient)
return
}

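In short, the balancer now asks the renamed prober whether a store has recovered, probes it with the bool-returning detectMPPStore, and hands it back to the prober on failure. A hypothetical condensation, written as if it lived inside store/copr (kvStore, detectMPPStore, and DetectTimeoutLimit are that package's own):

```go
package copr

import (
	"context"
	"time"
)

// storeIsUsable is a hypothetical condensation of the filtering step above.
func storeIsUsable(ctx context.Context, kvStore *kvStore, addr string, ttl time.Duration) bool {
	// Skip stores that failed recently and have not yet recovered within ttl.
	if !GlobalMPPFailedStoreProber.IsRecovery(ctx, addr, ttl) {
		return false
	}
	tikvClient := kvStore.GetTiKVClient()
	// detectMPPStore now reports liveness as a bool instead of returning an error.
	if !detectMPPStore(ctx, tikvClient, addr, DetectTimeoutLimit) {
		// Hand the unhealthy store to the background prober for periodic re-detection.
		GlobalMPPFailedStoreProber.Add(ctx, addr, tikvClient)
		return false
	}
	return true
}
```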
6 changes: 3 additions & 3 deletions store/copr/mpp.go
@@ -62,7 +62,7 @@ func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta {
}

// ConstructMPPTasks receives ScheduleRequest, which is actually a collection of kv ranges. We allocate MPPTaskMeta for them and return.
func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest, isMPP bool, ttl time.Duration) ([]kv.MPPTaskMeta, error) {
func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest, ttl time.Duration) ([]kv.MPPTaskMeta, error) {
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTS)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil)
var tasks []*batchCopTask
@@ -74,13 +74,13 @@ func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasks
rangesForEachPartition[i] = NewKeyRanges(p.KeyRanges)
partitionIDs[i] = p.ID
}
tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store, rangesForEachPartition, kv.TiFlash, isMPP, ttl, true, 20, partitionIDs)
tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store, rangesForEachPartition, kv.TiFlash, true, ttl, true, 20, partitionIDs)
} else {
if req.KeyRanges == nil {
return c.selectAllTiFlashStore(), nil
}
ranges := NewKeyRanges(req.KeyRanges)
tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store, ranges, kv.TiFlash, isMPP, ttl, true, 20)
tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store, ranges, kv.TiFlash, true, ttl, true, 20)
}

if err != nil {
36 changes: 18 additions & 18 deletions store/copr/mpp_probe.go
@@ -30,8 +30,8 @@ import (
"go.uber.org/zap"
)

// GlobalMPPFailedStoreProbe mpp failed store probe
var GlobalMPPFailedStoreProbe *MPPFailedStoreProbe
// GlobalMPPFailedStoreProber mpp failed store probe
var GlobalMPPFailedStoreProber *MPPFailedStoreProber

const (
// DetectPeriod detect period
@@ -41,7 +41,7 @@ const (
// MaxRecoveryTimeLimit wait for TiFlash recovery; should be more than MPPStoreFailTTL
MaxRecoveryTimeLimit = 15 * time.Minute
// MaxObsoletTimeLimit stores with no request for a long time might be obsolete
MaxObsoletTimeLimit = 24 * time.Hour
MaxObsoletTimeLimit = time.Hour
)

// MPPSotreState the state for MPPStore.
@@ -56,8 +56,8 @@ type MPPSotreState struct {
lastDetectTime time.Time
}

// MPPFailedStoreProbe use for detecting of failed TiFlash instance
type MPPFailedStoreProbe struct {
// MPPFailedStoreProber use for detecting of failed TiFlash instance
type MPPFailedStoreProber struct {
failedMPPStores *sync.Map
lock *sync.Mutex
isStop *atomic.Bool
@@ -78,8 +78,8 @@ func (t *MPPSotreState) detect(ctx context.Context, detectPeriod time.Duration,

defer func() { t.lastDetectTime = time.Now() }()
metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(0)
err := detectMPPStore(ctx, t.tikvClient, t.address, detectTimeoutLimit)
if err != nil {
ok := detectMPPStore(ctx, t.tikvClient, t.address, detectTimeoutLimit)
if !ok {
metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(1)
t.recoveryTime = time.Time{} // if detect failed,reset recovery time to zero.
return
@@ -109,7 +109,7 @@ func (t *MPPSotreState) isRecovery(ctx context.Context, recoveryTTL time.Duratio
return false
}

func (t MPPFailedStoreProbe) scan(ctx context.Context) {
func (t MPPFailedStoreProber) scan(ctx context.Context) {
defer func() {
if r := recover(); r != nil {
logutil.Logger(ctx).Warn("mpp failed store probe scan error,will restart", zap.Any("recover", r), zap.Stack("stack"))
@@ -152,7 +152,7 @@ func (t MPPFailedStoreProbe) scan(ctx context.Context) {
}

// Add add a store when sync probe failed
func (t *MPPFailedStoreProbe) Add(ctx context.Context, address string, tikvClient tikv.Client) {
func (t *MPPFailedStoreProber) Add(ctx context.Context, address string, tikvClient tikv.Client) {
state := MPPSotreState{
address: address,
tikvClient: tikvClient,
@@ -163,7 +163,7 @@ func (t *MPPFailedStoreProbe) Add(ctx context.Context, address string, tikvClien
}

// IsRecovery check whether the store is recovery
func (t *MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, recoveryTTL time.Duration) bool {
func (t *MPPFailedStoreProber) IsRecovery(ctx context.Context, address string, recoveryTTL time.Duration) bool {
logutil.Logger(ctx).Debug("check failed store recovery",
zap.String("address", address), zap.Duration("ttl", recoveryTTL))
v, ok := t.failedMPPStores.Load(address)
@@ -185,7 +185,7 @@ func (t *MPPFailedStoreProbe) IsRecovery(ctx context.Context, address string, re

// Run a loop of scan
// there can be only one background task
func (t *MPPFailedStoreProbe) Run() {
func (t *MPPFailedStoreProber) Run() {
if !t.lock.TryLock() {
return
}
@@ -211,7 +211,7 @@ }
}

// Stop stop background goroutine
func (t *MPPFailedStoreProbe) Stop() {
func (t *MPPFailedStoreProber) Stop() {
if !t.isStop.CompareAndSwap(false, true) {
return
}
@@ -221,16 +221,16 @@ }
}

// Delete clean store from failed map
func (t *MPPFailedStoreProbe) Delete(address string) {
func (t *MPPFailedStoreProber) Delete(address string) {
metrics.TiFlashFailedMPPStoreState.DeleteLabelValues(address)
_, ok := t.failedMPPStores.LoadAndDelete(address)
if !ok {
logutil.BgLogger().Warn("Store is deleted", zap.String("address", address), zap.Any("isok", ok))
logutil.BgLogger().Warn("Store is deleted", zap.String("address", address))
}
}

// MPPStore detect function
func detectMPPStore(ctx context.Context, client tikv.Client, address string, detectTimeoutLimit time.Duration) error {
func detectMPPStore(ctx context.Context, client tikv.Client, address string, detectTimeoutLimit time.Duration) bool {
resp, err := client.SendRequest(ctx, address, &tikvrpc.Request{
Type: tikvrpc.CmdMPPAlive,
StoreTp: tikvrpc.TiFlash,
@@ -244,16 +244,16 @@ func detectMPPStore(ctx context.Context, client tikv.Client, address string, det
logutil.BgLogger().Warn("Store is not ready",
zap.String("store address", address),
zap.String("err message", err.Error()))
return err
return false
}
return nil
return true
}

func init() {
ctx, cancel := context.WithCancel(context.Background())
isStop := atomic.Bool{}
isStop.Swap(true)
GlobalMPPFailedStoreProbe = &MPPFailedStoreProbe{
GlobalMPPFailedStoreProber = &MPPFailedStoreProber{
failedMPPStores: &sync.Map{},
lock: &sync.Mutex{},
isStop: &isStop,
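Pulling the renamed prober's API together, a hedged usage sketch (not part of the commit): Run starts the single background scan loop, Add registers a store that failed a probe, IsRecovery gates reuse, and Stop shuts the loop down on exit. The import paths and the 60-second recovery TTL are assumptions.

```go
package example

import (
	"context"
	"time"

	"github.com/pingcap/tidb/store/copr"
	"github.com/tikv/client-go/v2/tikv"
)

func proberLifecycle(ctx context.Context, addr string, client tikv.Client) {
	copr.GlobalMPPFailedStoreProber.Run()        // start the background scan; only one loop runs at a time
	defer copr.GlobalMPPFailedStoreProber.Stop() // stop it on shutdown, as tidb-server/main.go does below

	// Record a store that just failed a health probe.
	copr.GlobalMPPFailedStoreProber.Add(ctx, addr, client)

	// Later, before scheduling MPP tasks on addr, check whether it has recovered.
	if copr.GlobalMPPFailedStoreProber.IsRecovery(ctx, addr, 60*time.Second) {
		// addr is reachable again; it is safe to dispatch tasks to it.
		_ = addr
	}
}
```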
42 changes: 21 additions & 21 deletions store/copr/mpp_probe_test.go
@@ -62,7 +62,7 @@ type ProbeTest map[string]*mockDetectClient

func (t ProbeTest) add(ctx context.Context) {
for k, v := range t {
GlobalMPPFailedStoreProbe.Add(ctx, k, v)
GlobalMPPFailedStoreProber.Add(ctx, k, v)
}
}

@@ -78,16 +78,16 @@ func (t ProbeTest) reSetErrortestype(to string) {

func (t ProbeTest) judge(ctx context.Context, test *testing.T, recoveryTTL time.Duration, need bool) {
for k := range t {
ok := GlobalMPPFailedStoreProbe.IsRecovery(ctx, k, recoveryTTL)
ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, k, recoveryTTL)
require.Equal(test, need, ok)
}
}

func failedStoreSizeJudge(ctx context.Context, test *testing.T, need int) {
var l int
GlobalMPPFailedStoreProbe.scan(ctx)
GlobalMPPFailedStoreProber.scan(ctx)
time.Sleep(time.Second / 10)
GlobalMPPFailedStoreProbe.failedMPPStores.Range(func(k, v interface{}) bool {
GlobalMPPFailedStoreProber.failedMPPStores.Range(func(k, v interface{}) bool {
l++
return true
})
@@ -99,7 +99,7 @@ func testFlow(ctx context.Context, probetestest ProbeTest, test *testing.T, flow
for _, to := range flow {
probetestest.reSetErrortestype(to)

GlobalMPPFailedStoreProbe.scan(ctx)
GlobalMPPFailedStoreProber.scan(ctx)
time.Sleep(time.Second / 10) //wait detect goroutine finish

var need bool
@@ -112,15 +112,15 @@

lastTo := flow[len(flow)-1]
cleanRecover := func(need int) {
GlobalMPPFailedStoreProbe.maxRecoveryTimeLimit = 0 - time.Second
GlobalMPPFailedStoreProber.maxRecoveryTimeLimit = 0 - time.Second
failedStoreSizeJudge(ctx, test, need)
GlobalMPPFailedStoreProbe.maxRecoveryTimeLimit = MaxRecoveryTimeLimit
GlobalMPPFailedStoreProber.maxRecoveryTimeLimit = MaxRecoveryTimeLimit
}

cleanObsolet := func(need int) {
GlobalMPPFailedStoreProbe.maxObsoletTimeLimit = 0 - time.Second
GlobalMPPFailedStoreProber.maxObsoletTimeLimit = 0 - time.Second
failedStoreSizeJudge(ctx, test, need)
GlobalMPPFailedStoreProbe.maxObsoletTimeLimit = MaxObsoletTimeLimit
GlobalMPPFailedStoreProber.maxObsoletTimeLimit = MaxObsoletTimeLimit
}

if lastTo == Error {
Expand All @@ -137,13 +137,13 @@ func TestMPPFailedStoreProbe(t *testing.T) {

notExistAddress := "not exist address"

GlobalMPPFailedStoreProbe.detectPeriod = 0 - time.Second
GlobalMPPFailedStoreProber.detectPeriod = 0 - time.Second

// check not exist address
ok := GlobalMPPFailedStoreProbe.IsRecovery(ctx, notExistAddress, 0)
ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, notExistAddress, 0)
require.True(t, ok)

GlobalMPPFailedStoreProbe.scan(ctx)
GlobalMPPFailedStoreProber.scan(ctx)

probetestest := map[string]*mockDetectClient{
testimeout: {errortestype: testimeout},
@@ -158,20 +158,20 @@

func TestMPPFailedStoreProbeGoroutineTask(t *testing.T) {
// Confirm that multiple tasks are not allowed
GlobalMPPFailedStoreProbe.lock.Lock()
GlobalMPPFailedStoreProbe.Run()
GlobalMPPFailedStoreProbe.lock.Unlock()
GlobalMPPFailedStoreProber.lock.Lock()
GlobalMPPFailedStoreProber.Run()
GlobalMPPFailedStoreProber.lock.Unlock()

GlobalMPPFailedStoreProbe.Run()
GlobalMPPFailedStoreProbe.Stop()
GlobalMPPFailedStoreProber.Run()
GlobalMPPFailedStoreProber.Stop()
}

func TestMPPFailedStoreAssertFailed(t *testing.T) {
ctx := context.Background()

GlobalMPPFailedStoreProbe.failedMPPStores.Store("errorinfo", nil)
GlobalMPPFailedStoreProbe.scan(ctx)
GlobalMPPFailedStoreProber.failedMPPStores.Store("errorinfo", nil)
GlobalMPPFailedStoreProber.scan(ctx)

GlobalMPPFailedStoreProbe.failedMPPStores.Store("errorinfo", nil)
GlobalMPPFailedStoreProbe.IsRecovery(ctx, "errorinfo", 0)
GlobalMPPFailedStoreProber.failedMPPStores.Store("errorinfo", nil)
GlobalMPPFailedStoreProber.IsRecovery(ctx, "errorinfo", 0)
}
4 changes: 2 additions & 2 deletions tidb-server/main.go
@@ -310,7 +310,7 @@ func createStoreAndDomain() (kv.Storage, *domain.Domain) {
var err error
storage, err := kvstore.New(fullPath)
terror.MustNil(err)
copr.GlobalMPPFailedStoreProbe.Run()
copr.GlobalMPPFailedStoreProber.Run()
err = infosync.CheckTiKVVersion(storage, *semver.New(versioninfo.TiKVMinVersion))
terror.MustNil(err)
// Bootstrap a session to load information schema.
@@ -816,7 +816,7 @@ func setupTracing() {
func closeDomainAndStorage(storage kv.Storage, dom *domain.Domain) {
tikv.StoreShuttingDown(1)
dom.Close()
copr.GlobalMPPFailedStoreProbe.Stop()
copr.GlobalMPPFailedStoreProber.Stop()
err := storage.Close()
terror.Log(errors.Trace(err))
}
