diff --git a/.bazelrc b/.bazelrc
index 6d31d4e95ac74..1ac13a3e138a7 100644
--- a/.bazelrc
+++ b/.bazelrc
@@ -1,4 +1,4 @@
-startup --host_jvm_args=-Xmx4g
+startup --host_jvm_args=-Xmx8g
 startup --unlimit_coredumps

 run:ci --color=yes
diff --git a/DEPS.bzl b/DEPS.bzl
index b7d54a3072401..9ae1d238efe0a 100644
--- a/DEPS.bzl
+++ b/DEPS.bzl
@@ -4085,8 +4085,8 @@ def go_deps():
         name = "com_github_tikv_client_go_v2",
         build_file_proto_mode = "disable_global",
         importpath = "github.com/tikv/client-go/v2",
-        sum = "h1:gkAF7XxM2mfh5ZHbyLhXkaKyDd97soe1SMFIZ2vW260=",
-        version = "v2.0.6-0.20230207040004-9b3ecc1dcaa9",
+        sum = "h1:1/ow7ZUnsU5CcxHF1cFAKdD+5b58tMbaeb8qAoli1m4=",
+        version = "v2.0.6-0.20230207090754-29dfcc272912",
     )
     go_repository(
         name = "com_github_tikv_pd",
diff --git a/Makefile.common b/Makefile.common
index e1ff465336c59..3b7caa149f260 100644
--- a/Makefile.common
+++ b/Makefile.common
@@ -109,7 +109,7 @@ DUMPLING_LDFLAGS += -X "github.com/pingcap/tidb/dumpling/cli.GitHash=$(shell git
 DUMPLING_LDFLAGS += -X "github.com/pingcap/tidb/dumpling/cli.GitBranch=$(shell git rev-parse --abbrev-ref HEAD)"
 DUMPLING_LDFLAGS += -X "github.com/pingcap/tidb/dumpling/cli.GoVersion=$(shell go version)"

-DUMPLING_GOBUILD := CGO_ENABLED=0 GO111MODULE=on go build -trimpath -ldflags '$(DUMPLING_LDFLAGS)'
+DUMPLING_GOBUILD := CGO_ENABLED=1 GO111MODULE=on go build -trimpath -ldflags '$(DUMPLING_LDFLAGS)'
 DUMPLING_GOTEST := CGO_ENABLED=1 GO111MODULE=on go test -ldflags '$(DUMPLING_LDFLAGS)'

 TEST_COVERAGE_DIR := "test_coverage"
diff --git a/ddl/attributes_sql_test.go b/ddl/attributes_sql_test.go
index 6950b44645ecf..c9497f2381f52 100644
--- a/ddl/attributes_sql_test.go
+++ b/ddl/attributes_sql_test.go
@@ -273,7 +273,7 @@ PARTITION BY RANGE (c) (

 func TestFlashbackTable(t *testing.T) {
     store, dom := testkit.CreateMockStoreAndDomain(t)
-    _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
+    _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
     require.NoError(t, err)
     tk := testkit.NewTestKit(t, store)
     tk.MustExec("use test")
@@ -331,7 +331,7 @@ PARTITION BY RANGE (c) (

 func TestDropTable(t *testing.T) {
     store, dom := testkit.CreateMockStoreAndDomain(t)
-    _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
+    _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
     require.NoError(t, err)
     tk := testkit.NewTestKit(t, store)
     tk.MustExec("use test")
@@ -384,7 +384,7 @@ PARTITION BY RANGE (c) (

 func TestCreateWithSameName(t *testing.T) {
     store, dom := testkit.CreateMockStoreAndDomain(t)
-    _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
+    _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
     require.NoError(t, err)
     tk := testkit.NewTestKit(t, store)
     tk.MustExec("use test")
@@ -448,7 +448,7 @@ PARTITION BY RANGE (c) (

 func TestPartition(t *testing.T) {
     store, dom := testkit.CreateMockStoreAndDomain(t)
-    _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
+    _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
     require.NoError(t, err)
     tk := testkit.NewTestKit(t, store)
     tk.MustExec("use test")
@@ -508,7 +508,7 @@ PARTITION BY RANGE (c) (

 func TestDropSchema(t *testing.T) {
     store, dom := testkit.CreateMockStoreAndDomain(t)
-    _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
+    _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
     require.NoError(t, err)
     tk := testkit.NewTestKit(t, store)
     tk.MustExec("use test")
@@ -534,7 +534,7 @@ PARTITION BY RANGE (c) (

 func TestDefaultKeyword(t *testing.T) {
     store, dom := testkit.CreateMockStoreAndDomain(t)
-    _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), true)
+    _, err := infosync.GlobalInfoSyncerInit(context.Background(), dom.DDL().GetID(), dom.ServerID, dom.GetEtcdClient(), dom.GetEtcdClient(), dom.GetPDClient(), true)
     require.NoError(t, err)
     tk := testkit.NewTestKit(t, store)
     tk.MustExec("use test")
diff --git a/ddl/ddl.go b/ddl/ddl.go
index a9e0f4d96478c..c1f5fa64089e1 100644
--- a/ddl/ddl.go
+++ b/ddl/ddl.go
@@ -191,7 +191,7 @@ type DDL interface {
     CreatePlacementPolicy(ctx sessionctx.Context, stmt *ast.CreatePlacementPolicyStmt) error
     DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPlacementPolicyStmt) error
     AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPlacementPolicyStmt) error
-    CreateResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) error
+    AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) error
     AlterResourceGroup(ctx sessionctx.Context, stmt *ast.AlterResourceGroupStmt) error
     DropResourceGroup(ctx sessionctx.Context, stmt *ast.DropResourceGroupStmt) error
     FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) error
diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go
index cd002a5968251..36e19510d0ec5 100644
--- a/ddl/ddl_api.go
+++ b/ddl/ddl_api.go
@@ -7613,8 +7613,8 @@ func checkIgnorePlacementDDL(ctx sessionctx.Context) bool {
     return false
 }

-// CreateResourceGroup implements the DDL interface, creates a resource group.
-func (d *ddl) CreateResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) (err error) {
+// AddResourceGroup implements the DDL interface, creating a resource group.
+func (d *ddl) AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) (err error) {
     groupInfo := &model.ResourceGroupInfo{ResourceGroupSettings: &model.ResourceGroupSettings{}}
     groupName := stmt.ResourceGroupName
     groupInfo.Name = groupName
diff --git a/ddl/main_test.go b/ddl/main_test.go
index 6db3ace76e6a6..84a713dc59bb3 100644
--- a/ddl/main_test.go
+++ b/ddl/main_test.go
@@ -54,7 +54,7 @@ func TestMain(m *testing.M) {
         conf.Experimental.AllowsExpressionIndex = true
     })

-    _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true)
+    _, err := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, true)
     if err != nil {
         _, _ = fmt.Fprintf(os.Stderr, "ddl: infosync.GlobalInfoSyncerInit: %v\n", err)
         os.Exit(1)
diff --git a/ddl/resource_group.go b/ddl/resource_group.go
index 3d397dd14160c..514e8cf30c2b8 100644
--- a/ddl/resource_group.go
+++ b/ddl/resource_group.go
@@ -48,11 +48,11 @@ func onCreateResourceGroup(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
     case model.StateNone:
         // none -> public
         groupInfo.State = model.StatePublic
-        err := t.CreateResourceGroup(groupInfo)
+        err := t.AddResourceGroup(groupInfo)
         if err != nil {
             return ver, errors.Trace(err)
         }
-        err = infosync.CreateResourceGroup(context.TODO(), protoGroup)
+        err = infosync.AddResourceGroup(context.TODO(), protoGroup)
         if err != nil {
             logutil.BgLogger().Warn("create resource group failed", zap.Error(err))
             return ver, errors.Trace(err)
diff --git a/ddl/resource_group_test.go b/ddl/resource_group_test.go
index a0921037c475d..e91db168a4585 100644
--- a/ddl/resource_group_test.go
+++ b/ddl/resource_group_test.go
@@ -131,7 +131,7 @@ func TestResourceGroupBasic(t *testing.T) {
     g = testResourceGroupNameFromIS(t, tk.Session(), "y")
     re.Nil(g)
     tk.MustContainErrMsg("create resource group x RU_PER_SEC=1000, CPU='8000m';", resourcegroup.ErrInvalidResourceGroupDuplicatedMode.Error())
-    groups, err := infosync.GetAllResourceGroups(context.TODO())
+    groups, err := infosync.ListResourceGroups(context.TODO())
     require.Equal(t, 0, len(groups))
     require.NoError(t, err)
diff --git a/ddl/schematracker/checker.go b/ddl/schematracker/checker.go
index ec6a7892996c9..9477d478388ad 100644
--- a/ddl/schematracker/checker.go
+++ b/ddl/schematracker/checker.go
@@ -433,9 +433,9 @@ func (d Checker) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterPla
     panic("implement me")
 }

-// CreateResourceGroup implements the DDL interface.
+// AddResourceGroup implements the DDL interface.
 // ResourceGroup do not affect the transaction.
-func (d Checker) CreateResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) error {
+func (d Checker) AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateResourceGroupStmt) error {
     return nil
 }
diff --git a/ddl/schematracker/dm_tracker.go b/ddl/schematracker/dm_tracker.go
index c6bdd892fcff2..323d3b04962f7 100644
--- a/ddl/schematracker/dm_tracker.go
+++ b/ddl/schematracker/dm_tracker.go
@@ -1173,8 +1173,8 @@ func (SchemaTracker) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.Alte
     return nil
 }

-// CreateResourceGroup implements the DDL interface, it's no-op in DM's case.
-func (SchemaTracker) CreateResourceGroup(_ sessionctx.Context, _ *ast.CreateResourceGroupStmt) error {
+// AddResourceGroup implements the DDL interface; it's a no-op in DM's case.
+func (SchemaTracker) AddResourceGroup(_ sessionctx.Context, _ *ast.CreateResourceGroupStmt) error {
     return nil
 }
diff --git a/domain/db_test.go b/domain/db_test.go
index 9b122664f8397..428b63b6de05b 100644
--- a/domain/db_test.go
+++ b/domain/db_test.go
@@ -73,7 +73,7 @@ func TestNormalSessionPool(t *testing.T) {
     domain, err := session.BootstrapSession(store)
     require.NoError(t, err)
     defer domain.Close()
-    info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true)
+    info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, true)
     require.NoError(t, err1)
     conf := config.GetGlobalConfig()
     conf.Socket = ""
@@ -107,7 +107,7 @@ func TestAbnormalSessionPool(t *testing.T) {
     domain, err := session.BootstrapSession(store)
     require.NoError(t, err)
     defer domain.Close()
-    info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, true)
+    info, err1 := infosync.GlobalInfoSyncerInit(context.Background(), "t", func() uint64 { return 1 }, nil, nil, nil, true)
     require.NoError(t, err1)
     conf := config.GetGlobalConfig()
     conf.Socket = ""
diff --git a/domain/domain.go b/domain/domain.go
index 67341449876d4..20bd33ca3f794 100644
--- a/domain/domain.go
+++ b/domain/domain.go
@@ -1047,18 +1047,13 @@ func (do *Domain) Init(
     }

     // step 1: prepare the info/schema syncer which domain reload needed.
+    pdCli := do.GetPDClient()
     skipRegisterToDashboard := config.GetGlobalConfig().SkipRegisterToDashboard
-    do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, do.unprefixedEtcdCli, skipRegisterToDashboard)
+    do.info, err = infosync.GlobalInfoSyncerInit(ctx, do.ddl.GetID(), do.ServerID, do.etcdClient, do.unprefixedEtcdCli, pdCli, skipRegisterToDashboard)
     if err != nil {
         return err
     }
-
-    var pdClient pd.Client
-    if store, ok := do.store.(kv.StorageWithPD); ok {
-        pdClient = store.GetPDClient()
-    }
-    do.globalCfgSyncer = globalconfigsync.NewGlobalConfigSyncer(pdClient)
-
+    do.globalCfgSyncer = globalconfigsync.NewGlobalConfigSyncer(pdCli)
     err = do.ddl.SchemaSyncer().Init(ctx)
     if err != nil {
         return err
@@ -1089,12 +1084,12 @@ func (do *Domain) Init(
     if !skipRegisterToDashboard {
         do.wg.Run(do.topologySyncerKeeper, "topologySyncerKeeper")
     }
-    if pdClient != nil {
+    if pdCli != nil {
         do.wg.Run(func() {
-            do.closestReplicaReadCheckLoop(ctx, pdClient)
+            do.closestReplicaReadCheckLoop(ctx, pdCli)
         }, "closestReplicaReadCheckLoop")
     }
-    err = do.initLogBackup(ctx, pdClient)
+    err = do.initLogBackup(ctx, pdCli)
     if err != nil {
         return err
     }
@@ -1339,6 +1334,14 @@ func (do *Domain) GetEtcdClient() *clientv3.Client {
     return do.etcdClient
 }

+// GetPDClient returns the PD client.
+func (do *Domain) GetPDClient() pd.Client {
+    if store, ok := do.store.(kv.StorageWithPD); ok {
+        return store.GetPDClient()
+    }
+    return nil
+}
+
 // LoadPrivilegeLoop create a goroutine loads privilege tables in a loop, it
 // should be called only once in BootstrapSession.
 func (do *Domain) LoadPrivilegeLoop(sctx sessionctx.Context) error {
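Reviewer note: `Domain.GetPDClient` centralizes the `kv.StorageWithPD` assertion that `Init` previously inlined, and a nil result now flows through `GlobalInfoSyncerInit` into the resource-group manager selection. A minimal sketch of the pattern, under the assumption that callers must tolerate nil (the helper name `pdClientOf` is hypothetical, not part of this diff):

```go
// Hypothetical helper mirroring the new Domain.GetPDClient: a PD client
// exists only when the kv.Storage is PD-backed (e.g. TiKV); mock stores
// used in unit tests fall through to nil.
func pdClientOf(store kv.Storage) pd.Client {
	if s, ok := store.(kv.StorageWithPD); ok {
		return s.GetPDClient()
	}
	return nil // nil is meaningful: infosync then falls back to its mock manager
}
```

This is why every test touched by this diff can simply pass `dom.GetPDClient()` (or plain `nil`) as the new sixth argument.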
diff --git a/domain/infosync/BUILD.bazel b/domain/infosync/BUILD.bazel
index 0952dfc300490..0938910b19ac4 100644
--- a/domain/infosync/BUILD.bazel
+++ b/domain/infosync/BUILD.bazel
@@ -43,7 +43,6 @@ go_library(
         "@com_github_pingcap_failpoint//:failpoint",
         "@com_github_pingcap_kvproto//pkg/metapb",
         "@com_github_pingcap_kvproto//pkg/resource_manager",
-        "@com_github_pingcap_log//:log",
         "@com_github_tikv_client_go_v2//oracle",
         "@com_github_tikv_pd_client//:client",
         "@io_etcd_go_etcd_client_v3//:client",
diff --git a/domain/infosync/info.go b/domain/infosync/info.go
index 6c9e721959cf9..b45216daa8a9f 100644
--- a/domain/infosync/info.go
+++ b/domain/infosync/info.go
@@ -118,7 +118,7 @@ type InfoSyncer struct {
     placementManager      PlacementManager
     scheduleManager       ScheduleManager
     tiflashReplicaManager TiFlashReplicaManager
-    resourceGroupManager  ResourceGroupManager
+    resourceGroupManager  pd.ResourceManagerClient
 }

 // ServerInfo is server static information.
@@ -186,7 +186,14 @@ func setGlobalInfoSyncer(is *InfoSyncer) {
 }

 // GlobalInfoSyncerInit return a new InfoSyncer. It is exported for testing.
-func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func() uint64, etcdCli *clientv3.Client, unprefixedEtcdCli *clientv3.Client, skipRegisterToDashBoard bool) (*InfoSyncer, error) {
+func GlobalInfoSyncerInit(
+    ctx context.Context,
+    id string,
+    serverIDGetter func() uint64,
+    etcdCli, unprefixedEtcdCli *clientv3.Client,
+    pdCli pd.Client,
+    skipRegisterToDashBoard bool,
+) (*InfoSyncer, error) {
     is := &InfoSyncer{
         etcdCli:           etcdCli,
         unprefixedEtcdCli: unprefixedEtcdCli,
@@ -201,8 +208,8 @@ func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func()
     is.labelRuleManager = initLabelRuleManager(etcdCli)
     is.placementManager = initPlacementManager(etcdCli)
     is.scheduleManager = initScheduleManager(etcdCli)
-    is.resourceGroupManager = initResourceGroupManager(etcdCli)
     is.tiflashReplicaManager = initTiFlashReplicaManager(etcdCli)
+    is.resourceGroupManager = initResourceGroupManager(pdCli)
     setGlobalInfoSyncer(is)
     return is, nil
 }
@@ -247,11 +254,11 @@ func initPlacementManager(etcdCli *clientv3.Client) PlacementManager {
     return &PDPlacementManager{etcdCli: etcdCli}
 }

-func initResourceGroupManager(etcdCli *clientv3.Client) ResourceGroupManager {
-    if etcdCli == nil {
+func initResourceGroupManager(pdCli pd.Client) pd.ResourceManagerClient {
+    if pdCli == nil {
         return &mockResourceGroupManager{groups: make(map[string]*rmpb.ResourceGroup)}
     }
-    return NewResourceManager(etcdCli)
+    return pdCli
 }

 func initTiFlashReplicaManager(etcdCli *clientv3.Client) TiFlashReplicaManager {
@@ -590,23 +597,24 @@ func GetResourceGroup(ctx context.Context, name string) (*rmpb.ResourceGroup, er
     return is.resourceGroupManager.GetResourceGroup(ctx, name)
 }

-// GetAllResourceGroups is used to get all resource groups from resource manager.
-func GetAllResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) {
+// ListResourceGroups is used to get all resource groups from the resource manager.
+func ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) {
     is, err := getGlobalInfoSyncer()
     if err != nil {
         return nil, err
     }
-    return is.resourceGroupManager.GetAllResourceGroups(ctx)
+    return is.resourceGroupManager.ListResourceGroups(ctx)
 }

-// CreateResourceGroup is used to create one specific resource group to resource manager.
-func CreateResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error {
+// AddResourceGroup is used to create one specific resource group in the resource manager.
+func AddResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error {
     is, err := getGlobalInfoSyncer()
     if err != nil {
         return err
     }
-    return is.resourceGroupManager.CreateResourceGroup(ctx, group)
+    _, err = is.resourceGroupManager.AddResourceGroup(ctx, group)
+    return err
 }

 // ModifyResourceGroup is used to modify one specific resource group to resource manager.
@@ -615,7 +623,8 @@ func ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error {
     if err != nil {
         return err
     }
-    return is.resourceGroupManager.ModifyResourceGroup(ctx, group)
+    _, err = is.resourceGroupManager.ModifyResourceGroup(ctx, group)
+    return err
 }

 // DeleteResourceGroup is used to delete one specific resource group from resource manager.
@@ -624,7 +633,8 @@ func DeleteResourceGroup(ctx context.Context, name string) error {
     if err != nil {
         return err
     }
-    return is.resourceGroupManager.DeleteResourceGroup(ctx, name)
+    _, err = is.resourceGroupManager.DeleteResourceGroup(ctx, name)
+    return err
 }

 // PutRuleBundlesWithDefaultRetry will retry for default times
diff --git a/domain/infosync/info_test.go b/domain/infosync/info_test.go
index 3264c0adca3c6..b7af5ed167837 100644
--- a/domain/infosync/info_test.go
+++ b/domain/infosync/info_test.go
@@ -67,7 +67,7 @@ func TestTopology(t *testing.T) {
         require.NoError(t, err)
     }()

-    info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, false)
+    info, err := GlobalInfoSyncerInit(ctx, currentID, func() uint64 { return 1 }, client, client, nil, false)
     require.NoError(t, err)

     err = info.newTopologySessionAndStoreServerInfo(ctx, util2.NewSessionDefaultRetryCnt)
@@ -152,7 +152,7 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) {
 }

 func TestPutBundlesRetry(t *testing.T) {
-    _, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, false)
+    _, err := GlobalInfoSyncerInit(context.TODO(), "test", func() uint64 { return 1 }, nil, nil, nil, false)
     require.NoError(t, err)

     bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"})
@@ -216,7 +216,7 @@ func TestPutBundlesRetry(t *testing.T) {

 func TestTiFlashManager(t *testing.T) {
     ctx := context.Background()
-    _, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, false)
+    _, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, nil, nil, false)
     tiflash := NewMockTiFlash()
     SetMockTiFlash(tiflash)
diff --git a/domain/infosync/resource_group_manager.go b/domain/infosync/resource_group_manager.go
index 93c751fc04968..6b8f876f9bb11 100644
--- a/domain/infosync/resource_group_manager.go
+++ b/domain/infosync/resource_group_manager.go
@@ -19,92 +19,29 @@ import (
     "sync"

     rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
-    "github.com/pingcap/log"
-    clientv3 "go.etcd.io/etcd/client/v3"
-    "go.uber.org/zap"
+    pd "github.com/tikv/pd/client"
 )

-// ResourceGroupManager manages resource group settings
-type ResourceGroupManager interface {
-    // GetResourceGroup is used to get one specific rule bundle from ResourceGroup Manager.
-    GetResourceGroup(ctx context.Context, name string) (*rmpb.ResourceGroup, error)
-    // GetAllResourceGroups is used to get all rule bundles from ResourceGroup Manager.
-    GetAllResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error)
-    // PutResourceGroup is used to post specific rule bundles to ResourceGroup Manager.
-    CreateResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error
-    // ModifyResourceGroup is used to modify specific rule bundles to ResourceGroup Manager.
-    ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error
-    // DeleteResourceGroup is used to delete specific rule bundles to ResourceGroup Manager.
-    DeleteResourceGroup(ctx context.Context, name string) error
-}
-
-// externalResourceGroupManager manages placement with resource manager.
-// TODO: replace with resource manager client.
-type externalResourceGroupManager struct {
-    etcdCli *clientv3.Client
-}
-
-// NewResourceManager is used to create a new resource manager in client side.
-func NewResourceManager(etcdCli *clientv3.Client) ResourceGroupManager {
-    return &externalResourceGroupManager{etcdCli: etcdCli}
-}
-
-// GetResourceGroupClient is used to get resource group client.
-func (m *externalResourceGroupManager) GetResourceGroupClient() rmpb.ResourceManagerClient {
-    conn := m.etcdCli.ActiveConnection()
-    return rmpb.NewResourceManagerClient(conn)
+type mockResourceGroupManager struct {
+    sync.RWMutex
+    groups map[string]*rmpb.ResourceGroup
 }

-// GetResourceGroup is used to get one specific rule bundle from ResourceGroup Manager.
-func (m *externalResourceGroupManager) GetResourceGroup(ctx context.Context, name string) (*rmpb.ResourceGroup, error) {
-    group := &rmpb.GetResourceGroupRequest{ResourceGroupName: name}
-    resp, err := m.GetResourceGroupClient().GetResourceGroup(ctx, group)
-    if err != nil {
-        return nil, err
-    }
-    return resp.GetGroup(), nil
-}
+var _ pd.ResourceManagerClient = (*mockResourceGroupManager)(nil)

-// GetAllResourceGroups is used to get all resource group from ResourceGroup Manager. It is used to load full resource groups from PD while fullload infoschema.
-func (m *externalResourceGroupManager) GetAllResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) {
-    req := &rmpb.ListResourceGroupsRequest{}
-    resp, err := m.GetResourceGroupClient().ListResourceGroups(ctx, req)
-    if err != nil {
-        return nil, err
+func (m *mockResourceGroupManager) ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) {
+    m.RLock()
+    defer m.RUnlock()
+    groups := make([]*rmpb.ResourceGroup, 0, len(m.groups))
+    for _, group := range m.groups {
+        groups = append(groups, group)
     }
-    return resp.GetGroups(), nil
-}
-
-// CreateResourceGroup is used to post specific resource group to ResourceGroup Manager.
-func (m *externalResourceGroupManager) CreateResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error {
-    req := &rmpb.PutResourceGroupRequest{Group: group}
-    _, err := m.GetResourceGroupClient().AddResourceGroup(ctx, req)
-    return err
-}
-
-// ModifyResourceGroup is used to modify specific resource group to ResourceGroup Manager.
-func (m *externalResourceGroupManager) ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error {
-    req := &rmpb.PutResourceGroupRequest{Group: group}
-    _, err := m.GetResourceGroupClient().ModifyResourceGroup(ctx, req)
-    return err
-}
-
-// DeleteResourceGroup is used to delete specific resource group to ResourceGroup Manager.
-func (m *externalResourceGroupManager) DeleteResourceGroup(ctx context.Context, name string) error {
-    req := &rmpb.DeleteResourceGroupRequest{ResourceGroupName: name}
-    log.Info("delete resource group", zap.String("name", name))
-    _, err := m.GetResourceGroupClient().DeleteResourceGroup(ctx, req)
-    return err
-}
-
-type mockResourceGroupManager struct {
-    sync.Mutex
-    groups map[string]*rmpb.ResourceGroup
+    return groups, nil
 }

 func (m *mockResourceGroupManager) GetResourceGroup(ctx context.Context, name string) (*rmpb.ResourceGroup, error) {
-    m.Lock()
-    defer m.Unlock()
+    m.RLock()
+    defer m.RUnlock()
     group, ok := m.groups[name]
     if !ok {
         return nil, nil
@@ -112,33 +49,31 @@ func (m *mockResourceGroupManager) GetResourceGroup(ctx context.Context, name st
     return group, nil
 }

-func (m *mockResourceGroupManager) GetAllResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) {
-    m.Lock()
-    defer m.Unlock()
-    groups := make([]*rmpb.ResourceGroup, 0, len(m.groups))
-    for _, group := range m.groups {
-        groups = append(groups, group)
-    }
-    return groups, nil
-}
-
-func (m *mockResourceGroupManager) CreateResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error {
+func (m *mockResourceGroupManager) AddResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) (string, error) {
     m.Lock()
     defer m.Unlock()
     m.groups[group.Name] = group
-    return nil
+    return "Success!", nil
 }

-func (m *mockResourceGroupManager) ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) error {
+func (m *mockResourceGroupManager) ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) (string, error) {
     m.Lock()
     defer m.Unlock()
     m.groups[group.Name] = group
-    return nil
+    return "Success!", nil
 }

-func (m *mockResourceGroupManager) DeleteResourceGroup(ctx context.Context, name string) error {
+func (m *mockResourceGroupManager) DeleteResourceGroup(ctx context.Context, name string) (string, error) {
     m.Lock()
     defer m.Unlock()
     delete(m.groups, name)
-    return nil
+    return "Success!", nil
+}
+
+func (m *mockResourceGroupManager) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) {
+    return nil, nil
+}
+
+func (m *mockResourceGroupManager) WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) {
+    return nil, nil
 }
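Reviewer note: since the mock now satisfies `pd.ResourceManagerClient` itself, the nil-PD path can be exercised end to end through the package-level API. A sketch of such a test, assuming it lives among package `infosync`'s tests (the test name and the `require`/`rmpb` scaffolding are mine, not part of this diff):

```go
func TestMockResourceGroupManagerSketch(t *testing.T) {
	// nil etcd and PD clients select the in-memory mock managers.
	_, err := GlobalInfoSyncerInit(context.TODO(), "test",
		func() uint64 { return 1 }, nil, nil, nil, true)
	require.NoError(t, err)

	// AddResourceGroup discards the mock's "Success!" response string.
	require.NoError(t, AddResourceGroup(context.TODO(), &rmpb.ResourceGroup{Name: "rg1"}))

	groups, err := ListResourceGroups(context.TODO())
	require.NoError(t, err)
	require.Len(t, groups, 1)

	require.NoError(t, DeleteResourceGroup(context.TODO(), "rg1"))
}
```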
diff --git a/executor/ddl.go b/executor/ddl.go
index be8ffa4e48d9f..28365e2495a74 100644
--- a/executor/ddl.go
+++ b/executor/ddl.go
@@ -746,7 +746,7 @@ func (e *DDLExec) executeCreateResourceGroup(s *ast.CreateResourceGroupStmt) err
     if !variable.EnableResourceControl.Load() {
         return infoschema.ErrResourceGroupSupportDisabled
     }
-    return domain.GetDomain(e.ctx).DDL().CreateResourceGroup(e.ctx, s)
+    return domain.GetDomain(e.ctx).DDL().AddResourceGroup(e.ctx, s)
 }

 func (e *DDLExec) executeAlterResourceGroup(s *ast.AlterResourceGroupStmt) error {
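Reviewer note: at the SQL surface nothing changes — `CREATE RESOURCE GROUP` still enters through `executeCreateResourceGroup` and now lands in the renamed `DDL().AddResourceGroup`. A hedged end-to-end sketch (the sysvar spelling `tidb_enable_resource_control` is my assumption for the switch behind `variable.EnableResourceControl`; the group options are illustrative):

```go
tk := testkit.NewTestKit(t, store)
// With resource control off, executeCreateResourceGroup returns
// ErrResourceGroupSupportDisabled before reaching the DDL layer.
tk.MustExec("set global tidb_enable_resource_control = 'ON'")
// Path: executor → ddl.AddResourceGroup → onCreateResourceGroup →
// meta.AddResourceGroup + infosync.AddResourceGroup (PD client or mock).
tk.MustExec("create resource group rg1 RU_PER_SEC=1000")
```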
diff --git a/executor/index_advise_test.go b/executor/index_advise_test.go
index 3415ffe83537b..a50c294294cea 100644
--- a/executor/index_advise_test.go
+++ b/executor/index_advise_test.go
@@ -65,3 +65,135 @@ func TestIndexAdvise(t *testing.T) {
     require.Equal(t, uint64(4), ia.MaxIndexNum.PerTable)
     require.Equal(t, uint64(5), ia.MaxIndexNum.PerDB)
 }
+
+func TestIndexJoinProjPattern(t *testing.T) {
+    store := testkit.CreateMockStore(t)
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    tk.MustExec(`create table t1(
+pnbrn_cnaps varchar(5) not null,
+new_accno varchar(18) not null,
+primary key(pnbrn_cnaps,new_accno) nonclustered
+);`)
+    tk.MustExec(`create table t2(
+pnbrn_cnaps varchar(5) not null,
+txn_accno varchar(18) not null,
+txn_dt date not null,
+yn_frz varchar(1) default null
+);`)
+    tk.MustExec(`insert into t1(pnbrn_cnaps,new_accno) values ("40001","123")`)
+    tk.MustExec(`insert into t2(pnbrn_cnaps, txn_accno, txn_dt, yn_frz) values ("40001","123","20221201","0");`)
+
+    sql := `update
+/*+ inl_join(a) */
+t2 b,
+(
+select t1.pnbrn_cnaps,
+t1.new_accno
+from t1
+where t1.pnbrn_cnaps = '40001'
+) a
+set b.yn_frz = '1'
+where b.txn_dt = str_to_date('20221201', '%Y%m%d')
+and b.pnbrn_cnaps = a.pnbrn_cnaps
+and b.txn_accno = a.new_accno;`
+    rows := [][]interface{}{
+        {"Update_8"},
+        {"└─IndexJoin_14"},
+        {"  ├─TableReader_25(Build)"},
+        {"  │ └─Selection_24"},
+        {"  │   └─TableFullScan_23"},
+        {"  └─IndexReader_12(Probe)"},
+        {"    └─Selection_11"},
+        {"      └─IndexRangeScan_10"},
+    }
+    tk.Session().GetSessionVars().EnableIndexJoinInnerSideMultiPattern = true
+    tk.MustQuery("explain "+sql).CheckAt([]int{0}, rows)
+    rows = [][]interface{}{
+        {"Update_8"},
+        {"└─HashJoin_10"},
+        {"  ├─IndexReader_17(Build)"},
+        {"  │ └─IndexRangeScan_16"},
+        {"  └─TableReader_14(Probe)"},
+        {"    └─Selection_13"},
+        {"      └─TableFullScan_12"},
+    }
+    tk.Session().GetSessionVars().EnableIndexJoinInnerSideMultiPattern = false
+    tk.MustQuery("explain "+sql).CheckAt([]int{0}, rows)
+
+    tk.Session().GetSessionVars().EnableIndexJoinInnerSideMultiPattern = true
+    tk.MustExec(sql)
+    tk.MustQuery("select yn_frz from t2").Check(testkit.Rows("1"))
+}
+
+func TestIndexJoinSelPattern(t *testing.T) {
+    store := testkit.CreateMockStore(t)
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    tk.MustExec(`create table tbl_miss(
+id bigint(20) unsigned not null
+,txn_dt date default null
+,perip_sys_uuid varchar(32) not null
+,rvrs_idr varchar(1) not null
+,primary key(id) clustered
+,key idx1 (txn_dt, perip_sys_uuid, rvrs_idr)
+);
+`)
+    tk.MustExec(`insert into tbl_miss (id,txn_dt,perip_sys_uuid,rvrs_idr) values (1,"20221201","123","1");`)
+    tk.MustExec(`create table tbl_src(
+txn_dt date default null
+,uuid varchar(32) not null
+,rvrs_idr char(1)
+,expd_inf varchar(5000)
+,primary key(uuid,rvrs_idr) nonclustered
+);
+`)
+    tk.MustExec(`insert into tbl_src (txn_dt,uuid,rvrs_idr) values ("20221201","123","1");`)
+    sql := `select /*+ use_index(mis,) inl_join(src) */
+    *
+from tbl_miss mis
+    ,tbl_src src
+where src.txn_dt >= str_to_date('20221201', '%Y%m%d')
+and mis.id between 1 and 10000
+and mis.perip_sys_uuid = src.uuid
+and mis.rvrs_idr = src.rvrs_idr
+and mis.txn_dt = src.txn_dt
+and (
+    case when isnull(src.expd_inf) = 1 then ''
+    else
+        substr(concat_ws('',src.expd_inf,'~~'),
+            instr(concat_ws('',src.expd_inf,'~~'),'~~a4') + 4,
+            instr(substr(concat_ws('',src.expd_inf,'~~'),
+                instr(concat_ws('',src.expd_inf,'~~'),'~~a4') + 4, length(concat_ws('',src.expd_inf,'~~'))),'~~') - 1)
+    end
+) != '01';`
+    rows := [][]interface{}{
+        {"HashJoin_9"},
+        {"├─TableReader_12(Build)"},
+        {"│ └─Selection_11"},
+        {"│   └─TableRangeScan_10"},
+        {"└─Selection_13(Probe)"},
+        {"  └─TableReader_16"},
+        {"    └─Selection_15"},
+        {"      └─TableFullScan_14"},
+    }
+    tk.Session().GetSessionVars().EnableIndexJoinInnerSideMultiPattern = false
+    tk.MustQuery("explain "+sql).CheckAt([]int{0}, rows)
+    rows = [][]interface{}{
+        {"IndexJoin_13"},
+        {"├─TableReader_25(Build)"},
+        {"│ └─Selection_24"},
+        {"│   └─TableRangeScan_23"},
+        {"└─Selection_12(Probe)"},
+        {"  └─IndexLookUp_11"},
+        {"    ├─IndexRangeScan_8(Build)"},
+        {"    └─Selection_10(Probe)"},
+        {"      └─TableRowIDScan_9"},
+    }
+    tk.Session().GetSessionVars().EnableIndexJoinInnerSideMultiPattern = true
+    tk.MustQuery("explain "+sql).CheckAt([]int{0}, rows)
+    tk.Session().GetSessionVars().EnableIndexJoinInnerSideMultiPattern = true
+    tk.MustQuery(sql).Check(testkit.Rows("1 2022-12-01 123 1 2022-12-01 123 1 <nil>"))
+    tk.Session().GetSessionVars().EnableIndexJoinInnerSideMultiPattern = false
+    tk.MustQuery(sql).Check(testkit.Rows("1 2022-12-01 123 1 2022-12-01 123 1 <nil>"))
+}
diff --git a/executor/infoschema_reader.go b/executor/infoschema_reader.go
index 39666333762ab..a016d5a459df1 100644
--- a/executor/infoschema_reader.go
+++ b/executor/infoschema_reader.go
@@ -3390,7 +3390,7 @@ func (e *memtableRetriever) setDataFromPlacementPolicies(sctx sessionctx.Context
 }

 func (e *memtableRetriever) setDataFromResourceGroups() error {
-    resourceGroups, err := infosync.GetAllResourceGroups(context.TODO())
+    resourceGroups, err := infosync.ListResourceGroups(context.TODO())
     if err != nil {
         return errors.Errorf("failed to access resource group manager, error message is %s", err.Error())
     }
diff --git a/expression/builtin_compare.go b/expression/builtin_compare.go
index dec5d06983679..bed48c0e59096 100644
--- a/expression/builtin_compare.go
+++ b/expression/builtin_compare.go
@@ -1556,6 +1556,37 @@ func RefineComparedConstant(ctx sessionctx.Context, targetFieldType types.FieldT
     return con, false
 }

+// Since the argument refining of cmp functions can bring some risks to the plan-cache, the optimizer
+// needs to decide whether to skip the refining or to skip the plan-cache for safety.
+// For example, `unsigned_int_col > ?(-1)` can be refined to `True`, but that refined result
+// becomes wrong if the parameter later changes to 1.
+func allowCmpArgsRefining4PlanCache(ctx sessionctx.Context, args []Expression) (allowRefining bool) {
+    if !MaybeOverOptimized4PlanCache(ctx, args) {
+        return true // plan-cache disabled or no parameter in these args
+    }
+
+    // For the 2 cases below, which may affect the index selection a lot, skip the plan-cache,
+    // and for all other cases, skip the refining.
+    // 1. int-expr string-const
+    // 2. int-expr float/double/decimal-const
+    for conIdx := 0; conIdx < 2; conIdx++ {
+        if args[1-conIdx].GetType().EvalType() != types.ETInt {
+            continue // not an int-expr
+        }
+        if _, isCon := args[conIdx].(*Constant); !isCon {
+            continue // not a constant
+        }
+        conType := args[conIdx].GetType().EvalType()
+        if conType == types.ETString || conType == types.ETReal || conType == types.ETDecimal {
+            reason := errors.Errorf("skip plan-cache: '%v' may be converted to INT", args[conIdx].String())
+            ctx.GetSessionVars().StmtCtx.SetSkipPlanCache(reason)
+            return true
+        }
+    }
+
+    return false
+}
+
 // refineArgs will rewrite the arguments if the compare expression is `int column non-int constant` or
 // `non-int constant int column`. E.g., `a < 1.1` will be rewritten to `a < 2`. It also handles comparing year type
 // with int constant if the int constant falls into a sensible year representation.
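Reviewer note: a concrete instance of the hazard this function guards against, as a hedged testkit-style sketch (table and parameters are illustrative): refining `a > ?` under the parameter -1 to `TRUE` would poison the cached plan for later positive parameters, so either the refining or the cache must be skipped.

```go
tk.MustExec("create table t (a bigint unsigned, key(a))")
tk.MustExec("insert into t values (0), (5)")
tk.MustExec("prepare st from 'select a from t where a > ?'")
tk.MustExec("set @p = -1")
// Every unsigned value exceeds -1; a plan refined to TRUE would be correct
// here only by accident.
tk.MustQuery("execute st using @p").Sort().Check(testkit.Rows("0", "5"))
tk.MustExec("set @p = 1")
// Reusing a TRUE-refined cached plan would wrongly return both rows;
// with this change the refining is skipped, keeping the filter intact.
tk.MustQuery("execute st using @p").Check(testkit.Rows("5"))
```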
@@ -1565,31 +1596,17 @@ func (c *compareFunctionClass) refineArgs(ctx sessionctx.Context, args []Express
     arg0Type, arg1Type := args[0].GetType(), args[1].GetType()
     arg0IsInt := arg0Type.EvalType() == types.ETInt
     arg1IsInt := arg1Type.EvalType() == types.ETInt
-    arg0IsString := arg0Type.EvalType() == types.ETString
-    arg1IsString := arg1Type.EvalType() == types.ETString
     arg0, arg0IsCon := args[0].(*Constant)
     arg1, arg1IsCon := args[1].(*Constant)
     isExceptional, finalArg0, finalArg1 := false, args[0], args[1]
     isPositiveInfinite, isNegativeInfinite := false, false
-    if MaybeOverOptimized4PlanCache(ctx, args) {
-        // To keep the result be compatible with MySQL, refine `int non-constant str constant`
-        // here and skip this refine operation in all other cases for safety.
-        if (arg0IsInt && !arg0IsCon && arg1IsString && arg1IsCon) || (arg1IsInt && !arg1IsCon && arg0IsString && arg0IsCon) {
-            var reason error
-            if arg1IsString {
-                reason = errors.Errorf("skip plan-cache: '%v' may be converted to INT", arg1.String())
-            } else { // arg0IsString
-                reason = errors.Errorf("skip plan-cache: '%v' may be converted to INT", arg0.String())
-            }
-            ctx.GetSessionVars().StmtCtx.SetSkipPlanCache(reason)
-            RemoveMutableConst(ctx, args)
-        } else {
-            return args
-        }
-    } else if !ctx.GetSessionVars().StmtCtx.UseCache {
-        // We should remove the mutable constant for correctness, because its value may be changed.
-        RemoveMutableConst(ctx, args)
+
+    if !allowCmpArgsRefining4PlanCache(ctx, args) {
+        return args
     }
+    // We should remove the mutable constant for correctness, because its value may be changed.
+    RemoveMutableConst(ctx, args)
+
     // int non-constant [cmp] non-int constant
     if arg0IsInt && !arg0IsCon && !arg1IsInt && arg1IsCon {
         arg1, isExceptional = RefineComparedConstant(ctx, *arg0Type, arg1, c.op)
diff --git a/go.mod b/go.mod
index ab563b449d53c..5b2d4f414b4de 100644
--- a/go.mod
+++ b/go.mod
@@ -92,7 +92,7 @@ require (
     github.com/stretchr/testify v1.8.1
     github.com/tdakkota/asciicheck v0.1.1
     github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
-    github.com/tikv/client-go/v2 v2.0.6-0.20230207040004-9b3ecc1dcaa9
+    github.com/tikv/client-go/v2 v2.0.6-0.20230207090754-29dfcc272912
     github.com/tikv/pd/client v0.0.0-20230206191557-2a7c8d4c9676
     github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e
     github.com/twmb/murmur3 v1.1.3
diff --git a/go.sum b/go.sum
index 38b5f8c1baaf1..9fc6d63beb8f8 100644
--- a/go.sum
+++ b/go.sum
@@ -931,8 +931,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
 github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
 github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
-github.com/tikv/client-go/v2 v2.0.6-0.20230207040004-9b3ecc1dcaa9 h1:gkAF7XxM2mfh5ZHbyLhXkaKyDd97soe1SMFIZ2vW260=
-github.com/tikv/client-go/v2 v2.0.6-0.20230207040004-9b3ecc1dcaa9/go.mod h1:ySCWCno8tyITNFptwkLFUYCDJdNCtTVoRecNX1YSQIQ=
+github.com/tikv/client-go/v2 v2.0.6-0.20230207090754-29dfcc272912 h1:1/ow7ZUnsU5CcxHF1cFAKdD+5b58tMbaeb8qAoli1m4=
+github.com/tikv/client-go/v2 v2.0.6-0.20230207090754-29dfcc272912/go.mod h1:ySCWCno8tyITNFptwkLFUYCDJdNCtTVoRecNX1YSQIQ=
 github.com/tikv/pd/client v0.0.0-20230206191557-2a7c8d4c9676 h1:lzk+XYHs5iJ1lIxoza4Na3vLl/Z+y/qhpbkLS34WxaE=
 github.com/tikv/pd/client v0.0.0-20230206191557-2a7c8d4c9676/go.mod h1:ryhYHDwupsZHeOOF/N7So+1hbtAnuw0K2A+pKOElSVs=
 github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e h1:MV6KaVu/hzByHP0UvJ4HcMGE/8a6A4Rggc/0wx2AvJo=
diff --git a/meta/meta.go b/meta/meta.go
index 0e350c3d27eb5..801273a98cbf9 100644
--- a/meta/meta.go
+++ b/meta/meta.go
@@ -539,8 +539,8 @@ func (m *Meta) UpdatePolicy(policy *model.PolicyInfo) error {
     return m.txn.HSet(mPolicies, policyKey, attachMagicByte(data))
 }

-// CreateResourceGroup creates a resource group.
-func (m *Meta) CreateResourceGroup(group *model.ResourceGroupInfo) error {
+// AddResourceGroup creates a resource group.
+func (m *Meta) AddResourceGroup(group *model.ResourceGroupInfo) error {
     if group.ID == 0 {
         return errors.New("group.ID is invalid")
     }
diff --git a/meta/meta_test.go b/meta/meta_test.go
index 0d6c37cc31d43..4eda2126b3887 100644
--- a/meta/meta_test.go
+++ b/meta/meta_test.go
@@ -128,7 +128,7 @@ func TestResourceGroup(t *testing.T) {
             RURate: 100,
         },
     }
-    require.NoError(t, m.CreateResourceGroup(rg))
+    require.NoError(t, m.AddResourceGroup(rg))
     checkResourceGroup(100)

     rg.RURate = 200
diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go
index 61575810da1fb..a39a14874ce7c 100644
--- a/planner/core/exhaust_physical_plans.go
+++ b/planner/core/exhaust_physical_plans.go
@@ -717,33 +717,78 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou
     } else {
         innerJoinKeys, outerJoinKeys, _, _ = p.GetJoinKeys()
     }
-    ds, isDataSource := innerChild.(*DataSource)
-    us, isUnionScan := innerChild.(*LogicalUnionScan)
-    if (!isDataSource && !isUnionScan) || (isDataSource && ds.preferStoreType&preferTiFlash != 0) {
+    innerChildWrapper := p.extractIndexJoinInnerChildPattern(innerChild)
+    if innerChildWrapper == nil {
         return nil
     }
-    if isUnionScan {
-        // The child of union scan may be union all for partition table.
-        ds, isDataSource = us.Children()[0].(*DataSource)
+
+    var avgInnerRowCnt float64
+    if outerChild.statsInfo().RowCount > 0 {
+        avgInnerRowCnt = p.equalCondOutCnt / outerChild.statsInfo().RowCount
+    }
+    joins = p.buildIndexJoinInner2TableScan(prop, innerChildWrapper, innerJoinKeys, outerJoinKeys, outerIdx, avgInnerRowCnt)
+    if joins != nil {
+        return
+    }
+    return p.buildIndexJoinInner2IndexScan(prop, innerChildWrapper, innerJoinKeys, outerJoinKeys, outerIdx, avgInnerRowCnt)
+}
+
+type indexJoinInnerChildWrapper struct {
+    ds   *DataSource
+    us   *LogicalUnionScan
+    proj *LogicalProjection
+    sel  *LogicalSelection
+}
+
+func (p *LogicalJoin) extractIndexJoinInnerChildPattern(innerChild LogicalPlan) *indexJoinInnerChildWrapper {
+    wrapper := &indexJoinInnerChildWrapper{}
+    switch child := innerChild.(type) {
+    case *DataSource:
+        wrapper.ds = child
+    case *LogicalUnionScan:
+        wrapper.us = child
+        // The child of union scan may be union all for partition table.
+        ds, isDataSource := wrapper.us.Children()[0].(*DataSource)
         if !isDataSource {
             return nil
         }
+        wrapper.ds = ds
         // If one of the union scan children is a TiFlash table, then we can't choose index join.
-        for _, child := range us.Children() {
+        for _, child := range wrapper.us.Children() {
             if ds, ok := child.(*DataSource); ok && ds.preferStoreType&preferTiFlash != 0 {
                 return nil
             }
         }
+    case *LogicalProjection:
+        if !p.ctx.GetSessionVars().EnableIndexJoinInnerSideMultiPattern {
+            return nil
+        }
+        // For now, only a projection whose expressions are all simple columns may be the inner side of an index join.
+        for _, expr := range child.Exprs {
+            if _, ok := expr.(*expression.Column); !ok {
+                return nil
+            }
+        }
+        wrapper.proj = child
+        ds, isDataSource := wrapper.proj.Children()[0].(*DataSource)
+        if !isDataSource {
+            return nil
+        }
+        wrapper.ds = ds
+    case *LogicalSelection:
+        if !p.ctx.GetSessionVars().EnableIndexJoinInnerSideMultiPattern {
+            return nil
+        }
+        wrapper.sel = child
+        ds, isDataSource := wrapper.sel.Children()[0].(*DataSource)
+        if !isDataSource {
+            return nil
+        }
+        wrapper.ds = ds
     }
-    var avgInnerRowCnt float64
-    if outerChild.statsInfo().RowCount > 0 {
-        avgInnerRowCnt = p.equalCondOutCnt / outerChild.statsInfo().RowCount
-    }
-    joins = p.buildIndexJoinInner2TableScan(prop, ds, innerJoinKeys, outerJoinKeys, outerIdx, us, avgInnerRowCnt)
-    if joins != nil {
-        return
+    if wrapper.ds == nil || wrapper.ds.preferStoreType&preferTiFlash != 0 {
+        return nil
     }
-    return p.buildIndexJoinInner2IndexScan(prop, ds, innerJoinKeys, outerJoinKeys, outerIdx, us, avgInnerRowCnt)
+    return wrapper
 }

 func (p *LogicalJoin) getIndexJoinBuildHelper(ds *DataSource, innerJoinKeys []*expression.Column, checkPathValid func(path *util.AccessPath) bool, outerJoinKeys []*expression.Column) (*indexJoinBuildHelper, []int) {
@@ -783,8 +828,10 @@ func (p *LogicalJoin) getIndexJoinBuildHelper(ds *DataSource, innerJoinKeys []*e
 // fetched from the inner side for every tuple from the outer side. This will be
 // promised to be no worse than building IndexScan as the inner child.
 func (p *LogicalJoin) buildIndexJoinInner2TableScan(
-    prop *property.PhysicalProperty, ds *DataSource, innerJoinKeys, outerJoinKeys []*expression.Column,
-    outerIdx int, us *LogicalUnionScan, avgInnerRowCnt float64) (joins []PhysicalPlan) {
+    prop *property.PhysicalProperty, wrapper *indexJoinInnerChildWrapper, innerJoinKeys, outerJoinKeys []*expression.Column,
+    outerIdx int, avgInnerRowCnt float64) (joins []PhysicalPlan) {
+    ds := wrapper.ds
+    us := wrapper.us
     var tblPath *util.AccessPath
     for _, path := range ds.possibleAccessPaths {
         if path.IsTablePath() && path.StoreType == kv.TiKV {
@@ -806,13 +853,13 @@ func (p *LogicalJoin) buildIndexJoinInner2TableScan(
             return nil
         }
         rangeInfo := helper.buildRangeDecidedByInformation(helper.chosenPath.IdxCols, outerJoinKeys)
-        innerTask = p.constructInnerTableScanTask(ds, helper.chosenRanges.Range(), outerJoinKeys, us, rangeInfo, false, false, avgInnerRowCnt)
+        innerTask = p.constructInnerTableScanTask(wrapper, helper.chosenRanges.Range(), outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt)
         // The index merge join's inner plan is different from index join, so we
         // should construct another inner plan for it.
         // Because we can't keep order for union scan, if there is a union scan in inner task,
         // we can't construct index merge join.
         if us == nil {
-            innerTask2 = p.constructInnerTableScanTask(ds, helper.chosenRanges.Range(), outerJoinKeys, us, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
+            innerTask2 = p.constructInnerTableScanTask(wrapper, helper.chosenRanges.Range(), outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
         }
         ranges = helper.chosenRanges
     } else {
@@ -846,13 +893,13 @@ func (p *LogicalJoin) buildIndexJoinInner2TableScan(
         }
         buffer.WriteString("]")
         rangeInfo := buffer.String()
-        innerTask = p.constructInnerTableScanTask(ds, ranges, outerJoinKeys, us, rangeInfo, false, false, avgInnerRowCnt)
+        innerTask = p.constructInnerTableScanTask(wrapper, ranges, outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt)
         // The index merge join's inner plan is different from index join, so we
         // should construct another inner plan for it.
         // Because we can't keep order for union scan, if there is a union scan in inner task,
         // we can't construct index merge join.
         if us == nil {
-            innerTask2 = p.constructInnerTableScanTask(ds, ranges, outerJoinKeys, us, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
+            innerTask2 = p.constructInnerTableScanTask(wrapper, ranges, outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt)
         }
     }
     var (
@@ -880,8 +927,10 @@ func (p *LogicalJoin) buildIndexJoinInner2TableScan(
 }

 func (p *LogicalJoin) buildIndexJoinInner2IndexScan(
-    prop *property.PhysicalProperty, ds *DataSource, innerJoinKeys, outerJoinKeys []*expression.Column,
-    outerIdx int, us *LogicalUnionScan, avgInnerRowCnt float64) (joins []PhysicalPlan) {
+    prop *property.PhysicalProperty, wrapper *indexJoinInnerChildWrapper, innerJoinKeys, outerJoinKeys []*expression.Column,
+    outerIdx int, avgInnerRowCnt float64) (joins []PhysicalPlan) {
+    ds := wrapper.ds
+    us := wrapper.us
     helper, keyOff2IdxOff := p.getIndexJoinBuildHelper(ds, innerJoinKeys, func(path *util.AccessPath) bool { return !path.IsTablePath() }, outerJoinKeys)
     if helper == nil {
         return nil
@@ -898,7 +947,7 @@ func (p *LogicalJoin) buildIndexJoinInner2IndexScan(
             maxOneRow = ok && (sf.FuncName.L == ast.EQ)
         }
     }
-    innerTask := p.constructInnerIndexScanTask(ds, helper.chosenPath, helper.chosenRanges.Range(), helper.chosenRemained, outerJoinKeys, us, rangeInfo, false, false, avgInnerRowCnt, maxOneRow)
+    innerTask := p.constructInnerIndexScanTask(wrapper, helper.chosenPath, helper.chosenRanges.Range(), helper.chosenRemained, outerJoinKeys, rangeInfo, false, false, avgInnerRowCnt, maxOneRow)
     failpoint.Inject("MockOnlyEnableIndexHashJoin", func(val failpoint.Value) {
         if val.(bool) && !p.ctx.GetSessionVars().InRestrictedSQL {
             failpoint.Return(p.constructIndexHashJoin(prop, outerIdx, innerTask, helper.chosenRanges, keyOff2IdxOff, helper.chosenPath, helper.lastColManager))
@@ -913,7 +962,7 @@ func (p *LogicalJoin) buildIndexJoinInner2IndexScan(
     // Because we can't keep order for union scan, if there is a union scan in inner task,
     // we can't construct index merge join.
     if us == nil {
-        innerTask2 := p.constructInnerIndexScanTask(ds, helper.chosenPath, helper.chosenRanges.Range(), helper.chosenRemained, outerJoinKeys, us, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt, maxOneRow)
+        innerTask2 := p.constructInnerIndexScanTask(wrapper, helper.chosenPath, helper.chosenRanges.Range(), helper.chosenRemained, outerJoinKeys, rangeInfo, true, !prop.IsSortItemEmpty() && prop.SortItems[0].Desc, avgInnerRowCnt, maxOneRow)
         if innerTask2 != nil {
             joins = append(joins, p.constructIndexMergeJoin(prop, outerIdx, innerTask2, helper.chosenRanges, keyOff2IdxOff, helper.chosenPath, helper.lastColManager)...)
         }
@@ -968,15 +1017,15 @@ func (ijHelper *indexJoinBuildHelper) buildRangeDecidedByInformation(idxCols []*

 // constructInnerTableScanTask is specially used to construct the inner plan for PhysicalIndexJoin.
 func (p *LogicalJoin) constructInnerTableScanTask(
-    ds *DataSource,
+    wrapper *indexJoinInnerChildWrapper,
     ranges ranger.Ranges,
     outerJoinKeys []*expression.Column,
-    us *LogicalUnionScan,
     rangeInfo string,
     keepOrder bool,
     desc bool,
     rowCount float64,
 ) task {
+    ds := wrapper.ds
     // If `ds.tableInfo.GetPartitionInfo() != nil`,
     // it means the data source is a partition table reader.
     // If the inner task need to keep order, the partition table reader can't satisfy it.
@@ -1038,10 +1087,51 @@ func (p *LogicalJoin) constructInnerTableScanTask(
     ts.addPushedDownSelection(copTask, selStats)
     t := copTask.convertToRootTask(ds.ctx)
     reader := t.p
-    t.p = p.constructInnerUnionScan(us, reader)
+    t.p = p.constructInnerByWrapper(wrapper, reader)
     return t
 }

+func (p *LogicalJoin) constructInnerByWrapper(wrapper *indexJoinInnerChildWrapper, child PhysicalPlan) PhysicalPlan {
+    if !p.ctx.GetSessionVars().EnableIndexJoinInnerSideMultiPattern {
+        if wrapper.us != nil {
+            return p.constructInnerUnionScan(wrapper.us, child)
+        }
+        return child
+    }
+    if wrapper.us != nil {
+        return p.constructInnerUnionScan(wrapper.us, child)
+    } else if wrapper.proj != nil {
+        return p.constructInnerProj(wrapper.proj, child)
+    } else if wrapper.sel != nil {
+        return p.constructInnerSel(wrapper.sel, child)
+    }
+    return child
+}
+
+func (p *LogicalJoin) constructInnerSel(sel *LogicalSelection, child PhysicalPlan) PhysicalPlan {
+    if sel == nil {
+        return child
+    }
+    physicalSel := PhysicalSelection{
+        Conditions: sel.Conditions,
+    }.Init(sel.ctx, sel.stats, sel.blockOffset, nil)
+    physicalSel.SetChildren(child)
+    return physicalSel
+}
+
+func (p *LogicalJoin) constructInnerProj(proj *LogicalProjection, child PhysicalPlan) PhysicalPlan {
+    if proj == nil {
+        return child
+    }
+    physicalProj := PhysicalProjection{
+        Exprs:                proj.Exprs,
+        CalculateNoDelay:     proj.CalculateNoDelay,
+        AvoidColumnEvaluator: proj.AvoidColumnEvaluator,
+    }.Init(proj.ctx, proj.stats, proj.blockOffset, nil)
+    physicalProj.SetChildren(child)
+    return physicalProj
+}
+
 func (p *LogicalJoin) constructInnerUnionScan(us *LogicalUnionScan, reader PhysicalPlan) PhysicalPlan {
     if us == nil {
         return reader
@@ -1058,18 +1148,18 @@ func (p *LogicalJoin) constructInnerUnionScan(us *LogicalUnionScan, reader Physi

 // constructInnerIndexScanTask is specially used to construct the inner plan for PhysicalIndexJoin.
 func (p *LogicalJoin) constructInnerIndexScanTask(
-    ds *DataSource,
+    wrapper *indexJoinInnerChildWrapper,
     path *util.AccessPath,
     ranges ranger.Ranges,
     filterConds []expression.Expression,
     _ []*expression.Column,
-    us *LogicalUnionScan,
     rangeInfo string,
     keepOrder bool,
     desc bool,
     rowCount float64,
     maxOneRow bool,
 ) task {
+    ds := wrapper.ds
     // If `ds.tableInfo.GetPartitionInfo() != nil`,
     // it means the data source is a partition table reader.
     // If the inner task need to keep order, the partition table reader can't satisfy it.
@@ -1194,7 +1284,7 @@ func (p *LogicalJoin) constructInnerIndexScanTask(
     is.addPushedDownSelection(cop, ds, tmpPath, finalStats)
     t := cop.convertToRootTask(ds.ctx)
     reader := t.p
-    t.p = p.constructInnerUnionScan(us, reader)
+    t.p = p.constructInnerByWrapper(wrapper, reader)
     return t
 }
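Reviewer note: the wrapper admits the new Projection- and Selection-rooted inner sides only behind the session flag, and the tests in executor/index_advise_test.go above flip it directly since there is no public sysvar yet. For reference, the enabling pattern (with `tk` and `sql` as in those tests):

```go
// Flag off: extractIndexJoinInnerChildPattern returns nil for a
// Projection/Selection inner child, so the planner falls back to e.g. HashJoin.
tk.Session().GetSessionVars().EnableIndexJoinInnerSideMultiPattern = false
// Flag on: Projection→DataSource (all-column projection) and
// Selection→DataSource become eligible, so the inl_join hint can take effect.
tk.Session().GetSessionVars().EnableIndexJoinInnerSideMultiPattern = true
tk.MustQuery("explain " + sql).CheckAt([]int{0}, rows)
```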
diff --git a/planner/core/plan_cache_test.go b/planner/core/plan_cache_test.go
index 8278050681553..4ecb7b3731317 100644
--- a/planner/core/plan_cache_test.go
+++ b/planner/core/plan_cache_test.go
@@ -457,6 +457,45 @@ func TestIssue40225(t *testing.T) {
     tk.MustQuery("select @@last_plan_from_binding").Check(testkit.Rows("1"))
 }

+func TestIssue40679(t *testing.T) {
+    store := testkit.CreateMockStore(t)
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    tk.MustExec("create table t (a int, key(a));")
+    tk.MustExec("prepare st from 'select * from t use index(a) where a < ?'")
+    tk.MustExec("set @a1=1.1")
+    tk.MustExec("execute st using @a1")
+
+    tkProcess := tk.Session().ShowProcess()
+    ps := []*util.ProcessInfo{tkProcess}
+    tk.Session().SetSessionManager(&testkit.MockSessionManager{PS: ps})
+    rows := tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Rows()
+    require.True(t, strings.Contains(rows[1][0].(string), "RangeScan")) // RangeScan, not FullScan
+
+    tk.MustExec("execute st using @a1")
+    tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: '1.1' may be converted to INT"))
+}
+
+func TestIssue41032(t *testing.T) {
+    store := testkit.CreateMockStore(t)
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    tk.MustExec(`CREATE TABLE PK_SIGNED_10087 (
+        COL1 mediumint(8) unsigned NOT NULL,
+        COL2 varchar(20) DEFAULT NULL,
+        COL4 datetime DEFAULT NULL,
+        COL3 bigint(20) DEFAULT NULL,
+        COL5 float DEFAULT NULL,
+        PRIMARY KEY (COL1) )`)
+    tk.MustExec(`insert into PK_SIGNED_10087 values(0, "痥腜蟿鮤枓欜喧檕澙姭袐裄钭僇剕焍哓閲疁櫘", "0017-11-14 05:40:55", -4504684261333179273, 7.97449e37)`)
+    tk.MustExec(`prepare stmt from 'SELECT/*+ HASH_JOIN(t1, t2) */ t2.* FROM PK_SIGNED_10087 t1 JOIN PK_SIGNED_10087 t2 ON t1.col1 = t2.col1 WHERE t2.col1 >= ? AND t1.col1 >= ?;'`)
+    tk.MustExec(`set @a=0, @b=0`)
+    tk.MustQuery(`execute stmt using @a,@b`).Check(testkit.Rows("0 痥腜蟿鮤枓欜喧檕澙姭袐裄钭僇剕焍哓閲疁櫘 0017-11-14 05:40:55 -4504684261333179273 79744900000000000000000000000000000000"))
+    tk.MustExec(`set @a=8950167, @b=16305982`)
+    tk.MustQuery(`execute stmt using @a,@b`).Check(testkit.Rows())
+    tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))
+}
+
 func TestPlanCacheWithLimit(t *testing.T) {
     store := testkit.CreateMockStore(t)
     tk := testkit.NewTestKit(t, store)
diff --git a/planner/core/plan_test.go b/planner/core/plan_test.go
index f1c65fcd9fb4d..a449cee57e69f 100644
--- a/planner/core/plan_test.go
+++ b/planner/core/plan_test.go
@@ -25,6 +25,7 @@ import (
     "github.com/pingcap/tidb/config"
     "github.com/pingcap/tidb/expression"
     "github.com/pingcap/tidb/expression/aggregation"
+    "github.com/pingcap/tidb/kv"
     "github.com/pingcap/tidb/parser/ast"
     "github.com/pingcap/tidb/parser/model"
     "github.com/pingcap/tidb/parser/mysql"
@@ -1142,3 +1143,15 @@ func TestJSONPlanInExplain(t *testing.T) {
         }
     }
 }
+
+func TestIssue40535(t *testing.T) {
+    store := testkit.CreateMockStore(t)
+    var cfg kv.InjectionConfig
+    tk := testkit.NewTestKit(t, kv.NewInjectedStore(store, &cfg))
+    tk.MustExec("use test;")
+    tk.MustExec("drop table if exists t1; drop table if exists t2;")
+    tk.MustExec("CREATE TABLE `t1`(`c1` bigint(20) NOT NULL DEFAULT '-2312745469307452950', `c2` datetime DEFAULT '5316-02-03 06:54:49', `c3` tinyblob DEFAULT NULL, PRIMARY KEY (`c1`) /*T![clustered_index] CLUSTERED */) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;")
+    tk.MustExec("CREATE TABLE `t2`(`c1` set('kn8pu','7et','vekx6','v3','liwrh','q14','1met','nnd5i','5o0','8cz','l') DEFAULT '7et,vekx6,liwrh,q14,1met', `c2` float DEFAULT '1.683167', KEY `k1` (`c2`,`c1`), KEY `k2` (`c2`)) ENGINE=InnoDB DEFAULT CHARSET=gbk COLLATE=gbk_chinese_ci;")
+    tk.MustExec("(select /*+ agg_to_cop()*/ locate(t1.c3, t1.c3) as r0, t1.c3 as r1 from t1 where not( IsNull(t1.c1)) order by r0,r1) union all (select concat_ws(',', t2.c2, t2.c1) as r0, t2.c1 as r1 from t2 order by r0, r1) order by 1 limit 273;")
+    require.Empty(t, tk.Session().LastMessage())
+}
diff --git a/planner/core/rule_topn_push_down.go b/planner/core/rule_topn_push_down.go
index 5dacac7579caa..2b4998049911a 100644
--- a/planner/core/rule_topn_push_down.go
+++ b/planner/core/rule_topn_push_down.go
@@ -136,6 +136,20 @@ func (p *LogicalProjection) pushDownTopN(topN *LogicalTopN, opt *logicalOptimize
                 topN.ByItems = append(topN.ByItems[:i], topN.ByItems[i+1:]...)
             }
         }
+
+        // If topN.ByItems contains a column (with ID=0) generated by the projection, the projection
+        // prevents the optimizer from pushing the topN down, so keep the topN above it.
+        for _, by := range topN.ByItems {
+            cols := expression.ExtractColumns(by.Expr)
+            for _, col := range cols {
+                if col.ID == 0 && p.Schema().Contains(col) {
+                    // Check whether the column is generated by the projection.
+                    if !p.children[0].Schema().Contains(col) {
+                        p.children[0] = p.children[0].pushDownTopN(nil, opt)
+                        return topN.setChild(p, opt)
+                    }
+                }
+            }
+        }
     }
     p.children[0] = p.children[0].pushDownTopN(topN, opt)
     return p
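Reviewer note: for intuition, this is the shape TestIssue40535 exercises — the ORDER BY column is materialized by the Projection wrapping the UNION ALL branches, so it exists only in the projection's own schema (a column with ID == 0 that the child schema lacks), and the TopN must stay above that Projection. A hedged sketch of a query with this shape (table and names illustrative):

```go
tk.MustExec("create table t (a int, b int)")
// r is produced by the projection above the union; pushing the TopN below
// it would reference a column the children cannot resolve, which is exactly
// what the new ID==0 check prevents.
tk.MustQuery("explain (select a+b as r from t) union all " +
	"(select a-b as r from t) order by r limit 10")
```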
@@ -492,9 +505,15 @@ func (h *Handle) DumpStatsDeltaToKV(mode dumpMode) error {
 		h.globalMap.data = deltaMap
 		h.globalMap.Unlock()
 	}()
+	// TODO: pass in do.InfoSchema() to DumpStatsDeltaToKV.
+	is := func() infoschema.InfoSchema {
+		h.mu.Lock()
+		defer h.mu.Unlock()
+		return h.mu.ctx.GetDomainInfoSchema().(infoschema.InfoSchema)
+	}()
 	currentTime := time.Now()
 	for id, item := range deltaMap {
-		if mode == DumpDelta && !needDumpStatsDelta(h, id, item, currentTime) {
+		if !h.needDumpStatsDelta(is, mode, id, item, currentTime) {
 			continue
 		}
 		updated, err := h.dumpTableStatCountToKV(id, item)
diff --git a/statistics/handle/update_test.go b/statistics/handle/update_test.go
index e4aba902a8022..edbef59135696 100644
--- a/statistics/handle/update_test.go
+++ b/statistics/handle/update_test.go
@@ -2667,20 +2667,42 @@ func TestFillMissingStatsMeta(t *testing.T) {
 	}
 
 	tk.MustExec("insert into t1 values (1, 2), (3, 4)")
-	require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
+	require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpDelta))
+	require.NoError(t, h.Update(is))
 	ver1 := checkStatsMeta(tbl1ID, "2", "2")
 	tk.MustExec("delete from t1 where a = 1")
-	require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
+	require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpDelta))
+	require.NoError(t, h.Update(is))
 	ver2 := checkStatsMeta(tbl1ID, "3", "1")
 	require.Greater(t, ver2, ver1)
 
 	tk.MustExec("insert into t2 values (1, 2), (3, 4)")
-	require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
+	require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpDelta))
+	require.NoError(t, h.Update(is))
 	checkStatsMeta(p0ID, "2", "2")
 	globalVer1 := checkStatsMeta(tbl2ID, "2", "2")
 	tk.MustExec("insert into t2 values (11, 12)")
-	require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
+	require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpDelta))
+	require.NoError(t, h.Update(is))
 	checkStatsMeta(p1ID, "1", "1")
 	globalVer2 := checkStatsMeta(tbl2ID, "3", "3")
 	require.Greater(t, globalVer2, globalVer1)
 }
+
+func TestNotDumpSysTable(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("create table t1 (a int, b int)")
+	h := dom.StatsHandle()
+	require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
+	tk.MustQuery("select count(1) from mysql.stats_meta").Check(testkit.Rows("1"))
+	// After executing `delete from mysql.stats_meta`, a delta for mysql.stats_meta is created but it would not be dumped.
+	tk.MustExec("delete from mysql.stats_meta")
+	require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll))
+	is := dom.InfoSchema()
+	tbl, err := is.TableByName(model.NewCIStr("mysql"), model.NewCIStr("stats_meta"))
+	require.NoError(t, err)
+	tblID := tbl.Meta().ID
+	tk.MustQuery(fmt.Sprintf("select * from mysql.stats_meta where table_id = %v", tblID)).Check(testkit.Rows())
+}
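
One detail worth flagging in the DumpStatsDeltaToKV hunk above: the infoschema is fetched inside an immediately-invoked closure, so h.mu is held only for the fetch and released (via the defer) before the dump loop runs. A standalone sketch of that idiom, with mu and loadSnapshot as hypothetical stand-ins:

package main

import (
	"fmt"
	"sync"
)

var (
	mu       sync.Mutex
	snapshot = "infoschema-v42" // stand-in for the domain's infoschema
)

func loadSnapshot() string { return snapshot }

func main() {
	// The defer inside the closure unlocks as soon as the closure returns,
	// so the loop below runs without holding the lock.
	is := func() string {
		mu.Lock()
		defer mu.Unlock()
		return loadSnapshot()
	}()
	for i := 0; i < 3; i++ {
		fmt.Println("dumping with", is)
	}
}
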
+ tk.MustExec("delete from mysql.stats_meta") + require.NoError(t, h.DumpStatsDeltaToKV(handle.DumpAll)) + is := dom.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("mysql"), model.NewCIStr("stats_meta")) + require.NoError(t, err) + tblID := tbl.Meta().ID + tk.MustQuery(fmt.Sprintf("select * from mysql.stats_meta where table_id = %v", tblID)).Check(testkit.Rows()) +} diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 573f746a6b0d3..e1c5fb0b91d00 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -70,6 +70,7 @@ const ( copNextMaxBackoff = 20000 CopSmallTaskRow = 32 // 32 is the initial batch size of TiKV smallTaskSigma = 0.5 + smallConcPerCore = 20 ) // CopClient is coprocessor client. @@ -200,7 +201,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars } if tryRowHint { var smallTasks int - smallTasks, it.smallTaskConcurrency = smallTaskConcurrency(tasks) + smallTasks, it.smallTaskConcurrency = smallTaskConcurrency(tasks, c.store.numcpu) if len(tasks)-smallTasks < it.concurrency { it.concurrency = len(tasks) - smallTasks } @@ -580,7 +581,7 @@ func isSmallTask(task *copTask) bool { // smallTaskConcurrency counts the small tasks of tasks, // then returns the task count and extra concurrency for small tasks. -func smallTaskConcurrency(tasks []*copTask) (int, int) { +func smallTaskConcurrency(tasks []*copTask, numcpu int) (int, int) { res := 0 for _, task := range tasks { if isSmallTask(task) { @@ -592,8 +593,15 @@ func smallTaskConcurrency(tasks []*copTask) (int, int) { } // Calculate the extra concurrency for small tasks // extra concurrency = tasks / (1 + sigma * sqrt(log(tasks ^ 2))) - extraConc := float64(res) / (1 + smallTaskSigma*math.Sqrt(2*math.Log(float64(res)))) - return res, int(extraConc) + extraConc := int(float64(res) / (1 + smallTaskSigma*math.Sqrt(2*math.Log(float64(res))))) + if numcpu <= 0 { + numcpu = 1 + } + smallTaskConcurrencyLimit := smallConcPerCore * numcpu + if extraConc > smallTaskConcurrencyLimit { + extraConc = smallTaskConcurrencyLimit + } + return res, extraConc } type copIterator struct { diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go index c94d441932d8c..ed6f2c6f3cb81 100644 --- a/store/copr/coprocessor_test.go +++ b/store/copr/coprocessor_test.go @@ -696,7 +696,7 @@ func TestBasicSmallTaskConc(t *testing.T) { require.True(t, isSmallTask(&copTask{RowCountHint: 6})) require.True(t, isSmallTask(&copTask{RowCountHint: CopSmallTaskRow})) require.False(t, isSmallTask(&copTask{RowCountHint: CopSmallTaskRow + 1})) - _, conc := smallTaskConcurrency([]*copTask{}) + _, conc := smallTaskConcurrency([]*copTask{}, 16) require.GreaterOrEqual(t, conc, 0) } @@ -734,7 +734,7 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) { require.Equal(t, tasks[2].RowCountHint, 3) // task[3] ["t"-"x", "y"-"z"] require.Equal(t, tasks[3].RowCountHint, 3+CopSmallTaskRow) - _, conc := smallTaskConcurrency(tasks) + _, conc := smallTaskConcurrency(tasks, 16) require.Equal(t, conc, 1) ranges = buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z") @@ -753,7 +753,7 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) { require.Equal(t, tasks[2].RowCountHint, 3) // task[3] ["t"-"x", "y"-"z"] require.Equal(t, tasks[3].RowCountHint, 6) - _, conc = smallTaskConcurrency(tasks) + _, conc = smallTaskConcurrency(tasks, 16) require.Equal(t, conc, 2) // cross-region long range @@ -774,3 +774,20 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) { // task[3] ["t"-"z"] 
diff --git a/store/copr/store.go b/store/copr/store.go
index 32553961acc67..afd1004bdba4d 100644
--- a/store/copr/store.go
+++ b/store/copr/store.go
@@ -17,6 +17,7 @@ package copr
 import (
 	"context"
 	"math/rand"
+	"runtime"
 	"sync/atomic"
 	"time"
 
@@ -76,6 +77,7 @@ type Store struct {
 	*kvStore
 	coprCache       *coprCache
 	replicaReadSeed uint32
+	numcpu          int
}
 
 // NewStore creates a new store instance.
@@ -90,6 +92,7 @@ func NewStore(s *tikv.KVStore, coprCacheConfig *config.CoprocessorCache) (*Store
 		kvStore:         &kvStore{store: s},
 		coprCache:       coprCache,
 		replicaReadSeed: rand.Uint32(),
+		numcpu:          runtime.GOMAXPROCS(0),
 	}, nil
 }
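
The numcpu field caches runtime.GOMAXPROCS(0) once at store construction; calling GOMAXPROCS with an argument below 1 only reads the current setting without changing it, so this is a cheap snapshot of the CPU budget at startup (later changes to GOMAXPROCS are not reflected). A small standalone illustration:

package main

import (
	"fmt"
	"runtime"
)

type store struct {
	numcpu int
}

func newStore() *store {
	// Read the current GOMAXPROCS value; the argument 0 means "don't modify".
	return &store{numcpu: runtime.GOMAXPROCS(0)}
}

func main() {
	s := newStore()
	// Mirroring smallConcPerCore = 20 from the diff.
	fmt.Println("small-task concurrency limit:", 20*s.numcpu)
}
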
diff --git a/store/mockstore/unistore/pd.go b/store/mockstore/unistore/pd.go
index d72247ef8b44f..85c64ba5b3987 100644
--- a/store/mockstore/unistore/pd.go
+++ b/store/mockstore/unistore/pd.go
@@ -34,10 +34,14 @@ var _ pd.Client = new(pdClient)
 
 type pdClient struct {
 	*us.MockPD
-	serviceSafePoints map[string]uint64
-	gcSafePointMu     sync.Mutex
-	globalConfig      map[string]string
-	externalTimestamp atomic.Uint64
+	serviceSafePoints    map[string]uint64
+	gcSafePointMu        sync.Mutex
+	globalConfig         map[string]string
+	externalTimestamp    atomic.Uint64
+	resourceGroupManager struct {
+		sync.RWMutex
+		groups map[string]*rmpb.ResourceGroup
+	}
 }
 
 func newPDClient(pd *us.MockPD) *pdClient {
@@ -45,6 +49,12 @@ func newPDClient(pd *us.MockPD) *pdClient {
 		MockPD:            pd,
 		serviceSafePoints: make(map[string]uint64),
 		globalConfig:      make(map[string]string),
+		resourceGroupManager: struct {
+			sync.RWMutex
+			groups map[string]*rmpb.ResourceGroup
+		}{
+			groups: make(map[string]*rmpb.ResourceGroup),
+		},
 	}
 }
 
@@ -187,23 +197,44 @@ func (c *pdClient) UpdateKeyspaceState(ctx context.Context, id uint32, state key
 }
 
 func (c *pdClient) ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) {
-	return nil, nil
+	c.resourceGroupManager.RLock()
+	defer c.resourceGroupManager.RUnlock()
+	groups := make([]*rmpb.ResourceGroup, 0, len(c.resourceGroupManager.groups))
+	for _, group := range c.resourceGroupManager.groups {
+		groups = append(groups, group)
+	}
+	return groups, nil
 }
 
-func (c *pdClient) GetResourceGroup(ctx context.Context, resourceGroupName string) (*rmpb.ResourceGroup, error) {
-	return nil, nil
+func (c *pdClient) GetResourceGroup(ctx context.Context, name string) (*rmpb.ResourceGroup, error) {
+	c.resourceGroupManager.RLock()
+	defer c.resourceGroupManager.RUnlock()
+	group, ok := c.resourceGroupManager.groups[name]
+	if !ok {
+		return nil, nil
+	}
+	return group, nil
 }
 
-func (c *pdClient) AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) {
-	return "", nil
+func (c *pdClient) AddResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) (string, error) {
+	c.resourceGroupManager.Lock()
+	defer c.resourceGroupManager.Unlock()
+	c.resourceGroupManager.groups[group.Name] = group
+	return "Success!", nil
 }
 
-func (c *pdClient) ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) {
-	return "", nil
+func (c *pdClient) ModifyResourceGroup(ctx context.Context, group *rmpb.ResourceGroup) (string, error) {
+	c.resourceGroupManager.Lock()
+	defer c.resourceGroupManager.Unlock()
+	c.resourceGroupManager.groups[group.Name] = group
+	return "Success!", nil
 }
 
-func (c *pdClient) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) {
-	return "", nil
+func (c *pdClient) DeleteResourceGroup(ctx context.Context, name string) (string, error) {
+	c.resourceGroupManager.Lock()
+	defer c.resourceGroupManager.Unlock()
+	delete(c.resourceGroupManager.groups, name)
+	return "Success!", nil
 }
 
 func (c *pdClient) WatchResourceGroup(ctx context.Context, revision int64) (chan []*rmpb.ResourceGroup, error) {
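
The mock PD client keeps its groups behind a sync.RWMutex embedded in an anonymous struct, so the lock and the map it protects travel together and cannot be used separately. A standalone sketch of the same pattern, with group standing in for rmpb.ResourceGroup:

package main

import (
	"fmt"
	"sync"
)

type group struct{ Name string }

// registry embeds the RWMutex next to the data it guards, mirroring the
// resourceGroupManager field in the patched pdClient.
type registry struct {
	sync.RWMutex
	groups map[string]*group
}

func newRegistry() *registry {
	return &registry{groups: make(map[string]*group)}
}

func (r *registry) add(g *group) {
	r.Lock() // writers take the exclusive lock
	defer r.Unlock()
	r.groups[g.Name] = g
}

func (r *registry) list() []*group {
	r.RLock() // readers can proceed concurrently
	defer r.RUnlock()
	out := make([]*group, 0, len(r.groups))
	for _, g := range r.groups {
		out = append(out, g)
	}
	return out
}

func main() {
	r := newRegistry()
	r.add(&group{Name: "default"})
	r.add(&group{Name: "rg1"})
	fmt.Println(len(r.list())) // 2
}
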