Skip to content

Commit

Permalink
xds_cluster_impl_experimental: ensure picker update synchronously on …
Browse files Browse the repository at this point in the history
…receipt of config update
  • Loading branch information
aranjans committed Jul 21, 2024
1 parent 2bcbcab commit 48e3ed5
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
59 changes: 59 additions & 0 deletions xds/internal/balancer/clusterimpl/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,65 @@ func (s) TestUpdateLRSServer(t *testing.T) {
}
}

// Test ensures the picker is updated synchronously upon
// receipt of a configuration update.
func (s) TestPickerUpdatedSynchronouslyOnConfigUpdate(t *testing.T) {
var testLocality = xdsinternal.LocalityID{
Region: "test-region",
Zone: "test-zone",
SubZone: "test-sub-zone",
}

// Create xds client
xdsC := fakeclient.NewClient()

builder := balancer.Get(Name)
cc := testutils.NewBalancerClientConn(t)
b := builder.Build(cc, balancer.BuildOptions{})
defer b.Close()

addrs := make([]resolver.Address, len(testBackendAddrs))
// Set locality to addresses in testBackendAddrs
for i, a := range testBackendAddrs {
addrs[i] = xdsinternal.SetLocalityID(a, testLocality)
}
testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{
URI: "trafficdirector.googleapis.com:443",
ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}},
})
if err != nil {
t.Fatalf("Failed to create LRS server config for testing: %v", err)
}

// errCh verifies that UpdateClientConnState was not successful
errCh := make(chan error, 1)
go func() {
err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
BalancerConfig: &LBConfig{
Cluster: testClusterName,
EDSServiceName: testServiceName,
LoadReportingServer: testLRSServerConfig,
ChildPolicy: &internalserviceconfig.BalancerConfig{
Name: roundrobin.Name,
},
},
})
errCh <- err
}()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
select {
case err := <-errCh:
if err != nil {
t.Fatalf("error updating client conn state: %v", err)
}
case <-ctx.Done():
t.Fatalf("xds_cluster_impl config update couldn't complete: %v", ctx.Err().Error())
}
}

func assertString(f func() (string, error)) string {
s, err := f()
if err != nil {
Expand Down
8 changes: 7 additions & 1 deletion xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
loadWrapper: loadstore.NewWrapper(),
pickerUpdateCh: buffer.NewUnbounded(),
requestCountMax: defaultRequestCountMax,
cfgUpdateDone: make(chan struct{}),
}
b.logger = prefixLogger(b)
b.child = gracefulswitch.NewBalancer(b, bOpts)
Expand Down Expand Up @@ -105,7 +106,9 @@ type clusterImplBalancer struct {
logger *grpclog.PrefixLogger
xdsClient xdsclient.XDSClient

config *LBConfig
config *LBConfig
// cfgUpdateDone verifies the completion of config update
cfgUpdateDone chan struct{}
child *gracefulswitch.Balancer
cancelLoadReport func()
edsServiceName string
Expand Down Expand Up @@ -256,6 +259,8 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
// Notify run() of this new config, in case drop and request counter need
// update (which means a new picker needs to be generated).
b.pickerUpdateCh.Put(newConfig)
// Wait for LB config update to be done
<-b.cfgUpdateDone

// Addresses and sub-balancer config are sent to sub-balancer.
return b.child.UpdateClientConnState(balancer.ClientConnState{
Expand Down Expand Up @@ -499,6 +504,7 @@ func (b *clusterImplBalancer) run() {
Picker: b.newPicker(dc),
})
}
b.cfgUpdateDone <- struct{}{}
}
b.mu.Unlock()
case <-b.closed.Done():
Expand Down

0 comments on commit 48e3ed5

Please sign in to comment.