Skip to content

Commit

Permalink
Fault inject in TSO Proxy
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <[email protected]>
  • Loading branch information
binshi-bing committed Jun 13, 2023
1 parent 170d287 commit 0334fa5
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 21 deletions.
14 changes: 7 additions & 7 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ type Config struct {
// Set this to 0 will disable TSO Proxy.
// Set this to the negative value to disable the limit.
MaxConcurrentTSOProxyStreamings int `toml:"max-concurrent-tso-proxy-streamings" json:"max-concurrent-tso-proxy-streamings"`
// TSOProxyClientRecvTimeout is the timeout for the TSO proxy to receive a tso request from a client via grpc TSO stream.
// TSOProxyRecvFromClientTimeout is the timeout for the TSO proxy to receive a tso request from a client via grpc TSO stream.
// After the timeout, the TSO proxy will close the grpc TSO stream.
TSOProxyClientRecvTimeout typeutil.Duration `toml:"tso-proxy-client-recv-timeout" json:"tso-proxy-client-recv-timeout"`
TSOProxyRecvFromClientTimeout typeutil.Duration `toml:"tso-proxy-recv-from-client-timeout" json:"tso-proxy-recv-from-client-timeout"`

// TSOSaveInterval is the interval to save timestamp.
TSOSaveInterval typeutil.Duration `toml:"tso-save-interval" json:"tso-save-interval"`
Expand Down Expand Up @@ -229,7 +229,7 @@ const (
defaultDRWaitStoreTimeout = time.Minute

defaultMaxConcurrentTSOProxyStreamings = 5000
defaultTSOProxyClientRecvTimeout = 1 * time.Hour
defaultTSOProxyRecvFromClientTimeout = 1 * time.Hour

defaultTSOSaveInterval = time.Duration(defaultLeaderLease) * time.Second
// defaultTSOUpdatePhysicalInterval is the default value of the config `TSOUpdatePhysicalInterval`.
Expand Down Expand Up @@ -455,7 +455,7 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
}

configutil.AdjustInt(&c.MaxConcurrentTSOProxyStreamings, defaultMaxConcurrentTSOProxyStreamings)
configutil.AdjustDuration(&c.TSOProxyClientRecvTimeout, defaultTSOProxyClientRecvTimeout)
configutil.AdjustDuration(&c.TSOProxyRecvFromClientTimeout, defaultTSOProxyRecvFromClientTimeout)

configutil.AdjustInt64(&c.LeaderLease, defaultLeaderLease)
configutil.AdjustDuration(&c.TSOSaveInterval, defaultTSOSaveInterval)
Expand Down Expand Up @@ -1271,9 +1271,9 @@ func (c *Config) GetMaxConcurrentTSOProxyStreamings() int {
return c.MaxConcurrentTSOProxyStreamings
}

// GetTSOProxyClientRecvTimeout returns the TSO proxy client receive timeout.
func (c *Config) GetTSOProxyClientRecvTimeout() time.Duration {
return c.TSOProxyClientRecvTimeout.Duration
// GetTSOProxyRecvFromClientTimeout returns timeout value for TSO proxy receiving from the client.
func (c *Config) GetTSOProxyRecvFromClientTimeout() time.Duration {
return c.TSOProxyRecvFromClientTimeout.Duration
}

// GetTSOUpdatePhysicalInterval returns TSO update physical interval.
Expand Down
34 changes: 23 additions & 11 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ var (
ErrNotFoundTSOAddr = status.Errorf(codes.NotFound, "not found tso address")
ErrForwardTSOTimeout = status.Errorf(codes.DeadlineExceeded, "forward tso request timeout")
ErrMaxCountTSOProxyRoutinesExceeded = status.Errorf(codes.ResourceExhausted, "max count of concurrent tso proxy routines exceeded")
ErrTSOProxyClientRecvTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy client recv timeout. stream closed by server")
ErrTSOProxyRecvFromClientTimeout = status.Errorf(codes.DeadlineExceeded, "tso proxy timeout when receiving from client. stream closed by server")
)

// GrpcServer wraps Server to provide grpc service.
Expand Down Expand Up @@ -435,7 +435,7 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error {
default:
}

request, err := server.Recv(s.GetTSOProxyClientRecvTimeout())
request, err := server.Recv(s.GetTSOProxyRecvFromClientTimeout())
if err == io.EOF {
return nil
}
Expand Down Expand Up @@ -550,6 +550,11 @@ func (s *GrpcServer) forwardTSORequestAsync(
DcLocation: request.GetDcLocation(),
}

failpoint.Inject("tsoProxySendToTSOTimeout", func() {
<-ctxTimeout.Done()
failpoint.Return()
})

if err := forwardStream.Send(tsopbReq); err != nil {
select {
case <-ctxTimeout.Done():
Expand All @@ -565,23 +570,21 @@ func (s *GrpcServer) forwardTSORequestAsync(
default:
}

failpoint.Inject("tsoProxyRecvFromTSOTimeout", func() {
<-ctxTimeout.Done()
failpoint.Return()
})

response, err := forwardStream.Recv()
if err != nil {
if strings.Contains(err.Error(), errs.NotLeaderErr) {
s.tsoPrimaryWatcher.ForceLoad()
}
select {
case <-ctxTimeout.Done():
return
case tsoRespCh <- &tsopbTSOResponse{err: err}:
}
return
}

select {
case <-ctxTimeout.Done():
return
case tsoRespCh <- &tsopbTSOResponse{response: response}:
case tsoRespCh <- &tsopbTSOResponse{response: response, err: err}:
}
}

Expand Down Expand Up @@ -609,6 +612,10 @@ func (s *tsoServer) Send(m *pdpb.TsoResponse) error {
done := make(chan error, 1)
go func() {
defer logutil.LogPanic()
failpoint.Inject("tsoProxyFailToSendToClient", func() {
done <- errors.New("injected error")
failpoint.Return()
})
done <- s.stream.Send(m)
}()
select {
Expand All @@ -627,6 +634,11 @@ func (s *tsoServer) Recv(timeout time.Duration) (*pdpb.TsoRequest, error) {
if atomic.LoadInt32(&s.closed) == 1 {
return nil, io.EOF
}
failpoint.Inject("tsoProxyRecvFromClientTimeout", func(val failpoint.Value) {
if customTimeoutInSeconds, ok := val.(int); ok {
timeout = time.Duration(customTimeoutInSeconds) * time.Second
}
})
requestCh := make(chan *pdpbTSORequest, 1)
go func() {
defer logutil.LogPanic()
Expand All @@ -642,7 +654,7 @@ func (s *tsoServer) Recv(timeout time.Duration) (*pdpb.TsoRequest, error) {
return req.request, nil
case <-time.After(timeout):
atomic.StoreInt32(&s.closed, 1)
return nil, ErrTSOProxyClientRecvTimeout
return nil, ErrTSOProxyRecvFromClientTimeout
}
}

Expand Down
6 changes: 3 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1909,9 +1909,9 @@ func (s *Server) GetMaxConcurrentTSOProxyStreamings() int {
return s.cfg.GetMaxConcurrentTSOProxyStreamings()
}

// GetTSOProxyClientRecvTimeout returns the TSO proxy client receive timeout.
func (s *Server) GetTSOProxyClientRecvTimeout() time.Duration {
return s.cfg.GetTSOProxyClientRecvTimeout()
// GetTSOProxyRecvFromClientTimeout returns timeout value for TSO proxy receiving from the client.
func (s *Server) GetTSOProxyRecvFromClientTimeout() time.Duration {
return s.cfg.GetTSOProxyRecvFromClientTimeout()
}

// GetLeaderLease returns the leader lease.
Expand Down
74 changes: 74 additions & 0 deletions tests/integrations/mcs/tso/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -194,6 +195,79 @@ func (s *tsoProxyTestSuite) TestTSOProxyClientsWithSameContext() {
s.cleanupGRPCStreams(cleanupFuncs)
}

// TestTSOProxyRecvFromClientTimeout tests the TSO Proxy can properly close the grpc stream on the server side
// when the client does not send any request to the server for a long time.
func (s *tsoProxyTestSuite) TestTSOProxyRecvFromClientTimeout() {
re := s.Require()

// Enable the failpoint to make the TSO Proxy's grpc stream timeout on the server side to be 1 second.
re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxyRecvFromClientTimeout", `return(1)`))
streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1)
// Sleep 2 seconds to make the TSO Proxy's grpc stream timeout on the server side.
time.Sleep(2 * time.Second)
err := streams[0].Send(s.defaultReq)
re.Error(err)
s.cleanupGRPCStreams(cleanupFuncs)
re.NoError(failpoint.Disable("github.com/tikv/pd/server/tsoProxyRecvFromClientTimeout"))

// Verify the streams with no fault injection can work correctly.
s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 1, true)
}

// TestTSOProxyFailToSendToClient tests the TSO Proxy can properly close the grpc stream on the server side
// when it fails to send the response to the client.
func (s *tsoProxyTestSuite) TestTSOProxyFailToSendToClient() {
re := s.Require()

// Enable the failpoint to make the TSO Proxy's grpc stream timeout on the server side to be 1 second.
re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxyFailToSendToClient", `return(true)`))
streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1)
err := streams[0].Send(s.defaultReq)
re.NoError(err)
_, err = streams[0].Recv()
re.Error(err)
s.cleanupGRPCStreams(cleanupFuncs)
re.NoError(failpoint.Disable("github.com/tikv/pd/server/tsoProxyFailToSendToClient"))

s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 1, true)
}

// TestTSOProxySendToTSOTimeout tests the TSO Proxy can properly close the grpc stream on the server side
// when it sends the request to the TSO service and encounters timeout.
func (s *tsoProxyTestSuite) TestTSOProxySendToTSOTimeout() {
re := s.Require()

// Enable the failpoint to make the TSO Proxy's grpc stream timeout on the server side to be 1 second.
re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxySendToTSOTimeout", `return(true)`))
streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1)
err := streams[0].Send(s.defaultReq)
re.NoError(err)
_, err = streams[0].Recv()
re.Error(err)
s.cleanupGRPCStreams(cleanupFuncs)
re.NoError(failpoint.Disable("github.com/tikv/pd/server/tsoProxySendToTSOTimeout"))

s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 1, true)
}

// TestTSOProxyRecvFromTSOTimeout tests the TSO Proxy can properly close the grpc stream on the server side
// when it receives the response from the TSO service and encounters timeout.
func (s *tsoProxyTestSuite) TestTSOProxyRecvFromTSOTimeout() {
re := s.Require()

// Enable the failpoint to make the TSO Proxy's grpc stream timeout on the server side to be 1 second.
re.NoError(failpoint.Enable("github.com/tikv/pd/server/tsoProxyRecvFromTSOTimeout", `return(true)`))
streams, cleanupFuncs := createTSOStreams(re, s.ctx, s.backendEndpoints, 1)
err := streams[0].Send(s.defaultReq)
re.NoError(err)
_, err = streams[0].Recv()
re.Error(err)
s.cleanupGRPCStreams(cleanupFuncs)
re.NoError(failpoint.Disable("github.com/tikv/pd/server/tsoProxyRecvFromTSOTimeout"))

s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 1, true)
}

func (s *tsoProxyTestSuite) cleanupGRPCStreams(cleanupFuncs []testutil.CleanupFunc) {
for i := 0; i < len(cleanupFuncs); i++ {
if cleanupFuncs[i] != nil {
Expand Down

0 comments on commit 0334fa5

Please sign in to comment.