Skip to content

Commit

Permalink
Merge branch 'master' into metabuildctx
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Sep 25, 2024
2 parents 70a5a64 + a06f6aa commit 1707848
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 21 deletions.
19 changes: 18 additions & 1 deletion pkg/owner/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,16 @@ func init() {
}
}

// DeleteLeader deletes the leader key.
func DeleteLeader(ctx context.Context, cli *clientv3.Client, key string) error {
ownerKey, _, _, _, _, err := getOwnerInfo(ctx, ctx, cli, key)
if err != nil {
return errors.Trace(err)
}
_, err = cli.Delete(ctx, ownerKey)
return err
}

// AcquireDistributedLock creates a mutex with ETCD client, and returns a mutex release function.
func AcquireDistributedLock(
ctx context.Context,
Expand All @@ -512,7 +522,14 @@ func AcquireDistributedLock(
return nil, err
}
mu := concurrency.NewMutex(se, key)
err = mu.Lock(ctx)
maxRetryCnt := 10
err = util2.RunWithRetry(maxRetryCnt, util2.RetryInterval, func() (bool, error) {
err = mu.Lock(ctx)
if err != nil {
return true, err
}
return false, nil
})
if err != nil {
err1 := se.Close()
if err1 != nil {
Expand Down
85 changes: 65 additions & 20 deletions pkg/session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/bindinfo"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/domain/infosync"
Expand Down Expand Up @@ -62,6 +63,9 @@ import (
"go.uber.org/zap"
)

// bootstrapOwnerKey is the key used by ddl owner mutex during boostrap.
var bootstrapOwnerKey = "/tidb/distributeDDLOwnerLock/"

const (
// CreateUserTable is the SQL statement creates User table in system db.
// WARNING: There are some limitations on altering the schema of mysql.user table.
Expand Down Expand Up @@ -1417,6 +1421,48 @@ var (
SupportUpgradeHTTPOpVer int64 = version174
)

func acquireLock(s sessiontypes.Session) (func(), bool) {
dom := domain.GetDomain(s)
if dom == nil {
logutil.BgLogger().Warn("domain is nil")
return nil, false
}
cli := dom.GetEtcdClient()
if cli == nil {
logutil.BgLogger().Warn("etcd client is nil, force to acquire ddl owner lock")
// Special handling for test.
return func() {
// do nothing
}, true
}
releaseFn, err := owner.AcquireDistributedLock(context.Background(), cli, ddl.DDLOwnerKey, 10)
if err != nil {
return nil, false
}
return releaseFn, true
}

func forceToLeader(ctx context.Context, s sessiontypes.Session) error {
dom := domain.GetDomain(s)
for !dom.DDL().OwnerManager().IsOwner() {
ownerID, err := dom.DDL().OwnerManager().GetOwnerID(ctx)
if err != nil && (errors.ErrorEqual(err, concurrency.ErrElectionNoLeader) || strings.Contains(err.Error(), "no owner")) {
time.Sleep(50 * time.Millisecond)
continue
} else if err != nil {
logutil.BgLogger().Error("unexpected error", zap.Error(err))
return err
}
err = owner.DeleteLeader(ctx, dom.EtcdClient(), ddl.DDLOwnerKey)
if err != nil {
logutil.BgLogger().Error("unexpected error", zap.Error(err), zap.String("ownerID", ownerID))
return err
}
time.Sleep(50 * time.Millisecond)
}
return nil
}

func checkDistTask(s sessiontypes.Session, ver int64) {
if ver > version195 {
// since version195 we enable dist task by default, no need to check
Expand Down Expand Up @@ -1466,35 +1512,34 @@ func checkDistTask(s sessiontypes.Session, ver int64) {
// upgrade function will do some upgrade works, when the system is bootstrapped by low version TiDB server
// For example, add new system variables into mysql.global_variables table.
func upgrade(s sessiontypes.Session) {
ver, err := getBootstrapVersion(s)
// Do upgrade works then update bootstrap version.
isNull, err := InitMDLVariableForUpgrade(s.GetStore())
if err != nil {
logutil.BgLogger().Fatal("[upgrade] init metadata lock failed", zap.Error(err))
}

var ver int64
releaseFn, ok := acquireLock(s)
if !ok {
logutil.BgLogger().Fatal("[upgrade] get ddl owner distributed lock failed", zap.Error(err))
}
ver, err = getBootstrapVersion(s)
terror.MustNil(err)
if ver >= currentBootstrapVersion {
// It is already bootstrapped/upgraded by a higher version TiDB server.
releaseFn()
return
}
defer releaseFn()

checkDistTask(s, ver)
printClusterState(s, ver)

// Only upgrade from under version92 and this TiDB is not owner set.
// The owner in older tidb does not support concurrent DDL, we should add the internal DDL to job queue.
if ver < version92 {
useConcurrentDDL, err := checkOwnerVersion(context.Background(), domain.GetDomain(s))
if err != nil {
logutil.BgLogger().Fatal("[upgrade] upgrade failed", zap.Error(err))
}
if !useConcurrentDDL {
// Use another variable DDLForce2Queue but not EnableConcurrentDDL since in upgrade it may set global variable, the initial step will
// overwrite variable EnableConcurrentDDL.
variable.DDLForce2Queue.Store(true)
}
}
// Do upgrade works then update bootstrap version.
isNull, err := InitMDLVariableForUpgrade(s.GetStore())
err = forceToLeader(context.Background(), s)
if err != nil {
logutil.BgLogger().Fatal("[upgrade] init metadata lock failed", zap.Error(err))
logutil.BgLogger().Fatal("[upgrade] force to owner failed", zap.Error(err))
}

checkDistTask(s, ver)
printClusterState(s, ver)

// when upgrade from v6.4.0 or earlier, enables metadata lock automatically,
// but during upgrade we disable it.
if isNull {
Expand Down

0 comments on commit 1707848

Please sign in to comment.