Skip to content

Commit

Permalink
xds: make comparison of server configs in bootstrap more reliable (#6112
Browse files Browse the repository at this point in the history
)
  • Loading branch information
easwars authored Mar 15, 2023
1 parent 7507ea6 commit 52ca957
Show file tree
Hide file tree
Showing 27 changed files with 497 additions and 361 deletions.
15 changes: 10 additions & 5 deletions xds/googledirectpath/googlec2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/google"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/googlecloud"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/xds" // To register xds resolvers and balancers.
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/protobuf/types/known/structpb"

v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"

_ "google.golang.org/grpc/xds" // To register xds resolvers and balancers.
)

const (
Expand Down Expand Up @@ -116,9 +116,14 @@ func (c2pResolverBuilder) Build(t resolver.Target, cc resolver.ClientConn, opts
if balancerName == "" {
balancerName = tdURL
}
serverConfig := &bootstrap.ServerConfig{
ServerURI: balancerName,
Creds: grpc.WithCredentialsBundle(google.NewDefaultCredentials()),
serverConfig, err := bootstrap.ServerConfigFromJSON([]byte(fmt.Sprintf(`
{
"server_uri": "%s",
"channel_creds": [{"type": "google_default"}],
"server_features": ["xds_v3"]
}`, balancerName)))
if err != nil {
return nil, fmt.Errorf("failed to build bootstrap configuration: %v", err)
}
config := &bootstrap.Config{
XDSServer: serverConfig,
Expand Down
20 changes: 13 additions & 7 deletions xds/googledirectpath/googlec2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package googledirectpath

import (
"fmt"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -212,15 +213,20 @@ func TestBuildXDS(t *testing.T) {
},
}
}
serverConfig := &bootstrap.ServerConfig{
ServerURI: tdURL,
wantServerConfig, err := bootstrap.ServerConfigFromJSON([]byte(fmt.Sprintf(`{
"server_uri": "%s",
"channel_creds": [{"type": "google_default"}],
"server_features": ["xds_v3"]
}`, tdURL)))
if err != nil {
t.Fatalf("Failed to build server bootstrap config: %v", err)
}
wantConfig := &bootstrap.Config{
XDSServer: serverConfig,
XDSServer: wantServerConfig,
ClientDefaultListenerResourceNameTemplate: "%s",
Authorities: map[string]*bootstrap.Authority{
"traffic-director-c2p.xds.googleapis.com": {
XDSServer: serverConfig,
XDSServer: wantServerConfig,
},
},
NodeProto: wantNode,
Expand All @@ -234,9 +240,9 @@ func TestBuildXDS(t *testing.T) {
protocmp.Transform(),
}
select {
case c := <-configCh:
if diff := cmp.Diff(c, wantConfig, cmpOpts); diff != "" {
t.Fatalf("%v", diff)
case gotConfig := <-configCh:
if diff := cmp.Diff(wantConfig, gotConfig, cmpOpts); diff != "" {
t.Fatalf("Unexpected diff in bootstrap config (-want +got):\n%s", diff)
}
case <-time.After(time.Second):
t.Fatalf("timeout waiting for client config")
Expand Down
4 changes: 3 additions & 1 deletion xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ const (
var (
defaultTestAuthorityServerConfig = &bootstrap.ServerConfig{
ServerURI: "self_server",
CredsType: "self_creds",
Creds: bootstrap.ChannelCreds{
Type: "insecure",
},
}
noopODLBCfg = outlierdetection.LBConfig{
Interval: 1<<63 - 1,
Expand Down
8 changes: 6 additions & 2 deletions xds/internal/balancer/clusterimpl/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ var (
}
testLRSServerConfig = &bootstrap.ServerConfig{
ServerURI: "trafficdirector.googleapis.com:443",
CredsType: "google_default",
Creds: bootstrap.ChannelCreds{
Type: "google_default",
},
}

cmpOpts = cmp.Options{
Expand Down Expand Up @@ -720,7 +722,9 @@ func (s) TestUpdateLRSServer(t *testing.T) {

testLRSServerConfig2 := &bootstrap.ServerConfig{
ServerURI: "trafficdirector-another.googleapis.com:443",
CredsType: "google_default",
Creds: bootstrap.ChannelCreds{
Type: "google_default",
},
}
// Update LRS server to a different name.
if err := b.UpdateClientConnState(balancer.ClientConnState{
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (b *clusterImplBalancer) updateLoadStore(newConfig *LBConfig) error {
} else {
// Old is not nil, new is not nil, compare string values, if
// different, stop old and start new.
if *b.lrsServer != *newConfig.LoadReportingServer {
if !b.lrsServer.Equal(newConfig.LoadReportingServer) {
b.lrsServer = newConfig.LoadReportingServer
stopOldLoadReport = true
startNewLoadReport = true
Expand Down
166 changes: 166 additions & 0 deletions xds/internal/balancer/clusterimpl/tests/balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package clusterimpl_test

import (
"context"
"fmt"
"net"
"strconv"
"strings"
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/status"

v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
testgrpc "google.golang.org/grpc/test/grpc_testing"
testpb "google.golang.org/grpc/test/grpc_testing"

_ "google.golang.org/grpc/xds"
)

const (
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 100 * time.Millisecond
)

type s struct {
grpctest.Tester
}

func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}

// TestConfigUpdateWithSameLoadReportingServerConfig tests the scenario where
// the clusterimpl LB policy receives a config update with no change in the load
// reporting server configuration. The test verifies that the existing load
// repoting stream is not terminated and that a new load reporting stream is not
// created.
func (s) TestConfigUpdateWithSameLoadReportingServerConfig(t *testing.T) {
// Create an xDS management server that serves ADS and LRS requests.
opts := e2e.ManagementServerOptions{SupportLoadReportingService: true}
mgmtServer, nodeID, _, resolver, mgmtServerCleanup := e2e.SetupManagementServer(t, opts)
defer mgmtServerCleanup()

// Start a server backend exposing the test service.
backend := &stubserver.StubServer{
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
}
backend.StartServer()
defer backend.Stop()

// Extract the host and port where the server backend is running.
_, p, err := net.SplitHostPort(backend.Address)
if err != nil {
t.Fatalf("Invalid serving address for server backend: %v", err)
}
port, err := strconv.ParseUint(p, 10, 32)
if err != nil {
t.Fatalf("Invalid serving port for server backend: %v", err)
}
t.Logf("Started server backend at %q", backend.Address)

// Configure the xDS management server with default resources. Override the
// default cluster to include an LRS server config pointing to self.
const serviceName = "my-test-xds-service"
resources := e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: serviceName,
NodeID: nodeID,
Host: "localhost",
Port: uint32(port),
SecLevel: e2e.SecurityLevelNone,
})
resources.Clusters[0].LrsServer = &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Self{
Self: &v3corepb.SelfConfigSource{},
},
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create a ClientConn and make a successful RPC.
cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()

client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}

// Ensure that an LRS stream is created.
if _, err := mgmtServer.LRSServer.LRSStreamOpenChan.Receive(ctx); err != nil {
t.Fatalf("Failure when waiting for an LRS stream to be opened: %v", err)
}

// Configure a new resource on the management server with drop config that
// drops all RPCs, but with no change in the load reporting server config.
resources.Endpoints = []*v3endpointpb.ClusterLoadAssignment{
e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
ClusterName: "endpoints-" + serviceName,
Host: "localhost",
Ports: []uint32{uint32(port)},
DropPercents: map[string]int{"test-drop-everything": 100},
}),
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Repeatedly send RPCs until we sees that they are getting dropped, or the
// test context deadline expires. The former indicates that new config with
// drops has been applied.
for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
_, err := client.EmptyCall(ctx, &testpb.Empty{})
if err != nil && status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "RPC is dropped") {
break
}
}
if ctx.Err() != nil {
t.Fatalf("Timeout when waiting for RPCs to be dropped after config update")
}

// Ensure that the old LRS stream is not closed.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := mgmtServer.LRSServer.LRSStreamCloseChan.Receive(sCtx); err == nil {
t.Fatal("LRS stream closed when expected not to")
}

// Also ensure that a new LRS stream is not created.
sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if _, err := mgmtServer.LRSServer.LRSStreamOpenChan.Receive(sCtx); err == nil {
t.Fatal("New LRS stream created when expected not to")
}
}
4 changes: 3 additions & 1 deletion xds/internal/balancer/clusterresolver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ const (

var testLRSServerConfig = &bootstrap.ServerConfig{
ServerURI: "trafficdirector.googleapis.com:443",
CredsType: "google_default",
Creds: bootstrap.ChannelCreds{
Type: "google_default",
},
}

func TestParseConfig(t *testing.T) {
Expand Down
12 changes: 3 additions & 9 deletions xds/internal/resolver/xds_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -53,6 +52,7 @@ import (
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/httpfilter"
"google.golang.org/grpc/xds/internal/httpfilter/router"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
Expand Down Expand Up @@ -213,10 +213,7 @@ func (s) TestResolverBuilder_DifferentBootstrapConfigs(t *testing.T) {

// Add top-level xDS server config corresponding to the above
// management server.
test.bootstrapCfg.XDSServer = &bootstrap.ServerConfig{
ServerURI: mgmtServer.Address,
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
}
test.bootstrapCfg.XDSServer = xdstestutils.ServerConfigForAddress(t, mgmtServer.Address)

// Override xDS client creation to use bootstrap configuration
// specified by the test.
Expand Down Expand Up @@ -538,10 +535,7 @@ func (s) TestResolverWatchCallbackAfterClose(t *testing.T) {
// closes the xDS client.
func (s) TestResolverCloseClosesXDSClient(t *testing.T) {
bootstrapCfg := &bootstrap.Config{
XDSServer: &bootstrap.ServerConfig{
ServerURI: "dummy-management-server-address",
Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
},
XDSServer: xdstestutils.ServerConfigForAddress(t, "dummy-management-server-address"),
}

// Override xDS client creation to use bootstrap configuration pointing to a
Expand Down
21 changes: 21 additions & 0 deletions xds/internal/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
package testutils

import (
"fmt"
"testing"

"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
)
Expand All @@ -45,3 +49,20 @@ func BuildResourceName(typ xdsresource.ResourceType, auth, id string, ctxParams
ContextParams: ctxParams,
}).String()
}

// ServerConfigForAddress returns a bootstrap.ServerConfig for the given address
// with default values of insecure channel_creds and v3 server_features.
func ServerConfigForAddress(t *testing.T, addr string) *bootstrap.ServerConfig {
t.Helper()

jsonCfg := fmt.Sprintf(`{
"server_uri": "%s",
"channel_creds": [{"type": "insecure"}],
"server_features": ["xds_v3"]
}`, addr)
sc, err := bootstrap.ServerConfigFromJSON([]byte(jsonCfg))
if err != nil {
t.Fatalf("Failed to create server config from JSON %s: %v", jsonCfg, err)
}
return sc
}
Loading

0 comments on commit 52ca957

Please sign in to comment.