diff --git a/pkg/ddl/constant.go b/pkg/ddl/constant.go index 212c84126814b..165172ebfe1a3 100644 --- a/pkg/ddl/constant.go +++ b/pkg/ddl/constant.go @@ -25,6 +25,8 @@ const ( ReorgTable = "tidb_ddl_reorg" // HistoryTable stores the history DDL jobs. HistoryTable = "tidb_ddl_history" + // MDLInfoTable stores lock info used by metadata lock. + MDLInfoTable = "tidb_mdl_info" // JobTableID is the table ID of `tidb_ddl_job`. JobTableID = meta.MaxInt48 - 1 diff --git a/pkg/ddl/ddl_worker.go b/pkg/ddl/ddl_worker.go index 3e2c5a96cfd8f..ab755b3366561 100644 --- a/pkg/ddl/ddl_worker.go +++ b/pkg/ddl/ddl_worker.go @@ -20,6 +20,7 @@ import ( "math/rand" "os" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -36,6 +37,7 @@ import ( "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/binloginfo" @@ -590,6 +592,11 @@ func (w *worker) updateDDLJob(job *model.Job, meetErr bool) error { return errors.Trace(updateDDLJob2Table(w.sess, job, updateRawArgs)) } +func matchMDLInfoTable(schemaName, tblName string) bool { + return strings.ToLower(schemaName) == mysql.SystemDB && + strings.ToLower(tblName) == MDLInfoTable +} + // registerMDLInfo registers metadata lock info. func (w *worker) registerMDLInfo(job *model.Job, ver int64) error { if !variable.EnableMDL.Load() { @@ -607,31 +614,45 @@ func (w *worker) registerMDLInfo(job *model.Job, ver int64) error { } ownerID := w.ownerManager.ID() ids := rows[0].GetString(0) - sql := fmt.Sprintf("replace into mysql.tidb_mdl_info (job_id, version, table_ids, owner_id) values (%d, %d, '%s', '%s')", job.ID, ver, ids, ownerID) + var sql string + if matchMDLInfoTable(job.SchemaName, job.TableName) { + // DDLs that modify system table `tidb_mdl_info` could only happen in upgrade process, + // we should not reference 'owner_id'. Otherwise, there is a circular problem. + sql = fmt.Sprintf("replace into mysql.tidb_mdl_info (job_id, version, table_ids) values (%d, %d, '%s')", job.ID, ver, ids) + } else { + sql = fmt.Sprintf("replace into mysql.tidb_mdl_info (job_id, version, table_ids, owner_id) values (%d, %d, '%s', '%s')", job.ID, ver, ids, ownerID) + } _, err = w.sess.Execute(context.Background(), sql, "register-mdl-info") return err } // cleanMDLInfo cleans metadata lock info. -func cleanMDLInfo(pool *sess.Pool, jobID int64, ec *clientv3.Client, ownerID string, cleanETCD bool) { +func cleanMDLInfo(pool *sess.Pool, job *model.Job, ec *clientv3.Client, ownerID string, cleanETCD bool) { if !variable.EnableMDL.Load() { return } - sql := fmt.Sprintf("delete from mysql.tidb_mdl_info where job_id = %d and owner_id = '%s'", jobID, ownerID) + var sql string + if matchMDLInfoTable(job.SchemaName, job.TableName) { + // DDLs that modify system table `tidb_mdl_info` could only happen in upgrade process, + // we should not reference 'owner_id'. Otherwise, there is a circular problem. + sql = fmt.Sprintf("delete from mysql.tidb_mdl_info where job_id = %d", job.ID) + } else { + sql = fmt.Sprintf("delete from mysql.tidb_mdl_info where job_id = %d and owner_id = '%s'", job.ID, ownerID) + } sctx, _ := pool.Get() defer pool.Put(sctx) se := sess.NewSession(sctx) se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull) _, err := se.Execute(context.Background(), sql, "delete-mdl-info") if err != nil { - logutil.DDLLogger().Warn("unexpected error when clean mdl info", zap.Int64("job ID", jobID), zap.Error(err)) + logutil.DDLLogger().Warn("unexpected error when clean mdl info", zap.Int64("job ID", job.ID), zap.Error(err)) return } if cleanETCD && ec != nil { - path := fmt.Sprintf("%s/%d/", util.DDLAllSchemaVersionsByJob, jobID) + path := fmt.Sprintf("%s/%d/", util.DDLAllSchemaVersionsByJob, job.ID) _, err = ec.Delete(context.Background(), path, clientv3.WithPrefix()) if err != nil { - logutil.DDLLogger().Warn("delete versions failed", zap.Int64("job ID", jobID), zap.Error(err)) + logutil.DDLLogger().Warn("delete versions failed", zap.Int64("job ID", job.ID), zap.Error(err)) } } } diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go index 76dd37f98082d..59125be52b429 100644 --- a/pkg/ddl/job_table.go +++ b/pkg/ddl/job_table.go @@ -443,7 +443,7 @@ func (d *ddl) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) { return } d.setAlreadyRunOnce(job.ID) - cleanMDLInfo(d.sessPool, job.ID, d.etcdCli, ownerID, job.State == model.JobStateSynced) + cleanMDLInfo(d.sessPool, job, d.etcdCli, ownerID, job.State == model.JobStateSynced) // Don't have a worker now. return } @@ -481,7 +481,7 @@ func (d *ddl) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) { if err != nil { return } - cleanMDLInfo(d.sessPool, job.ID, d.etcdCli, ownerID, job.State == model.JobStateSynced) + cleanMDLInfo(d.sessPool, job, d.etcdCli, ownerID, job.State == model.JobStateSynced) d.synced(job) if RunInGoTest {