From da5a8540186646c86d5ca32ece98585d889007ea Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 14 Sep 2022 16:28:13 -0700 Subject: [PATCH 1/2] rls: delegate pick to child policy as long as it is not in TransientFailure --- balancer/rls/picker.go | 36 +++---------- internal/testutils/xds/e2e/clientresources.go | 51 +++++++++++++++++-- .../xds_rls_clusterspecifier_plugin_test.go | 32 ++++++++++-- 3 files changed, 84 insertions(+), 35 deletions(-) diff --git a/balancer/rls/picker.go b/balancer/rls/picker.go index ece27f0fc2ed..2119b7a6a39a 100644 --- a/balancer/rls/picker.go +++ b/balancer/rls/picker.go @@ -162,16 +162,17 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { // delegateToChildPolicies is a helper function which iterates through the list // of child policy wrappers in a cache entry and attempts to find a child policy -// to which this RPC can be routed to. If there is no child policy in READY -// state, we delegate to the first child policy arbitrarily. +// to which this RPC can be routed to. If all child policies are in +// TRANSIENT_FAILURE, we delegate to the first child policy arbitrarily. // // Caller must hold at least a read-lock on p.lb.cacheMu. func (p *rlsPicker) delegateToChildPolicies(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) { for _, cpw := range dcEntry.childPolicyWrappers { - ok, res, err := p.pickIfFeasible(cpw, info) - if ok { - return res, err + state := (*balancer.State)(atomic.LoadPointer(&cpw.state)) + if state.ConnectivityState == connectivity.TransientFailure { + continue } + return state.Picker.Pick(info) } if len(dcEntry.childPolicyWrappers) != 0 { state := (*balancer.State)(atomic.LoadPointer(&dcEntry.childPolicyWrappers[0].state)) @@ -249,8 +250,8 @@ func (p *rlsPicker) sendRequestAndReturnPick(cacheKey cacheKey, bs *backoffState // target if one is configured, or fails the pick with the given error. func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) { if p.defaultPolicy != nil { - _, res, err := p.pickIfFeasible(p.defaultPolicy, info) - return res, err + state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state)) + return state.Picker.Pick(info) } return balancer.PickResult{}, errOnNoDefault } @@ -275,27 +276,6 @@ func (p *rlsPicker) sendRouteLookupRequest(cacheKey cacheKey, bs *backoffState, return throttled } -// pickIfFeasible determines if a pick can be delegated to child policy based on -// its connectivity state. -// - If state is CONNECTING, the pick is to be queued -// - If state is IDLE, the child policy is instructed to exit idle, and the pick -// is to be queued -// - If state is READY, pick it delegated to the child policy's picker -func (p *rlsPicker) pickIfFeasible(cpw *childPolicyWrapper, info balancer.PickInfo) (bool, balancer.PickResult, error) { - state := (*balancer.State)(atomic.LoadPointer(&cpw.state)) - switch state.ConnectivityState { - case connectivity.Connecting: - return true, balancer.PickResult{}, balancer.ErrNoSubConnAvailable - case connectivity.Idle: - p.bg.ExitIdleOne(cpw.target) - return true, balancer.PickResult{}, balancer.ErrNoSubConnAvailable - case connectivity.Ready: - r, e := state.Picker.Pick(info) - return true, r, e - } - return false, balancer.PickResult{}, balancer.ErrNoSubConnAvailable -} - // handleRouteLookupResponse is the callback invoked by the control channel upon // receipt of an RLS response. Modifies the data cache and pending requests map // and sends a new picker. diff --git a/internal/testutils/xds/e2e/clientresources.go b/internal/testutils/xds/e2e/clientresources.go index f3f7f6307c53..2dacebb14653 100644 --- a/internal/testutils/xds/e2e/clientresources.go +++ b/internal/testutils/xds/e2e/clientresources.go @@ -305,8 +305,44 @@ func DefaultRouteConfig(routeName, ldsTarget, clusterName string) *v3routepb.Rou // DefaultCluster returns a basic xds Cluster resource. func DefaultCluster(clusterName, edsServiceName string, secLevel SecurityLevel) *v3clusterpb.Cluster { + return ClusterResourceWithOptions(&ClusterOptions{ + ClusterName: clusterName, + ServiceName: edsServiceName, + Policy: LoadBalancingPolicyRoundRobin, + SecurityLevel: secLevel, + }) +} + +// LoadBalancingPolicy determines the policy used for balancing load across +// endpoints in the Cluster. +type LoadBalancingPolicy int + +const ( + // LoadBalancingPolicyRoundRobin results in the use of the weighted_target + // LB policy to balance load across localities and endpoints in the cluster. + LoadBalancingPolicyRoundRobin LoadBalancingPolicy = iota + // LoadBalancingPolicyRingHash results in the use of the ring_hash LB policy + // as the leaf policy. + LoadBalancingPolicyRingHash +) + +// ClusterOptions contains options to configure a Cluster resource. +type ClusterOptions struct { + // ClusterName is the name of the Cluster resource. + ClusterName string + // ServiceName is the EDS service name of the Cluster. + ServiceName string + // Policy is the LB policy to be used. + Policy LoadBalancingPolicy + // SecurityLevel determines the security configuration for the Cluster. + SecurityLevel SecurityLevel +} + +// ClusterResourceWithOptions returns an xDS Cluster resource configured with +// the provided options. +func ClusterResourceWithOptions(opts *ClusterOptions) *v3clusterpb.Cluster { var tlsContext *v3tlspb.UpstreamTlsContext - switch secLevel { + switch opts.SecurityLevel { case SecurityLevelNone: case SecurityLevelTLS: tlsContext = &v3tlspb.UpstreamTlsContext{ @@ -333,8 +369,15 @@ func DefaultCluster(clusterName, edsServiceName string, secLevel SecurityLevel) } } + var lbPolicy v3clusterpb.Cluster_LbPolicy + switch opts.Policy { + case LoadBalancingPolicyRoundRobin: + lbPolicy = v3clusterpb.Cluster_ROUND_ROBIN + case LoadBalancingPolicyRingHash: + lbPolicy = v3clusterpb.Cluster_RING_HASH + } cluster := &v3clusterpb.Cluster{ - Name: clusterName, + Name: opts.ClusterName, ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS}, EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{ EdsConfig: &v3corepb.ConfigSource{ @@ -342,9 +385,9 @@ func DefaultCluster(clusterName, edsServiceName string, secLevel SecurityLevel) Ads: &v3corepb.AggregatedConfigSource{}, }, }, - ServiceName: edsServiceName, + ServiceName: opts.ServiceName, }, - LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN, + LbPolicy: lbPolicy, } if tlsContext != nil { cluster.TransportSocket = &v3corepb.TransportSocket{ diff --git a/test/xds/xds_rls_clusterspecifier_plugin_test.go b/test/xds/xds_rls_clusterspecifier_plugin_test.go index 35b5fe37dc1b..f5ab17bca7ee 100644 --- a/test/xds/xds_rls_clusterspecifier_plugin_test.go +++ b/test/xds/xds_rls_clusterspecifier_plugin_test.go @@ -46,7 +46,7 @@ import ( // defaultClientResourcesWithRLSCSP returns a set of resources (LDS, RDS, CDS, EDS) for a // client to connect to a server with a RLS Load Balancer as a child of Cluster Manager. -func defaultClientResourcesWithRLSCSP(params e2e.ResourceParams, rlsProto *rlspb.RouteLookupConfig) e2e.UpdateOptions { +func defaultClientResourcesWithRLSCSP(lb e2e.LoadBalancingPolicy, params e2e.ResourceParams, rlsProto *rlspb.RouteLookupConfig) e2e.UpdateOptions { routeConfigName := "route-" + params.DialTarget clusterName := "cluster-" + params.DialTarget endpointsName := "endpoints-" + params.DialTarget @@ -54,7 +54,12 @@ func defaultClientResourcesWithRLSCSP(params e2e.ResourceParams, rlsProto *rlspb NodeID: params.NodeID, Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(params.DialTarget, routeConfigName)}, Routes: []*v3routepb.RouteConfiguration{defaultRouteConfigWithRLSCSP(routeConfigName, params.DialTarget, rlsProto)}, - Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, endpointsName, params.SecLevel)}, + Clusters: []*v3clusterpb.Cluster{e2e.ClusterResourceWithOptions(&e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: endpointsName, + Policy: lb, + SecurityLevel: params.SecLevel, + })}, Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(endpointsName, params.Host, []uint32{params.Port})}, } } @@ -93,6 +98,27 @@ func defaultRouteConfigWithRLSCSP(routeName, ldsTarget string, rlsProto *rlspb.R // target corresponding to this test service. This test asserts an RPC proceeds // as normal with the RLS Balancer as part of system. func (s) TestRLSinxDS(t *testing.T) { + tests := []struct { + name string + lbPolicy e2e.LoadBalancingPolicy + }{ + { + name: "roundrobin", + lbPolicy: e2e.LoadBalancingPolicyRoundRobin, + }, + { + name: "ringhash", + lbPolicy: e2e.LoadBalancingPolicyRingHash, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testRLSinxDS(t, test.lbPolicy) + }) + } +} + +func testRLSinxDS(t *testing.T, lbPolicy e2e.LoadBalancingPolicy) { oldRLS := envconfig.XDSRLS envconfig.XDSRLS = true internal.RegisterRLSClusterSpecifierPluginForTesting() @@ -119,7 +145,7 @@ func (s) TestRLSinxDS(t *testing.T) { } const serviceName = "my-service-client-side-xds" - resources := defaultClientResourcesWithRLSCSP(e2e.ResourceParams{ + resources := defaultClientResourcesWithRLSCSP(lbPolicy, e2e.ResourceParams{ DialTarget: serviceName, NodeID: nodeID, Host: "localhost", From 00544e915b2d51b0c0069fcf5de53e7d9cf55eb7 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 15 Sep 2022 12:58:04 -0700 Subject: [PATCH 2/2] delegate to the last child policy is all children are in TF --- balancer/rls/picker.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/balancer/rls/picker.go b/balancer/rls/picker.go index 2119b7a6a39a..f73fe7b1028f 100644 --- a/balancer/rls/picker.go +++ b/balancer/rls/picker.go @@ -163,20 +163,18 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { // delegateToChildPolicies is a helper function which iterates through the list // of child policy wrappers in a cache entry and attempts to find a child policy // to which this RPC can be routed to. If all child policies are in -// TRANSIENT_FAILURE, we delegate to the first child policy arbitrarily. +// TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily. // // Caller must hold at least a read-lock on p.lb.cacheMu. func (p *rlsPicker) delegateToChildPolicies(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) { - for _, cpw := range dcEntry.childPolicyWrappers { + for i, cpw := range dcEntry.childPolicyWrappers { state := (*balancer.State)(atomic.LoadPointer(&cpw.state)) - if state.ConnectivityState == connectivity.TransientFailure { - continue + // Delegate to the child policy if it is not in TRANSIENT_FAILURE, or if + // it the last one (which handles the case of delegating to the last + // child picker if all child polcies are in TRANSIENT_FAILURE). + if state.ConnectivityState != connectivity.TransientFailure || i == len(dcEntry.childPolicyWrappers)-1 { + return state.Picker.Pick(info) } - return state.Picker.Pick(info) - } - if len(dcEntry.childPolicyWrappers) != 0 { - state := (*balancer.State)(atomic.LoadPointer(&dcEntry.childPolicyWrappers[0].state)) - return state.Picker.Pick(info) } // In the unlikely event that we have a cache entry with no targets, we end up // queueing the RPC.