diff --git a/ddl/backfilling_test.go b/ddl/backfilling_test.go index 62e627630cad6..17030d9a2f6f0 100644 --- a/ddl/backfilling_test.go +++ b/ddl/backfilling_test.go @@ -72,19 +72,19 @@ func TestPickBackfillType(t *testing.T) { }, } variable.EnableFastReorg.Store(true) - tp, err := pickBackfillType(mockCtx, mockJob, uk) + tp, err := pickBackfillType(mockCtx, mockJob, uk, nil) require.NoError(t, err) require.Equal(t, tp, model.ReorgTypeTxn) mockJob.ReorgMeta.ReorgTp = model.ReorgTypeNone ingest.LitInitialized = false - tp, err = pickBackfillType(mockCtx, mockJob, uk) + tp, err = pickBackfillType(mockCtx, mockJob, uk, nil) require.NoError(t, err) require.Equal(t, tp, model.ReorgTypeTxnMerge) mockJob.ReorgMeta.ReorgTp = model.ReorgTypeNone ingest.LitInitialized = true - tp, err = pickBackfillType(mockCtx, mockJob, uk) + tp, err = pickBackfillType(mockCtx, mockJob, uk, nil) require.NoError(t, err) require.Equal(t, tp, model.ReorgTypeLitMerge) } diff --git a/ddl/ddl.go b/ddl/ddl.go index 5cb4c0d5dfe1c..99f3bd5691ca7 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -551,8 +551,10 @@ func (dc *ddlCtx) removeReorgCtx(jobID int64) { func (dc *ddlCtx) notifyReorgWorkerJobStateChange(job *model.Job) { rc := dc.getReorgCtx(job.ID) if rc == nil { + logutil.BgLogger().Error("cannot find reorgCtx", zap.Int64("jobID", job.ID)) return } + logutil.BgLogger().Info("[ddl] notify reorg worker during canceling ddl job", zap.Int64("jobID", job.ID)) rc.notifyJobState(job.State) } @@ -678,7 +680,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl { scheduler.RegisterSchedulerConstructor("backfill", func(taskMeta []byte, step int64) (scheduler.Scheduler, error) { - return NewBackfillSchedulerHandle(taskMeta, d) + return NewBackfillSchedulerHandle(taskMeta, d, step == proto.StepTwo) }) dispatcher.RegisterTaskFlowHandle(BackfillTaskType, NewLitBackfillFlowHandle(d)) diff --git a/ddl/dist_owner.go b/ddl/dist_owner.go index 1bae9992f0faa..aced50d4bf864 100644 --- a/ddl/dist_owner.go +++ b/ddl/dist_owner.go @@ -16,10 +16,6 @@ package ddl import ( "time" - - "github.com/pingcap/tidb/metrics" - "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/sessionctx/variable" ) // CheckBackfillJobFinishInterval is export for test. @@ -28,11 +24,3 @@ var CheckBackfillJobFinishInterval = 300 * time.Millisecond const ( distPhysicalTableConcurrency = 16 ) - -func initDistReorg(reorgMeta *model.DDLReorgMeta) { - isDistReorg := variable.EnableDistTask.Load() - reorgMeta.IsDistReorg = isDistReorg - if isDistReorg { - metrics.TelemetryDistReorgCnt.Inc() - } -} diff --git a/ddl/disttask_flow.go b/ddl/disttask_flow.go index 3f417ae4b96bf..8880ef0c7add2 100644 --- a/ddl/disttask_flow.go +++ b/ddl/disttask_flow.go @@ -15,8 +15,10 @@ package ddl import ( + "bytes" "context" "encoding/json" + "sort" "github.com/pingcap/errors" "github.com/pingcap/tidb/disttask/framework/dispatcher" @@ -24,6 +26,9 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/store/helper" + "github.com/pingcap/tidb/table" + "github.com/tikv/client-go/v2/tikv" ) type litBackfillFlowHandle struct { @@ -39,11 +44,6 @@ func NewLitBackfillFlowHandle(d DDL) dispatcher.TaskFlowHandle { // ProcessNormalFlow processes the normal flow. func (h *litBackfillFlowHandle) ProcessNormalFlow(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task) (metas [][]byte, err error) { - if gTask.State != proto.TaskStatePending { - // This flow has only one step, finish task when it is not pending - return nil, nil - } - var globalTaskMeta BackfillGlobalMeta if err = json.Unmarshal(gTask.Meta, &globalTaskMeta); err != nil { return nil, err @@ -56,7 +56,7 @@ func (h *litBackfillFlowHandle) ProcessNormalFlow(_ context.Context, _ dispatche job := &globalTaskMeta.Job var tblInfo *model.TableInfo - err = kv.RunInNewTxn(d.ctx, d.store, false, func(ctx context.Context, txn kv.Transaction) error { + err = kv.RunInNewTxn(d.ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error { tblInfo, err = meta.NewMeta(txn).GetTable(job.SchemaID, job.TableID) return err }) @@ -64,28 +64,101 @@ func (h *litBackfillFlowHandle) ProcessNormalFlow(_ context.Context, _ dispatche return nil, err } + var subTaskMetas [][]byte if tblInfo.Partition == nil { - return nil, errors.New("Non-partition table not supported yet") - } - - defs := tblInfo.Partition.Definitions - physicalIDs := make([]int64, len(defs)) - for i := range defs { - physicalIDs[i] = defs[i].ID - } - - subTaskMetas := make([][]byte, 0, len(physicalIDs)) - for _, physicalID := range physicalIDs { - subTaskMeta := &BackfillSubTaskMeta{ - PhysicalTableID: physicalID, + switch gTask.Step { + case proto.StepOne: + serverNodes, err := dispatcher.GenerateSchedulerNodes(d.ctx) + if err != nil { + return nil, err + } + subTaskMetas = make([][]byte, 0, len(serverNodes)) + dummyMeta := &BackfillSubTaskMeta{} + metaBytes, err := json.Marshal(dummyMeta) + if err != nil { + return nil, err + } + for range serverNodes { + subTaskMetas = append(subTaskMetas, metaBytes) + } + gTask.Step = proto.StepTwo + return subTaskMetas, nil + case proto.StepTwo: + return nil, nil + default: } - - metaBytes, err := json.Marshal(subTaskMeta) + tbl, err := getTable(d.store, job.SchemaID, tblInfo) if err != nil { return nil, err } + ver, err := getValidCurrentVersion(d.store) + if err != nil { + return nil, errors.Trace(err) + } + startKey, endKey, err := getTableRange(d.jobContext(job.ID), d.ddlCtx, tbl.(table.PhysicalTable), ver.Ver, job.Priority) + if startKey == nil && endKey == nil { + // Empty table. + gTask.Step = proto.StepOne + return nil, nil + } + if err != nil { + return nil, errors.Trace(err) + } + regionCache := d.store.(helper.Storage).GetRegionCache() + recordRegionMetas, err := regionCache.LoadRegionsInKeyRange(tikv.NewBackofferWithVars(context.Background(), 20000, nil), startKey, endKey) + if err != nil { + return nil, err + } + + subTaskMetas = make([][]byte, 0, 100) + regionBatch := 20 + sort.Slice(recordRegionMetas, func(i, j int) bool { + return bytes.Compare(recordRegionMetas[i].StartKey(), recordRegionMetas[j].StartKey()) < 0 + }) + for i := 0; i < len(recordRegionMetas); i += regionBatch { + end := i + regionBatch + if end > len(recordRegionMetas) { + end = len(recordRegionMetas) + } + batch := recordRegionMetas[i:end] + subTaskMeta := &BackfillSubTaskMeta{StartKey: batch[0].StartKey(), EndKey: batch[len(batch)-1].EndKey()} + if i == 0 { + subTaskMeta.StartKey = startKey + } + if end == len(recordRegionMetas) { + subTaskMeta.EndKey = endKey + } + metaBytes, err := json.Marshal(subTaskMeta) + if err != nil { + return nil, err + } + subTaskMetas = append(subTaskMetas, metaBytes) + } + } else { + if gTask.State != proto.TaskStatePending { + // This flow for partition table has only one step, finish task when it is not pending + return nil, nil + } - subTaskMetas = append(subTaskMetas, metaBytes) + defs := tblInfo.Partition.Definitions + physicalIDs := make([]int64, len(defs)) + for i := range defs { + physicalIDs[i] = defs[i].ID + } + + subTaskMetas = make([][]byte, 0, len(physicalIDs)) + for _, physicalID := range physicalIDs { + subTaskMeta := &BackfillSubTaskMeta{ + PhysicalTableID: physicalID, + } + + metaBytes, err := json.Marshal(subTaskMeta) + if err != nil { + return nil, err + } + + subTaskMetas = append(subTaskMetas, metaBytes) + } } gTask.Step = proto.StepOne diff --git a/ddl/disttask_flow_test.go b/ddl/disttask_flow_test.go index adcde950628f4..c0833b4f39afc 100644 --- a/ddl/disttask_flow_test.go +++ b/ddl/disttask_flow_test.go @@ -72,11 +72,10 @@ func TestBackfillFlowHandle(t *testing.T) { require.NoError(t, err) require.Nil(t, errMeta) - // test normal table not supported yet tk.MustExec("create table t1(id int primary key, v int)") gTask = createAddIndexGlobalTask(t, dom, "test", "t1", ddl.BackfillTaskType) _, err = handler.ProcessNormalFlow(context.Background(), nil, gTask) - require.EqualError(t, err, "Non-partition table not supported yet") + require.NoError(t, err) } func createAddIndexGlobalTask(t *testing.T, dom *domain.Domain, dbName, tblName string, taskType string) *proto.Task { diff --git a/ddl/index.go b/ddl/index.go index 7ab350d2ee1fb..368eae0373c2c 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -624,7 +624,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo case model.StateNone: // none -> delete only var reorgTp model.ReorgType - reorgTp, err = pickBackfillType(w.ctx, job, indexInfo.Unique) + reorgTp, err = pickBackfillType(w.ctx, job, indexInfo.Unique, d) if err != nil { break } @@ -666,10 +666,6 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo // Initialize SnapshotVer to 0 for later reorganization check. job.SnapshotVer = 0 job.SchemaState = model.StateWriteReorganization - - if job.MultiSchemaInfo == nil && tblInfo.GetPartitionInfo() != nil { - initDistReorg(job.ReorgMeta) - } case model.StateWriteReorganization: // reorganization -> public tbl, err := getTable(d.store, schemaID, tblInfo) @@ -716,7 +712,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo } // pickBackfillType determines which backfill process will be used. -func pickBackfillType(ctx context.Context, job *model.Job, unique bool) (model.ReorgType, error) { +func pickBackfillType(ctx context.Context, job *model.Job, unique bool, d *ddlCtx) (model.ReorgType, error) { if job.ReorgMeta.ReorgTp != model.ReorgTypeNone { // The backfill task has been started. // Don't change the backfill type. @@ -736,11 +732,18 @@ func pickBackfillType(ctx context.Context, job *model.Job, unique bool) (model.R if err != nil { return model.ReorgTypeNone, err } - _, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID) + if variable.EnableDistTask.Load() { + _, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli) + } else { + _, err = ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, nil) + } if err != nil { return model.ReorgTypeNone, err } job.ReorgMeta.ReorgTp = model.ReorgTypeLitMerge + if variable.EnableDistTask.Load() { + job.ReorgMeta.IsDistReorg = true + } return model.ReorgTypeLitMerge, nil } } @@ -834,7 +837,7 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) { var reorgTp model.ReorgType - reorgTp, err = pickBackfillType(w.ctx, job, indexInfo.Unique) + reorgTp, err = pickBackfillType(w.ctx, job, indexInfo.Unique, d) if err != nil { return false, ver, err } @@ -905,7 +908,7 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, if ok && bc.Done() { return true, 0, nil } - bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID) + bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, nil) if err != nil { ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err) return false, ver, errors.Trace(err) @@ -1817,7 +1820,7 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error { return errors.New("unexpected error, can't find index info") } if indexInfo.Unique { - bc, err := ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, reorgInfo.ID) + bc, err := ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, reorgInfo.ID, nil) if err != nil { return err } diff --git a/ddl/ingest/BUILD.bazel b/ddl/ingest/BUILD.bazel index 54763c30c15bc..86ee64a582bb9 100644 --- a/ddl/ingest/BUILD.bazel +++ b/ddl/ingest/BUILD.bazel @@ -43,6 +43,8 @@ go_library( "//util/size", "@com_github_google_uuid//:uuid", "@com_github_pingcap_errors//:errors", + "@io_etcd_go_etcd_client_v3//:client", + "@io_etcd_go_etcd_client_v3//concurrency", "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", ], diff --git a/ddl/ingest/backend.go b/ddl/ingest/backend.go index ec654e58f6be6..ad6aedb740e75 100644 --- a/ddl/ingest/backend.go +++ b/ddl/ingest/backend.go @@ -16,6 +16,7 @@ package ingest import ( "context" + "fmt" "time" "github.com/pingcap/tidb/br/pkg/lightning/backend/encode" @@ -29,6 +30,8 @@ import ( "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/generic" "github.com/pingcap/tidb/util/logutil" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/concurrency" atomicutil "go.uber.org/atomic" "go.uber.org/zap" ) @@ -57,6 +60,8 @@ const ( FlushModeAuto FlushMode = iota // FlushModeForceLocal means flush all data to local storage. FlushModeForceLocal + // FlushModeForceLocalAndCheckDiskQuota means flush all data to local storage and check disk quota. + FlushModeForceLocalAndCheckDiskQuota // FlushModeForceGlobal means import all data in local storage to global storage. FlushModeForceGlobal ) @@ -77,6 +82,7 @@ type litBackendCtx struct { timeOfLastFlush atomicutil.Time updateInterval time.Duration checkpointMgr *CheckpointManager + etcdClient *clientv3.Client } // CollectRemoteDuplicateRows collects duplicate rows from remote TiKV. @@ -141,6 +147,15 @@ func (bc *litBackendCtx) FinishImport(indexID int64, unique bool, tbl table.Tabl return nil } +func acquireLock(ctx context.Context, se *concurrency.Session, key string) (*concurrency.Mutex, error) { + mu := concurrency.NewMutex(se, key) + err := mu.Lock(ctx) + if err != nil { + return nil, err + } + return mu, nil +} + // Flush checks the disk quota and imports the current key-values in engine to the storage. func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error) { ei, exist := bc.Load(indexID) @@ -169,6 +184,30 @@ func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported if !shouldImport { return true, false, nil } + + // Use distributed lock if run in distributed mode). + if bc.etcdClient != nil { + distLockKey := fmt.Sprintf("/tidb/distributeLock/%d/%d", bc.jobID, indexID) + se, _ := concurrency.NewSession(bc.etcdClient) + mu, err := acquireLock(bc.ctx, se, distLockKey) + if err != nil { + return true, false, err + } + logutil.BgLogger().Info("[ddl] acquire distributed flush lock success", zap.Int64("jobID", bc.jobID)) + defer func() { + err = mu.Unlock(bc.ctx) + if err != nil { + logutil.BgLogger().Warn("[ddl] release distributed flush lock error", zap.Error(err), zap.Int64("jobID", bc.jobID)) + } else { + logutil.BgLogger().Info("[ddl] release distributed flush lock success", zap.Int64("jobID", bc.jobID)) + } + err = se.Close() + if err != nil { + logutil.BgLogger().Warn("[ddl] close session error", zap.Error(err)) + } + }() + } + logutil.BgLogger().Info(LitInfoUnsafeImport, zap.Int64("index ID", indexID), zap.String("usage info", bc.diskRoot.UsageInfo())) err = bc.backend.UnsafeImportAndReset(bc.ctx, ei.uuid, int64(lightning.SplitRegionSize)*int64(lightning.MaxSplitRegionSizeRatio), int64(lightning.SplitRegionKeys)) @@ -189,8 +228,12 @@ func (bc *litBackendCtx) ShouldSync(mode FlushMode) (shouldFlush bool, shouldImp } bc.diskRoot.UpdateUsage() shouldImport = bc.diskRoot.ShouldImport() - shouldFlush = shouldImport || - time.Since(bc.timeOfLastFlush.Load()) >= bc.updateInterval + if mode == FlushModeForceLocalAndCheckDiskQuota { + shouldFlush = true + } else { + shouldFlush = shouldImport || + time.Since(bc.timeOfLastFlush.Load()) >= bc.updateInterval + } return shouldFlush, shouldImport } diff --git a/ddl/ingest/backend_mgr.go b/ddl/ingest/backend_mgr.go index d954e15fbc387..fbe51f8fb39b8 100644 --- a/ddl/ingest/backend_mgr.go +++ b/ddl/ingest/backend_mgr.go @@ -25,13 +25,14 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/util/generic" "github.com/pingcap/tidb/util/logutil" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) // BackendCtxMgr is used to manage the backend context. type BackendCtxMgr interface { CheckAvailable() (bool, error) - Register(ctx context.Context, unique bool, jobID int64) (BackendCtx, error) + Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client) (BackendCtx, error) Unregister(jobID int64) Load(jobID int64) (BackendCtx, bool) } @@ -77,7 +78,7 @@ func (m *litBackendCtxMgr) CheckAvailable() (bool, error) { } // Register creates a new backend and registers it to the backend context. -func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64) (BackendCtx, error) { +func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int64, etcdClient *clientv3.Client) (BackendCtx, error) { bc, exist := m.Load(jobID) if !exist { m.memRoot.RefreshConsumption() @@ -96,7 +97,7 @@ func (m *litBackendCtxMgr) Register(ctx context.Context, unique bool, jobID int6 return nil, err } - bcCtx := newBackendContext(ctx, jobID, bd, cfg.Lightning, defaultImportantVariables, m.memRoot, m.diskRoot) + bcCtx := newBackendContext(ctx, jobID, bd, cfg.Lightning, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient) m.Store(jobID, bcCtx) m.memRoot.Consume(StructSizeBackendCtx) @@ -126,8 +127,7 @@ func createLocalBackend(ctx context.Context, cfg *Config) (*local.Backend, error const checkpointUpdateInterval = 10 * time.Minute -func newBackendContext(ctx context.Context, jobID int64, be *local.Backend, - cfg *config.Config, vars map[string]string, memRoot MemRoot, diskRoot DiskRoot) *litBackendCtx { +func newBackendContext(ctx context.Context, jobID int64, be *local.Backend, cfg *config.Config, vars map[string]string, memRoot MemRoot, diskRoot DiskRoot, etcdClient *clientv3.Client) *litBackendCtx { bCtx := &litBackendCtx{ SyncMap: generic.NewSyncMap[int64, *engineInfo](10), MemRoot: memRoot, @@ -139,6 +139,7 @@ func newBackendContext(ctx context.Context, jobID int64, be *local.Backend, sysVars: vars, diskRoot: diskRoot, updateInterval: checkpointUpdateInterval, + etcdClient: etcdClient, } bCtx.timeOfLastFlush.Store(time.Now()) return bCtx diff --git a/ddl/ingest/mock.go b/ddl/ingest/mock.go index 65cfdabc3cb97..797562fa10f1a 100644 --- a/ddl/ingest/mock.go +++ b/ddl/ingest/mock.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util/logutil" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -46,7 +47,7 @@ func (*MockBackendCtxMgr) CheckAvailable() (bool, error) { } // Register implements BackendCtxMgr.Register interface. -func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64) (BackendCtx, error) { +func (m *MockBackendCtxMgr) Register(_ context.Context, _ bool, jobID int64, _ *clientv3.Client) (BackendCtx, error) { logutil.BgLogger().Info("mock backend mgr register", zap.Int64("jobID", jobID)) if mockCtx, ok := m.runningJobs[jobID]; ok { return mockCtx, nil diff --git a/ddl/scheduler.go b/ddl/scheduler.go index f00cbd96860dd..13a9ded3d611b 100644 --- a/ddl/scheduler.go +++ b/ddl/scheduler.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/disttask/framework/proto" "github.com/pingcap/tidb/disttask/framework/scheduler" "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/table" @@ -41,17 +42,19 @@ import ( var MockDMLExecutionAddIndexSubTaskFinish func() type backfillSchedulerHandle struct { - d *ddl - db *model.DBInfo - index *model.IndexInfo - job *model.Job - bc ingest.BackendCtx - ptbl table.PhysicalTable - jc *JobContext - eleTypeKey []byte - totalRowCnt int64 - done chan struct{} - ctx context.Context + d *ddl + db *model.DBInfo + index *model.IndexInfo + job *model.Job + bc ingest.BackendCtx + ptbl table.PhysicalTable + jc *JobContext + eleTypeKey []byte + totalRowCnt int64 + isPartition bool + stepForImport bool + done chan struct{} + ctx context.Context } // BackfillGlobalMeta is the global task meta for backfilling index. @@ -63,7 +66,9 @@ type BackfillGlobalMeta struct { // BackfillSubTaskMeta is the sub-task meta for backfilling index. type BackfillSubTaskMeta struct { - PhysicalTableID int64 `json:"physical_table_id"` + PhysicalTableID int64 `json:"physical_table_id"` + StartKey []byte `json:"start_key"` + EndKey []byte `json:"end_key"` } // BackfillMinimalTask is the minimal-task for backfilling index. @@ -75,7 +80,7 @@ func (b *BackfillMinimalTask) IsMinimalTask() { } // NewBackfillSchedulerHandle creates a new backfill scheduler. -func NewBackfillSchedulerHandle(taskMeta []byte, d *ddl) (scheduler.Scheduler, error) { +func NewBackfillSchedulerHandle(taskMeta []byte, d *ddl, stepForImport bool) (scheduler.Scheduler, error) { bh := &backfillSchedulerHandle{d: d} bgm := &BackfillGlobalMeta{} @@ -92,17 +97,12 @@ func NewBackfillSchedulerHandle(taskMeta []byte, d *ddl) (scheduler.Scheduler, e if err != nil { return nil, err } + bh.isPartition = tbl.Meta().GetPartitionInfo() != nil bh.db = db physicalTable := tbl.(table.PhysicalTable) bh.ptbl = physicalTable - d.setDDLLabelForTopSQL(jobMeta.ID, jobMeta.Query) - d.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type) - jobCtx := d.jobContext(jobMeta.ID) - bh.jc = jobCtx - - // Build reader. indexInfo := model.FindIndexInfoByID(tbl.Meta().Indices, bgm.EleID) if indexInfo == nil { logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", @@ -111,6 +111,17 @@ func NewBackfillSchedulerHandle(taskMeta []byte, d *ddl) (scheduler.Scheduler, e } bh.index = indexInfo + if stepForImport { + bh.stepForImport = true + return bh, nil + } + + d.setDDLLabelForTopSQL(jobMeta.ID, jobMeta.Query) + d.setDDLSourceForDiagnosis(jobMeta.ID, jobMeta.Type) + jobCtx := d.jobContext(jobMeta.ID) + bh.jc = jobCtx + d.newReorgCtx(jobMeta.ID, 0) + return bh, nil } @@ -124,7 +135,7 @@ func (b *backfillSchedulerHandle) UpdateStatLoop() { } path := fmt.Sprintf("%s/%d/%s:%d", rowCountEtcdPath, b.job.ID, ser.IP, ser.Port) writeToEtcd := func() { - err := ddlutil.PutKVToEtcd(b.ctx, b.d.etcdCli, 3, path, strconv.Itoa(int(b.totalRowCnt))) + err := ddlutil.PutKVToEtcd(context.TODO(), b.d.etcdCli, 3, path, strconv.Itoa(int(b.totalRowCnt))) if err != nil { logutil.BgLogger().Warn("[ddl] update row count for distributed add index failed", zap.Error(err)) } @@ -145,12 +156,15 @@ func (b *backfillSchedulerHandle) InitSubtaskExecEnv(ctx context.Context) error logutil.BgLogger().Info("[ddl] lightning init subtask exec env") d := b.d - bc, err := ingest.LitBackCtxMgr.Register(ctx, b.index.Unique, b.job.ID) + bc, err := ingest.LitBackCtxMgr.Register(d.ctx, b.index.Unique, b.job.ID, d.etcdCli) if err != nil { logutil.BgLogger().Warn("[ddl] lightning register error", zap.Error(err)) return err } b.bc = bc + if b.stepForImport { + return b.doFlushAndHandleError(ingest.FlushModeForceGlobal) + } b.ctx = ctx ser, err := infosync.GetServerInfo() @@ -172,7 +186,18 @@ func (b *backfillSchedulerHandle) InitSubtaskExecEnv(ctx context.Context) error b.done = make(chan struct{}) go b.UpdateStatLoop() + return nil +} +func (b *backfillSchedulerHandle) doFlushAndHandleError(mode ingest.FlushMode) error { + _, _, err := b.bc.Flush(b.index.ID, mode) + if err != nil { + if common.ErrFoundDuplicateKeys.Equal(err) { + err = convertToKeyExistsErr(err, b.index, b.ptbl.Meta()) + } + logutil.BgLogger().Error("[ddl] flush error", zap.Error(err)) + return err + } return nil } @@ -180,6 +205,10 @@ func (b *backfillSchedulerHandle) InitSubtaskExecEnv(ctx context.Context) error func (b *backfillSchedulerHandle) SplitSubtask(ctx context.Context, subtask []byte) ([]proto.MinimalTask, error) { logutil.BgLogger().Info("[ddl] lightning split subtask") + if b.stepForImport { + return nil, nil + } + fnCtx, fnCancel := context.WithCancel(context.Background()) defer fnCancel() @@ -199,17 +228,26 @@ func (b *backfillSchedulerHandle) SplitSubtask(ctx context.Context, subtask []by return nil, err } - pid := sm.PhysicalTableID - parTbl := b.ptbl.(table.PartitionedTable) + var startKey, endKey kv.Key + var tbl table.PhysicalTable currentVer, err1 := getValidCurrentVersion(d.store) if err1 != nil { return nil, errors.Trace(err1) } - startKey, endKey, err := getTableRange(b.jc, d.ddlCtx, parTbl.GetPartition(pid), currentVer.Ver, b.job.Priority) - if err != nil { - logutil.BgLogger().Error("[ddl] get table range error", zap.Error(err)) - return nil, err + + if !b.isPartition { + startKey, endKey = sm.StartKey, sm.EndKey + tbl = b.ptbl + } else { + pid := sm.PhysicalTableID + parTbl := b.ptbl.(table.PartitionedTable) + startKey, endKey, err = getTableRange(b.jc, d.ddlCtx, parTbl.GetPartition(pid), currentVer.Ver, b.job.Priority) + if err != nil { + logutil.BgLogger().Error("[ddl] get table range error", zap.Error(err)) + return nil, err + } + tbl = parTbl.GetPartition(pid) } mockReorgInfo := &reorgInfo{Job: b.job, d: d.ddlCtx} @@ -218,7 +256,7 @@ func (b *backfillSchedulerHandle) SplitSubtask(ctx context.Context, subtask []by mockReorgInfo.elements = elements mockReorgInfo.currElement = mockReorgInfo.elements[0] - ingestScheduler := newIngestBackfillScheduler(ctx, mockReorgInfo, d.sessPool, parTbl.GetPartition(pid), true) + ingestScheduler := newIngestBackfillScheduler(ctx, mockReorgInfo, d.sessPool, tbl, true) defer ingestScheduler.close(true) consumer := newResultConsumer(d.ddlCtx, mockReorgInfo, nil, true) @@ -246,7 +284,7 @@ func (b *backfillSchedulerHandle) SplitSubtask(ctx context.Context, subtask []by zap.String("startKey", hex.EncodeToString(startKey)), zap.String("endKey", hex.EncodeToString(endKey))) - sendTasks(ingestScheduler, consumer, parTbl.GetPartition(pid), kvRanges, mockReorgInfo, taskIDAlloc) + sendTasks(ingestScheduler, consumer, tbl, kvRanges, mockReorgInfo, taskIDAlloc) if consumer.shouldAbort() { break } @@ -258,15 +296,14 @@ func (b *backfillSchedulerHandle) SplitSubtask(ctx context.Context, subtask []by } ingestScheduler.close(false) - _, _, err = b.bc.Flush(b.index.ID, ingest.FlushModeForceGlobal) - if err != nil { - if common.ErrFoundDuplicateKeys.Equal(err) { - err = convertToKeyExistsErr(err, b.index, b.ptbl.Meta()) - } - logutil.BgLogger().Error("[ddl] flush error", zap.Error(err)) + if err := consumer.getResult(); err != nil { return nil, err } - return nil, consumer.getResult() + + if b.isPartition { + return nil, b.doFlushAndHandleError(ingest.FlushModeForceGlobal) + } + return nil, b.doFlushAndHandleError(ingest.FlushModeForceLocalAndCheckDiskQuota) } // OnSubtaskFinished implements the Scheduler interface. @@ -284,18 +321,22 @@ func (*backfillSchedulerHandle) OnSubtaskFinished(_ context.Context, meta []byte func (b *backfillSchedulerHandle) CleanupSubtaskExecEnv(context.Context) error { logutil.BgLogger().Info("[ddl] lightning cleanup subtask exec env") - b.bc.Unregister(b.job.ID, b.index.ID) - close(b.done) + if b.isPartition || b.stepForImport { + ingest.LitBackCtxMgr.Unregister(b.job.ID) + } + + if !b.stepForImport { + close(b.done) + b.d.removeReorgCtx(b.job.ID) + } return nil } // Rollback implements the Scheduler interface. func (b *backfillSchedulerHandle) Rollback(context.Context) error { logutil.BgLogger().Info("[ddl] rollback backfill add index task", zap.Int64("jobID", b.job.ID)) - bc, ok := ingest.LitBackCtxMgr.Load(b.job.ID) - if ok { - bc.Unregister(b.job.ID, b.index.ID) - } + ingest.LitBackCtxMgr.Unregister(b.job.ID) + b.d.removeReorgCtx(b.job.ID) return nil } @@ -305,7 +346,7 @@ type BackFillSubtaskExecutor struct { } // Run implements the Executor interface. -func (b *BackFillSubtaskExecutor) Run(ctx context.Context) error { +func (b *BackFillSubtaskExecutor) Run(_ context.Context) error { return nil } diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go index 159533fdd4f9d..8766ad50aa8a6 100644 --- a/tests/realtikvtest/addindextest/integration_test.go +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -80,7 +80,7 @@ func TestAddIndexIngestLimitOneBackend(t *testing.T) { tk2.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3);") // Mock there is a running ingest job. - _, err := ingest.LitBackCtxMgr.Register(context.Background(), false, 65535) + _, err := ingest.LitBackCtxMgr.Register(context.Background(), false, 65535, nil) require.NoError(t, err) wg := &sync.WaitGroup{} wg.Add(2)