Skip to content

Commit

Permalink
Improve tso proxy reliability (#6585)
Browse files Browse the repository at this point in the history
ref #5895

Improve tso proxy reliability.

1. Add protection mechanisms to TSO Proxy.
    a. Throttle the concurrency of TSO Proxy streamings. Default 5000.
    b. If TSO Proxy didn't receive the TSO request from the client for 1 hour, close the stream.
2. Optimize forceLoad lock with RW lock.
3. Enable stress test.
4. Add deadline for API leader forwarding request to TSO service.
5. Make tso response channel more safely.
6. Move tso proxy stress test away from the test suite as it has impact on other test cases.
7. Fix grpc client connection pool (server side) resource leak problem.
8. Make MaxConcurrentTSOProxyStreamings (5000 as default) and TSOProxyClientRecvTimeout (1 hour as default) configurable.
9. Add metrics tsoProxyHandleDuration, tsoProxyBatchSize and tsoProxyForwardTimeoutCounter.

Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing authored Jun 12, 2023
1 parent 181e613 commit 170d287
Show file tree
Hide file tree
Showing 11 changed files with 362 additions and 145 deletions.
13 changes: 12 additions & 1 deletion pkg/utils/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ type LoopWatcher struct {
postEventFn func() error

// forceLoadMu is used to ensure two force loads have minimal interval.
forceLoadMu sync.Mutex
forceLoadMu sync.RWMutex
// lastTimeForceLoad is used to record the last time force loading data from etcd.
lastTimeForceLoad time.Time

Expand Down Expand Up @@ -608,6 +608,17 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error)

// ForceLoad forces to load the key.
func (lw *LoopWatcher) ForceLoad() {
// When NotLeader error happens, a large volume of force load requests will be received here,
// so the minimal interval between two force loads (from etcd) is used to avoid the congestion.
// Two-phase locking is also used to let most of the requests return directly without acquiring
// the write lock and causing the system to choke.
lw.forceLoadMu.RLock()
if time.Since(lw.lastTimeForceLoad) < defaultForceLoadMinimalInterval {
lw.forceLoadMu.RUnlock()
return
}
lw.forceLoadMu.RUnlock()

lw.forceLoadMu.Lock()
if time.Since(lw.lastTimeForceLoad) < defaultForceLoadMinimalInterval {
lw.forceLoadMu.Unlock()
Expand Down
28 changes: 26 additions & 2 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ type Config struct {
LogFileDeprecated string `toml:"log-file" json:"log-file,omitempty"`
LogLevelDeprecated string `toml:"log-level" json:"log-level,omitempty"`

// MaxConcurrentTSOProxyStreamings is the maximum number of concurrent TSO proxy streaming process routines allowed.
// Exceeding this limit will result in an error being returned to the client when a new client starts a TSO streaming.
// Set this to 0 will disable TSO Proxy.
// Set this to the negative value to disable the limit.
MaxConcurrentTSOProxyStreamings int `toml:"max-concurrent-tso-proxy-streamings" json:"max-concurrent-tso-proxy-streamings"`
// TSOProxyClientRecvTimeout is the timeout for the TSO proxy to receive a tso request from a client via grpc TSO stream.
// After the timeout, the TSO proxy will close the grpc TSO stream.
TSOProxyClientRecvTimeout typeutil.Duration `toml:"tso-proxy-client-recv-timeout" json:"tso-proxy-client-recv-timeout"`

// TSOSaveInterval is the interval to save timestamp.
TSOSaveInterval typeutil.Duration `toml:"tso-save-interval" json:"tso-save-interval"`

Expand Down Expand Up @@ -219,6 +228,9 @@ const (

defaultDRWaitStoreTimeout = time.Minute

defaultMaxConcurrentTSOProxyStreamings = 5000
defaultTSOProxyClientRecvTimeout = 1 * time.Hour

defaultTSOSaveInterval = time.Duration(defaultLeaderLease) * time.Second
// defaultTSOUpdatePhysicalInterval is the default value of the config `TSOUpdatePhysicalInterval`.
defaultTSOUpdatePhysicalInterval = 50 * time.Millisecond
Expand Down Expand Up @@ -442,10 +454,11 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
}
}

configutil.AdjustInt64(&c.LeaderLease, defaultLeaderLease)
configutil.AdjustInt(&c.MaxConcurrentTSOProxyStreamings, defaultMaxConcurrentTSOProxyStreamings)
configutil.AdjustDuration(&c.TSOProxyClientRecvTimeout, defaultTSOProxyClientRecvTimeout)

configutil.AdjustInt64(&c.LeaderLease, defaultLeaderLease)
configutil.AdjustDuration(&c.TSOSaveInterval, defaultTSOSaveInterval)

configutil.AdjustDuration(&c.TSOUpdatePhysicalInterval, defaultTSOUpdatePhysicalInterval)

if c.TSOUpdatePhysicalInterval.Duration > maxTSOUpdatePhysicalInterval {
Expand Down Expand Up @@ -1252,6 +1265,17 @@ func (c *Config) IsLocalTSOEnabled() bool {
return c.EnableLocalTSO
}

// GetMaxConcurrentTSOProxyStreamings returns the max concurrent TSO proxy streamings.
// If the value is negative, there is no limit.
func (c *Config) GetMaxConcurrentTSOProxyStreamings() int {
return c.MaxConcurrentTSOProxyStreamings
}

// GetTSOProxyClientRecvTimeout returns the TSO proxy client receive timeout.
func (c *Config) GetTSOProxyClientRecvTimeout() time.Duration {
return c.TSOProxyClientRecvTimeout.Duration
}

// GetTSOUpdatePhysicalInterval returns TSO update physical interval.
func (c *Config) GetTSOUpdatePhysicalInterval() time.Duration {
return c.TSOUpdatePhysicalInterval.Duration
Expand Down
186 changes: 145 additions & 41 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,26 @@ const (
maxRetryTimesRequestTSOServer = 3
retryIntervalRequestTSOServer = 500 * time.Millisecond
getMinTSFromTSOServerTimeout = 1 * time.Second
defaultGRPCDialTimeout = 3 * time.Second
)

// gRPC errors
var (
// ErrNotLeader is returned when current server is not the leader and not possible to process request.
// TODO: work as proxy.
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
ErrNotFoundTSOAddr = status.Errorf(codes.NotFound, "not found tso address")
ErrForwardTSOTimeout = status.Errorf(codes.DeadlineExceeded, "forward tso request timeout")
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
ErrNotFoundTSOAddr = status.Errorf(codes.NotFound, "not found tso address")
ErrForwardTSOTimeout = status.Errorf(codes.DeadlineExceeded, "forward tso request timeout")
ErrMaxCountTSOProxyRoutinesExceeded = status.Errorf(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded")
ErrTSOProxyClientRecvTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy client recv timeout. stream closed by server")
)

// GrpcServer wraps Server to provide grpc service.
type GrpcServer struct {
*Server
concurrentTSOProxyStreamings atomic.Int32
}

type request interface {
Expand Down Expand Up @@ -406,6 +410,7 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
lastForwardedHost string
)
defer func() {
s.concurrentTSOProxyStreamings.Add(-1)
if forwardStream != nil {
forwardStream.CloseSend()
}
Expand All @@ -414,6 +419,12 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
cancel()
}
}()
maxConcurrentTSOProxyStreamings := int32(s.GetMaxConcurrentTSOProxyStreamings())
if maxConcurrentTSOProxyStreamings >= 0 {
if newCount := s.concurrentTSOProxyStreamings.Add(1); newCount > maxConcurrentTSOProxyStreamings {
return errors.WithStack(ErrMaxCountTSOProxyRoutinesExceeded)
}
}

for {
select {
Expand All @@ -424,7 +435,7 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
default:
}

request, err := server.Recv()
request, err := server.Recv(s.GetTSOProxyClientRecvTimeout())
if err == io.EOF {
return nil
}
Expand Down Expand Up @@ -459,25 +470,8 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
lastForwardedHost = forwardedHost
}

tsoReq := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: request.GetHeader().GetClusterId(),
SenderId: request.GetHeader().GetSenderId(),
KeyspaceId: utils.DefaultKeyspaceID,
KeyspaceGroupId: utils.DefaultKeyspaceGroupID,
},
Count: request.GetCount(),
DcLocation: request.GetDcLocation(),
}
if err := forwardStream.Send(tsoReq); err != nil {
return errors.WithStack(err)
}

tsopbResp, err := forwardStream.Recv()
tsopbResp, err := s.forwardTSORequestWithDeadLine(stream.Context(), request, forwardStream)
if err != nil {
if strings.Contains(err.Error(), errs.NotLeaderErr) {
s.tsoPrimaryWatcher.ForceLoad()
}
return errors.WithStack(err)
}

Expand Down Expand Up @@ -513,13 +507,101 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
}
}

func (s *GrpcServer) forwardTSORequestWithDeadLine(
ctx context.Context, request *pdpb.TsoRequest, forwardStream tsopb.TSO_TsoClient,
) (*tsopb.TsoResponse, error) {
defer logutil.LogPanic()
// Create a context with deadline for forwarding TSO request to TSO service.
ctxTimeout, cancel := context.WithTimeout(ctx, tsoutil.DefaultTSOProxyTimeout)
defer cancel()

tsoProxyBatchSize.Observe(float64(request.GetCount()))

// used to receive the result from doSomething function
tsoRespCh := make(chan *tsopbTSOResponse, 1)
start := time.Now()
go s.forwardTSORequestAsync(ctxTimeout, request, forwardStream, tsoRespCh)
select {
case <-ctxTimeout.Done():
tsoProxyForwardTimeoutCounter.Inc()
return nil, ErrForwardTSOTimeout
case tsoResp := <-tsoRespCh:
if tsoResp.err == nil {
tsoProxyHandleDuration.Observe(time.Since(start).Seconds())
}
return tsoResp.response, tsoResp.err
}
}

func (s *GrpcServer) forwardTSORequestAsync(
ctxTimeout context.Context,
request *pdpb.TsoRequest,
forwardStream tsopb.TSO_TsoClient,
tsoRespCh chan<- *tsopbTSOResponse,
) {
tsopbReq := &tsopb.TsoRequest{
Header: &tsopb.RequestHeader{
ClusterId: request.GetHeader().GetClusterId(),
SenderId: request.GetHeader().GetSenderId(),
KeyspaceId: utils.DefaultKeyspaceID,
KeyspaceGroupId: utils.DefaultKeyspaceGroupID,
},
Count: request.GetCount(),
DcLocation: request.GetDcLocation(),
}

if err := forwardStream.Send(tsopbReq); err != nil {
select {
case <-ctxTimeout.Done():
return
case tsoRespCh <- &tsopbTSOResponse{err: err}:
}
return
}

select {
case <-ctxTimeout.Done():
return
default:
}

response, err := forwardStream.Recv()
if err != nil {
if strings.Contains(err.Error(), errs.NotLeaderErr) {
s.tsoPrimaryWatcher.ForceLoad()
}
select {
case <-ctxTimeout.Done():
return
case tsoRespCh <- &tsopbTSOResponse{err: err}:
}
return
}

select {
case <-ctxTimeout.Done():
return
case tsoRespCh <- &tsopbTSOResponse{response: response}:
}
}

type tsopbTSOResponse struct {
response *tsopb.TsoResponse
err error
}

// tsoServer wraps PD_TsoServer to ensure when any error
// occurs on Send() or Recv(), both endpoints will be closed.
type tsoServer struct {
stream pdpb.PD_TsoServer
closed int32
}

type pdpbTSORequest struct {
request *pdpb.TsoRequest
err error
}

func (s *tsoServer) Send(m *pdpb.TsoResponse) error {
if atomic.LoadInt32(&s.closed) == 1 {
return io.EOF
Expand All @@ -541,16 +623,27 @@ func (s *tsoServer) Send(m *pdpb.TsoResponse) error {
}
}

func (s *tsoServer) Recv() (*pdpb.TsoRequest, error) {
func (s *tsoServer) Recv(timeout time.Duration) (*pdpb.TsoRequest, error) {
if atomic.LoadInt32(&s.closed) == 1 {
return nil, io.EOF
}
req, err := s.stream.Recv()
if err != nil {
requestCh := make(chan *pdpbTSORequest, 1)
go func() {
defer logutil.LogPanic()
request, err := s.stream.Recv()
requestCh <- &pdpbTSORequest{request: request, err: err}
}()
select {
case req := <-requestCh:
if req.err != nil {
atomic.StoreInt32(&s.closed, 1)
return nil, errors.WithStack(req.err)
}
return req.request, nil
case <-time.After(timeout):
atomic.StoreInt32(&s.closed, 1)
return nil, errors.WithStack(err)
return nil, ErrTSOProxyClientRecvTimeout
}
return req, nil
}

func (s *GrpcServer) getForwardedHost(ctx, streamCtx context.Context) (forwardedHost string, err error) {
Expand Down Expand Up @@ -1974,19 +2067,30 @@ func (s *GrpcServer) validateInternalRequest(header *pdpb.RequestHeader, onlyAll

func (s *GrpcServer) getDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) {
client, ok := s.clientConns.Load(forwardedHost)
if !ok {
tlsConfig, err := s.GetTLSConfig().ToTLSConfig()
if err != nil {
return nil, err
}
cc, err := grpcutil.GetClientConn(ctx, forwardedHost, tlsConfig)
if err != nil {
return nil, err
}
client = cc
s.clientConns.Store(forwardedHost, cc)
if ok {
// Mostly, the connection is already established, and return it directly.
return client.(*grpc.ClientConn), nil
}

tlsConfig, err := s.GetTLSConfig().ToTLSConfig()
if err != nil {
return nil, err
}
ctxTimeout, cancel := context.WithTimeout(ctx, defaultGRPCDialTimeout)
defer cancel()
newConn, err := grpcutil.GetClientConn(ctxTimeout, forwardedHost, tlsConfig)
if err != nil {
return nil, err
}
conn, loaded := s.clientConns.LoadOrStore(forwardedHost, newConn)
if !loaded {
// Successfully stored the connection we created.
return newConn, nil
}
return client.(*grpc.ClientConn), nil
// Loaded a connection created/stored by another goroutine, so close the one we created
// and return the one we loaded.
newConn.Close()
return conn.(*grpc.ClientConn), nil
}

func (s *GrpcServer) isLocalRequest(forwardedHost string) bool {
Expand Down
9 changes: 9 additions & 0 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ var (
Buckets: prometheus.ExponentialBuckets(1, 2, 13),
})

tsoProxyForwardTimeoutCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "server",
Name: "tso_proxy_forward_timeout_total",
Help: "Counter of timeouts when tso proxy forwarding tso requests to tso service.",
})

tsoHandleDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "pd",
Expand Down Expand Up @@ -161,6 +169,7 @@ func init() {
prometheus.MustRegister(etcdStateGauge)
prometheus.MustRegister(tsoProxyHandleDuration)
prometheus.MustRegister(tsoProxyBatchSize)
prometheus.MustRegister(tsoProxyForwardTimeoutCounter)
prometheus.MustRegister(tsoHandleDuration)
prometheus.MustRegister(regionHeartbeatHandleDuration)
prometheus.MustRegister(storeHeartbeatHandleDuration)
Expand Down
11 changes: 11 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1903,6 +1903,17 @@ func (s *Server) IsLocalTSOEnabled() bool {
return s.cfg.IsLocalTSOEnabled()
}

// GetMaxConcurrentTSOProxyStreamings returns the max concurrent TSO proxy streamings.
// If the value is negative, there is no limit.
func (s *Server) GetMaxConcurrentTSOProxyStreamings() int {
return s.cfg.GetMaxConcurrentTSOProxyStreamings()
}

// GetTSOProxyClientRecvTimeout returns the TSO proxy client receive timeout.
func (s *Server) GetTSOProxyClientRecvTimeout() time.Duration {
return s.cfg.GetTSOProxyClientRecvTimeout()
}

// GetLeaderLease returns the leader lease.
func (s *Server) GetLeaderLease() int64 {
return s.cfg.GetLeaderLease()
Expand Down
Loading

0 comments on commit 170d287

Please sign in to comment.