remove workerDegree (#20)
tangenta authored Aug 21, 2022
1 parent f5e4081 commit bab8c46
Showing 7 changed files with 48 additions and 105 deletions.
2 changes: 1 addition & 1 deletion ddl/backfilling.go
@@ -737,7 +737,7 @@ func spawnAddIndexWorker(sessCtx sessionctx.Context, seq int, job *model.Job, t
     decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *backfillWorker {
     // Firstly, check and try lightning path.
     if bc, ok := lightning.BackCtxMgr.Load(job.ID); ok && bc.NeedRestore() {
-        _, err := bc.EngMgr.Register(bc, job, reorgInfo.currElement.ID, 1)
+        err := bc.EngMgr.Register(bc, job, reorgInfo.currElement.ID)
         if err != nil {
             if seq == 0 { // The first worker.
                 lightning.BackCtxMgr.Unregister(job.ID) // fallback to the general worker.
2 changes: 1 addition & 1 deletion ddl/index.go
@@ -722,8 +722,8 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo
 func goFastDDLBackfill(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
     tbl table.Table, indexInfo *model.IndexInfo, reorgInfo *reorgInfo,
     elements []*meta.Element, rh *reorgHandler) (reorg bool, ver int64, err error) {
-    // When sub state is StatePublic means not go fast ddl path.
     if indexInfo.BackfillState == model.BackfillStateInapplicable {
+        // Use the original txn backfill process.
         return false, 0, nil
     }
     switch indexInfo.BackfillState {
9 changes: 0 additions & 9 deletions ddl/index_lightning.go
@@ -74,15 +74,6 @@ func isPiTREnable(w *worker) bool {
     return lit.CheckPiTR(ctx)
 }
 
-func prepareLightningEngine(job *model.Job, indexID int64, workerCnt int) (wCnt int, err error) {
-    bc, _ := lit.BackCtxMgr.Load(job.ID)
-    wCnt, err = bc.EngMgr.Register(bc, job, indexID, workerCnt)
-    if err != nil {
-        lit.BackCtxMgr.Unregister(job.ID)
-    }
-    return wCnt, err
-}
-
 // importIndexDataToStore import local index sst file into TiKV.
 func importIndexDataToStore(reorg *reorgInfo, indexID int64, unique bool, tbl table.Table) error {
     if bc, ok := lit.BackCtxMgr.Load(reorg.Job.ID); ok && bc.NeedRestore() {
72 changes: 39 additions & 33 deletions ddl/lightning/engine_mgr.go
@@ -34,57 +34,63 @@ func (m *engineManager) init(memRoot MemRoot) {
 }
 
 // Register create a new engineInfo and register it to the engineManager.
-func (m *engineManager) Register(bc *BackendContext, job *model.Job, indexID int64, wCnt int) (int, error) {
-    var err error
-    // Calculate lightning concurrency degree and set memory usage.
-    // and pre-allocate memory usage for worker
+func (m *engineManager) Register(bc *BackendContext, job *model.Job, indexID int64) error {
+    // Calculate lightning concurrency degree and set memory usage
+    // and pre-allocate memory usage for worker.
     engineKey := GenEngineInfoKey(job.ID, indexID)
-    newWorkerCount := m.MemRoot.WorkerDegree(wCnt, engineKey, job.ID)
+
+    m.MemRoot.RefreshConsumption()
+    err := m.MemRoot.TryConsume(int64(bc.cfg.TikvImporter.LocalWriterMemCacheSize))
+    if err != nil {
+        return logAllocMemFailed(bc.key, engineKey, m.MemRoot, err)
+    }
+
     en, exist1 := m.Load(engineKey)
     if !exist1 {
-        // When return workerCount is 0, means there is no memory available for lightning worker.
-        if newWorkerCount == int(allocFailed) {
-            logutil.BgLogger().Warn(LitErrAllocMemFail, zap.String("Backend key", bc.key),
-                zap.String("Engine key", engineKey),
-                zap.String("Expected worker count:", strconv.Itoa(wCnt)),
-                zap.String("Current alloc worker count:", strconv.Itoa(newWorkerCount)))
-            return 0, errors.New(LitErrCreateEngineFail)
-        }
-        // Firstly, update and check the current memory usage.
-        m.MemRoot.RefreshConsumption()
-        err = m.MemRoot.TryConsume(StructSizeEngineInfo)
+        engineCacheSize := int64(bc.cfg.TikvImporter.EngineMemCacheSize)
+        err := m.MemRoot.TryConsume(StructSizeEngineInfo + engineCacheSize)
         if err != nil {
-            logutil.BgLogger().Warn(LitErrAllocMemFail, zap.String("Backend key", bc.key),
-                zap.String("Engine key", engineKey),
-                zap.String("Current Memory Usage:", strconv.FormatInt(m.MemRoot.CurrentUsage(), 10)),
-                zap.String("Memory limitation:", strconv.FormatInt(m.MemRoot.MaxMemoryQuota(), 10)))
-            return 0, err
+            return logAllocMemFailed(bc.key, engineKey, m.MemRoot, err)
         }
 
-        // Create one slice for one backend on one stmt, current we share one engine
+        // Open one engine under an existing backend.
         cfg := generateLocalEngineConfig(job.ID, job.SchemaName, job.TableName)
-        en, err := bc.backend.OpenEngine(bc.ctx, cfg, job.TableName, int32(indexID))
+        openedEn, err := bc.backend.OpenEngine(bc.ctx, cfg, job.TableName, int32(indexID))
         if err != nil {
-            return 0, errors.New(LitErrCreateEngineFail)
+            return errors.New(LitErrCreateEngineFail)
         }
-        id := en.GetEngineUUID()
-        ei := NewEngineInfo(indexID, engineKey, cfg, bc, en, job.TableName, id, wCnt, m.MemRoot)
-        m.Store(engineKey, ei)
+        id := openedEn.GetEngineUUID()
+        en = NewEngineInfo(indexID, engineKey, cfg, bc, openedEn, job.TableName, id, 1, m.MemRoot)
+        m.Store(engineKey, en)
         if err != nil {
-            return 0, errors.New(LitErrCreateEngineFail)
+            return errors.New(LitErrCreateEngineFail)
         }
-        m.MemRoot.Consume(StructSizeEngineInfo)
+        m.MemRoot.Consume(StructSizeEngineInfo + engineCacheSize)
     } else {
-        // If engine exist, then add newWorkerCount.
-        en.writerCount += newWorkerCount
+        if en.writerCount+1 > bc.cfg.TikvImporter.RangeConcurrency {
+            logutil.BgLogger().Warn(LitErrExceedConcurrency, zap.String("Backend key", bc.key),
+                zap.String("Engine key", engineKey),
+                zap.Int("Concurrency", bc.cfg.TikvImporter.RangeConcurrency))
+            return errors.New(LitErrExceedConcurrency)
+        }
+        en.writerCount += 1
     }
+    m.MemRoot.ConsumeWithTag(engineKey, int64(bc.cfg.TikvImporter.LocalWriterMemCacheSize))
     logutil.BgLogger().Info(LitInfoOpenEngine, zap.String("backend key", bc.key),
         zap.String("Engine key", engineKey),
         zap.String("Current Memory Usage:", strconv.FormatInt(m.MemRoot.CurrentUsage(), 10)),
         zap.String("Memory limitation:", strconv.FormatInt(m.MemRoot.MaxMemoryQuota(), 10)),
-        zap.String("Expected Worker Count", strconv.Itoa(wCnt)),
-        zap.String("Allocated worker count", strconv.Itoa(newWorkerCount)))
-    return newWorkerCount, nil
+        zap.String("Current Writer Count", strconv.Itoa(en.writerCount)))
+    return nil
 }
+
+func logAllocMemFailed(bcKey, engineKey string, memRoot MemRoot, err error) error {
+    logutil.BgLogger().Warn(LitErrAllocMemFail, zap.String("Backend key", bcKey),
+        zap.String("Engine key", engineKey),
+        zap.Int64("Current Memory Usage:", memRoot.CurrentUsage()),
+        zap.Int64("Memory limitation:", memRoot.MaxMemoryQuota()))
+    return err
+}
 
 // Unregister delete the engineInfo from the engineManager.
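Taken together, the engine_mgr.go change replaces "ask for N workers and get back whatever degree MemRoot can afford" with "each Register call adds exactly one writer, reserves a fixed amount of memory, and is capped by tikv-importer.range-concurrency". Below is a minimal standalone sketch of that accounting flow under toy types; memQuota, engineSlot, register and the size constants are invented stand-ins for illustration, not the real lightning/MemRoot APIs.

package main

import (
	"errors"
	"fmt"
)

// memQuota is a toy stand-in for MemRoot: it only tracks usage against a fixed limit.
type memQuota struct {
	used, limit int64
}

// tryConsume reports whether n more bytes would fit; it does not record them.
func (m *memQuota) tryConsume(n int64) error {
	if m.used+n > m.limit {
		return errors.New("not enough memory")
	}
	return nil
}

// consume records n bytes as used.
func (m *memQuota) consume(n int64) { m.used += n }

// engineSlot is a toy stand-in for engineInfo: one engine shared by several writers.
type engineSlot struct {
	writerCount int
}

// Placeholder sizes standing in for StructSizeEngineInfo and the
// tikv-importer.engine-mem-cache-size / local-writer-mem-cache-size /
// range-concurrency settings referenced in the diff.
const (
	engineStructSize     = int64(1 << 10)
	engineMemCacheSize   = int64(8 << 10)
	localWriterCacheSize = int64(2 << 10)
	rangeConcurrency     = 4
)

// register mirrors the shape of the new engineManager.Register: every call
// registers exactly one writer instead of a caller-chosen worker degree.
func register(q *memQuota, engines map[string]*engineSlot, key string) error {
	// Reserve room for one local writer before anything else.
	if err := q.tryConsume(localWriterCacheSize); err != nil {
		return err
	}
	en, ok := engines[key]
	if !ok {
		// First registration for this engine: the engine struct and its cache must fit as well.
		if err := q.tryConsume(engineStructSize + engineMemCacheSize); err != nil {
			return err
		}
		en = &engineSlot{writerCount: 1}
		engines[key] = en
		q.consume(engineStructSize + engineMemCacheSize)
	} else {
		// Later registrations only add a writer, capped by range-concurrency.
		if en.writerCount+1 > rangeConcurrency {
			return errors.New("exceeds range-concurrency")
		}
		en.writerCount++
	}
	q.consume(localWriterCacheSize)
	return nil
}

func main() {
	q := &memQuota{limit: 64 << 10}
	engines := map[string]*engineSlot{}
	for i := 1; i <= 6; i++ {
		err := register(q, engines, "job1/index1")
		fmt.Printf("writer %d: err=%v, used=%d\n", i, err, q.used)
	}
}

The trade-off is visible in the backfilling.go hunk above: a caller that wants several writers now issues one Register call per worker and handles failure per call, instead of receiving a degraded worker count.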
55 changes: 0 additions & 55 deletions ddl/lightning/mem_root.go
@@ -35,12 +35,6 @@ type MemRoot interface {
     CurrentUsage() int64
     CurrentUsageWithTag(tag string) int64
     RefreshConsumption()
-    // WorkerDegree adjust worker count according the available memory.
-    // return 0 means there is no enough memory for one lightning worker.
-    // TODO: split this function into two functions:
-    // 1. Calculate the worker degree.
-    // 2. Update the MemRoot.
-    WorkerDegree(workerCnt int, engineKey string, jobID int64) int
 }
 
 const (
@@ -161,52 +155,3 @@ func (m *memRootImpl) ReleaseWithTag(tag string) {
 func (m *memRootImpl) RefreshConsumption() {
     m.backendCtxMgr.UpdateMemoryUsage()
 }
-
-// WorkerDegree implements MemRoot.
-func (m *memRootImpl) WorkerDegree(workerCnt int, engineKey string, jobID int64) int {
-    var enSize int64
-    var currWorkerNum int
-    m.mu.Lock()
-    defer m.mu.Unlock()
-    bc, exist := m.backendCtxMgr.Load(jobID)
-    if !exist {
-        return 0
-    }
-
-    _, exist = m.structSize[engineKey]
-    if !exist {
-        enSize = int64(bc.cfg.TikvImporter.EngineMemCacheSize)
-    } else {
-        en, exist1 := bc.EngMgr.Load(engineKey)
-        if !exist1 {
-            return 0
-        }
-        currWorkerNum = en.writerCount
-    }
-    if currWorkerNum+workerCnt > bc.cfg.TikvImporter.RangeConcurrency {
-        workerCnt = bc.cfg.TikvImporter.RangeConcurrency - currWorkerNum
-        if workerCnt == 0 {
-            return workerCnt
-        }
-    }
-
-    size := int64(bc.cfg.TikvImporter.LocalWriterMemCacheSize)
-
-    // If only one worker's memory init requirement still bigger than mem limitation.
-    if enSize+size+m.currUsage > m.maxLimit {
-        return int(allocFailed)
-    }
-
-    for enSize+size*int64(workerCnt)+m.currUsage > m.maxLimit && workerCnt > 1 {
-        workerCnt /= 2
-    }
-
-    m.currUsage += size * int64(workerCnt)
-
-    if !exist {
-        m.structSize[engineKey] = size * int64(workerCnt)
-    } else {
-        m.structSize[engineKey] += size * int64(workerCnt)
-    }
-    return workerCnt
-}
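For comparison, the two sizing policies boil down to a few lines each: the deleted WorkerDegree halved the requested worker count until the engine cache plus the writers fit under the quota (returning 0 when not even one writer fits), while the new path only asks, per Register call, whether one more fixed-size writer fits. A simplified sketch of both, as standalone helpers that ignore the range-concurrency clamp and the per-engine tag bookkeeping of the real code:

package main

import "fmt"

// oldWorkerDegree condenses the halving policy of the removed memRootImpl.WorkerDegree:
// shrink the requested count until engine cache + writers fit the remaining quota,
// or report 0 when not even one writer fits.
func oldWorkerDegree(requested int, engineSize, perWriter, used, limit int64) int {
	if engineSize+perWriter+used > limit {
		return 0
	}
	for engineSize+perWriter*int64(requested)+used > limit && requested > 1 {
		requested /= 2
	}
	return requested
}

// newCanAddWriter condenses the per-call check the new Register relies on:
// one fixed-size writer either fits or the call fails.
func newCanAddWriter(perWriter, used, limit int64) bool {
	return used+perWriter <= limit
}

func main() {
	const engineSize, perWriter, limit = int64(8), int64(2), int64(20)
	fmt.Println(oldWorkerDegree(16, engineSize, perWriter, 0, limit)) // 4: degraded from the requested 16
	fmt.Println(newCanAddWriter(perWriter, 10, limit))                // true: writers are admitted one at a time
}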
1 change: 1 addition & 0 deletions ddl/lightning/message.go
@@ -37,6 +37,7 @@ const (
     LitErrRemoteDupExistErr string = "lightning: Remote duplicate index key exist"
     LitErrDiskQuotaLess string = "lightning: Specified disk quota is less than 100 GB disable the lightning"
     LitErrIncompatiblePiTR string = "lightning: The storage config log-backup.enable should be false when the lightning backfill is enabled"
+    LitErrExceedConcurrency string = "lightning: The concurrency is greater than lightning limit(tikv-importer.range-concurrency)"
     LitWarnEnvInitFail string = "lightning: Initialize environment failed"
     LitWarnBackendNOTExist string = "lightning: Backend not exist"
     LitWarnConfigError string = "lightning: Build config for backend failed"
12 changes: 6 additions & 6 deletions parser/model/model.go
@@ -75,22 +75,22 @@ func (s SchemaState) String() string {
     }
 }
 
-// BackfillState is the state used by the lightning backfill process.
+// BackfillState is the state used by the backfill-merge process.
 type BackfillState byte
 
 const (
-    // BackfillStateInapplicable means the lightning backfill process is not used.
+    // BackfillStateInapplicable means the backfill-merge process is not used.
     BackfillStateInapplicable BackfillState = iota
-    // BackfillStateRunning is the state that the lightning backfill process is running.
+    // BackfillStateRunning is the state that the backfill process is running.
     // In this state, the index's write and delete operations are redirected to a temporary index.
     BackfillStateRunning
     // BackfillStateReadyToMerge is the state that the temporary index's records are ready to be merged back
     // to the origin index.
-    // In this state, the index's write and delete operations are **copied** to a temporary index.
-    // It makes sure that all the TiDB instances are aware of the copy during the merge(BackfillStateMerging).
+    // In this state, the index's write and delete operations are copied to a temporary index.
+    // This state is used to make sure that all the TiDB instances are aware of the copy during the merge(BackfillStateMerging).
     BackfillStateReadyToMerge
     // BackfillStateMerging is the state that the temp index is merging back to the origin index.
-    // In this state, the index's write and delete operations are **copied** to a temporary index.
+    // In this state, the index's write and delete operations are copied to a temporary index.
     BackfillStateMerging
 )
 
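The reworded comments in model.go describe a small state machine for the temporary index. The sketch below restates it as code; the constants mirror the ones above, while tempIndexDML is an invented helper name used only for this illustration, not part of parser/model.

package main

import "fmt"

// BackfillState mirrors the constants documented above.
type BackfillState byte

const (
	BackfillStateInapplicable BackfillState = iota
	BackfillStateRunning
	BackfillStateReadyToMerge
	BackfillStateMerging
)

// tempIndexDML summarizes what happens to the index's write/delete operations
// in each backfill-merge phase, per the comments in model.go.
func tempIndexDML(s BackfillState) string {
	switch s {
	case BackfillStateInapplicable:
		return "no temporary index: the backfill-merge process is not used"
	case BackfillStateRunning:
		return "redirected to the temporary index while the backfill runs"
	case BackfillStateReadyToMerge:
		return "copied to the temporary index so every TiDB instance sees the copy before merging"
	case BackfillStateMerging:
		return "copied to the temporary index while it is merged back into the origin index"
	default:
		return "unknown state"
	}
}

func main() {
	for _, s := range []BackfillState{
		BackfillStateInapplicable, BackfillStateRunning, BackfillStateReadyToMerge, BackfillStateMerging,
	} {
		fmt.Printf("%d: %s\n", s, tempIndexDML(s))
	}
}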
