From 59b240ccfba480bf83723138b708ec206dfe519d Mon Sep 17 00:00:00 2001 From: Wu Tao Date: Mon, 2 Jul 2018 16:57:29 +0800 Subject: [PATCH] pegasus2/session: fix bug when receiving a stale response (#20) --- pegasus2/client.go | 3 +- pegasus2/session.go | 27 ++++++++++--- pegasus2/session_test.go | 83 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 106 insertions(+), 7 deletions(-) diff --git a/pegasus2/client.go b/pegasus2/client.go index 5a4074cfc9..c3dc98e88a 100644 --- a/pegasus2/client.go +++ b/pegasus2/client.go @@ -26,7 +26,8 @@ func NewClient(cfg pegasus.Config) *Client { func (p *Client) OpenTable(ctx context.Context, tableName string) (pegasus.TableConnector, error) { tb, err := func() (pegasus.TableConnector, error) { - // each table instance holds set of replica sessions. + // Each table holds an independent set of replica sessions. + // The meta sessions are shared by tables created from the same client. replicaMgr := session.NewReplicaManager(newNodeSession) return pegasus.ConnectTable(ctx, tableName, p.metaMgr, replicaMgr) }() diff --git a/pegasus2/session.go b/pegasus2/session.go index 5e13cdbf68..b8efde9115 100644 --- a/pegasus2/session.go +++ b/pegasus2/session.go @@ -2,14 +2,15 @@ package pegasus2 import ( "context" + "errors" "fmt" "sync" + "time" "github.com/XiaoMi/pegasus-go-client/idl/base" "github.com/XiaoMi/pegasus-go-client/pegalog" "github.com/XiaoMi/pegasus-go-client/rpc" "github.com/XiaoMi/pegasus-go-client/session" - "time" ) type nodeSession struct { @@ -21,7 +22,8 @@ type nodeSession struct { codec rpc.Codec seqId int32 - mu sync.Mutex + // mutex lock exclusive for CallWithGpid + callMut sync.Mutex } func newNodeSession(addr string, ntype session.NodeType) session.NodeSession { @@ -45,8 +47,8 @@ func (n *nodeSession) String() string { func (n *nodeSession) CallWithGpid(ctx context.Context, gpid *base.Gpid, args session.RpcRequestArgs, name string) (session.RpcResponseResult, error) { // ensure CallWithGpid not being called concurrently - n.mu.Lock() - defer n.mu.Unlock() + n.callMut.Lock() + defer n.callMut.Unlock() if n.conn.GetState() != rpc.ConnStateReady { if err := n.conn.TryConnect(); err != nil { @@ -59,7 +61,8 @@ func (n *nodeSession) CallWithGpid(ctx context.Context, gpid *base.Gpid, args se timeout := deadline.Sub(time.Now()) if timeout > time.Duration(0) { n.conn.SetWriteTimeout(timeout) - n.conn.SetReadTimeout(timeout) + } else { + return nil, errors.New("send rpc timeout") } } @@ -74,7 +77,14 @@ func (n *nodeSession) CallWithGpid(ctx context.Context, gpid *base.Gpid, args se } } - { // read response + for { // read response + if hasDeadline { + timeout := deadline.Sub(time.Now()) + if timeout > time.Duration(0) { + n.conn.SetReadTimeout(timeout) + } + } + rcallRecv, err := session.ReadRpcResponse(n.conn, n.codec) if err != nil { return nil, err @@ -82,6 +92,11 @@ func (n *nodeSession) CallWithGpid(ctx context.Context, gpid *base.Gpid, args se if rcallRecv.Err != nil { return nil, rcallRecv.Err } + if n.seqId != rcallRecv.SeqId { + n.logger.Printf("ignore stale response (seqId: %d, current seqId: %d) from %s: %s", + rcallRecv.SeqId, n.seqId, n, rcallRecv.Name) + continue + } return rcallRecv.Result, nil } } diff --git a/pegasus2/session_test.go b/pegasus2/session_test.go index bf06c4f4e9..d8d9c2d60a 100644 --- a/pegasus2/session_test.go +++ b/pegasus2/session_test.go @@ -1 +1,84 @@ package pegasus2 + +import ( + "context" + "strings" + "testing" + "time" + + "encoding/binary" + "github.com/XiaoMi/pegasus-go-client/idl/base" + "github.com/XiaoMi/pegasus-go-client/idl/rrdb" + "github.com/XiaoMi/pegasus-go-client/pegasus" + "github.com/XiaoMi/pegasus-go-client/session" + "github.com/fortytw2/leaktest" + "github.com/stretchr/testify/assert" +) + +func TestNodeSession_SendTimeout(t *testing.T) { + defer leaktest.Check(t)() + + ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*1) + + client := NewClient(pegasus.Config{ + MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", "0.0.0.0:34603"}, + }) + tb, err := client.OpenTable(context.Background(), "temp") + assert.Nil(t, err) + defer tb.Close() + defer client.Close() + + // ensure deadline has expired. + time.Sleep(time.Second) + + // send must timeout + err = tb.Set(ctx, []byte("h1"), []byte("s1"), []byte("v1")) + assert.NotNil(t, err) + assert.True(t, strings.Contains(err.Error(), "send rpc timeout")) +} + +// test the case: request is seqId:3, but response is seqId:1 +func TestNodeSession_ReadStaleResponse(t *testing.T) { + defer leaktest.Check(t)() + + // start echo server first + n := newNodeSession("0.0.0.0:8800", session.NodeTypeMeta).(*nodeSession) + defer n.Close() + + var expected []byte + var actual []byte + + mockCodec := &session.MockCodec{} + mockCodec.MockMarshal(func(v interface{}) ([]byte, error) { + expected, _ = new(session.PegasusCodec).Marshal(v) + buf := make([]byte, len(expected)+4) + + // prefixed with length + binary.BigEndian.PutUint32(buf, uint32(len(buf))) + copy(buf[4:], expected) + + return buf, nil + }) + + mockCodec.MockUnMarshal(func(data []byte, v interface{}) error { + actual = data + r, _ := v.(*session.PegasusRpcCall) + r.SeqId = 1 + r.Name = "RPC_RRDB_RRDB_GET_ACK" + r.Result = &rrdb.RrdbGetResult{ + Success: rrdb.NewReadResponse(), + } + return nil + }) + + n.codec = mockCodec + + ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*1000) + + n.seqId = 2 + args := &rrdb.RrdbGetArgs{Key: &base.Blob{Data: []byte("a")}} + _, err := n.CallWithGpid(ctx, &base.Gpid{}, args, "RPC_RRDB_RRDB_GET") + + // ensure rpc timeout due to stale response + assert.NotNil(t, err) +}