Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streams: Stop cleaning up after orphaned streams #1854

Merged
merged 3 commits into from
Feb 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"golang.org/x/net/trace"
"google.golang.org/grpc/balancer"
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
Expand All @@ -40,17 +41,17 @@ import (
_ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/grpc/transport"
)

var (
// ErrClientConnClosing indicates that the operation is illegal because
// the ClientConn is closing.
ErrClientConnClosing = errors.New("grpc: the client connection is closing")
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
// DEPRECATED: Please use context.DeadlineExceeded instead.
ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
//
// Deprecated: this error should not be relied upon by users; use the status
// code of Canceled instead.
ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
errConnDrain = errors.New("grpc: the connection is drained")
// errConnClosing indicates that the connection is closing.
Expand Down Expand Up @@ -1374,3 +1375,10 @@ func (ac *addrConn) getState() connectivity.State {
defer ac.mu.Unlock()
return ac.state
}

// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
//
// Deprecated: This error is never returned by grpc and should not be
// referenced by users.
var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
5 changes: 3 additions & 2 deletions go16.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) erro

// toRPCErr converts an error into an error from the status package.
func toRPCErr(err error) error {
if err == nil || err == io.EOF {
return err
}
if _, ok := status.FromError(err); ok {
return err
}
Expand All @@ -62,8 +65,6 @@ func toRPCErr(err error) error {
return status.Error(codes.DeadlineExceeded, err.Error())
case context.Canceled:
return status.Error(codes.Canceled, err.Error())
case ErrClientConnClosing:
return status.Error(codes.FailedPrecondition, err.Error())
}
}
return status.Error(codes.Unknown, err.Error())
Expand Down
5 changes: 3 additions & 2 deletions go17.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) erro

// toRPCErr converts an error into an error from the status package.
func toRPCErr(err error) error {
if err == nil || err == io.EOF {
return err
}
if _, ok := status.FromError(err); ok {
return err
}
Expand All @@ -63,8 +66,6 @@ func toRPCErr(err error) error {
return status.Error(codes.DeadlineExceeded, err.Error())
case context.Canceled, netctx.Canceled:
return status.Error(codes.Canceled, err.Error())
case ErrClientConnClosing:
return status.Error(codes.FailedPrecondition, err.Error())
}
}
return status.Error(codes.Unknown, err.Error())
Expand Down
35 changes: 7 additions & 28 deletions stats/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,10 @@ const (
)

type rpcConfig struct {
count int // Number of requests and responses for streaming RPCs.
success bool // Whether the RPC should succeed or return error.
failfast bool
callType rpcType // Type of RPC.
noLastRecv bool // Whether to call recv for io.EOF. When true, last recv won't be called. Only valid for streaming RPCs.
count int // Number of requests and responses for streaming RPCs.
success bool // Whether the RPC should succeed or return error.
failfast bool
callType rpcType // Type of RPC.
}

func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
Expand Down Expand Up @@ -313,14 +312,8 @@ func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]*testpb.SimpleRequest
if err = stream.CloseSend(); err != nil && err != io.EOF {
return reqs, resps, err
}
if !c.noLastRecv {
if _, err = stream.Recv(); err != io.EOF {
return reqs, resps, err
}
} else {
// In the case of not calling the last recv, sleep to avoid
// returning too fast to miss the remaining stats (InTrailer and End).
time.Sleep(time.Second)
if _, err = stream.Recv(); err != io.EOF {
return reqs, resps, err
}

return reqs, resps, nil
Expand Down Expand Up @@ -651,7 +644,7 @@ func checkEnd(t *testing.T, d *gotData, e *expectedData) {

actual, ok := status.FromError(st.Error)
if !ok {
t.Fatalf("expected st.Error to be a statusError, got %T", st.Error)
t.Fatalf("expected st.Error to be a statusError, got %v (type %T)", st.Error, st.Error)
}

expectedStatus, _ := status.FromError(e.err)
Expand Down Expand Up @@ -1222,20 +1215,6 @@ func TestClientStatsFullDuplexRPCError(t *testing.T) {
})
}

// If the user doesn't call the last recv() on clientStream.
func TestClientStatsFullDuplexRPCNotCallingLastRecv(t *testing.T) {
count := 1
testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, callType: fullDuplexStreamRPC, noLastRecv: true}, map[int]*checkFuncWithCount{
begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1},
outPayload: {checkOutPayload, count},
inHeader: {checkInHeader, 1},
inPayload: {checkInPayload, count},
inTrailer: {checkInTrailer, 1},
end: {checkEnd, 1},
})
}

func TestTags(t *testing.T) {
b := []byte{5, 2, 4, 3, 1}
ctx := stats.SetTags(context.Background(), b)
Expand Down
Loading