From d18cf71867ab1cb769948eb0956cefdac393c956 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Mon, 12 Dec 2022 12:11:16 +0800 Subject: [PATCH] ddl: fix unexpect fail when create expression index --- ddl/backfilling.go | 12 +++++++++++- ddl/column.go | 4 ++-- ddl/index.go | 6 +++--- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index f20e28f08b1f3..2c0ca5bb7919d 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -29,6 +29,7 @@ import ( ddlutil "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/terror" @@ -805,12 +806,21 @@ func (b *backfillScheduler) Close() { // // The above operations are completed in a transaction. // Finally, update the concurrent processing of the total number of rows, and store the completed handle value. -func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error { +func (dc *ddlCtx) writePhysicalTableRecord(sess *session, sessPool *sessionPool, t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error { job := reorgInfo.Job totalAddedCount := job.GetRowCount() startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey sessCtx := newContext(reorgInfo.d.store) + m, err := sess.Txn(true) + if err != nil { + return err + } + dbInfo, err := meta.NewMeta(m).GetDatabase(job.SchemaID) + if err != nil { + return err + } + sessCtx.GetSessionVars().CurrentDB = dbInfo.Name.O decodeColMap, err := makeupDecodeColMap(sessCtx, t) if err != nil { return errors.Trace(err) diff --git a/ddl/column.go b/ddl/column.go index e9c353aacf2f5..37ad069900afc 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -1047,7 +1047,7 @@ func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) err if p == nil { return dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", reorgInfo.PhysicalTableID, t.Meta().ID) } - err := w.writePhysicalTableRecord(w.sessPool, p, typeUpdateColumnWorker, reorgInfo) + err := w.writePhysicalTableRecord(w.sess, w.sessPool, p, typeUpdateColumnWorker, reorgInfo) if err != nil { return err } @@ -1059,7 +1059,7 @@ func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) err return nil } if tbl, ok := t.(table.PhysicalTable); ok { - return w.writePhysicalTableRecord(w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo) + return w.writePhysicalTableRecord(w.sess, w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo) } return dbterror.ErrCancelledDDLJob.GenWithStack("internal error for phys tbl id: %d tbl id: %d", reorgInfo.PhysicalTableID, t.Meta().ID) } diff --git a/ddl/index.go b/ddl/index.go index 8ecc916e7b3f9..c252e213dc1da 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1592,10 +1592,10 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { if reorgInfo.mergingTmpIdx { logutil.BgLogger().Info("[ddl] start to merge temp index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(w.sessPool, t, typeAddIndexMergeTmpWorker, reorgInfo) + return w.writePhysicalTableRecord(w.sess, w.sessPool, t, typeAddIndexMergeTmpWorker, reorgInfo) } logutil.BgLogger().Info("[ddl] start to add table index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(w.sessPool, t, typeAddIndexWorker, reorgInfo) + return w.writePhysicalTableRecord(w.sess, w.sessPool, t, typeAddIndexWorker, reorgInfo) } // addTableIndex handles the add index reorganization state for a table. @@ -1804,7 +1804,7 @@ func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t // cleanupPhysicalTableIndex handles the drop partition reorganization state for a non-partitioned table or a partition. func (w *worker) cleanupPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { logutil.BgLogger().Info("[ddl] start to clean up index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(w.sessPool, t, typeCleanUpIndexWorker, reorgInfo) + return w.writePhysicalTableRecord(w.sess, w.sessPool, t, typeCleanUpIndexWorker, reorgInfo) } // cleanupGlobalIndex handles the drop partition reorganization state to clean up index entries of partitions.