Skip to content

Commit

Permalink
grpc
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Gahlot <[email protected]>
  • Loading branch information
gauravgahlot committed Oct 19, 2020
1 parent 44adc93 commit 385152a
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 38 deletions.
8 changes: 4 additions & 4 deletions balancer_switching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func checkPickFirst(cc *ClientConn, servers []*server) error {
)
connected := false
for i := 0; i < 5000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); errorDesc(err) == servers[0].port {
if err = cc.Invoke(context.TODO(), "/foo/bar", &req, &reply); errorDesc(err) == servers[0].port {
if connected {
// connected is set to false if peer is not server[0]. So if
// connected is true here, this is the second time we saw
Expand All @@ -102,7 +102,7 @@ func checkPickFirst(cc *ClientConn, servers []*server) error {
}
// The following RPCs should all succeed with the first server.
for i := 0; i < 3; i++ {
err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
err = cc.Invoke(context.TODO(), "/foo/bar", &req, &reply)
if errorDesc(err) != servers[0].port {
return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[0].port, err)
}
Expand All @@ -124,7 +124,7 @@ func checkRoundRobin(cc *ClientConn, servers []*server) error {
for _, s := range servers {
var up bool
for i := 0; i < 5000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); errorDesc(err) == s.port {
if err = cc.Invoke(context.TODO(), "/foo/bar", &req, &reply); errorDesc(err) == s.port {
up = true
break
}
Expand All @@ -138,7 +138,7 @@ func checkRoundRobin(cc *ClientConn, servers []*server) error {

serverCount := len(servers)
for i := 0; i < 3*serverCount; i++ {
err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
err = cc.Invoke(context.TODO(), "/foo/bar", &req, &reply)
if errorDesc(err) != servers[i%serverCount].port {
return fmt.Errorf("index %d: want peer %v, got peer %v", i, servers[i%serverCount].port, err)
}
Expand Down
18 changes: 9 additions & 9 deletions call_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (s) TestUnaryClientInterceptor(t *testing.T) {
}()

var reply string
ctx := context.Background()
ctx := context.TODO()
parentCtx := context.WithValue(ctx, ctxKey("parentKey"), 0)
if err := cc.Invoke(parentCtx, "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
Expand Down Expand Up @@ -305,7 +305,7 @@ func (s) TestChainUnaryClientInterceptor(t *testing.T) {
}()

var reply string
ctx := context.Background()
ctx := context.TODO()
parentCtx := context.WithValue(ctx, ctxKey("parentKey"), 0)
if err := cc.Invoke(parentCtx, "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse+"321" {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
Expand Down Expand Up @@ -346,7 +346,7 @@ func (s) TestChainOnBaseUnaryClientInterceptor(t *testing.T) {
}()

var reply string
ctx := context.Background()
ctx := context.TODO()
parentCtx := context.WithValue(ctx, ctxKey("parentKey"), 0)
if err := cc.Invoke(parentCtx, "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
Expand Down Expand Up @@ -407,7 +407,7 @@ func (s) TestChainStreamClientInterceptor(t *testing.T) {
server.stop()
}()

ctx := context.Background()
ctx := context.TODO()
parentCtx := context.WithValue(ctx, ctxKey("parentKey"), 0)
_, err := cc.NewStream(parentCtx, &StreamDesc{}, "/foo/bar")
if err != nil {
Expand All @@ -418,7 +418,7 @@ func (s) TestChainStreamClientInterceptor(t *testing.T) {
func (s) TestInvoke(t *testing.T) {
server, cc := setUp(t, 0, math.MaxUint32)
var reply string
if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
if err := cc.Invoke(context.TODO(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
}
cc.Close()
Expand All @@ -429,7 +429,7 @@ func (s) TestInvokeLargeErr(t *testing.T) {
server, cc := setUp(t, 0, math.MaxUint32)
var reply string
req := "hello"
err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
err := cc.Invoke(context.TODO(), "/foo/bar", &req, &reply)
if _, ok := status.FromError(err); !ok {
t.Fatalf("grpc.Invoke(_, _, _, _, _) receives non rpc error.")
}
Expand All @@ -445,7 +445,7 @@ func (s) TestInvokeErrorSpecialChars(t *testing.T) {
server, cc := setUp(t, 0, math.MaxUint32)
var reply string
req := "weird error"
err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
err := cc.Invoke(context.TODO(), "/foo/bar", &req, &reply)
if _, ok := status.FromError(err); !ok {
t.Fatalf("grpc.Invoke(_, _, _, _, _) receives non rpc error.")
}
Expand All @@ -462,7 +462,7 @@ func (s) TestInvokeCancel(t *testing.T) {
var reply string
req := "canceled"
for i := 0; i < 100; i++ {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.TODO())
cancel()
cc.Invoke(ctx, "/foo/bar", &req, &reply)
}
Expand All @@ -480,7 +480,7 @@ func (s) TestInvokeCancelClosedNonFailFast(t *testing.T) {
var reply string
cc.Close()
req := "hello"
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.TODO())
cancel()
if err := cc.Invoke(ctx, "/foo/bar", &req, &reply, WaitForReady(true)); err == nil {
t.Fatalf("canceled invoke on closed connection should fail")
Expand Down
2 changes: 1 addition & 1 deletion clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (s) TestDial_OneBackoffPerRetryGroup(t *testing.T) {
}

func (s) TestDialContextCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.TODO())
cancel()
if _, err := DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithInsecure()); err != context.Canceled {
t.Fatalf("DialContext(%v, _) = _, %v, want _, %v", ctx, err, context.Canceled)
Expand Down
2 changes: 1 addition & 1 deletion internal/transport/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (s) TestMapAddressEnv(t *testing.T) {
defer overwrite(hpfe)()

// envTestAddr should be handled by ProxyFromEnvironment.
got, err := mapAddress(context.Background(), envTestAddr)
got, err := mapAddress(context.TODO(), envTestAddr)
if err != nil {
t.Error(err)
}
Expand Down
10 changes: 5 additions & 5 deletions picker_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (p *testingPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error

func (s) TestBlockingPickTimeout(t *testing.T) {
bp := newPickerWrapper()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
ctx, cancel := context.WithTimeout(context.TODO(), time.Millisecond)
defer cancel()
if _, _, err := bp.pick(ctx, true, balancer.PickInfo{}); status.Code(err) != codes.DeadlineExceeded {
t.Errorf("bp.pick returned error %v, want DeadlineExceeded", err)
Expand All @@ -80,7 +80,7 @@ func (s) TestBlockingPick(t *testing.T) {
var finishedCount uint64
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(context.Background(), true, balancer.PickInfo{}); err != nil || tr != testT {
if tr, _, err := bp.pick(context.TODO(), true, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned non-nil error: %v", err)
}
atomic.AddUint64(&finishedCount, 1)
Expand All @@ -100,7 +100,7 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
// All goroutines should block because picker returns no sc available.
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(context.Background(), true, balancer.PickInfo{}); err != nil || tr != testT {
if tr, _, err := bp.pick(context.TODO(), true, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned non-nil error: %v", err)
}
atomic.AddUint64(&finishedCount, 1)
Expand All @@ -121,7 +121,7 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
// picks are not failfast.
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(context.Background(), false, balancer.PickInfo{}); err != nil || tr != testT {
if tr, _, err := bp.pick(context.TODO(), false, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned non-nil error: %v", err)
}
atomic.AddUint64(&finishedCount, 1)
Expand All @@ -141,7 +141,7 @@ func (s) TestBlockingPickSCNotReady(t *testing.T) {
// All goroutines should block because sc is not ready.
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(context.Background(), true, balancer.PickInfo{}); err != nil || tr != testT {
if tr, _, err := bp.pick(context.TODO(), true, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned non-nil error: %v", err)
}
atomic.AddUint64(&finishedCount, 1)
Expand Down
34 changes: 17 additions & 17 deletions pickfirst_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s) TestOneBackendPickfirst(t *testing.T) {
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}}})
// The second RPC should succeed.
for i := 0; i < 1000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
if err = cc.Invoke(context.TODO(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
return
}
time.Sleep(time.Millisecond)
Expand Down Expand Up @@ -97,7 +97,7 @@ func (s) TestBackendsPickfirst(t *testing.T) {
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}})
// The second RPC should succeed with the first server.
for i := 0; i < 1000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
if err = cc.Invoke(context.TODO(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
return
}
time.Sleep(time.Millisecond)
Expand Down Expand Up @@ -132,7 +132,7 @@ func (s) TestNewAddressWhileBlockingPickfirst(t *testing.T) {
go func() {
defer wg.Done()
// This RPC blocks until NewAddress is called.
cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
cc.Invoke(context.TODO(), "/foo/bar", &req, &reply)
}()
}
time.Sleep(50 * time.Millisecond)
Expand Down Expand Up @@ -167,7 +167,7 @@ func (s) TestCloseWithPendingRPCPickfirst(t *testing.T) {
go func() {
defer wg.Done()
// This RPC blocks until NewAddress is called.
cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
cc.Invoke(context.TODO(), "/foo/bar", &req, &reply)
}()
}
time.Sleep(50 * time.Millisecond)
Expand Down Expand Up @@ -199,15 +199,15 @@ func (s) TestOneServerDownPickfirst(t *testing.T) {
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}})
// The second RPC should succeed with the first server.
for i := 0; i < 1000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
if err = cc.Invoke(context.TODO(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
break
}
time.Sleep(time.Millisecond)
}

servers[0].stop()
for i := 0; i < 1000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
if err = cc.Invoke(context.TODO(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
return
}
time.Sleep(time.Millisecond)
Expand Down Expand Up @@ -239,7 +239,7 @@ func (s) TestAllServersDownPickfirst(t *testing.T) {
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}})
// The second RPC should succeed with the first server.
for i := 0; i < 1000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
if err = cc.Invoke(context.TODO(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
break
}
time.Sleep(time.Millisecond)
Expand All @@ -249,7 +249,7 @@ func (s) TestAllServersDownPickfirst(t *testing.T) {
servers[i].stop()
}
for i := 0; i < 1000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); status.Code(err) == codes.Unavailable {
if err = cc.Invoke(context.TODO(), "/foo/bar", &req, &reply); status.Code(err) == codes.Unavailable {
return
}
time.Sleep(time.Millisecond)
Expand Down Expand Up @@ -280,13 +280,13 @@ func (s) TestAddressesRemovedPickfirst(t *testing.T) {

r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}, {Addr: servers[2].addr}}})
for i := 0; i < 1000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
if err = cc.Invoke(context.TODO(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
break
}
time.Sleep(time.Millisecond)
}
for i := 0; i < 20; i++ {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
if err := cc.Invoke(context.TODO(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
}
time.Sleep(10 * time.Millisecond)
Expand All @@ -295,13 +295,13 @@ func (s) TestAddressesRemovedPickfirst(t *testing.T) {
// Remove server[0].
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}}})
for i := 0; i < 1000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
if err = cc.Invoke(context.TODO(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
break
}
time.Sleep(time.Millisecond)
}
for i := 0; i < 20; i++ {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
if err := cc.Invoke(context.TODO(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
}
time.Sleep(10 * time.Millisecond)
Expand All @@ -310,7 +310,7 @@ func (s) TestAddressesRemovedPickfirst(t *testing.T) {
// Append server[0], nothing should change.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[1].addr}, {Addr: servers[2].addr}, {Addr: servers[0].addr}}})
for i := 0; i < 20; i++ {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
if err := cc.Invoke(context.TODO(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
}
time.Sleep(10 * time.Millisecond)
Expand All @@ -319,13 +319,13 @@ func (s) TestAddressesRemovedPickfirst(t *testing.T) {
// Remove server[1].
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[2].addr}, {Addr: servers[0].addr}}})
for i := 0; i < 1000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
if err = cc.Invoke(context.TODO(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
break
}
time.Sleep(time.Millisecond)
}
for i := 0; i < 20; i++ {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[2].port {
if err := cc.Invoke(context.TODO(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[2].port {
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
}
time.Sleep(10 * time.Millisecond)
Expand All @@ -334,13 +334,13 @@ func (s) TestAddressesRemovedPickfirst(t *testing.T) {
// Remove server[2].
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: servers[0].addr}}})
for i := 0; i < 1000; i++ {
if err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
if err = cc.Invoke(context.TODO(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
break
}
time.Sleep(time.Millisecond)
}
for i := 0; i < 20; i++ {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
if err := cc.Invoke(context.TODO(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
}
time.Sleep(10 * time.Millisecond)
Expand Down
2 changes: 1 addition & 1 deletion server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s) TestGetServiceInfo(t *testing.T) {

func (s) TestStreamContext(t *testing.T) {
expectedStream := &transport.Stream{}
ctx := NewContextWithServerTransportStream(context.Background(), expectedStream)
ctx := NewContextWithServerTransportStream(context.TODO(), expectedStream)
s := ServerTransportStreamFromContext(ctx)
stream, ok := s.(*transport.Stream)
if !ok || expectedStream != stream {
Expand Down

0 comments on commit 385152a

Please sign in to comment.