Skip to content

Commit

Permalink
grpc: delete deprecated API WithBalancerName() (#5232)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Mar 10, 2022
1 parent cf6d4d5 commit eead9a8
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 446 deletions.
43 changes: 8 additions & 35 deletions balancer_switching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"time"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -161,7 +161,7 @@ func (s) TestSwitchBalancer(t *testing.T) {
servers, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand All @@ -184,38 +184,11 @@ func (s) TestSwitchBalancer(t *testing.T) {
}
}

// Test that balancer specified by dial option will not be overridden.
func (s) TestBalancerDialOption(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")

const numServers = 2
servers, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
addrs := []resolver.Address{{Addr: servers[0].addr}, {Addr: servers[1].addr}}
r.UpdateState(resolver.State{Addresses: addrs})
// The init balancer is roundrobin.
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
// Switch to pickfirst.
cc.updateResolverState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "pick_first"}`), Addresses: addrs}, nil)
// Balancer is still roundrobin.
if err := checkRoundRobin(cc, servers); err != nil {
t.Fatalf("check roundrobin returned non-nil error: %v", err)
}
}

// First addr update contains grpclb.
func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -275,7 +248,7 @@ func (s) TestSwitchBalancerGRPCLBFirst(t *testing.T) {
func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -351,7 +324,7 @@ func (s) TestSwitchBalancerGRPCLBSecond(t *testing.T) {
func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -413,7 +386,7 @@ func (s) TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) {
func (s) TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -503,7 +476,7 @@ func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
servers, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()

cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r), WithCodec(testCodec{}))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down Expand Up @@ -550,7 +523,7 @@ func init() {
// This test is to make sure this close doesn't cause a deadlock.
func (s) TestSwitchBalancerOldRemoveSubConn(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithResolvers(r))
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
Expand Down
51 changes: 20 additions & 31 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
}

var balCfg serviceconfig.LoadBalancingConfig
if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil {
if cc.sc != nil && cc.sc.lbConfig != nil {
balCfg = cc.sc.lbConfig.cfg
}

Expand Down Expand Up @@ -714,10 +714,6 @@ func (cc *ClientConn) switchBalancer(name string) {
}

channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
if cc.dopts.balancerBuilder != nil {
channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead")
return
}
if cc.balancerWrapper != nil {
// Don't hold cc.mu while closing the balancers. The balancers may call
// methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
Expand Down Expand Up @@ -999,35 +995,28 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
cc.retryThrottler.Store((*retryThrottler)(nil))
}

if cc.dopts.balancerBuilder == nil {
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var newBalancerName string
if cc.sc != nil && cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
} else {
var isGRPCLB bool
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
isGRPCLB = true
break
}
}
if isGRPCLB {
newBalancerName = grpclbName
} else if cc.sc != nil && cc.sc.LB != nil {
newBalancerName = *cc.sc.LB
} else {
newBalancerName = PickFirstBalancerName
// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var newBalancerName string
if cc.sc != nil && cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
} else {
var isGRPCLB bool
for _, a := range addrs {
if a.Type == resolver.GRPCLB {
isGRPCLB = true
break
}
}
cc.switchBalancer(newBalancerName)
} else if cc.balancerWrapper == nil {
// Balancer dial option was set, and this is the first time handling
// resolved addresses. Build a balancer with dopts.balancerBuilder.
cc.curBalancerName = cc.dopts.balancerBuilder.Name()
cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
if isGRPCLB {
newBalancerName = grpclbName
} else if cc.sc != nil && cc.sc.LB != nil {
newBalancerName = *cc.sc.LB
} else {
newBalancerName = PickFirstBalancerName
}
}
cc.switchBalancer(newBalancerName)
}

func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
Expand Down
99 changes: 43 additions & 56 deletions clientconn_state_transition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package grpc

import (
"context"
"fmt"
"net"
"sync"
"testing"
Expand All @@ -28,6 +29,7 @@ import (
"golang.org/x/net/http2"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
Expand Down Expand Up @@ -141,9 +143,6 @@ client enters TRANSIENT FAILURE.`,
}

func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

pl := testutils.NewPipeListener()
defer pl.Close()

Expand All @@ -156,10 +155,9 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s
connMu.Unlock()
}()

client, err := DialContext(ctx,
"",
WithInsecure(),
WithBalancerName(stateRecordingBalancerName),
client, err := Dial("",
WithTransportCredentials(insecure.NewCredentials()),
WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
WithDialer(pl.Dialer()),
withBackoff(noBackoff{}),
withMinConnectDeadline(func() time.Duration { return time.Millisecond * 100 }))
Expand All @@ -170,12 +168,9 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s
go stayConnected(client)

stateNotifications := testBalancerBuilder.nextStateNotifier()

timeout := time.After(5 * time.Second)

for i := 0; i < len(want); i++ {
select {
case <-timeout:
case <-time.After(defaultTestTimeout):
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
case seen := <-stateNotifications:
if seen != want[i] {
Expand All @@ -196,16 +191,6 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s

// When a READY connection is closed, the client enters IDLE then CONNECTING.
func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Idle,
connectivity.Connecting,
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
Expand Down Expand Up @@ -237,7 +222,9 @@ func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {
conn.Close()
}()

client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBalancerName(stateRecordingBalancerName))
client, err := Dial(lis.Addr().String(),
WithTransportCredentials(insecure.NewCredentials()),
WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)))
if err != nil {
t.Fatal(err)
}
Expand All @@ -246,11 +233,15 @@ func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {

stateNotifications := testBalancerBuilder.nextStateNotifier()

timeout := time.After(5 * time.Second)

want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Idle,
connectivity.Connecting,
}
for i := 0; i < len(want); i++ {
select {
case <-timeout:
case <-time.After(defaultTestTimeout):
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
case seen := <-stateNotifications:
if seen == connectivity.Ready {
Expand All @@ -266,14 +257,6 @@ func (s) TestStateTransitions_ReadyToConnecting(t *testing.T) {
// When the first connection is closed, the client stays in CONNECTING until it
// tries the second address (which succeeds, and then it enters READY).
func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

lis1, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
Expand Down Expand Up @@ -324,19 +307,25 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T)
{Addr: lis1.Addr().String()},
{Addr: lis2.Addr().String()},
}})
client, err := DialContext(ctx, "whatever:///this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), WithResolvers(rb))
client, err := Dial("whatever:///this-gets-overwritten",
WithTransportCredentials(insecure.NewCredentials()),
WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
WithResolvers(rb))
if err != nil {
t.Fatal(err)
}
defer client.Close()

stateNotifications := testBalancerBuilder.nextStateNotifier()

timeout := time.After(5 * time.Second)

want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for i := 0; i < len(want); i++ {
select {
case <-timeout:
case <-ctx.Done():
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
case seen := <-stateNotifications:
if seen != want[i] {
Expand All @@ -345,12 +334,12 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T)
}
}
select {
case <-timeout:
case <-ctx.Done():
t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
case <-server1Done:
}
select {
case <-timeout:
case <-ctx.Done():
t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 2")
case <-server2Done:
}
Expand All @@ -359,16 +348,6 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T)
// When there are multiple addresses, and we enter READY on one of them, a
// later closure should cause the client to enter CONNECTING
func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Idle,
connectivity.Connecting,
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

lis1, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
Expand Down Expand Up @@ -414,20 +393,28 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
{Addr: lis1.Addr().String()},
{Addr: lis2.Addr().String()},
}})
client, err := DialContext(ctx, "whatever:///this-gets-overwritten", WithInsecure(), WithBalancerName(stateRecordingBalancerName), WithResolvers(rb))
client, err := Dial("whatever:///this-gets-overwritten",
WithTransportCredentials(insecure.NewCredentials()),
WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)),
WithResolvers(rb))
if err != nil {
t.Fatal(err)
}
defer client.Close()
go stayConnected(client)

stateNotifications := testBalancerBuilder.nextStateNotifier()

timeout := time.After(2 * time.Second)

want := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Idle,
connectivity.Connecting,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
for i := 0; i < len(want); i++ {
select {
case <-timeout:
case <-ctx.Done():
t.Fatalf("timed out waiting for state %d (%v) in flow %v", i, want[i], want)
case seen := <-stateNotifications:
if seen == connectivity.Ready {
Expand All @@ -439,7 +426,7 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
}
}
select {
case <-timeout:
case <-ctx.Done():
t.Fatal("saw the correct state transitions, but timed out waiting for client to finish interactions with server 1")
case <-server1Done:
}
Expand Down
Loading

0 comments on commit eead9a8

Please sign in to comment.