diff --git a/clientv3/balancer/grpc1.7-health.go b/clientv3/balancer/grpc1.7-health.go index a5a1d5fbcafa..2153767354de 100644 --- a/clientv3/balancer/grpc1.7-health.go +++ b/clientv3/balancer/grpc1.7-health.go @@ -507,7 +507,7 @@ func (b *GRPC17Health) Get(ctx context.Context, opts grpc.BalancerGetOptions) (g addr = b.pinAddr b.mu.RUnlock() if closed { - return grpc.Address{Addr: ""}, nil, context.Canceled + return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing } if addr == "" { return grpc.Address{Addr: ""}, nil, ErrNoAddrAvailable @@ -522,7 +522,7 @@ func (b *GRPC17Health) Get(ctx context.Context, opts grpc.BalancerGetOptions) (g select { case <-ch: case <-b.donec: - return grpc.Address{Addr: ""}, nil, context.Canceled + return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing case <-ctx.Done(): return grpc.Address{Addr: ""}, nil, ctx.Err() } @@ -532,7 +532,7 @@ func (b *GRPC17Health) Get(ctx context.Context, opts grpc.BalancerGetOptions) (g b.mu.RUnlock() // Close() which sets b.closed = true can be called before Get(), Get() must exit if balancer is closed. if closed { - return grpc.Address{Addr: ""}, nil, context.Canceled + return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing } if addr != "" { break diff --git a/clientv3/client.go b/clientv3/client.go index 5de33925b07d..b91cbf958ef8 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -634,9 +634,7 @@ func toErr(ctx context.Context, err error) error { } case codes.Unavailable: case codes.FailedPrecondition: - if ctx.Err() != nil { - err = ctx.Err() - } + err = grpc.ErrClientConnClosing } } return err diff --git a/clientv3/concurrency/election_test.go b/clientv3/concurrency/election_test.go index 4a187c823ed5..0e45d91ed6af 100644 --- a/clientv3/concurrency/election_test.go +++ b/clientv3/concurrency/election_test.go @@ -67,13 +67,19 @@ func TestResumeElection(t *testing.T) { go func() { o := e.Observe(ctx) respChan <- nil - for resp := range o { - // Ignore any observations that candidate1 was elected - if string(resp.Kvs[0].Value) == "candidate1" { - continue + for { + select { + case resp, ok := <-o: + if !ok { + t.Fatal("Observe() channel closed prematurely") + } + // Ignore any observations that candidate1 was elected + if string(resp.Kvs[0].Value) == "candidate1" { + continue + } + respChan <- &resp + return } - respChan <- &resp - return } }() diff --git a/clientv3/doc.go b/clientv3/doc.go index 3a547195476a..01a3f5961a7e 100644 --- a/clientv3/doc.go +++ b/clientv3/doc.go @@ -90,7 +90,7 @@ // // with etcd clientv3 <= v3.3 // if err == context.Canceled { // // grpc balancer calls 'Get' with an inflight client.Close -// } else if err == codes.Canceled { +// } else if err == grpc.ErrClientConnClosing { // // grpc balancer calls 'Get' after client.Close. // } // // with etcd clientv3 >= v3.4 diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index 7e0cb09701b7..b31fd0920bf0 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -29,8 +29,8 @@ import ( "go.etcd.io/etcd/mvcc/mvccpb" "go.etcd.io/etcd/pkg/testutil" + "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) func TestKVPutError(t *testing.T) { @@ -463,7 +463,7 @@ func TestKVGetErrConnClosed(t *testing.T) { defer close(donec) _, err := cli.Get(context.TODO(), "foo") if !clientv3.IsConnCanceled(err) { - t.Errorf("expected %v or %v, got %v", context.Canceled, codes.Canceled, err) + t.Errorf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err) } }() @@ -490,7 +490,7 @@ func TestKVNewAfterClose(t *testing.T) { go func() { _, err := cli.Get(context.TODO(), "foo") if !clientv3.IsConnCanceled(err) { - t.Errorf("expected %v or %v, got %v", context.Canceled, codes.Canceled, err) + t.Errorf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err) } close(donec) }() @@ -923,7 +923,7 @@ func TestKVLargeRequests(t *testing.T) { maxCallSendBytesClient: 10 * 1024 * 1024, maxCallRecvBytesClient: 0, valueSize: 10 * 1024 * 1024, - expectError: status.Errorf(codes.ResourceExhausted, "trying to send message larger than max "), + expectError: grpc.Errorf(codes.ResourceExhausted, "trying to send message larger than max "), }, { maxRequestBytesServer: 10 * 1024 * 1024, @@ -937,7 +937,7 @@ func TestKVLargeRequests(t *testing.T) { maxCallSendBytesClient: 10 * 1024 * 1024, maxCallRecvBytesClient: 0, valueSize: 10*1024*1024 + 5, - expectError: status.Errorf(codes.ResourceExhausted, "trying to send message larger than max "), + expectError: grpc.Errorf(codes.ResourceExhausted, "trying to send message larger than max "), }, } for i, test := range tests { diff --git a/clientv3/integration/lease_test.go b/clientv3/integration/lease_test.go index 86aff8582429..111739d396fc 100644 --- a/clientv3/integration/lease_test.go +++ b/clientv3/integration/lease_test.go @@ -28,7 +28,7 @@ import ( "go.etcd.io/etcd/integration" "go.etcd.io/etcd/pkg/testutil" - "google.golang.org/grpc/codes" + "google.golang.org/grpc" ) func TestLeaseNotFoundError(t *testing.T) { @@ -300,9 +300,9 @@ func TestLeaseGrantErrConnClosed(t *testing.T) { defer close(donec) _, err := cli.Grant(context.TODO(), 5) if !clientv3.IsConnCanceled(err) { - // codes.Canceled if grpc-go balancer calls 'Get' after client.Close. + // grpc.ErrClientConnClosing if grpc-go balancer calls 'Get' after client.Close. // context.Canceled if grpc-go balancer calls 'Get' with an inflight client.Close. - t.Errorf("expected %v, %v or server unavailable, got %v", err != context.Canceled, codes.Canceled, err) + t.Errorf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err) } }() @@ -372,7 +372,7 @@ func TestLeaseGrantNewAfterClose(t *testing.T) { go func() { _, err := cli.Grant(context.TODO(), 5) if !clientv3.IsConnCanceled(err) { - t.Errorf("expected %v, %v or server unavailable, got %v", err != context.Canceled, codes.Canceled, err) + t.Errorf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err) } close(donec) }() @@ -405,7 +405,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) { go func() { _, err := cli.Revoke(context.TODO(), leaseID) if !clientv3.IsConnCanceled(err) { - t.Fatalf("expected %v, %v or server unavailable, got %v", err != context.Canceled, codes.Canceled, err) + t.Fatalf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err) } close(donec) }() diff --git a/clientv3/options.go b/clientv3/options.go index aceff31971ba..700714c08637 100644 --- a/clientv3/options.go +++ b/clientv3/options.go @@ -47,7 +47,7 @@ var ( // client-side streaming retry limit, only applied to requests where server responds with // a error code clearly indicating it was unable to process the request such as codes.Unavailable. // If set to 0, retry is disabled. - defaultStreamMaxRetries = uint(0) // max uint + defaultStreamMaxRetries = ^uint(0) // max uint // client-side retry backoff wait between requests. defaultBackoffWaitBetween = 25 * time.Millisecond diff --git a/functional/tester/stresser_key.go b/functional/tester/stresser_key.go index c31b5b211756..7feec4637324 100644 --- a/functional/tester/stresser_key.go +++ b/functional/tester/stresser_key.go @@ -176,6 +176,9 @@ func (s *keyStresser) isRetryableError(err error) bool { case context.Canceled.Error(): // from stresser.Cancel method: return false + case grpc.ErrClientConnClosing.Error(): + // from stresser.Cancel method: + return false } if status.Convert(err).Code() == codes.Unavailable { diff --git a/proxy/grpcproxy/adapter/chan_stream.go b/proxy/grpcproxy/adapter/chan_stream.go index 0be39765e7b6..82e34119311f 100644 --- a/proxy/grpcproxy/adapter/chan_stream.go +++ b/proxy/grpcproxy/adapter/chan_stream.go @@ -120,10 +120,7 @@ func (s *chanStream) RecvMsg(m interface{}) error { select { case msg, ok := <-s.recvc: if !ok { - if s.ctx.Err() != nil { - err := s.ctx.Err() - return err - } + return grpc.ErrClientConnClosing } if err, ok := msg.(error); ok { return err diff --git a/proxy/grpcproxy/lease.go b/proxy/grpcproxy/lease.go index 1110a946a4da..a688d429a204 100644 --- a/proxy/grpcproxy/lease.go +++ b/proxy/grpcproxy/lease.go @@ -214,7 +214,7 @@ func (lp *leaseProxy) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error case <-lostLeaderC: return rpctypes.ErrNoLeader case <-lp.leader.disconnectNotify(): - return context.Canceled + return grpc.ErrClientConnClosing default: if err != nil { return err diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index 733f6ad81b17..639bf8e2d60d 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -23,6 +23,7 @@ import ( "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" + "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -79,7 +80,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { wp.mu.Unlock() select { case <-wp.leader.disconnectNotify(): - return context.Canceled + return grpc.ErrClientConnClosing default: return wp.ctx.Err() } @@ -152,7 +153,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { case <-lostLeaderC: return rpctypes.ErrNoLeader case <-wp.leader.disconnectNotify(): - return context.Canceled + return grpc.ErrClientConnClosing default: return wps.ctx.Err() }