Skip to content

Commit

Permalink
ddl: mitigate the issue that multiple owner might exist during force …
Browse files Browse the repository at this point in the history
…to be owner (#56963)

close #56924
  • Loading branch information
D3Hunter authored Oct 31, 2024
1 parent e92e2d0 commit 6679c74
Show file tree
Hide file tree
Showing 16 changed files with 338 additions and 122 deletions.
2 changes: 1 addition & 1 deletion br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (g Glue) startDomainAsNeeded(store kv.Storage) error {
if err != nil {
return err
}
return dom.Start()
return dom.Start(ddl.Normal)
}

func (g Glue) createTypesSession(store kv.Storage) (sessiontypes.Session, error) {
Expand Down
33 changes: 29 additions & 4 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,20 @@ var (
detectJobVerInterval = 10 * time.Second
)

// StartMode is an enum type for the start mode of the DDL.
type StartMode string

const (
// Normal mode, cluster is in normal state.
Normal StartMode = "normal"
// Bootstrap mode, cluster is during bootstrap.
Bootstrap StartMode = "bootstrap"
// Upgrade mode, cluster is during upgrade, we will force current node to be
// the DDL owner, to make sure all upgrade related DDLs are run on new version
// TiDB instance.
Upgrade StartMode = "upgrade"
)

// OnExist specifies what to do when a new object has a name collision.
type OnExist uint8

Expand Down Expand Up @@ -161,7 +175,7 @@ var (
type DDL interface {
// Start campaigns the owner and starts workers.
// ctxPool is used for the worker's delRangeManager and creates sessions.
Start(ctxPool *pools.ResourcePool) error
Start(startMode StartMode, ctxPool *pools.ResourcePool) error
// Stats returns the DDL statistics.
Stats(vars *variable.SessionVars) (map[string]any, error)
// GetScope gets the status variables scope.
Expand Down Expand Up @@ -746,10 +760,21 @@ func (d *ddl) newDeleteRangeManager(mock bool) delRangeManager {
}

// Start implements DDL.Start interface.
func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
func (d *ddl) Start(startMode StartMode, ctxPool *pools.ResourcePool) error {
d.detectAndUpdateJobVersion()
campaignOwner := config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()
if startMode == Upgrade {
if !campaignOwner {
return errors.New("DDL must be enabled when upgrading")
}

logutil.DDLLogger().Info("DDL is in upgrade mode, force to be owner")
if err := d.ownerManager.ForceToBeOwner(d.ctx); err != nil {
return errors.Trace(err)
}
}
logutil.DDLLogger().Info("start DDL", zap.String("ID", d.uuid),
zap.Bool("runWorker", config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()),
zap.Bool("runWorker", campaignOwner),
zap.Stringer("jobVersion", model.GetJobVerInUse()))

d.sessPool = sess.NewSessionPool(ctxPool)
Expand Down Expand Up @@ -784,7 +809,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {

// If tidb_enable_ddl is true, we need campaign owner and do DDL jobs. Besides, we also can do backfill jobs.
// Otherwise, we needn't do that.
if config.GetGlobalConfig().Instance.TiDBEnableDDL.Load() {
if campaignOwner {
if err := d.EnableDDL(); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func restartWorkers(t *testing.T, store kv.Storage, d *domain.Domain) {
ddl.WithSchemaLoader(d),
)
d.SetDDL(newDDL, newDDLExecutor)
err = newDDL.Start(pools.NewResourcePool(func() (pools.Resource, error) {
err = newDDL.Start(ddl.Normal, pools.NewResourcePool(func() (pools.Resource, error) {
session := testkit.NewTestKit(t, store).Session()
session.GetSessionVars().CommonGlobalLoaded = true
return session, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func TestSchemaWaitJob(t *testing.T) {
ddl.WithSchemaLoader(domain),
)
det2 := de2.(ddl.ExecutorForTest)
err := d2.Start(pools.NewResourcePool(func() (pools.Resource, error) {
err := d2.Start(ddl.Normal, pools.NewResourcePool(func() (pools.Resource, error) {
session := testkit.NewTestKit(t, store).Session()
session.GetSessionVars().CommonGlobalLoaded = true
return session, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/schematracker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,8 @@ func (*Checker) CreatePlacementPolicyWithInfo(_ sessionctx.Context, _ *model.Pol
}

// Start implements the DDL interface.
func (d *Checker) Start(ctxPool *pools.ResourcePool) error {
return d.realDDL.Start(ctxPool)
func (d *Checker) Start(startMode ddl.StartMode, ctxPool *pools.ResourcePool) error {
return d.realDDL.Start(startMode, ctxPool)
}

// Stats implements the DDL interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ go_test(
],
embed = [":domain"],
flaky = True,
shard_count = 30,
shard_count = 31,
deps = [
"//pkg/config",
"//pkg/ddl",
Expand Down
70 changes: 48 additions & 22 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,37 @@ func NewDomain(store kv.Storage, schemaLease time.Duration, statsLease time.Dura

const serverIDForStandalone = 1 // serverID for standalone deployment.

// NewEtcdCli creates a new clientv3.Client from store if the store support it.
// the returned client might be nil.
// TODO currently uni-store/mock-tikv/tikv all implements EtcdBackend while they don't support actually.
// refactor this part.
func NewEtcdCli(store kv.Storage) (*clientv3.Client, error) {
etcdStore, addrs, err := getEtcdAddrs(store)
if err != nil {
return nil, err
}
if len(addrs) == 0 {
return nil, nil
}
cli, err := newEtcdCli(addrs, etcdStore)
if err != nil {
return nil, err
}
return cli, nil
}

func getEtcdAddrs(store kv.Storage) (kv.EtcdBackend, []string, error) {
etcdStore, ok := store.(kv.EtcdBackend)
if !ok {
return nil, nil, nil
}
addrs, err := etcdStore.EtcdAddrs()
if err != nil {
return nil, nil, err
}
return etcdStore, addrs, nil
}

func newEtcdCli(addrs []string, ebd kv.EtcdBackend) (*clientv3.Client, error) {
cfg := config.GetGlobalConfig()
etcdLogCfg := zap.NewProductionConfig()
Expand Down Expand Up @@ -1344,30 +1375,26 @@ func (do *Domain) Init(
) error {
do.sysExecutorFactory = sysExecutorFactory
perfschema.Init()
if ebd, ok := do.store.(kv.EtcdBackend); ok {
var addrs []string
var err error
if addrs, err = ebd.EtcdAddrs(); err != nil {
return err
etcdStore, addrs, err := getEtcdAddrs(do.store)
if err != nil {
return errors.Trace(err)
}
if len(addrs) > 0 {
cli, err2 := newEtcdCli(addrs, etcdStore)
if err2 != nil {
return errors.Trace(err2)
}
if addrs != nil {
cli, err := newEtcdCli(addrs, ebd)
if err != nil {
return errors.Trace(err)
}
etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec()))

etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec()))
do.etcdClient = cli

do.etcdClient = cli
do.autoidClient = autoid.NewClientDiscover(cli)

do.autoidClient = autoid.NewClientDiscover(cli)

unprefixedEtcdCli, err := newEtcdCli(addrs, ebd)
if err != nil {
return errors.Trace(err)
}
do.unprefixedEtcdCli = unprefixedEtcdCli
unprefixedEtcdCli, err2 := newEtcdCli(addrs, etcdStore)
if err2 != nil {
return errors.Trace(err2)
}
do.unprefixedEtcdCli = unprefixedEtcdCli
}

ctx, cancelFunc := context.WithCancel(context.Background())
Expand Down Expand Up @@ -1414,7 +1441,6 @@ func (do *Domain) Init(
// step 1: prepare the info/schema syncer which domain reload needed.
pdCli, pdHTTPCli := do.GetPDClient(), do.GetPDHTTPClient()
skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard
var err error
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID,
do.etcdClient, do.unprefixedEtcdCli, pdCli, pdHTTPCli,
do.Store().GetCodec(), skipRegisterToDashboard)
Expand Down Expand Up @@ -1478,7 +1504,7 @@ func (do *Domain) Init(

// Start starts the domain. After start, DDLs can be executed using session, see
// Init also.
func (do *Domain) Start() error {
func (do *Domain) Start(startMode ddl.StartMode) error {
gCfg := config.GetGlobalConfig()
if gCfg.EnableGlobalKill && do.etcdClient != nil {
do.wg.Add(1)
Expand All @@ -1497,7 +1523,7 @@ func (do *Domain) Start() error {
sysCtxPool := pools.NewResourcePool(sysFac, 512, 512, resourceIdleTimeout)

// start the ddl after the domain reload, avoiding some internal sql running before infoSchema construction.
err := do.ddl.Start(sysCtxPool)
err := do.ddl.Start(startMode, sysCtxPool)
if err != nil {
return err
}
Expand Down
18 changes: 17 additions & 1 deletion pkg/domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestInfo(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/MockReplaceDDL", `return(true)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/NoDDLDispatchLoop", `return(true)`))
require.NoError(t, dom.Init(sysMockFactory, nil))
require.NoError(t, dom.Start())
require.NoError(t, dom.Start(ddl.Bootstrap))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/NoDDLDispatchLoop"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/MockReplaceDDL"))

Expand Down Expand Up @@ -486,3 +486,19 @@ func TestDeferFn(t *testing.T) {
require.True(t, d)
require.Len(t, df.data, 1)
}

func TestNewEtcdCliGetEtcdAddrs(t *testing.T) {
etcdStore, addrs, err := getEtcdAddrs(nil)
require.NoError(t, err)
require.Empty(t, addrs)
require.Nil(t, etcdStore)

etcdStore, addrs, err = getEtcdAddrs(&mockEtcdBackend{pdAddrs: []string{"localhost:2379"}})
require.NoError(t, err)
require.Equal(t, []string{"localhost:2379"}, addrs)
require.NotNil(t, etcdStore)

cli, err := NewEtcdCli(nil)
require.NoError(t, err)
require.Nil(t, cli)
}
Loading

0 comments on commit 6679c74

Please sign in to comment.