rls: update picker synchronously upon receipt of configuration update
aranjans committed May 31, 2024
1 parent 59954c8 commit 40e31a1
Showing 2 changed files with 161 additions and 1 deletion.
5 changes: 4 additions & 1 deletion balancer/rls/balancer.go
@@ -303,7 +303,10 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
 		// `resizeCache` boolean) because `cacheMu` needs to be grabbed before
 		// `stateMu` if we are to hold both locks at the same time.
 		b.cacheMu.Lock()
-		b.dataCache.resize(newCfg.cacheSizeBytes)
+		evicted := b.dataCache.resize(newCfg.cacheSizeBytes)
+		if evicted {
+			b.sendNewPickerLocked()
+		}
 		b.cacheMu.Unlock()
 	}
 	return nil
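The crux of the change: `dataCache.resize` now reports whether it evicted any entries, and when it did, the balancer publishes a new picker right away, while still holding `cacheMu`, rather than leaving the old picker free to route RPCs through evicted entries. (As the comment in the hunk notes, `cacheMu` must be grabbed before `stateMu` when both are held, which is why the picker update happens here in the cache-resize path.) Below is a minimal, self-contained sketch of that report-eviction-and-republish contract; `lruCache` and its methods are illustrative stand-ins, not grpc-go's actual `dataCache`:

package main

import (
	"container/list"
	"fmt"
)

// lruCache is an illustrative LRU cache where every entry costs 1 byte,
// matching the computeDataCacheEntrySize override in the test below.
type lruCache struct {
	maxSize int64
	curSize int64
	order   *list.List               // front = least recently used
	entries map[string]*list.Element // key -> element in order
}

func newLRUCache(maxSize int64) *lruCache {
	return &lruCache{maxSize: maxSize, order: list.New(), entries: make(map[string]*list.Element)}
}

func (c *lruCache) add(key string) {
	c.entries[key] = c.order.PushBack(key)
	c.curSize++
}

// resize updates the cache budget and evicts LRU entries until the cache
// fits again. It returns true if anything was evicted, so the caller knows
// a new picker must be published, mirroring `evicted := b.dataCache.resize(...)`.
func (c *lruCache) resize(newSize int64) (evicted bool) {
	c.maxSize = newSize
	for c.curSize > c.maxSize && c.order.Len() > 0 {
		lru := c.order.Front()
		c.order.Remove(lru)
		delete(c.entries, lru.Value.(string))
		c.curSize--
		evicted = true
	}
	return evicted
}

func main() {
	c := newLRUCache(2)
	c.add("entry-for-backend1")
	c.add("entry-for-backend2")
	if c.resize(1) {
		fmt.Println("evicted: publish a new picker synchronously")
	}
}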
157 changes: 157 additions & 0 deletions balancer/rls/balancer_test.go
@@ -650,6 +650,163 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) {
	verifyRLSRequest(t, rlsReqCh, true)
}

// TestPickerUpdate_DataCacheSizeDecrease tests that a decrease in the cache
// size triggers a picker update, and verifies that entries are evicted from
// the cache.
func (s) TestPickerUpdate_DataCacheSizeDecrease(t *testing.T) {
	// Override the clientConn update hook to get notified.
	clientConnUpdateDone := make(chan struct{}, 1)
	origClientConnUpdateHook := clientConnUpdateHook
	clientConnUpdateHook = func() { clientConnUpdateDone <- struct{}{} }
	defer func() { clientConnUpdateHook = origClientConnUpdateHook }()

	// Override the cache entry size func, and always return 1.
	origEntrySizeFunc := computeDataCacheEntrySize
	computeDataCacheEntrySize = func(cacheKey, *cacheEntry) int64 { return 1 }
	defer func() { computeDataCacheEntrySize = origEntrySizeFunc }()

	// Override the minEvictionDuration to ensure that when the config update
	// reduces the cache size, the resize operation is not stopped because
	// we find an entry whose minExpiryDuration has not elapsed.
	origMinEvictDuration := minEvictDuration
	minEvictDuration = time.Duration(0)
	defer func() { minEvictDuration = origMinEvictDuration }()

	// Register the top-level wrapping balancer which forwards calls to RLS.
	topLevelBalancerName := t.Name() + "top-level"
	var ccWrapper *testCCWrapper
	stub.Register(topLevelBalancerName, stub.BalancerFuncs{
		Init: func(bd *stub.BalancerData) {
			ccWrapper = &testCCWrapper{ClientConn: bd.ClientConn}
			bd.Data = balancer.Get(Name).Build(ccWrapper, bd.BuildOptions)
		},
		ParseConfig: func(sc json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
			parser := balancer.Get(Name).(balancer.ConfigParser)
			return parser.ParseConfig(sc)
		},
		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
			bal := bd.Data.(balancer.Balancer)
			return bal.UpdateClientConnState(ccs)
		},
		Close: func(bd *stub.BalancerData) {
			bal := bd.Data.(balancer.Balancer)
			bal.Close()
		},
	})

	// Start an RLS server and set the throttler to never throttle requests.
	rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
	overrideAdaptiveThrottler(t, neverThrottlingThrottler())

	// Register an LB policy to act as the child policy for RLS LB policy.
	childPolicyName := "test-child-policy" + t.Name()
	e2e.RegisterRLSChildPolicy(childPolicyName, nil)
	t.Logf("Registered child policy with name %q", childPolicyName)

	// Build RLS service config with header matchers.
	rlsConfig := buildBasicRLSConfig(childPolicyName, rlsServer.Address)

	// Start a couple of test backends, and set up the fake RLS server to return
	// these as targets in the RLS response, based on request keys.
	backendCh1, backendAddress1 := startBackend(t)
	backendCh2, backendAddress2 := startBackend(t)
	rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
		if req.KeyMap["k1"] == "v1" {
			return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
		}
		if req.KeyMap["k2"] == "v2" {
			return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
		}
		return &rlstest.RouteLookupResponse{Err: errors.New("no keys in request metadata")}
	})

	// Register a manual resolver and push the RLS service config through it.
	r := manual.NewBuilderWithScheme("rls-e2e")
	scJSON := fmt.Sprintf(`
{
  "loadBalancingConfig": [
    {
      "%s": {
        "routeLookupConfig": {
          "grpcKeybuilders": [{
            "names": [{"service": "grpc.testing.TestService"}],
            "headers": [
              {
                "key": "k1",
                "names": ["n1"]
              },
              {
                "key": "k2",
                "names": ["n2"]
              }
            ]
          }],
          "lookupService": "%s",
          "cacheSizeBytes": 1000
        },
        "childPolicy": [{"%s": {}}],
        "childPolicyConfigTargetFieldName": "Backend"
      }
    }
  ]
}`, topLevelBalancerName, rlsServer.Address, childPolicyName)
	sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
	r.InitialState(resolver.State{ServiceConfig: sc})

	cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("grpc.Dial() failed: %v", err)
	}
	defer cc.Close()

	<-clientConnUpdateDone

	// Make an RPC and ensure it gets routed to the first backend.
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1")
	makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh1)

	// Make sure an RLS request is sent out.
	verifyRLSRequest(t, rlsReqCh, true)

	testutils.AwaitState(ctx, t, cc, connectivity.Ready)

	// Cache the state changes seen up to this point.
	states0 := ccWrapper.getStates()

	// Make another RPC with a different set of headers. This will force the LB
	// policy to send out a new RLS request, resulting in a new data cache
	// entry.
	ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n2", "v2")
	makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2)

	// Make sure an RLS request is sent out.
	verifyRLSRequest(t, rlsReqCh, true)

	// We currently have two cache entries. Setting the cache size to 1 will
	// cause the entry corresponding to backend1 to be evicted.
	rlsConfig.RouteLookupConfig.CacheSizeBytes = 1

	// Push the config update through the manual resolver.
	scJSON1, err := rlsConfig.ServiceConfigJSON()
	if err != nil {
		t.Fatal(err)
	}
	sc1 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON1)
	r.UpdateState(resolver.State{ServiceConfig: sc1})

	<-clientConnUpdateDone

	// The cache-size decrease should have triggered exactly one new picker
	// update, synchronously with the configuration update.
	states1 := ccWrapper.getStates()
	if len(states1) != len(states0)+1 {
		t.Fatalf("unexpected number of picker updates: before %v, after %v", states0, states1)
	}
}
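The new test leans on a common Go testing seam: hooks like clientConnUpdateHook, computeDataCacheEntrySize, and minEvictDuration are package-level variables, so a test can swap them out and restore the originals via defer. A stripped-down sketch of the same pattern, with hypothetical names rather than the RLS package's real internals:

package cache

import "testing"

// Package-level seams: production code calls through these variables so
// tests can intercept them. Names here are illustrative only.
var (
	entrySize    = func(v []byte) int64 { return int64(len(v)) }
	onUpdateDone = func() {}
)

func applyConfigUpdate() {
	// ... process the new configuration ...
	onUpdateDone() // test hook: fires once the update has been handled
}

func TestUpdateSignalsHook(t *testing.T) {
	// Override the hook for the test's duration and restore it on exit,
	// exactly as the RLS test does with clientConnUpdateHook.
	updateDone := make(chan struct{}, 1)
	origHook := onUpdateDone
	onUpdateDone = func() { updateDone <- struct{}{} }
	defer func() { onUpdateDone = origHook }()

	// Pin entry sizes to 1 so eviction arithmetic stays deterministic,
	// mirroring the computeDataCacheEntrySize override.
	origSize := entrySize
	entrySize = func([]byte) int64 { return 1 }
	defer func() { entrySize = origSize }()

	go applyConfigUpdate()
	<-updateDone // block until the update has been fully processed
}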

// TestDataCachePurging verifies that the LB policy periodically evicts expired
// entries from the data cache.
func (s) TestDataCachePurging(t *testing.T) {
