diff --git a/kv/mpp.go b/kv/mpp.go index b0752f8186deb..2e398af595650 100644 --- a/kv/mpp.go +++ b/kv/mpp.go @@ -16,7 +16,6 @@ package kv import ( "context" - "sync" "time" "github.com/pingcap/kvproto/pkg/mpp" @@ -81,7 +80,7 @@ type MPPDispatchRequest struct { type MPPClient interface { // ConstructMPPTasks schedules task for a plan fragment. // TODO:: This interface will be refined after we support more executors. - ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, *sync.Map, time.Duration) ([]MPPTaskMeta, error) + ConstructMPPTasks(context.Context, *MPPBuildTasksRequest, time.Duration) ([]MPPTaskMeta, error) // DispatchMPPTasks dispatches ALL mpp requests at once, and returns an iterator that transfers the data. DispatchMPPTasks(ctx context.Context, vars interface{}, reqs []*MPPDispatchRequest, needTriggerFallback bool, startTs uint64) Response diff --git a/metrics/metrics.go b/metrics/metrics.go index 0cca442e5eb83..f609caaa124cd 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -182,6 +182,7 @@ func RegisterMetrics() { prometheus.MustRegister(TokenGauge) prometheus.MustRegister(ConfigStatus) prometheus.MustRegister(TiFlashQueryTotalCounter) + prometheus.MustRegister(TiFlashFailedMPPStoreState) prometheus.MustRegister(SmallTxnWriteDuration) prometheus.MustRegister(TxnWriteThroughput) prometheus.MustRegister(LoadSysVarCacheCounter) @@ -238,6 +239,7 @@ func ToggleSimplifiedMode(simplified bool) { InfoCacheCounters, ReadFromTableCacheCounter, TiFlashQueryTotalCounter, + TiFlashFailedMPPStoreState, CampaignOwnerCounter, NonTransactionalDMLCount, MemoryUsage, diff --git a/metrics/server.go b/metrics/server.go index 116b02eb122b6..6bd34c4a8b903 100644 --- a/metrics/server.go +++ b/metrics/server.go @@ -279,6 +279,14 @@ var ( Help: "Counter of TiFlash queries.", }, []string{LblType, LblResult}) + TiFlashFailedMPPStoreState = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "tidb", + Subsystem: "server", + Name: "tiflash_failed_store", + Help: "Status of failed TiFlash MPP store. -1 means detector heartbeat, 0 means reachable, 1 means abnormal.", + }, []string{LblAddress}) + PDAPIExecutionHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "tidb", diff --git a/planner/core/fragment.go b/planner/core/fragment.go index c6aec17f21e6d..917f4392d9f9e 100644 --- a/planner/core/fragment.go +++ b/planner/core/fragment.go @@ -378,7 +378,7 @@ func (e *mppTaskGenerator) constructMPPTasksImpl(ctx context.Context, ts *Physic logutil.BgLogger().Warn("MPP store fail ttl is invalid", zap.Error(err)) ttl = 30 * time.Second } - metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, e.ctx.GetSessionVars().MPPStoreLastFailTime, ttl) + metas, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req, ttl) if err != nil { return nil, errors.Trace(err) } diff --git a/sessionctx/sessionstates/session_states.go b/sessionctx/sessionstates/session_states.go index 36ea0b22455d7..c9e1652a9c1df 100644 --- a/sessionctx/sessionstates/session_states.go +++ b/sessionctx/sessionstates/session_states.go @@ -15,8 +15,6 @@ package sessionstates import ( - "time" - "github.com/pingcap/tidb/errno" ptypes "github.com/pingcap/tidb/parser/types" "github.com/pingcap/tidb/sessionctx/stmtctx" @@ -79,7 +77,6 @@ type SessionStates struct { FoundInPlanCache bool `json:"in-plan-cache,omitempty"` FoundInBinding bool `json:"in-binding,omitempty"` SequenceLatestValues map[int64]int64 `json:"seq-values,omitempty"` - MPPStoreLastFailTime map[string]time.Time
`json:"store-fail-time,omitempty"` LastAffectedRows int64 `json:"affected-rows,omitempty"` LastInsertID uint64 `json:"last-insert-id,omitempty"` Warnings []stmtctx.SQLWarn `json:"warnings,omitempty"` diff --git a/sessionctx/sessionstates/session_states_test.go b/sessionctx/sessionstates/session_states_test.go index 1eef55ed20bcf..21de8d53727d6 100644 --- a/sessionctx/sessionstates/session_states_test.go +++ b/sessionctx/sessionstates/session_states_test.go @@ -20,9 +20,7 @@ import ( "fmt" "strconv" "strings" - "sync" "testing" - "time" "github.com/pingcap/errors" "github.com/pingcap/tidb/config" @@ -378,23 +376,6 @@ func TestSessionCtx(t *testing.T) { tk.MustQuery("select nextval(test.s)").Check(testkit.Rows("2")) }, }, - { - // check MPPStoreLastFailTime - setFunc: func(tk *testkit.TestKit) any { - m := sync.Map{} - m.Store("store1", time.Now()) - tk.Session().GetSessionVars().MPPStoreLastFailTime = &m - return tk.Session().GetSessionVars().MPPStoreLastFailTime - }, - checkFunc: func(tk *testkit.TestKit, param any) { - failTime := tk.Session().GetSessionVars().MPPStoreLastFailTime - tm, ok := failTime.Load("store1") - require.True(t, ok) - v, ok := (param.(*sync.Map)).Load("store1") - require.True(t, ok) - require.True(t, tm.(time.Time).Equal(v.(time.Time))) - }, - }, { // check FoundInPlanCache setFunc: func(tk *testkit.TestKit) any { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 695d0ad48a5a5..d3d30d9c7bcc5 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1171,9 +1171,6 @@ type SessionVars struct { // TemporaryTableData stores committed kv values for temporary table for current session. TemporaryTableData TemporaryTableData - // MPPStoreLastFailTime records the lastest fail time that a TiFlash store failed. It maps store address(string) to fail time(time.Time). - MPPStoreLastFailTime *sync.Map - // MPPStoreFailTTL indicates the duration that protect TiDB from sending task to a new recovered TiFlash. 
MPPStoreFailTTL string @@ -1713,7 +1710,6 @@ func NewSessionVars(hctx HookContext) *SessionVars { AllowFallbackToTiKV: make(map[kv.StoreType]struct{}), CTEMaxRecursionDepth: DefCTEMaxRecursionDepth, TMPTableSize: DefTiDBTmpTableMaxSize, - MPPStoreLastFailTime: new(sync.Map), MPPStoreFailTTL: DefTiDBMPPStoreFailTTL, Rng: mathutil.NewWithTime(), StatsLoadSyncWait: StatsLoadSyncWait.Load(), @@ -2336,12 +2332,6 @@ func (s *SessionVars) EncodeSessionStates(ctx context.Context, sessionStates *se } sessionStates.LastFoundRows = s.LastFoundRows sessionStates.SequenceLatestValues = s.SequenceState.GetAllStates() - sessionStates.MPPStoreLastFailTime = make(map[string]time.Time, 0) - s.MPPStoreLastFailTime.Range( - func(key, value interface{}) bool { - sessionStates.MPPStoreLastFailTime[key.(string)] = value.(time.Time) - return true - }) sessionStates.FoundInPlanCache = s.PrevFoundInPlanCache sessionStates.FoundInBinding = s.PrevFoundInBinding @@ -2377,9 +2367,6 @@ func (s *SessionVars) DecodeSessionStates(ctx context.Context, sessionStates *se } s.LastFoundRows = sessionStates.LastFoundRows s.SequenceState.SetAllStates(sessionStates.SequenceLatestValues) - for k, v := range sessionStates.MPPStoreLastFailTime { - s.MPPStoreLastFailTime.Store(k, v) - } s.FoundInPlanCache = sessionStates.FoundInPlanCache s.FoundInBinding = sessionStates.FoundInBinding diff --git a/store/copr/BUILD.bazel b/store/copr/BUILD.bazel index eb3eb2f016424..f6cbe57efa2d7 100644 --- a/store/copr/BUILD.bazel +++ b/store/copr/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "coprocessor_cache.go", "key_ranges.go", "mpp.go", + "mpp_probe.go", "region_cache.go", "store.go", ], @@ -66,6 +67,7 @@ go_test( "coprocessor_test.go", "key_ranges_test.go", "main_test.go", + "mpp_probe_test.go", ], embed = [":copr"], flaky = True, @@ -75,12 +77,15 @@ go_test( "//store/driver/backoff", "//testkit/testsetup", "//util/paging", + "@com_github_pingcap_errors//:errors", "@com_github_pingcap_kvproto//pkg/coprocessor", + "@com_github_pingcap_kvproto//pkg/mpp", "@com_github_stathat_consistent//:consistent", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//testutils", "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_client_go_v2//tikvrpc", "@org_uber_go_goleak//:goleak", ], ) diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 801eebc40de9a..b316d10acaf6e 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/coprocessor" "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/kvproto/pkg/mpp" "github.com/pingcap/log" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/kv" @@ -295,12 +294,11 @@ func balanceBatchCopTaskWithContinuity(storeTaskMap map[uint64]*batchCopTask, ca // // The second balance strategy: Not only consider the region count between TiFlash stores, but also try to make the regions' range continuous(stored in TiFlash closely). // If balanceWithContinuity is true, the second balance strategy is enable. 
-func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, mppStoreLastFailTime *sync.Map, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask { +func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks []*batchCopTask, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) []*batchCopTask { if len(originalTasks) == 0 { log.Info("Batch cop task balancer got an empty task set.") return originalTasks } - isMPP := mppStoreLastFailTime != nil // for mpp, we still need to detect the store availability if len(originalTasks) <= 1 && !isMPP { return originalTasks @@ -331,45 +329,21 @@ func balanceBatchCopTask(ctx context.Context, kvStore *kvStore, originalTasks [] var wg sync.WaitGroup var mu sync.Mutex wg.Add(len(stores)) - cur := time.Now() for i := range stores { go func(idx int) { defer wg.Done() s := stores[idx] - var lastAny any - var ok bool - mu.Lock() - if lastAny, ok = mppStoreLastFailTime.Load(s.GetAddr()); ok && cur.Sub(lastAny.(time.Time)) < 100*time.Millisecond { - // The interval time is so short that may happen in a same query, so we needn't to check again. - mu.Unlock() - return - } else if !ok { - lastAny = time.Time{} - } - mu.Unlock() - - resp, err := kvStore.GetTiKVClient().SendRequest(ctx, s.GetAddr(), &tikvrpc.Request{ - Type: tikvrpc.CmdMPPAlive, - StoreTp: tikvrpc.TiFlash, - Req: &mpp.IsAliveRequest{}, - Context: kvrpcpb.Context{}, - }, 2*time.Second) - - if err != nil || !resp.Resp.(*mpp.IsAliveResponse).Available { - errMsg := "store not ready to serve" - if err != nil { - errMsg = err.Error() - } - logutil.BgLogger().Warn("Store is not ready", zap.String("store address", s.GetAddr()), zap.String("err message", errMsg)) - mu.Lock() - mppStoreLastFailTime.Store(s.GetAddr(), time.Now()) - mu.Unlock() + // check if store is failed already. 
+ ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, s.GetAddr(), ttl) + if !ok { return } - if cur.Sub(lastAny.(time.Time)) < ttl { - logutil.BgLogger().Warn("Cannot detect store's availability because the current time has not reached MPPStoreLastFailTime + MPPStoreFailTTL", zap.String("store address", s.GetAddr()), zap.Time("last fail time", lastAny.(time.Time))) + tikvClient := kvStore.GetTiKVClient() + ok = detectMPPStore(ctx, tikvClient, s.GetAddr(), DetectTimeoutLimit) + if !ok { + GlobalMPPFailedStoreProber.Add(ctx, s.GetAddr(), tikvClient) return } @@ -534,29 +508,29 @@ func buildBatchCopTasksForNonPartitionedTable(bo *backoff.Backoffer, store *kvStore, ranges *KeyRanges, storeType kv.StoreType, - mppStoreLastFailTime *sync.Map, + isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { if config.GetGlobalConfig().DisaggregatedTiFlash { - return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount) + return buildBatchCopTasksConsistentHash(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) } - return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount) + return buildBatchCopTasksCore(bo, store, []*KeyRanges{ranges}, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) } func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, - mppStoreLastFailTime *sync.Map, + isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64, partitionIDs []int64) (batchTasks []*batchCopTask, err error) { if config.GetGlobalConfig().DisaggregatedTiFlash { - batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount) + batchTasks, err = buildBatchCopTasksConsistentHash(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) } else { - batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount) + batchTasks, err = buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) } if err != nil { return nil, err @@ -566,8 +540,8 @@ func buildBatchCopTasksForPartitionedTable(bo *backoff.Backoffer, return batchTasks, nil } -func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, mppStoreLastFailTime *sync.Map, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { - batchTasks, err := buildBatchCopTasksCore(bo, store, rangesForEachPhysicalTable, storeType, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount) +func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { + batchTasks, err := buildBatchCopTasksCore(bo, store, 
rangesForEachPhysicalTable, storeType, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) if err != nil { return nil, err } @@ -614,7 +588,7 @@ func buildBatchCopTasksConsistentHash(bo *backoff.Backoffer, store *kvStore, ran // When `partitionIDs != nil`, it means that buildBatchCopTasksCore is constructing a batch cop tasks for PartitionTableScan. // At this time, `len(rangesForEachPhysicalTable) == len(partitionIDs)` and `rangesForEachPhysicalTable[i]` is for partition `partitionIDs[i]`. // Otherwise, `rangesForEachPhysicalTable[0]` indicates the range for the single physical table. -func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, mppStoreLastFailTime *sync.Map, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { +func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, isMPP bool, ttl time.Duration, balanceWithContinuity bool, balanceContinuousRegionCount int64) ([]*batchCopTask, error) { cache := store.GetRegionCache() start := time.Now() const cmdType = tikvrpc.CmdBatchCop @@ -644,7 +618,6 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach storeTaskMap := make(map[string]*batchCopTask) needRetry := false - isMPP := mppStoreLastFailTime != nil for _, task := range tasks { rpcCtx, err := cache.GetTiFlashRPCContext(bo.TiKVBackoffer(), task.region, isMPP) if err != nil { @@ -696,7 +669,7 @@ func buildBatchCopTasksCore(bo *backoff.Backoffer, store *kvStore, rangesForEach logutil.BgLogger().Debug(msg) } balanceStart := time.Now() - batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, mppStoreLastFailTime, ttl, balanceWithContinuity, balanceContinuousRegionCount) + batchTasks = balanceBatchCopTask(bo.GetCtx(), store, batchTasks, isMPP, ttl, balanceWithContinuity, balanceContinuousRegionCount) balanceElapsed := time.Since(balanceStart) if log.GetLevel() <= zap.DebugLevel { msg := "After region balance:" @@ -762,11 +735,11 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V keyRanges = append(keyRanges, NewKeyRanges(pi.KeyRanges)) partitionIDs = append(partitionIDs, pi.ID) } - tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store.kvStore, keyRanges, req.StoreType, nil, 0, false, 0, partitionIDs) + tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store.kvStore, keyRanges, req.StoreType, false, 0, false, 0, partitionIDs) } else { // TODO: merge the if branch. 
ranges := NewKeyRanges(req.KeyRanges.FirstPartitionRange()) - tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store.kvStore, ranges, req.StoreType, nil, 0, false, 0) + tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store.kvStore, ranges, req.StoreType, false, 0, false, 0) } if err != nil { @@ -913,7 +886,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Ba ranges = append(ranges, *ran) }) } - ret, err := buildBatchCopTasksForNonPartitionedTable(bo, b.store, NewKeyRanges(ranges), b.req.StoreType, nil, 0, false, 0) + ret, err := buildBatchCopTasksForNonPartitionedTable(bo, b.store, NewKeyRanges(ranges), b.req.StoreType, false, 0, false, 0) return ret, err } // Retry Partition Table Scan @@ -932,7 +905,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Ba } keyRanges = append(keyRanges, NewKeyRanges(ranges)) } - ret, err := buildBatchCopTasksForPartitionedTable(bo, b.store, keyRanges, b.req.StoreType, nil, 0, false, 0, pid) + ret, err := buildBatchCopTasksForPartitionedTable(bo, b.store, keyRanges, b.req.StoreType, false, 0, false, 0, pid) return ret, err } diff --git a/store/copr/batch_coprocessor_test.go b/store/copr/batch_coprocessor_test.go index 5616f61c54365..3e10ce627b1f6 100644 --- a/store/copr/batch_coprocessor_test.go +++ b/store/copr/batch_coprocessor_test.go @@ -120,13 +120,13 @@ func TestBalanceBatchCopTaskWithContinuity(t *testing.T) { func TestBalanceBatchCopTaskWithEmptyTaskSet(t *testing.T) { { var nilTaskSet []*batchCopTask - nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, nil, time.Second, false, 0) + nilResult := balanceBatchCopTask(nil, nil, nilTaskSet, false, time.Second, false, 0) require.True(t, nilResult == nil) } { emptyTaskSet := make([]*batchCopTask, 0) - emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, nil, time.Second, false, 0) + emptyResult := balanceBatchCopTask(nil, nil, emptyTaskSet, false, time.Second, false, 0) require.True(t, emptyResult != nil) require.True(t, len(emptyResult) == 0) } diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 37d5629ac6f3d..02b66478958d4 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -62,7 +62,7 @@ func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta { } // ConstructMPPTasks receives ScheduleRequest, which are actually collects of kv ranges. We allocates MPPTaskMeta for them and returns. 
-func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest, mppStoreLastFailTime *sync.Map, ttl time.Duration) ([]kv.MPPTaskMeta, error) { +func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest, ttl time.Duration) ([]kv.MPPTaskMeta, error) { ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTS) bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil) var tasks []*batchCopTask @@ -74,13 +74,13 @@ func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasks rangesForEachPartition[i] = NewKeyRanges(p.KeyRanges) partitionIDs[i] = p.ID } - tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store, rangesForEachPartition, kv.TiFlash, mppStoreLastFailTime, ttl, true, 20, partitionIDs) + tasks, err = buildBatchCopTasksForPartitionedTable(bo, c.store, rangesForEachPartition, kv.TiFlash, true, ttl, true, 20, partitionIDs) } else { if req.KeyRanges == nil { return c.selectAllTiFlashStore(), nil } ranges := NewKeyRanges(req.KeyRanges) - tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store, ranges, kv.TiFlash, mppStoreLastFailTime, ttl, true, 20) + tasks, err = buildBatchCopTasksForNonPartitionedTable(bo, c.store, ranges, kv.TiFlash, true, ttl, true, 20) } if err != nil { diff --git a/store/copr/mpp_probe.go b/store/copr/mpp_probe.go new file mode 100644 index 0000000000000..0a0eba286648e --- /dev/null +++ b/store/copr/mpp_probe.go @@ -0,0 +1,270 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package copr + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/mpp" + "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/util/logutil" + "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/tikvrpc" + "go.uber.org/zap" +) + +// GlobalMPPFailedStoreProber is the global prober that tracks failed TiFlash MPP stores and their recovery. +var GlobalMPPFailedStoreProber *MPPFailedStoreProber + +const ( + // DetectPeriod is the interval between two liveness probes of the same store. + DetectPeriod = 3 * time.Second + // DetectTimeoutLimit is the timeout of a single liveness probe. + DetectTimeoutLimit = 2 * time.Second + // MaxRecoveryTimeLimit is how long a recovered store is kept in the failed list; it should be larger than MPPStoreFailTTL. + MaxRecoveryTimeLimit = 15 * time.Minute + // MaxObsoletTimeLimit is how long an entry may go without lookups before it is treated as obsolete and removed. + MaxObsoletTimeLimit = time.Hour +) + +// MPPStoreState is the probe state of a single TiFlash MPP store.
+type MPPStoreState struct { + address string // address of the TiFlash MPP store + tikvClient tikv.Client + + lock struct { + sync.Mutex + + recoveryTime time.Time + lastLookupTime time.Time + lastDetectTime time.Time + } +} + +// MPPFailedStoreProber periodically probes failed TiFlash MPP stores and records when they become reachable again. +type MPPFailedStoreProber struct { + failedMPPStores *sync.Map + lock *sync.Mutex + isStop *atomic.Bool + wg *sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + + detectPeriod time.Duration + detectTimeoutLimit time.Duration + maxRecoveryTimeLimit time.Duration + maxObsoletTimeLimit time.Duration +} + +func (t *MPPStoreState) detect(ctx context.Context, detectPeriod time.Duration, detectTimeoutLimit time.Duration) { + if time.Since(t.lock.lastDetectTime) < detectPeriod { + return + } + + defer func() { t.lock.lastDetectTime = time.Now() }() + metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(0) + ok := detectMPPStore(ctx, t.tikvClient, t.address, detectTimeoutLimit) + if !ok { + metrics.TiFlashFailedMPPStoreState.WithLabelValues(t.address).Set(1) + t.lock.recoveryTime = time.Time{} // if the probe failed, reset the recovery time to zero. + return + } + + // record the time of the first recovery + if t.lock.recoveryTime.IsZero() { + t.lock.recoveryTime = time.Now() + } +} + +func (t *MPPStoreState) isRecovery(ctx context.Context, recoveryTTL time.Duration) bool { + if !t.lock.TryLock() { + return false + } + defer t.lock.Unlock() + + t.lock.lastLookupTime = time.Now() + if !t.lock.recoveryTime.IsZero() && time.Since(t.lock.recoveryTime) > recoveryTTL { + return true + } + logutil.Logger(ctx).Debug("store is still treated as failed: it has not recovered, "+ + "or has not stayed recovered for longer than MPPStoreFailTTL", + zap.String("store address", t.address), + zap.Time("recovery time", t.lock.recoveryTime), + zap.Duration("MPPStoreFailTTL", recoveryTTL)) + return false +} + +func (t MPPFailedStoreProber) scan(ctx context.Context) { + defer func() { + if r := recover(); r != nil { + logutil.Logger(ctx).Warn("mpp failed store probe scan error, will restart", zap.Any("recover", r), zap.Stack("stack")) + } + }() + + do := func(k, v any) { + address := fmt.Sprint(k) + state, ok := v.(*MPPStoreState) + if !ok { + logutil.BgLogger().Warn("MPPStoreState type assertion failed, the entry will be removed", + zap.String("address", address)) + t.Delete(address) + return + } + + if !state.lock.TryLock() { + return + } + defer state.lock.Unlock() + + state.detect(ctx, t.detectPeriod, t.detectTimeoutLimit) + + // clean restored store + if !state.lock.recoveryTime.IsZero() && time.Since(state.lock.recoveryTime) > t.maxRecoveryTimeLimit { + t.Delete(address) + // clean store that may be obsolete + } else if state.lock.recoveryTime.IsZero() && time.Since(state.lock.lastLookupTime) > t.maxObsoletTimeLimit { + t.Delete(address) + } + } + + f := func(k, v any) bool { + go do(k, v) + return true + } + + metrics.TiFlashFailedMPPStoreState.WithLabelValues("probe").Set(-1) // probe heartbeat + t.failedMPPStores.Range(f) +} + +// Add records a store in the failed list after a synchronous probe failed. +func (t *MPPFailedStoreProber) Add(ctx context.Context, address string, tikvClient tikv.Client) { + state := MPPStoreState{ + address: address, + tikvClient: tikvClient, + } + state.lock.lastLookupTime = time.Now() + logutil.Logger(ctx).Debug("add mpp store to failed list", zap.String("address", address)) + t.failedMPPStores.Store(address, &state) +} + +// IsRecovery reports whether the store can be used again, i.e. it is not in the failed list or has stayed recovered for longer than recoveryTTL. +func (t *MPPFailedStoreProber) IsRecovery(ctx context.Context,
address string, recoveryTTL time.Duration) bool { + logutil.Logger(ctx).Debug("check failed store recovery", + zap.String("address", address), zap.Duration("ttl", recoveryTTL)) + v, ok := t.failedMPPStores.Load(address) + if !ok { + // store not in failed map + return true + } + + state, ok := v.(*MPPStoreState) + if !ok { + logutil.BgLogger().Warn("MPPStoreState type assertion failed, the entry will be removed", + zap.String("address", address)) + t.Delete(address) + return false + } + + return state.isRecovery(ctx, recoveryTTL) +} + +// Run starts the background scan loop. +// Only one background task can run at a time. +func (t *MPPFailedStoreProber) Run() { + if !t.lock.TryLock() { + return + } + t.wg.Add(1) + t.isStop.Swap(false) + go func() { + defer t.wg.Done() + defer t.lock.Unlock() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-t.ctx.Done(): + logutil.BgLogger().Debug("ctx.done") + return + case <-ticker.C: + t.scan(t.ctx) + } + } + }() + logutil.BgLogger().Debug("run a background probe process for mpp") +} + +// Stop stops the background goroutine. +func (t *MPPFailedStoreProber) Stop() { + if !t.isStop.CompareAndSwap(false, true) { + return + } + t.cancel() + t.wg.Wait() + logutil.BgLogger().Debug("stop background task") +} + +// Delete removes a store from the failed map and deletes its metric. +func (t *MPPFailedStoreProber) Delete(address string) { + metrics.TiFlashFailedMPPStoreState.DeleteLabelValues(address) + _, ok := t.failedMPPStores.LoadAndDelete(address) + if !ok { + logutil.BgLogger().Warn("the store to delete is not in the failed list", zap.String("address", address)) + } +} + +// detectMPPStore sends an IsAlive request to check whether the TiFlash MPP store is available. +func detectMPPStore(ctx context.Context, client tikv.Client, address string, detectTimeoutLimit time.Duration) bool { + resp, err := client.SendRequest(ctx, address, &tikvrpc.Request{ + Type: tikvrpc.CmdMPPAlive, + StoreTp: tikvrpc.TiFlash, + Req: &mpp.IsAliveRequest{}, + Context: kvrpcpb.Context{}, + }, detectTimeoutLimit) + if err != nil || !resp.Resp.(*mpp.IsAliveResponse).Available { + if err == nil { + err = fmt.Errorf("store not ready to serve") + } + logutil.BgLogger().Warn("Store is not ready", + zap.String("store address", address), + zap.String("err message", err.Error())) + return false + } + return true +} + +func init() { + ctx, cancel := context.WithCancel(context.Background()) + isStop := atomic.Bool{} + isStop.Swap(true) + GlobalMPPFailedStoreProber = &MPPFailedStoreProber{ + failedMPPStores: &sync.Map{}, + lock: &sync.Mutex{}, + isStop: &isStop, + ctx: ctx, + cancel: cancel, + wg: &sync.WaitGroup{}, + detectPeriod: DetectPeriod, + detectTimeoutLimit: DetectTimeoutLimit, + maxRecoveryTimeLimit: MaxRecoveryTimeLimit, + maxObsoletTimeLimit: MaxObsoletTimeLimit, + } +} diff --git a/store/copr/mpp_probe_test.go b/store/copr/mpp_probe_test.go new file mode 100644 index 0000000000000..7826c970d3e1e --- /dev/null +++ b/store/copr/mpp_probe_test.go @@ -0,0 +1,177 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package copr + +import ( + "context" + "testing" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/mpp" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/tikvrpc" +) + +const ( + testimeout = "timeout" + Error = "error" + Normal = "normal" +) + +type mockDetectClient struct { + errortestype string +} + +func (t *mockDetectClient) CloseAddr(string) error { + return nil +} + +func (t *mockDetectClient) Close() error { + return nil +} + +func (t *mockDetectClient) SendRequest( + ctx context.Context, + addr string, + req *tikvrpc.Request, + timeout time.Duration, +) (*tikvrpc.Response, error) { + if t.errortestype == Error { + return nil, errors.New("store error") + } else if t.errortestype == testimeout { + return &tikvrpc.Response{Resp: &mpp.IsAliveResponse{}}, nil + } + + return &tikvrpc.Response{Resp: &mpp.IsAliveResponse{Available: true}}, nil +} + +type ProbeTest map[string]*mockDetectClient + +func (t ProbeTest) add(ctx context.Context) { + for k, v := range t { + GlobalMPPFailedStoreProber.Add(ctx, k, v) + } +} + +func (t ProbeTest) reSetErrortestype(to string) { + for k, v := range t { + if to == Normal { + v.errortestype = Normal + } else { + v.errortestype = k + } + } +} + +func (t ProbeTest) judge(ctx context.Context, test *testing.T, recoveryTTL time.Duration, need bool) { + for k := range t { + ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, k, recoveryTTL) + require.Equal(test, need, ok) + } +} + +func failedStoreSizeJudge(ctx context.Context, test *testing.T, need int) { + var l int + GlobalMPPFailedStoreProber.scan(ctx) + time.Sleep(time.Second / 10) + GlobalMPPFailedStoreProber.failedMPPStores.Range(func(k, v interface{}) bool { + l++ + return true + }) + require.Equal(test, need, l) +} + +func testFlow(ctx context.Context, probetestest ProbeTest, test *testing.T, flow []string) { + probetestest.add(ctx) + for _, to := range flow { + probetestest.reSetErrortestype(to) + + GlobalMPPFailedStoreProber.scan(ctx) + time.Sleep(time.Second / 10) //wait detect goroutine finish + + var need bool + if to == Normal { + need = true + } + probetestest.judge(ctx, test, 0, need) + probetestest.judge(ctx, test, time.Minute, false) + } + + lastTo := flow[len(flow)-1] + cleanRecover := func(need int) { + GlobalMPPFailedStoreProber.maxRecoveryTimeLimit = 0 - time.Second + failedStoreSizeJudge(ctx, test, need) + GlobalMPPFailedStoreProber.maxRecoveryTimeLimit = MaxRecoveryTimeLimit + } + + cleanObsolet := func(need int) { + GlobalMPPFailedStoreProber.maxObsoletTimeLimit = 0 - time.Second + failedStoreSizeJudge(ctx, test, need) + GlobalMPPFailedStoreProber.maxObsoletTimeLimit = MaxObsoletTimeLimit + } + + if lastTo == Error { + cleanRecover(2) + cleanObsolet(0) + } else if lastTo == Normal { + cleanObsolet(2) + cleanRecover(0) + } +} + +func TestMPPFailedStoreProbe(t *testing.T) { + ctx := context.Background() + + notExistAddress := "not exist address" + + GlobalMPPFailedStoreProber.detectPeriod = 0 - time.Second + + // check not exist address + ok := GlobalMPPFailedStoreProber.IsRecovery(ctx, notExistAddress, 0) + require.True(t, ok) + + GlobalMPPFailedStoreProber.scan(ctx) + + probetestest := map[string]*mockDetectClient{ + testimeout: {errortestype: testimeout}, + Error: {errortestype: Error}, + } + + testFlowFinallyRecover := []string{Error, Normal, Error, Error, Normal} + testFlow(ctx, probetestest, t, testFlowFinallyRecover) + testFlowFinallyDesert := []string{Error, Normal, Normal, Error, Error} + testFlow(ctx, probetestest, t, 
testFlowFinallyDesert) +} + +func TestMPPFailedStoreProbeGoroutineTask(t *testing.T) { + // Confirm that multiple tasks are not allowed + GlobalMPPFailedStoreProber.lock.Lock() + GlobalMPPFailedStoreProber.Run() + GlobalMPPFailedStoreProber.lock.Unlock() + + GlobalMPPFailedStoreProber.Run() + GlobalMPPFailedStoreProber.Stop() +} + +func TestMPPFailedStoreAssertFailed(t *testing.T) { + ctx := context.Background() + + GlobalMPPFailedStoreProber.failedMPPStores.Store("errorinfo", nil) + GlobalMPPFailedStoreProber.scan(ctx) + + GlobalMPPFailedStoreProber.failedMPPStores.Store("errorinfo", nil) + GlobalMPPFailedStoreProber.IsRecovery(ctx, "errorinfo", 0) +} diff --git a/store/copr/store.go b/store/copr/store.go index 758109b81d805..32553961acc67 100644 --- a/store/copr/store.go +++ b/store/copr/store.go @@ -84,6 +84,7 @@ func NewStore(s *tikv.KVStore, coprCacheConfig *config.CoprocessorCache) (*Store if err != nil { return nil, errors.Trace(err) } + /* #nosec G404 */ return &Store{ kvStore: &kvStore{store: s}, diff --git a/tests/realtikvtest/sessiontest/session_fail_test.go b/tests/realtikvtest/sessiontest/session_fail_test.go index 919932e6f9357..a3df51be821c0 100644 --- a/tests/realtikvtest/sessiontest/session_fail_test.go +++ b/tests/realtikvtest/sessiontest/session_fail_test.go @@ -200,7 +200,6 @@ func TestAutoCommitNeedNotLinearizability(t *testing.T) { func TestKill(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) - tk := testkit.NewTestKit(t, store) tk.MustExec("kill connection_id();") } diff --git a/tidb-server/BUILD.bazel b/tidb-server/BUILD.bazel index 493eedfcccbfa..361a929351642 100644 --- a/tidb-server/BUILD.bazel +++ b/tidb-server/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "//sessionctx/variable", "//statistics", "//store", + "//store/copr", "//store/driver", "//store/mockstore", "//store/mockstore/unistore/metrics", diff --git a/tidb-server/main.go b/tidb-server/main.go index d58e24a531f4c..8605c7d0c3aaa 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -54,6 +54,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" kvstore "github.com/pingcap/tidb/store" + "github.com/pingcap/tidb/store/copr" "github.com/pingcap/tidb/store/driver" "github.com/pingcap/tidb/store/mockstore" uni_metrics "github.com/pingcap/tidb/store/mockstore/unistore/metrics" @@ -309,6 +310,7 @@ func createStoreAndDomain() (kv.Storage, *domain.Domain) { var err error storage, err := kvstore.New(fullPath) terror.MustNil(err) + copr.GlobalMPPFailedStoreProber.Run() err = infosync.CheckTiKVVersion(storage, *semver.New(versioninfo.TiKVMinVersion)) terror.MustNil(err) // Bootstrap a session to load information schema. @@ -814,6 +816,7 @@ func setupTracing() { func closeDomainAndStorage(storage kv.Storage, dom *domain.Domain) { tikv.StoreShuttingDown(1) dom.Close() + copr.GlobalMPPFailedStoreProber.Stop() err := storage.Close() terror.Log(errors.Trace(err)) }
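Reviewer note (not part of the patch): a minimal sketch of how the prober API introduced in store/copr/mpp_probe.go is intended to be wired up, mirroring the calls this diff adds in tidb-server/main.go (Run/Stop) and in balanceBatchCopTask (IsRecovery/Add). The standalone main function, the ttl value, and the store address below are illustrative assumptions only.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/pingcap/tidb/store/copr"
)

func main() {
	// Start the background scan loop once at process startup and stop it on shutdown,
	// as createStoreAndDomain and closeDomainAndStorage do in this diff.
	copr.GlobalMPPFailedStoreProber.Run()
	defer copr.GlobalMPPFailedStoreProber.Stop()

	ctx := context.Background()
	ttl := 60 * time.Second  // MPPStoreFailTTL parsed from the session variable (assumed value)
	addr := "tiflash-0:3930" // hypothetical TiFlash MPP store address

	// Before dispatching MPP tasks, ask whether the store is usable. A store that was never
	// added to the failed list, or that has stayed recovered for longer than ttl, is usable;
	// otherwise it is skipped, exactly as balanceBatchCopTask does after this change.
	usable := copr.GlobalMPPFailedStoreProber.IsRecovery(ctx, addr, ttl)
	fmt.Println("store usable:", usable)

	// When a synchronous probe fails, the caller registers the store so the background loop
	// keeps probing it: copr.GlobalMPPFailedStoreProber.Add(ctx, addr, tikvClient).
}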