diff --git a/balancer/rls/picker.go b/balancer/rls/picker.go index ece27f0fc2ed..2119b7a6a39a 100644 --- a/balancer/rls/picker.go +++ b/balancer/rls/picker.go @@ -162,16 +162,17 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { // delegateToChildPolicies is a helper function which iterates through the list // of child policy wrappers in a cache entry and attempts to find a child policy -// to which this RPC can be routed to. If there is no child policy in READY -// state, we delegate to the first child policy arbitrarily. +// to which this RPC can be routed to. If all child policies are in +// TRANSIENT_FAILURE, we delegate to the first child policy arbitrarily. // // Caller must hold at least a read-lock on p.lb.cacheMu. func (p *rlsPicker) delegateToChildPolicies(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) { for _, cpw := range dcEntry.childPolicyWrappers { - ok, res, err := p.pickIfFeasible(cpw, info) - if ok { - return res, err + state := (*balancer.State)(atomic.LoadPointer(&cpw.state)) + if state.ConnectivityState == connectivity.TransientFailure { + continue } + return state.Picker.Pick(info) } if len(dcEntry.childPolicyWrappers) != 0 { state := (*balancer.State)(atomic.LoadPointer(&dcEntry.childPolicyWrappers[0].state)) @@ -249,8 +250,8 @@ func (p *rlsPicker) sendRequestAndReturnPick(cacheKey cacheKey, bs *backoffState // target if one is configured, or fails the pick with the given error. func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) { if p.defaultPolicy != nil { - _, res, err := p.pickIfFeasible(p.defaultPolicy, info) - return res, err + state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state)) + return state.Picker.Pick(info) } return balancer.PickResult{}, errOnNoDefault } @@ -275,27 +276,6 @@ func (p *rlsPicker) sendRouteLookupRequest(cacheKey cacheKey, bs *backoffState, return throttled } -// pickIfFeasible determines if a pick can be delegated to child policy based on -// its connectivity state. -// - If state is CONNECTING, the pick is to be queued -// - If state is IDLE, the child policy is instructed to exit idle, and the pick -// is to be queued -// - If state is READY, pick it delegated to the child policy's picker -func (p *rlsPicker) pickIfFeasible(cpw *childPolicyWrapper, info balancer.PickInfo) (bool, balancer.PickResult, error) { - state := (*balancer.State)(atomic.LoadPointer(&cpw.state)) - switch state.ConnectivityState { - case connectivity.Connecting: - return true, balancer.PickResult{}, balancer.ErrNoSubConnAvailable - case connectivity.Idle: - p.bg.ExitIdleOne(cpw.target) - return true, balancer.PickResult{}, balancer.ErrNoSubConnAvailable - case connectivity.Ready: - r, e := state.Picker.Pick(info) - return true, r, e - } - return false, balancer.PickResult{}, balancer.ErrNoSubConnAvailable -} - // handleRouteLookupResponse is the callback invoked by the control channel upon // receipt of an RLS response. Modifies the data cache and pending requests map // and sends a new picker. diff --git a/internal/testutils/xds/e2e/clientresources.go b/internal/testutils/xds/e2e/clientresources.go index f3f7f6307c53..2dacebb14653 100644 --- a/internal/testutils/xds/e2e/clientresources.go +++ b/internal/testutils/xds/e2e/clientresources.go @@ -305,8 +305,44 @@ func DefaultRouteConfig(routeName, ldsTarget, clusterName string) *v3routepb.Rou // DefaultCluster returns a basic xds Cluster resource. func DefaultCluster(clusterName, edsServiceName string, secLevel SecurityLevel) *v3clusterpb.Cluster { + return ClusterResourceWithOptions(&ClusterOptions{ + ClusterName: clusterName, + ServiceName: edsServiceName, + Policy: LoadBalancingPolicyRoundRobin, + SecurityLevel: secLevel, + }) +} + +// LoadBalancingPolicy determines the policy used for balancing load across +// endpoints in the Cluster. +type LoadBalancingPolicy int + +const ( + // LoadBalancingPolicyRoundRobin results in the use of the weighted_target + // LB policy to balance load across localities and endpoints in the cluster. + LoadBalancingPolicyRoundRobin LoadBalancingPolicy = iota + // LoadBalancingPolicyRingHash results in the use of the ring_hash LB policy + // as the leaf policy. + LoadBalancingPolicyRingHash +) + +// ClusterOptions contains options to configure a Cluster resource. +type ClusterOptions struct { + // ClusterName is the name of the Cluster resource. + ClusterName string + // ServiceName is the EDS service name of the Cluster. + ServiceName string + // Policy is the LB policy to be used. + Policy LoadBalancingPolicy + // SecurityLevel determines the security configuration for the Cluster. + SecurityLevel SecurityLevel +} + +// ClusterResourceWithOptions returns an xDS Cluster resource configured with +// the provided options. +func ClusterResourceWithOptions(opts *ClusterOptions) *v3clusterpb.Cluster { var tlsContext *v3tlspb.UpstreamTlsContext - switch secLevel { + switch opts.SecurityLevel { case SecurityLevelNone: case SecurityLevelTLS: tlsContext = &v3tlspb.UpstreamTlsContext{ @@ -333,8 +369,15 @@ func DefaultCluster(clusterName, edsServiceName string, secLevel SecurityLevel) } } + var lbPolicy v3clusterpb.Cluster_LbPolicy + switch opts.Policy { + case LoadBalancingPolicyRoundRobin: + lbPolicy = v3clusterpb.Cluster_ROUND_ROBIN + case LoadBalancingPolicyRingHash: + lbPolicy = v3clusterpb.Cluster_RING_HASH + } cluster := &v3clusterpb.Cluster{ - Name: clusterName, + Name: opts.ClusterName, ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS}, EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{ EdsConfig: &v3corepb.ConfigSource{ @@ -342,9 +385,9 @@ func DefaultCluster(clusterName, edsServiceName string, secLevel SecurityLevel) Ads: &v3corepb.AggregatedConfigSource{}, }, }, - ServiceName: edsServiceName, + ServiceName: opts.ServiceName, }, - LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN, + LbPolicy: lbPolicy, } if tlsContext != nil { cluster.TransportSocket = &v3corepb.TransportSocket{ diff --git a/test/xds/xds_rls_clusterspecifier_plugin_test.go b/test/xds/xds_rls_clusterspecifier_plugin_test.go index 35b5fe37dc1b..f5ab17bca7ee 100644 --- a/test/xds/xds_rls_clusterspecifier_plugin_test.go +++ b/test/xds/xds_rls_clusterspecifier_plugin_test.go @@ -46,7 +46,7 @@ import ( // defaultClientResourcesWithRLSCSP returns a set of resources (LDS, RDS, CDS, EDS) for a // client to connect to a server with a RLS Load Balancer as a child of Cluster Manager. -func defaultClientResourcesWithRLSCSP(params e2e.ResourceParams, rlsProto *rlspb.RouteLookupConfig) e2e.UpdateOptions { +func defaultClientResourcesWithRLSCSP(lb e2e.LoadBalancingPolicy, params e2e.ResourceParams, rlsProto *rlspb.RouteLookupConfig) e2e.UpdateOptions { routeConfigName := "route-" + params.DialTarget clusterName := "cluster-" + params.DialTarget endpointsName := "endpoints-" + params.DialTarget @@ -54,7 +54,12 @@ func defaultClientResourcesWithRLSCSP(params e2e.ResourceParams, rlsProto *rlspb NodeID: params.NodeID, Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(params.DialTarget, routeConfigName)}, Routes: []*v3routepb.RouteConfiguration{defaultRouteConfigWithRLSCSP(routeConfigName, params.DialTarget, rlsProto)}, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, endpointsName, params.SecLevel)}, + Clusters: []*v3clusterpb.Cluster{e2e.ClusterResourceWithOptions(&e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: endpointsName, + Policy: lb, + SecurityLevel: params.SecLevel, + })}, Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(endpointsName, params.Host, []uint32{params.Port})}, } } @@ -93,6 +98,27 @@ func defaultRouteConfigWithRLSCSP(routeName, ldsTarget string, rlsProto *rlspb.R // target corresponding to this test service. This test asserts an RPC proceeds // as normal with the RLS Balancer as part of system. func (s) TestRLSinxDS(t *testing.T) { + tests := []struct { + name string + lbPolicy e2e.LoadBalancingPolicy + }{ + { + name: "roundrobin", + lbPolicy: e2e.LoadBalancingPolicyRoundRobin, + }, + { + name: "ringhash", + lbPolicy: e2e.LoadBalancingPolicyRingHash, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testRLSinxDS(t, test.lbPolicy) + }) + } +} + +func testRLSinxDS(t *testing.T, lbPolicy e2e.LoadBalancingPolicy) { oldRLS := envconfig.XDSRLS envconfig.XDSRLS = true internal.RegisterRLSClusterSpecifierPluginForTesting() @@ -119,7 +145,7 @@ func (s) TestRLSinxDS(t *testing.T) { } const serviceName = "my-service-client-side-xds" - resources := defaultClientResourcesWithRLSCSP(e2e.ResourceParams{ + resources := defaultClientResourcesWithRLSCSP(lbPolicy, e2e.ResourceParams{ DialTarget: serviceName, NodeID: nodeID, Host: "localhost",