Skip to content

Commit

Permalink
pegasus2/session: fix bug when receiving a stale response (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored Jul 2, 2018
1 parent 2797a37 commit 59b240c
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 7 deletions.
3 changes: 2 additions & 1 deletion pegasus2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func NewClient(cfg pegasus.Config) *Client {

func (p *Client) OpenTable(ctx context.Context, tableName string) (pegasus.TableConnector, error) {
tb, err := func() (pegasus.TableConnector, error) {
// each table instance holds set of replica sessions.
// Each table holds an independent set of replica sessions.
// The meta sessions are shared by tables created from the same client.
replicaMgr := session.NewReplicaManager(newNodeSession)
return pegasus.ConnectTable(ctx, tableName, p.metaMgr, replicaMgr)
}()
Expand Down
27 changes: 21 additions & 6 deletions pegasus2/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package pegasus2

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/XiaoMi/pegasus-go-client/idl/base"
"github.com/XiaoMi/pegasus-go-client/pegalog"
"github.com/XiaoMi/pegasus-go-client/rpc"
"github.com/XiaoMi/pegasus-go-client/session"
"time"
)

type nodeSession struct {
Expand All @@ -21,7 +22,8 @@ type nodeSession struct {
codec rpc.Codec
seqId int32

mu sync.Mutex
// mutex lock exclusive for CallWithGpid
callMut sync.Mutex
}

func newNodeSession(addr string, ntype session.NodeType) session.NodeSession {
Expand All @@ -45,8 +47,8 @@ func (n *nodeSession) String() string {

func (n *nodeSession) CallWithGpid(ctx context.Context, gpid *base.Gpid, args session.RpcRequestArgs, name string) (session.RpcResponseResult, error) {
// ensure CallWithGpid not being called concurrently
n.mu.Lock()
defer n.mu.Unlock()
n.callMut.Lock()
defer n.callMut.Unlock()

if n.conn.GetState() != rpc.ConnStateReady {
if err := n.conn.TryConnect(); err != nil {
Expand All @@ -59,7 +61,8 @@ func (n *nodeSession) CallWithGpid(ctx context.Context, gpid *base.Gpid, args se
timeout := deadline.Sub(time.Now())
if timeout > time.Duration(0) {
n.conn.SetWriteTimeout(timeout)
n.conn.SetReadTimeout(timeout)
} else {
return nil, errors.New("send rpc timeout")
}
}

Expand All @@ -74,14 +77,26 @@ func (n *nodeSession) CallWithGpid(ctx context.Context, gpid *base.Gpid, args se
}
}

{ // read response
for { // read response
if hasDeadline {
timeout := deadline.Sub(time.Now())
if timeout > time.Duration(0) {
n.conn.SetReadTimeout(timeout)
}
}

rcallRecv, err := session.ReadRpcResponse(n.conn, n.codec)
if err != nil {
return nil, err
}
if rcallRecv.Err != nil {
return nil, rcallRecv.Err
}
if n.seqId != rcallRecv.SeqId {
n.logger.Printf("ignore stale response (seqId: %d, current seqId: %d) from %s: %s",
rcallRecv.SeqId, n.seqId, n, rcallRecv.Name)
continue
}
return rcallRecv.Result, nil
}
}
Expand Down
83 changes: 83 additions & 0 deletions pegasus2/session_test.go
Original file line number Diff line number Diff line change
@@ -1 +1,84 @@
package pegasus2

import (
"context"
"strings"
"testing"
"time"

"encoding/binary"
"github.com/XiaoMi/pegasus-go-client/idl/base"
"github.com/XiaoMi/pegasus-go-client/idl/rrdb"
"github.com/XiaoMi/pegasus-go-client/pegasus"
"github.com/XiaoMi/pegasus-go-client/session"
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/assert"
)

func TestNodeSession_SendTimeout(t *testing.T) {
defer leaktest.Check(t)()

ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*1)

client := NewClient(pegasus.Config{
MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", "0.0.0.0:34603"},
})
tb, err := client.OpenTable(context.Background(), "temp")
assert.Nil(t, err)
defer tb.Close()
defer client.Close()

// ensure deadline has expired.
time.Sleep(time.Second)

// send must timeout
err = tb.Set(ctx, []byte("h1"), []byte("s1"), []byte("v1"))
assert.NotNil(t, err)
assert.True(t, strings.Contains(err.Error(), "send rpc timeout"))
}

// test the case: request is seqId:3, but response is seqId:1
func TestNodeSession_ReadStaleResponse(t *testing.T) {
defer leaktest.Check(t)()

// start echo server first
n := newNodeSession("0.0.0.0:8800", session.NodeTypeMeta).(*nodeSession)
defer n.Close()

var expected []byte
var actual []byte

mockCodec := &session.MockCodec{}
mockCodec.MockMarshal(func(v interface{}) ([]byte, error) {
expected, _ = new(session.PegasusCodec).Marshal(v)
buf := make([]byte, len(expected)+4)

// prefixed with length
binary.BigEndian.PutUint32(buf, uint32(len(buf)))
copy(buf[4:], expected)

return buf, nil
})

mockCodec.MockUnMarshal(func(data []byte, v interface{}) error {
actual = data
r, _ := v.(*session.PegasusRpcCall)
r.SeqId = 1
r.Name = "RPC_RRDB_RRDB_GET_ACK"
r.Result = &rrdb.RrdbGetResult{
Success: rrdb.NewReadResponse(),
}
return nil
})

n.codec = mockCodec

ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*1000)

n.seqId = 2
args := &rrdb.RrdbGetArgs{Key: &base.Blob{Data: []byte("a")}}
_, err := n.CallWithGpid(ctx, &base.Gpid{}, args, "RPC_RRDB_RRDB_GET")

// ensure rpc timeout due to stale response
assert.NotNil(t, err)
}

0 comments on commit 59b240c

Please sign in to comment.