Skip to content

Commit

Permalink
streams: Stop cleaning up after orphaned streams (#1854)
Browse files Browse the repository at this point in the history
This change introduces some behavior changes that should not impact users that
are following the proper stream protocol. Specifically, one of the following
conditions must be satisfied:

1. The user calls Close on the ClientConn.
2. The user cancels the context provided to NewClientStream, or its deadline
    expires. (Note that it if the context is no longer needed before the deadline
    expires, it is still recommended to call cancel to prevent bloat.) It is always
    recommended to cancel contexts when they are no longer needed, and to
    never use the background context directly, so all users should always be
    doing this.
3. The user calls RecvMsg (or Recv in generated code) until a non-nil error is
    returned.
4. The user receives any error from Header or SendMsg (or Send in generated
    code) besides io.EOF.  If none of the above happen, this will leak a goroutine
    and a context, and grpc will not call the optionally-configured stats handler
    with a stats.End message.

Before this change, if a user created a stream and the server ended the stream,
the stats handler would be invoked with a stats.End containing the final status
of the stream. Subsequent calls to RecvMsg would then trigger the stats handler
with InPayloads, which may be unexpected by stats handlers.
  • Loading branch information
dfawley authored Feb 8, 2018
1 parent 7646b53 commit 365770f
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 178 deletions.
18 changes: 13 additions & 5 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"golang.org/x/net/trace"
"google.golang.org/grpc/balancer"
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
Expand All @@ -40,17 +41,17 @@ import (
_ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)

var (
// ErrClientConnClosing indicates that the operation is illegal because
// the ClientConn is closing.
ErrClientConnClosing = errors.New("grpc: the client connection is closing")
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
// DEPRECATED: Please use context.DeadlineExceeded instead.
ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
//
// Deprecated: this error should not be relied upon by users; use the status
// code of Canceled instead.
ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
errConnDrain = errors.New("grpc: the connection is drained")
// errConnClosing indicates that the connection is closing.
Expand Down Expand Up @@ -1374,3 +1375,10 @@ func (ac *addrConn) getState() connectivity.State {
defer ac.mu.Unlock()
return ac.state
}

// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
5 changes: 3 additions & 2 deletions go16.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) erro

// toRPCErr converts an error into an error from the status package.
func toRPCErr(err error) error {
if err == nil || err == io.EOF {
return err
}
if _, ok := status.FromError(err); ok {
return err
}
Expand All @@ -62,8 +65,6 @@ func toRPCErr(err error) error {
return status.Error(codes.DeadlineExceeded, err.Error())
case context.Canceled:
return status.Error(codes.Canceled, err.Error())
case ErrClientConnClosing:
return status.Error(codes.FailedPrecondition, err.Error())
}
}
return status.Error(codes.Unknown, err.Error())
Expand Down
5 changes: 3 additions & 2 deletions go17.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) erro

// toRPCErr converts an error into an error from the status package.
func toRPCErr(err error) error {
if err == nil || err == io.EOF {
return err
}
if _, ok := status.FromError(err); ok {
return err
}
Expand All @@ -63,8 +66,6 @@ func toRPCErr(err error) error {
return status.Error(codes.DeadlineExceeded, err.Error())
case context.Canceled, netctx.Canceled:
return status.Error(codes.Canceled, err.Error())
case ErrClientConnClosing:
return status.Error(codes.FailedPrecondition, err.Error())
}
}
return status.Error(codes.Unknown, err.Error())
Expand Down
35 changes: 7 additions & 28 deletions stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,10 @@ const (
)

type rpcConfig struct {
count int // Number of requests and responses for streaming RPCs.
success bool // Whether the RPC should succeed or return error.
failfast bool
callType rpcType // Type of RPC.
noLastRecv bool // Whether to call recv for io.EOF. When true, last recv won't be called. Only valid for streaming RPCs.
count int // Number of requests and responses for streaming RPCs.
success bool // Whether the RPC should succeed or return error.
failfast bool
callType rpcType // Type of RPC.
}

func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
Expand Down Expand Up @@ -313,14 +312,8 @@ func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]*testpb.SimpleRequest
if err = stream.CloseSend(); err != nil && err != io.EOF {
return reqs, resps, err
}
if !c.noLastRecv {
if _, err = stream.Recv(); err != io.EOF {
return reqs, resps, err
}
} else {
// In the case of not calling the last recv, sleep to avoid
// returning too fast to miss the remaining stats (InTrailer and End).
time.Sleep(time.Second)
if _, err = stream.Recv(); err != io.EOF {
return reqs, resps, err
}

return reqs, resps, nil
Expand Down Expand Up @@ -651,7 +644,7 @@ func checkEnd(t *testing.T, d *gotData, e *expectedData) {

actual, ok := status.FromError(st.Error)
if !ok {
t.Fatalf("expected st.Error to be a statusError, got %T", st.Error)
t.Fatalf("expected st.Error to be a statusError, got %v (type %T)", st.Error, st.Error)
}

expectedStatus, _ := status.FromError(e.err)
Expand Down Expand Up @@ -1222,20 +1215,6 @@ func TestClientStatsFullDuplexRPCError(t *testing.T) {
})
}

// If the user doesn't call the last recv() on clientStream.
func TestClientStatsFullDuplexRPCNotCallingLastRecv(t *testing.T) {
count := 1
testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC, noLastRecv: true}, map[int]*checkFuncWithCount{
begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1},
outPayload: {checkOutPayload, count},
inHeader: {checkInHeader, 1},
inPayload: {checkInPayload, count},
inTrailer: {checkInTrailer, 1},
end: {checkEnd, 1},
})
}

func TestTags(t *testing.T) {
b := []byte{5, 2, 4, 3, 1}
ctx := stats.SetTags(context.Background(), b)
Expand Down
Loading

0 comments on commit 365770f

Please sign in to comment.