diff --git a/pkg/grpc_testing/stub_server.go b/pkg/grpc_testing/stub_server.go index e9f0d094f8d..afe6fd863fb 100644 --- a/pkg/grpc_testing/stub_server.go +++ b/pkg/grpc_testing/stub_server.go @@ -18,6 +18,8 @@ import ( "context" "fmt" "net" + "strconv" + "sync/atomic" "google.golang.org/grpc" testpb "google.golang.org/grpc/test/grpc_testing" @@ -93,14 +95,16 @@ func (ss *StubServer) Addr() string { type dummyStubServer struct { testpb.UnimplementedTestServiceServer - body []byte + counter uint64 } -func (d dummyStubServer) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { +func (d *dummyStubServer) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + newCount := atomic.AddUint64(&d.counter, 1) + return &testpb.SimpleResponse{ Payload: &testpb.Payload{ Type: testpb.PayloadType_COMPRESSABLE, - Body: d.body, + Body: []byte(strconv.FormatUint(newCount, 10)), }, }, nil } @@ -108,5 +112,5 @@ func (d dummyStubServer) UnaryCall(context.Context, *testpb.SimpleRequest) (*tes // NewDummyStubServer creates a simple test server that serves Unary calls with // responses with the given payload. func NewDummyStubServer(body []byte) *StubServer { - return New(dummyStubServer{body: body}) + return New(&dummyStubServer{}) } diff --git a/tests/integration/clientv3/naming/resolver_test.go b/tests/integration/clientv3/naming/resolver_test.go index 14c3ad72374..6a5b3803763 100644 --- a/tests/integration/clientv3/naming/resolver_test.go +++ b/tests/integration/clientv3/naming/resolver_test.go @@ -17,8 +17,9 @@ package naming_test import ( "bytes" "context" + "fmt" + "strconv" "testing" - "time" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/client/v3/naming/endpoints" @@ -29,25 +30,23 @@ import ( testpb "google.golang.org/grpc/test/grpc_testing" ) -// This test mimics scenario described in grpc_naming.md doc. +func testEtcdGrpcResolver(t *testing.T, lbPolicy string) { -func TestEtcdGrpcResolver(t *testing.T) { - integration2.BeforeTest(t) - - s1PayloadBody := []byte{'1'} - s1 := grpc_testing.NewDummyStubServer(s1PayloadBody) + // Setup two new dummy stub servers + payloadBody := []byte{'1'} + s1 := grpc_testing.NewDummyStubServer(payloadBody) if err := s1.Start(nil); err != nil { t.Fatal("failed to start dummy grpc server (s1)", err) } defer s1.Stop() - s2PayloadBody := []byte{'2'} - s2 := grpc_testing.NewDummyStubServer(s2PayloadBody) + s2 := grpc_testing.NewDummyStubServer(payloadBody) if err := s2.Start(nil); err != nil { t.Fatal("failed to start dummy grpc server (s2)", err) } defer s2.Stop() + // Create new cluster with endpoint manager with two endpoints clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3}) defer clus.Terminate(t) @@ -64,55 +63,90 @@ func TestEtcdGrpcResolver(t *testing.T) { t.Fatal("failed to add foo", err) } + err = em.AddEndpoint(context.TODO(), "foo/e2", e2) + if err != nil { + t.Fatal("failed to add foo", err) + } + b, err := resolver.NewBuilder(clus.Client(1)) if err != nil { t.Fatal("failed to new resolver builder", err) } - conn, err := grpc.Dial("etcd:///foo", grpc.WithInsecure(), grpc.WithResolvers(b)) + // Create connection with provided lb policy + conn, err := grpc.Dial("etcd:///foo", grpc.WithInsecure(), grpc.WithResolvers(b), + grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, lbPolicy))) if err != nil { t.Fatal("failed to connect to foo", err) } defer conn.Close() + // Send an initial request that should go to e1 c := testpb.NewTestServiceClient(conn) resp, err := c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true)) if err != nil { t.Fatal("failed to invoke rpc to foo (e1)", err) } - if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s1PayloadBody) { + if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), payloadBody) { t.Fatalf("unexpected response from foo (e1): %s", resp.GetPayload().GetBody()) } - em.DeleteEndpoint(context.TODO(), "foo/e1") - em.AddEndpoint(context.TODO(), "foo/e2", e2) + // Send more requests + lastResponse := []byte{'1'} + totalRequests := 100 + for i := 1; i < totalRequests; i++ { + resp, err := c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true)) + if err != nil { + t.Fatal("failed to invoke rpc to foo", err) + } - // We use a loop with deadline of 30s to avoid test getting flake - // as it's asynchronous for gRPC Client to update underlying connections. - maxRetries := 300 - retryPeriod := 100 * time.Millisecond - retries := 0 - for { - time.Sleep(retryPeriod) - retries++ + t.Logf("Response: %v", string(resp.GetPayload().GetBody())) - resp, err = c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}) - if err != nil { - if retries < maxRetries { - continue - } - t.Fatal("failed to invoke rpc to foo (e2)", err) + if resp.GetPayload() == nil { + t.Fatalf("unexpected response from foo: %s", resp.GetPayload().GetBody()) + } + lastResponse = resp.GetPayload().GetBody() + } + + // If the load balancing policy is pick first then return payload should equal number of requests + t.Logf("Last response: %v", string(lastResponse)) + if lbPolicy == "pick_first" { + if string(lastResponse) != "100" { + t.Fatalf("unexpected total responses from foo: %s", string(lastResponse)) } - if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s2PayloadBody) { - if retries < maxRetries { - continue - } - t.Fatalf("unexpected response from foo (e2): %s", resp.GetPayload().GetBody()) + } + + // If the load balancing policy is round robin we should see roughly half total requests served by each server + if lbPolicy == "round_robin" { + responses, err := strconv.Atoi(string(lastResponse)) + if err != nil { + t.Fatalf("couldn't convert to int: %s", string(lastResponse)) } - break + + // Allow 10% tolerance as round robin is not perfect and we don't want the test to flake + expected := float64(totalRequests) * 0.5 + assert.InEpsilon(t, float64(expected), float64(responses), 0.1, "unexpected total responses from foo: %s", string(lastResponse)) } } +// TestEtcdGrpcResolverPickFirst mimics scenarios described in grpc_naming.md doc. +func TestEtcdGrpcResolverPickFirst(t *testing.T) { + + integration2.BeforeTest(t) + + // Pick first is the default load balancer policy for grpc-go + testEtcdGrpcResolver(t, "pick_first") +} + +// TestEtcdGrpcResolverRoundRobin mimics scenarios described in grpc_naming.md doc. +func TestEtcdGrpcResolverRoundRobin(t *testing.T) { + + integration2.BeforeTest(t) + + // Round robin is a common alternative for more production oriented scenarios + testEtcdGrpcResolver(t, "round_robin") +} + func TestEtcdEndpointManager(t *testing.T) { integration2.BeforeTest(t)