From 7aff2ef20937dfbbaffa2d428f697392a04829b3 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 22 Oct 2024 19:08:08 +0800 Subject: [PATCH 1/9] ddl mode --- br/pkg/gluetidb/glue.go | 2 +- pkg/ddl/ddl.go | 14 ++++++++++++++ pkg/domain/domain.go | 2 +- pkg/domain/domain_test.go | 2 +- pkg/session/bootstrap_test.go | 2 +- pkg/session/session.go | 27 +++++++++++++++++++-------- 6 files changed, 37 insertions(+), 12 deletions(-) diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index cf1a876755c99..a86ef312d1804 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -111,7 +111,7 @@ func (g Glue) startDomainAsNeeded(store kv.Storage) error { if err != nil { return err } - return dom.Start() + return dom.Start(ddl.Normal) } func (g Glue) createTypesSession(store kv.Storage) (sessiontypes.Session, error) { diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index e82338bd2afe1..a6e5b15b74cce 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -93,6 +93,20 @@ var ( detectJobVerInterval = 10 * time.Second ) +// StartMode is an enum type for the start mode of the DDL. +type StartMode string + +const ( + // Normal mode, cluster is in normal state. + Normal StartMode = "normal" + // Bootstrap mode, cluster is during bootstrap. + Bootstrap = "bootstrap" + // Upgrade mode, cluster is during upgrade, we will force current node to be + // the DDL owner, to make sure all upgrade related DDLs are run on new version + // TiDB instance. + Upgrade = "upgrade" +) + // OnExist specifies what to do when a new object has a name collision. type OnExist uint8 diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 91a7b75044ff4..09b90a7e4a9bb 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -1482,7 +1482,7 @@ func (do *Domain) Init( // Start starts the domain. After start, DDLs can be executed using session, see // Init also. -func (do *Domain) Start() error { +func (do *Domain) Start(ddl.StartMode) error { gCfg := config.GetGlobalConfig() if gCfg.EnableGlobalKill && do.etcdClient != nil { do.wg.Add(1) diff --git a/pkg/domain/domain_test.go b/pkg/domain/domain_test.go index a6ac1da87955d..92eff2e83dd98 100644 --- a/pkg/domain/domain_test.go +++ b/pkg/domain/domain_test.go @@ -94,7 +94,7 @@ func TestInfo(t *testing.T) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/MockReplaceDDL", `return(true)`)) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/NoDDLDispatchLoop", `return(true)`)) require.NoError(t, dom.Init(sysMockFactory, nil)) - require.NoError(t, dom.Start()) + require.NoError(t, dom.Start(ddl.Bootstrap)) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/NoDDLDispatchLoop")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/MockReplaceDDL")) diff --git a/pkg/session/bootstrap_test.go b/pkg/session/bootstrap_test.go index 05b1b2363046b..0972c1906ca57 100644 --- a/pkg/session/bootstrap_test.go +++ b/pkg/session/bootstrap_test.go @@ -170,7 +170,7 @@ func TestBootstrapWithError(t *testing.T) { require.NoError(t, err) dom, err := domap.Get(store) require.NoError(t, err) - require.NoError(t, dom.Start()) + require.NoError(t, dom.Start(ddl.Bootstrap)) domain.BindDomain(se, dom) b, err := checkBootstrapped(se) require.False(t, b) diff --git a/pkg/session/session.go b/pkg/session/session.go index 0af806c52a58c..89fa1cea3a40f 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -3448,10 +3448,8 @@ func bootstrapSessionImpl(ctx context.Context, store kv.Storage, createSessionsI return nil, err } ver := getStoreBootstrapVersion(store) - if ver == notBootstrapped { - runInBootstrapSession(store, bootstrap) - } else if ver < currentBootstrapVersion { - runInBootstrapSession(store, upgrade) + if ver < currentBootstrapVersion { + runInBootstrapSession(store, getStartMode(ver)) } else { err = InitMDLVariable(store) if err != nil { @@ -3504,7 +3502,7 @@ func bootstrapSessionImpl(ctx context.Context, store kv.Storage, createSessionsI // only start the domain after we have initialized some global variables. dom := domain.GetDomain(ses[0]) - err = dom.Start() + err = dom.Start(ddl.Normal) if err != nil { return nil, err } @@ -3667,18 +3665,27 @@ func GetDomain(store kv.Storage) (*domain.Domain, error) { return domap.Get(store) } +func getStartMode(ver int64) ddl.StartMode { + if ver == notBootstrapped { + return ddl.Bootstrap + } else if ver < currentBootstrapVersion { + return ddl.Upgrade + } + return ddl.Normal +} + // runInBootstrapSession create a special session for bootstrap to run. // If no bootstrap and storage is remote, we must use a little lease time to // bootstrap quickly, after bootstrapped, we will reset the lease time. // TODO: Using a bootstrap tool for doing this may be better later. -func runInBootstrapSession(store kv.Storage, bootstrap func(types.Session)) { +func runInBootstrapSession(store kv.Storage, startMode ddl.StartMode) { s, err := createSession(store) if err != nil { // Bootstrap fail will cause program exit. logutil.BgLogger().Fatal("createSession error", zap.Error(err)) } dom := domain.GetDomain(s) - err = dom.Start() + err = dom.Start(startMode) if err != nil { // Bootstrap fail will cause program exit. logutil.BgLogger().Fatal("start domain error", zap.Error(err)) @@ -3688,7 +3695,11 @@ func runInBootstrapSession(store kv.Storage, bootstrap func(types.Session)) { s.sessionVars.EnableClusteredIndex = variable.ClusteredIndexDefModeIntOnly s.SetValue(sessionctx.Initing, true) - bootstrap(s) + if startMode == ddl.Bootstrap { + bootstrap(s) + } else { + upgrade(s) + } finishBootstrap(store) s.ClearValue(sessionctx.Initing) From daf8f43b8698d88f6ee6886a77b995c0a993e7c9 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Tue, 29 Oct 2024 18:58:05 +0800 Subject: [PATCH 2/9] change --- pkg/ddl/ddl.go | 19 ++++- pkg/ddl/restart_test.go | 2 +- pkg/ddl/schematracker/checker.go | 4 +- pkg/domain/domain.go | 4 +- pkg/owner/manager.go | 129 +++++++++++++++++++++++++++---- pkg/owner/mock.go | 4 + pkg/session/bootstrap.go | 34 -------- pkg/session/session.go | 25 +++++- pkg/session/tidb.go | 2 + 9 files changed, 161 insertions(+), 62 deletions(-) diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index a6e5b15b74cce..bf581e84d7d69 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -175,7 +175,7 @@ var ( type DDL interface { // Start campaigns the owner and starts workers. // ctxPool is used for the worker's delRangeManager and creates sessions. - Start(ctxPool *pools.ResourcePool) error + Start(startMode StartMode, ctxPool *pools.ResourcePool) error // Stats returns the DDL statistics. Stats(vars *variable.SessionVars) (map[string]any, error) // GetScope gets the status variables scope. @@ -762,10 +762,21 @@ func (d *ddl) newDeleteRangeManager(mock bool) delRangeManager { } // Start implements DDL.Start interface. -func (d *ddl) Start(ctxPool *pools.ResourcePool) error { +func (d *ddl) Start(startMode StartMode, ctxPool *pools.ResourcePool) error { d.detectAndUpdateJobVersion() + campaignOwner := config.GetGlobalConfig().Instance.TiDBEnableDDL.Load() + if startMode == Upgrade { + if !campaignOwner { + return errors.New("DDL must be enabled when upgrading") + } + + logutil.DDLLogger().Info("DDL is in upgrade mode, force to be owner") + if err := d.ownerManager.ForceToBeOwner(d.ctx); err != nil { + return errors.Trace(err) + } + } logutil.DDLLogger().Info("start DDL", zap.String("ID", d.uuid), - zap.Bool("runWorker", config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()), + zap.Bool("runWorker", campaignOwner), zap.Stringer("jobVersion", model.GetJobVerInUse())) d.sessPool = sess.NewSessionPool(ctxPool) @@ -800,7 +811,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error { // If tidb_enable_ddl is true, we need campaign owner and do DDL jobs. Besides, we also can do backfill jobs. // Otherwise, we needn't do that. - if config.GetGlobalConfig().Instance.TiDBEnableDDL.Load() { + if campaignOwner { if err := d.EnableDDL(); err != nil { return err } diff --git a/pkg/ddl/restart_test.go b/pkg/ddl/restart_test.go index 3009111af511f..30935643db1bb 100644 --- a/pkg/ddl/restart_test.go +++ b/pkg/ddl/restart_test.go @@ -50,7 +50,7 @@ func restartWorkers(t *testing.T, store kv.Storage, d *domain.Domain) { ddl.WithSchemaLoader(d), ) d.SetDDL(newDDL, newDDLExecutor) - err = newDDL.Start(pools.NewResourcePool(func() (pools.Resource, error) { + err = newDDL.Start(ddl.Normal, pools.NewResourcePool(func() (pools.Resource, error) { session := testkit.NewTestKit(t, store).Session() session.GetSessionVars().CommonGlobalLoaded = true return session, nil diff --git a/pkg/ddl/schematracker/checker.go b/pkg/ddl/schematracker/checker.go index 1c666a026d352..7068d4bce229b 100644 --- a/pkg/ddl/schematracker/checker.go +++ b/pkg/ddl/schematracker/checker.go @@ -502,8 +502,8 @@ func (*Checker) CreatePlacementPolicyWithInfo(_ sessionctx.Context, _ *model.Pol } // Start implements the DDL interface. -func (d *Checker) Start(ctxPool *pools.ResourcePool) error { - return d.realDDL.Start(ctxPool) +func (d *Checker) Start(startMode ddl.StartMode, ctxPool *pools.ResourcePool) error { + return d.realDDL.Start(startMode, ctxPool) } // Stats implements the DDL interface. diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 09b90a7e4a9bb..ff5334a098195 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -1482,7 +1482,7 @@ func (do *Domain) Init( // Start starts the domain. After start, DDLs can be executed using session, see // Init also. -func (do *Domain) Start(ddl.StartMode) error { +func (do *Domain) Start(startMode ddl.StartMode) error { gCfg := config.GetGlobalConfig() if gCfg.EnableGlobalKill && do.etcdClient != nil { do.wg.Add(1) @@ -1501,7 +1501,7 @@ func (do *Domain) Start(ddl.StartMode) error { sysCtxPool := pools.NewResourcePool(sysFac, 512, 512, resourceIdleTimeout) // start the ddl after the domain reload, avoiding some internal sql running before infoSchema construction. - err := do.ddl.Start(sysCtxPool) + err := do.ddl.Start(startMode, sysCtxPool) if err != nil { return err } diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go index e4d467b16b1db..1533ff6b02d87 100644 --- a/pkg/owner/manager.go +++ b/pkg/owner/manager.go @@ -69,10 +69,24 @@ type Manager interface { CampaignCancel() // SetListener sets the listener, set before CampaignOwner. SetListener(listener Listener) + // ForceToBeOwner restart the owner election and trying to be the new owner by + // end campaigns of all candidates and start a new campaign in a single transaction. + // + // This method is only used during upgrade and try to make node of newer version + // to be the DDL owner, to mitigate the issue https://github.com/pingcap/tidb/issues/54689, + // current instance shouldn't call CampaignOwner before calling this method. + // don't use it in other cases. + // + // Note: only one instance can call this method at a time, so you have to use + // a distributed lock when there are multiple instances of new version TiDB trying + // to be the owner. See runInBootstrapSession for where we lock it in DDL. + ForceToBeOwner(ctx context.Context) error } const ( keyOpDefaultTimeout = 5 * time.Second + + waitTimeOnForceOwner = 5 * time.Second ) // OpType is the owner key value operation type. @@ -120,7 +134,8 @@ type ownerManager struct { wg sync.WaitGroup campaignCancel context.CancelFunc - listener Listener + listener Listener + forceOwnerSession *concurrency.Session } // NewOwnerManager creates a new Manager. @@ -165,6 +180,94 @@ func (m *ownerManager) SetListener(listener Listener) { m.listener = listener } +func (m *ownerManager) ForceToBeOwner(context.Context) error { + logPrefix := fmt.Sprintf("[%s] %s", m.prompt, m.key) + logutil.BgLogger().Info("force to be owner", zap.String("ownerInfo", logPrefix)) + session, err := util2.NewSession(m.ctx, logPrefix, m.etcdCli, util2.NewSessionDefaultRetryCnt, ManagerSessionTTL) + if err != nil { + return errors.Trace(err) + } + m.forceOwnerSession = session + m.sessionLease.Store(int64(m.forceOwnerSession.Lease())) + + // due to issue https://github.com/pingcap/tidb/issues/54689, if the cluster + // version before upgrade don't have fix, when retire owners runs on older version + // and trying to be the new owner, it's possible that multiple owner exist at + // the same time, it cannot be avoided completely, but we can use below 2 strategies + // to mitigate this issue: + // 1. when trying to be owner, we delete all existing owner related keys and + // put new key in a single txn, if we delete the key one by one, other node + // might become the owner, it will have more chances to trigger previous issue. + // 2. sleep for a while before trying to be owner to make sure there is an owner in + // the cluster, and it has started watching. in the case of upgrade using + // tiup, tiup might restart current owner node to do rolling upgrade. + // before the restarted node force owner, another node might try to be + // the new owner too, it's still possible to trigger the issue. so we + // sleep a while to wait the cluster have a new owner and start watching. + for i := 0; i < 3; i++ { + // we need to sleep in every retry, as other TiDB nodes will start campaign + // immediately after we delete their key. + time.Sleep(waitTimeOnForceOwner) + if err = m.tryToBeOwnerOnce(); err != nil { + logutil.Logger(m.logCtx).Warn("failed to retire owner on older version", zap.Error(err)) + continue + } + break + } + return nil +} + +func (m *ownerManager) tryToBeOwnerOnce() error { + lease := m.forceOwnerSession.Lease() + keyPrefix := m.key + "/" + + getResp, err := m.etcdCli.Get(m.ctx, keyPrefix, clientv3.WithPrefix()) + if err != nil { + return err + } + + // modifications to the same key multiple times within a single transaction are + // forbidden in etcd, so we cannot use delete by prefix and put in a single txn. + // it will report "duplicate key given in txn request" error. + // It's possible that other nodes put campaign keys between we get the keys and + // the txn to put new key, we relay on the sleep before calling this method to + // make sure all TiDBs have already put the key, and the distributed lock inside + // bootstrap to make sure no concurrent ForceToBeOwner is called. + txnOps := make([]clientv3.Op, 0, len(getResp.Kvs)+1) + // below key structure is copied from Election.Campaign. + campaignKey := fmt.Sprintf("%s%x", keyPrefix, lease) + for _, kv := range getResp.Kvs { + key := string(kv.Key) + if key == campaignKey { + // if below campaign failed, it will resign automatically, but if resign + // also failed, the old key might already exist + continue + } + txnOps = append(txnOps, clientv3.OpDelete(key)) + } + txnOps = append(txnOps, clientv3.OpPut(campaignKey, m.id, clientv3.WithLease(lease))) + _, err = m.etcdCli.Txn(m.ctx).Then(txnOps...).Commit() + if err != nil { + return errors.Trace(err) + } + // Campaign will wait until current instance become owner or the key is deleted, + // in case other nodes put keys in between previous get and txn, we never become + // the owner, so we add a timeout to avoid blocking. + ctx, cancel := context.WithTimeout(m.ctx, keyOpDefaultTimeout) + defer cancel() + elec := concurrency.NewElection(m.forceOwnerSession, m.key) + if err = elec.Campaign(ctx, m.id); err != nil { + return errors.Trace(err) + } + + // Campaign assumes that it's the only client managing the lifecycle of the campaign + // key, so it will also return when the campaign key is deleted by other TiDB + // instances when the distributed lock has failed to keep alive and another TiDB + // get the lock. It's a quite rare case, and the TiDB must be of newer version + // which has the fix of the issue, so it's ok to return now. + return nil +} + // ManagerSessionTTL is the etcd session's TTL in seconds. It's exported for testing. var ManagerSessionTTL = 60 @@ -190,15 +293,19 @@ func (m *ownerManager) CampaignOwner(withTTL ...int) error { } logPrefix := fmt.Sprintf("[%s] %s", m.prompt, m.key) logutil.BgLogger().Info("start campaign owner", zap.String("ownerInfo", logPrefix)) - session, err := util2.NewSession(m.ctx, logPrefix, m.etcdCli, util2.NewSessionDefaultRetryCnt, ttl) - if err != nil { - return errors.Trace(err) + campaignSession := m.forceOwnerSession + if campaignSession == nil { + session, err := util2.NewSession(m.ctx, logPrefix, m.etcdCli, util2.NewSessionDefaultRetryCnt, ttl) + if err != nil { + return errors.Trace(err) + } + m.sessionLease.Store(int64(session.Lease())) + campaignSession = session } - m.sessionLease.Store(int64(session.Lease())) m.wg.Add(1) var campaignContext context.Context campaignContext, m.campaignCancel = context.WithCancel(m.ctx) - go m.campaignLoop(campaignContext, session) + go m.campaignLoop(campaignContext, campaignSession) return nil } @@ -500,16 +607,6 @@ func init() { } } -// DeleteLeader deletes the leader key. -func DeleteLeader(ctx context.Context, cli *clientv3.Client, key string) error { - ownerKey, _, _, _, _, err := getOwnerInfo(ctx, ctx, cli, key) - if err != nil { - return errors.Trace(err) - } - _, err = cli.Delete(ctx, ownerKey) - return err -} - // AcquireDistributedLock creates a mutex with ETCD client, and returns a mutex release function. func AcquireDistributedLock( ctx context.Context, diff --git a/pkg/owner/mock.go b/pkg/owner/mock.go index 75a934307b41e..f653b51e0e119 100644 --- a/pkg/owner/mock.go +++ b/pkg/owner/mock.go @@ -181,6 +181,10 @@ func (m *mockManager) SetListener(listener Listener) { m.listener = listener } +func (*mockManager) ForceToBeOwner(context.Context) error { + return nil +} + // CampaignCancel implements Manager.CampaignCancel interface func (m *mockManager) CampaignCancel() { m.campaignDone <- struct{}{} diff --git a/pkg/session/bootstrap.go b/pkg/session/bootstrap.go index 25e1fcc1c175a..ce63cbb3d9336 100644 --- a/pkg/session/bootstrap.go +++ b/pkg/session/bootstrap.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/bindinfo" "github.com/pingcap/tidb/pkg/config" - "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/domain/infosync" @@ -1448,28 +1447,6 @@ func acquireLock(s sessiontypes.Session) (func(), bool) { return releaseFn, true } -func forceToLeader(ctx context.Context, s sessiontypes.Session) error { - dom := domain.GetDomain(s) - for !dom.DDL().OwnerManager().IsOwner() { - ownerID, err := dom.DDL().OwnerManager().GetOwnerID(ctx) - if err != nil && (errors.ErrorEqual(err, concurrency.ErrElectionNoLeader) || strings.Contains(err.Error(), "no owner")) { - logutil.BgLogger().Info("ddl owner not found", zap.Error(err)) - time.Sleep(50 * time.Millisecond) - continue - } else if err != nil { - logutil.BgLogger().Error("unexpected error", zap.Error(err)) - return err - } - err = owner.DeleteLeader(ctx, dom.EtcdClient(), ddl.DDLOwnerKey) - if err != nil { - logutil.BgLogger().Error("unexpected error", zap.Error(err), zap.String("ownerID", ownerID)) - return err - } - time.Sleep(50 * time.Millisecond) - } - return nil -} - func checkDistTask(s sessiontypes.Session, ver int64) { if ver > version195 { // since version195 we enable dist task by default, no need to check @@ -1526,23 +1503,12 @@ func upgrade(s sessiontypes.Session) { } var ver int64 - releaseFn, ok := acquireLock(s) - if !ok { - logutil.BgLogger().Fatal("[upgrade] get ddl owner distributed lock failed", zap.Error(err)) - } ver, err = getBootstrapVersion(s) terror.MustNil(err) if ver >= currentBootstrapVersion { // It is already bootstrapped/upgraded by a higher version TiDB server. - releaseFn() return } - defer releaseFn() - - err = forceToLeader(context.Background(), s) - if err != nil { - logutil.BgLogger().Fatal("[upgrade] force to owner failed", zap.Error(err)) - } checkDistTask(s, ver) printClusterState(s, ver) diff --git a/pkg/session/session.go b/pkg/session/session.go index 89fa1cea3a40f..5b9669f44e19c 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -3449,7 +3449,7 @@ func bootstrapSessionImpl(ctx context.Context, store kv.Storage, createSessionsI } ver := getStoreBootstrapVersion(store) if ver < currentBootstrapVersion { - runInBootstrapSession(store, getStartMode(ver)) + runInBootstrapSession(store, ver) } else { err = InitMDLVariable(store) if err != nil { @@ -3678,12 +3678,31 @@ func getStartMode(ver int64) ddl.StartMode { // If no bootstrap and storage is remote, we must use a little lease time to // bootstrap quickly, after bootstrapped, we will reset the lease time. // TODO: Using a bootstrap tool for doing this may be better later. -func runInBootstrapSession(store kv.Storage, startMode ddl.StartMode) { +func runInBootstrapSession(store kv.Storage, ver int64) { + startMode := getStartMode(ver) s, err := createSession(store) if err != nil { // Bootstrap fail will cause program exit. logutil.BgLogger().Fatal("createSession error", zap.Error(err)) } + if startMode == ddl.Upgrade { + releaseFn, ok := acquireLock(s) + if !ok { + logutil.BgLogger().Fatal("[upgrade] get ddl owner distributed lock failed", zap.Error(err)) + } + defer releaseFn() + ver, err = getBootstrapVersion(s) + terror.MustNil(err) + if ver >= currentBootstrapVersion { + // It is already bootstrapped/upgraded by another TiDB instance, but + // we still need to go through the following domain Start/Close code + // right now as we have already initialized it when creating the session, + // so we switch to normal mode. + // TODO remove this after we can refactor below code out in this case. + logutil.BgLogger().Info("[upgrade] already upgraded by other nodes, switch to normal mode") + startMode = ddl.Normal + } + } dom := domain.GetDomain(s) err = dom.Start(startMode) if err != nil { @@ -3697,7 +3716,7 @@ func runInBootstrapSession(store kv.Storage, startMode ddl.StartMode) { s.SetValue(sessionctx.Initing, true) if startMode == ddl.Bootstrap { bootstrap(s) - } else { + } else if startMode == ddl.Upgrade { upgrade(s) } finishBootstrap(store) diff --git a/pkg/session/tidb.go b/pkg/session/tidb.go index 913d8d2d50b06..f0bc68aba465a 100644 --- a/pkg/session/tidb.go +++ b/pkg/session/tidb.go @@ -125,6 +125,8 @@ var ( domains: map[string]*domain.Domain{}, } // store.UUID()-> IfBootstrapped + // TODO this memory flag is meaningless, a store is always bootstrapped once, + // we can always get the version from the store, remove it later. storeBootstrapped = make(map[string]bool) storeBootstrappedLock sync.Mutex From dbf2b433ab0b349fbd6119cc1daecbc01b1eb895 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 30 Oct 2024 09:45:51 +0800 Subject: [PATCH 3/9] change --- pkg/ddl/ddl.go | 4 ++-- pkg/ddl/schema_test.go | 2 +- pkg/session/BUILD.bazel | 1 + pkg/session/session_test.go | 29 +++++++++++++++++++++++++++++ 4 files changed, 33 insertions(+), 3 deletions(-) create mode 100644 pkg/session/session_test.go diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index bf581e84d7d69..ad9d87bed4d7a 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -100,11 +100,11 @@ const ( // Normal mode, cluster is in normal state. Normal StartMode = "normal" // Bootstrap mode, cluster is during bootstrap. - Bootstrap = "bootstrap" + Bootstrap StartMode = "bootstrap" // Upgrade mode, cluster is during upgrade, we will force current node to be // the DDL owner, to make sure all upgrade related DDLs are run on new version // TiDB instance. - Upgrade = "upgrade" + Upgrade StartMode = "upgrade" ) // OnExist specifies what to do when a new object has a name collision. diff --git a/pkg/ddl/schema_test.go b/pkg/ddl/schema_test.go index 2ff375b76a7c9..7132fabe8c956 100644 --- a/pkg/ddl/schema_test.go +++ b/pkg/ddl/schema_test.go @@ -307,7 +307,7 @@ func TestSchemaWaitJob(t *testing.T) { ddl.WithSchemaLoader(domain), ) det2 := de2.(ddl.ExecutorForTest) - err := d2.Start(pools.NewResourcePool(func() (pools.Resource, error) { + err := d2.Start(ddl.Normal, pools.NewResourcePool(func() (pools.Resource, error) { session := testkit.NewTestKit(t, store).Session() session.GetSessionVars().CommonGlobalLoaded = true return session, nil diff --git a/pkg/session/BUILD.bazel b/pkg/session/BUILD.bazel index 34b00407b45d6..51c5b5e23c739 100644 --- a/pkg/session/BUILD.bazel +++ b/pkg/session/BUILD.bazel @@ -144,6 +144,7 @@ go_test( "bench_test.go", "bootstrap_test.go", "main_test.go", + "session_test.go", "tidb_test.go", ], embed = [":session"], diff --git a/pkg/session/session_test.go b/pkg/session/session_test.go new file mode 100644 index 0000000000000..1c9bfb95609b4 --- /dev/null +++ b/pkg/session/session_test.go @@ -0,0 +1,29 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package session + +import ( + "testing" + + "github.com/pingcap/tidb/pkg/ddl" + "github.com/stretchr/testify/require" +) + +func TestGetStartMode(t *testing.T) { + require.Equal(t, ddl.Normal, getStartMode(currentBootstrapVersion)) + require.Equal(t, ddl.Normal, getStartMode(currentBootstrapVersion+1)) + require.Equal(t, ddl.Upgrade, getStartMode(currentBootstrapVersion-1)) + require.Equal(t, ddl.Bootstrap, getStartMode(0)) +} From 55d2d9a1e8b5dc1f2b4188fe8c4a57182420fa23 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 30 Oct 2024 11:09:04 +0800 Subject: [PATCH 4/9] change --- pkg/session/session.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/session/session.go b/pkg/session/session.go index 5b9669f44e19c..7d563d9fa29dd 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -3685,6 +3685,11 @@ func runInBootstrapSession(store kv.Storage, ver int64) { // Bootstrap fail will cause program exit. logutil.BgLogger().Fatal("createSession error", zap.Error(err)) } + // For the bootstrap SQLs, the following variables should be compatible with old TiDB versions. + // TODO we should have a createBootstrapSession to init those special variables. + s.sessionVars.EnableClusteredIndex = variable.ClusteredIndexDefModeIntOnly + s.SetValue(sessionctx.Initing, true) + if startMode == ddl.Upgrade { releaseFn, ok := acquireLock(s) if !ok { @@ -3710,10 +3715,6 @@ func runInBootstrapSession(store kv.Storage, ver int64) { logutil.BgLogger().Fatal("start domain error", zap.Error(err)) } - // For the bootstrap SQLs, the following variables should be compatible with old TiDB versions. - s.sessionVars.EnableClusteredIndex = variable.ClusteredIndexDefModeIntOnly - - s.SetValue(sessionctx.Initing, true) if startMode == ddl.Bootstrap { bootstrap(s) } else if startMode == ddl.Upgrade { From fe413854fa835c2d2d46a3c390ea67beec764ec5 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 30 Oct 2024 17:39:18 +0800 Subject: [PATCH 5/9] change --- pkg/domain/domain.go | 66 ++++++++++++++++++++++++----------- pkg/owner/manager.go | 10 +++--- pkg/session/bootstrap.go | 27 +++++++------- pkg/session/bootstrap_test.go | 6 ++-- pkg/session/session.go | 59 +++++++++++++++++-------------- 5 files changed, 102 insertions(+), 66 deletions(-) diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index ff5334a098195..c845b07497773 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -1311,6 +1311,37 @@ func NewDomain(store kv.Storage, schemaLease time.Duration, statsLease time.Dura const serverIDForStandalone = 1 // serverID for standalone deployment. +// NewEtcdCli creates a new clientv3.Client from store if the store support it. +// the returned client might be nil. +// TODO currently uni-store/mock-tikv/tikv all implements EtcdBackend while they don't support actually. +// refactor this part. +func NewEtcdCli(store kv.Storage) (*clientv3.Client, error) { + etcdStore, addrs, err := getEtcdAddrs(store) + if err != nil { + return nil, err + } + if len(addrs) == 0 { + return nil, nil + } + cli, err := newEtcdCli(addrs, etcdStore) + if err != nil { + return nil, err + } + return cli, nil +} + +func getEtcdAddrs(store kv.Storage) (kv.EtcdBackend, []string, error) { + etcdStore, ok := store.(kv.EtcdBackend) + if !ok { + return nil, nil, nil + } + addrs, err := etcdStore.EtcdAddrs() + if err != nil { + return nil, nil, err + } + return etcdStore, addrs, nil +} + func newEtcdCli(addrs []string, ebd kv.EtcdBackend) (*clientv3.Client, error) { cfg := config.GetGlobalConfig() etcdLogCfg := zap.NewProductionConfig() @@ -1344,30 +1375,26 @@ func (do *Domain) Init( ) error { do.sysExecutorFactory = sysExecutorFactory perfschema.Init() - if ebd, ok := do.store.(kv.EtcdBackend); ok { - var addrs []string - var err error - if addrs, err = ebd.EtcdAddrs(); err != nil { - return err + etcdStore, addrs, err := getEtcdAddrs(do.store) + if err != nil { + return errors.Trace(err) + } + if len(addrs) > 0 { + cli, err2 := newEtcdCli(addrs, etcdStore) + if err2 != nil { + return errors.Trace(err2) } - if addrs != nil { - cli, err := newEtcdCli(addrs, ebd) - if err != nil { - return errors.Trace(err) - } + etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec())) - etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec())) + do.etcdClient = cli - do.etcdClient = cli + do.autoidClient = autoid.NewClientDiscover(cli) - do.autoidClient = autoid.NewClientDiscover(cli) - - unprefixedEtcdCli, err := newEtcdCli(addrs, ebd) - if err != nil { - return errors.Trace(err) - } - do.unprefixedEtcdCli = unprefixedEtcdCli + unprefixedEtcdCli, err2 := newEtcdCli(addrs, etcdStore) + if err2 != nil { + return errors.Trace(err2) } + do.unprefixedEtcdCli = unprefixedEtcdCli } ctx, cancelFunc := context.WithCancel(context.Background()) @@ -1418,7 +1445,6 @@ func (do *Domain) Init( // step 1: prepare the info/schema syncer which domain reload needed. pdCli, pdHTTPCli := do.GetPDClient(), do.GetPDHTTPClient() skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard - var err error do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, do.unprefixedEtcdCli, pdCli, pdHTTPCli, do.Store().GetCodec(), skipRegisterToDashboard) diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go index 1533ff6b02d87..82586176c4571 100644 --- a/pkg/owner/manager.go +++ b/pkg/owner/manager.go @@ -250,9 +250,10 @@ func (m *ownerManager) tryToBeOwnerOnce() error { if err != nil { return errors.Trace(err) } - // Campaign will wait until current instance become owner or the key is deleted, - // in case other nodes put keys in between previous get and txn, we never become - // the owner, so we add a timeout to avoid blocking. + // Campaign will wait until there is no key with smaller create-revision, either + // current instance become owner or all the keys are deleted, in case other nodes + // put keys in between previous get and txn, we never become the owner, so we + // add a timeout to avoid blocking. ctx, cancel := context.WithTimeout(m.ctx, keyOpDefaultTimeout) defer cancel() elec := concurrency.NewElection(m.forceOwnerSession, m.key) @@ -261,7 +262,8 @@ func (m *ownerManager) tryToBeOwnerOnce() error { } // Campaign assumes that it's the only client managing the lifecycle of the campaign - // key, so it will also return when the campaign key is deleted by other TiDB + // key, it only checks whether there are any keys with smaller create-revision, + // so it will also return when all the campaign keys are deleted by other TiDB // instances when the distributed lock has failed to keep alive and another TiDB // get the lock. It's a quite rare case, and the TiDB must be of newer version // which has the fix of the issue, so it's ok to return now. diff --git a/pkg/session/bootstrap.go b/pkg/session/bootstrap.go index ce63cbb3d9336..4faa9138396aa 100644 --- a/pkg/session/bootstrap.go +++ b/pkg/session/bootstrap.go @@ -1426,25 +1426,28 @@ var ( SupportUpgradeHTTPOpVer int64 = version174 ) -func acquireLock(s sessiontypes.Session) (func(), bool) { - dom := domain.GetDomain(s) - if dom == nil { - logutil.BgLogger().Warn("domain is nil") - return nil, false +func acquireLock(store kv.Storage) (func(), error) { + etcdCli, err := domain.NewEtcdCli(store) + if err != nil { + return nil, errors.Trace(err) } - cli := dom.GetEtcdClient() - if cli == nil { - logutil.BgLogger().Warn("etcd client is nil, force to acquire ddl owner lock") + if etcdCli == nil { // Special handling for test. + logutil.BgLogger().Warn("skip acquire ddl owner lock for uni-store") return func() { // do nothing - }, true + }, nil } - releaseFn, err := owner.AcquireDistributedLock(context.Background(), cli, bootstrapOwnerKey, 10) + releaseFn, err := owner.AcquireDistributedLock(context.Background(), etcdCli, bootstrapOwnerKey, 10) if err != nil { - return nil, false + return nil, errors.Trace(err) } - return releaseFn, true + return func() { + releaseFn() + if err2 := etcdCli.Close(); err2 != nil { + logutil.BgLogger().Error("failed to close etcd client", zap.Error(err2)) + } + }, nil } func checkDistTask(s sessiontypes.Session, ver int64) { diff --git a/pkg/session/bootstrap_test.go b/pkg/session/bootstrap_test.go index 0972c1906ca57..1b6d51dc441bf 100644 --- a/pkg/session/bootstrap_test.go +++ b/pkg/session/bootstrap_test.go @@ -739,7 +739,7 @@ func TestIndexMergeInNewCluster(t *testing.T) { store, err := mockstore.NewMockStore(mockstore.WithStoreType(mockstore.EmbedUnistore)) require.NoError(t, err) // Indicates we are in a new cluster. - require.Equal(t, int64(notBootstrapped), getStoreBootstrapVersion(store)) + require.Equal(t, int64(notBootstrapped), getStoreBootstrapVersionWithCache(store)) dom, err := BootstrapSession(store) require.NoError(t, err) defer func() { require.NoError(t, store.Close()) }() @@ -1059,7 +1059,7 @@ func TestTiDBOptAdvancedJoinHintInNewCluster(t *testing.T) { store, err := mockstore.NewMockStore(mockstore.WithStoreType(mockstore.EmbedUnistore)) require.NoError(t, err) // Indicates we are in a new cluster. - require.Equal(t, int64(notBootstrapped), getStoreBootstrapVersion(store)) + require.Equal(t, int64(notBootstrapped), getStoreBootstrapVersionWithCache(store)) dom, err := BootstrapSession(store) require.NoError(t, err) defer func() { require.NoError(t, store.Close()) }() @@ -1085,7 +1085,7 @@ func TestTiDBCostModelInNewCluster(t *testing.T) { store, err := mockstore.NewMockStore(mockstore.WithStoreType(mockstore.EmbedUnistore)) require.NoError(t, err) // Indicates we are in a new cluster. - require.Equal(t, int64(notBootstrapped), getStoreBootstrapVersion(store)) + require.Equal(t, int64(notBootstrapped), getStoreBootstrapVersionWithCache(store)) dom, err := BootstrapSession(store) require.NoError(t, err) defer func() { require.NoError(t, store.Close()) }() diff --git a/pkg/session/session.go b/pkg/session/session.go index 7d563d9fa29dd..8c4f82a555ea4 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -3447,7 +3447,7 @@ func bootstrapSessionImpl(ctx context.Context, store kv.Storage, createSessionsI if err != nil { return nil, err } - ver := getStoreBootstrapVersion(store) + ver := getStoreBootstrapVersionWithCache(store) if ver < currentBootstrapVersion { runInBootstrapSession(store, ver) } else { @@ -3680,25 +3680,16 @@ func getStartMode(ver int64) ddl.StartMode { // TODO: Using a bootstrap tool for doing this may be better later. func runInBootstrapSession(store kv.Storage, ver int64) { startMode := getStartMode(ver) - s, err := createSession(store) - if err != nil { - // Bootstrap fail will cause program exit. - logutil.BgLogger().Fatal("createSession error", zap.Error(err)) - } - // For the bootstrap SQLs, the following variables should be compatible with old TiDB versions. - // TODO we should have a createBootstrapSession to init those special variables. - s.sessionVars.EnableClusteredIndex = variable.ClusteredIndexDefModeIntOnly - s.SetValue(sessionctx.Initing, true) if startMode == ddl.Upgrade { - releaseFn, ok := acquireLock(s) - if !ok { + // TODO at this time domain hasn't created, we need to make sure this in a clear way + releaseFn, err := acquireLock(store) + if err != nil { logutil.BgLogger().Fatal("[upgrade] get ddl owner distributed lock failed", zap.Error(err)) } defer releaseFn() - ver, err = getBootstrapVersion(s) - terror.MustNil(err) - if ver >= currentBootstrapVersion { + currVer := mustGetStoreBootstrapVersion(store) + if currVer >= currentBootstrapVersion { // It is already bootstrapped/upgraded by another TiDB instance, but // we still need to go through the following domain Start/Close code // right now as we have already initialized it when creating the session, @@ -3708,6 +3699,11 @@ func runInBootstrapSession(store kv.Storage, ver int64) { startMode = ddl.Normal } } + s, err := createSession(store) + if err != nil { + // Bootstrap fail will cause program exit. + logutil.BgLogger().Fatal("createSession error", zap.Error(err)) + } dom := domain.GetDomain(s) err = dom.Start(startMode) if err != nil { @@ -3715,6 +3711,11 @@ func runInBootstrapSession(store kv.Storage, ver int64) { logutil.BgLogger().Fatal("start domain error", zap.Error(err)) } + // For the bootstrap SQLs, the following variables should be compatible with old TiDB versions. + // TODO we should have a createBootstrapSession to init those special variables. + s.sessionVars.EnableClusteredIndex = variable.ClusteredIndexDefModeIntOnly + + s.SetValue(sessionctx.Initing, true) if startMode == ddl.Bootstrap { bootstrap(s) } else if startMode == ddl.Upgrade { @@ -3858,28 +3859,32 @@ const ( notBootstrapped = 0 ) -func getStoreBootstrapVersion(store kv.Storage) int64 { - storeBootstrappedLock.Lock() - defer storeBootstrappedLock.Unlock() - // check in memory - _, ok := storeBootstrapped[store.UUID()] - if ok { - return currentBootstrapVersion - } - +func mustGetStoreBootstrapVersion(store kv.Storage) int64 { var ver int64 // check in kv store ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap) err := kv.RunInNewTxn(ctx, store, false, func(_ context.Context, txn kv.Transaction) error { var err error - t := meta.NewMutator(txn) + t := meta.NewReader(txn) ver, err = t.GetBootstrapVersion() return err }) if err != nil { - logutil.BgLogger().Fatal("check bootstrapped failed", - zap.Error(err)) + logutil.BgLogger().Fatal("check bootstrapped failed", zap.Error(err)) } + return ver +} + +func getStoreBootstrapVersionWithCache(store kv.Storage) int64 { + storeBootstrappedLock.Lock() + defer storeBootstrappedLock.Unlock() + // check in memory + _, ok := storeBootstrapped[store.UUID()] + if ok { + return currentBootstrapVersion + } + + ver := mustGetStoreBootstrapVersion(store) if ver > notBootstrapped { // here mean memory is not ok, but other server has already finished it From d84175d2774a4ec2a7ab875e2731107cd46ff0b5 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 30 Oct 2024 17:49:04 +0800 Subject: [PATCH 6/9] change --- pkg/session/session.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/session/session.go b/pkg/session/session.go index 8c4f82a555ea4..18463dfed1dc4 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -3683,9 +3683,10 @@ func runInBootstrapSession(store kv.Storage, ver int64) { if startMode == ddl.Upgrade { // TODO at this time domain hasn't created, we need to make sure this in a clear way + logutil.BgLogger().Info("[upgrade] get owner lock to upgrade") releaseFn, err := acquireLock(store) if err != nil { - logutil.BgLogger().Fatal("[upgrade] get ddl owner distributed lock failed", zap.Error(err)) + logutil.BgLogger().Fatal("[upgrade] get owner lock failed", zap.Error(err)) } defer releaseFn() currVer := mustGetStoreBootstrapVersion(store) From 0abc4428bb953a6e56318f5272eb417db84fd8c8 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 30 Oct 2024 19:01:38 +0800 Subject: [PATCH 7/9] change --- pkg/owner/manager.go | 11 ++++++----- pkg/session/session.go | 3 +++ 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go index 82586176c4571..9652a2125e415 100644 --- a/pkg/owner/manager.go +++ b/pkg/owner/manager.go @@ -86,7 +86,8 @@ type Manager interface { const ( keyOpDefaultTimeout = 5 * time.Second - waitTimeOnForceOwner = 5 * time.Second + // WaitTimeOnForceOwner is the time to wait before or after force to be owner. + WaitTimeOnForceOwner = 5 * time.Second ) // OpType is the owner key value operation type. @@ -207,7 +208,7 @@ func (m *ownerManager) ForceToBeOwner(context.Context) error { for i := 0; i < 3; i++ { // we need to sleep in every retry, as other TiDB nodes will start campaign // immediately after we delete their key. - time.Sleep(waitTimeOnForceOwner) + time.Sleep(WaitTimeOnForceOwner) if err = m.tryToBeOwnerOnce(); err != nil { logutil.Logger(m.logCtx).Warn("failed to retire owner on older version", zap.Error(err)) continue @@ -639,13 +640,13 @@ func AcquireDistributedLock( } return nil, err } - logutil.Logger(ctx).Info("acquire distributed flush lock success", zap.String("key", key)) + logutil.Logger(ctx).Info("acquire distributed lock success", zap.String("key", key)) return func() { err = mu.Unlock(ctx) if err != nil { - logutil.Logger(ctx).Warn("release distributed flush lock error", zap.Error(err), zap.String("key", key)) + logutil.Logger(ctx).Warn("release distributed lock error", zap.Error(err), zap.String("key", key)) } else { - logutil.Logger(ctx).Info("release distributed flush lock success", zap.String("key", key)) + logutil.Logger(ctx).Info("release distributed lock success", zap.String("key", key)) } err = se.Close() if err != nil { diff --git a/pkg/session/session.go b/pkg/session/session.go index 18463dfed1dc4..997c409ec6b57 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -3720,6 +3720,9 @@ func runInBootstrapSession(store kv.Storage, ver int64) { if startMode == ddl.Bootstrap { bootstrap(s) } else if startMode == ddl.Upgrade { + // below sleep is used to mitigate https://github.com/pingcap/tidb/issues/57003, + // to let the older owner have time to notice that it's already retired. + time.Sleep(owner.WaitTimeOnForceOwner) upgrade(s) } finishBootstrap(store) From 330511b5f62833a2f5d42a419ec66ae362801da0 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 30 Oct 2024 19:34:23 +0800 Subject: [PATCH 8/9] change --- pkg/domain/BUILD.bazel | 2 +- pkg/domain/domain_test.go | 16 ++++++++++++++++ pkg/owner/manager.go | 4 ++-- pkg/session/session.go | 5 +++-- 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/pkg/domain/BUILD.bazel b/pkg/domain/BUILD.bazel index 296bacaf25c8d..fea17d5c870e1 100644 --- a/pkg/domain/BUILD.bazel +++ b/pkg/domain/BUILD.bazel @@ -136,7 +136,7 @@ go_test( ], embed = [":domain"], flaky = True, - shard_count = 30, + shard_count = 31, deps = [ "//pkg/config", "//pkg/ddl", diff --git a/pkg/domain/domain_test.go b/pkg/domain/domain_test.go index 92eff2e83dd98..f575d8702b305 100644 --- a/pkg/domain/domain_test.go +++ b/pkg/domain/domain_test.go @@ -486,3 +486,19 @@ func TestDeferFn(t *testing.T) { require.True(t, d) require.Len(t, df.data, 1) } + +func TestNewEtcdCliGetEtcdAddrs(t *testing.T) { + etcdStore, addrs, err := getEtcdAddrs(nil) + require.NoError(t, err) + require.Empty(t, addrs) + require.Nil(t, etcdStore) + + etcdStore, addrs, err = getEtcdAddrs(&mockEtcdBackend{pdAddrs: []string{"localhost:2379"}}) + require.NoError(t, err) + require.Equal(t, []string{"localhost:2379"}, addrs) + require.NotNil(t, etcdStore) + + cli, err := NewEtcdCli(nil) + require.NoError(t, err) + require.Nil(t, cli) +} diff --git a/pkg/owner/manager.go b/pkg/owner/manager.go index 9652a2125e415..e6c4ebd4593f5 100644 --- a/pkg/owner/manager.go +++ b/pkg/owner/manager.go @@ -253,8 +253,8 @@ func (m *ownerManager) tryToBeOwnerOnce() error { } // Campaign will wait until there is no key with smaller create-revision, either // current instance become owner or all the keys are deleted, in case other nodes - // put keys in between previous get and txn, we never become the owner, so we - // add a timeout to avoid blocking. + // put keys in between previous get and txn, and makes current node never become + // the owner, so we add a timeout to avoid blocking. ctx, cancel := context.WithTimeout(m.ctx, keyOpDefaultTimeout) defer cancel() elec := concurrency.NewElection(m.forceOwnerSession, m.key) diff --git a/pkg/session/session.go b/pkg/session/session.go index 997c409ec6b57..126e11aad7a40 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -3682,7 +3682,8 @@ func runInBootstrapSession(store kv.Storage, ver int64) { startMode := getStartMode(ver) if startMode == ddl.Upgrade { - // TODO at this time domain hasn't created, we need to make sure this in a clear way + // TODO at this time domain must not be created, else it will register server + // info, and cause deadlock, we need to make sure this in a clear way logutil.BgLogger().Info("[upgrade] get owner lock to upgrade") releaseFn, err := acquireLock(store) if err != nil { @@ -3874,7 +3875,7 @@ func mustGetStoreBootstrapVersion(store kv.Storage) int64 { return err }) if err != nil { - logutil.BgLogger().Fatal("check bootstrapped failed", zap.Error(err)) + logutil.BgLogger().Fatal("get store bootstrap version failed", zap.Error(err)) } return ver } From 1c641c8018e5d5e51d0d189de8f59d457719a146 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Thu, 31 Oct 2024 16:05:02 +0800 Subject: [PATCH 9/9] close on aquire fail --- pkg/session/bootstrap.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/session/bootstrap.go b/pkg/session/bootstrap.go index 4faa9138396aa..0d5c5d295abcc 100644 --- a/pkg/session/bootstrap.go +++ b/pkg/session/bootstrap.go @@ -1440,6 +1440,9 @@ func acquireLock(store kv.Storage) (func(), error) { } releaseFn, err := owner.AcquireDistributedLock(context.Background(), etcdCli, bootstrapOwnerKey, 10) if err != nil { + if err2 := etcdCli.Close(); err2 != nil { + logutil.BgLogger().Error("failed to close etcd client", zap.Error(err2)) + } return nil, errors.Trace(err) } return func() {