Skip to content

Commit

Permalink
rls: update picker synchronously upon receipt of configuration update
Browse files Browse the repository at this point in the history
  • Loading branch information
aranjans committed Jul 5, 2024
1 parent 59954c8 commit 7a2f4c5
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 1 deletion.
6 changes: 5 additions & 1 deletion balancer/rls/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,11 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
// `resizeCache` boolean) because `cacheMu` needs to be grabbed before
// `stateMu` if we are to hold both locks at the same time.
b.cacheMu.Lock()
b.dataCache.resize(newCfg.cacheSizeBytes)
evicted := b.dataCache.resize(newCfg.cacheSizeBytes)
b.logger.Infof("Resizing cache: got evicted as %v", evicted)
if evicted {
b.sendNewPickerLocked()
}
b.cacheMu.Unlock()
}
return nil
Expand Down
207 changes: 207 additions & 0 deletions balancer/rls/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,213 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) {
verifyRLSRequest(t, rlsReqCh, true)
}

// TestPickerUpdate_DataCacheSizeDecrease tests the scenario where a decrease
// in cache size causes the LB policy to push a new picker synchronously.
// Verifies that entries are evicted from the cache.
func (s) TestPickerUpdate_DataCacheSizeDecrease(t *testing.T) {
	// Create a restartable listener which can close existing connections.
	l, err := testutils.LocalTCPListener()
	if err != nil {
		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
	}
	lis := testutils.NewRestartableListener(l)

	// Override the reset backoff hook to get notified.
	resetBackoffDone := make(chan struct{}, 1)
	origResetBackoffHook := resetBackoffHook
	resetBackoffHook = func() { resetBackoffDone <- struct{}{} }
	defer func() { resetBackoffHook = origResetBackoffHook }()

	// Override the clientConn update hook to get notified.
	clientConnUpdateDone := make(chan struct{}, 1)
	origClientConnUpdateHook := clientConnUpdateHook
	clientConnUpdateHook = func() { clientConnUpdateDone <- struct{}{} }
	defer func() { clientConnUpdateHook = origClientConnUpdateHook }()

	// Override the backoff strategy to return a large backoff which
	// will make sure the data cache entry remains in backoff for the
	// duration of the test.
	origBackoffStrategy := defaultBackoffStrategy
	defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout}
	defer func() { defaultBackoffStrategy = origBackoffStrategy }()

	// Override the minEvictDuration to ensure that when the config update
	// reduces the cache size, the resize operation is not stopped because
	// we find an entry whose minEvictDuration has not elapsed.
	origMinEvictDuration := minEvictDuration
	minEvictDuration = 0
	defer func() { minEvictDuration = origMinEvictDuration }()

	// Register the top-level wrapping balancer which forwards calls to RLS.
	topLevelBalancerName := t.Name() + "top-level"
	var ccWrapper *testCCWrapper
	stub.Register(topLevelBalancerName, stub.BalancerFuncs{
		Init: func(bd *stub.BalancerData) {
			ccWrapper = &testCCWrapper{ClientConn: bd.ClientConn}
			bd.Data = balancer.Get(Name).Build(ccWrapper, bd.BuildOptions)
		},
		ParseConfig: func(sc json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
			parser := balancer.Get(Name).(balancer.ConfigParser)
			return parser.ParseConfig(sc)
		},
		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
			bal := bd.Data.(balancer.Balancer)
			return bal.UpdateClientConnState(ccs)
		},
		Close: func(bd *stub.BalancerData) {
			bal := bd.Data.(balancer.Balancer)
			bal.Close()
		},
	})

	// Start an RLS server and set the throttler to never throttle requests.
	rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis)
	overrideAdaptiveThrottler(t, neverThrottlingThrottler())

	// Register an LB policy to act as the child policy for RLS LB policy.
	childPolicyName := "test-child-policy" + t.Name()
	e2e.RegisterRLSChildPolicy(childPolicyName, nil)
	t.Logf("Registered child policy with name %q", childPolicyName)

	// Build RLS service config with header matchers.
	rlsConfig := buildBasicRLSConfig(childPolicyName, rlsServer.Address)

	// Start a couple of test backends, and set up the fake RLS server to return
	// these as targets in the RLS response, based on request keys.
	backendCh1, backendAddress1 := startBackend(t)
	backendCh2, backendAddress2 := startBackend(t)
	rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
		if req.KeyMap["k1"] == "v1" {
			return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
		}
		if req.KeyMap["k2"] == "v2" {
			return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
		}
		return &rlstest.RouteLookupResponse{Err: errors.New("no keys in request metadata")}
	})

	// Register a manual resolver and push the RLS service config through it.
	r := manual.NewBuilderWithScheme("rls-e2e")
	scJSON := fmt.Sprintf(`
{
  "loadBalancingConfig": [
    {
      "%s": {
        "routeLookupConfig": {
          "grpcKeybuilders": [{
            "names": [{"service": "grpc.testing.TestService"}],
            "headers": [
              {
                "key": "k1",
                "names": [
                  "n1"
                ]
              },
              {
                "key": "k2",
                "names": [
                  "n2"
                ]
              }
            ]
          }],
          "lookupService": "%s",
          "cacheSizeBytes": 1000
        },
        "childPolicy": [{"%s": {}}],
        "childPolicyConfigTargetFieldName": "Backend"
      }
    }
  ]
}`, topLevelBalancerName, rlsServer.Address, childPolicyName)
	sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
	r.InitialState(resolver.State{ServiceConfig: sc})

	cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("grpc.Dial() failed: %v", err)
	}
	defer cc.Close()

	// Wait for the initial configuration to be processed by the LB policy.
	<-clientConnUpdateDone

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// Make an RPC with the first set of headers. This will result in an RLS
	// request being sent out and a data cache entry being created for the
	// target corresponding to backend1.
	ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1")
	makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh1)

	// Stop the RLS server.
	lis.Stop()

	// Make an RPC with headers that do not match any cache entry. This should
	// trigger another RLS request. And since the RLS server is down, the RLS
	// request will fail and the cache entry will enter backoff, and we have
	// overridden the default backoff strategy to return a value which will
	// keep this entry in backoff for the whole duration of the test.
	makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil)

	// Restart the RLS server.
	lis.Restart()

	// Cache the state changes seen up to this point.
	states0 := ccWrapper.getStates()

	// When we closed the RLS server earlier, the existing transport to the RLS
	// server would have closed, and the RLS control channel would have moved to
	// TRANSIENT_FAILURE with a subConn backoff before moving to IDLE. This
	// backoff will last for about a second. We need to keep retrying RPCs for the
	// subConn to eventually come out of backoff and attempt to reconnect.
	//
	// The next RPC is made with a different set of headers leading to the
	// creation of a new cache entry and a new RLS request. This RLS request
	// will also fail till the control channel moves back to READY. So,
	// override the backoff strategy to perform a small backoff on this entry.
	defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestShortTimeout}
	testutils.AwaitState(ctx, t, cc, connectivity.Ready)

	// Make another RPC with a different set of headers. This will force the LB
	// policy to send out a new RLS request, resulting in a new data cache
	// entry.
	ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n2", "v2")
	makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2)
	select {
	case <-ctx.Done():
		t.Fatalf("Timed out waiting for resetBackoffDone")
	case <-resetBackoffDone:
	}

	// Make sure an RLS request is sent out.
	verifyRLSRequest(t, rlsReqCh, true)

	// We currently have two cache entries. Setting the size to 1, will cause
	// the entry corresponding to backend1 to be evicted.
	rlsConfig.RouteLookupConfig.CacheSizeBytes = 1

	// Push the config update through the manual resolver.
	scJSON1, err := rlsConfig.ServiceConfigJSON()
	if err != nil {
		t.Fatal(err)
	}
	sc1 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON1)
	r.UpdateState(resolver.State{ServiceConfig: sc1})

	// Wait for the config update to be processed by the LB policy. The resize
	// happens synchronously as part of this update, so once this hook fires,
	// any picker pushed due to eviction must already be visible in ccWrapper.
	<-clientConnUpdateDone

	// The eviction of the entry for backend1 should have resulted in a new
	// picker being pushed, which shows up as additional connectivity state
	// updates on the wrapped ClientConn.
	states1 := ccWrapper.getStates()
	if len(states1) != len(states0)+2 {
		t.Fatalf("Balancer state count: got %v, want %v (initial states: %v, final states: %v)", len(states1), len(states0)+2, states0, states1)
	}
}

// TestDataCachePurging verifies that the LB policy periodically evicts expired
// entries from the data cache.
func (s) TestDataCachePurging(t *testing.T) {
Expand Down

0 comments on commit 7a2f4c5

Please sign in to comment.