Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: mitigate the issue that multiple owner might exist during force to be owner #56963

Merged
merged 9 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (g Glue) startDomainAsNeeded(store kv.Storage) error {
if err != nil {
return err
}
return dom.Start()
return dom.Start(ddl.Normal)
}

func (g Glue) createTypesSession(store kv.Storage) (sessiontypes.Session, error) {
Expand Down
33 changes: 29 additions & 4 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,20 @@ var (
detectJobVerInterval = 10 * time.Second
)

// StartMode is an enum type for the start mode of the DDL.
type StartMode string

const (
// Normal mode, cluster is in normal state.
Normal StartMode = "normal"
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
// Bootstrap mode, cluster is during bootstrap.
Bootstrap StartMode = "bootstrap"
// Upgrade mode, cluster is during upgrade, we will force current node to be
// the DDL owner, to make sure all upgrade related DDLs are run on new version
// TiDB instance.
Upgrade StartMode = "upgrade"
)

// OnExist specifies what to do when a new object has a name collision.
type OnExist uint8

Expand Down Expand Up @@ -161,7 +175,7 @@ var (
type DDL interface {
// Start campaigns the owner and starts workers.
// ctxPool is used for the worker's delRangeManager and creates sessions.
Start(ctxPool *pools.ResourcePool) error
Start(startMode StartMode, ctxPool *pools.ResourcePool) error
// Stats returns the DDL statistics.
Stats(vars *variable.SessionVars) (map[string]any, error)
// GetScope gets the status variables scope.
Expand Down Expand Up @@ -748,10 +762,21 @@ func (d *ddl) newDeleteRangeManager(mock bool) delRangeManager {
}

// Start implements DDL.Start interface.
func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
func (d *ddl) Start(startMode StartMode, ctxPool *pools.ResourcePool) error {
d.detectAndUpdateJobVersion()
campaignOwner := config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()
if startMode == Upgrade {
if !campaignOwner {
return errors.New("DDL must be enabled when upgrading")
}

logutil.DDLLogger().Info("DDL is in upgrade mode, force to be owner")
if err := d.ownerManager.ForceToBeOwner(d.ctx); err != nil {
return errors.Trace(err)
}
}
logutil.DDLLogger().Info("start DDL", zap.String("ID", d.uuid),
zap.Bool("runWorker", config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()),
zap.Bool("runWorker", campaignOwner),
zap.Stringer("jobVersion", model.GetJobVerInUse()))

d.sessPool = sess.NewSessionPool(ctxPool)
Expand Down Expand Up @@ -786,7 +811,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {

// If tidb_enable_ddl is true, we need campaign owner and do DDL jobs. Besides, we also can do backfill jobs.
// Otherwise, we needn't do that.
if config.GetGlobalConfig().Instance.TiDBEnableDDL.Load() {
if campaignOwner {
if err := d.EnableDDL(); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func restartWorkers(t *testing.T, store kv.Storage, d *domain.Domain) {
ddl.WithSchemaLoader(d),
)
d.SetDDL(newDDL, newDDLExecutor)
err = newDDL.Start(pools.NewResourcePool(func() (pools.Resource, error) {
err = newDDL.Start(ddl.Normal, pools.NewResourcePool(func() (pools.Resource, error) {
session := testkit.NewTestKit(t, store).Session()
session.GetSessionVars().CommonGlobalLoaded = true
return session, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func TestSchemaWaitJob(t *testing.T) {
ddl.WithSchemaLoader(domain),
)
det2 := de2.(ddl.ExecutorForTest)
err := d2.Start(pools.NewResourcePool(func() (pools.Resource, error) {
err := d2.Start(ddl.Normal, pools.NewResourcePool(func() (pools.Resource, error) {
session := testkit.NewTestKit(t, store).Session()
session.GetSessionVars().CommonGlobalLoaded = true
return session, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/schematracker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,8 @@ func (*Checker) CreatePlacementPolicyWithInfo(_ sessionctx.Context, _ *model.Pol
}

// Start implements the DDL interface.
func (d *Checker) Start(ctxPool *pools.ResourcePool) error {
return d.realDDL.Start(ctxPool)
func (d *Checker) Start(startMode ddl.StartMode, ctxPool *pools.ResourcePool) error {
return d.realDDL.Start(startMode, ctxPool)
}

// Stats implements the DDL interface.
Expand Down
2 changes: 1 addition & 1 deletion pkg/domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ go_test(
],
embed = [":domain"],
flaky = True,
shard_count = 30,
shard_count = 31,
deps = [
"//pkg/config",
"//pkg/ddl",
Expand Down
70 changes: 48 additions & 22 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,37 @@ func NewDomain(store kv.Storage, schemaLease time.Duration, statsLease time.Dura

const serverIDForStandalone = 1 // serverID for standalone deployment.

// NewEtcdCli creates a new clientv3.Client from store if the store support it.
// the returned client might be nil.
// TODO currently uni-store/mock-tikv/tikv all implements EtcdBackend while they don't support actually.
// refactor this part.
func NewEtcdCli(store kv.Storage) (*clientv3.Client, error) {
etcdStore, addrs, err := getEtcdAddrs(store)
if err != nil {
return nil, err
}
if len(addrs) == 0 {
return nil, nil
}
cli, err := newEtcdCli(addrs, etcdStore)
if err != nil {
return nil, err
}
return cli, nil
}

func getEtcdAddrs(store kv.Storage) (kv.EtcdBackend, []string, error) {
etcdStore, ok := store.(kv.EtcdBackend)
if !ok {
return nil, nil, nil
}
addrs, err := etcdStore.EtcdAddrs()
if err != nil {
return nil, nil, err
}
return etcdStore, addrs, nil
}

func newEtcdCli(addrs []string, ebd kv.EtcdBackend) (*clientv3.Client, error) {
cfg := config.GetGlobalConfig()
etcdLogCfg := zap.NewProductionConfig()
Expand Down Expand Up @@ -1344,30 +1375,26 @@ func (do *Domain) Init(
) error {
do.sysExecutorFactory = sysExecutorFactory
perfschema.Init()
if ebd, ok := do.store.(kv.EtcdBackend); ok {
var addrs []string
var err error
if addrs, err = ebd.EtcdAddrs(); err != nil {
return err
etcdStore, addrs, err := getEtcdAddrs(do.store)
if err != nil {
return errors.Trace(err)
}
if len(addrs) > 0 {
cli, err2 := newEtcdCli(addrs, etcdStore)
if err2 != nil {
return errors.Trace(err2)
}
if addrs != nil {
cli, err := newEtcdCli(addrs, ebd)
if err != nil {
return errors.Trace(err)
}
etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec()))

etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec()))
do.etcdClient = cli

do.etcdClient = cli
do.autoidClient = autoid.NewClientDiscover(cli)

do.autoidClient = autoid.NewClientDiscover(cli)

unprefixedEtcdCli, err := newEtcdCli(addrs, ebd)
if err != nil {
return errors.Trace(err)
}
do.unprefixedEtcdCli = unprefixedEtcdCli
unprefixedEtcdCli, err2 := newEtcdCli(addrs, etcdStore)
if err2 != nil {
return errors.Trace(err2)
}
do.unprefixedEtcdCli = unprefixedEtcdCli
}

ctx, cancelFunc := context.WithCancel(context.Background())
Expand Down Expand Up @@ -1418,7 +1445,6 @@ func (do *Domain) Init(
// step 1: prepare the info/schema syncer which domain reload needed.
pdCli, pdHTTPCli := do.GetPDClient(), do.GetPDHTTPClient()
skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard
var err error
do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID,
do.etcdClient, do.unprefixedEtcdCli, pdCli, pdHTTPCli,
do.Store().GetCodec(), skipRegisterToDashboard)
Expand Down Expand Up @@ -1482,7 +1508,7 @@ func (do *Domain) Init(

// Start starts the domain. After start, DDLs can be executed using session, see
// Init also.
func (do *Domain) Start() error {
func (do *Domain) Start(startMode ddl.StartMode) error {
gCfg := config.GetGlobalConfig()
if gCfg.EnableGlobalKill && do.etcdClient != nil {
do.wg.Add(1)
Expand All @@ -1501,7 +1527,7 @@ func (do *Domain) Start() error {
sysCtxPool := pools.NewResourcePool(sysFac, 512, 512, resourceIdleTimeout)

// start the ddl after the domain reload, avoiding some internal sql running before infoSchema construction.
err := do.ddl.Start(sysCtxPool)
err := do.ddl.Start(startMode, sysCtxPool)
if err != nil {
return err
}
Expand Down
18 changes: 17 additions & 1 deletion pkg/domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestInfo(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/MockReplaceDDL", `return(true)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/NoDDLDispatchLoop", `return(true)`))
require.NoError(t, dom.Init(sysMockFactory, nil))
require.NoError(t, dom.Start())
require.NoError(t, dom.Start(ddl.Bootstrap))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/NoDDLDispatchLoop"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/MockReplaceDDL"))

Expand Down Expand Up @@ -486,3 +486,19 @@ func TestDeferFn(t *testing.T) {
require.True(t, d)
require.Len(t, df.data, 1)
}

func TestNewEtcdCliGetEtcdAddrs(t *testing.T) {
etcdStore, addrs, err := getEtcdAddrs(nil)
require.NoError(t, err)
require.Empty(t, addrs)
require.Nil(t, etcdStore)

etcdStore, addrs, err = getEtcdAddrs(&mockEtcdBackend{pdAddrs: []string{"localhost:2379"}})
require.NoError(t, err)
require.Equal(t, []string{"localhost:2379"}, addrs)
require.NotNil(t, etcdStore)

cli, err := NewEtcdCli(nil)
require.NoError(t, err)
require.Nil(t, cli)
}
Loading
Loading