Skip to content

Commit

Permalink
Implement the retry and metrics integration
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Nov 13, 2023
1 parent 0dbfe3a commit 3eff39e
Showing 1 changed file with 77 additions and 20 deletions.
97 changes: 77 additions & 20 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,17 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

const defaultTimeout = 30 * time.Second
const (
httpScheme = "http"
httpsScheme = "https"
networkErrorStatus = "network error"

defaultTimeout = 30 * time.Second
)

// Client is a PD (Placement Driver) HTTP client.
type Client interface {
Expand All @@ -51,6 +58,9 @@ type client struct {
pdAddrs []string
tlsConf *tls.Config
cli *http.Client

requestCounter *prometheus.CounterVec
executionDuration *prometheus.HistogramVec
}

// ClientOption configures the HTTP client.
Expand All @@ -71,6 +81,17 @@ func WithTLSConfig(tlsConf *tls.Config) ClientOption {
}
}

// WithMetrics configures the client with metrics.
func WithMetrics(
requestCounter *prometheus.CounterVec,
executionDuration *prometheus.HistogramVec,
) ClientOption {
return func(c *client) {
c.requestCounter = requestCounter
c.executionDuration = executionDuration

Check warning on line 91 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L88-L91

Added lines #L88 - L91 were not covered by tests
}
}

// NewClient creates a PD HTTP client with the given PD addresses and TLS config.
func NewClient(
pdAddrs []string,
Expand All @@ -83,13 +104,14 @@ func NewClient(
}
// Normalize the addresses with correct scheme prefix.
for i, addr := range pdAddrs {
if !strings.HasPrefix(addr, "http") {
if !strings.HasPrefix(addr, httpScheme) {
var scheme string
if c.tlsConf != nil {
addr = "https://" + addr
scheme = httpsScheme
} else {
addr = "http://" + addr
scheme = httpScheme

Check warning on line 112 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L108-L112

Added lines #L108 - L112 were not covered by tests
}
pdAddrs[i] = addr
pdAddrs[i] = fmt.Sprintf("%s://%s", scheme, addr)

Check warning on line 114 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L114

Added line #L114 was not covered by tests
}
}
c.pdAddrs = pdAddrs
Expand All @@ -114,17 +136,49 @@ func (c *client) Close() {
log.Info("[pd] http client closed")
}

func (c *client) pdAddr() string {
// TODO: support the customized PD address selection strategy.
return c.pdAddrs[0]
func (c *client) reqCounter(name, status string) {
if c.requestCounter == nil {
return
}
c.requestCounter.WithLabelValues(name, status).Inc()

Check warning on line 143 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L143

Added line #L143 was not covered by tests
}

func (c *client) request(
func (c *client) execDuration(name string, duration time.Duration) {
if c.executionDuration == nil {
return
}
c.executionDuration.WithLabelValues(name).Observe(duration.Seconds())

Check warning on line 150 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L150

Added line #L150 was not covered by tests
}

// At present, we will use the retry strategy of polling by default to keep
// it consistent with the current implementation of some clients (e.g. TiDB).
func (c *client) requestWithRetry(
ctx context.Context,
name, uri string,
res interface{},
) error {
reqURL := fmt.Sprintf("%s%s", c.pdAddr(), uri)
var (
err error
addr string
)
for idx := 0; idx < len(c.pdAddrs); idx++ {
addr = c.pdAddrs[idx]
err = c.request(ctx, name, addr, uri, res)
if err == nil {
break
}
log.Debug("[pd] request one addr failed",
zap.Int("idx", idx), zap.String("addr", addr), zap.Error(err))

Check warning on line 171 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L170-L171

Added lines #L170 - L171 were not covered by tests
}
return err
}

func (c *client) request(
ctx context.Context,
name, addr, uri string,
res interface{},
) error {
reqURL := fmt.Sprintf("%s%s", addr, uri)
logFields := []zap.Field{
zap.String("name", name),
zap.String("url", reqURL),
Expand All @@ -135,12 +189,15 @@ func (c *client) request(
log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...)
return errors.Trace(err)

Check warning on line 190 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L189-L190

Added lines #L189 - L190 were not covered by tests
}
// TODO: integrate the metrics.
start := time.Now()
resp, err := c.cli.Do(req)
if err != nil {
c.reqCounter(name, networkErrorStatus)
log.Error("[pd] do http request failed", append(logFields, zap.Error(err))...)
return errors.Trace(err)

Check warning on line 197 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L195-L197

Added lines #L195 - L197 were not covered by tests
}
c.execDuration(name, time.Since(start))
c.reqCounter(name, resp.Status)
defer func() {
err = resp.Body.Close()
if err != nil {
Expand Down Expand Up @@ -172,7 +229,7 @@ func (c *client) request(
// GetRegionByID gets the region info by ID.
func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error) {
var region RegionInfo
err := c.request(ctx, "GetRegionByID", RegionByID(regionID), &region)
err := c.requestWithRetry(ctx, "GetRegionByID", RegionByID(regionID), &region)
if err != nil {
return nil, err

Check warning on line 234 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L231-L234

Added lines #L231 - L234 were not covered by tests
}
Expand All @@ -182,7 +239,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*RegionInf
// GetRegionByKey gets the region info by key.
func (c *client) GetRegionByKey(ctx context.Context, key []byte) (*RegionInfo, error) {
var region RegionInfo
err := c.request(ctx, "GetRegionByKey", RegionByKey(key), &region)
err := c.requestWithRetry(ctx, "GetRegionByKey", RegionByKey(key), &region)
if err != nil {
return nil, err

Check warning on line 244 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L241-L244

Added lines #L241 - L244 were not covered by tests
}
Expand All @@ -192,7 +249,7 @@ func (c *client) GetRegionByKey(ctx context.Context, key []byte) (*RegionInfo, e
// GetRegions gets the regions info.
func (c *client) GetRegions(ctx context.Context) (*RegionsInfo, error) {
var regions RegionsInfo
err := c.request(ctx, "GetRegions", Regions, &regions)
err := c.requestWithRetry(ctx, "GetRegions", Regions, &regions)
if err != nil {
return nil, err

Check warning on line 254 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L251-L254

Added lines #L251 - L254 were not covered by tests
}
Expand All @@ -202,7 +259,7 @@ func (c *client) GetRegions(ctx context.Context) (*RegionsInfo, error) {
// GetRegionsByKey gets the regions info by key range. If the limit is -1, it will return all regions within the range.
func (c *client) GetRegionsByKey(ctx context.Context, startKey, endKey []byte, limit int) (*RegionsInfo, error) {
var regions RegionsInfo
err := c.request(ctx, "GetRegionsByKey", RegionsByKey(startKey, endKey, limit), &regions)
err := c.requestWithRetry(ctx, "GetRegionsByKey", RegionsByKey(startKey, endKey, limit), &regions)
if err != nil {
return nil, err

Check warning on line 264 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L261-L264

Added lines #L261 - L264 were not covered by tests
}
Expand All @@ -212,7 +269,7 @@ func (c *client) GetRegionsByKey(ctx context.Context, startKey, endKey []byte, l
// GetRegionsByStoreID gets the regions info by store ID.
func (c *client) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*RegionsInfo, error) {
var regions RegionsInfo
err := c.request(ctx, "GetRegionsByStoreID", RegionsByStoreID(storeID), &regions)
err := c.requestWithRetry(ctx, "GetRegionsByStoreID", RegionsByStoreID(storeID), &regions)
if err != nil {
return nil, err

Check warning on line 274 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L271-L274

Added lines #L271 - L274 were not covered by tests
}
Expand All @@ -222,7 +279,7 @@ func (c *client) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*Regi
// GetHotReadRegions gets the hot read region statistics info.
func (c *client) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, error) {
var hotReadRegions StoreHotPeersInfos
err := c.request(ctx, "GetHotReadRegions", HotRead, &hotReadRegions)
err := c.requestWithRetry(ctx, "GetHotReadRegions", HotRead, &hotReadRegions)
if err != nil {
return nil, err

Check warning on line 284 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L281-L284

Added lines #L281 - L284 were not covered by tests
}
Expand All @@ -232,7 +289,7 @@ func (c *client) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, er
// GetHotWriteRegions gets the hot write region statistics info.
func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, error) {
var hotWriteRegions StoreHotPeersInfos
err := c.request(ctx, "GetHotWriteRegions", HotWrite, &hotWriteRegions)
err := c.requestWithRetry(ctx, "GetHotWriteRegions", HotWrite, &hotWriteRegions)
if err != nil {
return nil, err

Check warning on line 294 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L291-L294

Added lines #L291 - L294 were not covered by tests
}
Expand All @@ -242,7 +299,7 @@ func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, e
// GetStores gets the stores info.
func (c *client) GetStores(ctx context.Context) (*StoresInfo, error) {
var stores StoresInfo
err := c.request(ctx, "GetStores", Stores, &stores)
err := c.requestWithRetry(ctx, "GetStores", Stores, &stores)
if err != nil {
return nil, err

Check warning on line 304 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L301-L304

Added lines #L301 - L304 were not covered by tests
}
Expand All @@ -269,7 +326,7 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin
IsRealTime bool `json:"is_real_time,omitempty"`
StoresMinResolvedTS map[uint64]uint64 `json:"stores_min_resolved_ts"`
}{}
err := c.request(ctx, "GetMinResolvedTSByStoresIDs", uri, &resp)
err := c.requestWithRetry(ctx, "GetMinResolvedTSByStoresIDs", uri, &resp)
if err != nil {
return 0, nil, err

Check warning on line 331 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L331

Added line #L331 was not covered by tests
}
Expand Down

0 comments on commit 3eff39e

Please sign in to comment.