Skip to content

Commit

Permalink
fix: replace cfgUpdateDone channel with inhibitPickerUpdates to inhib…
Browse files Browse the repository at this point in the history
…it picker updates from the child LB policy
  • Loading branch information
aranjans committed Aug 2, 2024
1 parent 4b21e3b commit 3e4fc77
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 26 deletions.
28 changes: 12 additions & 16 deletions xds/internal/balancer/clusterimpl/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,16 +823,16 @@ func (s) TestUpdateLRSServer(t *testing.T) {
// TestPickerUpdatedSynchronouslyOnConfigUpdate verifies that the picker is
// updated synchronously upon receipt of a configuration update.
func (s) TestPickerUpdatedSynchronouslyOnConfigUpdate(t *testing.T) {
// Override the newPickerUpdated to ensure picker was updated.
pckrUpdated := make(chan struct{})
origNewPickerUpdated := newPickerUpdated
newPickerUpdated = func() {
// Override the newPickerHook to ensure picker was updated.
pickerUpdated := make(chan struct{})
origNewPickerHook := newPickerHook
newPickerHook = func() {
select {
case pckrUpdated <- struct{}{}:
case pickerUpdated <- struct{}{}:
default:
}
}
defer func() { newPickerUpdated = origNewPickerUpdated }()
defer func() { newPickerHook = origNewPickerHook }()

var testLocality = xdsinternal.LocalityID{
Region: "test-region",
Expand Down Expand Up @@ -880,24 +880,20 @@ func (s) TestPickerUpdatedSynchronouslyOnConfigUpdate(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Wait for the config update to be done
//
// Note: We don't need to check separately if picker update is
// happening synchronously with config update , as
// UpdateClientConnState waits for the picker update and then
// returns from it.

// Wait for the picker update to be done
select {
case <-pckrUpdated:
case <-pickerUpdated:
case err := <-errCh:
t.Fatalf("client conn state updated before picker was updated: %v", err)
t.Fatalf("Client conn state updated before picker was updated: %v", err)
case <-ctx.Done():
t.Fatalf("xds_cluster_impl config update couldn't complete: %v", ctx.Err().Error())
t.Fatal("Timed out waiting for picker update on receipt of configuration update")
}

// Once picker was updated, wait for client conn update
// to complete.
if err := <-errCh; err != nil {
t.Fatalf("error updating client conn state: %v", err)
t.Fatalf("Error updating client conn state: %v", err)
}
}

Expand Down
42 changes: 32 additions & 10 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var (

// The function below is a no-op in production code, but can be overridden in
// tests to give them visibility into exactly when certain events happen.
newPickerUpdated = func() {}
newPickerHook = func() {}
)

func init() {
Expand All @@ -76,7 +76,6 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
loadWrapper: loadstore.NewWrapper(),
pickerUpdateCh: buffer.NewUnbounded(),
requestCountMax: defaultRequestCountMax,
cfgUpdateDone: make(chan struct{}),
}
b.logger = prefixLogger(b)
b.child = gracefulswitch.NewBalancer(b, bOpts)
Expand Down Expand Up @@ -112,9 +111,7 @@ type clusterImplBalancer struct {
logger *grpclog.PrefixLogger
xdsClient xdsclient.XDSClient

config *LBConfig
// cfgUpdateDone verifies the completion of config update
cfgUpdateDone chan struct{}
config *LBConfig
child *gracefulswitch.Balancer
cancelLoadReport func()
edsServiceName string
Expand All @@ -136,7 +133,12 @@ type clusterImplBalancer struct {
requestCounter *xdsclient.ClusterRequestsCounter
requestCountMax uint32
telemetryLabels map[string]string
pickerUpdateCh *buffer.Unbounded
// Set during UpdateClientConnState when pushing updates to child policies.
// Prevents state updates from child policies causing new pickers to be sent
// up the channel. Cleared after all child policies have processed the
// updates sent to them, after which a new picker is sent up the channel.
inhibitPickerUpdates bool
pickerUpdateCh *buffer.Unbounded
}

// updateLoadStore checks the config for load store, and decides whether it
Expand Down Expand Up @@ -217,6 +219,11 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
return nil
}

// resumePickerUpdates is the message UpdateClientConnState puts on
// pickerUpdateCh after setting inhibitPickerUpdates. When run() receives it,
// run() passes lbConfig to handleDropAndRequestCount, sends a new picker to
// the ClientConn if that call returns a non-nil result and a child picker is
// available, invokes newPickerHook, clears inhibitPickerUpdates, and finally
// closes done.
type resumePickerUpdates struct {
// done is closed by run() once this message has been fully handled.
// UpdateClientConnState blocks receiving from it right after queuing the
// message, so it does not proceed until run() has finished processing.
done chan struct{}
// lbConfig is the configuration run() hands to handleDropAndRequestCount
// when processing this message.
lbConfig *LBConfig
}

func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
if b.closed.HasFired() {
b.logger.Warningf("xds: received ClientConnState {%+v} after clusterImplBalancer was closed", s)
Expand Down Expand Up @@ -262,12 +269,16 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
}
b.config = newConfig

b.mu.Lock()
b.logger.Infof("Delaying picker updates until config is propagated to and processed by child policies")
b.inhibitPickerUpdates = true
b.mu.Unlock()
// Notify run() of this new config, in case drop and request counter need
// update (which means a new picker needs to be generated).
b.pickerUpdateCh.Put(newConfig)
// Wait for LB config update to be done
<-b.cfgUpdateDone
newPickerUpdated()
done := make(chan struct{})
b.pickerUpdateCh.Put(resumePickerUpdates{done: done, lbConfig: newConfig})
<-done

// Addresses and sub-balancer config are sent to sub-balancer.
return b.child.UpdateClientConnState(balancer.ClientConnState{
Expand Down Expand Up @@ -502,6 +513,18 @@ func (b *clusterImplBalancer) run() {
requestCountMax: b.requestCountMax,
}),
})
case resumePickerUpdates:
dc := b.handleDropAndRequestCount(u.lbConfig)
if dc != nil && b.childState.Picker != nil {
b.ClientConn.UpdateState(balancer.State{
ConnectivityState: b.childState.ConnectivityState,
Picker: b.newPicker(dc),
})
}
newPickerHook()
b.logger.Infof("Resuming picker updates after config propagation to child policies")
b.inhibitPickerUpdates = false
close(u.done)
case *LBConfig:
b.telemetryLabels = u.TelemetryLabels
dc := b.handleDropAndRequestCount(u)
Expand All @@ -511,7 +534,6 @@ func (b *clusterImplBalancer) run() {
Picker: b.newPicker(dc),
})
}
b.cfgUpdateDone <- struct{}{}
}
b.mu.Unlock()
case <-b.closed.Done():
Expand Down

0 comments on commit 3e4fc77

Please sign in to comment.