ddl: release memory when adding index on partitions finish (pingcap#3…
tangenta committed Nov 21, 2022
1 parent cf36a9c commit 6db7386
Showing 7 changed files with 105 additions and 16 deletions.
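
At a high level, writePhysicalTableRecord now looks up the lightning (ingest) backend context once before the range loop instead of on every iteration, flushes it after each batch of ranges, and releases its writer memory as soon as all ranges of one physical table are done. Since adding an index to a partitioned table runs this function once per partition, writer memory is returned between partitions instead of accumulating until the whole job ends. A condensed, runnable Go sketch of that control flow (simplified names and toy types, not the actual TiDB code):

package main

import "fmt"

type backendCtx struct{}

func (b *backendCtx) Flush() error  { return nil }
func (b *backendCtx) ResetWorkers() { fmt.Println("writer memory released") }

// backfillPhysicalTable mimics the per-partition loop: flush after every
// batch of ranges, release writer memory once no ranges remain.
func backfillPhysicalTable(ranges []string, ingestCtx *backendCtx) error {
	for len(ranges) > 0 {
		batch := ranges
		if len(batch) > 2 { // pretend at most 2 ranges are handled per round
			batch = batch[:2]
		}
		fmt.Println("backfilling", batch)
		if ingestCtx != nil {
			if err := ingestCtx.Flush(); err != nil {
				return err
			}
		}
		ranges = ranges[len(batch):]
		if len(ranges) == 0 && ingestCtx != nil {
			ingestCtx.ResetWorkers() // the fix: release per-writer memory per partition
		}
	}
	return nil
}

func main() {
	for partition := 0; partition < 3; partition++ {
		_ = backfillPhysicalTable([]string{"r0", "r1", "r2"}, &backendCtx{})
	}
}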
37 changes: 28 additions & 9 deletions ddl/backfilling.go
@@ -620,6 +620,15 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
 	}()
 	jc := dc.jobContext(job)
 
+	var ingestBeCtx *ingest.BackendContext
+	if bfWorkerType == typeAddIndexWorker && job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
+		if bc, ok := ingest.LitBackCtxMgr.Load(job.ID); ok {
+			ingestBeCtx = bc
+		} else {
+			return errors.New(ingest.LitErrGetBackendFail)
+		}
+	}
+
 	for {
 		kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey)
 		if err != nil {
@@ -661,7 +670,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
 			case typeAddIndexWorker:
 				idxWorker, err := newAddIndexWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc, job)
 				if err != nil {
-					return errors.Trace(err)
+					return handleCreateBackfillWorkerErr(err, len(backfillWorkers), reorgInfo.ID)
 				}
 				backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
 				go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
Expand Down Expand Up @@ -716,14 +725,11 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
zap.Int("regionCnt", len(kvRanges)),
zap.String("startKey", hex.EncodeToString(startKey)),
zap.String("endKey", hex.EncodeToString(endKey)))
if bfWorkerType == typeAddIndexWorker && job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
if bc, ok := ingest.LitBackCtxMgr.Load(job.ID); ok {
err := bc.Flush(reorgInfo.currElement.ID)
if err != nil {
return errors.Trace(err)
}
} else {
return errors.New(ingest.LitErrGetBackendFail)

if ingestBeCtx != nil {
err := ingestBeCtx.Flush(reorgInfo.currElement.ID)
if err != nil {
return errors.Trace(err)
}
}
remains, err := dc.handleRangeTasks(sessPool, t, backfillWorkers, reorgInfo, &totalAddedCount, kvRanges)
@@ -732,13 +738,26 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
 		}
 
 		if len(remains) == 0 {
+			if ingestBeCtx != nil {
+				ingestBeCtx.EngMgr.ResetWorkers(ingestBeCtx, job.ID, reorgInfo.currElement.ID)
+			}
 			break
 		}
 		startKey = remains[0].StartKey
 	}
 	return nil
 }
 
+func handleCreateBackfillWorkerErr(err error, workerCnt int, jobID int64) error {
+	if workerCnt == 0 {
+		return errors.Trace(err)
+	}
+	logutil.BgLogger().Warn("[ddl] create add index backfill worker failed",
+		zap.Int("current worker count", workerCnt),
+		zap.Int64("job ID", jobID), zap.Error(err))
+	return nil
+}
+
 // recordIterFunc is used for low-level record iteration.
 type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error)
 
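The new handleCreateBackfillWorkerErr above implements a degrade-gracefully policy: if no backfill worker could be created at all, the job fails; once at least one worker exists, a later creation failure (typically because the ingest memory quota is exhausted) only logs a warning and the job proceeds with a smaller worker pool. A self-contained sketch of that policy, with a toy worker type and a hypothetical memory budget standing in for the real quota:

package main

import (
	"errors"
	"fmt"
)

type worker struct{ id int }

// newWorker stands in for newAddIndexWorker; it fails once the toy budget
// is exhausted, the way engine-writer registration can fail under memory
// pressure.
func newWorker(id int, budget *int) (*worker, error) {
	const costPerWorker = 4 // arbitrary units
	if *budget < costPerWorker {
		return nil, errors.New("out of memory quota")
	}
	*budget -= costPerWorker
	return &worker{id: id}, nil
}

func spawnWorkers(target, budget int) ([]*worker, error) {
	var workers []*worker
	for i := 0; i < target; i++ {
		w, err := newWorker(i, &budget)
		if err != nil {
			if len(workers) == 0 {
				return nil, err // first worker must succeed: fail the job
			}
			// Later workers are optional: warn and continue with fewer.
			fmt.Printf("worker %d not created, continuing with %d workers: %v\n",
				i, len(workers), err)
			break
		}
		workers = append(workers, w)
	}
	return workers, nil
}

func main() {
	ws, err := spawnWorkers(20, 10) // budget only allows 2 of 20 workers
	fmt.Println(len(ws), err)
}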
9 changes: 6 additions & 3 deletions ddl/index.go
@@ -758,6 +758,7 @@ func tryFallbackToTxnMerge(job *model.Job, err error) {
 		logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process", zap.Error(err))
 		job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge
 		job.SnapshotVer = 0
+		job.RowCount = 0
 	}
 }
 
@@ -782,8 +783,10 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
 	}
 	switch indexInfo.BackfillState {
 	case model.BackfillStateRunning:
-		logutil.BgLogger().Info("[ddl] index backfill state running", zap.Int64("job ID", job.ID),
-			zap.String("table", tbl.Meta().Name.O), zap.String("index", indexInfo.Name.O))
+		logutil.BgLogger().Info("[ddl] index backfill state running",
+			zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O),
+			zap.Bool("ingest mode", bfProcess == model.ReorgTypeLitMerge),
+			zap.String("index", indexInfo.Name.O))
 		switch bfProcess {
 		case model.ReorgTypeLitMerge:
 			bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
@@ -1204,7 +1207,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable
 		}
 		ei, err := bc.EngMgr.Register(bc, job, reorgInfo.currElement.ID)
 		if err != nil {
-			return nil, errors.Trace(errors.New(ingest.LitErrCreateEngineFail))
+			return nil, errors.Trace(err)
 		}
 		lwCtx, err = ei.NewWriterCtx(id)
 		if err != nil {
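In tryFallbackToTxnMerge above, job.RowCount is now zeroed when an ingest attempt falls back to the txn-merge path; otherwise the retry would inherit the row count accumulated by the failed attempt and over-report progress in admin show ddl jobs. A minimal illustration (field names assumed to mirror model.Job, not the real type):

package main

import "fmt"

// job is illustrative only; the field names are assumptions modeled on model.Job.
type job struct {
	SnapshotVer uint64
	RowCount    int64
}

func fallbackToTxnMerge(j *job) {
	j.SnapshotVer = 0 // force the retry to take a fresh snapshot
	j.RowCount = 0    // drop rows counted by the failed ingest attempt
}

func main() {
	j := &job{SnapshotVer: 42, RowCount: 1000} // state left by a failed ingest run
	fallbackToTxnMerge(j)
	fmt.Printf("%+v\n", j) // {SnapshotVer:0 RowCount:0}
}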
18 changes: 17 additions & 1 deletion ddl/ingest/engine_mgr.go
@@ -58,7 +58,9 @@ func (m *engineManager) Register(bc *BackendContext, job *model.Job, indexID int
 		cfg := generateLocalEngineConfig(job.ID, job.SchemaName, job.TableName)
 		openedEn, err := bc.backend.OpenEngine(bc.ctx, cfg, job.TableName, int32(indexID))
 		if err != nil {
-			return nil, errors.New(LitErrCreateEngineFail)
+			logutil.BgLogger().Warn(LitErrCreateEngineFail, zap.Int64("job ID", job.ID),
+				zap.Int64("index ID", indexID), zap.Error(err))
+			return nil, errors.Trace(err)
 		}
 		id := openedEn.GetEngineUUID()
 		en = NewEngineInfo(bc.ctx, job.ID, indexID, cfg, openedEn, id, 1, m.MemRoot, m.DiskRoot)
@@ -99,6 +101,20 @@ func (m *engineManager) Unregister(jobID, indexID int64) {
 	m.MemRoot.Release(StructSizeEngineInfo)
 }
 
+// ResetWorkers resets the writer count of the engineInfo because
+// the goroutines of backfill workers have been terminated.
+func (m *engineManager) ResetWorkers(bc *BackendContext, jobID, indexID int64) {
+	ei, exist := m.Load(indexID)
+	if !exist {
+		return
+	}
+	m.MemRoot.Release(StructSizeWriterCtx * int64(ei.writerCount))
+	m.MemRoot.ReleaseWithTag(encodeEngineTag(jobID, indexID))
+	engineCacheSize := int64(bc.cfg.TikvImporter.EngineMemCacheSize)
+	m.MemRoot.ConsumeWithTag(encodeEngineTag(jobID, indexID), engineCacheSize)
+	ei.writerCount = 0
+}
+
 // UnregisterAll delete all engineInfo from the engineManager.
 func (m *engineManager) UnregisterAll(jobID int64) {
 	for _, idxID := range m.Keys() {
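ResetWorkers performs three accounting steps against MemRoot: release the per-writer structs, release everything charged under the engine's tag, then charge back only the engine cache size, since the engine itself stays registered. A toy model of that bookkeeping (assumed sizes and a deliberately simplified MemRoot, not the real ingest API, which also takes locks and tracks a max quota):

package main

import "fmt"

type memRoot struct {
	inUse int64
	tags  map[string]int64
}

func (m *memRoot) Consume(n int64) { m.inUse += n }
func (m *memRoot) Release(n int64) { m.inUse -= n }
func (m *memRoot) ConsumeWithTag(tag string, n int64) {
	m.inUse += n
	m.tags[tag] += n
}
func (m *memRoot) ReleaseWithTag(tag string) {
	m.inUse -= m.tags[tag]
	delete(m.tags, tag)
}

func main() {
	const (
		structSizeWriterCtx = 1 << 10  // stand-in for StructSizeWriterCtx
		engineCacheSize     = 512 << 20 // stand-in for EngineMemCacheSize
		writerCount         = 32
	)
	m := &memRoot{tags: map[string]int64{}}
	tag := "job1/idx1" // stand-in for encodeEngineTag(jobID, indexID)

	// During backfill: the engine cache and writer buffers are charged under
	// the engine tag, plus one writer-context struct per backfill worker.
	m.ConsumeWithTag(tag, engineCacheSize+8<<20)
	m.Consume(structSizeWriterCtx * writerCount)
	fmt.Println("during backfill:", m.inUse)

	// ResetWorkers equivalent: release the writer structs and the tagged
	// total, then re-consume only the engine cache, since the engine stays
	// open for the next partition.
	m.Release(structSizeWriterCtx * writerCount)
	m.ReleaseWithTag(tag)
	m.ConsumeWithTag(tag, engineCacheSize)
	fmt.Println("after a partition finishes:", m.inUse)
}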
2 changes: 1 addition & 1 deletion ddl/ingest/mem_root.go
@@ -122,7 +122,7 @@ func (m *memRootImpl) ConsumeWithTag(tag string, size int64) {
 	m.structSize[tag] = size
 }
 
-// TestConsume implements MemRoot.
+// CheckConsume implements MemRoot.
 func (m *memRootImpl) CheckConsume(size int64) bool {
 	m.mu.RLock()
 	defer m.mu.RUnlock()
2 changes: 1 addition & 1 deletion ddl/ingest/message.go
@@ -28,7 +28,7 @@ const (
 	LitErrStatDirFail       string = "[ddl-ingest] stat lightning sort path error"
 	LitErrDeleteDirFail     string = "[ddl-ingest] delete lightning sort path error"
 	LitErrCreateBackendFail string = "[ddl-ingest] build lightning backend failed, will use kernel index reorg method to backfill the index"
-	LitErrGetBackendFail    string = "[ddl-ingest]: Can not get cached backend"
+	LitErrGetBackendFail    string = "[ddl-ingest] can not get cached backend"
 	LitErrCreateEngineFail  string = "[ddl-ingest] build lightning engine failed, will use kernel index reorg method to backfill the index"
 	LitErrCreateContextFail string = "[ddl-ingest] build lightning worker context failed, will use kernel index reorg method to backfill the index"
 	LitErrGetEngineFail     string = "[ddl-ingest] can not get cached engine info"
2 changes: 1 addition & 1 deletion ddl/reorg.go
@@ -234,7 +234,7 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
 			return dbterror.ErrCancelledDDLJob
 		}
 		rowCount, _, _ := rc.getRowCountAndKey()
-		logutil.BgLogger().Info("[ddl] run reorg job done", zap.Int64("handled rows", rowCount))
+		logutil.BgLogger().Info("[ddl] run reorg job done", zap.Int64("handled rows", rowCount), zap.Error(err))
 		job.SetRowCount(rowCount)
 
 		// Update a job's warnings.
51 changes: 51 additions & 0 deletions tests/realtikvtest/addindextest/integration_test.go
@@ -97,3 +97,54 @@ func TestAddIndexIngestLimitOneBackend(t *testing.T) {
 	require.True(t, strings.Contains(rows[0][3].(string) /* job_type */, "ingest"))
 	require.Equal(t, rows[0][7].(string) /* row_count */, "3")
 }
+
+func TestAddIndexIngestWriterCountOnPartitionTable(t *testing.T) {
+	store := realtikvtest.CreateMockStoreAndSetup(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("drop database if exists addindexlit;")
+	tk.MustExec("create database addindexlit;")
+	tk.MustExec("use addindexlit;")
+	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
+
+	tk.MustExec("create table t (a int primary key) partition by hash(a) partitions 32;")
+	var sb strings.Builder
+	sb.WriteString("insert into t values ")
+	for i := 0; i < 100; i++ {
+		sb.WriteString(fmt.Sprintf("(%d)", i))
+		if i != 99 {
+			sb.WriteString(",")
+		}
+	}
+	tk.MustExec(sb.String())
+	tk.MustExec("alter table t add index idx(a);")
+	rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
+	require.Len(t, rows, 1)
+	jobTp := rows[0][3].(string)
+	require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
+}
+
+func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) {
+	store := realtikvtest.CreateMockStoreAndSetup(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("drop database if exists addindexlit;")
+	tk.MustExec("create database addindexlit;")
+	tk.MustExec("use addindexlit;")
+	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
+	tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 20;")
+	tk.MustExec("create table t (a int primary key);")
+	var sb strings.Builder
+	sb.WriteString("insert into t values ")
+	for i := 0; i < 20; i++ {
+		sb.WriteString(fmt.Sprintf("(%d000)", i))
+		if i != 19 {
+			sb.WriteString(",")
+		}
+	}
+	tk.MustExec(sb.String())
+	tk.MustQuery("split table t between (0) and (20000) regions 20;").Check(testkit.Rows("19 1"))
+	tk.MustExec("alter table t add index idx(a);")
+	rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
+	require.Len(t, rows, 1)
+	jobTp := rows[0][3].(string)
+	require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
+}
