diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go index 9d4be0bab7bc4..6e3c185b22861 100644 --- a/pkg/owner/manager.go +++ b/pkg/owner/manager.go @@ -500,6 +500,16 @@ func init() { } } +// DeleteLeader deletes the leader key. +func DeleteLeader(ctx context.Context, cli *clientv3.Client, key string) error { + ownerKey, _, _, _, _, err := getOwnerInfo(ctx, ctx, cli, key) + if err != nil { + return errors.Trace(err) + } + _, err = cli.Delete(ctx, ownerKey) + return err +} + // AcquireDistributedLock creates a mutex with ETCD client, and returns a mutex release function. func AcquireDistributedLock( ctx context.Context, @@ -512,7 +522,14 @@ func AcquireDistributedLock( return nil, err } mu := concurrency.NewMutex(se, key) - err = mu.Lock(ctx) + maxRetryCnt := 10 + err = util2.RunWithRetry(maxRetryCnt, util2.RetryInterval, func() (bool, error) { + err = mu.Lock(ctx) + if err != nil { + return true, err + } + return false, nil + }) if err != nil { err1 := se.Close() if err1 != nil { diff --git a/pkg/session/bootstrap.go b/pkg/session/bootstrap.go index 633ae102d0bc5..ff2ad11f06a86 100644 --- a/pkg/session/bootstrap.go +++ b/pkg/session/bootstrap.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/bindinfo" "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/domain/infosync" @@ -62,6 +63,9 @@ import ( "go.uber.org/zap" ) +// bootstrapOwnerKey is the key used by ddl owner mutex during boostrap. +var bootstrapOwnerKey = "/tidb/distributeDDLOwnerLock/" + const ( // CreateUserTable is the SQL statement creates User table in system db. // WARNING: There are some limitations on altering the schema of mysql.user table. @@ -1417,6 +1421,48 @@ var ( SupportUpgradeHTTPOpVer int64 = version174 ) +func acquireLock(s sessiontypes.Session) (func(), bool) { + dom := domain.GetDomain(s) + if dom == nil { + logutil.BgLogger().Warn("domain is nil") + return nil, false + } + cli := dom.GetEtcdClient() + if cli == nil { + logutil.BgLogger().Warn("etcd client is nil, force to acquire ddl owner lock") + // Special handling for test. + return func() { + // do nothing + }, true + } + releaseFn, err := owner.AcquireDistributedLock(context.Background(), cli, ddl.DDLOwnerKey, 10) + if err != nil { + return nil, false + } + return releaseFn, true +} + +func forceToLeader(ctx context.Context, s sessiontypes.Session) error { + dom := domain.GetDomain(s) + for !dom.DDL().OwnerManager().IsOwner() { + ownerID, err := dom.DDL().OwnerManager().GetOwnerID(ctx) + if err != nil && (errors.ErrorEqual(err, concurrency.ErrElectionNoLeader) || strings.Contains(err.Error(), "no owner")) { + time.Sleep(50 * time.Millisecond) + continue + } else if err != nil { + logutil.BgLogger().Error("unexpected error", zap.Error(err)) + return err + } + err = owner.DeleteLeader(ctx, dom.EtcdClient(), ddl.DDLOwnerKey) + if err != nil { + logutil.BgLogger().Error("unexpected error", zap.Error(err), zap.String("ownerID", ownerID)) + return err + } + time.Sleep(50 * time.Millisecond) + } + return nil +} + func checkDistTask(s sessiontypes.Session, ver int64) { if ver > version195 { // since version195 we enable dist task by default, no need to check @@ -1466,35 +1512,34 @@ func checkDistTask(s sessiontypes.Session, ver int64) { // upgrade function will do some upgrade works, when the system is bootstrapped by low version TiDB server // For example, add new system variables into mysql.global_variables table. func upgrade(s sessiontypes.Session) { - ver, err := getBootstrapVersion(s) + // Do upgrade works then update bootstrap version. + isNull, err := InitMDLVariableForUpgrade(s.GetStore()) + if err != nil { + logutil.BgLogger().Fatal("[upgrade] init metadata lock failed", zap.Error(err)) + } + + var ver int64 + releaseFn, ok := acquireLock(s) + if !ok { + logutil.BgLogger().Fatal("[upgrade] get ddl owner distributed lock failed", zap.Error(err)) + } + ver, err = getBootstrapVersion(s) terror.MustNil(err) if ver >= currentBootstrapVersion { // It is already bootstrapped/upgraded by a higher version TiDB server. + releaseFn() return } + defer releaseFn() - checkDistTask(s, ver) - printClusterState(s, ver) - - // Only upgrade from under version92 and this TiDB is not owner set. - // The owner in older tidb does not support concurrent DDL, we should add the internal DDL to job queue. - if ver < version92 { - useConcurrentDDL, err := checkOwnerVersion(context.Background(), domain.GetDomain(s)) - if err != nil { - logutil.BgLogger().Fatal("[upgrade] upgrade failed", zap.Error(err)) - } - if !useConcurrentDDL { - // Use another variable DDLForce2Queue but not EnableConcurrentDDL since in upgrade it may set global variable, the initial step will - // overwrite variable EnableConcurrentDDL. - variable.DDLForce2Queue.Store(true) - } - } - // Do upgrade works then update bootstrap version. - isNull, err := InitMDLVariableForUpgrade(s.GetStore()) + err = forceToLeader(context.Background(), s) if err != nil { - logutil.BgLogger().Fatal("[upgrade] init metadata lock failed", zap.Error(err)) + logutil.BgLogger().Fatal("[upgrade] force to owner failed", zap.Error(err)) } + checkDistTask(s, ver) + printClusterState(s, ver) + // when upgrade from v6.4.0 or earlier, enables metadata lock automatically, // but during upgrade we disable it. if isNull {