diff --git a/ddl/backfilling.go b/ddl/backfilling.go
index 62c568a4f3eaf..eb018836f8f59 100644
--- a/ddl/backfilling.go
+++ b/ddl/backfilling.go
@@ -737,7 +737,7 @@ func spawnAddIndexWorker(sessCtx sessionctx.Context, seq int, job *model.Job, t
 	decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *backfillWorker {
 	// Firstly, check and try lightning path.
 	if bc, ok := lightning.BackCtxMgr.Load(job.ID); ok && bc.NeedRestore() {
-		_, err := bc.EngMgr.Register(bc, job, reorgInfo.currElement.ID, 1)
+		err := bc.EngMgr.Register(bc, job, reorgInfo.currElement.ID)
 		if err != nil {
 			if seq == 0 { // The first worker.
 				lightning.BackCtxMgr.Unregister(job.ID) // fallback to the general worker.
diff --git a/ddl/index.go b/ddl/index.go
index c7ce8e3ed9038..9722f8be829cb 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -722,8 +722,8 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo
 
 func goFastDDLBackfill(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table,
 	indexInfo *model.IndexInfo, reorgInfo *reorgInfo, elements []*meta.Element, rh *reorgHandler) (reorg bool, ver int64, err error) {
-	// When sub state is StatePublic means not go fast ddl path.
 	if indexInfo.BackfillState == model.BackfillStateInapplicable {
+		// Use the original txn backfill process.
 		return false, 0, nil
 	}
 	switch indexInfo.BackfillState {
diff --git a/ddl/index_lightning.go b/ddl/index_lightning.go
index b3431b7248367..5a947e0523f95 100644
--- a/ddl/index_lightning.go
+++ b/ddl/index_lightning.go
@@ -74,15 +74,6 @@ func isPiTREnable(w *worker) bool {
 	return lit.CheckPiTR(ctx)
 }
 
-func prepareLightningEngine(job *model.Job, indexID int64, workerCnt int) (wCnt int, err error) {
-	bc, _ := lit.BackCtxMgr.Load(job.ID)
-	wCnt, err = bc.EngMgr.Register(bc, job, indexID, workerCnt)
-	if err != nil {
-		lit.BackCtxMgr.Unregister(job.ID)
-	}
-	return wCnt, err
-}
-
 // importIndexDataToStore import local index sst file into TiKV.
 func importIndexDataToStore(reorg *reorgInfo, indexID int64, unique bool, tbl table.Table) error {
 	if bc, ok := lit.BackCtxMgr.Load(reorg.Job.ID); ok && bc.NeedRestore() {
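The caller-side change above is easiest to see in isolation: `Register` now takes `(bc, job, indexID)` and returns only an `error`, so each backfill worker registers exactly one writer and falls back to the transactional backfill path when registration fails (the first worker additionally unregisters the backend context). Below is a minimal, self-contained sketch of that pattern; the `engineRegistry` type and its quota numbers are invented for illustration and are not TiDB code.

```go
package main

import (
	"errors"
	"fmt"
)

// engineRegistry stands in for lightning.BackCtxMgr/EngMgr in this sketch.
type engineRegistry struct {
	quota, used int64
	perWriter   int64
}

// register reserves memory for one writer, mirroring the new
// Register(bc, job, indexID) error shape: one call per backfill worker.
func (r *engineRegistry) register(jobID, indexID int64) error {
	if r.used+r.perWriter > r.quota {
		return errors.New("not enough memory for one more writer")
	}
	r.used += r.perWriter
	return nil
}

// spawnWorker mimics spawnAddIndexWorker: try the lightning path first,
// and fall back to the general (txn) backfill worker on failure.
func spawnWorker(r *engineRegistry, seq int, jobID, indexID int64) string {
	if err := r.register(jobID, indexID); err != nil {
		if seq == 0 {
			// The first worker gives up the lightning path entirely.
			fmt.Println("unregister backend context, fall back to txn backfill:", err)
		}
		return "txn-worker"
	}
	return "lightning-worker"
}

func main() {
	r := &engineRegistry{quota: 3, perWriter: 1}
	for seq := 0; seq < 5; seq++ {
		fmt.Printf("worker %d -> %s\n", seq, spawnWorker(r, seq, 1, 10))
	}
}
```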
diff --git a/ddl/lightning/engine_mgr.go b/ddl/lightning/engine_mgr.go
index a62c0b8402248..74c556a035c79 100644
--- a/ddl/lightning/engine_mgr.go
+++ b/ddl/lightning/engine_mgr.go
@@ -34,57 +34,63 @@ func (m *engineManager) init(memRoot MemRoot) {
 }
 
 // Register create a new engineInfo and register it to the engineManager.
-func (m *engineManager) Register(bc *BackendContext, job *model.Job, indexID int64, wCnt int) (int, error) {
-	var err error
-	// Calculate lightning concurrency degree and set memory usage.
-	// and pre-allocate memory usage for worker
+func (m *engineManager) Register(bc *BackendContext, job *model.Job, indexID int64) error {
+	// Check the memory quota and pre-allocate the memory usage
+	// for one local writer.
 	engineKey := GenEngineInfoKey(job.ID, indexID)
-	newWorkerCount := m.MemRoot.WorkerDegree(wCnt, engineKey, job.ID)
+
+	m.MemRoot.RefreshConsumption()
+	err := m.MemRoot.TryConsume(int64(bc.cfg.TikvImporter.LocalWriterMemCacheSize))
+	if err != nil {
+		return logAllocMemFailed(bc.key, engineKey, m.MemRoot, err)
+	}
+
 	en, exist1 := m.Load(engineKey)
 	if !exist1 {
-		// When return workerCount is 0, means there is no memory available for lightning worker.
-		if newWorkerCount == int(allocFailed) {
-			logutil.BgLogger().Warn(LitErrAllocMemFail, zap.String("Backend key", bc.key),
-				zap.String("Engine key", engineKey),
-				zap.String("Expected worker count:", strconv.Itoa(wCnt)),
-				zap.String("Current alloc worker count:", strconv.Itoa(newWorkerCount)))
-			return 0, errors.New(LitErrCreateEngineFail)
-		}
-		// Firstly, update and check the current memory usage.
-		m.MemRoot.RefreshConsumption()
-		err = m.MemRoot.TryConsume(StructSizeEngineInfo)
+		engineCacheSize := int64(bc.cfg.TikvImporter.EngineMemCacheSize)
+		err := m.MemRoot.TryConsume(StructSizeEngineInfo + engineCacheSize)
 		if err != nil {
-			logutil.BgLogger().Warn(LitErrAllocMemFail, zap.String("Backend key", bc.key),
-				zap.String("Engine key", engineKey),
-				zap.String("Current Memory Usage:", strconv.FormatInt(m.MemRoot.CurrentUsage(), 10)),
-				zap.String("Memory limitation:", strconv.FormatInt(m.MemRoot.MaxMemoryQuota(), 10)))
-			return 0, err
+			return logAllocMemFailed(bc.key, engineKey, m.MemRoot, err)
 		}
+
 		// Create one slice for one backend on one stmt, current we share one engine
 		// Open one engine under an existing backend.
 		cfg := generateLocalEngineConfig(job.ID, job.SchemaName, job.TableName)
-		en, err := bc.backend.OpenEngine(bc.ctx, cfg, job.TableName, int32(indexID))
+		openedEn, err := bc.backend.OpenEngine(bc.ctx, cfg, job.TableName, int32(indexID))
 		if err != nil {
-			return 0, errors.New(LitErrCreateEngineFail)
+			return errors.New(LitErrCreateEngineFail)
 		}
-		id := en.GetEngineUUID()
-		ei := NewEngineInfo(indexID, engineKey, cfg, bc, en, job.TableName, id, wCnt, m.MemRoot)
-		m.Store(engineKey, ei)
+		id := openedEn.GetEngineUUID()
+		en = NewEngineInfo(indexID, engineKey, cfg, bc, openedEn, job.TableName, id, 1, m.MemRoot)
+		m.Store(engineKey, en)
 		if err != nil {
-			return 0, errors.New(LitErrCreateEngineFail)
+			return errors.New(LitErrCreateEngineFail)
 		}
-		m.MemRoot.Consume(StructSizeEngineInfo)
+		m.MemRoot.Consume(StructSizeEngineInfo + engineCacheSize)
 	} else {
-		// If engine exist, then add newWorkerCount.
-		en.writerCount += newWorkerCount
+		if en.writerCount+1 > bc.cfg.TikvImporter.RangeConcurrency {
+			logutil.BgLogger().Warn(LitErrExceedConcurrency, zap.String("Backend key", bc.key),
+				zap.String("Engine key", engineKey),
+				zap.Int("Concurrency", bc.cfg.TikvImporter.RangeConcurrency))
+			return errors.New(LitErrExceedConcurrency)
+		}
+		en.writerCount += 1
 	}
+	m.MemRoot.ConsumeWithTag(engineKey, int64(bc.cfg.TikvImporter.LocalWriterMemCacheSize))
 	logutil.BgLogger().Info(LitInfoOpenEngine, zap.String("backend key", bc.key),
 		zap.String("Engine key", engineKey),
 		zap.String("Current Memory Usage:", strconv.FormatInt(m.MemRoot.CurrentUsage(), 10)),
 		zap.String("Memory limitation:", strconv.FormatInt(m.MemRoot.MaxMemoryQuota(), 10)),
-		zap.String("Expected Worker Count", strconv.Itoa(wCnt)),
-		zap.String("Allocated worker count", strconv.Itoa(newWorkerCount)))
-	return newWorkerCount, nil
+		zap.String("Current Writer Count", strconv.Itoa(en.writerCount)))
+	return nil
+}
+
+func logAllocMemFailed(bcKey, engineKey string, memRoot MemRoot, err error) error {
+	logutil.BgLogger().Warn(LitErrAllocMemFail, zap.String("Backend key", bcKey),
+		zap.String("Engine key", engineKey),
+		zap.Int64("Current Memory Usage:", memRoot.CurrentUsage()),
+		zap.Int64("Memory limitation:", memRoot.MaxMemoryQuota()))
	return err
 }
 
 // Unregister delete the engineInfo from the engineManager.
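For reference, the rewritten `Register` above replaces the worker-degree calculation with straight quota accounting: reserve one `LocalWriterMemCacheSize` per call, pay the engine cache plus `StructSizeEngineInfo` only on the first registration, and cap writers per engine at `range-concurrency`. The following is a self-contained sketch of that bookkeeping under assumed sizes and limits; the `memQuota` type and all constants are illustrative, not the real `MemRoot` or TiDB defaults.

```go
package main

import (
	"errors"
	"fmt"
)

// memQuota is a stand-in for the MemRoot interface in this sketch:
// tryConsume only checks the quota, consume records the usage.
type memQuota struct {
	limit, used int64
}

func (m *memQuota) tryConsume(n int64) error {
	if m.used+n > m.limit {
		return fmt.Errorf("quota exceeded: used=%d, want=%d, limit=%d", m.used, n, m.limit)
	}
	return nil
}

func (m *memQuota) consume(n int64) { m.used += n }

// engine mirrors the per-engine bookkeeping: one writer per Register call,
// capped by a range-concurrency style limit.
type engine struct{ writerCount int }

const (
	localWriterMemCacheSize = 128 << 20 // assumed example sizes, not TiDB defaults
	engineMemCacheSize      = 512 << 20
	structSizeEngineInfo    = 1 << 10
	rangeConcurrency        = 16
)

func register(m *memQuota, engines map[string]*engine, key string) error {
	// Always account for one more local writer first.
	if err := m.tryConsume(localWriterMemCacheSize); err != nil {
		return err
	}
	en, ok := engines[key]
	if !ok {
		// The first registration also pays for the engine cache and struct size.
		if err := m.tryConsume(structSizeEngineInfo + engineMemCacheSize); err != nil {
			return err
		}
		en = &engine{writerCount: 1}
		engines[key] = en
		m.consume(structSizeEngineInfo + engineMemCacheSize)
	} else {
		if en.writerCount+1 > rangeConcurrency {
			return errors.New("exceeds range concurrency")
		}
		en.writerCount++
	}
	m.consume(localWriterMemCacheSize)
	return nil
}

func main() {
	m := &memQuota{limit: 1 << 30}
	engines := map[string]*engine{}
	for i := 0; i < 5; i++ {
		fmt.Println(register(m, engines, "job-1/index-10"), "used:", m.used)
	}
}
```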
diff --git a/ddl/lightning/mem_root.go b/ddl/lightning/mem_root.go
index 3fbe17a3cf5cb..855b22ec80687 100644
--- a/ddl/lightning/mem_root.go
+++ b/ddl/lightning/mem_root.go
@@ -35,12 +35,6 @@ type MemRoot interface {
 	CurrentUsage() int64
 	CurrentUsageWithTag(tag string) int64
 	RefreshConsumption()
-	// WorkerDegree adjust worker count according the available memory.
-	// return 0 means there is no enough memory for one lightning worker.
-	// TODO: split this function into two functions:
-	// 1. Calculate the worker degree.
-	// 2. Update the MemRoot.
-	WorkerDegree(workerCnt int, engineKey string, jobID int64) int
 }
 
 const (
@@ -161,52 +155,3 @@ func (m *memRootImpl) ReleaseWithTag(tag string) {
 func (m *memRootImpl) RefreshConsumption() {
 	m.backendCtxMgr.UpdateMemoryUsage()
 }
-
-// WorkerDegree implements MemRoot.
-func (m *memRootImpl) WorkerDegree(workerCnt int, engineKey string, jobID int64) int {
-	var enSize int64
-	var currWorkerNum int
-	m.mu.Lock()
-	defer m.mu.Unlock()
-	bc, exist := m.backendCtxMgr.Load(jobID)
-	if !exist {
-		return 0
-	}
-
-	_, exist = m.structSize[engineKey]
-	if !exist {
-		enSize = int64(bc.cfg.TikvImporter.EngineMemCacheSize)
-	} else {
-		en, exist1 := bc.EngMgr.Load(engineKey)
-		if !exist1 {
-			return 0
-		}
-		currWorkerNum = en.writerCount
-	}
-	if currWorkerNum+workerCnt > bc.cfg.TikvImporter.RangeConcurrency {
-		workerCnt = bc.cfg.TikvImporter.RangeConcurrency - currWorkerNum
-		if workerCnt == 0 {
-			return workerCnt
-		}
-	}
-
-	size := int64(bc.cfg.TikvImporter.LocalWriterMemCacheSize)
-
-	// If only one worker's memory init requirement still bigger than mem limitation.
-	if enSize+size+m.currUsage > m.maxLimit {
-		return int(allocFailed)
-	}
-
-	for enSize+size*int64(workerCnt)+m.currUsage > m.maxLimit && workerCnt > 1 {
-		workerCnt /= 2
-	}
-
-	m.currUsage += size * int64(workerCnt)
-
-	if !exist {
-		m.structSize[engineKey] = size * int64(workerCnt)
-	} else {
-		m.structSize[engineKey] += size * int64(workerCnt)
-	}
-	return workerCnt
-}
diff --git a/ddl/lightning/message.go b/ddl/lightning/message.go
index e71c9076f5958..2fc3074d7ff32 100644
--- a/ddl/lightning/message.go
+++ b/ddl/lightning/message.go
@@ -37,6 +37,7 @@ const (
 	LitErrRemoteDupExistErr string = "lightning: Remote duplicate index key exist"
 	LitErrDiskQuotaLess string = "lightning: Specified disk quota is less than 100 GB disable the lightning"
 	LitErrIncompatiblePiTR string = "lightning: The storage config log-backup.enable should be false when the lightning backfill is enabled"
+	LitErrExceedConcurrency string = "lightning: The concurrency is greater than lightning limit(tikv-importer.range-concurrency)"
 	LitWarnEnvInitFail string = "lightning: Initialize environment failed"
 	LitWarnBackendNOTExist string = "lightning: Backend not exist"
 	LitWarnConfigError string = "lightning: Build config for backend failed"
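To make the mem_root.go removal above concrete: the deleted `WorkerDegree` tried to fit a whole batch of workers at once by clamping the request to `range-concurrency` and then halving it until it fit under the memory limit, which is exactly what the new one-writer-per-`Register` accounting replaces. The sketch below restates that removed heuristic as a pure function so the two approaches can be compared; the flattened signature and the numbers in `main` are illustrative only.

```go
package main

import "fmt"

// workerDegree reproduces the core of the deleted WorkerDegree heuristic:
// clamp the requested worker count to range-concurrency, then halve it until
// the engine cache plus the per-writer caches fit under the memory limit.
func workerDegree(workerCnt, currWorkers, rangeConcurrency int, enSize, writerSize, curUsage, maxLimit int64) int {
	if currWorkers+workerCnt > rangeConcurrency {
		workerCnt = rangeConcurrency - currWorkers
		if workerCnt == 0 {
			return 0
		}
	}
	// Even a single worker does not fit: report allocation failure.
	if enSize+writerSize+curUsage > maxLimit {
		return -1
	}
	for enSize+writerSize*int64(workerCnt)+curUsage > maxLimit && workerCnt > 1 {
		workerCnt /= 2
	}
	return workerCnt
}

func main() {
	// Ask for 8 workers with room for roughly 3: the halving heuristic settles on 2.
	fmt.Println(workerDegree(8, 0, 16, 512, 100, 0, 850))
}
```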
diff --git a/parser/model/model.go b/parser/model/model.go
index 15ffa1d825c7e..c526b2b265d60 100644
--- a/parser/model/model.go
+++ b/parser/model/model.go
@@ -75,22 +75,22 @@ func (s SchemaState) String() string {
 	}
 }
 
-// BackfillState is the state used by the lightning backfill process.
+// BackfillState is the state used by the backfill-merge process.
 type BackfillState byte
 
 const (
-	// BackfillStateInapplicable means the lightning backfill process is not used.
+	// BackfillStateInapplicable means the backfill-merge process is not used.
 	BackfillStateInapplicable BackfillState = iota
-	// BackfillStateRunning is the state that the lightning backfill process is running.
+	// BackfillStateRunning is the state that the backfill process is running.
 	// In this state, the index's write and delete operations are redirected to a temporary index.
 	BackfillStateRunning
 	// BackfillStateReadyToMerge is the state that the temporary index's records are ready to be merged back
 	// to the origin index.
-	// In this state, the index's write and delete operations are **copied** to a temporary index.
-	// It makes sure that all the TiDB instances are aware of the copy during the merge(BackfillStateMerging).
+	// In this state, the index's write and delete operations are copied to a temporary index.
+	// This state is used to make sure that all the TiDB instances are aware of the copy during the merge(BackfillStateMerging).
 	BackfillStateReadyToMerge
 	// BackfillStateMerging is the state that the temp index is merging back to the origin index.
-	// In this state, the index's write and delete operations are **copied** to a temporary index.
+	// In this state, the index's write and delete operations are copied to a temporary index.
 	BackfillStateMerging
 )
 
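Since the model.go hunk only touches comments, a compact sketch of the state machine they describe may help. Everything below other than the four state names (the `String` text and the `nextBackfillState` helper) is illustrative and not code from the PR.

```go
package main

import "fmt"

// BackfillState mirrors the states documented above; the numeric values and
// the nextBackfillState helper are illustrative, not part of parser/model.
type BackfillState byte

const (
	BackfillStateInapplicable BackfillState = iota
	BackfillStateRunning
	BackfillStateReadyToMerge
	BackfillStateMerging
)

func (s BackfillState) String() string {
	switch s {
	case BackfillStateRunning:
		return "backfill running: writes redirected to the temporary index"
	case BackfillStateReadyToMerge:
		return "ready to merge: writes copied to the temporary index"
	case BackfillStateMerging:
		return "merging: temporary index merged back, writes still copied"
	default:
		return "inapplicable: original txn backfill"
	}
}

// nextBackfillState walks the lifecycle described by the comments:
// Running -> ReadyToMerge -> Merging.
func nextBackfillState(s BackfillState) BackfillState {
	switch s {
	case BackfillStateRunning:
		return BackfillStateReadyToMerge
	case BackfillStateReadyToMerge:
		return BackfillStateMerging
	default:
		return s
	}
}

func main() {
	for s := BackfillStateRunning; ; s = nextBackfillState(s) {
		fmt.Println(s)
		if s == BackfillStateMerging {
			break
		}
	}
}
```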