Skip to content

Commit

Permalink
Set peer before sending request (#1423)
Browse files Browse the repository at this point in the history
  • Loading branch information
jack0 authored and menghanl committed Aug 14, 2017
1 parent db24830 commit e84570c
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 3 deletions.
6 changes: 3 additions & 3 deletions call.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran
dopts.copts.StatsHandler.HandleRPC(ctx, inPayload)
}
c.trailerMD = stream.Trailer()
if peer, ok := peer.FromContext(stream.Context()); ok {
c.peer = peer
}
return nil
}

Expand Down Expand Up @@ -262,6 +259,9 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
return toRPCErr(err)
}
if peer, ok := peer.FromContext(stream.Context()); ok {
c.peer = peer
}
err = sendRequest(ctx, cc.dopts, cc.dopts.cp, &c, callHdr, stream, t, args, topts)
if err != nil {
if put != nil {
Expand Down
56 changes: 56 additions & 0 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2264,6 +2264,62 @@ func testPeerNegative(t *testing.T, e env) {
tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer))
}

func TestPeerFailedRPC(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
testPeerFailedRPC(t, e)
}
}

func testPeerFailedRPC(t *testing.T, e env) {
te := newTest(t, e)
te.maxServerReceiveMsgSize = newInt(1 * 1024)
te.startServer(&testServer{security: e.security})

defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())

// first make a successful request to the server
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
}

// make a second request that will be rejected by the server
const largeSize = 5 * 1024
largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
if err != nil {
t.Fatal(err)
}
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
Payload: largePayload,
}

peer := new(peer.Peer)
if _, err := tc.UnaryCall(context.Background(), req, grpc.Peer(peer)); err == nil || grpc.Code(err) != codes.ResourceExhausted {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
} else {
pa := peer.Addr.String()
if e.network == "unix" {
if pa != te.srvAddr {
t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
}
return
}
_, pp, err := net.SplitHostPort(pa)
if err != nil {
t.Fatalf("Failed to parse address from peer.")
}
_, sp, err := net.SplitHostPort(te.srvAddr)
if err != nil {
t.Fatalf("Failed to parse address of test server.")
}
if pp != sp {
t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
}
}
}

func TestMetadataUnaryRPC(t *testing.T) {
defer leakCheck(t)()
for _, e := range listTestEnv() {
Expand Down

0 comments on commit e84570c

Please sign in to comment.