Skip to content

Commit

Permalink
forward
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Oct 28, 2024
1 parent 91ccdec commit d4a8c5d
Showing 1 changed file with 34 additions and 0 deletions.
34 changes: 34 additions & 0 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,10 +528,29 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
return s.forwardTSO(stream)
}

tsDeadlineCh := make(chan *tsoutil.TSDeadline, 1)
go tsoutil.WatchTSDeadline(stream.Context(), tsDeadlineCh)

var (
doneCh chan struct{}
errCh chan error
// The following are tso forward stream related variables.
forwardStream tsopb.TSO_TsoClient
cancelForward context.CancelFunc
forwardCtx context.Context
tsoStreamErr error
lastForwardedHost string
)

defer func() {
if cancelForward != nil {
cancelForward()
}
if grpcutil.NeedRebuildConnection(tsoStreamErr) {
s.closeDelegateClient(lastForwardedHost)
}
}()

ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
for {
Expand Down Expand Up @@ -569,6 +588,21 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
continue
}

if s.IsServiceIndependent(constant.TSOServiceName) {
if request.GetCount() == 0 {
err = errs.ErrGenerateTimestamp.FastGenByArgs("tso count should be positive")
return status.Error(codes.Unknown, err.Error())
}
forwardCtx, cancelForward, forwardStream, lastForwardedHost, tsoStreamErr, err = s.handleTSOForwarding(forwardCtx, forwardStream, stream, nil, request, tsDeadlineCh, lastForwardedHost, cancelForward)
if tsoStreamErr != nil {
return tsoStreamErr
}
if err != nil {
return err
}
continue
}

start := time.Now()
// TSO uses leader lease to determine validity. No need to check leader here.
if s.IsClosed() {
Expand Down

0 comments on commit d4a8c5d

Please sign in to comment.