Skip to content

Commit

Permalink
rls: delegate pick to child policy as long as it is not in TransientF…
Browse files Browse the repository at this point in the history
…ailure (#5656)
  • Loading branch information
easwars authored Sep 15, 2022
1 parent 7da8a05 commit 9c3e589
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 40 deletions.
44 changes: 11 additions & 33 deletions balancer/rls/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,21 +162,20 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {

// delegateToChildPolicies is a helper function which iterates through the list
// of child policy wrappers in a cache entry and attempts to find a child policy
// to which this RPC can be routed to. If there is no child policy in READY
// state, we delegate to the first child policy arbitrarily.
// to which this RPC can be routed to. If all child policies are in
// TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily.
//
// Caller must hold at least a read-lock on p.lb.cacheMu.
func (p *rlsPicker) delegateToChildPolicies(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
for _, cpw := range dcEntry.childPolicyWrappers {
ok, res, err := p.pickIfFeasible(cpw, info)
if ok {
return res, err
for i, cpw := range dcEntry.childPolicyWrappers {
state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
// Delegate to the child policy if it is not in TRANSIENT_FAILURE, or if
// it the last one (which handles the case of delegating to the last
// child picker if all child polcies are in TRANSIENT_FAILURE).
if state.ConnectivityState != connectivity.TransientFailure || i == len(dcEntry.childPolicyWrappers)-1 {
return state.Picker.Pick(info)
}
}
if len(dcEntry.childPolicyWrappers) != 0 {
state := (*balancer.State)(atomic.LoadPointer(&dcEntry.childPolicyWrappers[0].state))
return state.Picker.Pick(info)
}
// In the unlikely event that we have a cache entry with no targets, we end up
// queueing the RPC.
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
Expand Down Expand Up @@ -249,8 +248,8 @@ func (p *rlsPicker) sendRequestAndReturnPick(cacheKey cacheKey, bs *backoffState
// target if one is configured, or fails the pick with the given error.
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {
if p.defaultPolicy != nil {
_, res, err := p.pickIfFeasible(p.defaultPolicy, info)
return res, err
state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state))
return state.Picker.Pick(info)
}
return balancer.PickResult{}, errOnNoDefault
}
Expand All @@ -275,27 +274,6 @@ func (p *rlsPicker) sendRouteLookupRequest(cacheKey cacheKey, bs *backoffState,
return throttled
}

// pickIfFeasible determines if a pick can be delegated to child policy based on
// its connectivity state.
// - If state is CONNECTING, the pick is to be queued
// - If state is IDLE, the child policy is instructed to exit idle, and the pick
// is to be queued
// - If state is READY, pick it delegated to the child policy's picker
func (p *rlsPicker) pickIfFeasible(cpw *childPolicyWrapper, info balancer.PickInfo) (bool, balancer.PickResult, error) {
state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
switch state.ConnectivityState {
case connectivity.Connecting:
return true, balancer.PickResult{}, balancer.ErrNoSubConnAvailable
case connectivity.Idle:
p.bg.ExitIdleOne(cpw.target)
return true, balancer.PickResult{}, balancer.ErrNoSubConnAvailable
case connectivity.Ready:
r, e := state.Picker.Pick(info)
return true, r, e
}
return false, balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

// handleRouteLookupResponse is the callback invoked by the control channel upon
// receipt of an RLS response. Modifies the data cache and pending requests map
// and sends a new picker.
Expand Down
51 changes: 47 additions & 4 deletions internal/testutils/xds/e2e/clientresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,44 @@ func DefaultRouteConfig(routeName, ldsTarget, clusterName string) *v3routepb.Rou

// DefaultCluster returns a basic xds Cluster resource.
func DefaultCluster(clusterName, edsServiceName string, secLevel SecurityLevel) *v3clusterpb.Cluster {
return ClusterResourceWithOptions(&ClusterOptions{
ClusterName: clusterName,
ServiceName: edsServiceName,
Policy: LoadBalancingPolicyRoundRobin,
SecurityLevel: secLevel,
})
}

// LoadBalancingPolicy determines the policy used for balancing load across
// endpoints in the Cluster.
type LoadBalancingPolicy int

const (
// LoadBalancingPolicyRoundRobin results in the use of the weighted_target
// LB policy to balance load across localities and endpoints in the cluster.
LoadBalancingPolicyRoundRobin LoadBalancingPolicy = iota
// LoadBalancingPolicyRingHash results in the use of the ring_hash LB policy
// as the leaf policy.
LoadBalancingPolicyRingHash
)

// ClusterOptions contains options to configure a Cluster resource.
type ClusterOptions struct {
// ClusterName is the name of the Cluster resource.
ClusterName string
// ServiceName is the EDS service name of the Cluster.
ServiceName string
// Policy is the LB policy to be used.
Policy LoadBalancingPolicy
// SecurityLevel determines the security configuration for the Cluster.
SecurityLevel SecurityLevel
}

// ClusterResourceWithOptions returns an xDS Cluster resource configured with
// the provided options.
func ClusterResourceWithOptions(opts *ClusterOptions) *v3clusterpb.Cluster {
var tlsContext *v3tlspb.UpstreamTlsContext
switch secLevel {
switch opts.SecurityLevel {
case SecurityLevelNone:
case SecurityLevelTLS:
tlsContext = &v3tlspb.UpstreamTlsContext{
Expand All @@ -333,18 +369,25 @@ func DefaultCluster(clusterName, edsServiceName string, secLevel SecurityLevel)
}
}

var lbPolicy v3clusterpb.Cluster_LbPolicy
switch opts.Policy {
case LoadBalancingPolicyRoundRobin:
lbPolicy = v3clusterpb.Cluster_ROUND_ROBIN
case LoadBalancingPolicyRingHash:
lbPolicy = v3clusterpb.Cluster_RING_HASH
}
cluster := &v3clusterpb.Cluster{
Name: clusterName,
Name: opts.ClusterName,
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{
EdsConfig: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{
Ads: &v3corepb.AggregatedConfigSource{},
},
},
ServiceName: edsServiceName,
ServiceName: opts.ServiceName,
},
LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN,
LbPolicy: lbPolicy,
}
if tlsContext != nil {
cluster.TransportSocket = &v3corepb.TransportSocket{
Expand Down
32 changes: 29 additions & 3 deletions test/xds/xds_rls_clusterspecifier_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,20 @@ import (

// defaultClientResourcesWithRLSCSP returns a set of resources (LDS, RDS, CDS, EDS) for a
// client to connect to a server with a RLS Load Balancer as a child of Cluster Manager.
func defaultClientResourcesWithRLSCSP(params e2e.ResourceParams, rlsProto *rlspb.RouteLookupConfig) e2e.UpdateOptions {
func defaultClientResourcesWithRLSCSP(lb e2e.LoadBalancingPolicy, params e2e.ResourceParams, rlsProto *rlspb.RouteLookupConfig) e2e.UpdateOptions {
routeConfigName := "route-" + params.DialTarget
clusterName := "cluster-" + params.DialTarget
endpointsName := "endpoints-" + params.DialTarget
return e2e.UpdateOptions{
NodeID: params.NodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(params.DialTarget, routeConfigName)},
Routes: []*v3routepb.RouteConfiguration{defaultRouteConfigWithRLSCSP(routeConfigName, params.DialTarget, rlsProto)},
Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, endpointsName, params.SecLevel)},
Clusters: []*v3clusterpb.Cluster{e2e.ClusterResourceWithOptions(&e2e.ClusterOptions{
ClusterName: clusterName,
ServiceName: endpointsName,
Policy: lb,
SecurityLevel: params.SecLevel,
})},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(endpointsName, params.Host, []uint32{params.Port})},
}
}
Expand Down Expand Up @@ -93,6 +98,27 @@ func defaultRouteConfigWithRLSCSP(routeName, ldsTarget string, rlsProto *rlspb.R
// target corresponding to this test service. This test asserts an RPC proceeds
// as normal with the RLS Balancer as part of system.
func (s) TestRLSinxDS(t *testing.T) {
tests := []struct {
name string
lbPolicy e2e.LoadBalancingPolicy
}{
{
name: "roundrobin",
lbPolicy: e2e.LoadBalancingPolicyRoundRobin,
},
{
name: "ringhash",
lbPolicy: e2e.LoadBalancingPolicyRingHash,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testRLSinxDS(t, test.lbPolicy)
})
}
}

func testRLSinxDS(t *testing.T, lbPolicy e2e.LoadBalancingPolicy) {
oldRLS := envconfig.XDSRLS
envconfig.XDSRLS = true
internal.RegisterRLSClusterSpecifierPluginForTesting()
Expand All @@ -119,7 +145,7 @@ func (s) TestRLSinxDS(t *testing.T) {
}

const serviceName = "my-service-client-side-xds"
resources := defaultClientResourcesWithRLSCSP(e2e.ResourceParams{
resources := defaultClientResourcesWithRLSCSP(lbPolicy, e2e.ResourceParams{
DialTarget: serviceName,
NodeID: nodeID,
Host: "localhost",
Expand Down

0 comments on commit 9c3e589

Please sign in to comment.