From d18cf71867ab1cb769948eb0956cefdac393c956 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Mon, 12 Dec 2022 12:11:16 +0800 Subject: [PATCH 1/5] ddl: fix unexpect fail when create expression index --- ddl/backfilling.go | 12 +++++++++++- ddl/column.go | 4 ++-- ddl/index.go | 6 +++--- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index f20e28f08b1f3..2c0ca5bb7919d 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -29,6 +29,7 @@ import ( ddlutil "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/terror" @@ -805,12 +806,21 @@ func (b *backfillScheduler) Close() { // // The above operations are completed in a transaction. // Finally, update the concurrent processing of the total number of rows, and store the completed handle value. -func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error { +func (dc *ddlCtx) writePhysicalTableRecord(sess *session, sessPool *sessionPool, t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error { job := reorgInfo.Job totalAddedCount := job.GetRowCount() startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey sessCtx := newContext(reorgInfo.d.store) + m, err := sess.Txn(true) + if err != nil { + return err + } + dbInfo, err := meta.NewMeta(m).GetDatabase(job.SchemaID) + if err != nil { + return err + } + sessCtx.GetSessionVars().CurrentDB = dbInfo.Name.O decodeColMap, err := makeupDecodeColMap(sessCtx, t) if err != nil { return errors.Trace(err) diff --git a/ddl/column.go b/ddl/column.go index e9c353aacf2f5..37ad069900afc 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -1047,7 +1047,7 @@ func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) err if p == nil { return dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", reorgInfo.PhysicalTableID, t.Meta().ID) } - err := w.writePhysicalTableRecord(w.sessPool, p, typeUpdateColumnWorker, reorgInfo) + err := w.writePhysicalTableRecord(w.sess, w.sessPool, p, typeUpdateColumnWorker, reorgInfo) if err != nil { return err } @@ -1059,7 +1059,7 @@ func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) err return nil } if tbl, ok := t.(table.PhysicalTable); ok { - return w.writePhysicalTableRecord(w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo) + return w.writePhysicalTableRecord(w.sess, w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo) } return dbterror.ErrCancelledDDLJob.GenWithStack("internal error for phys tbl id: %d tbl id: %d", reorgInfo.PhysicalTableID, t.Meta().ID) } diff --git a/ddl/index.go b/ddl/index.go index 8ecc916e7b3f9..c252e213dc1da 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1592,10 +1592,10 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { if reorgInfo.mergingTmpIdx { logutil.BgLogger().Info("[ddl] start to merge temp index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(w.sessPool, t, typeAddIndexMergeTmpWorker, reorgInfo) + return w.writePhysicalTableRecord(w.sess, w.sessPool, t, typeAddIndexMergeTmpWorker, reorgInfo) } logutil.BgLogger().Info("[ddl] start to add table index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(w.sessPool, t, typeAddIndexWorker, reorgInfo) + return w.writePhysicalTableRecord(w.sess, w.sessPool, t, typeAddIndexWorker, reorgInfo) } // addTableIndex handles the add index reorganization state for a table. @@ -1804,7 +1804,7 @@ func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t // cleanupPhysicalTableIndex handles the drop partition reorganization state for a non-partitioned table or a partition. func (w *worker) cleanupPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { logutil.BgLogger().Info("[ddl] start to clean up index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(w.sessPool, t, typeCleanUpIndexWorker, reorgInfo) + return w.writePhysicalTableRecord(w.sess, w.sessPool, t, typeCleanUpIndexWorker, reorgInfo) } // cleanupGlobalIndex handles the drop partition reorganization state to clean up index entries of partitions. From 3e7982193d25002dc6fce704138f4022a10b2eb5 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Mon, 12 Dec 2022 12:22:01 +0800 Subject: [PATCH 2/5] add test --- ddl/db_change_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index cf11036a9935e..4d77dc93577e1 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -1712,6 +1712,12 @@ func TestCreateExpressionIndex(t *testing.T) { require.NoError(t, checkErr) tk.MustExec("admin check table t") tk.MustQuery("select * from t order by a, b").Check(testkit.Rows("0 9", "0 11", "0 11", "1 7", "2 7", "5 7", "8 8", "10 10", "10 10")) + + // https://github.com/pingcap/tidb/issues/39784 + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(name varchar(20))") + tk.MustExec("create index idx on test.t((lower(test.t.name)))") } func TestCreateUniqueExpressionIndex(t *testing.T) { From f033e634d32807b68aab1e576002ac0397ab5747 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Mon, 12 Dec 2022 12:52:14 +0800 Subject: [PATCH 3/5] update test case --- ddl/db_change_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index 4d77dc93577e1..da49688ccc608 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -1717,7 +1717,9 @@ func TestCreateExpressionIndex(t *testing.T) { tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t(name varchar(20))") + tk.MustExec("insert into t values ('Abc'), ('Bcd'), ('abc')") tk.MustExec("create index idx on test.t((lower(test.t.name)))") + tk.MustExec("admin check table t") } func TestCreateUniqueExpressionIndex(t *testing.T) { From 425de57a06a9feb9cc6d99240ea9b148f9768ba3 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Mon, 12 Dec 2022 15:37:46 +0800 Subject: [PATCH 4/5] :Revert "ddl: fix unexpect fail when create expression index" This reverts commit d18cf71867ab1cb769948eb0956cefdac393c956. --- ddl/backfilling.go | 12 +----------- ddl/column.go | 4 ++-- ddl/index.go | 6 +++--- 3 files changed, 6 insertions(+), 16 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 2c0ca5bb7919d..f20e28f08b1f3 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -29,7 +29,6 @@ import ( ddlutil "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/terror" @@ -806,21 +805,12 @@ func (b *backfillScheduler) Close() { // // The above operations are completed in a transaction. // Finally, update the concurrent processing of the total number of rows, and store the completed handle value. -func (dc *ddlCtx) writePhysicalTableRecord(sess *session, sessPool *sessionPool, t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error { +func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error { job := reorgInfo.Job totalAddedCount := job.GetRowCount() startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey sessCtx := newContext(reorgInfo.d.store) - m, err := sess.Txn(true) - if err != nil { - return err - } - dbInfo, err := meta.NewMeta(m).GetDatabase(job.SchemaID) - if err != nil { - return err - } - sessCtx.GetSessionVars().CurrentDB = dbInfo.Name.O decodeColMap, err := makeupDecodeColMap(sessCtx, t) if err != nil { return errors.Trace(err) diff --git a/ddl/column.go b/ddl/column.go index 37ad069900afc..e9c353aacf2f5 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -1047,7 +1047,7 @@ func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) err if p == nil { return dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", reorgInfo.PhysicalTableID, t.Meta().ID) } - err := w.writePhysicalTableRecord(w.sess, w.sessPool, p, typeUpdateColumnWorker, reorgInfo) + err := w.writePhysicalTableRecord(w.sessPool, p, typeUpdateColumnWorker, reorgInfo) if err != nil { return err } @@ -1059,7 +1059,7 @@ func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) err return nil } if tbl, ok := t.(table.PhysicalTable); ok { - return w.writePhysicalTableRecord(w.sess, w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo) + return w.writePhysicalTableRecord(w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo) } return dbterror.ErrCancelledDDLJob.GenWithStack("internal error for phys tbl id: %d tbl id: %d", reorgInfo.PhysicalTableID, t.Meta().ID) } diff --git a/ddl/index.go b/ddl/index.go index c252e213dc1da..8ecc916e7b3f9 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1592,10 +1592,10 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { if reorgInfo.mergingTmpIdx { logutil.BgLogger().Info("[ddl] start to merge temp index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(w.sess, w.sessPool, t, typeAddIndexMergeTmpWorker, reorgInfo) + return w.writePhysicalTableRecord(w.sessPool, t, typeAddIndexMergeTmpWorker, reorgInfo) } logutil.BgLogger().Info("[ddl] start to add table index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(w.sess, w.sessPool, t, typeAddIndexWorker, reorgInfo) + return w.writePhysicalTableRecord(w.sessPool, t, typeAddIndexWorker, reorgInfo) } // addTableIndex handles the add index reorganization state for a table. @@ -1804,7 +1804,7 @@ func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t // cleanupPhysicalTableIndex handles the drop partition reorganization state for a non-partitioned table or a partition. func (w *worker) cleanupPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { logutil.BgLogger().Info("[ddl] start to clean up index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(w.sess, w.sessPool, t, typeCleanUpIndexWorker, reorgInfo) + return w.writePhysicalTableRecord(w.sessPool, t, typeCleanUpIndexWorker, reorgInfo) } // cleanupGlobalIndex handles the drop partition reorganization state to clean up index entries of partitions. From 587ef63595660071229ca1a15fc3f29f0dda5c36 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Mon, 12 Dec 2022 15:58:01 +0800 Subject: [PATCH 5/5] updated --- ddl/backfilling.go | 5 ++--- ddl/column.go | 6 +++++- ddl/index.go | 6 +++++- ddl/partition.go | 6 +++++- ddl/reorg.go | 7 +++++-- 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index f20e28f08b1f3..1648e259fa4ea 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -577,8 +577,7 @@ func loadDDLReorgVars(ctx context.Context, sessPool *sessionPool) error { return ddlutil.LoadDDLReorgVars(ctx, sCtx) } -func makeupDecodeColMap(sessCtx sessionctx.Context, t table.Table) (map[int64]decoder.Column, error) { - dbName := model.NewCIStr(sessCtx.GetSessionVars().CurrentDB) +func makeupDecodeColMap(sessCtx sessionctx.Context, dbName model.CIStr, t table.Table) (map[int64]decoder.Column, error) { writableColInfos := make([]*model.ColumnInfo, 0, len(t.WritableCols())) for _, col := range t.WritableCols() { writableColInfos = append(writableColInfos, col.ColumnInfo) @@ -811,7 +810,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey sessCtx := newContext(reorgInfo.d.store) - decodeColMap, err := makeupDecodeColMap(sessCtx, t) + decodeColMap, err := makeupDecodeColMap(sessCtx, reorgInfo.dbInfo.Name, t) if err != nil { return errors.Trace(err) } diff --git a/ddl/column.go b/ddl/column.go index e9c353aacf2f5..d9425ceabac2c 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -807,7 +807,11 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) { job.ReorgMeta.ReorgTp = model.ReorgTypeTxn rh := newReorgHandler(t, w.sess, w.concurrentDDL) - reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, BuildElements(changingCol, changingIdxs), false) + dbInfo, err := t.GetDatabase(job.SchemaID) + if err != nil { + return false, ver, errors.Trace(err) + } + reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version // and then run the reorg next time. diff --git a/ddl/index.go b/ddl/index.go index 8ecc916e7b3f9..0f70b73b61046 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -870,7 +870,11 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) { elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}} rh := newReorgHandler(t, w.sess, w.concurrentDDL) - reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, elements, mergingTmpIdx) + dbInfo, err := t.GetDatabase(job.SchemaID) + if err != nil { + return false, ver, errors.Trace(err) + } + reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, elements, mergingTmpIdx) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version // and then run the reorg next time. diff --git a/ddl/partition.go b/ddl/partition.go index cf4bd7aed962f..0a1ea4e6fbe66 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1743,6 +1743,10 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( if err != nil { return ver, errors.Trace(err) } + dbInfo, err := t.GetDatabase(job.SchemaID) + if err != nil { + return ver, errors.Trace(err) + } // If table has global indexes, we need reorg to clean up them. if pt, ok := tbl.(table.PartitionedTable); ok && hasGlobalIndex(tblInfo) { // Build elements for compatible with modify column type. elements will not be used when reorganizing. @@ -1753,7 +1757,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( } } rh := newReorgHandler(t, w.sess, w.concurrentDDL) - reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job), d, rh, job, tbl, physicalTableIDs, elements) + reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job), d, rh, job, dbInfo, tbl, physicalTableIDs, elements) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version diff --git a/ddl/reorg.go b/ddl/reorg.go index a03cf417177dc..d7671031f64d1 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -386,6 +386,7 @@ type reorgInfo struct { // PhysicalTableID is used to trace the current partition we are handling. // If the table is not partitioned, PhysicalTableID would be TableID. PhysicalTableID int64 + dbInfo *model.DBInfo elements []*meta.Element currElement *meta.Element } @@ -585,7 +586,7 @@ func getValidCurrentVersion(store kv.Storage) (ver kv.Version, err error) { return ver, nil } -func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, +func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, dbInfo *model.DBInfo, tbl table.Table, elements []*meta.Element, mergingTmpIdx bool) (*reorgInfo, error) { var ( element *meta.Element @@ -685,11 +686,12 @@ func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, info.currElement = element info.elements = elements info.mergingTmpIdx = mergingTmpIdx + info.dbInfo = dbInfo return &info, nil } -func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) { +func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, dbInfo *model.DBInfo, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) { var ( element *meta.Element start kv.Key @@ -745,6 +747,7 @@ func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, jo info.PhysicalTableID = pid info.currElement = element info.elements = elements + info.dbInfo = dbInfo return &info, nil }