Skip to content

Commit

Permalink
*: optimize mpp probe (#39932)
Browse files Browse the repository at this point in the history
close #39686
  • Loading branch information
hackersean authored Dec 22, 2022
1 parent 0f4bd73 commit aeccf77
Show file tree
Hide file tree
Showing 17 changed files with 496 additions and 93 deletions.
3 changes: 1 addition & 2 deletions kv/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package kv

import (
"context"
"sync"
"time"

"github.com/pingcap/kvproto/pkg/mpp"
Expand Down Expand Up @@ -81,7 +80,7 @@ type MPPDispatchRequest struct {
type MPPClient interface {
// ConstructMPPTasks schedules task for a plan fragment.
// TODO:: This interface will be refined after we support more executors.
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, *sync.Map, time.Duration) ([]MPPTaskMeta, error)
ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, time.Duration) ([]MPPTaskMeta, error)

// DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data.
DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64) Response
Expand Down
2 changes: 2 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func RegisterMetrics() {
prometheus.MustRegister(TokenGauge)
prometheus.MustRegister(ConfigStatus)
prometheus.MustRegister(TiFlashQueryTotalCounter)
prometheus.MustRegister(TiFlashFailedMPPStoreState)
prometheus.MustRegister(SmallTxnWriteDuration)
prometheus.MustRegister(TxnWriteThroughput)
prometheus.MustRegister(LoadSysVarCacheCounter)
Expand Down Expand Up @@ -238,6 +239,7 @@ func ToggleSimplifiedMode(simplified bool) {
InfoCacheCounters,
ReadFromTableCacheCounter,
TiFlashQueryTotalCounter,
TiFlashFailedMPPStoreState,
CampaignOwnerCounter,
NonTransactionalDMLCount,
MemoryUsage,
Expand Down
8 changes: 8 additions & 0 deletions metrics/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,14 @@ var (
Help: "Counter of TiFlash queries.",
}, []string{LblType, LblResult})

TiFlashFailedMPPStoreState = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "server",
Name: "tiflash_failed_store",
Help: "Statues of failed tiflash mpp store,-1 means detector heartbeat,0 means reachable,1 means abnormal.",
}, []string{LblAddress})

PDAPIExecutionHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Expand Down
2 changes: 1 addition & 1 deletion planner/core/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic
logutil.BgLogger().Warn("MPP store fail ttl is invalid", zap.Error(err))
ttl = 30 * time.Second
}
metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, e.ctx.GetSessionVars().MPPStoreLastFailTime, ttl)
metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, ttl)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
3 changes: 0 additions & 3 deletions sessionctx/sessionstates/session_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package sessionstates

import (
"time"

"github.com/pingcap/tidb/errno"
ptypes "github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/sessionctx/stmtctx"
Expand Down Expand Up @@ -79,7 +77,6 @@ type SessionStates struct {
FoundInPlanCache bool `json:"in-plan-cache,omitempty"`
FoundInBinding bool `json:"in-binding,omitempty"`
SequenceLatestValues map[int64]int64 `json:"seq-values,omitempty"`
MPPStoreLastFailTime map[string]time.Time `json:"store-fail-time,omitempty"`
LastAffectedRows int64 `json:"affected-rows,omitempty"`
LastInsertID uint64 `json:"last-insert-id,omitempty"`
Warnings []stmtctx.SQLWarn `json:"warnings,omitempty"`
Expand Down
19 changes: 0 additions & 19 deletions sessionctx/sessionstates/session_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
Expand Down Expand Up @@ -378,23 +376,6 @@ func TestSessionCtx(t *testing.T) {
tk.MustQuery("select nextval(test.s)").Check(testkit.Rows("2"))
},
},
{
// check MPPStoreLastFailTime
setFunc: func(tk *testkit.TestKit) any {
m := sync.Map{}
m.Store("store1", time.Now())
tk.Session().GetSessionVars().MPPStoreLastFailTime = &m
return tk.Session().GetSessionVars().MPPStoreLastFailTime
},
checkFunc: func(tk *testkit.TestKit, param any) {
failTime := tk.Session().GetSessionVars().MPPStoreLastFailTime
tm, ok := failTime.Load("store1")
require.True(t, ok)
v, ok := (param.(*sync.Map)).Load("store1")
require.True(t, ok)
require.True(t, tm.(time.Time).Equal(v.(time.Time)))
},
},
{
// check FoundInPlanCache
setFunc: func(tk *testkit.TestKit) any {
Expand Down
13 changes: 0 additions & 13 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1171,9 +1171,6 @@ type SessionVars struct {
// TemporaryTableData stores committed kv values for temporary table for current session.
TemporaryTableData TemporaryTableData

// MPPStoreLastFailTime records the lastest fail time that a TiFlash store failed. It maps store address(string) to fail time(time.Time).
MPPStoreLastFailTime *sync.Map

// MPPStoreFailTTL indicates the duration that protect TiDB from sending task to a new recovered TiFlash.
MPPStoreFailTTL string

Expand Down Expand Up @@ -1713,7 +1710,6 @@ func NewSessionVars(hctx HookContext) *SessionVars {
AllowFallbackToTiKV: make(map[kv.StoreType]struct{}),
CTEMaxRecursionDepth: DefCTEMaxRecursionDepth,
TMPTableSize: DefTiDBTmpTableMaxSize,
MPPStoreLastFailTime: new(sync.Map),
MPPStoreFailTTL: DefTiDBMPPStoreFailTTL,
Rng: mathutil.NewWithTime(),
StatsLoadSyncWait: StatsLoadSyncWait.Load(),
Expand Down Expand Up @@ -2336,12 +2332,6 @@ func (s *SessionVars) EncodeSessionStates(ctx context.Context, sessionStates *se
}
sessionStates.LastFoundRows = s.LastFoundRows
sessionStates.SequenceLatestValues = s.SequenceState.GetAllStates()
sessionStates.MPPStoreLastFailTime = make(map[string]time.Time, 0)
s.MPPStoreLastFailTime.Range(
func(key, value interface{}) bool {
sessionStates.MPPStoreLastFailTime[key.(string)] = value.(time.Time)
return true
})
sessionStates.FoundInPlanCache = s.PrevFoundInPlanCache
sessionStates.FoundInBinding = s.PrevFoundInBinding

Expand Down Expand Up @@ -2377,9 +2367,6 @@ func (s *SessionVars) DecodeSessionStates(ctx context.Context, sessionStates *se
}
s.LastFoundRows = sessionStates.LastFoundRows
s.SequenceState.SetAllStates(sessionStates.SequenceLatestValues)
for k, v := range sessionStates.MPPStoreLastFailTime {
s.MPPStoreLastFailTime.Store(k, v)
}
s.FoundInPlanCache = sessionStates.FoundInPlanCache
s.FoundInBinding = sessionStates.FoundInBinding

Expand Down
5 changes: 5 additions & 0 deletions store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"coprocessor_cache.go",
"key_ranges.go",
"mpp.go",
"mpp_probe.go",
"region_cache.go",
"store.go",
],
Expand Down Expand Up @@ -66,6 +67,7 @@ go_test(
"coprocessor_test.go",
"key_ranges_test.go",
"main_test.go",
"mpp_probe_test.go",
],
embed = [":copr"],
flaky = True,
Expand All @@ -75,12 +77,15 @@ go_test(
"//store/driver/backoff",
"//testkit/testsetup",
"//util/paging",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/mpp",
"@com_github_stathat_consistent//:consistent",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@org_uber_go_goleak//:goleak",
],
)
71 changes: 22 additions & 49 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/log"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -295,12 +294,11 @@ func balanceBatchCopTaskWithContinuity(storeTaskMap map[uint64]*batchCopTask, ca
//
// The second balance strategy: Not only consider the region count between TiFlash stores, but also try to make the regions' range continuous(stored in TiFlash closely).
// If balanceWithContinuity is true, the second balance strategy is enable.
func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, mppStoreLastFailTime *sync.Map, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask {
if len(originalTasks) == 0 {
log.Info("Batch cop task balancer got an empty task set.")
return originalTasks
}
isMPP := mppStoreLastFailTime != nil
// for mpp, we still need to detect the store availability
if len(originalTasks) <= 1 && !isMPP {
return originalTasks
Expand Down Expand Up @@ -331,45 +329,21 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []
var wg sync.WaitGroup
var mu sync.Mutex
wg.Add(len(stores))
cur := time.Now()
for i := range stores {
go func(idx int) {
defer wg.Done()
s := stores[idx]

var lastAny any
var ok bool
mu.Lock()
if lastAny, ok = mppStoreLastFailTime.Load(s.GetAddr()); ok && cur.Sub(lastAny.(time.Time)) < 100*time.Millisecond {
// The interval time is so short that may happen in a same query, so we needn't to check again.
mu.Unlock()
return
} else if !ok {
lastAny = time.Time{}
}
mu.Unlock()

resp, err := kvStore.GetTiKVClient().SendRequest(ctx, s.GetAddr(), &tikvrpc.Request{
Type: tikvrpc.CmdMPPAlive,
StoreTp: tikvrpc.TiFlash,
Req: &mpp.IsAliveRequest{},
Context: kvrpcpb.Context{},
}, 2*time.Second)

if err != nil || !resp.Resp.(*mpp.IsAliveResponse).Available {
errMsg := "store not ready to serve"
if err != nil {
errMsg = err.Error()
}
logutil.BgLogger().Warn("Store is not ready", zap.String("store address", s.GetAddr()), zap.String("err message", errMsg))
mu.Lock()
mppStoreLastFailTime.Store(s.GetAddr(), time.Now())
mu.Unlock()
// check if store is failed already.
ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl)
if !ok {
return
}

if cur.Sub(lastAny.(time.Time)) < ttl {
logutil.BgLogger().Warn("Cannot detect store's availability because the current time has not reached MPPStoreLastFailTime + MPPStoreFailTTL", zap.String("store address", s.GetAddr()), zap.Time("last fail time", lastAny.(time.Time)))
tikvClient := kvStore.GetTiKVClient()
ok = detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit)
if !ok {
GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient)
return
}

Expand Down Expand Up @@ -534,29 +508,29 @@ func buildBatchCopTasksForNonPartitionedTable(bo *backoff.Backoffer,
store *kvStore,
ranges *KeyRanges,
storeType kv.StoreType,
mppStoreLastFailTime *sync.Map,
isMPP bool,
ttl time.Duration,
balanceWithContinuity bool,
balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
if config.GetGlobalConfig().DisaggregatedTiFlash {
return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
}
return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
}

func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer,
store *kvStore,
rangesForEachPhysicalTable []*KeyRanges,
storeType kv.StoreType,
mppStoreLastFailTime *sync.Map,
isMPP bool,
ttl time.Duration,
balanceWithContinuity bool,
balanceContinuousRegionCount int64,
partitionIDs []int64) (batchTasks []*batchCopTask, err error) {
if config.GetGlobalConfig().DisaggregatedTiFlash {
batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
} else {
batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
}
if err != nil {
return nil, err
Expand All @@ -566,8 +540,8 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer,
return batchTasks, nil
}

func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, mppStoreLastFailTime *sync.Map, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
batchTasks, err := buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
batchTasks, err := buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -614,7 +588,7 @@ func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, ran
// When `partitionIDs != nil`, it means that buildBatchCopTasksCore is constructing a batch cop tasks for PartitionTableScan.
// At this time, `len(rangesForEachPhysicalTable) == len(partitionIDs)` and `rangesForEachPhysicalTable[i]` is for partition `partitionIDs[i]`.
// Otherwise, `rangesForEachPhysicalTable[0]` indicates the range for the single physical table.
func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, mppStoreLastFailTime *sync.Map, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) {
cache := store.GetRegionCache()
start := time.Now()
const cmdType = tikvrpc.CmdBatchCop
Expand Down Expand Up @@ -644,7 +618,6 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach

storeTaskMap := make(map[string]*batchCopTask)
needRetry := false
isMPP := mppStoreLastFailTime != nil
for _, task := range tasks {
rpcCtx, err := cache.GetTiFlashRPCContext(bo.TiKVBackoffer(), task.region, isMPP)
if err != nil {
Expand Down Expand Up @@ -696,7 +669,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach
logutil.BgLogger().Debug(msg)
}
balanceStart := time.Now()
batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount)
batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount)
balanceElapsed := time.Since(balanceStart)
if log.GetLevel() <= zap.DebugLevel {
msg := "After region balance:"
Expand Down Expand Up @@ -762,11 +735,11 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V
keyRanges = append(keyRanges, NewKeyRanges(pi.KeyRanges))
partitionIDs = append(partitionIDs, pi.ID)
}
tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store.kvStore, keyRanges, req.StoreType, nil, 0, false, 0, partitionIDs)
tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store.kvStore, keyRanges, req.StoreType, false, 0, false, 0, partitionIDs)
} else {
// TODO: merge the if branch.
ranges := NewKeyRanges(req.KeyRanges.FirstPartitionRange())
tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store.kvStore, ranges, req.StoreType, nil, 0, false, 0)
tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store.kvStore, ranges, req.StoreType, false, 0, false, 0)
}

if err != nil {
Expand Down Expand Up @@ -913,7 +886,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Ba
ranges = append(ranges, *ran)
})
}
ret, err := buildBatchCopTasksForNonPartitionedTable(bo, b.store, NewKeyRanges(ranges), b.req.StoreType, nil, 0, false, 0)
ret, err := buildBatchCopTasksForNonPartitionedTable(bo, b.store, NewKeyRanges(ranges), b.req.StoreType, false, 0, false, 0)
return ret, err
}
// Retry Partition Table Scan
Expand All @@ -932,7 +905,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Ba
}
keyRanges = append(keyRanges, NewKeyRanges(ranges))
}
ret, err := buildBatchCopTasksForPartitionedTable(bo, b.store, keyRanges, b.req.StoreType, nil, 0, false, 0, pid)
ret, err := buildBatchCopTasksForPartitionedTable(bo, b.store, keyRanges, b.req.StoreType, false, 0, false, 0, pid)
return ret, err
}

Expand Down
4 changes: 2 additions & 2 deletions store/copr/batch_coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ func TestBalanceBatchCopTaskWithContinuity(t *testing.T) {
func TestBalanceBatchCopTaskWithEmptyTaskSet(t *testing.T) {
{
var nilTaskSet []*batchCopTask
nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, nil, time.Second, false, 0)
nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, false, time.Second, false, 0)
require.True(t, nilResult == nil)
}

{
emptyTaskSet := make([]*batchCopTask, 0)
emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, nil, time.Second, false, 0)
emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, false, time.Second, false, 0)
require.True(t, emptyResult != nil)
require.True(t, len(emptyResult) == 0)
}
Expand Down
Loading

0 comments on commit aeccf77

Please sign in to comment.