From b499e2503191193e6139cfb4310180c410f41d97 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 12 Dec 2023 16:02:10 +0800 Subject: [PATCH 1/3] http: add more API for lightning's usage, and don't use body io.Reader Signed-off-by: lance6716 --- client/http/client.go | 125 +++++++++++++----- tests/integrations/client/http_client_test.go | 7 + 2 files changed, 98 insertions(+), 34 deletions(-) diff --git a/client/http/client.go b/client/http/client.go index b79aa9ca002..78a97dca8f9 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -54,6 +54,8 @@ type Client interface { GetHistoryHotRegions(context.Context, *HistoryHotRegionsRequest) (*HistoryHotRegions, error) GetRegionStatusByKeyRange(context.Context, *KeyRange, bool) (*RegionStats, error) GetStores(context.Context) (*StoresInfo, error) + GetStore(context.Context, uint64) (*StoreInfo, error) + GetClusterVersion(context.Context) (string, error) /* Config-related interfaces */ GetScheduleConfig(context.Context) (map[string]interface{}, error) SetScheduleConfig(context.Context, map[string]interface{}) error @@ -76,6 +78,8 @@ type Client interface { /* Scheduling-related interfaces */ AccelerateSchedule(context.Context, *KeyRange) error AccelerateScheduleInBatch(context.Context, []*KeyRange) error + GetSchedulers(ctx context.Context) ([]string, error) + PostSchedulerDelay(context.Context, string, int64) error /* Other interfaces */ GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error) @@ -239,7 +243,7 @@ func WithAllowFollowerHandle() HeaderOption { func (c *client) requestWithRetry( ctx context.Context, name, uri, method string, - body io.Reader, res interface{}, + body []byte, res interface{}, headerOpts ...HeaderOption, ) error { var ( @@ -261,7 +265,7 @@ func (c *client) requestWithRetry( func (c *client) request( ctx context.Context, name, url, method string, - body io.Reader, res interface{}, + body []byte, res interface{}, headerOpts ...HeaderOption, ) error { logFields := []zap.Field{ @@ -271,7 +275,7 @@ func (c *client) request( zap.String("caller-id", c.callerID), } log.Debug("[pd] request the http url", logFields...) - req, err := http.NewRequestWithContext(ctx, method, url, body) + req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(body)) if err != nil { log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...) return errors.Trace(err) @@ -333,7 +337,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInf var region RegionInfo err := c.requestWithRetry(ctx, "GetRegionByID", RegionByID(regionID), - http.MethodGet, http.NoBody, ®ion) + http.MethodGet, nil, ®ion) if err != nil { return nil, err } @@ -345,7 +349,7 @@ func (c *client) GetRegionByKey(ctx context.Context, key []byte) (*RegionInfo, e var region RegionInfo err := c.requestWithRetry(ctx, "GetRegionByKey", RegionByKey(key), - http.MethodGet, http.NoBody, ®ion) + http.MethodGet, nil, ®ion) if err != nil { return nil, err } @@ -357,7 +361,7 @@ func (c *client) GetRegions(ctx context.Context) (*RegionsInfo, error) { var regions RegionsInfo err := c.requestWithRetry(ctx, "GetRegions", Regions, - http.MethodGet, http.NoBody, ®ions) + http.MethodGet, nil, ®ions) if err != nil { return nil, err } @@ -370,7 +374,7 @@ func (c *client) GetRegionsByKeyRange(ctx context.Context, keyRange *KeyRange, l var regions RegionsInfo err := c.requestWithRetry(ctx, "GetRegionsByKeyRange", RegionsByKeyRange(keyRange, limit), - http.MethodGet, http.NoBody, ®ions) + http.MethodGet, nil, ®ions) if err != nil { return nil, err } @@ -382,7 +386,7 @@ func (c *client) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*Regi var regions RegionsInfo err := c.requestWithRetry(ctx, "GetRegionsByStoreID", RegionsByStoreID(storeID), - http.MethodGet, http.NoBody, ®ions) + http.MethodGet, nil, ®ions) if err != nil { return nil, err } @@ -395,7 +399,7 @@ func (c *client) GetRegionsReplicatedStateByKeyRange(ctx context.Context, keyRan var state string err := c.requestWithRetry(ctx, "GetRegionsReplicatedStateByKeyRange", RegionsReplicatedByKeyRange(keyRange), - http.MethodGet, http.NoBody, &state) + http.MethodGet, nil, &state) if err != nil { return "", err } @@ -407,7 +411,7 @@ func (c *client) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, er var hotReadRegions StoreHotPeersInfos err := c.requestWithRetry(ctx, "GetHotReadRegions", HotRead, - http.MethodGet, http.NoBody, &hotReadRegions) + http.MethodGet, nil, &hotReadRegions) if err != nil { return nil, err } @@ -419,7 +423,7 @@ func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, e var hotWriteRegions StoreHotPeersInfos err := c.requestWithRetry(ctx, "GetHotWriteRegions", HotWrite, - http.MethodGet, http.NoBody, &hotWriteRegions) + http.MethodGet, nil, &hotWriteRegions) if err != nil { return nil, err } @@ -435,7 +439,7 @@ func (c *client) GetHistoryHotRegions(ctx context.Context, req *HistoryHotRegion var historyHotRegions HistoryHotRegions err = c.requestWithRetry(ctx, "GetHistoryHotRegions", HotHistory, - http.MethodGet, bytes.NewBuffer(reqJSON), &historyHotRegions, + http.MethodGet, reqJSON, &historyHotRegions, WithAllowFollowerHandle()) if err != nil { return nil, err @@ -450,7 +454,7 @@ func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRan var regionStats RegionStats err := c.requestWithRetry(ctx, "GetRegionStatusByKeyRange", RegionStatsByKeyRange(keyRange, onlyCount), - http.MethodGet, http.NoBody, ®ionStats, + http.MethodGet, nil, ®ionStats, ) if err != nil { return nil, err @@ -463,7 +467,7 @@ func (c *client) GetScheduleConfig(ctx context.Context) (map[string]interface{}, var config map[string]interface{} err := c.requestWithRetry(ctx, "GetScheduleConfig", ScheduleConfig, - http.MethodGet, http.NoBody, &config) + http.MethodGet, nil, &config) if err != nil { return nil, err } @@ -478,7 +482,7 @@ func (c *client) SetScheduleConfig(ctx context.Context, config map[string]interf } return c.requestWithRetry(ctx, "SetScheduleConfig", ScheduleConfig, - http.MethodPost, bytes.NewBuffer(configJSON), nil) + http.MethodPost, configJSON, nil) } // GetStores gets the stores info. @@ -486,19 +490,43 @@ func (c *client) GetStores(ctx context.Context) (*StoresInfo, error) { var stores StoresInfo err := c.requestWithRetry(ctx, "GetStores", Stores, - http.MethodGet, http.NoBody, &stores) + http.MethodGet, nil, &stores) if err != nil { return nil, err } return &stores, nil } +// GetStore gets the store info by ID. +func (c *client) GetStore(ctx context.Context, storeID uint64) (*StoreInfo, error) { + var store StoreInfo + err := c.requestWithRetry(ctx, + "GetStore", StoreByID(storeID), + http.MethodGet, nil, &store) + if err != nil { + return nil, err + } + return &store, nil +} + +// GetClusterVersion gets the cluster version. +func (c *client) GetClusterVersion(ctx context.Context) (string, error) { + var version string + err := c.requestWithRetry(ctx, + "GetClusterVersion", ClusterVersion, + http.MethodGet, nil, &version) + if err != nil { + return "", err + } + return version, nil +} + // GetAllPlacementRuleBundles gets all placement rules bundles. func (c *client) GetAllPlacementRuleBundles(ctx context.Context) ([]*GroupBundle, error) { var bundles []*GroupBundle err := c.requestWithRetry(ctx, "GetPlacementRuleBundle", PlacementRuleBundle, - http.MethodGet, http.NoBody, &bundles) + http.MethodGet, nil, &bundles) if err != nil { return nil, err } @@ -510,7 +538,7 @@ func (c *client) GetPlacementRuleBundleByGroup(ctx context.Context, group string var bundle GroupBundle err := c.requestWithRetry(ctx, "GetPlacementRuleBundleByGroup", PlacementRuleBundleByGroup(group), - http.MethodGet, http.NoBody, &bundle) + http.MethodGet, nil, &bundle) if err != nil { return nil, err } @@ -522,7 +550,7 @@ func (c *client) GetPlacementRulesByGroup(ctx context.Context, group string) ([] var rules []*Rule err := c.requestWithRetry(ctx, "GetPlacementRulesByGroup", PlacementRulesByGroup(group), - http.MethodGet, http.NoBody, &rules) + http.MethodGet, nil, &rules) if err != nil { return nil, err } @@ -537,7 +565,7 @@ func (c *client) SetPlacementRule(ctx context.Context, rule *Rule) error { } return c.requestWithRetry(ctx, "SetPlacementRule", PlacementRule, - http.MethodPost, bytes.NewBuffer(ruleJSON), nil) + http.MethodPost, ruleJSON, nil) } // SetPlacementRuleInBatch sets the placement rules in batch. @@ -548,7 +576,7 @@ func (c *client) SetPlacementRuleInBatch(ctx context.Context, ruleOps []*RuleOp) } return c.requestWithRetry(ctx, "SetPlacementRuleInBatch", PlacementRulesInBatch, - http.MethodPost, bytes.NewBuffer(ruleOpsJSON), nil) + http.MethodPost, ruleOpsJSON, nil) } // SetPlacementRuleBundles sets the placement rule bundles. @@ -560,14 +588,14 @@ func (c *client) SetPlacementRuleBundles(ctx context.Context, bundles []*GroupBu } return c.requestWithRetry(ctx, "SetPlacementRuleBundles", PlacementRuleBundleWithPartialParameter(partial), - http.MethodPost, bytes.NewBuffer(bundlesJSON), nil) + http.MethodPost, bundlesJSON, nil) } // DeletePlacementRule deletes the placement rule. func (c *client) DeletePlacementRule(ctx context.Context, group, id string) error { return c.requestWithRetry(ctx, "DeletePlacementRule", PlacementRuleByGroupAndID(group, id), - http.MethodDelete, http.NoBody, nil) + http.MethodDelete, nil, nil) } // GetAllPlacementRuleGroups gets all placement rule groups. @@ -575,7 +603,7 @@ func (c *client) GetAllPlacementRuleGroups(ctx context.Context) ([]*RuleGroup, e var ruleGroups []*RuleGroup err := c.requestWithRetry(ctx, "GetAllPlacementRuleGroups", placementRuleGroups, - http.MethodGet, http.NoBody, &ruleGroups) + http.MethodGet, nil, &ruleGroups) if err != nil { return nil, err } @@ -587,7 +615,7 @@ func (c *client) GetPlacementRuleGroupByID(ctx context.Context, id string) (*Rul var ruleGroup RuleGroup err := c.requestWithRetry(ctx, "GetPlacementRuleGroupByID", PlacementRuleGroupByID(id), - http.MethodGet, http.NoBody, &ruleGroup) + http.MethodGet, nil, &ruleGroup) if err != nil { return nil, err } @@ -602,14 +630,14 @@ func (c *client) SetPlacementRuleGroup(ctx context.Context, ruleGroup *RuleGroup } return c.requestWithRetry(ctx, "SetPlacementRuleGroup", placementRuleGroup, - http.MethodPost, bytes.NewBuffer(ruleGroupJSON), nil) + http.MethodPost, ruleGroupJSON, nil) } // DeletePlacementRuleGroupByID deletes the placement rule group by ID. func (c *client) DeletePlacementRuleGroupByID(ctx context.Context, id string) error { return c.requestWithRetry(ctx, "DeletePlacementRuleGroupByID", PlacementRuleGroupByID(id), - http.MethodDelete, http.NoBody, nil) + http.MethodDelete, nil, nil) } // GetAllRegionLabelRules gets all region label rules. @@ -617,7 +645,7 @@ func (c *client) GetAllRegionLabelRules(ctx context.Context) ([]*LabelRule, erro var labelRules []*LabelRule err := c.requestWithRetry(ctx, "GetAllRegionLabelRules", RegionLabelRules, - http.MethodGet, http.NoBody, &labelRules) + http.MethodGet, nil, &labelRules) if err != nil { return nil, err } @@ -633,7 +661,7 @@ func (c *client) GetRegionLabelRulesByIDs(ctx context.Context, ruleIDs []string) var labelRules []*LabelRule err = c.requestWithRetry(ctx, "GetRegionLabelRulesByIDs", RegionLabelRulesByIDs, - http.MethodGet, bytes.NewBuffer(idsJSON), &labelRules) + http.MethodGet, idsJSON, &labelRules) if err != nil { return nil, err } @@ -648,7 +676,7 @@ func (c *client) SetRegionLabelRule(ctx context.Context, labelRule *LabelRule) e } return c.requestWithRetry(ctx, "SetRegionLabelRule", RegionLabelRule, - http.MethodPost, bytes.NewBuffer(labelRuleJSON), nil) + http.MethodPost, labelRuleJSON, nil) } // PatchRegionLabelRules patches the region label rules. @@ -659,7 +687,7 @@ func (c *client) PatchRegionLabelRules(ctx context.Context, labelRulePatch *Labe } return c.requestWithRetry(ctx, "PatchRegionLabelRules", RegionLabelRules, - http.MethodPatch, bytes.NewBuffer(labelRulePatchJSON), nil) + http.MethodPatch, labelRulePatchJSON, nil) } // AccelerateSchedule accelerates the scheduling of the regions within the given key range. @@ -675,7 +703,7 @@ func (c *client) AccelerateSchedule(ctx context.Context, keyRange *KeyRange) err } return c.requestWithRetry(ctx, "AccelerateSchedule", AccelerateSchedule, - http.MethodPost, bytes.NewBuffer(inputJSON), nil) + http.MethodPost, inputJSON, nil) } // AccelerateScheduleInBatch accelerates the scheduling of the regions within the given key ranges in batch. @@ -695,10 +723,39 @@ func (c *client) AccelerateScheduleInBatch(ctx context.Context, keyRanges []*Key } return c.requestWithRetry(ctx, "AccelerateScheduleInBatch", AccelerateScheduleInBatch, - http.MethodPost, bytes.NewBuffer(inputJSON), nil) + http.MethodPost, inputJSON, nil) +} + +// GetSchedulers gets all scheduler names. +func (c *client) GetSchedulers(ctx context.Context) ([]string, error) { + var schedulers []string + err := c.requestWithRetry(ctx, + "GetSchedulers", Schedulers, + http.MethodGet, nil, &schedulers) + if err != nil { + return nil, err + } + return schedulers, nil +} + +// PostSchedulerDelay changes the delay of given scheduler. +func (c *client) PostSchedulerDelay(ctx context.Context, scheduler string, delaySec int64) error { + m := map[string]int64{ + "delay": delaySec, + } + inputJSON, err := json.Marshal(m) + if err != nil { + return errors.Trace(err) + } + return c.requestWithRetry(ctx, + "PostSchedulerDelay", SchedulerByName(scheduler), + http.MethodPost, inputJSON, nil) } // GetMinResolvedTSByStoresIDs get min-resolved-ts by stores IDs. +// - When storeIDs has zero length, it will return (cluster-level's min_resolved_ts, nil, nil) when no error. +// - When storeIDs is {"cluster"}, it will return (cluster-level's min_resolved_ts, stores_min_resolved_ts, nil) when no error. +// - When storeID is specified to ID lists, it will return (min_resolved_ts of given stores, stores_min_resolved_ts, nil) when no error. func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) { uri := MinResolvedTSPrefix // scope is an optional parameter, it can be `cluster` or specified store IDs. @@ -720,7 +777,7 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin }{} err := c.requestWithRetry(ctx, "GetMinResolvedTSByStoresIDs", uri, - http.MethodGet, http.NoBody, &resp) + http.MethodGet, nil, &resp) if err != nil { return 0, nil, err } diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index 6c636d2a2a1..60e45401877 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -134,6 +134,13 @@ func (suite *httpClientTestSuite) TestMeta() { re.NoError(err) re.Equal(1, store.Count) re.Len(store.Stores, 1) + storeID := uint64(store.Stores[0].Store.ID) // TODO: why type is different? + store2, err := suite.client.GetStore(suite.ctx, storeID) + re.NoError(err) + re.EqualValues(storeID, store2.Store.ID) + version, err := suite.client.GetClusterVersion(suite.ctx) + re.NoError(err) + re.Equal("0.0.0", version) } func (suite *httpClientTestSuite) TestGetMinResolvedTSByStoresIDs() { From 271f9668999fe208f25219a24bb72e5206e0de61 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 13 Dec 2023 14:51:43 +0800 Subject: [PATCH 2/3] add test Signed-off-by: lance6716 --- client/http/client.go | 23 +++++++++---------- tests/integrations/client/http_client_test.go | 4 ++++ 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/client/http/client.go b/client/http/client.go index 3bc815a611d..183217c574f 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -735,6 +735,17 @@ func (c *client) PatchRegionLabelRules(ctx context.Context, labelRulePatch *Labe http.MethodPatch, labelRulePatchJSON, nil) } +// GetSchedulers gets the schedulers from PD cluster. +func (c *client) GetSchedulers(ctx context.Context) ([]string, error) { + var schedulers []string + err := c.requestWithRetry(ctx, "GetSchedulers", Schedulers, + http.MethodGet, nil, &schedulers) + if err != nil { + return nil, err + } + return schedulers, nil +} + // CreateScheduler creates a scheduler to PD cluster. func (c *client) CreateScheduler(ctx context.Context, name string, storeID uint64) error { inputJSON, err := json.Marshal(map[string]interface{}{ @@ -785,18 +796,6 @@ func (c *client) AccelerateScheduleInBatch(ctx context.Context, keyRanges []*Key http.MethodPost, inputJSON, nil) } -// GetSchedulers gets all scheduler names. -func (c *client) GetSchedulers(ctx context.Context) ([]string, error) { - var schedulers []string - err := c.requestWithRetry(ctx, - "GetSchedulers", Schedulers, - http.MethodGet, nil, &schedulers) - if err != nil { - return nil, err - } - return schedulers, nil -} - // PostSchedulerDelay changes the delay of given scheduler. func (c *client) PostSchedulerDelay(ctx context.Context, scheduler string, delaySec int64) error { m := map[string]int64{ diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index 8548fe678f5..0563d71650f 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -403,6 +403,10 @@ func (suite *httpClientTestSuite) TestSchedulers() { schedulers, err = suite.client.GetSchedulers(suite.ctx) re.NoError(err) re.Len(schedulers, 1) + err = suite.client.PostSchedulerDelay(suite.ctx, "evict-leader-scheduler", 100) + re.NoError(err) + err = suite.client.PostSchedulerDelay(suite.ctx, "not-exist", 100) + re.ErrorContains(err, "500 Internal Server Error") // TODO: should return friendly error message } func (suite *httpClientTestSuite) TestSetStoreLabels() { From c89be3ac3fc4802951524c2cc2f9e418ad3a943e Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 13 Dec 2023 15:07:59 +0800 Subject: [PATCH 3/3] address comment Signed-off-by: lance6716 --- client/http/client.go | 10 +++++----- tests/integrations/client/http_client_test.go | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/client/http/client.go b/client/http/client.go index 183217c574f..d74c77571d6 100644 --- a/client/http/client.go +++ b/client/http/client.go @@ -44,7 +44,6 @@ const ( // Client is a PD (Placement Driver) HTTP client. type Client interface { /* Meta-related interfaces */ - GetClusterVersion(context.Context) (string, error) GetRegionByID(context.Context, uint64) (*RegionInfo, error) GetRegionByKey(context.Context, []byte) (*RegionInfo, error) GetRegions(context.Context) (*RegionsInfo, error) @@ -64,10 +63,11 @@ type Client interface { /* Config-related interfaces */ GetScheduleConfig(context.Context) (map[string]interface{}, error) SetScheduleConfig(context.Context, map[string]interface{}) error + GetClusterVersion(context.Context) (string, error) /* Scheduler-related interfaces */ GetSchedulers(context.Context) ([]string, error) CreateScheduler(ctx context.Context, name string, storeID uint64) error - PostSchedulerDelay(context.Context, string, int64) error + SetSchedulerDelay(context.Context, string, int64) error /* Rule-related interfaces */ GetAllPlacementRuleBundles(context.Context) ([]*GroupBundle, error) GetPlacementRuleBundleByGroup(context.Context, string) (*GroupBundle, error) @@ -796,8 +796,8 @@ func (c *client) AccelerateScheduleInBatch(ctx context.Context, keyRanges []*Key http.MethodPost, inputJSON, nil) } -// PostSchedulerDelay changes the delay of given scheduler. -func (c *client) PostSchedulerDelay(ctx context.Context, scheduler string, delaySec int64) error { +// SetSchedulerDelay sets the delay of given scheduler. +func (c *client) SetSchedulerDelay(ctx context.Context, scheduler string, delaySec int64) error { m := map[string]int64{ "delay": delaySec, } @@ -806,7 +806,7 @@ func (c *client) PostSchedulerDelay(ctx context.Context, scheduler string, delay return errors.Trace(err) } return c.requestWithRetry(ctx, - "PostSchedulerDelay", SchedulerByName(scheduler), + "SetSchedulerDelay", SchedulerByName(scheduler), http.MethodPost, inputJSON, nil) } diff --git a/tests/integrations/client/http_client_test.go b/tests/integrations/client/http_client_test.go index 0563d71650f..7c8f66f4826 100644 --- a/tests/integrations/client/http_client_test.go +++ b/tests/integrations/client/http_client_test.go @@ -403,9 +403,9 @@ func (suite *httpClientTestSuite) TestSchedulers() { schedulers, err = suite.client.GetSchedulers(suite.ctx) re.NoError(err) re.Len(schedulers, 1) - err = suite.client.PostSchedulerDelay(suite.ctx, "evict-leader-scheduler", 100) + err = suite.client.SetSchedulerDelay(suite.ctx, "evict-leader-scheduler", 100) re.NoError(err) - err = suite.client.PostSchedulerDelay(suite.ctx, "not-exist", 100) + err = suite.client.SetSchedulerDelay(suite.ctx, "not-exist", 100) re.ErrorContains(err, "500 Internal Server Error") // TODO: should return friendly error message }