Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: do not reference owner_id for specific system DDL #53332

Merged
merged 2 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/ddl/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
ReorgTable = "tidb_ddl_reorg"
// HistoryTable stores the history DDL jobs.
HistoryTable = "tidb_ddl_history"
// MDLInfoTable stores lock info used by metadata lock.
MDLInfoTable = "tidb_mdl_info"

// JobTableID is the table ID of `tidb_ddl_job`.
JobTableID = meta.MaxInt48 - 1
Expand Down
33 changes: 27 additions & 6 deletions pkg/ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"math/rand"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/binloginfo"
Expand Down Expand Up @@ -590,6 +592,11 @@ func (w *worker) updateDDLJob(job *model.Job, meetErr bool) error {
return errors.Trace(updateDDLJob2Table(w.sess, job, updateRawArgs))
}

func matchMDLInfoTable(schemaName, tblName string) bool {
return strings.ToLower(schemaName) == mysql.SystemDB &&
strings.ToLower(tblName) == MDLInfoTable
}

// registerMDLInfo registers metadata lock info.
func (w *worker) registerMDLInfo(job *model.Job, ver int64) error {
if !variable.EnableMDL.Load() {
Expand All @@ -607,31 +614,45 @@ func (w *worker) registerMDLInfo(job *model.Job, ver int64) error {
}
ownerID := w.ownerManager.ID()
ids := rows[0].GetString(0)
sql := fmt.Sprintf("replace into mysql.tidb_mdl_info (job_id, version, table_ids, owner_id) values (%d, %d, '%s', '%s')", job.ID, ver, ids, ownerID)
var sql string
if matchMDLInfoTable(job.SchemaName, job.TableName) {
// DDLs that modify system table `tidb_mdl_info` could only happen in upgrade process,
// we should not reference 'owner_id'. Otherwise, there is a circular problem.
sql = fmt.Sprintf("replace into mysql.tidb_mdl_info (job_id, version, table_ids) values (%d, %d, '%s')", job.ID, ver, ids)
} else {
sql = fmt.Sprintf("replace into mysql.tidb_mdl_info (job_id, version, table_ids, owner_id) values (%d, %d, '%s', '%s')", job.ID, ver, ids, ownerID)
}
_, err = w.sess.Execute(context.Background(), sql, "register-mdl-info")
return err
}

// cleanMDLInfo cleans metadata lock info.
func cleanMDLInfo(pool *sess.Pool, jobID int64, ec *clientv3.Client, ownerID string, cleanETCD bool) {
func cleanMDLInfo(pool *sess.Pool, job *model.Job, ec *clientv3.Client, ownerID string, cleanETCD bool) {
if !variable.EnableMDL.Load() {
return
}
sql := fmt.Sprintf("delete from mysql.tidb_mdl_info where job_id = %d and owner_id = '%s'", jobID, ownerID)
var sql string
if matchMDLInfoTable(job.SchemaName, job.TableName) {
// DDLs that modify system table `tidb_mdl_info` could only happen in upgrade process,
// we should not reference 'owner_id'. Otherwise, there is a circular problem.
sql = fmt.Sprintf("delete from mysql.tidb_mdl_info where job_id = %d", job.ID)
} else {
sql = fmt.Sprintf("delete from mysql.tidb_mdl_info where job_id = %d and owner_id = '%s'", job.ID, ownerID)
}
sctx, _ := pool.Get()
defer pool.Put(sctx)
se := sess.NewSession(sctx)
se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
_, err := se.Execute(context.Background(), sql, "delete-mdl-info")
if err != nil {
logutil.DDLLogger().Warn("unexpected error when clean mdl info", zap.Int64("job ID", jobID), zap.Error(err))
logutil.DDLLogger().Warn("unexpected error when clean mdl info", zap.Int64("job ID", job.ID), zap.Error(err))
return
}
if cleanETCD && ec != nil {
path := fmt.Sprintf("%s/%d/", util.DDLAllSchemaVersionsByJob, jobID)
path := fmt.Sprintf("%s/%d/", util.DDLAllSchemaVersionsByJob, job.ID)
_, err = ec.Delete(context.Background(), path, clientv3.WithPrefix())
if err != nil {
logutil.DDLLogger().Warn("delete versions failed", zap.Int64("job ID", jobID), zap.Error(err))
logutil.DDLLogger().Warn("delete versions failed", zap.Int64("job ID", job.ID), zap.Error(err))
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func (d *ddl) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) {
return
}
d.setAlreadyRunOnce(job.ID)
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli, ownerID, job.State == model.JobStateSynced)
cleanMDLInfo(d.sessPool, job, d.etcdCli, ownerID, job.State == model.JobStateSynced)
// Don't have a worker now.
return
}
Expand Down Expand Up @@ -481,7 +481,7 @@ func (d *ddl) delivery2Worker(wk *worker, pool *workerPool, job *model.Job) {
if err != nil {
return
}
cleanMDLInfo(d.sessPool, job.ID, d.etcdCli, ownerID, job.State == model.JobStateSynced)
cleanMDLInfo(d.sessPool, job, d.etcdCli, ownerID, job.State == model.JobStateSynced)
d.synced(job)

if RunInGoTest {
Expand Down