Skip to content

Commit

Permalink
Merge branch 'master' into hot-config2
Browse files Browse the repository at this point in the history
  • Loading branch information
HunDunDM authored Oct 21, 2022
2 parents 2465409 + 1a485f7 commit 8306837
Show file tree
Hide file tree
Showing 70 changed files with 1,290 additions and 545 deletions.
49 changes: 27 additions & 22 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ const (
updateMemberTimeout = time.Second // Use a shorter timeout to recover faster from network isolation.
tsLoopDCCheckInterval = time.Minute
defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst
retryInterval = time.Second
maxRetryTimes = 5
retryInterval = 500 * time.Millisecond
maxRetryTimes = 6
)

// LeaderHealthCheckInterval might be changed in the unit to shorten the testing time.
Expand Down Expand Up @@ -698,12 +698,11 @@ func (c *client) handleDispatcher(
dc string,
tbc *tsoBatchController) {
var (
retryTimeConsuming time.Duration
err error
streamAddr string
stream pdpb.PD_TsoClient
streamCtx context.Context
cancel context.CancelFunc
err error
streamAddr string
stream pdpb.PD_TsoClient
streamCtx context.Context
cancel context.CancelFunc
// addr -> connectionContext
connectionCtxs sync.Map
opts []opentracing.StartSpanOption
Expand Down Expand Up @@ -760,6 +759,7 @@ func (c *client) handleDispatcher(
}

// Loop through each batch of TSO requests and send them for processing.
streamLoopTimer := time.NewTimer(c.option.timeout)
tsoBatchLoop:
for {
select {
Expand All @@ -782,6 +782,7 @@ tsoBatchLoop:
if maxBatchWaitInterval >= 0 {
tbc.adjustBestBatchSize()
}
streamLoopTimer.Reset(c.option.timeout)
// Choose a stream to send the TSO gRPC request.
streamChoosingLoop:
for {
Expand All @@ -792,24 +793,22 @@ tsoBatchLoop:
// Check stream and retry if necessary.
if stream == nil {
log.Info("[pd] tso stream is not ready", zap.String("dc", dc))
c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs)
if retryTimeConsuming >= c.option.timeout {
err = errs.ErrClientCreateTSOStream.FastGenByArgs("retry timeout")
log.Error("[pd] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
c.ScheduleCheckLeader()
c.finishTSORequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
retryTimeConsuming = 0
continue tsoBatchLoop
if c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) {
continue streamChoosingLoop
}
select {
case <-dispatcherCtx.Done():
return
case <-time.After(time.Second):
retryTimeConsuming += time.Second
continue
case <-streamLoopTimer.C:
err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr)
log.Error("[pd] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
c.ScheduleCheckLeader()
c.finishTSORequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
continue tsoBatchLoop
case <-time.After(retryInterval):
continue streamChoosingLoop
}
}
retryTimeConsuming = 0
select {
case <-streamCtx.Done():
log.Info("[pd] tso stream is canceled", zap.String("dc", dc), zap.String("stream-addr", streamAddr))
Expand Down Expand Up @@ -903,15 +902,17 @@ type connectionContext struct {
cancel context.CancelFunc
}

func (c *client) updateConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) {
func (c *client) updateConnectionCtxs(updaterCtx context.Context, dc string, connectionCtxs *sync.Map) bool {
// Normal connection creating, it will be affected by the `enableForwarding`.
createTSOConnection := c.tryConnect
if c.allowTSOFollowerProxy(dc) {
createTSOConnection = c.tryConnectWithProxy
}
if err := createTSOConnection(updaterCtx, dc, connectionCtxs); err != nil {
log.Error("[pd] update connection contexts failed", zap.String("dc", dc), errs.ZapError(err))
return false
}
return true
}

// tryConnect will try to connect to the TSO allocator leader. If the connection becomes unreachable
Expand All @@ -927,6 +928,8 @@ func (c *client) tryConnect(
networkErrNum uint64
err error
stream pdpb.PD_TsoClient
url string
cc *grpc.ClientConn
)
updateAndClear := func(newAddr string, connectionCtx *connectionContext) {
if cc, loaded := connectionCtxs.LoadOrStore(newAddr, connectionCtx); loaded {
Expand All @@ -942,9 +945,11 @@ func (c *client) tryConnect(
return true
})
}
cc, url := c.getAllocatorClientConnByDCLocation(dc)
// retry several times before falling back to the follower when the network problem happens

for i := 0; i < maxRetryTimes; i++ {
c.ScheduleCheckLeader()
cc, url = c.getAllocatorClientConnByDCLocation(dc)
cctx, cancel := context.WithCancel(dispatcherCtx)
stream, err = c.createTsoStream(cctx, cancel, pdpb.NewPDClient(cc))
failpoint.Inject("unreachableNetwork", func() {
Expand Down
1 change: 1 addition & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
NotLeaderErr = "is not leader"
// MismatchLeaderErr indicates the the non-leader member received the requests which should be received by leader.
MismatchLeaderErr = "mismatch leader id"
RetryTimeoutErr = "retry timeout"
)

// client errors
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20220818063303-5c20f55db5ad
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.11.0
github.com/stretchr/testify v1.7.0
go.uber.org/goleak v1.1.11
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZ
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20220818063303-5c20f55db5ad h1:lGKxsEwdE0pVXzHYD1SQ1vfa3t/bFVU/latrQz8b/w0=
github.com/pingcap/kvproto v0.0.0-20220818063303-5c20f55db5ad/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM=
github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 h1:URLoJ61DmmY++Sa/yyPEQHG2s/ZBeV1FbIswHEMrdoY=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/kvproto v0.0.0-20220818063303-5c20f55db5ad
github.com/pingcap/log v0.0.0-20210906054005-afc726e70354
github.com/pingcap/kvproto v0.0.0-20221014081430-26e28e6a281a
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d
github.com/pingcap/tidb-dashboard v0.0.0-20220728104842-3743e533b594
github.com/prometheus/client_golang v1.1.0
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -417,13 +417,14 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20220818063303-5c20f55db5ad h1:lGKxsEwdE0pVXzHYD1SQ1vfa3t/bFVU/latrQz8b/w0=
github.com/pingcap/kvproto v0.0.0-20220818063303-5c20f55db5ad/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221014081430-26e28e6a281a h1:McYxPhA8SHqfUtLfQHHN0fQl4dy93IkhlX4Pp2MKIFA=
github.com/pingcap/kvproto v0.0.0-20221014081430-26e28e6a281a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v0.0.0-20210906054005-afc726e70354 h1:SvWCbCPh1YeHd9yQLksvJYAgft6wLTY1aNG81tpyscQ=
github.com/pingcap/log v0.0.0-20210906054005-afc726e70354/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 h1:URLoJ61DmmY++Sa/yyPEQHG2s/ZBeV1FbIswHEMrdoY=
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d h1:k3/APKZjXOyJrFy8VyYwRlZhMelpD3qBLJNsw3bPl/g=
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d/go.mod h1:7j18ezaWTao2LHOyMlsc2Dg1vW+mDY9dEbPzVyOlaeM=
github.com/pingcap/tidb-dashboard v0.0.0-20220728104842-3743e533b594 h1:kL1CW5qsn459kHZ2YoBYb+YOSWjSlshk55YP/XNQNWo=
Expand Down
Loading

0 comments on commit 8306837

Please sign in to comment.