Skip to content

Commit

Permalink
Add new test for round robin resolver.
Browse files Browse the repository at this point in the history
Signed-off-by: James Blair <[email protected]>
  • Loading branch information
jmhbnz committed Apr 25, 2023
1 parent 8c5e9ad commit 18e3aca
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 37 deletions.
12 changes: 8 additions & 4 deletions pkg/grpc_testing/stub_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"
"net"
"strconv"
"sync/atomic"

"google.golang.org/grpc"
testpb "google.golang.org/grpc/test/grpc_testing"
Expand Down Expand Up @@ -93,20 +95,22 @@ func (ss *StubServer) Addr() string {

type dummyStubServer struct {
testpb.UnimplementedTestServiceServer
body []byte
counter uint64
}

func (d dummyStubServer) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
func (d *dummyStubServer) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
newCount := atomic.AddUint64(&d.counter, 1)

return &testpb.SimpleResponse{
Payload: &testpb.Payload{
Type: testpb.PayloadType_COMPRESSABLE,
Body: d.body,
Body: []byte(strconv.FormatUint(newCount, 10)),
},
}, nil
}

// NewDummyStubServer creates a simple test server that serves Unary calls with
// responses with the given payload.
func NewDummyStubServer(body []byte) *StubServer {
return New(dummyStubServer{body: body})
return New(&dummyStubServer{})
}
100 changes: 67 additions & 33 deletions tests/integration/clientv3/naming/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ package naming_test
import (
"bytes"
"context"
"fmt"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/client/v3/naming/endpoints"
Expand All @@ -29,25 +30,23 @@ import (
testpb "google.golang.org/grpc/test/grpc_testing"
)

// This test mimics scenario described in grpc_naming.md doc.
func testEtcdGrpcResolver(t *testing.T, lbPolicy string) {

func TestEtcdGrpcResolver(t *testing.T) {
integration2.BeforeTest(t)

s1PayloadBody := []byte{'1'}
s1 := grpc_testing.NewDummyStubServer(s1PayloadBody)
// Setup two new dummy stub servers
payloadBody := []byte{'1'}
s1 := grpc_testing.NewDummyStubServer(payloadBody)
if err := s1.Start(nil); err != nil {
t.Fatal("failed to start dummy grpc server (s1)", err)
}
defer s1.Stop()

s2PayloadBody := []byte{'2'}
s2 := grpc_testing.NewDummyStubServer(s2PayloadBody)
s2 := grpc_testing.NewDummyStubServer(payloadBody)
if err := s2.Start(nil); err != nil {
t.Fatal("failed to start dummy grpc server (s2)", err)
}
defer s2.Stop()

// Create new cluster with endpoint manager with two endpoints
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
defer clus.Terminate(t)

Expand All @@ -64,55 +63,90 @@ func TestEtcdGrpcResolver(t *testing.T) {
t.Fatal("failed to add foo", err)
}

err = em.AddEndpoint(context.TODO(), "foo/e2", e2)
if err != nil {
t.Fatal("failed to add foo", err)
}

b, err := resolver.NewBuilder(clus.Client(1))
if err != nil {
t.Fatal("failed to new resolver builder", err)
}

conn, err := grpc.Dial("etcd:///foo", grpc.WithInsecure(), grpc.WithResolvers(b))
// Create connection with provided lb policy
conn, err := grpc.Dial("etcd:///foo", grpc.WithInsecure(), grpc.WithResolvers(b),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingPolicy":"%s"}`, lbPolicy)))
if err != nil {
t.Fatal("failed to connect to foo", err)
}
defer conn.Close()

// Send an initial request that should go to e1
c := testpb.NewTestServiceClient(conn)
resp, err := c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true))
if err != nil {
t.Fatal("failed to invoke rpc to foo (e1)", err)
}
if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s1PayloadBody) {
if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), payloadBody) {
t.Fatalf("unexpected response from foo (e1): %s", resp.GetPayload().GetBody())
}

em.DeleteEndpoint(context.TODO(), "foo/e1")
em.AddEndpoint(context.TODO(), "foo/e2", e2)
// Send more requests
lastResponse := []byte{'1'}
totalRequests := 100
for i := 1; i < totalRequests; i++ {
resp, err := c.UnaryCall(context.TODO(), &testpb.SimpleRequest{}, grpc.WaitForReady(true))
if err != nil {
t.Fatal("failed to invoke rpc to foo", err)
}

// We use a loop with deadline of 30s to avoid test getting flake
// as it's asynchronous for gRPC Client to update underlying connections.
maxRetries := 300
retryPeriod := 100 * time.Millisecond
retries := 0
for {
time.Sleep(retryPeriod)
retries++
t.Logf("Response: %v", string(resp.GetPayload().GetBody()))

resp, err = c.UnaryCall(context.TODO(), &testpb.SimpleRequest{})
if err != nil {
if retries < maxRetries {
continue
}
t.Fatal("failed to invoke rpc to foo (e2)", err)
if resp.GetPayload() == nil {
t.Fatalf("unexpected response from foo: %s", resp.GetPayload().GetBody())
}
lastResponse = resp.GetPayload().GetBody()
}

// If the load balancing policy is pick first then return payload should equal number of requests
t.Logf("Last response: %v", string(lastResponse))
if lbPolicy == "pick_first" {
if string(lastResponse) != "100" {
t.Fatalf("unexpected total responses from foo: %s", string(lastResponse))
}
if resp.GetPayload() == nil || !bytes.Equal(resp.GetPayload().GetBody(), s2PayloadBody) {
if retries < maxRetries {
continue
}
t.Fatalf("unexpected response from foo (e2): %s", resp.GetPayload().GetBody())
}

// If the load balancing policy is round robin we should see roughly half total requests served by each server
if lbPolicy == "round_robin" {
responses, err := strconv.Atoi(string(lastResponse))
if err != nil {
t.Fatalf("couldn't convert to int: %s", string(lastResponse))
}
break

// Allow 10% tolerance as round robin is not perfect and we don't want the test to flake
expected := float64(totalRequests) * 0.5
assert.InEpsilon(t, float64(expected), float64(responses), 0.1, "unexpected total responses from foo: %s", string(lastResponse))
}
}

// TestEtcdGrpcResolverPickFirst mimics scenarios described in grpc_naming.md doc.
func TestEtcdGrpcResolverPickFirst(t *testing.T) {

integration2.BeforeTest(t)

// Pick first is the default load balancer policy for grpc-go
testEtcdGrpcResolver(t, "pick_first")
}

// TestEtcdGrpcResolverRoundRobin mimics scenarios described in grpc_naming.md doc.
func TestEtcdGrpcResolverRoundRobin(t *testing.T) {

integration2.BeforeTest(t)

// Round robin is a common alternative for more production oriented scenarios
testEtcdGrpcResolver(t, "round_robin")
}

func TestEtcdEndpointManager(t *testing.T) {
integration2.BeforeTest(t)

Expand Down

0 comments on commit 18e3aca

Please sign in to comment.