From 48e3ed569627040d97345ef296f2a728eab6f4e7 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Tue, 16 Jul 2024 21:44:36 +0530 Subject: [PATCH] xds_cluster_impl_experimental: ensure picker update synchronously on receipt of config update --- .../balancer/clusterimpl/balancer_test.go | 59 +++++++++++++++++++ .../balancer/clusterimpl/clusterimpl.go | 8 ++- 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/xds/internal/balancer/clusterimpl/balancer_test.go b/xds/internal/balancer/clusterimpl/balancer_test.go index 5a4bb0f270b2..ab940f06ae56 100644 --- a/xds/internal/balancer/clusterimpl/balancer_test.go +++ b/xds/internal/balancer/clusterimpl/balancer_test.go @@ -820,6 +820,65 @@ func (s) TestUpdateLRSServer(t *testing.T) { } } +// Test ensures the picker is updated synchronously upon +// receipt of a configuration update. +func (s) TestPickerUpdatedSynchronouslyOnConfigUpdate(t *testing.T) { + var testLocality = xdsinternal.LocalityID{ + Region: "test-region", + Zone: "test-zone", + SubZone: "test-sub-zone", + } + + // Create xds client + xdsC := fakeclient.NewClient() + + builder := balancer.Get(Name) + cc := testutils.NewBalancerClientConn(t) + b := builder.Build(cc, balancer.BuildOptions{}) + defer b.Close() + + addrs := make([]resolver.Address, len(testBackendAddrs)) + // Set locality to addresses in testBackendAddrs + for i, a := range testBackendAddrs { + addrs[i] = xdsinternal.SetLocalityID(a, testLocality) + } + testLRSServerConfig, err := bootstrap.ServerConfigForTesting(bootstrap.ServerConfigTestingOptions{ + URI: "trafficdirector.googleapis.com:443", + ChannelCreds: []bootstrap.ChannelCreds{{Type: "google_default"}}, + }) + if err != nil { + t.Fatalf("Failed to create LRS server config for testing: %v", err) + } + + // errCh verifies that UpdateClientConnState was not successful + errCh := make(chan error, 1) + go func() { + err := b.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC), + BalancerConfig: &LBConfig{ + Cluster: testClusterName, + EDSServiceName: testServiceName, + LoadReportingServer: testLRSServerConfig, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: roundrobin.Name, + }, + }, + }) + errCh <- err + }() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + select { + case err := <-errCh: + if err != nil { + t.Fatalf("error updating client conn state: %v", err) + } + case <-ctx.Done(): + t.Fatalf("xds_cluster_impl config update couldn't complete: %v", ctx.Err().Error()) + } +} + func assertString(f func() (string, error)) string { s, err := f() if err != nil { diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index 9058f0d01fc8..3a71baee9e25 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -70,6 +70,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba loadWrapper: loadstore.NewWrapper(), pickerUpdateCh: buffer.NewUnbounded(), requestCountMax: defaultRequestCountMax, + cfgUpdateDone: make(chan struct{}), } b.logger = prefixLogger(b) b.child = gracefulswitch.NewBalancer(b, bOpts) @@ -105,7 +106,9 @@ type clusterImplBalancer struct { logger *grpclog.PrefixLogger xdsClient xdsclient.XDSClient - config *LBConfig + config *LBConfig + // cfgUpdateDone verifies the completion of config update + cfgUpdateDone chan struct{} child *gracefulswitch.Balancer cancelLoadReport func() edsServiceName string @@ -256,6 +259,8 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) // Notify run() of this new config, in case drop and request counter need // update (which means a new picker needs to be generated). b.pickerUpdateCh.Put(newConfig) + // Wait for LB config update to be done + <-b.cfgUpdateDone // Addresses and sub-balancer config are sent to sub-balancer. return b.child.UpdateClientConnState(balancer.ClientConnState{ @@ -499,6 +504,7 @@ func (b *clusterImplBalancer) run() { Picker: b.newPicker(dc), }) } + b.cfgUpdateDone <- struct{}{} } b.mu.Unlock() case <-b.closed.Done():