Skip to content

Commit

Permalink
Merge branch 'master' into ru_dont_add_token
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Jan 3, 2024
2 parents da29fb5 + baf4345 commit 4c4ad7c
Show file tree
Hide file tree
Showing 100 changed files with 2,452 additions and 3,128 deletions.
24 changes: 10 additions & 14 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -108,23 +108,23 @@ pd-server-basic:
# Tools

pd-ctl:
GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_TOOL_CGO_ENABLED) go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-ctl tools/pd-ctl/main.go
cd tools && GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_TOOL_CGO_ENABLED) go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-ctl pd-ctl/main.go
pd-tso-bench:
cd tools/pd-tso-bench && CGO_ENABLED=0 go build -o $(BUILD_BIN_PATH)/pd-tso-bench main.go
cd tools && CGO_ENABLED=0 go build -o $(BUILD_BIN_PATH)/pd-tso-bench pd-tso-bench/main.go
pd-api-bench:
cd tools/pd-api-bench && CGO_ENABLED=0 go build -o $(BUILD_BIN_PATH)/pd-api-bench main.go
cd tools && CGO_ENABLED=0 go build -o $(BUILD_BIN_PATH)/pd-api-bench pd-api-bench/main.go
pd-recover:
GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_TOOL_CGO_ENABLED) go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-recover tools/pd-recover/main.go
cd tools && GOEXPERIMENT=$(BUILD_GOEXPERIMENT) CGO_ENABLED=$(BUILD_TOOL_CGO_ENABLED) go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-recover pd-recover/main.go
pd-analysis:
CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-analysis tools/pd-analysis/main.go
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-analysis pd-analysis/main.go
pd-heartbeat-bench:
CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-heartbeat-bench tools/pd-heartbeat-bench/main.go
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-heartbeat-bench pd-heartbeat-bench/main.go
simulator:
CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-simulator tools/pd-simulator/main.go
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/pd-simulator pd-simulator/main.go
regions-dump:
CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/regions-dump tools/regions-dump/main.go
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/regions-dump regions-dump/main.go
stores-dump:
CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/stores-dump tools/stores-dump/main.go
cd tools && CGO_ENABLED=0 go build -gcflags '$(GCFLAGS)' -ldflags '$(LDFLAGS)' -o $(BUILD_BIN_PATH)/stores-dump stores-dump/main.go

.PHONY: pd-ctl pd-tso-bench pd-recover pd-analysis pd-heartbeat-bench simulator regions-dump stores-dump pd-api-bench

Expand Down Expand Up @@ -240,11 +240,7 @@ basic-test: install-tools

ci-test-job: install-tools dashboard-ui
@$(FAILPOINT_ENABLE)
if [[ $(JOB_INDEX) -le 10 ]]; then \
CGO_ENABLED=1 go test -timeout=15m -tags deadlock -race -covermode=atomic -coverprofile=covprofile -coverpkg=./... $(shell ./scripts/ci-subtask.sh $(JOB_COUNT) $(JOB_INDEX)); \
else \
for mod in $(shell ./scripts/ci-subtask.sh $(JOB_COUNT) $(JOB_INDEX)); do cd $$mod && $(MAKE) ci-test-job && cd $(ROOT_PATH) > /dev/null && cat $$mod/covprofile >> covprofile; done; \
fi
./scripts/ci-subtask.sh $(JOB_COUNT) $(JOB_INDEX)
@$(FAILPOINT_DISABLE)

TSO_INTEGRATION_TEST_PKGS := $(PD_PKG)/tests/server/tso
Expand Down
19 changes: 0 additions & 19 deletions build.ps1

This file was deleted.

87 changes: 66 additions & 21 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (k *serviceModeKeeper) close() {
type client struct {
keyspaceID uint32
svrUrls []string
pdSvcDiscovery ServiceDiscovery
pdSvcDiscovery *pdServiceDiscovery
tokenDispatcher *tokenDispatcher

// For service mode switching.
Expand Down Expand Up @@ -503,7 +503,7 @@ func newClientWithKeyspaceName(
return err
}
// c.keyspaceID is the source of truth for keyspace id.
c.pdSvcDiscovery.(*pdServiceDiscovery).SetKeyspaceID(c.keyspaceID)
c.pdSvcDiscovery.SetKeyspaceID(c.keyspaceID)
return nil
}

Expand Down Expand Up @@ -733,6 +733,23 @@ func (c *client) getClientAndContext(ctx context.Context) (pdpb.PDClient, contex
return pdpb.NewPDClient(serviceClient.GetClientConn()), serviceClient.BuildGRPCTargetContext(ctx, true)
}

// getClientAndContext returns the leader pd client and the original context. If leader is unhealthy, it returns
// follower pd client and the context which holds forward information.
func (c *client) getRegionAPIClientAndContext(ctx context.Context, allowFollower bool) (ServiceClient, context.Context) {
var serviceClient ServiceClient
if allowFollower {
serviceClient = c.pdSvcDiscovery.getServiceClientByKind(regionAPIKind)
if serviceClient != nil {
return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower)
}
}
serviceClient = c.pdSvcDiscovery.GetServiceClient()
if serviceClient == nil {
return nil, ctx
}
return serviceClient, serviceClient.BuildGRPCTargetContext(ctx, !allowFollower)
}

func (c *client) GetTSAsync(ctx context.Context) TSFuture {
return c.GetLocalTSAsync(ctx, globalDCLocation)
}
Expand Down Expand Up @@ -885,6 +902,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt
start := time.Now()
defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()

options := &GetRegionOp{}
for _, opt := range opts {
Expand All @@ -895,13 +913,18 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt
RegionKey: key,
NeedBuckets: options.needBuckets,
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetRegion(ctx, req)
cancel()
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err = protoClient.GetRegion(cctx, req)
}

if err = c.respForErr(cmdFailDurationGetRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
Expand All @@ -917,6 +940,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
start := time.Now()
defer func() { cmdDurationGetPrevRegion.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()

options := &GetRegionOp{}
for _, opt := range opts {
Expand All @@ -927,13 +951,18 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio
RegionKey: key,
NeedBuckets: options.needBuckets,
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetPrevRegion(ctx, req)
cancel()
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err = protoClient.GetPrevRegion(cctx, req)
}

if err = c.respForErr(cmdFailDurationGetPrevRegion, start, err, resp.GetHeader()); err != nil {
return nil, err
Expand All @@ -949,6 +978,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
start := time.Now()
defer func() { cmdDurationGetRegionByID.Observe(time.Since(start).Seconds()) }()
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
defer cancel()

options := &GetRegionOp{}
for _, opt := range opts {
Expand All @@ -959,13 +989,18 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get
RegionId: regionID,
NeedBuckets: options.needBuckets,
}
protoClient, ctx := c.getClientAndContext(ctx)
if protoClient == nil {
cancel()
serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetRegionByID(ctx, req)
cancel()
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req)
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err = protoClient.GetRegionByID(cctx, req)
}

if err = c.respForErr(cmdFailedDurationGetRegionByID, start, err, resp.GetHeader()); err != nil {
return nil, err
Expand All @@ -987,18 +1022,28 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
scanCtx, cancel = context.WithTimeout(ctx, c.option.timeout)
defer cancel()
}
options := &GetRegionOp{}
for _, opt := range opts {
opt(options)
}
req := &pdpb.ScanRegionsRequest{
Header: c.requestHeader(),
StartKey: key,
EndKey: endKey,
Limit: int32(limit),
}
protoClient, scanCtx := c.getClientAndContext(scanCtx)
if protoClient == nil {
cancel()
serviceClient, cctx := c.getRegionAPIClientAndContext(scanCtx, options.allowFollowerHandle && c.option.getEnableFollowerHandle())
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.ScanRegions(scanCtx, req)
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).ScanRegions(cctx, req)
if !serviceClient.IsConnectedToLeader() && err != nil || resp.Header.GetError() != nil {
protoClient, cctx := c.getClientAndContext(scanCtx)
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err = protoClient.ScanRegions(cctx, req)
}

if err = c.respForErr(cmdFailedDurationScanRegions, start, err, resp.GetHeader()); err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,17 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/client_model v0.2.0
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
google.golang.org/grpc v1.54.0
google.golang.org/grpc v1.59.0
google.golang.org/grpc/examples v0.0.0-20231221225426-4f03f3ff32c9
)

require (
Expand All @@ -30,15 +32,13 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/grpc/examples v0.0.0-20230419000256-16651f60ddc5
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
12 changes: 6 additions & 6 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30 h1:EvqKcDT7ceGLW0mXqM8Cp5Z8DfgQRnwj2YTnlCLj2QI=
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2 h1:364A6VCS+l0oHBKZKotX9LzmfEtIO/NTccTIQcPp3Ug=
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -229,10 +229,10 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 h1:Jyp0Hsi0bmHXG6k9eATXoYtjd6e2UzZ1SCn/wIupY14=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:oQ5rr10WTTMvP4A36n8JpR1OrO1BEiV4f78CneXZxkA=
google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g=
google.golang.org/grpc/examples v0.0.0-20230419000256-16651f60ddc5 h1:FjbWL/mGfyRQNxjagfT1chiHL1569WEA/OGH0ZIzGcI=
google.golang.org/grpc/examples v0.0.0-20230419000256-16651f60ddc5/go.mod h1:5av8LiY5jU2KRcrX+SHtvLHnaOpPJ7gzWStBurgHlqY=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/grpc/examples v0.0.0-20231221225426-4f03f3ff32c9 h1:ATnmU8nL2NfIyTSiBvJVDIDIr3qBmeW+c7z7XU21eWs=
google.golang.org/grpc/examples v0.0.0-20231221225426-4f03f3ff32c9/go.mod h1:j5uROIAAgi3YmtiETMt1LW0d/lHqQ7wwrIY4uGRXLQ4=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
2 changes: 2 additions & 0 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ const (
PProfGoroutine = "/pd/api/v1/debug/pprof/goroutine"
// Others
MinResolvedTSPrefix = "/pd/api/v1/min-resolved-ts"
Cluster = "/pd/api/v1/cluster"
ClusterStatus = "/pd/api/v1/cluster/status"
Status = "/pd/api/v1/status"
Version = "/pd/api/v1/version"
// Micro Service
Expand Down
29 changes: 28 additions & 1 deletion client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"io"
"net/http"
"os"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -161,7 +162,10 @@ func (ci *clientInner) requestWithRetry(
zap.String("source", ci.source), zap.Int("leader-idx", leaderAddrIdx), zap.String("addr", addr), zap.Error(err))
}
// Try to send the request to the other PD followers.
for idx := 0; idx < len(pdAddrs) && idx != leaderAddrIdx; idx++ {
for idx := 0; idx < len(pdAddrs); idx++ {
if idx == leaderAddrIdx {
continue
}
addr = ci.pdAddrs[idx]
err = ci.doRequest(ctx, addr, reqInfo, headerOpts...)
if err == nil {
Expand Down Expand Up @@ -356,6 +360,21 @@ func WithMetrics(
}
}

// WithLoggerRedirection configures the client with the given logger redirection.
func WithLoggerRedirection(logLevel, fileName string) ClientOption {
cfg := &log.Config{}
cfg.Level = logLevel
if fileName != "" {
f, _ := os.CreateTemp(".", fileName)
fname := f.Name()
f.Close()
cfg.File.Filename = fname
}
lg, p, _ := log.InitLogger(cfg)
log.ReplaceGlobals(lg, p)
return func(c *client) {}
}

// NewClient creates a PD HTTP client with the given PD addresses and TLS config.
func NewClient(
source string,
Expand Down Expand Up @@ -422,3 +441,11 @@ func (c *client) request(ctx context.Context, reqInfo *requestInfo, headerOpts .
func (c *client) UpdateMembersInfo() {
c.inner.updateMembersInfo(c.inner.ctx)
}

// setLeaderAddrIdx sets the index of the leader address in the inner client.
// only used for testing.
func (c *client) setLeaderAddrIdx(idx int) {
c.inner.Lock()
defer c.inner.Unlock()
c.inner.leaderAddrIdx = idx
}
Loading

0 comments on commit 4c4ad7c

Please sign in to comment.