clusterimpl: propagate state update from child when drop/request config remains unchanged (grpc#7844)
easwars authored Nov 14, 2024
1 parent 89737ae commit 66385b2
Showing 3 changed files with 242 additions and 4 deletions.
20 changes: 17 additions & 3 deletions internal/testutils/xds/e2e/clientresources.go
@@ -667,6 +667,13 @@ func ClusterResourceWithOptions(opts ClusterOptions) *v3clusterpb.Cluster {
 	return cluster
 }
 
+// LocalityID represents a locality identifier.
+type LocalityID struct {
+	Region  string
+	Zone    string
+	SubZone string
+}
+
 // LocalityOptions contains options to configure a Locality.
 type LocalityOptions struct {
 	// Name is the unique locality name.
@@ -677,6 +684,9 @@ type LocalityOptions struct {
 	Backends []BackendOptions
 	// Priority is the priority of the locality. Defaults to 0.
 	Priority uint32
+	// Locality is the locality identifier. If not specified, a random
+	// identifier is generated.
+	Locality LocalityID
 }
 
 // BackendOptions contains options to configure individual backends in a
@@ -752,12 +762,16 @@ func EndpointResourceWithOptions(opts EndpointOptions) *v3endpointpb.ClusterLoadAssignment {
 		})
 	}
 
-	endpoints = append(endpoints, &v3endpointpb.LocalityLbEndpoints{
-		Locality: &v3corepb.Locality{
+	l := locality.Locality
+	if l == (LocalityID{}) {
+		l = LocalityID{
 			Region:  fmt.Sprintf("region-%d", i+1),
 			Zone:    fmt.Sprintf("zone-%d", i+1),
 			SubZone: fmt.Sprintf("subzone-%d", i+1),
-		},
+		}
+	}
+	endpoints = append(endpoints, &v3endpointpb.LocalityLbEndpoints{
+		Locality:            &v3corepb.Locality{Region: l.Region, Zone: l.Zone, SubZone: l.SubZone},
 		LbEndpoints:         lbEndpoints,
 		LoadBalancingWeight: &wrapperspb.UInt32Value{Value: locality.Weight},
 		Priority:            locality.Priority,
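
For illustration, a hypothetical snippet showing how a caller can use the new Locality field to pin an endpoints resource to a fixed locality instead of the generated region-%d/zone-%d/subzone-%d identifiers (the cluster name, host, and port below are placeholders, not values from this commit):

	// Hypothetical usage sketch; cluster name, host, and port are placeholders.
	locality := e2e.LocalityID{Region: "my-region", Zone: "my-zone", SubZone: "my-subzone"}
	endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
		ClusterName: "example-cluster",
		Host:        "localhost",
		Localities: []e2e.LocalityOptions{{
			Name:     "example-locality",
			Weight:   1000000,
			Priority: 0,
			Backends: []e2e.BackendOptions{{Port: 8080}},
			Locality: locality, // omit to fall back to a generated identifier
		}},
	})
	_ = endpoints // holds a *v3endpointpb.ClusterLoadAssignment for the management server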
211 changes: 211 additions & 0 deletions test/xds/xds_client_priority_locality_test.go
@@ -0,0 +1,211 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package xds_test

import (
	"context"
	"fmt"
	"testing"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal/stubserver"
	"google.golang.org/grpc/internal/testutils"
	rrutil "google.golang.org/grpc/internal/testutils/roundrobin"
	"google.golang.org/grpc/internal/testutils/xds/e2e"
	"google.golang.org/grpc/internal/testutils/xds/e2e/setup"
	"google.golang.org/grpc/resolver"

	v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
	v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
	testgrpc "google.golang.org/grpc/interop/grpc_testing"
)

// backendAddressesAndPorts extracts the address and port of each of the
// StubServers passed in and returns them. Fails the test if any of the
// StubServers passed have an invalid address.
func backendAddressesAndPorts(t *testing.T, servers []*stubserver.StubServer) ([]resolver.Address, []uint32) {
	addrs := make([]resolver.Address, len(servers))
	ports := make([]uint32, len(servers))
	for i := 0; i < len(servers); i++ {
		addrs[i] = resolver.Address{Addr: servers[i].Address}
		ports[i] = testutils.ParsePort(t, servers[i].Address)
	}
	return addrs, ports
}

// Tests scenarios involving localities moving between priorities.
// - The test starts off with a cluster that contains two priorities, one
// locality in each, and one endpoint in each. Verifies that traffic reaches
// the endpoint in the higher priority.
// - The test then moves the locality in the lower priority over to the higher
// priority. At that point, we would have a cluster with a single priority,
// but two localities, and one endpoint in each. Verifies that traffic is
// split between the endpoints.
// - The test then deletes the locality that was originally in the higher
// priority. Verifies that all traffic is now reaching the only remaining
// endpoint.
func (s) TestClientSideXDS_LocalityChangesPriority(t *testing.T) {
	// Spin up a management server and two test service backends.
	managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t)
	backend0 := stubserver.StartTestService(t, nil)
	defer backend0.Stop()
	backend1 := stubserver.StartTestService(t, nil)
	defer backend1.Stop()
	addrs, ports := backendAddressesAndPorts(t, []*stubserver.StubServer{backend0, backend1})

	// Configure resources on the management server. We use default client side
	// resources for listener, route configuration and cluster. For the
	// endpoints resource though, we create one with two priorities, and one
	// locality each, and one endpoint each.
	const serviceName = "my-service-client-side-xds"
	const routeConfigName = "route-" + serviceName
	const clusterName = "cluster-" + serviceName
	const endpointsName = "endpoints-" + serviceName
	locality1 := e2e.LocalityID{Region: "my-region-1", Zone: "my-zone-1", SubZone: "my-subzone-1"}
	locality2 := e2e.LocalityID{Region: "my-region-2", Zone: "my-zone-2", SubZone: "my-subzone-2"}
	resources := e2e.UpdateOptions{
		NodeID:    nodeID,
		Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, routeConfigName)},
		Routes:    []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, serviceName, clusterName)},
		Clusters:  []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, endpointsName, e2e.SecurityLevelNone)},
		Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
			ClusterName: endpointsName,
			Host:        "localhost",
			Localities: []e2e.LocalityOptions{
				{
					Name:     "my-locality-1",
					Weight:   1000000,
					Priority: 0,
					Backends: []e2e.BackendOptions{{Port: ports[0]}},
					Locality: locality1,
				},
				{
					Name:     "my-locality-2",
					Weight:   1000000,
					Priority: 1,
					Backends: []e2e.BackendOptions{{Port: ports[1]}},
					Locality: locality2,
				},
			},
		})},
	}
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if err := managementServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Create a ClientConn and make a successful RPC.
	cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsResolver))
	if err != nil {
		t.Fatalf("failed to dial local test server: %v", err)
	}
	defer cc.Close()

	// Ensure that RPCs get routed to the backend in the higher priority.
	client := testgrpc.NewTestServiceClient(cc)
	if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
		t.Fatal(err)
	}

	// Update the endpoints resource to contain a single priority with two
	// localities, and one endpoint each. The locality weights are equal at this
	// point, and we expect RPCs to be round-robined across the two localities.
	resources.Endpoints = []*v3endpointpb.ClusterLoadAssignment{e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
		ClusterName: endpointsName,
		Host:        "localhost",
		Localities: []e2e.LocalityOptions{
			{
				Name:     "my-locality-1",
				Weight:   500000,
				Priority: 0,
				Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend0.Address)}},
				Locality: locality1,
			},
			{
				Name:     "my-locality-2",
				Weight:   500000,
				Priority: 0,
				Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend1.Address)}},
				Locality: locality2,
			},
		},
	})}
	if err := managementServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}
	if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil {
		t.Fatal(err)
	}

	// Update the locality weights ever so slightly. We still expect RPCs to be
	// round-robined across the two localities.
	resources.Endpoints = []*v3endpointpb.ClusterLoadAssignment{e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
		ClusterName: endpointsName,
		Host:        "localhost",
		Localities: []e2e.LocalityOptions{
			{
				Name:     "my-locality-1",
				Weight:   499884,
				Priority: 0,
				Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend0.Address)}},
				Locality: locality1,
			},
			{
				Name:     "my-locality-2",
				Weight:   500115,
				Priority: 0,
				Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend1.Address)}},
				Locality: locality2,
			},
		},
	})}
	if err := managementServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}
	if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil {
		t.Fatal(err)
	}

	// Update the endpoints resource to contain a single priority with one
	// locality. The locality which was originally in the higher priority is now
	// dropped.
	resources.Endpoints = []*v3endpointpb.ClusterLoadAssignment{e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
		ClusterName: endpointsName,
		Host:        "localhost",
		Localities: []e2e.LocalityOptions{
			{
				Name:     "my-locality-2",
				Weight:   1000000,
				Priority: 0,
				Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, backend1.Address)}},
				Locality: locality2,
			},
		},
	})}
	if err := managementServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}
	if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
		t.Fatal(err)
	}
}
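
For context, rrutil.CheckRoundRobinRPCs (used repeatedly above) retries RPCs until they are spread evenly across the expected backends or the context expires. A hypothetical, simplified version of such a checker, shown only to illustrate what the test is asserting (assumes the imports already present in this file plus time, google.golang.org/grpc/peer, and testpb aliased to google.golang.org/grpc/interop/grpc_testing; the real helper is more thorough):

	// checkRoundRobin is a simplified, hypothetical stand-in for
	// rrutil.CheckRoundRobinRPCs.
	func checkRoundRobin(ctx context.Context, client testgrpc.TestServiceClient, want []resolver.Address) error {
		for ctx.Err() == nil {
			// One pass: make len(want) RPCs and record which backend served each.
			got := make(map[string]int)
			for range want {
				var p peer.Peer
				if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&p)); err != nil {
					return fmt.Errorf("EmptyCall failed: %v", err)
				}
				got[p.Addr.String()]++
			}
			// Perfect round robin means each address was hit exactly once per pass.
			rr := true
			for _, a := range want {
				if got[a.Addr] != 1 {
					rr = false
					break
				}
			}
			if rr {
				return nil
			}
			time.Sleep(time.Millisecond) // not there yet; retry until ctx expires
		}
		return fmt.Errorf("timeout waiting for round robin across %v", want)
	}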
15 changes: 14 additions & 1 deletion xds/internal/balancer/clusterimpl/clusterimpl.go
@@ -107,6 +107,7 @@ type clusterImplBalancer struct {
 	mu                    sync.Mutex
 	clusterName           string         // The cluster name for credentials handshaking.
 	inhibitPickerUpdates  bool           // Inhibits state updates from child policy when processing an update from the parent.
+	pendingPickerUpdates  bool           // True if a picker update from the child policy was inhibited when processing an update from the parent.
 	childState            balancer.State // Most recent state update from the child policy.
 	drops                 []*dropper     // Drops implementation.
 	requestCounterCluster string         // The cluster name for the request counter, from LB config.
@@ -293,7 +294,15 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState)
 
 	b.mu.Lock()
 	b.telemetryLabels = newConfig.TelemetryLabels
-	if b.handleDropAndRequestCountLocked(newConfig) && b.childState.Picker != nil {
+	// We want to send a picker update to the parent if one of the two
+	// conditions is met:
+	// - drop/request config has changed *and* there is already a picker from
+	//   the child, or
+	// - there is a pending picker update from the child (and this covers the
+	//   case where the drop/request config has not changed, but the child sent
+	//   a picker update while we were still processing config from our parent).
+	if (b.handleDropAndRequestCountLocked(newConfig) && b.childState.Picker != nil) || b.pendingPickerUpdates {
+		b.pendingPickerUpdates = false
 		b.ClientConn.UpdateState(balancer.State{
 			ConnectivityState: b.childState.ConnectivityState,
 			Picker:            b.newPickerLocked(),
@@ -356,6 +365,10 @@ func (b *clusterImplBalancer) UpdateState(state balancer.State) {
 	// parent. Update the childState field regardless.
 	b.childState = state
 	if b.inhibitPickerUpdates {
+		b.pendingPickerUpdates = true
+		if b.logger.V(2) {
+			b.logger.Infof("Received a picker update from the child when processing an update from the parent")
+		}
 		return
 	}

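In short: while clusterimpl is pushing a config update down to its child policy, picker updates from the child are inhibited. Previously an inhibited update was dropped unless the drop/request config had changed; the new pendingPickerUpdates flag remembers it so it can be flushed once processing finishes. A distilled, hypothetical sketch of the pattern (simplified names; assumes imports of sync and google.golang.org/grpc/balancer; not the actual clusterImplBalancer code):

	// inhibitingBalancer is a hypothetical distillation of the pattern above.
	type inhibitingBalancer struct {
		mu         sync.Mutex
		cc         balancer.ClientConn
		inhibit    bool           // set while processing a parent config update
		pending    bool           // child sent a picker update while inhibited
		childState balancer.State // most recent state from the child policy
	}

	// fromChild handles a state update reported by the child policy.
	func (b *inhibitingBalancer) fromChild(s balancer.State) {
		b.mu.Lock()
		defer b.mu.Unlock()
		b.childState = s
		if b.inhibit {
			b.pending = true // remember the update instead of dropping it
			return
		}
		b.cc.UpdateState(s)
	}

	// fromParent handles a config update from the parent; pushToChild forwards
	// the new config to the child and may synchronously trigger fromChild.
	func (b *inhibitingBalancer) fromParent(configChanged bool, pushToChild func()) {
		b.mu.Lock()
		b.inhibit = true
		b.mu.Unlock()

		pushToChild()

		b.mu.Lock()
		defer b.mu.Unlock()
		b.inhibit = false
		// Flush if the drop/request config changed, or if the child tried to
		// send a picker while we were busy (the case this commit fixes).
		if (configChanged && b.childState.Picker != nil) || b.pending {
			b.pending = false
			b.cc.UpdateState(b.childState)
		}
	}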
