From 7353fbeea8c0d901d6847be9aabd5a5285deea3a Mon Sep 17 00:00:00 2001 From: JmPotato Date: Fri, 1 Dec 2023 19:23:49 +0800 Subject: [PATCH] infosync: fully integrated PD HTTP client into InfoSyncer (#49006) ref pingcap/tidb#35319 --- DEPS.bzl | 24 +- br/pkg/lightning/backend/local/local_test.go | 2 +- br/pkg/pdutil/pd.go | 4 +- br/pkg/streamhelper/basic_lib_for_test.go | 2 +- br/tests/br_key_locked/codec.go | 3 +- go.mod | 4 +- go.sum | 8 +- pkg/ddl/placement/rule.go | 99 ----- pkg/domain/domain_test.go | 1 + pkg/domain/infosync/BUILD.bazel | 2 +- pkg/domain/infosync/info.go | 152 +++----- pkg/domain/infosync/info_test.go | 40 -- pkg/domain/infosync/region.go | 41 +-- pkg/domain/infosync/schedule_manager.go | 49 +-- pkg/domain/infosync/tiflash_manager.go | 346 +++--------------- pkg/store/helper/helper.go | 2 +- pkg/store/mockstore/unistore/pd.go | 2 +- .../mockstore/unistore/tikv/mock_region.go | 6 +- pkg/ttl/cache/split_test.go | 2 +- 19 files changed, 157 insertions(+), 632 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index efe874b98ae06..a499f80da9523 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -7006,26 +7006,26 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sha256 = "548df2ca5c27559e3318b97b4cb91703d5c253410e7f9fa0eb926e2d3aa28b59", - strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20231116051730-1c2351c28173", + sha256 = "c6c5bfe8a84d0b680266035d1ac87af6beca4cc552693d6753d4a9f18610c30d", + strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20231201024404-0ff16620f6c0", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231116051730-1c2351c28173.zip", - "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231116051730-1c2351c28173.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231116051730-1c2351c28173.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231116051730-1c2351c28173.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231201024404-0ff16620f6c0.zip", + "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231201024404-0ff16620f6c0.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231201024404-0ff16620f6c0.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231201024404-0ff16620f6c0.zip", ], ) go_repository( name = "com_github_tikv_pd_client", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/pd/client", - sha256 = "5232ba0bba677a6d4614ae2cc102554d59cd00d473d9138739508d6f25169f02", - strip_prefix = "github.com/tikv/pd/client@v0.0.0-20231127075044-9f4803d8bd05", + sha256 = "a6dcccfd436232d847d2ba2669d67d9708d7bc97f7571a4e57da18fdefeed852", + strip_prefix = "github.com/tikv/pd/client@v0.0.0-20231130081618-862eee18738e", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231127075044-9f4803d8bd05.zip", - "http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231127075044-9f4803d8bd05.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231127075044-9f4803d8bd05.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231127075044-9f4803d8bd05.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231130081618-862eee18738e.zip", + "http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231130081618-862eee18738e.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231130081618-862eee18738e.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231130081618-862eee18738e.zip", ], ) go_repository( diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go index 59b19de75cd77..b6347d5bc8f3f 100644 --- a/br/pkg/lightning/backend/local/local_test.go +++ b/br/pkg/lightning/backend/local/local_test.go @@ -668,7 +668,7 @@ func (c *mockPdClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOpti return c.stores, nil } -func (c *mockPdClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*pd.Region, error) { +func (c *mockPdClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...pd.GetRegionOption) ([]*pd.Region, error) { return c.regions, nil } diff --git a/br/pkg/pdutil/pd.go b/br/pkg/pdutil/pd.go index 7eae3a0652989..27395e6adfcce 100644 --- a/br/pkg/pdutil/pd.go +++ b/br/pkg/pdutil/pd.go @@ -389,7 +389,9 @@ func (p *PdController) getRegionCountWith( } var err error for _, addr := range p.getAllPDAddrs() { - v, e := get(ctx, addr, pdhttp.RegionStatsByKeyRange(pdhttp.NewKeyRange(start, end)), p.cli, http.MethodGet, nil) + v, e := get(ctx, addr, + pdhttp.RegionStatsByKeyRange(pdhttp.NewKeyRange(start, end), false), + p.cli, http.MethodGet, nil) if e != nil { err = e continue diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 59490e32ec7b9..ca9624ec45ccd 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -712,7 +712,7 @@ type mockPDClient struct { fakeRegions []*region } -func (p *mockPDClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*pd.Region, error) { +func (p *mockPDClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int, _ ...pd.GetRegionOption) ([]*pd.Region, error) { sort.Slice(p.fakeRegions, func(i, j int) bool { return bytes.Compare(p.fakeRegions[i].rng.StartKey, p.fakeRegions[j].rng.StartKey) < 0 }) diff --git a/br/tests/br_key_locked/codec.go b/br/tests/br_key_locked/codec.go index 9ed40d2677ef5..df95491e3b028 100644 --- a/br/tests/br_key_locked/codec.go +++ b/br/tests/br_key_locked/codec.go @@ -56,13 +56,14 @@ func (c *codecPDClient) ScanRegions( startKey []byte, endKey []byte, limit int, + opts ...pd.GetRegionOption, ) ([]*pd.Region, error) { startKey = codec.EncodeBytes(nil, startKey) if len(endKey) > 0 { endKey = codec.EncodeBytes(nil, endKey) } - regions, err := c.Client.ScanRegions(ctx, startKey, endKey, limit) + regions, err := c.Client.ScanRegions(ctx, startKey, endKey, limit, opts...) if err != nil { return nil, errors.Trace(err) } diff --git a/go.mod b/go.mod index e2b4631f638ed..7edced824176d 100644 --- a/go.mod +++ b/go.mod @@ -102,8 +102,8 @@ require ( github.com/stretchr/testify v1.8.4 github.com/tdakkota/asciicheck v0.2.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173 - github.com/tikv/pd/client v0.0.0-20231127075044-9f4803d8bd05 + github.com/tikv/client-go/v2 v2.0.8-0.20231201024404-0ff16620f6c0 + github.com/tikv/pd/client v0.0.0-20231130081618-862eee18738e github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 github.com/twmb/murmur3 v1.1.6 github.com/uber/jaeger-client-go v2.22.1+incompatible diff --git a/go.sum b/go.sum index 9caa01c14d7fd..08e1774963021 100644 --- a/go.sum +++ b/go.sum @@ -851,10 +851,10 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= -github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173 h1:lmJzX0kqrV7kO21wrZPbtjkidzwbDCfXeQrhDWEi5dE= -github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173/go.mod h1:BOGTSZtbMHEnGC4HOpbONdnTQF+E9nb2Io7c3P9sb7g= -github.com/tikv/pd/client v0.0.0-20231127075044-9f4803d8bd05 h1:87NPUfzaVrO5MTBwVCPQ/FlJGpFnHi6WFYHDYD3n3Zc= -github.com/tikv/pd/client v0.0.0-20231127075044-9f4803d8bd05/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ= +github.com/tikv/client-go/v2 v2.0.8-0.20231201024404-0ff16620f6c0 h1:am8ME/PpDyhIM6oFg7QLq1FswcuL2DHi4PRBzkSDgGY= +github.com/tikv/client-go/v2 v2.0.8-0.20231201024404-0ff16620f6c0/go.mod h1:M+i6/DR7y1J7mG3SOdrmdkuLZe1gEqkdiV8J+mX1DiI= +github.com/tikv/pd/client v0.0.0-20231130081618-862eee18738e h1:11cWLLmEreKof/VJi6LLQ+Jkav5ZqPJgeI+KX4pc/DE= +github.com/tikv/pd/client v0.0.0-20231130081618-862eee18738e/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ= github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M= github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966/go.mod h1:27bSVNWSBOHm+qRp1T9qzaIpsWEP6TbUnei/43HK+PQ= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= diff --git a/pkg/ddl/placement/rule.go b/pkg/ddl/placement/rule.go index 5364607a1bb96..ab15e5f582e91 100644 --- a/pkg/ddl/placement/rule.go +++ b/pkg/ddl/placement/rule.go @@ -15,13 +15,10 @@ package placement import ( - "encoding/hex" - "encoding/json" "fmt" "regexp" "strings" - "github.com/pingcap/tidb/pkg/util/codec" pd "github.com/tikv/pd/client/http" "gopkg.in/yaml.v2" ) @@ -32,102 +29,6 @@ const ( attributeEvictLeader = "evict-leader" ) -// RuleGroupConfig defines basic config of rule group -type RuleGroupConfig struct { - ID string `json:"id"` - Index int `json:"index"` - Override bool `json:"override"` -} - -var ( - _ json.Marshaler = (*TiFlashRule)(nil) - _ json.Unmarshaler = (*TiFlashRule)(nil) -) - -// TiFlashRule extends Rule with other necessary fields. -type TiFlashRule struct { - GroupID string - ID string - Index int - Override bool - Role pd.PeerRoleType - Count int - Constraints []pd.LabelConstraint - LocationLabels []string - IsolationLevel string - StartKey []byte - EndKey []byte -} - -type tiFlashRule struct { - GroupID string `json:"group_id"` - ID string `json:"id"` - Index int `json:"index,omitempty"` - Override bool `json:"override,omitempty"` - Role pd.PeerRoleType `json:"role"` - Count int `json:"count"` - Constraints []pd.LabelConstraint `json:"label_constraints,omitempty"` - LocationLabels []string `json:"location_labels,omitempty"` - IsolationLevel string `json:"isolation_level,omitempty"` - StartKeyHex string `json:"start_key"` - EndKeyHex string `json:"end_key"` -} - -// MarshalJSON implements json.Marshaler interface for TiFlashRule. -func (r *TiFlashRule) MarshalJSON() ([]byte, error) { - return json.Marshal(&tiFlashRule{ - GroupID: r.GroupID, - ID: r.ID, - Index: r.Index, - Override: r.Override, - Role: r.Role, - Count: r.Count, - Constraints: r.Constraints, - LocationLabels: r.LocationLabels, - IsolationLevel: r.IsolationLevel, - StartKeyHex: hex.EncodeToString(codec.EncodeBytes(nil, r.StartKey)), - EndKeyHex: hex.EncodeToString(codec.EncodeBytes(nil, r.EndKey)), - }) -} - -// UnmarshalJSON implements json.Unmarshaler interface for TiFlashRule. -func (r *TiFlashRule) UnmarshalJSON(bytes []byte) error { - var rule tiFlashRule - if err := json.Unmarshal(bytes, &rule); err != nil { - return err - } - *r = TiFlashRule{ - GroupID: rule.GroupID, - ID: rule.ID, - Index: rule.Index, - Override: rule.Override, - Role: rule.Role, - Count: rule.Count, - Constraints: rule.Constraints, - LocationLabels: rule.LocationLabels, - IsolationLevel: rule.IsolationLevel, - } - - startKey, err := hex.DecodeString(rule.StartKeyHex) - if err != nil { - return err - } - - endKey, err := hex.DecodeString(rule.EndKeyHex) - if err != nil { - return err - } - - _, r.StartKey, err = codec.DecodeBytes(startKey, nil) - if err != nil { - return err - } - - _, r.EndKey, err = codec.DecodeBytes(endKey, nil) - - return err -} - // RuleBuilder is used to build the Rules from a constraint string. type RuleBuilder struct { role pd.PeerRoleType diff --git a/pkg/domain/domain_test.go b/pkg/domain/domain_test.go index 861b6b71b465e..7a667e84f0984 100644 --- a/pkg/domain/domain_test.go +++ b/pkg/domain/domain_test.go @@ -58,6 +58,7 @@ func TestInfo(t *testing.T) { t.Skip("ETCD use ip:port as unix socket address, skip when it is unavailable.") } + // NOTICE: this failpoint has been REMOVED, be aware of this if you want to reopen this test. require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/infosync/FailPlacement", `return(true)`)) s, err := mockstore.NewMockStore() diff --git a/pkg/domain/infosync/BUILD.bazel b/pkg/domain/infosync/BUILD.bazel index 2680731b34312..1b20551cf539a 100644 --- a/pkg/domain/infosync/BUILD.bazel +++ b/pkg/domain/infosync/BUILD.bazel @@ -62,7 +62,7 @@ go_test( srcs = ["info_test.go"], embed = [":infosync"], flaky = True, - shard_count = 4, + shard_count = 3, deps = [ "//pkg/ddl/placement", "//pkg/ddl/util", diff --git a/pkg/domain/infosync/info.go b/pkg/domain/infosync/info.go index 5b53adc260cff..ba101953be528 100644 --- a/pkg/domain/infosync/info.go +++ b/pkg/domain/infosync/info.go @@ -105,6 +105,7 @@ type InfoSyncer struct { // It must be used when the etcd path isn't needed to separate by keyspace. // See keyspace RFC: https://github.com/pingcap/tidb/pull/39685 unprefixedEtcdCli *clientv3.Client + pdHTTPCli pdhttp.Client info *ServerInfo serverInfoPath string minStartTS uint64 @@ -198,9 +199,13 @@ func GlobalInfoSyncerInit( codec tikv.Codec, skipRegisterToDashBoard bool, ) (*InfoSyncer, error) { + if pdHTTPCli != nil { + pdHTTPCli = pdHTTPCli.WithRespHandler(pdResponseHandler) + } is := &InfoSyncer{ etcdCli: etcdCli, unprefixedEtcdCli: unprefixedEtcdCli, + pdHTTPCli: pdHTTPCli, info: getServerInfo(id, serverIDGetter), serverInfoPath: fmt.Sprintf("%s/%s", ServerInformationPath, id), minStartTSPath: fmt.Sprintf("%s/%s", ServerMinStartTSPath, id), @@ -209,14 +214,11 @@ func GlobalInfoSyncerInit( if err != nil { return nil, err } - if pdHTTPCli != nil { - pdHTTPCli = pdHTTPCli.WithRespHandler(pdResponseHandler) - } - is.labelRuleManager = initLabelRuleManager(pdHTTPCli) - is.placementManager = initPlacementManager(pdHTTPCli) - is.scheduleManager = initScheduleManager(etcdCli) - is.tiflashReplicaManager = initTiFlashReplicaManager(etcdCli, codec) - is.resourceManagerClient = initResourceManagerClient(pdCli) + is.initLabelRuleManager() + is.initPlacementManager() + is.initScheduleManager() + is.initTiFlashReplicaManager(codec) + is.initResourceManagerClient(pdCli) setGlobalInfoSyncer(is) return is, nil } @@ -247,22 +249,24 @@ func (is *InfoSyncer) GetSessionManager() util2.SessionManager { return is.managerMu.SessionManager } -func initLabelRuleManager(pdHTTPCli pdhttp.Client) LabelRuleManager { - if pdHTTPCli == nil { - return &mockLabelManager{labelRules: map[string][]byte{}} +func (is *InfoSyncer) initLabelRuleManager() { + if is.pdHTTPCli == nil { + is.labelRuleManager = &mockLabelManager{labelRules: map[string][]byte{}} + return } - return &PDLabelManager{pdHTTPCli} + is.labelRuleManager = &PDLabelManager{is.pdHTTPCli} } -func initPlacementManager(pdHTTPCli pdhttp.Client) PlacementManager { - if pdHTTPCli == nil { - return &mockPlacementManager{} +func (is *InfoSyncer) initPlacementManager() { + if is.pdHTTPCli == nil { + is.placementManager = &mockPlacementManager{} + return } - return &PDPlacementManager{pdHTTPCli} + is.placementManager = &PDPlacementManager{is.pdHTTPCli} } -func initResourceManagerClient(pdCli pd.Client) (cli pd.ResourceManagerClient) { - cli = pdCli +func (is *InfoSyncer) initResourceManagerClient(pdCli pd.Client) { + var cli pd.ResourceManagerClient = pdCli if pdCli == nil { cli = NewMockResourceManagerClient() } @@ -296,23 +300,24 @@ func initResourceManagerClient(pdCli pd.Client) (cli pd.ResourceManagerClient) { } } }) - return + is.resourceManagerClient = cli } -func initTiFlashReplicaManager(etcdCli *clientv3.Client, codec tikv.Codec) TiFlashReplicaManager { - if etcdCli == nil { - m := mockTiFlashReplicaManagerCtx{tiflashProgressCache: make(map[int64]float64)} - return &m +func (is *InfoSyncer) initTiFlashReplicaManager(codec tikv.Codec) { + if is.pdHTTPCli == nil { + is.tiflashReplicaManager = &mockTiFlashReplicaManagerCtx{tiflashProgressCache: make(map[int64]float64)} + return } - logutil.BgLogger().Warn("init TiFlashReplicaManager", zap.Strings("pd addrs", etcdCli.Endpoints())) - return &TiFlashReplicaManagerCtx{etcdCli: etcdCli, tiflashProgressCache: make(map[int64]float64), codec: codec} + logutil.BgLogger().Warn("init TiFlashReplicaManager") + is.tiflashReplicaManager = &TiFlashReplicaManagerCtx{pdHTTPCli: is.pdHTTPCli, tiflashProgressCache: make(map[int64]float64), codec: codec} } -func initScheduleManager(etcdCli *clientv3.Client) ScheduleManager { - if etcdCli == nil { - return &mockScheduleManager{} +func (is *InfoSyncer) initScheduleManager() { + if is.pdHTTPCli == nil { + is.scheduleManager = &mockScheduleManager{} + return } - return &PDScheduleManager{etcdCli: etcdCli} + is.scheduleManager = &PDScheduleManager{is.pdHTTPCli} } // GetMockTiFlash can only be used in tests to get MockTiFlash @@ -450,7 +455,7 @@ func MustGetTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores *m } // pdResponseHandler will be injected into the PD HTTP client to handle the response, -// this is to maintain consistency with the logic in the `doRequest`. +// this is to maintain consistency with the original logic without the PD HTTP client. func pdResponseHandler(resp *http.Response, res interface{}) error { defer func() { terror.Log(resp.Body.Close()) }() bodyBytes, err := io.ReadAll(resp.Body) @@ -475,77 +480,6 @@ func pdResponseHandler(resp *http.Response, res interface{}) error { return nil } -// TODO: replace with the unified PD HTTP client. -func doRequest(ctx context.Context, apiName string, addrs []string, route, method string, body io.Reader) ([]byte, error) { - var ( - err error - req *http.Request - res *http.Response - ) - for idx, addr := range addrs { - url := util2.ComposeURL(addr, route) - req, err = http.NewRequestWithContext(ctx, method, url, body) - if err != nil { - return nil, err - } - if body != nil { - req.Header.Set("Content-Type", "application/json") - } - start := time.Now() - res, err = doRequestWithFailpoint(req) - if err == nil { - metrics.PDAPIExecutionHistogram.WithLabelValues(apiName).Observe(time.Since(start).Seconds()) - metrics.PDAPIRequestCounter.WithLabelValues(apiName, res.Status).Inc() - bodyBytes, err := io.ReadAll(res.Body) - if err != nil { - terror.Log(res.Body.Close()) - return nil, err - } - if res.StatusCode != http.StatusOK { - logutil.BgLogger().Warn("response not 200", - zap.String("method", method), - zap.String("hosts", addr), - zap.String("url", url), - zap.Int("http status", res.StatusCode), - zap.Int("address order", idx), - ) - err = ErrHTTPServiceError.FastGen("%s", bodyBytes) - if res.StatusCode == http.StatusNotFound || res.StatusCode == http.StatusPreconditionFailed { - err = nil - bodyBytes = nil - } - } - terror.Log(res.Body.Close()) - return bodyBytes, err - } - metrics.PDAPIRequestCounter.WithLabelValues(apiName, "network error").Inc() - logutil.BgLogger().Warn("fail to doRequest", - zap.Error(err), - zap.Bool("retry next address", idx == len(addrs)-1), - zap.String("method", method), - zap.String("hosts", addr), - zap.String("url", url), - zap.Int("address order", idx), - ) - } - return nil, err -} - -func doRequestWithFailpoint(req *http.Request) (resp *http.Response, err error) { - fpEnabled := false - failpoint.Inject("FailPlacement", func(val failpoint.Value) { - if val.(bool) { - fpEnabled = true - resp = &http.Response{StatusCode: http.StatusNotFound, Body: http.NoBody} - err = nil - } - }) - if fpEnabled { - return - } - return util2.InternalHTTPClient().Do(req) -} - // GetAllRuleBundles is used to get all rule bundles from PD It is used to load full rules from PD while fullload infoschema. func GetAllRuleBundles(ctx context.Context) ([]*placement.Bundle, error) { is, err := getGlobalInfoSyncer() @@ -1177,13 +1111,13 @@ func SetTiFlashGroupConfig(ctx context.Context) error { // SetTiFlashPlacementRule is a helper function to set placement rule. // It is discouraged to use SetTiFlashPlacementRule directly, // use `ConfigureTiFlashPDForTable`/`ConfigureTiFlashPDForPartitions` instead. -func SetTiFlashPlacementRule(ctx context.Context, rule placement.TiFlashRule) error { +func SetTiFlashPlacementRule(ctx context.Context, rule pdhttp.Rule) error { is, err := getGlobalInfoSyncer() if err != nil { return errors.Trace(err) } logutil.BgLogger().Info("SetTiFlashPlacementRule", zap.String("ruleID", rule.ID)) - return is.tiflashReplicaManager.SetPlacementRule(ctx, rule) + return is.tiflashReplicaManager.SetPlacementRule(ctx, &rule) } // DeleteTiFlashPlacementRule is to delete placement rule for certain group. @@ -1197,7 +1131,7 @@ func DeleteTiFlashPlacementRule(ctx context.Context, group string, ruleID string } // GetTiFlashGroupRules to get all placement rule in a certain group. -func GetTiFlashGroupRules(ctx context.Context, group string) ([]placement.TiFlashRule, error) { +func GetTiFlashGroupRules(ctx context.Context, group string) ([]*pdhttp.Rule, error) { is, err := getGlobalInfoSyncer() if err != nil { return nil, errors.Trace(err) @@ -1241,7 +1175,7 @@ func ConfigureTiFlashPDForTable(id int64, count uint64, locationLabels *[]string ctx := context.Background() logutil.BgLogger().Info("ConfigureTiFlashPDForTable", zap.Int64("tableID", id), zap.Uint64("count", count)) ruleNew := MakeNewRule(id, count, *locationLabels) - if e := is.tiflashReplicaManager.SetPlacementRule(ctx, ruleNew); e != nil { + if e := is.tiflashReplicaManager.SetPlacementRule(ctx, &ruleNew); e != nil { return errors.Trace(e) } return nil @@ -1254,12 +1188,12 @@ func ConfigureTiFlashPDForPartitions(accel bool, definitions *[]model.PartitionD return errors.Trace(err) } ctx := context.Background() - rules := make([]placement.TiFlashRule, 0, len(*definitions)) + rules := make([]*pdhttp.Rule, 0, len(*definitions)) pids := make([]int64, 0, len(*definitions)) for _, p := range *definitions { logutil.BgLogger().Info("ConfigureTiFlashPDForPartitions", zap.Int64("tableID", tableID), zap.Int64("partID", p.ID), zap.Bool("accel", accel), zap.Uint64("count", count)) ruleNew := MakeNewRule(p.ID, count, *locationLabels) - rules = append(rules, ruleNew) + rules = append(rules, &ruleNew) pids = append(pids, p.ID) } if e := is.tiflashReplicaManager.SetPlacementRuleBatch(ctx, rules); e != nil { @@ -1325,7 +1259,7 @@ func GetPDScheduleConfig(ctx context.Context) (map[string]interface{}, error) { if err != nil { return nil, errors.Trace(err) } - return is.scheduleManager.GetPDScheduleConfig(ctx) + return is.scheduleManager.GetScheduleConfig(ctx) } // SetPDScheduleConfig sets the schedule information for pd @@ -1334,7 +1268,7 @@ func SetPDScheduleConfig(ctx context.Context, config map[string]interface{}) err if err != nil { return errors.Trace(err) } - return is.scheduleManager.SetPDScheduleConfig(ctx, config) + return is.scheduleManager.SetScheduleConfig(ctx, config) } // TiProxyServerInfo is the server info for TiProxy. diff --git a/pkg/domain/infosync/info_test.go b/pkg/domain/infosync/info_test.go index 79c8ebf37e0a6..957ac2986a018 100644 --- a/pkg/domain/infosync/info_test.go +++ b/pkg/domain/infosync/info_test.go @@ -279,43 +279,3 @@ func TestTiFlashManager(t *testing.T) { CloseTiFlashManager(ctx) } - -func TestRuleOp(t *testing.T) { - rule := MakeNewRule(1, 2, []string{"a"}) - ruleOp := RuleOp{ - TiFlashRule: &rule, - Action: RuleOpAdd, - DeleteByIDPrefix: false, - } - j, err := json.Marshal(&ruleOp) - require.NoError(t, err) - ruleOpExpect := &RuleOp{} - json.Unmarshal(j, ruleOpExpect) - require.Equal(t, ruleOp.Action, ruleOpExpect.Action) - require.Equal(t, *ruleOp.TiFlashRule, *ruleOpExpect.TiFlashRule) - ruleOps := make([]RuleOp, 0, 2) - for i := 0; i < 10; i += 2 { - rule := MakeNewRule(int64(i), 2, []string{"a"}) - ruleOps = append(ruleOps, RuleOp{ - TiFlashRule: &rule, - Action: RuleOpAdd, - DeleteByIDPrefix: false, - }) - } - for i := 1; i < 10; i += 2 { - rule := MakeNewRule(int64(i), 2, []string{"b"}) - ruleOps = append(ruleOps, RuleOp{ - TiFlashRule: &rule, - Action: RuleOpDel, - DeleteByIDPrefix: false, - }) - } - j, err = json.Marshal(ruleOps) - require.NoError(t, err) - var ruleOpsExpect []RuleOp - json.Unmarshal(j, &ruleOpsExpect) - for i := 0; i < len(ruleOps); i++ { - require.Equal(t, ruleOps[i].Action, ruleOpsExpect[i].Action) - require.Equal(t, *ruleOps[i].TiFlashRule, *ruleOpsExpect[i].TiFlashRule) - } -} diff --git a/pkg/domain/infosync/region.go b/pkg/domain/infosync/region.go index 17208ecb6e153..d31b8e415d1ad 100644 --- a/pkg/domain/infosync/region.go +++ b/pkg/domain/infosync/region.go @@ -16,11 +16,7 @@ package infosync import ( "context" - "encoding/hex" - "encoding/json" - "fmt" - "github.com/pingcap/errors" pd "github.com/tikv/pd/client/http" ) @@ -55,32 +51,21 @@ func GetReplicationState(ctx context.Context, startKey []byte, endKey []byte) (P if err != nil { return PlacementScheduleStatePending, err } - - if is.etcdCli == nil { + if is.pdHTTPCli == nil { return PlacementScheduleStatePending, nil } - - addrs := is.etcdCli.Endpoints() - - if len(addrs) == 0 { - return PlacementScheduleStatePending, errors.Errorf("pd unavailable") + state, err := is.pdHTTPCli.GetRegionsReplicatedStateByKeyRange(ctx, pd.NewKeyRange(startKey, endKey)) + if err != nil || len(state) == 0 { + return PlacementScheduleStatePending, err } - - res, err := doRequest(ctx, "GetReplicationState", addrs, fmt.Sprintf("%s/replicated?startKey=%s&endKey=%s", pd.Regions, hex.EncodeToString(startKey), hex.EncodeToString(endKey)), "GET", nil) - if err == nil && res != nil { - st := PlacementScheduleStatePending - // it should not fail - var state string - _ = json.Unmarshal(res, &state) - switch state { - case "REPLICATED": - st = PlacementScheduleStateScheduled - case "INPROGRESS": - st = PlacementScheduleStateInProgress - case "PENDING": - st = PlacementScheduleStatePending - } - return st, nil + st := PlacementScheduleStatePending + switch state { + case "REPLICATED": + st = PlacementScheduleStateScheduled + case "INPROGRESS": + st = PlacementScheduleStateInProgress + case "PENDING": + st = PlacementScheduleStatePending } - return PlacementScheduleStatePending, err + return st, nil } diff --git a/pkg/domain/infosync/schedule_manager.go b/pkg/domain/infosync/schedule_manager.go index 1578007deb4d7..eef4b2c1bf996 100644 --- a/pkg/domain/infosync/schedule_manager.go +++ b/pkg/domain/infosync/schedule_manager.go @@ -15,56 +15,21 @@ package infosync import ( - "bytes" "context" - "encoding/json" - "path" "sync" - "github.com/pingcap/errors" pd "github.com/tikv/pd/client/http" - clientv3 "go.etcd.io/etcd/client/v3" ) // ScheduleManager manages schedule configs type ScheduleManager interface { - GetPDScheduleConfig(ctx context.Context) (map[string]interface{}, error) - SetPDScheduleConfig(ctx context.Context, config map[string]interface{}) error + GetScheduleConfig(ctx context.Context) (map[string]interface{}, error) + SetScheduleConfig(ctx context.Context, config map[string]interface{}) error } // PDScheduleManager manages schedule with pd type PDScheduleManager struct { - etcdCli *clientv3.Client -} - -// GetPDScheduleConfig get schedule config from pd -func (sm *PDScheduleManager) GetPDScheduleConfig(ctx context.Context) (map[string]interface{}, error) { - ret, err := doRequest(ctx, "GetPDSchedule", sm.etcdCli.Endpoints(), path.Join(pd.Config, "schedule"), "GET", nil) - if err != nil { - return nil, errors.Trace(err) - } - - var schedule map[string]interface{} - if err = json.Unmarshal(ret, &schedule); err != nil { - return nil, errors.Trace(err) - } - - return schedule, nil -} - -// SetPDScheduleConfig set schedule config to pd -func (sm *PDScheduleManager) SetPDScheduleConfig(ctx context.Context, config map[string]interface{}) error { - configJSON, err := json.Marshal(config) - if err != nil { - return err - } - - _, err = doRequest(ctx, "SetPDSchedule", sm.etcdCli.Endpoints(), path.Join(pd.Config, "schedule"), "POST", bytes.NewReader(configJSON)) - if err != nil { - return errors.Trace(err) - } - - return nil + pd.Client } type mockScheduleManager struct { @@ -72,8 +37,8 @@ type mockScheduleManager struct { schedules map[string]interface{} } -// GetPDScheduleConfig get schedule config from schedules map -func (mm *mockScheduleManager) GetPDScheduleConfig(ctx context.Context) (map[string]interface{}, error) { +// GetScheduleConfig get schedule config from schedules map +func (mm *mockScheduleManager) GetScheduleConfig(ctx context.Context) (map[string]interface{}, error) { mm.Lock() schedules := make(map[string]interface{}) @@ -85,8 +50,8 @@ func (mm *mockScheduleManager) GetPDScheduleConfig(ctx context.Context) (map[str return schedules, nil } -// SetPDScheduleConfig set schedule config to schedules map -func (mm *mockScheduleManager) SetPDScheduleConfig(ctx context.Context, config map[string]interface{}) error { +// SetScheduleConfig set schedule config to schedules map +func (mm *mockScheduleManager) SetScheduleConfig(ctx context.Context, config map[string]interface{}) error { mm.Lock() if mm.schedules == nil { diff --git a/pkg/domain/infosync/tiflash_manager.go b/pkg/domain/infosync/tiflash_manager.go index f4ea36b21c63a..78517fb5e6955 100644 --- a/pkg/domain/infosync/tiflash_manager.go +++ b/pkg/domain/infosync/tiflash_manager.go @@ -18,11 +18,9 @@ import ( "bytes" "context" "encoding/hex" - "encoding/json" "fmt" "net/http" "net/http/httptest" - "path" "strconv" "strings" "sync" @@ -40,7 +38,6 @@ import ( "github.com/pingcap/tidb/pkg/util/syncutil" "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client/http" - clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -54,13 +51,13 @@ type TiFlashReplicaManager interface { // SetTiFlashGroupConfig sets the group index of the tiflash placement rule SetTiFlashGroupConfig(ctx context.Context) error // SetPlacementRule is a helper function to set placement rule. - SetPlacementRule(ctx context.Context, rule placement.TiFlashRule) error + SetPlacementRule(ctx context.Context, rule *pd.Rule) error // SetPlacementRuleBatch is a helper function to set a batch of placement rules. - SetPlacementRuleBatch(ctx context.Context, rules []placement.TiFlashRule) error + SetPlacementRuleBatch(ctx context.Context, rules []*pd.Rule) error // DeletePlacementRule is to delete placement rule for certain group. DeletePlacementRule(ctx context.Context, group string, ruleID string) error // GetGroupRules to get all placement rule in a certain group. - GetGroupRules(ctx context.Context, group string) ([]placement.TiFlashRule, error) + GetGroupRules(ctx context.Context, group string) ([]*pd.Rule, error) // PostAccelerateScheduleBatch sends `regions/accelerate-schedule/batch` request. PostAccelerateScheduleBatch(ctx context.Context, tableIDs []int64) error // GetRegionCountFromPD is a helper function calling `/stats/region`. @@ -83,7 +80,7 @@ type TiFlashReplicaManager interface { // TiFlashReplicaManagerCtx manages placement with pd and replica progress for TiFlash. type TiFlashReplicaManagerCtx struct { - etcdCli *clientv3.Client + pdHTTPCli pd.Client sync.RWMutex // protect tiflashProgressCache tiflashProgressCache map[int64]float64 codec tikv.Codec @@ -151,10 +148,9 @@ func calculateTiFlashProgress(keyspaceID tikv.KeyspaceID, tableID int64, replica return progress, nil } -func encodeRule(c tikv.Codec, rule *placement.TiFlashRule) placement.TiFlashRule { +func encodeRule(c tikv.Codec, rule *pd.Rule) { rule.StartKey, rule.EndKey = c.EncodeRange(rule.StartKey, rule.EndKey) rule.ID = encodeRuleID(c, rule.ID) - return *rule } // encodeRule encodes the rule ID by the following way: @@ -204,259 +200,77 @@ func (m *TiFlashReplicaManagerCtx) CleanTiFlashProgressCache() { // SetTiFlashGroupConfig sets the tiflash's rule group config func (m *TiFlashReplicaManagerCtx) SetTiFlashGroupConfig(ctx context.Context) error { - res, err := doRequest(ctx, - "GetRuleGroupConfig", - m.etcdCli.Endpoints(), - path.Join(pd.Config, "rule_group", placement.TiFlashRuleGroupID), - "GET", - nil, - ) - + groupConfig, err := m.pdHTTPCli.GetPlacementRuleGroupByID(ctx, placement.TiFlashRuleGroupID) if err != nil { return errors.Trace(err) } - - var groupConfig placement.RuleGroupConfig - shouldUpdate := res == nil - if res != nil { - if err = json.Unmarshal(res, &groupConfig); err != nil { - return errors.Trace(err) - } - - if groupConfig.Index != placement.RuleIndexTiFlash || groupConfig.Override { - shouldUpdate = true - } + if groupConfig != nil && groupConfig.Index == placement.RuleIndexTiFlash && !groupConfig.Override { + return nil } - - if shouldUpdate { - groupConfig.ID = placement.TiFlashRuleGroupID - groupConfig.Index = placement.RuleIndexTiFlash - groupConfig.Override = false - - body, err := json.Marshal(&groupConfig) - if err != nil { - return errors.Trace(err) - } - - _, err = doRequest(ctx, - "SetRuleGroupConfig", - m.etcdCli.Endpoints(), - path.Join(pd.Config, "rule_group"), - "POST", - bytes.NewBuffer(body), - ) - - if err != nil { - return errors.Trace(err) - } + groupConfig = &pd.RuleGroup{ + ID: placement.TiFlashRuleGroupID, + Index: placement.RuleIndexTiFlash, + Override: false, } - return nil + return m.pdHTTPCli.SetPlacementRuleGroup(ctx, groupConfig) } // SetPlacementRule is a helper function to set placement rule. -func (m *TiFlashReplicaManagerCtx) SetPlacementRule(ctx context.Context, rule placement.TiFlashRule) error { - r := encodeRule(m.codec, &rule) - return m.doSetPlacementRule(ctx, r) +func (m *TiFlashReplicaManagerCtx) SetPlacementRule(ctx context.Context, rule *pd.Rule) error { + encodeRule(m.codec, rule) + return m.doSetPlacementRule(ctx, rule) } -func (m *TiFlashReplicaManagerCtx) doSetPlacementRule(ctx context.Context, rule placement.TiFlashRule) error { +func (m *TiFlashReplicaManagerCtx) doSetPlacementRule(ctx context.Context, rule *pd.Rule) error { if err := m.SetTiFlashGroupConfig(ctx); err != nil { return err } - if rule.Count == 0 { - return m.doDeletePlacementRule(ctx, rule.GroupID, rule.ID) - } - - j, err := rule.MarshalJSON() - if err != nil { - return errors.Trace(err) - } - buf := bytes.NewBuffer(j) - res, err := doRequest(ctx, "SetPlacementRule", m.etcdCli.Endpoints(), path.Join(pd.Config, "rule"), "POST", buf) - if err != nil { - return errors.Trace(err) + return m.pdHTTPCli.DeletePlacementRule(ctx, rule.GroupID, rule.ID) } - if res == nil { - return fmt.Errorf("TiFlashReplicaManagerCtx returns error in SetPlacementRule") - } - return nil + return m.pdHTTPCli.SetPlacementRule(ctx, rule) } // SetPlacementRuleBatch is a helper function to set a batch of placement rules. -func (m *TiFlashReplicaManagerCtx) SetPlacementRuleBatch(ctx context.Context, rules []placement.TiFlashRule) error { - r := make([]placement.TiFlashRule, 0, len(rules)) +func (m *TiFlashReplicaManagerCtx) SetPlacementRuleBatch(ctx context.Context, rules []*pd.Rule) error { + r := make([]*pd.Rule, 0, len(rules)) for _, rule := range rules { - r = append(r, encodeRule(m.codec, &rule)) + encodeRule(m.codec, rule) + r = append(r, rule) } return m.doSetPlacementRuleBatch(ctx, r) } -// RuleOpType indicates the operation type -type RuleOpType string - -const ( - // RuleOpAdd a placement rule, only need to specify the field *Rule - RuleOpAdd RuleOpType = "add" - // RuleOpDel a placement rule, only need to specify the field `GroupID`, `ID`, `MatchID` - RuleOpDel RuleOpType = "del" -) - -// RuleOp is for batching placement rule actions. The action type is -// distinguished by the field `Action`. -type RuleOp struct { - *placement.TiFlashRule // information of the placement rule to add/delete the operation type - Action RuleOpType - DeleteByIDPrefix bool // if action == delete, delete by the prefix of id -} - -var _ json.Marshaler = (*RuleOp)(nil) -var _ json.Unmarshaler = (*RuleOp)(nil) - -type ruleOp struct { - GroupID string `json:"group_id"` - ID string `json:"id"` - Index int `json:"index,omitempty"` - Override bool `json:"override,omitempty"` - Role pd.PeerRoleType `json:"role"` - Count int `json:"count"` - Constraints []pd.LabelConstraint `json:"label_constraints,omitempty"` - LocationLabels []string `json:"location_labels,omitempty"` - IsolationLevel string `json:"isolation_level,omitempty"` - StartKeyHex string `json:"start_key"` - EndKeyHex string `json:"end_key"` - Action RuleOpType `json:"action"` - DeleteByIDPrefix bool `json:"delete_by_id_prefix"` -} - -// MarshalJSON implements json.Marshaler interface for RuleOp. -func (r *RuleOp) MarshalJSON() ([]byte, error) { - return json.Marshal(&ruleOp{ - GroupID: r.GroupID, - ID: r.ID, - Index: r.Index, - Override: r.Override, - Role: r.Role, - Count: r.Count, - Constraints: r.Constraints, - LocationLabels: r.LocationLabels, - IsolationLevel: r.IsolationLevel, - StartKeyHex: hex.EncodeToString(codec.EncodeBytes(nil, r.StartKey)), - EndKeyHex: hex.EncodeToString(codec.EncodeBytes(nil, r.EndKey)), - Action: r.Action, - DeleteByIDPrefix: r.DeleteByIDPrefix, - }) -} - -// UnmarshalJSON implements json.Unmarshaler interface for RuleOp. -func (r *RuleOp) UnmarshalJSON(bytes []byte) error { - var rule ruleOp - if err := json.Unmarshal(bytes, &rule); err != nil { - return err - } - *r = RuleOp{ - TiFlashRule: &placement.TiFlashRule{ - GroupID: rule.GroupID, - ID: rule.ID, - Index: rule.Index, - Override: rule.Override, - Role: rule.Role, - Count: rule.Count, - Constraints: rule.Constraints, - LocationLabels: rule.LocationLabels, - IsolationLevel: rule.IsolationLevel, - }, - Action: rule.Action, - DeleteByIDPrefix: rule.DeleteByIDPrefix, - } - - startKey, err := hex.DecodeString(rule.StartKeyHex) - if err != nil { - return err - } - - endKey, err := hex.DecodeString(rule.EndKeyHex) - if err != nil { - return err - } - - _, r.StartKey, err = codec.DecodeBytes(startKey, nil) - if err != nil { - return err - } - - _, r.EndKey, err = codec.DecodeBytes(endKey, nil) - - return err -} - -func (m *TiFlashReplicaManagerCtx) doSetPlacementRuleBatch(ctx context.Context, rules []placement.TiFlashRule) error { +func (m *TiFlashReplicaManagerCtx) doSetPlacementRuleBatch(ctx context.Context, rules []*pd.Rule) error { if err := m.SetTiFlashGroupConfig(ctx); err != nil { return err } - ruleOps := make([]RuleOp, 0, len(rules)) + ruleOps := make([]*pd.RuleOp, 0, len(rules)) for i, r := range rules { if r.Count == 0 { - ruleOps = append(ruleOps, RuleOp{ - TiFlashRule: &rules[i], - Action: RuleOpDel, + ruleOps = append(ruleOps, &pd.RuleOp{ + Rule: rules[i], + Action: pd.RuleOpDel, }) } else { - ruleOps = append(ruleOps, RuleOp{ - TiFlashRule: &rules[i], - Action: RuleOpAdd, + ruleOps = append(ruleOps, &pd.RuleOp{ + Rule: rules[i], + Action: pd.RuleOpAdd, }) } } - j, err := json.Marshal(ruleOps) - if err != nil { - return errors.Trace(err) - } - buf := bytes.NewBuffer(j) - res, err := doRequest(ctx, "SetPlacementRuleBatch", m.etcdCli.Endpoints(), path.Join(pd.Config, "rules", "batch"), "POST", buf) - if err != nil { - return errors.Trace(err) - } - if res == nil { - return fmt.Errorf("TiFlashReplicaManagerCtx returns error in SetPlacementRuleBatch") - } - return nil + return m.pdHTTPCli.SetPlacementRuleInBatch(ctx, ruleOps) } // DeletePlacementRule is to delete placement rule for certain group. func (m *TiFlashReplicaManagerCtx) DeletePlacementRule(ctx context.Context, group string, ruleID string) error { ruleID = encodeRuleID(m.codec, ruleID) - return m.doDeletePlacementRule(ctx, group, ruleID) -} - -func (m *TiFlashReplicaManagerCtx) doDeletePlacementRule(ctx context.Context, group string, ruleID string) error { - res, err := doRequest(ctx, "DeletePlacementRule", m.etcdCli.Endpoints(), path.Join(pd.Config, "rule", group, ruleID), "DELETE", nil) - if err != nil { - return errors.Trace(err) - } - if res == nil { - return fmt.Errorf("TiFlashReplicaManagerCtx returns error in DeletePlacementRule") - } - return nil + return m.pdHTTPCli.DeletePlacementRule(ctx, group, ruleID) } // GetGroupRules to get all placement rule in a certain group. -func (m *TiFlashReplicaManagerCtx) GetGroupRules(ctx context.Context, group string) ([]placement.TiFlashRule, error) { - res, err := doRequest(ctx, "GetGroupRules", m.etcdCli.Endpoints(), path.Join(pd.Config, "rules", "group", group), "GET", nil) - if err != nil { - return nil, errors.Trace(err) - } - if res == nil { - return nil, fmt.Errorf("TiFlashReplicaManagerCtx returns error in GetGroupRules") - } - - var rules []placement.TiFlashRule - err = json.Unmarshal(res, &rules) - if err != nil { - return nil, errors.Trace(err) - } - - return rules, nil +func (m *TiFlashReplicaManagerCtx) GetGroupRules(ctx context.Context, group string) ([]*pd.Rule, error) { + return m.pdHTTPCli.GetPlacementRulesByGroup(ctx, group) } // PostAccelerateScheduleBatch sends `regions/batch-accelerate-schedule` request. @@ -464,29 +278,14 @@ func (m *TiFlashReplicaManagerCtx) PostAccelerateScheduleBatch(ctx context.Conte if len(tableIDs) == 0 { return nil } - input := make([]map[string]string, 0, len(tableIDs)) + input := make([]*pd.KeyRange, 0, len(tableIDs)) for _, tableID := range tableIDs { startKey := tablecodec.GenTableRecordPrefix(tableID) endKey := tablecodec.EncodeTablePrefix(tableID + 1) startKey, endKey = m.codec.EncodeRegionRange(startKey, endKey) - input = append(input, map[string]string{ - "start_key": hex.EncodeToString(startKey), - "end_key": hex.EncodeToString(endKey), - }) - } - j, err := json.Marshal(input) - if err != nil { - return errors.Trace(err) - } - buf := bytes.NewBuffer(j) - res, err := doRequest(ctx, "PostAccelerateScheduleBatch", m.etcdCli.Endpoints(), path.Join(pd.Regions, "accelerate-schedule", "batch"), http.MethodPost, buf) - if err != nil { - return errors.Trace(err) - } - if res == nil { - return fmt.Errorf("TiFlashReplicaManagerCtx returns error in PostAccelerateScheduleBatch") + input = append(input, pd.NewKeyRange(startKey, endKey)) } - return nil + return m.pdHTTPCli.AccelerateScheduleInBatch(ctx, input) } // GetRegionCountFromPD is a helper function calling `/stats/region`. @@ -494,19 +293,9 @@ func (m *TiFlashReplicaManagerCtx) GetRegionCountFromPD(ctx context.Context, tab startKey := tablecodec.GenTableRecordPrefix(tableID) endKey := tablecodec.EncodeTablePrefix(tableID + 1) startKey, endKey = m.codec.EncodeRegionRange(startKey, endKey) - - p := fmt.Sprintf("%s&count", pd.RegionStatsByKeyRange(pd.NewKeyRange(startKey, endKey))) - res, err := doRequest(ctx, "GetPDRegionStats", m.etcdCli.Endpoints(), p, "GET", nil) - if err != nil { - return errors.Trace(err) - } - if res == nil { - return fmt.Errorf("TiFlashReplicaManagerCtx returns error in GetRegionCountFromPD") - } - var stats pd.RegionStats - err = json.Unmarshal(res, &stats) + stats, err := m.pdHTTPCli.GetRegionStatusByKeyRange(ctx, pd.NewKeyRange(startKey, endKey), true) if err != nil { - return errors.Trace(err) + return err } *regionCount = stats.Count return nil @@ -514,20 +303,7 @@ func (m *TiFlashReplicaManagerCtx) GetRegionCountFromPD(ctx context.Context, tab // GetStoresStat gets the TiKV store information by accessing PD's api. func (m *TiFlashReplicaManagerCtx) GetStoresStat(ctx context.Context) (*pd.StoresInfo, error) { - var storesStat pd.StoresInfo - res, err := doRequest(ctx, "GetStoresStat", m.etcdCli.Endpoints(), pd.Stores, "GET", nil) - if err != nil { - return nil, errors.Trace(err) - } - if res == nil { - return nil, fmt.Errorf("TiFlashReplicaManagerCtx returns error in GetStoresStat") - } - - err = json.Unmarshal(res, &storesStat) - if err != nil { - return nil, errors.Trace(err) - } - return &storesStat, err + return m.pdHTTPCli.GetStores(ctx) } type mockTiFlashReplicaManagerCtx struct { @@ -538,15 +314,15 @@ type mockTiFlashReplicaManagerCtx struct { tiflashProgressCache map[int64]float64 } -func makeBaseRule() placement.TiFlashRule { - return placement.TiFlashRule{ +func makeBaseRule() pd.Rule { + return pd.Rule{ GroupID: placement.TiFlashRuleGroupID, ID: "", Index: placement.RuleIndexTiFlash, Override: false, Role: pd.Learner, Count: 2, - Constraints: []pd.LabelConstraint{ + LabelConstraints: []pd.LabelConstraint{ { Key: "engine", Op: pd.In, @@ -557,7 +333,7 @@ func makeBaseRule() placement.TiFlashRule { } // MakeNewRule creates a pd rule for TiFlash. -func MakeNewRule(id int64, count uint64, locationLabels []string) placement.TiFlashRule { +func MakeNewRule(id int64, count uint64, locationLabels []string) pd.Rule { ruleID := MakeRuleID(id) startKey := tablecodec.GenTableRecordPrefix(id) endKey := tablecodec.EncodeTablePrefix(id + 1) @@ -604,7 +380,7 @@ type MockTiFlash struct { StatusServer *httptest.Server SyncStatus map[int]mockTiFlashTableInfo StoreInfo map[uint64]pd.MetaStore - GlobalTiFlashPlacementRules map[string]placement.TiFlashRule + GlobalTiFlashPlacementRules map[string]*pd.Rule PdEnabled bool TiflashDelay time.Duration StartTime time.Time @@ -666,7 +442,7 @@ func NewMockTiFlash() *MockTiFlash { StatusServer: nil, SyncStatus: make(map[int]mockTiFlashTableInfo), StoreInfo: make(map[uint64]pd.MetaStore), - GlobalTiFlashPlacementRules: make(map[string]placement.TiFlashRule), + GlobalTiFlashPlacementRules: make(map[string]*pd.Rule), PdEnabled: true, TiflashDelay: 0, StartTime: time.Now(), @@ -677,7 +453,7 @@ func NewMockTiFlash() *MockTiFlash { } // HandleSetPlacementRule is mock function for SetTiFlashPlacementRule. -func (tiflash *MockTiFlash) HandleSetPlacementRule(rule placement.TiFlashRule) error { +func (tiflash *MockTiFlash) HandleSetPlacementRule(rule *pd.Rule) error { tiflash.Lock() defer tiflash.Unlock() tiflash.groupIndex = placement.RuleIndexTiFlash @@ -722,7 +498,7 @@ func (tiflash *MockTiFlash) HandleSetPlacementRule(rule placement.TiFlashRule) e } // HandleSetPlacementRuleBatch is mock function for batch SetTiFlashPlacementRule. -func (tiflash *MockTiFlash) HandleSetPlacementRuleBatch(rules []placement.TiFlashRule) error { +func (tiflash *MockTiFlash) HandleSetPlacementRuleBatch(rules []*pd.Rule) error { for _, r := range rules { if err := tiflash.HandleSetPlacementRule(r); err != nil { return err @@ -758,10 +534,10 @@ func (tiflash *MockTiFlash) HandleDeletePlacementRule(group string, ruleID strin } // HandleGetGroupRules is mock function for GetTiFlashGroupRules. -func (tiflash *MockTiFlash) HandleGetGroupRules(group string) ([]placement.TiFlashRule, error) { +func (tiflash *MockTiFlash) HandleGetGroupRules(group string) ([]*pd.Rule, error) { tiflash.Lock() defer tiflash.Unlock() - var result = make([]placement.TiFlashRule, 0) + var result = make([]*pd.Rule, 0) for _, item := range tiflash.GlobalTiFlashPlacementRules { result = append(result, item) } @@ -867,13 +643,13 @@ func (tiflash *MockTiFlash) GetRuleGroupIndex() int { } // Compare supposed rule, and we actually get from TableInfo -func isRuleMatch(rule placement.TiFlashRule, startKey []byte, endKey []byte, count int, labels []string) bool { +func isRuleMatch(rule pd.Rule, startKey []byte, endKey []byte, count int, labels []string) bool { // Compute startKey if !(bytes.Equal(rule.StartKey, startKey) && bytes.Equal(rule.EndKey, endKey)) { return false } ok := false - for _, c := range rule.Constraints { + for _, c := range rule.LabelConstraints { if c.Key == "engine" && len(c.Values) == 1 && c.Values[0] == "tiflash" && c.Op == pd.In { ok = true break @@ -901,7 +677,7 @@ func isRuleMatch(rule placement.TiFlashRule, startKey []byte, endKey []byte, cou } // CheckPlacementRule find if a given rule precisely matches already set rules. -func (tiflash *MockTiFlash) CheckPlacementRule(rule placement.TiFlashRule) bool { +func (tiflash *MockTiFlash) CheckPlacementRule(rule pd.Rule) bool { tiflash.Lock() defer tiflash.Unlock() for _, r := range tiflash.GlobalTiFlashPlacementRules { @@ -913,12 +689,12 @@ func (tiflash *MockTiFlash) CheckPlacementRule(rule placement.TiFlashRule) bool } // GetPlacementRule find a rule by name. -func (tiflash *MockTiFlash) GetPlacementRule(ruleName string) (*placement.TiFlashRule, bool) { +func (tiflash *MockTiFlash) GetPlacementRule(ruleName string) (*pd.Rule, bool) { tiflash.Lock() defer tiflash.Unlock() if r, ok := tiflash.GlobalTiFlashPlacementRules[ruleName]; ok { p := r - return &p, ok + return p, ok } return nil, false } @@ -927,7 +703,7 @@ func (tiflash *MockTiFlash) GetPlacementRule(ruleName string) (*placement.TiFlas func (tiflash *MockTiFlash) CleanPlacementRules() { tiflash.Lock() defer tiflash.Unlock() - tiflash.GlobalTiFlashPlacementRules = make(map[string]placement.TiFlashRule) + tiflash.GlobalTiFlashPlacementRules = make(map[string]*pd.Rule) } // PlacementRulesLen gets length of all currently set placement rules. @@ -1015,7 +791,7 @@ func (m *mockTiFlashReplicaManagerCtx) SetTiFlashGroupConfig(_ context.Context) } // SetPlacementRule is a helper function to set placement rule. -func (m *mockTiFlashReplicaManagerCtx) SetPlacementRule(ctx context.Context, rule placement.TiFlashRule) error { +func (m *mockTiFlashReplicaManagerCtx) SetPlacementRule(ctx context.Context, rule *pd.Rule) error { m.Lock() defer m.Unlock() if m.tiflash == nil { @@ -1025,7 +801,7 @@ func (m *mockTiFlashReplicaManagerCtx) SetPlacementRule(ctx context.Context, rul } // SetPlacementRuleBatch is a helper function to set a batch of placement rules. -func (m *mockTiFlashReplicaManagerCtx) SetPlacementRuleBatch(ctx context.Context, rules []placement.TiFlashRule) error { +func (m *mockTiFlashReplicaManagerCtx) SetPlacementRuleBatch(ctx context.Context, rules []*pd.Rule) error { m.Lock() defer m.Unlock() if m.tiflash == nil { @@ -1047,11 +823,11 @@ func (m *mockTiFlashReplicaManagerCtx) DeletePlacementRule(ctx context.Context, } // GetGroupRules to get all placement rule in a certain group. -func (m *mockTiFlashReplicaManagerCtx) GetGroupRules(ctx context.Context, group string) ([]placement.TiFlashRule, error) { +func (m *mockTiFlashReplicaManagerCtx) GetGroupRules(ctx context.Context, group string) ([]*pd.Rule, error) { m.Lock() defer m.Unlock() if m.tiflash == nil { - return []placement.TiFlashRule{}, nil + return []*pd.Rule{}, nil } return m.tiflash.HandleGetGroupRules(group) } diff --git a/pkg/store/helper/helper.go b/pkg/store/helper/helper.go index 824d93159e5f9..82dd22b131b58 100644 --- a/pkg/store/helper/helper.go +++ b/pkg/store/helper/helper.go @@ -806,7 +806,7 @@ func (h *Helper) GetPDRegionStats(ctx context.Context, tableID int64, noIndexSta startKey = codec.EncodeBytes([]byte{}, startKey) endKey = codec.EncodeBytes([]byte{}, endKey) - return pdCli.GetRegionStatusByKeyRange(ctx, pd.NewKeyRange(startKey, endKey)) + return pdCli.GetRegionStatusByKeyRange(ctx, pd.NewKeyRange(startKey, endKey), false) } // GetTiFlashTableIDFromEndKey computes tableID from pd rule's endKey. diff --git a/pkg/store/mockstore/unistore/pd.go b/pkg/store/mockstore/unistore/pd.go index 04b6bae42ee14..f6325088a1032 100644 --- a/pkg/store/mockstore/unistore/pd.go +++ b/pkg/store/mockstore/unistore/pd.go @@ -165,7 +165,7 @@ func (c *pdClient) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byt return nil, nil } -func (c *pdClient) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string) (*pd.Region, error) { +func (c *pdClient) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...pd.GetRegionOption) (*pd.Region, error) { return nil, nil } diff --git a/pkg/store/mockstore/unistore/tikv/mock_region.go b/pkg/store/mockstore/unistore/tikv/mock_region.go index 41a4da282cd37..709d4c10f2c04 100644 --- a/pkg/store/mockstore/unistore/tikv/mock_region.go +++ b/pkg/store/mockstore/unistore/tikv/mock_region.go @@ -590,7 +590,7 @@ func (rm *MockRegionManager) saveRegions(regions []*regionCtx) error { // Limit limits the maximum number of regions returned. // If a region has no leader, corresponding leader will be placed by a peer // with empty value (PeerID is 0). -func (rm *MockRegionManager) ScanRegions(startKey, endKey []byte, limit int) []*pdclient.Region { +func (rm *MockRegionManager) ScanRegions(startKey, endKey []byte, limit int, _ ...pdclient.GetRegionOption) []*pdclient.Region { rm.mu.RLock() defer rm.mu.RUnlock() @@ -854,8 +854,8 @@ func (pd *MockPD) GetAllStores(ctx context.Context, opts ...pdclient.GetStoreOpt // Limit limits the maximum number of regions returned. // If a region has no leader, corresponding leader will be placed by a peer // with empty value (PeerID is 0). -func (pd *MockPD) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int) ([]*pdclient.Region, error) { - regions := pd.rm.ScanRegions(startKey, endKey, limit) +func (pd *MockPD) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int, opts ...pdclient.GetRegionOption) ([]*pdclient.Region, error) { + regions := pd.rm.ScanRegions(startKey, endKey, limit, opts...) return regions, nil } diff --git a/pkg/ttl/cache/split_test.go b/pkg/ttl/cache/split_test.go index d1a34a5ea2e5b..298dedc7a563a 100644 --- a/pkg/ttl/cache/split_test.go +++ b/pkg/ttl/cache/split_test.go @@ -62,7 +62,7 @@ type mockPDClient struct { regionsSorted bool } -func (c *mockPDClient) ScanRegions(_ context.Context, key, endKey []byte, limit int) ([]*pd.Region, error) { +func (c *mockPDClient) ScanRegions(_ context.Context, key, endKey []byte, limit int, _ ...pd.GetRegionOption) ([]*pd.Region, error) { if len(c.regions) == 0 { return []*pd.Region{newMockRegion(1, []byte{}, []byte{0xFF, 0xFF})}, nil }