
Commit

refactor
aranjans committed Jul 19, 2024
1 parent b2f4155 commit 750332c
Showing 2 changed files with 37 additions and 40 deletions.
11 changes: 5 additions & 6 deletions balancer/rls/balancer.go
@@ -74,10 +74,10 @@ var (

// Following functions are no-ops in actual code, but can be overridden in
// tests to give tests visibility into exactly when certain events happen.
clientConnUpdateHook = func() {}
dataCachePurgeHook = func() {}
resetBackoffHook = func() {}
entryWithValidBackoffEvicted = func() {}
clientConnUpdateHook = func() {}
dataCachePurgeHook = func() {}
resetBackoffHook = func() {}
newPickerGenerated = func() {}
)

func init() {
@@ -511,11 +511,10 @@ func (b *rlsBalancer) sendNewPickerLocked() {
ConnectivityState: aggregatedState,
Picker: picker,
}

newPickerGenerated()
if !b.inhibitPickerUpdates {
b.logger.Infof("New balancer.State: %+v", state)
b.cc.UpdateState(state)
entryWithValidBackoffEvicted()
} else {
b.logger.Infof("Delaying picker update: %+v", state)
}
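The hooks in the var block above are no-ops in production and exist only so tests can observe internal events such as picker generation. As a rough sketch of that override pattern (hypothetical helper name; it assumes the code sits in the rls package alongside balancer.go), a test can swap newPickerGenerated for a non-blocking notifier and restore the original when it finishes:

package rls

import "testing"

// overrideNewPickerHook is a hypothetical test helper: it saves the original
// newPickerGenerated hook, installs a non-blocking notifier, and restores the
// original when the test finishes.
func overrideNewPickerHook(t *testing.T) <-chan struct{} {
	t.Helper()

	pickerGenerated := make(chan struct{}, 1)
	origNewPickerGenerated := newPickerGenerated
	newPickerGenerated = func() {
		select {
		case pickerGenerated <- struct{}{}:
		default: // Drop the signal if the previous one has not been consumed yet.
		}
	}
	t.Cleanup(func() { newPickerGenerated = origNewPickerGenerated })
	return pickerGenerated
}

A test would then trigger a config update or cache resize and block on the returned channel (with a timeout) to know that a new picker was produced.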
66 changes: 32 additions & 34 deletions balancer/rls/balancer_test.go
@@ -654,9 +654,14 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) {
// in cache size, the picker is updated accordingly.
func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
// Override the clientConn update hook to get notified.
clientConnUpdateDone := make(chan struct{}, 1)
clientConnUpdateDone := make(chan struct{})
origClientConnUpdateHook := clientConnUpdateHook
clientConnUpdateHook = func() { clientConnUpdateDone <- struct{}{} }
clientConnUpdateHook = func() {
select {
case clientConnUpdateDone <- struct{}{}:
default:
}
}
defer func() { clientConnUpdateHook = origClientConnUpdateHook }()

// Override the cache entry size func, and always return 1.
@@ -678,19 +683,6 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
minEvictDuration = time.Duration(0)
defer func() { minEvictDuration = origMinEvictDuration }()

// Override the entryWithValidBackoffEvicted to ensure we update
// picker when an entry with valid backoff time was evicted.
backOffItemEvicted := make(chan struct{}, 1)
origBackoffItemEvicted := entryWithValidBackoffEvicted
entryWithValidBackoffEvicted = func() {
select {
case backOffItemEvicted <- struct{}{}:
default:
// Do nothing if the channel is full
}
}
defer func() { entryWithValidBackoffEvicted = origBackoffItemEvicted }()

// Register the top-level wrapping balancer which forwards calls to RLS.
topLevelBalancerName := t.Name() + "top-level"
var ccWrapper *testCCWrapper
@@ -728,12 +720,9 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
backendCh2, backendAddress2 := startBackend(t)
rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
if req.KeyMap["k1"] == "v1" {
return &rlstest.RouteLookupResponse{Err: errors.New("throwing error from control channel for first entry")}
}
if req.KeyMap["k2"] == "v2" {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
}
if req.KeyMap["k3"] == "v3" {
if req.KeyMap["k2"] == "v2" {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
}
return &rlstest.RouteLookupResponse{Err: errors.New("no keys in request metadata")}
Expand All @@ -754,12 +743,6 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
"names": [
"n2"
]
},
{
"key": "k3",
"names": [
"n3"
]
}
]
`
@@ -804,27 +787,42 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil)
t.Logf("Verifying if RPC failed when listener is stopped.")

ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n2", "v2")
ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1")
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh1)
verifyRLSRequest(t, rlsReqCh, true)

ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n3", "v3")
ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n2", "v2")
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2)
verifyRLSRequest(t, rlsReqCh, true)

// Setting the size to 1 will cause the entries to be
// evicted.
// Override the newPickerGenerated to measure the number of times
// the picker is generated because state updates can be inhibited.
//
// Note: This needs to be initialised here and not at the beginning
// of the test, as otherwise every RPC made earlier would also have
// affected the total picker count.
pckrSentBeforeClientConnUpdate := make(chan struct{}, 1)
totalPickerGenerated := 0
origNewPickerGenerated := newPickerGenerated
newPickerGenerated = func() {
totalPickerGenerated++
if totalPickerGenerated == 2 {
pckrSentBeforeClientConnUpdate <- struct{}{}
}
}
defer func() { newPickerGenerated = origNewPickerGenerated }()

// Setting the size to 1 will cause the entries to be evicted.
scJSON1 := fmt.Sprintf(configJSON, topLevelBalancerName, headers, rlsServer.Address, 1, childPolicyName)
sc1 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON1)
go r.UpdateState(resolver.State{ServiceConfig: sc1})
select {
// Wait for backOffItemEvicted to ensure picker was updated
// synchronously when there was cache resize on config update.
case <-backOffItemEvicted:
case <-pckrSentBeforeClientConnUpdate:
case <-clientConnUpdateDone:
t.Fatalf("Client conn update was completed before picker update.")
case <-ctx.Done():
t.Errorf("Error sending picker update on eviction of cache entry with valid backoff: %v", ctx.Err().Error())
t.Errorf("client conn update could not complete: %v", ctx.Err().Error())
}
<-clientConnUpdateDone
}

// TestDataCachePurging verifies that the LB policy periodically evicts expired
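The final select in the test above encodes an ordering requirement: the new picker must be generated before the client conn update completes. Stripped of the RLS specifics, that check reduces to a pattern like the following (hypothetical helper, not part of this change):

package rls

import (
	"context"
	"testing"
)

// assertHappensBefore is a hypothetical helper illustrating the ordering
// check: it fails the test if the "after" event fires first, and times out
// via the supplied context otherwise.
func assertHappensBefore(ctx context.Context, t *testing.T, before, after <-chan struct{}) {
	t.Helper()
	select {
	case <-before:
		// Expected order: the "before" event fired first.
	case <-after:
		t.Fatalf("event expected to happen later fired first")
	case <-ctx.Done():
		t.Fatalf("timed out waiting for the expected event: %v", ctx.Err())
	}
	// Also wait for the later event so the work producing it has finished
	// before the test returns.
	<-after
}

In the test above, the "before" signal corresponds to pckrSentBeforeClientConnUpdate and the "after" signal to clientConnUpdateDone.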
