Skip to content

Commit

Permalink
ddl: do not reference owner_id for specific system DDL (pingcap#53332)
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta authored and terry1purcell committed May 17, 2024
1 parent c045525 commit 47b2471
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 8 deletions.
2 changes: 2 additions & 0 deletions pkg/ddl/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
ReorgTable = "tidb_ddl_reorg"
// HistoryTable stores the history DDL jobs.
HistoryTable = "tidb_ddl_history"
// MDLInfoTable stores lock info used by metadata lock.
MDLInfoTable = "tidb_mdl_info"

// JobTableID is the table ID of `tidb_ddl_job`.
JobTableID = meta.MaxInt48 - 1
Expand Down
33 changes: 27 additions & 6 deletions pkg/ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"math/rand"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/binloginfo"
Expand Down Expand Up @@ -590,6 +592,11 @@ func (w *worker) updateDDLJob(job *model.Job, meetErr bool) error {
return errors.Trace(updateDDLJob2Table(w.sess, job, updateRawArgs))
}

func matchMDLInfoTable(schemaName, tblName string) bool {
return strings.ToLower(schemaName) == mysql.SystemDB &&
strings.ToLower(tblName) == MDLInfoTable
}

// registerMDLInfo registers metadata lock info.
func (w *worker) registerMDLInfo(job *model.Job, ver int64) error {
if !variable.EnableMDL.Load() {
Expand All @@ -607,31 +614,45 @@ func (w *worker) registerMDLInfo(job *model.Job, ver int64) error {
}
ownerID := w.ownerManager.ID()
ids := rows[0].GetString(0)
sql := fmt.Sprintf("replace into mysql.tidb_mdl_info (job_id, version, table_ids, owner_id) values (%d, %d, '%s', '%s')", job.ID, ver, ids, ownerID)
var sql string
if matchMDLInfoTable(job.SchemaName, job.TableName) {
// DDLs that modify system table `tidb_mdl_info` could only happen in upgrade process,
// we should not reference 'owner_id'. Otherwise, there is a circular problem.
sql = fmt.Sprintf("replace into mysql.tidb_mdl_info (job_id, version, table_ids) values (%d, %d, '%s')", job.ID, ver, ids)
} else {
sql = fmt.Sprintf("replace into mysql.tidb_mdl_info (job_id, version, table_ids, owner_id) values (%d, %d, '%s', '%s')", job.ID, ver, ids, ownerID)
}
_, err = w.sess.Execute(context.Background(), sql, "register-mdl-info")
return err
}

// cleanMDLInfo cleans metadata lock info.
func cleanMDLInfo(pool *sess.Pool, jobID int64, ec *clientv3.Client, ownerID string, cleanETCD bool) {
func cleanMDLInfo(pool *sess.Pool, job *model.Job, ec *clientv3.Client, ownerID string, cleanETCD bool) {
if !variable.EnableMDL.Load() {
return
}
sql := fmt.Sprintf("delete from mysql.tidb_mdl_info where job_id = %d and owner_id = '%s'", jobID, ownerID)
var sql string
if matchMDLInfoTable(job.SchemaName, job.TableName) {
// DDLs that modify system table `tidb_mdl_info` could only happen in upgrade process,
// we should not reference 'owner_id'. Otherwise, there is a circular problem.
sql = fmt.Sprintf("delete from mysql.tidb_mdl_info where job_id = %d", job.ID)
} else {
sql = fmt.Sprintf("delete from mysql.tidb_mdl_info where job_id = %d and owner_id = '%s'", job.ID, ownerID)
}
sctx, _ := pool.Get()
defer pool.Put(sctx)
se := sess.NewSession(sctx)
se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
_, err := se.Execute(context.Background(), sql, "delete-mdl-info")
if err != nil {
logutil.DDLLogger().Warn("unexpected error when clean mdl info", zap.Int64("job ID", jobID), zap.Error(err))
logutil.DDLLogger().Warn("unexpected error when clean mdl info", zap.Int64("job ID", job.ID), zap.Error(err))
return
}
if cleanETCD && ec != nil {
path := fmt.Sprintf("%s/%d/", util.DDLAllSchemaVersionsByJob, jobID)
path := fmt.Sprintf("%s/%d/", util.DDLAllSchemaVersionsByJob, job.ID)
_, err = ec.Delete(context.Background(), path, clientv3.WithPrefix())
if err != nil {
logutil.DDLLogger().Warn("delete versions failed", zap.Int64("job ID", jobID), zap.Error(err))
logutil.DDLLogger().Warn("delete versions failed", zap.Int64("job ID", job.ID), zap.Error(err))
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func (d *ddl) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) {
return
}
d.setAlreadyRunOnce(job.ID)
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli, ownerID, job.State == model.JobStateSynced)
cleanMDLInfo(d.sessPool, job, d.etcdCli, ownerID, job.State == model.JobStateSynced)
// Don't have a worker now.
return
}
Expand Down Expand Up @@ -481,7 +481,7 @@ func (d *ddl) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) {
if err != nil {
return
}
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli, ownerID, job.State == model.JobStateSynced)
cleanMDLInfo(d.sessPool, job, d.etcdCli, ownerID, job.State == model.JobStateSynced)
d.synced(job)

if RunInGoTest {
Expand Down

0 comments on commit 47b2471

Please sign in to comment.