Skip to content

Commit

Permalink
client: use ServiceDiscovery to update member in HTTP client (tikv#7668)
Browse files Browse the repository at this point in the history
ref tikv#7576

Signed-off-by: Cabinfever_B <[email protected]>
Signed-off-by: pingandb <[email protected]>
  • Loading branch information
CabinfeverB authored and pingandb committed Jan 18, 2024
1 parent d2c7fbb commit b32fd5a
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 282 deletions.
3 changes: 3 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ type Client interface {
// SetExternalTimestamp sets external timestamp
SetExternalTimestamp(ctx context.Context, timestamp uint64) error

// GetServiceDiscovery returns ServiceDiscovery
GetServiceDiscovery() ServiceDiscovery

// TSOClient is the TSO client.
TSOClient
// MetaStorageClient is the meta storage client.
Expand Down
1 change: 1 addition & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ var (
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientNoAvailableMember = errors.Normalize("no available member", errors.RFCCodeText("PD:client:ErrClientNoAvailableMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ require (
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/client_model v0.2.0
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
Expand All @@ -32,6 +31,7 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
188 changes: 47 additions & 141 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,16 @@ import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/retry"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -58,9 +57,7 @@ type clientInner struct {
ctx context.Context
cancel context.CancelFunc

sync.RWMutex
pdAddrs []string
leaderAddrIdx int
sd pd.ServiceDiscovery

// source is used to mark the source of the client creation,
// it will also be used in the caller ID of the inner client.
Expand All @@ -72,12 +69,11 @@ type clientInner struct {
executionDuration *prometheus.HistogramVec
}

func newClientInner(source string) *clientInner {
ctx, cancel := context.WithCancel(context.Background())
return &clientInner{ctx: ctx, cancel: cancel, leaderAddrIdx: -1, source: source}
func newClientInner(ctx context.Context, cancel context.CancelFunc, source string) *clientInner {
return &clientInner{ctx: ctx, cancel: cancel, source: source}
}

func (ci *clientInner) init() {
func (ci *clientInner) init(sd pd.ServiceDiscovery) {
// Init the HTTP client if it's not configured.
if ci.cli == nil {
ci.cli = &http.Client{Timeout: defaultTimeout}
Expand All @@ -87,8 +83,7 @@ func (ci *clientInner) init() {
ci.cli.Transport = transport
}
}
// Start the members info updater daemon.
go ci.membersInfoUpdater(ci.ctx)
ci.sd = sd
}

func (ci *clientInner) close() {
Expand All @@ -98,33 +93,6 @@ func (ci *clientInner) close() {
}
}

// getPDAddrs returns the current PD addresses and the index of the leader address.
func (ci *clientInner) getPDAddrs() ([]string, int) {
ci.RLock()
defer ci.RUnlock()
return ci.pdAddrs, ci.leaderAddrIdx
}

func (ci *clientInner) setPDAddrs(pdAddrs []string, leaderAddrIdx int) {
ci.Lock()
defer ci.Unlock()
// Normalize the addresses with correct scheme prefix.
var scheme string
if ci.tlsConf == nil {
scheme = httpScheme
} else {
scheme = httpsScheme
}
for i, addr := range pdAddrs {
if strings.HasPrefix(addr, httpScheme) {
continue
}
pdAddrs[i] = fmt.Sprintf("%s://%s", scheme, addr)
}
ci.pdAddrs = pdAddrs
ci.leaderAddrIdx = leaderAddrIdx
}

func (ci *clientInner) reqCounter(name, status string) {
if ci.requestCounter == nil {
return
Expand All @@ -151,32 +119,19 @@ func (ci *clientInner) requestWithRetry(
err error
)
execFunc := func() error {
var (
addr string
pdAddrs, leaderAddrIdx = ci.getPDAddrs()
)
// Try to send the request to the PD leader first.
if leaderAddrIdx != -1 {
addr = pdAddrs[leaderAddrIdx]
statusCode, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
return err
}
log.Debug("[pd] request leader addr failed",
zap.String("source", ci.source), zap.Int("leader-idx", leaderAddrIdx), zap.String("addr", addr), zap.Error(err))
// It will try to send the request to the PD leader first and then try to send the request to the other PD followers.
clients := ci.sd.GetAllServiceClients()
if len(clients) == 0 {
return errs.ErrClientNoAvailableMember
}
// Try to send the request to the other PD followers.
for idx := 0; idx < len(pdAddrs); idx++ {
if idx == leaderAddrIdx {
continue
}
addr = ci.pdAddrs[idx]
for _, cli := range clients {
addr := cli.GetHTTPAddress()
statusCode, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
break
return err
}
log.Debug("[pd] request follower addr failed",
zap.String("source", ci.source), zap.Int("idx", idx), zap.String("addr", addr), zap.Error(err))
log.Debug("[pd] request addr failed",
zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("addr", addr), zap.Error(err))
}
return err
}
Expand Down Expand Up @@ -278,73 +233,6 @@ func (ci *clientInner) doRequest(
return resp.StatusCode, nil
}

func (ci *clientInner) membersInfoUpdater(ctx context.Context) {
ci.updateMembersInfo(ctx)
log.Info("[pd] http client member info updater started", zap.String("source", ci.source))
ticker := time.NewTicker(defaultMembersInfoUpdateInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info("[pd] http client member info updater stopped", zap.String("source", ci.source))
return
case <-ticker.C:
ci.updateMembersInfo(ctx)
}
}
}

func (ci *clientInner) updateMembersInfo(ctx context.Context) {
var membersInfo MembersInfo
err := ci.requestWithRetry(ctx, newRequestInfo().
WithCallerID(fmt.Sprintf("%s-%s", ci.source, defaultInnerCallerID)).
WithName(getMembersName).
WithURI(membersPrefix).
WithMethod(http.MethodGet).
WithResp(&membersInfo))
if err != nil {
log.Error("[pd] http client get members info failed", zap.String("source", ci.source), zap.Error(err))
return
}
if len(membersInfo.Members) == 0 {
log.Error("[pd] http client get empty members info", zap.String("source", ci.source))
return
}
var (
newPDAddrs []string
newLeaderAddrIdx int = -1
)
for _, member := range membersInfo.Members {
if membersInfo.Leader != nil && member.GetMemberId() == membersInfo.Leader.GetMemberId() {
newLeaderAddrIdx = len(newPDAddrs)
}
newPDAddrs = append(newPDAddrs, member.GetClientUrls()...)
}
// Prevent setting empty addresses.
if len(newPDAddrs) == 0 {
log.Error("[pd] http client get empty member addresses", zap.String("source", ci.source))
return
}
oldPDAddrs, oldLeaderAddrIdx := ci.getPDAddrs()
ci.setPDAddrs(newPDAddrs, newLeaderAddrIdx)
// Log the member info change if it happens.
var oldPDLeaderAddr, newPDLeaderAddr string
if oldLeaderAddrIdx != -1 {
oldPDLeaderAddr = oldPDAddrs[oldLeaderAddrIdx]
}
if newLeaderAddrIdx != -1 {
newPDLeaderAddr = newPDAddrs[newLeaderAddrIdx]
}
oldMemberNum, newMemberNum := len(oldPDAddrs), len(newPDAddrs)
if oldPDLeaderAddr != newPDLeaderAddr || oldMemberNum != newMemberNum {
log.Info("[pd] http client members info changed", zap.String("source", ci.source),
zap.Int("old-member-num", oldMemberNum), zap.Int("new-member-num", newMemberNum),
zap.Strings("old-addrs", oldPDAddrs), zap.Strings("new-addrs", newPDAddrs),
zap.Int("old-leader-addr-idx", oldLeaderAddrIdx), zap.Int("new-leader-addr-idx", newLeaderAddrIdx),
zap.String("old-leader-addr", oldPDLeaderAddr), zap.String("new-leader-addr", newPDLeaderAddr))
}
}

type client struct {
inner *clientInner

Expand Down Expand Up @@ -397,19 +285,36 @@ func WithLoggerRedirection(logLevel, fileName string) ClientOption {
return func(c *client) {}
}

// NewClientWithServiceDiscovery creates a PD HTTP client with the given PD service discovery.
func NewClientWithServiceDiscovery(
source string,
sd pd.ServiceDiscovery,
opts ...ClientOption,
) Client {
ctx, cancel := context.WithCancel(context.Background())
c := &client{inner: newClientInner(ctx, cancel, source), callerID: defaultCallerID}
// Apply the options first.
for _, opt := range opts {
opt(c)
}
c.inner.init(sd)
return c
}

// NewClient creates a PD HTTP client with the given PD addresses and TLS config.
func NewClient(
source string,
pdAddrs []string,
opts ...ClientOption,
) Client {
c := &client{inner: newClientInner(source), callerID: defaultCallerID}
ctx, cancel := context.WithCancel(context.Background())
c := &client{inner: newClientInner(ctx, cancel, source), callerID: defaultCallerID}
// Apply the options first.
for _, opt := range opts {
opt(c)
}
c.inner.setPDAddrs(pdAddrs, -1)
c.inner.init()
sd := pd.NewDefaultPDServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf)
c.inner.init(sd)
return c
}

Expand Down Expand Up @@ -466,16 +371,17 @@ func (c *client) request(ctx context.Context, reqInfo *requestInfo, headerOpts .
headerOpts...)
}

// UpdateMembersInfo updates the members info of the PD cluster in the inner client.
// Exported for testing.
func (c *client) UpdateMembersInfo() {
c.inner.updateMembersInfo(c.inner.ctx)
// requestChecker is used to check the HTTP request sent by the client.
type requestChecker func(req *http.Request) error

// RoundTrip implements the `http.RoundTripper` interface.
func (rc requestChecker) RoundTrip(req *http.Request) (resp *http.Response, err error) {
return &http.Response{StatusCode: http.StatusOK}, rc(req)
}

// setLeaderAddrIdx sets the index of the leader address in the inner client.
// only used for testing.
func (c *client) setLeaderAddrIdx(idx int) {
c.inner.Lock()
defer c.inner.Unlock()
c.inner.leaderAddrIdx = idx
// NewHTTPClientWithRequestChecker returns a http client with checker.
func NewHTTPClientWithRequestChecker(checker requestChecker) *http.Client {
return &http.Client{
Transport: checker,
}
}
Loading

0 comments on commit b32fd5a

Please sign in to comment.