Skip to content

Commit

Permalink
test: changes per review comments
Browse files Browse the repository at this point in the history
Fix and undo certain changes based on review comments etc.
  • Loading branch information
spzala committed Jun 4, 2019
1 parent adad957 commit dd5f5e3
Show file tree
Hide file tree
Showing 11 changed files with 36 additions and 31 deletions.
6 changes: 3 additions & 3 deletions clientv3/balancer/grpc1.7-health.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func (b *GRPC17Health) Get(ctx context.Context, opts grpc.BalancerGetOptions) (g
addr = b.pinAddr
b.mu.RUnlock()
if closed {
return grpc.Address{Addr: ""}, nil, context.Canceled
return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
}
if addr == "" {
return grpc.Address{Addr: ""}, nil, ErrNoAddrAvailable
Expand All @@ -522,7 +522,7 @@ func (b *GRPC17Health) Get(ctx context.Context, opts grpc.BalancerGetOptions) (g
select {
case <-ch:
case <-b.donec:
return grpc.Address{Addr: ""}, nil, context.Canceled
return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
case <-ctx.Done():
return grpc.Address{Addr: ""}, nil, ctx.Err()
}
Expand All @@ -532,7 +532,7 @@ func (b *GRPC17Health) Get(ctx context.Context, opts grpc.BalancerGetOptions) (g
b.mu.RUnlock()
// Close() which sets b.closed = true can be called before Get(), Get() must exit if balancer is closed.
if closed {
return grpc.Address{Addr: ""}, nil, context.Canceled
return grpc.Address{Addr: ""}, nil, grpc.ErrClientConnClosing
}
if addr != "" {
break
Expand Down
4 changes: 1 addition & 3 deletions clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,9 +634,7 @@ func toErr(ctx context.Context, err error) error {
}
case codes.Unavailable:
case codes.FailedPrecondition:
if ctx.Err() != nil {
err = ctx.Err()
}
err = grpc.ErrClientConnClosing
}
}
return err
Expand Down
18 changes: 12 additions & 6 deletions clientv3/concurrency/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,19 @@ func TestResumeElection(t *testing.T) {
go func() {
o := e.Observe(ctx)
respChan <- nil
for resp := range o {
// Ignore any observations that candidate1 was elected
if string(resp.Kvs[0].Value) == "candidate1" {
continue
for {
select {
case resp, ok := <-o:
if !ok {
t.Fatal("Observe() channel closed prematurely")
}
// Ignore any observations that candidate1 was elected
if string(resp.Kvs[0].Value) == "candidate1" {
continue
}
respChan <- &resp
return
}
respChan <- &resp
return
}
}()

Expand Down
2 changes: 1 addition & 1 deletion clientv3/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
// // with etcd clientv3 <= v3.3
// if err == context.Canceled {
// // grpc balancer calls 'Get' with an inflight client.Close
// } else if err == codes.Canceled {
// } else if err == grpc.ErrClientConnClosing {
// // grpc balancer calls 'Get' after client.Close.
// }
// // with etcd clientv3 >= v3.4
Expand Down
10 changes: 5 additions & 5 deletions clientv3/integration/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/testutil"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func TestKVPutError(t *testing.T) {
Expand Down Expand Up @@ -463,7 +463,7 @@ func TestKVGetErrConnClosed(t *testing.T) {
defer close(donec)
_, err := cli.Get(context.TODO(), "foo")
if !clientv3.IsConnCanceled(err) {
t.Errorf("expected %v or %v, got %v", context.Canceled, codes.Canceled, err)
t.Errorf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
}
}()

Expand All @@ -490,7 +490,7 @@ func TestKVNewAfterClose(t *testing.T) {
go func() {
_, err := cli.Get(context.TODO(), "foo")
if !clientv3.IsConnCanceled(err) {
t.Errorf("expected %v or %v, got %v", context.Canceled, codes.Canceled, err)
t.Errorf("expected %v or %v, got %v", context.Canceled, grpc.ErrClientConnClosing, err)
}
close(donec)
}()
Expand Down Expand Up @@ -923,7 +923,7 @@ func TestKVLargeRequests(t *testing.T) {
maxCallSendBytesClient: 10 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 10 * 1024 * 1024,
expectError: status.Errorf(codes.ResourceExhausted, "trying to send message larger than max "),
expectError: grpc.Errorf(codes.ResourceExhausted, "trying to send message larger than max "),
},
{
maxRequestBytesServer: 10 * 1024 * 1024,
Expand All @@ -937,7 +937,7 @@ func TestKVLargeRequests(t *testing.T) {
maxCallSendBytesClient: 10 * 1024 * 1024,
maxCallRecvBytesClient: 0,
valueSize: 10*1024*1024 + 5,
expectError: status.Errorf(codes.ResourceExhausted, "trying to send message larger than max "),
expectError: grpc.Errorf(codes.ResourceExhausted, "trying to send message larger than max "),
},
}
for i, test := range tests {
Expand Down
10 changes: 5 additions & 5 deletions clientv3/integration/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"go.etcd.io/etcd/integration"
"go.etcd.io/etcd/pkg/testutil"

"google.golang.org/grpc/codes"
"google.golang.org/grpc"
)

func TestLeaseNotFoundError(t *testing.T) {
Expand Down Expand Up @@ -300,9 +300,9 @@ func TestLeaseGrantErrConnClosed(t *testing.T) {
defer close(donec)
_, err := cli.Grant(context.TODO(), 5)
if !clientv3.IsConnCanceled(err) {
// codes.Canceled if grpc-go balancer calls 'Get' after client.Close.
// grpc.ErrClientConnClosing if grpc-go balancer calls 'Get' after client.Close.
// context.Canceled if grpc-go balancer calls 'Get' with an inflight client.Close.
t.Errorf("expected %v, %v or server unavailable, got %v", err != context.Canceled, codes.Canceled, err)
t.Errorf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
}
}()

Expand Down Expand Up @@ -372,7 +372,7 @@ func TestLeaseGrantNewAfterClose(t *testing.T) {
go func() {
_, err := cli.Grant(context.TODO(), 5)
if !clientv3.IsConnCanceled(err) {
t.Errorf("expected %v, %v or server unavailable, got %v", err != context.Canceled, codes.Canceled, err)
t.Errorf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
}
close(donec)
}()
Expand Down Expand Up @@ -405,7 +405,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) {
go func() {
_, err := cli.Revoke(context.TODO(), leaseID)
if !clientv3.IsConnCanceled(err) {
t.Fatalf("expected %v, %v or server unavailable, got %v", err != context.Canceled, codes.Canceled, err)
t.Fatalf("expected %v, %v or server unavailable, got %v", err != context.Canceled, grpc.ErrClientConnClosing, err)
}
close(donec)
}()
Expand Down
2 changes: 1 addition & 1 deletion clientv3/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (
// client-side streaming retry limit, only applied to requests where server responds with
// a error code clearly indicating it was unable to process the request such as codes.Unavailable.
// If set to 0, retry is disabled.
defaultStreamMaxRetries = uint(0) // max uint
defaultStreamMaxRetries = ^uint(0) // max uint

// client-side retry backoff wait between requests.
defaultBackoffWaitBetween = 25 * time.Millisecond
Expand Down
3 changes: 3 additions & 0 deletions functional/tester/stresser_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ func (s *keyStresser) isRetryableError(err error) bool {
case context.Canceled.Error():
// from stresser.Cancel method:
return false
case grpc.ErrClientConnClosing.Error():
// from stresser.Cancel method:
return false
}

if status.Convert(err).Code() == codes.Unavailable {
Expand Down
5 changes: 1 addition & 4 deletions proxy/grpcproxy/adapter/chan_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,7 @@ func (s *chanStream) RecvMsg(m interface{}) error {
select {
case msg, ok := <-s.recvc:
if !ok {
if s.ctx.Err() != nil {
err := s.ctx.Err()
return err
}
return grpc.ErrClientConnClosing
}
if err, ok := msg.(error); ok {
return err
Expand Down
2 changes: 1 addition & 1 deletion proxy/grpcproxy/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (lp *leaseProxy) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error
case <-lostLeaderC:
return rpctypes.ErrNoLeader
case <-lp.leader.disconnectNotify():
return context.Canceled
return grpc.ErrClientConnClosing
default:
if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions proxy/grpcproxy/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

Expand Down Expand Up @@ -79,7 +80,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
wp.mu.Unlock()
select {
case <-wp.leader.disconnectNotify():
return context.Canceled
return grpc.ErrClientConnClosing
default:
return wp.ctx.Err()
}
Expand Down Expand Up @@ -152,7 +153,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
case <-lostLeaderC:
return rpctypes.ErrNoLeader
case <-wp.leader.disconnectNotify():
return context.Canceled
return grpc.ErrClientConnClosing
default:
return wps.ctx.Err()
}
Expand Down

0 comments on commit dd5f5e3

Please sign in to comment.