Skip to content

Commit

Permalink
ddl: add index using ingest 640 hotfix (#39274)
Browse files Browse the repository at this point in the history
ref #39190, ref #39239
  • Loading branch information
tangenta authored Nov 21, 2022
1 parent cf36a9c commit 5afdf35
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 23 deletions.
37 changes: 28 additions & 9 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,15 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
}()
jc := dc.jobContext(job)

var ingestBeCtx *ingest.BackendContext
if bfWorkerType == typeAddIndexWorker && job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
if bc, ok := ingest.LitBackCtxMgr.Load(job.ID); ok {
ingestBeCtx = bc
} else {
return errors.New(ingest.LitErrGetBackendFail)
}
}

for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey)
if err != nil {
Expand Down Expand Up @@ -661,7 +670,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
case typeAddIndexWorker:
idxWorker, err := newAddIndexWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc, job)
if err != nil {
return errors.Trace(err)
return handleCreateBackfillWorkerErr(err, len(backfillWorkers), reorgInfo.ID)
}
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
Expand Down Expand Up @@ -716,14 +725,11 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
zap.Int("regionCnt", len(kvRanges)),
zap.String("startKey", hex.EncodeToString(startKey)),
zap.String("endKey", hex.EncodeToString(endKey)))
if bfWorkerType == typeAddIndexWorker && job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
if bc, ok := ingest.LitBackCtxMgr.Load(job.ID); ok {
err := bc.Flush(reorgInfo.currElement.ID)
if err != nil {
return errors.Trace(err)
}
} else {
return errors.New(ingest.LitErrGetBackendFail)

if ingestBeCtx != nil {
err := ingestBeCtx.Flush(reorgInfo.currElement.ID)
if err != nil {
return errors.Trace(err)
}
}
remains, err := dc.handleRangeTasks(sessPool, t, backfillWorkers, reorgInfo, &totalAddedCount, kvRanges)
Expand All @@ -732,13 +738,26 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
}

if len(remains) == 0 {
if ingestBeCtx != nil {
ingestBeCtx.EngMgr.ResetWorkers(ingestBeCtx, job.ID, reorgInfo.currElement.ID)
}
break
}
startKey = remains[0].StartKey
}
return nil
}

func handleCreateBackfillWorkerErr(err error, workerCnt int, jobID int64) error {
if workerCnt == 0 {
return errors.Trace(err)
}
logutil.BgLogger().Warn("[ddl] create add index backfill worker failed",
zap.Int("current worker count", workerCnt),
zap.Int64("job ID", jobID), zap.Error(err))
return nil
}

// recordIterFunc is used for low-level record iteration.
type recordIterFunc func(h kv.Handle, rowKey kv.Key, rawRecord []byte) (more bool, err error)

Expand Down
10 changes: 7 additions & 3 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,7 @@ func tryFallbackToTxnMerge(job *model.Job, err error) {
logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process", zap.Error(err))
job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge
job.SnapshotVer = 0
job.RowCount = 0
}
}

Expand All @@ -782,8 +783,10 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
}
switch indexInfo.BackfillState {
case model.BackfillStateRunning:
logutil.BgLogger().Info("[ddl] index backfill state running", zap.Int64("job ID", job.ID),
zap.String("table", tbl.Meta().Name.O), zap.String("index", indexInfo.Name.O))
logutil.BgLogger().Info("[ddl] index backfill state running",
zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O),
zap.Bool("ingest mode", bfProcess == model.ReorgTypeLitMerge),
zap.String("index", indexInfo.Name.O))
switch bfProcess {
case model.ReorgTypeLitMerge:
bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
Expand All @@ -793,6 +796,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
if !ok && job.SnapshotVer != 0 {
// The owner is crashed or changed, we need to restart the backfill.
job.SnapshotVer = 0
job.RowCount = 0
return false, ver, nil
}
bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, job.ReorgMeta.SQLMode)
Expand Down Expand Up @@ -1204,7 +1208,7 @@ func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable
}
ei, err := bc.EngMgr.Register(bc, job, reorgInfo.currElement.ID)
if err != nil {
return nil, errors.Trace(errors.New(ingest.LitErrCreateEngineFail))
return nil, errors.Trace(err)
}
lwCtx, err = ei.NewWriterCtx(id)
if err != nil {
Expand Down
67 changes: 67 additions & 0 deletions ddl/index_merge_tmp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,73 @@ func TestAddIndexMergeVersionIndexValue(t *testing.T) {
require.Equal(t, []byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1}, iter.Value())
}

// TestAddIndexMergeIndexUntouchedValue exercises the txn-merge ADD INDEX path
// with concurrent DML: a row is inserted while the temp index is in
// BackfillStateRunning, then updated inside an explicit transaction while the
// index is in BackfillStateReadyToMerge. The final table and index contents
// must agree (admin check + index/no-index scans return the same rows).
func TestAddIndexMergeIndexUntouchedValue(t *testing.T) {
	store, dom := testkit.CreateMockStoreAndDomain(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	// tk2 issues the concurrent DML from inside the DDL hook below.
	tk2 := testkit.NewTestKit(t, store)
	tk2.MustExec("use test")
	tk.MustExec(`create table t (
id int not null auto_increment,
k int not null default '0',
c char(120) not null default '',
pad char(60) not null default '',
primary key (id) clustered,
key k_1(k));`)
	tk.MustExec("insert into t values (1, 1, 'a', 'a')")
	// Force onCreateIndex use the txn-merge process.
	ingest.LitInitialized = false
	tk.MustExec("set @@global.tidb_ddl_enable_fast_reorg = 1;")

	// checkErrs collects errors from DML run inside the hook; they are
	// asserted after the DDL finishes because require must not fire mid-hook.
	var checkErrs []error
	var runInsert bool
	var runUpdate bool
	originHook := dom.DDL().GetHook()
	callback := &ddl.TestDDLCallback{
		Do: dom,
	}
	onJobUpdatedExportedFunc := func(job *model.Job) {
		if job.Type != model.ActionAddIndex || job.SchemaState != model.StateWriteReorganization {
			return
		}
		idx := findIdxInfo(dom, "test", "t", "idx")
		if idx == nil {
			return
		}
		if !runInsert {
			// Wait until backfill actually started (snapshot taken) before inserting.
			if idx.BackfillState != model.BackfillStateRunning || job.SnapshotVer == 0 {
				return
			}
			runInsert = true
			_, err := tk2.Exec("insert into t values (100, 1, 'a', 'a');")
			checkErrs = append(checkErrs, err)
		}
		if !runUpdate {
			// The update only touches column k, so the idx(c) entry of the row
			// is written as an "untouched" index value during the transaction.
			if idx.BackfillState != model.BackfillStateReadyToMerge {
				return
			}
			runUpdate = true
			_, err := tk2.Exec("begin;")
			checkErrs = append(checkErrs, err)
			_, err = tk2.Exec("update t set k=k+1 where id = 100;")
			checkErrs = append(checkErrs, err)
			_, err = tk2.Exec("commit;")
			checkErrs = append(checkErrs, err)
		}
	}
	callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc)
	dom.DDL().SetHook(callback)
	tk.MustExec("alter table t add index idx(c);")
	dom.DDL().SetHook(originHook)
	require.True(t, runUpdate)
	for _, err := range checkErrs {
		require.NoError(t, err)
	}
	// The index scan and the full-table scan must return identical rows.
	tk.MustExec("admin check table t;")
	tk.MustQuery("select * from t use index (idx);").Check(testkit.Rows("1 1 a a", "100 2 a a"))
	tk.MustQuery("select * from t ignore index (idx);").Check(testkit.Rows("1 1 a a", "100 2 a a"))
}

func findIdxInfo(dom *domain.Domain, dbName, tbName, idxName string) *model.IndexInfo {
tbl, err := dom.InfoSchema().TableByName(model.NewCIStr(dbName), model.NewCIStr(tbName))
if err != nil {
Expand Down
18 changes: 17 additions & 1 deletion ddl/ingest/engine_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ func (m *engineManager) Register(bc *BackendContext, job *model.Job, indexID int
cfg := generateLocalEngineConfig(job.ID, job.SchemaName, job.TableName)
openedEn, err := bc.backend.OpenEngine(bc.ctx, cfg, job.TableName, int32(indexID))
if err != nil {
return nil, errors.New(LitErrCreateEngineFail)
logutil.BgLogger().Warn(LitErrCreateEngineFail, zap.Int64("job ID", job.ID),
zap.Int64("index ID", indexID), zap.Error(err))
return nil, errors.Trace(err)
}
id := openedEn.GetEngineUUID()
en = NewEngineInfo(bc.ctx, job.ID, indexID, cfg, openedEn, id, 1, m.MemRoot, m.DiskRoot)
Expand Down Expand Up @@ -99,6 +101,20 @@ func (m *engineManager) Unregister(jobID, indexID int64) {
m.MemRoot.Release(StructSizeEngineInfo)
}

// ResetWorkers resets the writer count of the engineInfo because the backfill
// worker goroutines have been terminated. The memory accounted for their
// writer contexts is released, while the engine's own cache quota stays
// reserved under its tag.
func (m *engineManager) ResetWorkers(bc *BackendContext, jobID, indexID int64) {
	engineInfo, found := m.Load(indexID)
	if !found {
		return
	}
	// Give back the memory held by the now-dead writer contexts.
	m.MemRoot.Release(int64(engineInfo.writerCount) * StructSizeWriterCtx)
	// Re-account the engine: drop the old tagged usage, then re-reserve only
	// the engine memory cache size.
	tag := encodeEngineTag(jobID, indexID)
	m.MemRoot.ReleaseWithTag(tag)
	m.MemRoot.ConsumeWithTag(tag, int64(bc.cfg.TikvImporter.EngineMemCacheSize))
	engineInfo.writerCount = 0
}

// UnregisterAll delete all engineInfo from the engineManager.
func (m *engineManager) UnregisterAll(jobID int64) {
for _, idxID := range m.Keys() {
Expand Down
2 changes: 1 addition & 1 deletion ddl/ingest/mem_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (m *memRootImpl) ConsumeWithTag(tag string, size int64) {
m.structSize[tag] = size
}

// TestConsume implements MemRoot.
// CheckConsume implements MemRoot.
func (m *memRootImpl) CheckConsume(size int64) bool {
m.mu.RLock()
defer m.mu.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion ddl/ingest/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
LitErrStatDirFail string = "[ddl-ingest] stat lightning sort path error"
LitErrDeleteDirFail string = "[ddl-ingest] delete lightning sort path error"
LitErrCreateBackendFail string = "[ddl-ingest] build lightning backend failed, will use kernel index reorg method to backfill the index"
LitErrGetBackendFail string = "[ddl-ingest]: Can not get cached backend"
LitErrGetBackendFail string = "[ddl-ingest] can not get cached backend"
LitErrCreateEngineFail string = "[ddl-ingest] build lightning engine failed, will use kernel index reorg method to backfill the index"
LitErrCreateContextFail string = "[ddl-ingest] build lightning worker context failed, will use kernel index reorg method to backfill the index"
LitErrGetEngineFail string = "[ddl-ingest] can not get cached engine info"
Expand Down
2 changes: 1 addition & 1 deletion ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
return dbterror.ErrCancelledDDLJob
}
rowCount, _, _ := rc.getRowCountAndKey()
logutil.BgLogger().Info("[ddl] run reorg job done", zap.Int64("handled rows", rowCount))
logutil.BgLogger().Info("[ddl] run reorg job done", zap.Int64("handled rows", rowCount), zap.Error(err))
job.SetRowCount(rowCount)

// Update a job's warnings.
Expand Down
16 changes: 9 additions & 7 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,15 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
}

var (
tempKey []byte
keyVer byte
keyIsRewritten bool
tempKey []byte
keyVer byte
keyIsTempIdxKey bool
)
if !opt.FromBackFill {
key, tempKey, keyVer = genTempIdxKeyByState(c.idxInfo, key)
if keyVer == TempIndexKeyTypeBackfill {
key, tempKey = tempKey, nil
keyIsRewritten = true
keyIsTempIdxKey = true
}
}

Expand Down Expand Up @@ -176,15 +176,17 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
opt.IgnoreAssertion = opt.IgnoreAssertion || c.idxInfo.State != model.StatePublic

if !distinct || skipCheck || opt.Untouched {
if keyIsRewritten {
if keyIsTempIdxKey && !opt.Untouched { // Untouched key-values never occur in the storage.
idxVal = append(idxVal, keyVer)
}
err = txn.GetMemBuffer().Set(key, idxVal)
if err != nil {
return nil, err
}
if len(tempKey) > 0 {
idxVal = append(idxVal, keyVer)
if !opt.Untouched { // Untouched key-values never occur in the storage.
idxVal = append(idxVal, keyVer)
}
err = txn.GetMemBuffer().Set(tempKey, idxVal)
if err != nil {
return nil, err
Expand Down Expand Up @@ -224,7 +226,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
}
if err != nil || len(value) == 0 {
lazyCheck := sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil
if keyIsRewritten {
if keyIsTempIdxKey {
idxVal = append(idxVal, keyVer)
}
if lazyCheck {
Expand Down
51 changes: 51 additions & 0 deletions tests/realtikvtest/addindextest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,54 @@ func TestAddIndexIngestLimitOneBackend(t *testing.T) {
require.True(t, strings.Contains(rows[0][3].(string) /* job_type */, "ingest"))
require.Equal(t, rows[0][7].(string) /* row_count */, "3")
}

// TestAddIndexIngestWriterCountOnPartitionTable verifies that adding an index
// to a hash-partitioned table (32 partitions) still goes through the ingest
// (lightning) backfill path.
func TestAddIndexIngestWriterCountOnPartitionTable(t *testing.T) {
	store := realtikvtest.CreateMockStoreAndSetup(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("drop database if exists addindexlit;")
	tk.MustExec("create database addindexlit;")
	tk.MustExec("use addindexlit;")
	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)

	tk.MustExec("create table t (a int primary key) partition by hash(a) partitions 32;")
	// Insert 100 rows spread across the partitions.
	values := make([]string, 0, 100)
	for i := 0; i < 100; i++ {
		values = append(values, fmt.Sprintf("(%d)", i))
	}
	tk.MustExec("insert into t values " + strings.Join(values, ","))
	tk.MustExec("alter table t add index idx(a);")
	// The job type recorded in the DDL history must mention "ingest".
	rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
	require.Len(t, rows, 1)
	jobTp := rows[0][3].(string)
	require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
}

// TestAddIndexIngestAdjustBackfillWorkerCountFail checks that the ingest
// backfill still succeeds when the reorg worker count (20) is high enough
// that creating some workers may fail; the job must complete via the ingest
// path using however many workers were created.
func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) {
	store := realtikvtest.CreateMockStoreAndSetup(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("drop database if exists addindexlit;")
	tk.MustExec("create database addindexlit;")
	tk.MustExec("use addindexlit;")
	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
	tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 20;")
	tk.MustExec("create table t (a int primary key);")
	// Insert 20 rows (0, 1000, 2000, ...) so the split below gives each
	// region some data.
	values := make([]string, 0, 20)
	for i := 0; i < 20; i++ {
		values = append(values, fmt.Sprintf("(%d000)", i))
	}
	tk.MustExec("insert into t values " + strings.Join(values, ","))
	tk.MustQuery("split table t between (0) and (20000) regions 20;").Check(testkit.Rows("19 1"))
	tk.MustExec("alter table t add index idx(a);")
	// The job type recorded in the DDL history must mention "ingest".
	rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
	require.Len(t, rows, 1)
	jobTp := rows[0][3].(string)
	require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
}

0 comments on commit 5afdf35

Please sign in to comment.