executor: support recovery mem limit error in disagg tiflash mode (#4…
guo-shaoge authored Dec 29, 2023
1 parent fbe232e commit e19e06e
Showing 13 changed files with 569 additions and 68 deletions.
1 change: 1 addition & 0 deletions pkg/executor/BUILD.bazel
@@ -129,6 +129,7 @@ go_library(
"//pkg/executor/lockstats",
"//pkg/executor/metrics",
"//pkg/executor/mppcoordmanager",
"//pkg/executor/mpperr",
"//pkg/executor/sortexec",
"//pkg/expression",
"//pkg/expression/aggregation",
1 change: 0 additions & 1 deletion pkg/executor/internal/mpp/BUILD.bazel
@@ -27,7 +27,6 @@ go_library(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/mpp",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_zap//:zap",
],
)
23 changes: 14 additions & 9 deletions pkg/executor/internal/mpp/local_mpp_coordinator.go
@@ -44,7 +44,6 @@ import (
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

@@ -145,6 +144,8 @@ type localMppCoordinator struct {
enableCollectExecutionInfo bool
reportExecutionInfo bool // if each mpp task needs to report execution info directly to coordinator through ReportMPPTaskStatus

// Record the number of nodes involved in the MPP computation.
nodeCnt int
}

// NewLocalMPPCoordinator creates a new localMppCoordinator instance
@@ -518,17 +519,12 @@ func (c *localMppCoordinator) receiveResults(req *kv.MPPDispatchRequest, taskMet

resp, err = stream.Recv()
if err != nil {
logutil.BgLogger().Info("mpp stream recv got error", zap.Error(err), zap.Uint64("timestamp", taskMeta.StartTs),
zap.Int64("task", taskMeta.TaskId), zap.Int64("mpp-version", taskMeta.MppVersion))
if errors.Cause(err) == io.EOF {
return
}

if err1 := bo.Backoff(tikv.BoTiKVRPC(), errors.Errorf("recv stream response error: %v", err)); err1 != nil {
if errors.Cause(err) == context.Canceled {
logutil.BgLogger().Info("stream recv timeout", zap.Error(err), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId), zap.Int64("mpp-version", taskMeta.MppVersion))
} else {
logutil.BgLogger().Info("stream unknown error", zap.Error(err), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId), zap.Int64("mpp-version", taskMeta.MppVersion))
}
}
// if NeedTriggerFallback is true, we return timeout to trigger tikv's fallback
if c.needTriggerFallback {
c.sendError(derr.ErrTiFlashServerTimeout)
@@ -715,10 +711,14 @@ func (c *localMppCoordinator) Execute(ctx context.Context) (kv.Response, []kv.Ke
// TODO: Move the construct tasks logic to planner, so we can see the explain results.
sender := c.originalPlan.(*plannercore.PhysicalExchangeSender)
sctx := c.sessionCtx
frags, kvRanges, err := plannercore.GenerateRootMPPTasks(sctx, c.startTS, c.gatherID, c.mppQueryID, sender, c.is)
frags, kvRanges, nodeInfo, err := plannercore.GenerateRootMPPTasks(sctx, c.startTS, c.gatherID, c.mppQueryID, sender, c.is)
if err != nil {
return nil, nil, errors.Trace(err)
}
if nodeInfo == nil {
return nil, nil, errors.New("node info should not be nil")
}
c.nodeCnt = len(nodeInfo)

for _, frag := range frags {
err = c.appendMPPDispatchReq(frag)
@@ -744,3 +744,8 @@ func (c *localMppCoordinator) Execute(ctx context.Context) (kv.Response, []kv.Ke

return c, kvRanges, nil
}

// GetNodeCnt returns the number of nodes involved in the MPP computation.
func (c *localMppCoordinator) GetNodeCnt() int {
return c.nodeCnt
}
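
The kv.MppCoordinator interface change itself lives in one of the changed files not included in this excerpt. As a reading aid, the sketch below reconstructs the assumed interface surface from the coordinator above and the IdleCoordinator test stub further down; it is inferred from call sites, not copied from the upstream definition, and other existing methods are omitted.

package sketch // illustration only, not part of this commit

import (
	"context"

	"github.com/pingcap/tidb/pkg/kv"
)

// mppCoordinator mirrors the assumed kv.MppCoordinator surface relevant to this commit;
// only GetNodeCnt is new, Execute and IsClosed already existed per the call sites shown here.
type mppCoordinator interface {
	// Execute dispatches the MPP tasks and returns the response together with the covered key ranges.
	Execute(ctx context.Context) (kv.Response, []kv.KeyRange, error)
	// IsClosed reports whether the coordinator has been closed.
	IsClosed() bool
	// GetNodeCnt returns the number of TiFlash compute nodes involved in the MPP computation.
	GetNodeCnt() int
}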
190 changes: 175 additions & 15 deletions pkg/executor/mpp_gather.go
@@ -16,13 +16,18 @@ package executor

import (
"context"
"fmt"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/distsql"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/executor/internal/mpp"
"github.com/pingcap/tidb/pkg/executor/mppcoordmanager"
"github.com/pingcap/tidb/pkg/executor/mpperr"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
@@ -31,9 +36,15 @@ import (
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/mathutil"
"github.com/pingcap/tidb/pkg/util/memory"
"go.uber.org/zap"
)

// For MPP error recovery, hold at most 4 * MaxChunkSize rows.
const mppErrRecoveryHoldChkCap = 4

func useMPPExecution(ctx sessionctx.Context, tr *plannercore.PhysicalTableReader) bool {
if !ctx.GetSessionVars().IsMPPAllowed() {
return false
@@ -76,6 +87,18 @@ type MPPGather struct {
table table.Table
kvRanges []kv.KeyRange
dummy bool

// mppErrRecovery is designed for the recovery of MPP errors.
// Basic idea:
// 1. It attempts to hold the results of MPP. During the holding process, if an error occurs, it starts error recovery.
// If the recovery succeeds, it discards the held results, reconstructs the respIter, and re-executes the MPP task.
// If the recovery fails, the error is reported directly.
// 2. If the held MPP results exceed the capacity, it starts returning results to the caller.
// Once results start being returned, error recovery can no longer be performed.
mppErrRecovery *mpperr.RecoveryHandler
// Only used for MemLimit error recovery for now.
// AutoScaler uses this value as a hint to scale out compute nodes (CNs).
nodeCnt int
}

func collectPlanIDS(plan plannercore.PhysicalPlan, ids []int) []int {
Expand All @@ -86,6 +109,47 @@ func collectPlanIDS(plan plannercore.PhysicalPlan, ids []int) []int {
return ids
}

func (e *MPPGather) setupRespIter(ctx context.Context, isRecoverying bool) (err error) {
if isRecoverying {
// If we are trying to recover from an MPP error, we need to clean up some resources first.
// Sanity check.
if e.dummy {
return errors.New("should not reset mpp resp iter for dummy table")
}
if e.respIter == nil {
return errors.New("mpp resp iter should already be setup")
}

if err := e.respIter.Close(); err != nil {
return err
}
mppcoordmanager.InstanceMPPCoordinatorManager.Unregister(mppcoordmanager.CoordinatorUniqueID{MPPQueryID: e.mppQueryID, GatherID: e.gatherID})
}

planIDs := collectPlanIDS(e.originalPlan, nil)
e.gatherID = allocMPPGatherID(e.Ctx())
coord := e.buildCoordinator(planIDs)
if err = mppcoordmanager.InstanceMPPCoordinatorManager.Register(mppcoordmanager.CoordinatorUniqueID{MPPQueryID: e.mppQueryID, GatherID: e.gatherID}, coord); err != nil {
return err
}
var resp kv.Response
if resp, e.kvRanges, err = coord.Execute(ctx); err != nil {
return errors.Trace(err)
}
if e.nodeCnt = coord.GetNodeCnt(); e.nodeCnt <= 0 {
return errors.Errorf("tiflash node count should be greater than zero: %v", e.nodeCnt)
}

failpoint.Inject("mpp_recovery_test_check_node_cnt", func(nodeCnt failpoint.Value) {
if nodeCntInt := nodeCnt.(int); nodeCntInt != e.nodeCnt {
panic(fmt.Sprintf("unexpected node cnt, expect %v, got %v", nodeCntInt, e.nodeCnt))
}
})

e.respIter = distsql.GenSelectResultFromResponse(e.Ctx(), e.RetFieldTypes(), planIDs, e.ID(), resp)
return nil
}

// allocMPPGatherID allocates mpp gather id for mpp gathers. It will reset the gather id when the query finished.
// To support mpp_gather level cancel/retry and mpp_gather under apply executors, need to generate incremental ids when Open function is invoked
func allocMPPGatherID(ctx sessionctx.Context) uint64 {
@@ -101,22 +165,30 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
if !ok {
return errors.Errorf("unexpected plan type, expect: PhysicalExchangeSender, got: %s", e.originalPlan.TP())
}
_, e.kvRanges, err = plannercore.GenerateRootMPPTasks(e.Ctx(), e.startTS, e.gatherID, e.mppQueryID, sender, e.is)
_, e.kvRanges, _, err = plannercore.GenerateRootMPPTasks(e.Ctx(), e.startTS, e.gatherID, e.mppQueryID, sender, e.is)
return err
}
planIDs := collectPlanIDS(e.originalPlan, nil)
e.gatherID = allocMPPGatherID(e.Ctx())
coord := e.buildCoordinator(planIDs)
err = mppcoordmanager.InstanceMPPCoordinatorManager.Register(mppcoordmanager.CoordinatorUniqueID{MPPQueryID: e.mppQueryID, GatherID: e.gatherID}, coord)
if err != nil {
if err = e.setupRespIter(ctx, false); err != nil {
return err
}
var resp kv.Response
resp, e.kvRanges, err = coord.Execute(ctx)
if err != nil {
return errors.Trace(err)
}
e.respIter = distsql.GenSelectResultFromResponse(e.Ctx(), e.RetFieldTypes(), planIDs, e.ID(), resp)

holdCap := mathutil.Max(32, mppErrRecoveryHoldChkCap*e.Ctx().GetSessionVars().MaxChunkSize)

disaggTiFlashWithAutoScaler := config.GetGlobalConfig().DisaggregatedTiFlash && config.GetGlobalConfig().UseAutoScaler
_, allowTiFlashFallback := e.Ctx().GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
// 1. For now, MPP error recovery only supports MemLimit errors, which is only useful when AutoScaler is used.
// 2. When fallback to TiKV is enabled, the returned MPP error will be ErrTiFlashServerTimeout,
// which we cannot handle for now. There is also no need to recover, because TiKV will retry the query.
// 3. For cached tables, no tasks are dispatched to TiFlash, so there is no need to recover.
enableMPPRecovery := disaggTiFlashWithAutoScaler && !allowTiFlashFallback && !e.dummy

failpoint.Inject("mpp_recovery_test_mock_enable", func() {
if !e.dummy && !allowTiFlashFallback {
enableMPPRecovery = true
}
})

e.mppErrRecovery = mpperr.NewRecoveryHandler(disaggTiFlashWithAutoScaler, uint64(holdCap), enableMPPRecovery, e.memTracker)
return nil
}
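
For concreteness, here is a minimal, self-contained sketch of the sizing and enablement logic in Open above, assuming the default tidb_max_chunk_size of 1024 rows (an assumption for illustration, not part of this diff):

package main

import "fmt"

// Sketch only: mirrors the holdCap / enableMPPRecovery computation in MPPGather.Open.
func main() {
	maxChunkSize := 1024        // assumed session default for tidb_max_chunk_size
	holdCap := 4 * maxChunkSize // mppErrRecoveryHoldChkCap * MaxChunkSize
	if holdCap < 32 {
		holdCap = 32 // lower bound, matching mathutil.Max(32, ...)
	}

	disaggTiFlashWithAutoScaler := true // assume disaggregated TiFlash with AutoScaler enabled
	allowTiFlashFallback := false       // fallback to TiKV not allowed for this session
	dummy := false                      // not a cached-table (dummy) gather

	enableMPPRecovery := disaggTiFlashWithAutoScaler && !allowTiFlashFallback && !dummy
	fmt.Println(holdCap, enableMPPRecovery) // 4096 true: hold up to 4096 rows before streaming to the caller
}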

@@ -126,17 +198,104 @@ func (e *MPPGather) buildCoordinator(planIDs []int) kv.MppCoordinator {
return coord
}

func (e *MPPGather) nextWithRecovery(ctx context.Context) error {
if !e.mppErrRecovery.Enabled() {
return nil
}

for e.mppErrRecovery.CanHoldResult() {
tmpChk := exec.NewFirstChunk(e)
mppErr := e.respIter.Next(ctx, tmpChk)

// Mock recovery n times.
failpoint.Inject("mpp_recovery_test_max_err_times", func(forceErrCnt failpoint.Value) {
forceErrCntInt := forceErrCnt.(int)
if e.mppErrRecovery.RecoveryCnt() < uint32(forceErrCntInt) {
mppErr = errors.New("mock mpp error")
}
})

if mppErr != nil {
recoveryErr := e.mppErrRecovery.Recovery(&mpperr.RecoveryInfo{
MPPErr: mppErr,
NodeCnt: e.nodeCnt,
})

// Mock a successful recovery by ignoring the "no handler to recovery" error.
failpoint.Inject("mpp_recovery_test_ignore_recovery_err", func() {
if recoveryErr == nil {
panic("mocked mpp err should got recovery err")
}
if strings.Contains(recoveryErr.Error(), "no handler to recovery") {
recoveryErr = nil
}
})

if recoveryErr != nil {
logutil.BgLogger().Error("recovery mpp error failed", zap.Any("mppErr", mppErr),
zap.Any("recoveryErr", recoveryErr))
return mppErr
}

logutil.BgLogger().Info("recovery mpp error succeed, begin next retry",
zap.Any("mppErr", mppErr), zap.Any("recoveryCnt", e.mppErrRecovery.RecoveryCnt()))

if err := e.setupRespIter(ctx, true); err != nil {
logutil.BgLogger().Error("setup resp iter when recovery mpp err failed", zap.Any("err", err))
return mppErr
}
e.mppErrRecovery.ResetHolder()

continue
}

if tmpChk.NumRows() == 0 {
break
}

e.mppErrRecovery.HoldResult(tmpChk)
}

failpoint.Inject("mpp_recovery_test_hold_size", func(num failpoint.Value) {
// Note: this failpoint only executes once.
curRows := e.mppErrRecovery.NumHoldRows()
numInt := num.(int)
if curRows != uint64(numInt) {
panic(fmt.Sprintf("unexpected holding rows, cur: %d", curRows))
}
})
return nil
}
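
The failpoints above are meant to be driven from tests. The sketch below shows how such a test might enable them; the failpoint path prefix is assumed from the package import path, and the query-running part (e.g. via a testkit session) is omitted:

package executor_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

// Sketch: force the mocked MPP error twice and let recovery bring the query back.
func TestMPPRecoverySketch(t *testing.T) {
	const fp = "github.com/pingcap/tidb/pkg/executor/" // assumed failpoint path prefix
	require.NoError(t, failpoint.Enable(fp+"mpp_recovery_test_mock_enable", "return(true)"))
	require.NoError(t, failpoint.Enable(fp+"mpp_recovery_test_max_err_times", "return(2)"))
	defer func() {
		require.NoError(t, failpoint.Disable(fp+"mpp_recovery_test_mock_enable"))
		require.NoError(t, failpoint.Disable(fp+"mpp_recovery_test_max_err_times"))
	}()
	// Run an MPP query here; with the failpoints on, respIter.Next fails twice with the mocked
	// error, the recovery handler succeeds each time, and the query should still return rows.
}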

// Next fills data into the chunk passed by its caller.
func (e *MPPGather) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.dummy {
return nil
}
err := e.respIter.Next(ctx, chk)
if err != nil {

if err := e.nextWithRecovery(ctx); err != nil {
return err
}
err = table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.Schema().Columns, e.columns, e.Ctx(), chk)

if e.mppErrRecovery.NumHoldChk() != 0 {
var tmpChk *chunk.Chunk
if tmpChk = e.mppErrRecovery.PopFrontChk(); tmpChk == nil {
return errors.New("cannot get chunk from mpp result holder")
}
chk.SwapColumns(tmpChk)
} else if err := e.respIter.Next(ctx, chk); err != nil {
// We get here when:
// 1. mppErrRecovery is disabled, so no chunks are held in it.
// 2. mppErrRecovery is enabled and held some chunks, but we have already consumed all of them.
return err
}

if chk.NumRows() == 0 {
return nil
}

err := table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.Schema().Columns, e.columns, e.Ctx(), chk)
if err != nil {
return err
}
@@ -156,6 +315,7 @@ func (e *MPPGather) Close() error {
if err != nil {
return err
}
e.mppErrRecovery.ResetHolder()
return nil
}

5 changes: 5 additions & 0 deletions pkg/executor/mppcoordmanager/mpp_coordinator_manager_test.go
@@ -52,6 +52,11 @@ func (*IdleCoordinator) IsClosed() bool {
return true
}

// GetNodeCnt implements the MppCoordinator interface.
func (*IdleCoordinator) GetNodeCnt() int {
return 0
}

func TestDetectAndDelete(t *testing.T) {
startTs := uint64(time.Now().UnixNano())
InstanceMPPCoordinatorManager.maxLifeTime = uint64(copr.TiFlashReadTimeoutUltraLong.Nanoseconds() + detectFrequency.Nanoseconds())
14 changes: 14 additions & 0 deletions pkg/executor/mpperr/BUILD.bazel
@@ -0,0 +1,14 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "mpperr",
srcs = ["mpp_err_recovery.go"],
importpath = "github.com/pingcap/tidb/pkg/executor/mpperr",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/chunk",
"//pkg/util/memory",
"//pkg/util/tiflashcompute",
"@com_github_pingcap_errors//:errors",
],
)
(The remaining 7 changed files are not shown in this excerpt.)
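
The new pkg/executor/mpperr/mpp_err_recovery.go itself is among the files not shown. The sketch below reconstructs the assumed RecoveryHandler surface purely from the call sites in mpp_gather.go above; signatures and behavior are inferred, not copied from the actual file, and the bodies are stubs for illustration only.

package mpperr

import (
	"github.com/pingcap/tidb/pkg/util/chunk"
	"github.com/pingcap/tidb/pkg/util/memory"
)

// RecoveryInfo carries the MPP error and the node-count hint handed to the handler
// (field names taken from the call site in nextWithRecovery).
type RecoveryInfo struct {
	MPPErr  error
	NodeCnt int
}

// RecoveryHandler buffers MPP result chunks and drives error recovery. The method set
// below mirrors the calls made from MPPGather.
type RecoveryHandler struct{ /* fields omitted */ }

// NewRecoveryHandler matches the call in MPPGather.Open:
// NewRecoveryHandler(disaggTiFlashWithAutoScaler, uint64(holdCap), enableMPPRecovery, e.memTracker).
func NewRecoveryHandler(useAutoScaler bool, holdCap uint64, enabled bool, memTracker *memory.Tracker) *RecoveryHandler {
	return &RecoveryHandler{}
}

func (*RecoveryHandler) Enabled() bool                     { return false } // is recovery enabled at all?
func (*RecoveryHandler) CanHoldResult() bool               { return false } // still below the hold capacity?
func (*RecoveryHandler) HoldResult(chk *chunk.Chunk)       {}               // buffer one result chunk
func (*RecoveryHandler) NumHoldRows() uint64               { return 0 }
func (*RecoveryHandler) NumHoldChk() int                   { return 0 }
func (*RecoveryHandler) PopFrontChk() *chunk.Chunk         { return nil } // drain buffered chunks in order
func (*RecoveryHandler) Recovery(info *RecoveryInfo) error { return nil } // try to recover from the MPP error
func (*RecoveryHandler) RecoveryCnt() uint32               { return 0 }
func (*RecoveryHandler) ResetHolder()                      {}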
