From 6adcc6552ba6b55f55be004602ec3e0c93d0c871 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 7 Feb 2023 23:55:58 +0800 Subject: [PATCH] *: use the latest independent resource group manager client (#41162) ref pingcap/tidb#38825 --- DEPS.bzl | 4 +- ddl/attributes_sql_test.go | 12 +-- ddl/ddl.go | 2 +- ddl/ddl_api.go | 4 +- ddl/main_test.go | 2 +- ddl/resource_group.go | 4 +- ddl/resource_group_test.go | 2 +- ddl/schematracker/checker.go | 4 +- ddl/schematracker/dm_tracker.go | 4 +- domain/db_test.go | 4 +- domain/domain.go | 25 +++-- domain/infosync/BUILD.bazel | 1 - domain/infosync/info.go | 38 ++++--- domain/infosync/info_test.go | 6 +- domain/infosync/resource_group_manager.go | 121 +++++----------------- executor/ddl.go | 2 +- executor/infoschema_reader.go | 2 +- go.mod | 2 +- go.sum | 4 +- meta/meta.go | 4 +- meta/meta_test.go | 2 +- server/stat_test.go | 2 +- store/mockstore/unistore/pd.go | 57 +++++++--- 23 files changed, 143 insertions(+), 165 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index b7d54a3072401..9ae1d238efe0a 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -4085,8 +4085,8 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sum = "h1:gkAF7XxM2mfh5ZHbyLhXkaKyDd97soe1SMFIZ2vW260=", - version = "v2.0.6-0.20230207040004-9b3ecc1dcaa9", + sum = "h1:1/ow7ZUnsU5CcxHF1cFAKdD+5b58tMbaeb8qAoli1m4=", + version = "v2.0.6-0.20230207090754-29dfcc272912", ) go_repository( name = "com_github_tikv_pd", diff --git a/ddl/attributes_sql_test.go b/ddl/attributes_sql_test.go index 6950b44645ecf..c9497f2381f52 100644 --- a/ddl/attributes_sql_test.go +++ b/ddl/attributes_sql_test.go @@ -273,7 +273,7 @@ PARTITION BY RANGE (c) ( func TestFlashbackTable(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -331,7 +331,7 @@ PARTITION BY RANGE (c) ( func TestDropTable(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -384,7 +384,7 @@ PARTITION BY RANGE (c) ( func TestCreateWithSameName(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -448,7 +448,7 @@ PARTITION BY RANGE (c) ( func TestPartition(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -508,7 +508,7 @@ PARTITION BY RANGE (c) ( func TestDropSchema(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -534,7 +534,7 @@ PARTITION BY RANGE (c) ( func TestDefaultKeyword(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true) require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") diff --git a/ddl/ddl.go b/ddl/ddl.go index a9e0f4d96478c..c1f5fa64089e1 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -191,7 +191,7 @@ type DDL interface { CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlacementPolicyStmt) error DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacementPolicyStmt) error AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacementPolicyStmt) error - CreateResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) error + AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) error AlterResourceGroup(ctx sessionctx.Context, stmt *ast.AlterResourceGroupStmt) error DropResourceGroup(ctx sessionctx.Context, stmt *ast.DropResourceGroupStmt) error FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index cd002a5968251..36e19510d0ec5 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -7613,8 +7613,8 @@ func checkIgnorePlacementDDL(ctx sessionctx.Context) bool { return false } -// CreateResourceGroup implements the DDL interface, creates a resource group. -func (d *ddl) CreateResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) (err error) { +// AddResourceGroup implements the DDL interface, creates a resource group. +func (d *ddl) AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) (err error) { groupInfo := &model.ResourceGroupInfo{ResourceGroupSettings: &model.ResourceGroupSettings{}} groupName := stmt.ResourceGroupName groupInfo.Name = groupName diff --git a/ddl/main_test.go b/ddl/main_test.go index 6db3ace76e6a6..84a713dc59bb3 100644 --- a/ddl/main_test.go +++ b/ddl/main_test.go @@ -54,7 +54,7 @@ func TestMain(m *testing.M) { conf.Experimental.AllowsExpressionIndex = true }) - _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true) + _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, true) if err != nil { _, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err) os.Exit(1) diff --git a/ddl/resource_group.go b/ddl/resource_group.go index 3d397dd14160c..514e8cf30c2b8 100644 --- a/ddl/resource_group.go +++ b/ddl/resource_group.go @@ -48,11 +48,11 @@ func onCreateResourceGroup(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, case model.StateNone: // none -> public groupInfo.State = model.StatePublic - err := t.CreateResourceGroup(groupInfo) + err := t.AddResourceGroup(groupInfo) if err != nil { return ver, errors.Trace(err) } - err = infosync.CreateResourceGroup(context.TODO(), protoGroup) + err = infosync.AddResourceGroup(context.TODO(), protoGroup) if err != nil { logutil.BgLogger().Warn("create resource group failed", zap.Error(err)) return ver, errors.Trace(err) diff --git a/ddl/resource_group_test.go b/ddl/resource_group_test.go index a0921037c475d..e91db168a4585 100644 --- a/ddl/resource_group_test.go +++ b/ddl/resource_group_test.go @@ -131,7 +131,7 @@ func TestResourceGroupBasic(t *testing.T) { g = testResourceGroupNameFromIS(t, tk.Session(), "y") re.Nil(g) tk.MustContainErrMsg("create resource group x RU_PER_SEC=1000, CPU='8000m';", resourcegroup.ErrInvalidResourceGroupDuplicatedMode.Error()) - groups, err := infosync.GetAllResourceGroups(context.TODO()) + groups, err := infosync.ListResourceGroups(context.TODO()) require.Equal(t, 0, len(groups)) require.NoError(t, err) diff --git a/ddl/schematracker/checker.go b/ddl/schematracker/checker.go index ec6a7892996c9..9477d478388ad 100644 --- a/ddl/schematracker/checker.go +++ b/ddl/schematracker/checker.go @@ -433,9 +433,9 @@ func (d Checker) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPla panic("implement me") } -// CreateResourceGroup implements the DDL interface. +// AddResourceGroup implements the DDL interface. // ResourceGroup do not affect the transaction. -func (d Checker) CreateResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) error { +func (d Checker) AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) error { return nil } diff --git a/ddl/schematracker/dm_tracker.go b/ddl/schematracker/dm_tracker.go index c6bdd892fcff2..323d3b04962f7 100644 --- a/ddl/schematracker/dm_tracker.go +++ b/ddl/schematracker/dm_tracker.go @@ -1173,8 +1173,8 @@ func (SchemaTracker) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.Alte return nil } -// CreateResourceGroup implements the DDL interface, it's no-op in DM's case. -func (SchemaTracker) CreateResourceGroup(_ sessionctx.Context, _ *ast.CreateResourceGroupStmt) error { +// AddResourceGroup implements the DDL interface, it's no-op in DM's case. +func (SchemaTracker) AddResourceGroup(_ sessionctx.Context, _ *ast.CreateResourceGroupStmt) error { return nil } diff --git a/domain/db_test.go b/domain/db_test.go index 9b122664f8397..428b63b6de05b 100644 --- a/domain/db_test.go +++ b/domain/db_test.go @@ -73,7 +73,7 @@ func TestNormalSessionPool(t *testing.T) { domain, err := session.BootstrapSession(store) require.NoError(t, err) defer domain.Close() - info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true) + info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, true) require.NoError(t, err1) conf := config.GetGlobalConfig() conf.Socket = "" @@ -107,7 +107,7 @@ func TestAbnormalSessionPool(t *testing.T) { domain, err := session.BootstrapSession(store) require.NoError(t, err) defer domain.Close() - info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true) + info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, true) require.NoError(t, err1) conf := config.GetGlobalConfig() conf.Socket = "" diff --git a/domain/domain.go b/domain/domain.go index 67341449876d4..20bd33ca3f794 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1047,18 +1047,13 @@ func (do *Domain) Init( } // step 1: prepare the info/schema syncer which domain reload needed. + pdCli := do.GetPDClient() skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard - do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, do.unprefixedEtcdCli, skipRegisterToDashboard) + do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, do.unprefixedEtcdCli, pdCli, skipRegisterToDashboard) if err != nil { return err } - - var pdClient pd.Client - if store, ok := do.store.(kv.StorageWithPD); ok { - pdClient = store.GetPDClient() - } - do.globalCfgSyncer = globalconfigsync.NewGlobalConfigSyncer(pdClient) - + do.globalCfgSyncer = globalconfigsync.NewGlobalConfigSyncer(pdCli) err = do.ddl.SchemaSyncer().Init(ctx) if err != nil { return err @@ -1089,12 +1084,12 @@ func (do *Domain) Init( if !skipRegisterToDashboard { do.wg.Run(do.topologySyncerKeeper, "topologySyncerKeeper") } - if pdClient != nil { + if pdCli != nil { do.wg.Run(func() { - do.closestReplicaReadCheckLoop(ctx, pdClient) + do.closestReplicaReadCheckLoop(ctx, pdCli) }, "closestReplicaReadCheckLoop") } - err = do.initLogBackup(ctx, pdClient) + err = do.initLogBackup(ctx, pdCli) if err != nil { return err } @@ -1339,6 +1334,14 @@ func (do *Domain) GetEtcdClient() *clientv3.Client { return do.etcdClient } +// GetPDClient returns the PD client. +func (do *Domain) GetPDClient() pd.Client { + if store, ok := do.store.(kv.StorageWithPD); ok { + return store.GetPDClient() + } + return nil +} + // LoadPrivilegeLoop create a goroutine loads privilege tables in a loop, it // should be called only once in BootstrapSession. func (do *Domain) LoadPrivilegeLoop(sctx sessionctx.Context) error { diff --git a/domain/infosync/BUILD.bazel b/domain/infosync/BUILD.bazel index 0952dfc300490..0938910b19ac4 100644 --- a/domain/infosync/BUILD.bazel +++ b/domain/infosync/BUILD.bazel @@ -43,7 +43,6 @@ go_library( "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_kvproto//pkg/resource_manager", - "@com_github_pingcap_log//:log", "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_pd_client//:client", "@io_etcd_go_etcd_client_v3//:client", diff --git a/domain/infosync/info.go b/domain/infosync/info.go index 6c9e721959cf9..b45216daa8a9f 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -118,7 +118,7 @@ type InfoSyncer struct { placementManager PlacementManager scheduleManager ScheduleManager tiflashReplicaManager TiFlashReplicaManager - resourceGroupManager ResourceGroupManager + resourceGroupManager pd.ResourceManagerClient } // ServerInfo is server static information. @@ -186,7 +186,14 @@ func setGlobalInfoSyncer(is *InfoSyncer) { } // GlobalInfoSyncerInit return a new InfoSyncer. It is exported for testing. -func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() uint64, etcdCli *clientv3.Client, unprefixedEtcdCli *clientv3.Client, skipRegisterToDashBoard bool) (*InfoSyncer, error) { +func GlobalInfoSyncerInit( + ctx context.Context, + id string, + serverIDGetter func() uint64, + etcdCli, unprefixedEtcdCli *clientv3.Client, + pdCli pd.Client, + skipRegisterToDashBoard bool, +) (*InfoSyncer, error) { is := &InfoSyncer{ etcdCli: etcdCli, unprefixedEtcdCli: unprefixedEtcdCli, @@ -201,8 +208,8 @@ func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() is.labelRuleManager = initLabelRuleManager(etcdCli) is.placementManager = initPlacementManager(etcdCli) is.scheduleManager = initScheduleManager(etcdCli) - is.resourceGroupManager = initResourceGroupManager(etcdCli) is.tiflashReplicaManager = initTiFlashReplicaManager(etcdCli) + is.resourceGroupManager = initResourceGroupManager(pdCli) setGlobalInfoSyncer(is) return is, nil } @@ -247,11 +254,11 @@ func initPlacementManager(etcdCli *clientv3.Client) PlacementManager { return &PDPlacementManager{etcdCli: etcdCli} } -func initResourceGroupManager(etcdCli *clientv3.Client) ResourceGroupManager { - if etcdCli == nil { +func initResourceGroupManager(pdCli pd.Client) pd.ResourceManagerClient { + if pdCli == nil { return &mockResourceGroupManager{groups: make(map[string]*rmpb.ResourceGroup)} } - return NewResourceManager(etcdCli) + return pdCli } func initTiFlashReplicaManager(etcdCli *clientv3.Client) TiFlashReplicaManager { @@ -590,23 +597,24 @@ func GetResourceGroup(ctx context.Context, name string) (*rmpb.ResourceGroup, er return is.resourceGroupManager.GetResourceGroup(ctx, name) } -// GetAllResourceGroups is used to get all resource groups from resource manager. -func GetAllResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) { +// ListResourceGroups is used to get all resource groups from resource manager. +func ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) { is, err := getGlobalInfoSyncer() if err != nil { return nil, err } - return is.resourceGroupManager.GetAllResourceGroups(ctx) + return is.resourceGroupManager.ListResourceGroups(ctx) } -// CreateResourceGroup is used to create one specific resource group to resource manager. -func CreateResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error { +// AddResourceGroup is used to create one specific resource group to resource manager. +func AddResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error { is, err := getGlobalInfoSyncer() if err != nil { return err } - return is.resourceGroupManager.CreateResourceGroup(ctx, group) + _, err = is.resourceGroupManager.AddResourceGroup(ctx, group) + return err } // ModifyResourceGroup is used to modify one specific resource group to resource manager. @@ -615,7 +623,8 @@ func ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error { if err != nil { return err } - return is.resourceGroupManager.ModifyResourceGroup(ctx, group) + _, err = is.resourceGroupManager.ModifyResourceGroup(ctx, group) + return err } // DeleteResourceGroup is used to delete one specific resource group from resource manager. @@ -624,7 +633,8 @@ func DeleteResourceGroup(ctx context.Context, name string) error { if err != nil { return err } - return is.resourceGroupManager.DeleteResourceGroup(ctx, name) + _, err = is.resourceGroupManager.DeleteResourceGroup(ctx, name) + return err } // PutRuleBundlesWithDefaultRetry will retry for default times diff --git a/domain/infosync/info_test.go b/domain/infosync/info_test.go index 3264c0adca3c6..b7af5ed167837 100644 --- a/domain/infosync/info_test.go +++ b/domain/infosync/info_test.go @@ -67,7 +67,7 @@ func TestTopology(t *testing.T) { require.NoError(t, err) }() - info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, false) + info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, nil, false) require.NoError(t, err) err = info.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt) @@ -152,7 +152,7 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) { } func TestPutBundlesRetry(t *testing.T) { - _, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, false) + _, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, nil, false) require.NoError(t, err) bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"}) @@ -216,7 +216,7 @@ func TestPutBundlesRetry(t *testing.T) { func TestTiFlashManager(t *testing.T) { ctx := context.Background() - _, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, false) + _, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, nil, false) tiflash := NewMockTiFlash() SetMockTiFlash(tiflash) diff --git a/domain/infosync/resource_group_manager.go b/domain/infosync/resource_group_manager.go index 93c751fc04968..6b8f876f9bb11 100644 --- a/domain/infosync/resource_group_manager.go +++ b/domain/infosync/resource_group_manager.go @@ -19,92 +19,29 @@ import ( "sync" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" - "github.com/pingcap/log" - clientv3 "go.etcd.io/etcd/client/v3" - "go.uber.org/zap" + pd "github.com/tikv/pd/client" ) -// ResourceGroupManager manages resource group settings -type ResourceGroupManager interface { - // GetResourceGroup is used to get one specific rule bundle from ResourceGroup Manager. - GetResourceGroup(ctx context.Context, name string) (*rmpb.ResourceGroup, error) - // GetAllResourceGroups is used to get all rule bundles from ResourceGroup Manager. - GetAllResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) - // PutResourceGroup is used to post specific rule bundles to ResourceGroup Manager. - CreateResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error - // ModifyResourceGroup is used to modify specific rule bundles to ResourceGroup Manager. - ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error - // DeleteResourceGroup is used to delete specific rule bundles to ResourceGroup Manager. - DeleteResourceGroup(ctx context.Context, name string) error -} - -// externalResourceGroupManager manages placement with resource manager. -// TODO: replace with resource manager client. -type externalResourceGroupManager struct { - etcdCli *clientv3.Client -} - -// NewResourceManager is used to create a new resource manager in client side. -func NewResourceManager(etcdCli *clientv3.Client) ResourceGroupManager { - return &externalResourceGroupManager{etcdCli: etcdCli} -} - -// GetResourceGroupClient is used to get resource group client. -func (m *externalResourceGroupManager) GetResourceGroupClient() rmpb.ResourceManagerClient { - conn := m.etcdCli.ActiveConnection() - return rmpb.NewResourceManagerClient(conn) +type mockResourceGroupManager struct { + sync.RWMutex + groups map[string]*rmpb.ResourceGroup } -// GetResourceGroup is used to get one specific rule bundle from ResourceGroup Manager. -func (m *externalResourceGroupManager) GetResourceGroup(ctx context.Context, name string) (*rmpb.ResourceGroup, error) { - group := &rmpb.GetResourceGroupRequest{ResourceGroupName: name} - resp, err := m.GetResourceGroupClient().GetResourceGroup(ctx, group) - if err != nil { - return nil, err - } - return resp.GetGroup(), nil -} +var _ pd.ResourceManagerClient = (*mockResourceGroupManager)(nil) -// GetAllResourceGroups is used to get all resource group from ResourceGroup Manager. It is used to load full resource groups from PD while fullload infoschema. -func (m *externalResourceGroupManager) GetAllResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) { - req := &rmpb.ListResourceGroupsRequest{} - resp, err := m.GetResourceGroupClient().ListResourceGroups(ctx, req) - if err != nil { - return nil, err +func (m *mockResourceGroupManager) ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) { + m.RLock() + defer m.RUnlock() + groups := make([]*rmpb.ResourceGroup, 0, len(m.groups)) + for _, group := range m.groups { + groups = append(groups, group) } - return resp.GetGroups(), nil -} - -// CreateResourceGroup is used to post specific resource group to ResourceGroup Manager. -func (m *externalResourceGroupManager) CreateResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error { - req := &rmpb.PutResourceGroupRequest{Group: group} - _, err := m.GetResourceGroupClient().AddResourceGroup(ctx, req) - return err -} - -// ModifyResourceGroup is used to modify specific resource group to ResourceGroup Manager. -func (m *externalResourceGroupManager) ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error { - req := &rmpb.PutResourceGroupRequest{Group: group} - _, err := m.GetResourceGroupClient().ModifyResourceGroup(ctx, req) - return err -} - -// DeleteResourceGroup is used to delete specific resource group to ResourceGroup Manager. -func (m *externalResourceGroupManager) DeleteResourceGroup(ctx context.Context, name string) error { - req := &rmpb.DeleteResourceGroupRequest{ResourceGroupName: name} - log.Info("delete resource group", zap.String("name", name)) - _, err := m.GetResourceGroupClient().DeleteResourceGroup(ctx, req) - return err -} - -type mockResourceGroupManager struct { - sync.Mutex - groups map[string]*rmpb.ResourceGroup + return groups, nil } func (m *mockResourceGroupManager) GetResourceGroup(ctx context.Context, name string) (*rmpb.ResourceGroup, error) { - m.Lock() - defer m.Unlock() + m.RLock() + defer m.RUnlock() group, ok := m.groups[name] if !ok { return nil, nil @@ -112,33 +49,31 @@ func (m *mockResourceGroupManager) GetResourceGroup(ctx context.Context, name st return group, nil } -func (m *mockResourceGroupManager) GetAllResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) { - m.Lock() - defer m.Unlock() - groups := make([]*rmpb.ResourceGroup, 0, len(m.groups)) - for _, group := range m.groups { - groups = append(groups, group) - } - return groups, nil -} - -func (m *mockResourceGroupManager) CreateResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error { +func (m *mockResourceGroupManager) AddResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) (string, error) { m.Lock() defer m.Unlock() m.groups[group.Name] = group - return nil + return "Success!", nil } -func (m *mockResourceGroupManager) ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error { +func (m *mockResourceGroupManager) ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) (string, error) { m.Lock() defer m.Unlock() m.groups[group.Name] = group - return nil + return "Success!", nil } -func (m *mockResourceGroupManager) DeleteResourceGroup(ctx context.Context, name string) error { +func (m *mockResourceGroupManager) DeleteResourceGroup(ctx context.Context, name string) (string, error) { m.Lock() defer m.Unlock() delete(m.groups, name) - return nil + return "Success!", nil +} + +func (m *mockResourceGroupManager) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) { + return nil, nil +} + +func (m *mockResourceGroupManager) WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) { + return nil, nil } diff --git a/executor/ddl.go b/executor/ddl.go index be8ffa4e48d9f..28365e2495a74 100644 --- a/executor/ddl.go +++ b/executor/ddl.go @@ -746,7 +746,7 @@ func (e *DDLExec) executeCreateResourceGroup(s *ast.CreateResourceGroupStmt) err if !variable.EnableResourceControl.Load() { return infoschema.ErrResourceGroupSupportDisabled } - return domain.GetDomain(e.ctx).DDL().CreateResourceGroup(e.ctx, s) + return domain.GetDomain(e.ctx).DDL().AddResourceGroup(e.ctx, s) } func (e *DDLExec) executeAlterResourceGroup(s *ast.AlterResourceGroupStmt) error { diff --git a/executor/infoschema_reader.go b/executor/infoschema_reader.go index 39666333762ab..a016d5a459df1 100644 --- a/executor/infoschema_reader.go +++ b/executor/infoschema_reader.go @@ -3390,7 +3390,7 @@ func (e *memtableRetriever) setDataFromPlacementPolicies(sctx sessionctx.Context } func (e *memtableRetriever) setDataFromResourceGroups() error { - resourceGroups, err := infosync.GetAllResourceGroups(context.TODO()) + resourceGroups, err := infosync.ListResourceGroups(context.TODO()) if err != nil { return errors.Errorf("failed to access resource group manager, error message is %s", err.Error()) } diff --git a/go.mod b/go.mod index ab563b449d53c..5b2d4f414b4de 100644 --- a/go.mod +++ b/go.mod @@ -92,7 +92,7 @@ require ( github.com/stretchr/testify v1.8.1 github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.6-0.20230207040004-9b3ecc1dcaa9 + github.com/tikv/client-go/v2 v2.0.6-0.20230207090754-29dfcc272912 github.com/tikv/pd/client v0.0.0-20230206191557-2a7c8d4c9676 github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e github.com/twmb/murmur3 v1.1.3 diff --git a/go.sum b/go.sum index 38b5f8c1baaf1..9fc6d63beb8f8 100644 --- a/go.sum +++ b/go.sum @@ -931,8 +931,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= -github.com/tikv/client-go/v2 v2.0.6-0.20230207040004-9b3ecc1dcaa9 h1:gkAF7XxM2mfh5ZHbyLhXkaKyDd97soe1SMFIZ2vW260= -github.com/tikv/client-go/v2 v2.0.6-0.20230207040004-9b3ecc1dcaa9/go.mod h1:ySCWCno8tyITNFptwkLFUYCDJdNCtTVoRecNX1YSQIQ= +github.com/tikv/client-go/v2 v2.0.6-0.20230207090754-29dfcc272912 h1:1/ow7ZUnsU5CcxHF1cFAKdD+5b58tMbaeb8qAoli1m4= +github.com/tikv/client-go/v2 v2.0.6-0.20230207090754-29dfcc272912/go.mod h1:ySCWCno8tyITNFptwkLFUYCDJdNCtTVoRecNX1YSQIQ= github.com/tikv/pd/client v0.0.0-20230206191557-2a7c8d4c9676 h1:lzk+XYHs5iJ1lIxoza4Na3vLl/Z+y/qhpbkLS34WxaE= github.com/tikv/pd/client v0.0.0-20230206191557-2a7c8d4c9676/go.mod h1:ryhYHDwupsZHeOOF/N7So+1hbtAnuw0K2A+pKOElSVs= github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e h1:MV6KaVu/hzByHP0UvJ4HcMGE/8a6A4Rggc/0wx2AvJo= diff --git a/meta/meta.go b/meta/meta.go index 0e350c3d27eb5..801273a98cbf9 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -539,8 +539,8 @@ func (m *Meta) UpdatePolicy(policy *model.PolicyInfo) error { return m.txn.HSet(mPolicies, policyKey, attachMagicByte(data)) } -// CreateResourceGroup creates a resource group. -func (m *Meta) CreateResourceGroup(group *model.ResourceGroupInfo) error { +// AddResourceGroup creates a resource group. +func (m *Meta) AddResourceGroup(group *model.ResourceGroupInfo) error { if group.ID == 0 { return errors.New("group.ID is invalid") } diff --git a/meta/meta_test.go b/meta/meta_test.go index 0d6c37cc31d43..4eda2126b3887 100644 --- a/meta/meta_test.go +++ b/meta/meta_test.go @@ -128,7 +128,7 @@ func TestResourceGroup(t *testing.T) { RURate: 100, }, } - require.NoError(t, m.CreateResourceGroup(rg)) + require.NoError(t, m.AddResourceGroup(rg)) checkResourceGroup(100) rg.RURate = 200 diff --git a/server/stat_test.go b/server/stat_test.go index 4484823f6dc83..2cea08933c2c0 100644 --- a/server/stat_test.go +++ b/server/stat_test.go @@ -46,7 +46,7 @@ func TestUptime(t *testing.T) { }() require.NoError(t, err) - _, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true) + _, err = infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true) require.NoError(t, err) tidbdrv := NewTiDBDriver(store) diff --git a/store/mockstore/unistore/pd.go b/store/mockstore/unistore/pd.go index d72247ef8b44f..85c64ba5b3987 100644 --- a/store/mockstore/unistore/pd.go +++ b/store/mockstore/unistore/pd.go @@ -34,10 +34,14 @@ var _ pd.Client = new(pdClient) type pdClient struct { *us.MockPD - serviceSafePoints map[string]uint64 - gcSafePointMu sync.Mutex - globalConfig map[string]string - externalTimestamp atomic.Uint64 + serviceSafePoints map[string]uint64 + gcSafePointMu sync.Mutex + globalConfig map[string]string + externalTimestamp atomic.Uint64 + resourceGroupManager struct { + sync.RWMutex + groups map[string]*rmpb.ResourceGroup + } } func newPDClient(pd *us.MockPD) *pdClient { @@ -45,6 +49,12 @@ func newPDClient(pd *us.MockPD) *pdClient { MockPD: pd, serviceSafePoints: make(map[string]uint64), globalConfig: make(map[string]string), + resourceGroupManager: struct { + sync.RWMutex + groups map[string]*rmpb.ResourceGroup + }{ + groups: make(map[string]*rmpb.ResourceGroup), + }, } } @@ -187,23 +197,44 @@ func (c *pdClient) UpdateKeyspaceState(ctx context.Context, id uint32, state key } func (c *pdClient) ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) { - return nil, nil + c.resourceGroupManager.RLock() + defer c.resourceGroupManager.RUnlock() + groups := make([]*rmpb.ResourceGroup, 0, len(c.resourceGroupManager.groups)) + for _, group := range c.resourceGroupManager.groups { + groups = append(groups, group) + } + return groups, nil } -func (c *pdClient) GetResourceGroup(ctx context.Context, resourceGroupName string) (*rmpb.ResourceGroup, error) { - return nil, nil +func (c *pdClient) GetResourceGroup(ctx context.Context, name string) (*rmpb.ResourceGroup, error) { + c.resourceGroupManager.RLock() + defer c.resourceGroupManager.RUnlock() + group, ok := c.resourceGroupManager.groups[name] + if !ok { + return nil, nil + } + return group, nil } -func (c *pdClient) AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) { - return "", nil +func (c *pdClient) AddResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) (string, error) { + c.resourceGroupManager.Lock() + defer c.resourceGroupManager.Unlock() + c.resourceGroupManager.groups[group.Name] = group + return "Success!", nil } -func (c *pdClient) ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) { - return "", nil +func (c *pdClient) ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) (string, error) { + c.resourceGroupManager.Lock() + defer c.resourceGroupManager.Unlock() + c.resourceGroupManager.groups[group.Name] = group + return "Success!", nil } -func (c *pdClient) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) { - return "", nil +func (c *pdClient) DeleteResourceGroup(ctx context.Context, name string) (string, error) { + c.resourceGroupManager.Lock() + defer c.resourceGroupManager.Unlock() + delete(c.resourceGroupManager.groups, name) + return "Success!", nil } func (c *pdClient) WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) {