Skip to content

Commit

Permalink
Merge branch 'master' into follower_forward
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Nov 29, 2023
2 parents d926df2 + 1934450 commit dd5627d
Show file tree
Hide file tree
Showing 42 changed files with 1,309 additions and 450 deletions.
22 changes: 17 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type Client interface {
// client should retry later.
GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error)
// GetRegionFromMember gets a region from certain members.
GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string) (*Region, error)
GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...GetRegionOption) (*Region, error)
// GetPrevRegion gets the previous region and its leader Peer of the region where the key is located.
GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error)
// GetRegionByID gets a region and its leader Peer from PD by id.
Expand All @@ -100,7 +100,7 @@ type Client interface {
// Limit limits the maximum number of regions returned.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*Region, error)
ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error)
// GetStore gets a store from PD by store id.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
Expand Down Expand Up @@ -200,7 +200,8 @@ func WithSkipStoreLimit() RegionsOption {

// GetRegionOp represents available options when getting regions.
type GetRegionOp struct {
needBuckets bool
needBuckets bool
allowFollowerHandle bool
}

// GetRegionOption configures GetRegionOp.
Expand All @@ -211,6 +212,11 @@ func WithBuckets() GetRegionOption {
return func(op *GetRegionOp) { op.needBuckets = true }
}

// WithAllowFollowerHandle means that client can send request to follower and let it handle this request.
func WithAllowFollowerHandle() GetRegionOption {
return func(op *GetRegionOp) { op.allowFollowerHandle = true }
}

// LeaderHealthCheckInterval might be changed in the unit to shorten the testing time.
var LeaderHealthCheckInterval = time.Second

Expand Down Expand Up @@ -701,6 +707,12 @@ func (c *client) UpdateOption(option DynamicOption, value interface{}) error {
return errors.New("[pd] invalid value type for EnableTSOFollowerProxy option, it should be bool")
}
c.option.setEnableTSOFollowerProxy(enable)
case EnableFollowerHandle:
enable, ok := value.(bool)
if !ok {
return errors.New("[pd] invalid value type for EnableFollowerHandle option, it should be bool")
}
c.option.setEnableFollowerHandle(enable)
default:
return errors.New("[pd] unsupported client option")
}
Expand Down Expand Up @@ -954,7 +966,7 @@ func isNetworkError(code codes.Code) bool {
return code == codes.Unavailable || code == codes.DeadlineExceeded
}

func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string) (*Region, error) {
func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -1058,7 +1070,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
return handleRegionResponse(resp), nil
}

func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*Region, error) {
func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down
52 changes: 29 additions & 23 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,20 @@ import (
// The following constants are the paths of PD HTTP APIs.
const (
// Metadata
HotRead = "/pd/api/v1/hotspot/regions/read"
HotWrite = "/pd/api/v1/hotspot/regions/write"
HotHistory = "/pd/api/v1/hotspot/regions/history"
RegionByIDPrefix = "/pd/api/v1/region/id"
regionByKey = "/pd/api/v1/region/key"
Regions = "/pd/api/v1/regions"
regionsByKey = "/pd/api/v1/regions/key"
RegionsByStoreIDPrefix = "/pd/api/v1/regions/store"
EmptyRegions = "/pd/api/v1/regions/check/empty-region"
AccelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
store = "/pd/api/v1/store"
Stores = "/pd/api/v1/stores"
StatsRegion = "/pd/api/v1/stats/region"
HotRead = "/pd/api/v1/hotspot/regions/read"
HotWrite = "/pd/api/v1/hotspot/regions/write"
HotHistory = "/pd/api/v1/hotspot/regions/history"
RegionByIDPrefix = "/pd/api/v1/region/id"
regionByKey = "/pd/api/v1/region/key"
Regions = "/pd/api/v1/regions"
regionsByKey = "/pd/api/v1/regions/key"
RegionsByStoreIDPrefix = "/pd/api/v1/regions/store"
EmptyRegions = "/pd/api/v1/regions/check/empty-region"
AccelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
AccelerateScheduleInBatch = "/pd/api/v1/regions/accelerate-schedule/batch"
store = "/pd/api/v1/store"
Stores = "/pd/api/v1/stores"
StatsRegion = "/pd/api/v1/stats/region"
// Config
Config = "/pd/api/v1/config"
ClusterVersion = "/pd/api/v1/config/cluster-version"
Expand All @@ -44,8 +45,11 @@ const (
// Rule
PlacementRule = "/pd/api/v1/config/rule"
PlacementRules = "/pd/api/v1/config/rules"
PlacementRulesInBatch = "/pd/api/v1/config/rules/batch"
placementRulesByGroup = "/pd/api/v1/config/rules/group"
PlacementRuleBundle = "/pd/api/v1/config/placement-rule"
placementRuleGroup = "/pd/api/v1/config/rule_group"
placementRuleGroups = "/pd/api/v1/config/rule_groups"
RegionLabelRule = "/pd/api/v1/config/region-label/rule"
RegionLabelRules = "/pd/api/v1/config/region-label/rules"
RegionLabelRulesByIDs = "/pd/api/v1/config/region-label/rules/ids"
Expand Down Expand Up @@ -79,13 +83,11 @@ func RegionByKey(key []byte) string {
return fmt.Sprintf("%s/%s", regionByKey, url.QueryEscape(string(key)))
}

// RegionsByKey returns the path of PD HTTP API to scan regions with given start key, end key and limit parameters.
func RegionsByKey(startKey, endKey []byte, limit int) string {
// RegionsByKeyRange returns the path of PD HTTP API to scan regions with given start key, end key and limit parameters.
func RegionsByKeyRange(keyRange *KeyRange, limit int) string {
startKeyStr, endKeyStr := keyRange.EscapeAsUTF8Str()
return fmt.Sprintf("%s?start_key=%s&end_key=%s&limit=%d",
regionsByKey,
url.QueryEscape(string(startKey)),
url.QueryEscape(string(endKey)),
limit)
regionsByKey, startKeyStr, endKeyStr, limit)
}

// RegionsByStoreID returns the path of PD HTTP API to get regions by store ID.
Expand All @@ -94,11 +96,10 @@ func RegionsByStoreID(storeID uint64) string {
}

// RegionStatsByKeyRange returns the path of PD HTTP API to get region stats by start key and end key.
func RegionStatsByKeyRange(startKey, endKey []byte) string {
func RegionStatsByKeyRange(keyRange *KeyRange) string {
startKeyStr, endKeyStr := keyRange.EscapeAsUTF8Str()
return fmt.Sprintf("%s?start_key=%s&end_key=%s",
StatsRegion,
url.QueryEscape(string(startKey)),
url.QueryEscape(string(endKey)))
StatsRegion, startKeyStr, endKeyStr)
}

// StoreByID returns the store API with store ID parameter.
Expand Down Expand Up @@ -136,6 +137,11 @@ func PlacementRuleBundleWithPartialParameter(partial bool) string {
return fmt.Sprintf("%s?partial=%t", PlacementRuleBundle, partial)
}

// PlacementRuleGroupByID returns the path of PD HTTP API to get placement rule group by ID.
func PlacementRuleGroupByID(id string) string {
return fmt.Sprintf("%s/%s", placementRuleGroup, id)
}

// SchedulerByName returns the scheduler API with the given scheduler name.
func SchedulerByName(name string) string {
return fmt.Sprintf("%s/%s", Schedulers, name)
Expand Down
Loading

0 comments on commit dd5627d

Please sign in to comment.