Skip to content
This repository has been archived by the owner on May 21, 2024. It is now read-only.

Support for the google wrappers #49

Merged
merged 3 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 92 additions & 4 deletions grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,29 @@ import (
"strings"
"testing"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/golang/protobuf/ptypes/any"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"go.k6.io/k6/lib/testutils/httpmultibin"
grpcanytesting "go.k6.io/k6/lib/testutils/httpmultibin/grpc_any_testing"
"go.k6.io/k6/lib/testutils/httpmultibin/grpc_testing"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"go.k6.io/k6/metrics"
"google.golang.org/grpc/metadata"
v1alphagrpc "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
grpcstats "google.golang.org/grpc/stats"
"google.golang.org/grpc/status"

xk6grpc "github.com/grafana/xk6-grpc/grpc"
"github.com/grafana/xk6-grpc/grpc/testdata/wrappers_testing"
"github.com/grafana/xk6-grpc/lib/netext/grpcext"
"go.k6.io/k6/metrics"
)

func TestClient(t *testing.T) {
Expand Down Expand Up @@ -737,6 +740,91 @@ func TestClient(t *testing.T) {
err: "no gRPC connection",
},
},
{
name: "Wrappers",
setup: func(hb *httpmultibin.HTTPMultiBin) {
srv := wrappers_testing.Register(hb.ServerGRPC)

srv.TestStringImplementation = func(_ context.Context, sv *wrappers.StringValue) (*wrappers.StringValue, error) {
return &wrapperspb.StringValue{
Value: "hey " + sv.Value,
}, nil
}
},
initString: codeBlock{
code: `
const client = new grpc.Client();
client.load([], "../grpc/testdata/wrappers_testing/test.proto");
`,
},
vuString: codeBlock{
code: `
client.connect("GRPCBIN_ADDR");

let respString = client.invoke("grpc.wrappers.testing.Service/TestString", "John")
if (respString.message !== "hey John") {
throw new Error("expected to get 'hey John', but got a " + respString.message)
}
`,
},
},
{
name: "WrappersWithReflection",
setup: func(hb *httpmultibin.HTTPMultiBin) {
reflection.Register(hb.ServerGRPC)

srv := wrappers_testing.Register(hb.ServerGRPC)

srv.TestIntegerImplementation = func(_ context.Context, iv *wrappers.Int64Value) (*wrappers.Int64Value, error) {
return &wrappers.Int64Value{
Value: 2 * iv.Value,
}, nil
}

srv.TestStringImplementation = func(_ context.Context, sv *wrappers.StringValue) (*wrappers.StringValue, error) {
return &wrapperspb.StringValue{
Value: "hey " + sv.Value,
}, nil
}

srv.TestBooleanImplementation = func(_ context.Context, bv *wrappers.BoolValue) (*wrappers.BoolValue, error) {
return &wrapperspb.BoolValue{
Value: bv.Value != true,
}, nil
}

srv.TestDoubleImplementation = func(_ context.Context, bv *wrappers.DoubleValue) (*wrappers.DoubleValue, error) {
return &wrapperspb.DoubleValue{
Value: bv.Value * 2,
}, nil
}
},
initString: codeBlock{
code: `
const client = new grpc.Client();
`,
},
vuString: codeBlock{
code: `
client.connect("GRPCBIN_ADDR", {reflect: true});

let respString = client.invoke("grpc.wrappers.testing.Service/TestString", "John")
if (respString.message !== "hey John") {
throw new Error("expected to get 'hey John', but got a " + respString.message)
}

let respInt = client.invoke("grpc.wrappers.testing.Service/TestInteger", "3")
if (respInt.message !== "6") {
throw new Error("expected to get '6', but got a " + respInt.message)
}

let respDouble = client.invoke("grpc.wrappers.testing.Service/TestDouble", "2.7")
if (respDouble.message !== 5.4) {
throw new Error("expected to get '5.4', but got a " + respDouble.message)
}
`,
},
},
}

for _, tt := range tests {
Expand Down
11 changes: 6 additions & 5 deletions grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"reflect"
"sync"
"time"

Expand Down Expand Up @@ -140,7 +141,7 @@ func (s *stream) loop() {
}
}

func (s *stream) queueMessage(msg map[string]interface{}) {
func (s *stream) queueMessage(msg interface{}) {
metrics.PushIfNotDone(s.vu.Context(), s.vu.State().Samples, metrics.Sample{
TimeSeries: metrics.TimeSeries{
Metric: s.instanceMetrics.StreamsMessagesReceived,
Expand Down Expand Up @@ -184,10 +185,6 @@ func (s *stream) readData(wg *sync.WaitGroup) {
return
}

if len(msg) > 0 {
s.queueMessage(msg)
}

if isRegularClosing(err) {
s.logger.WithError(err).Debug("stream is cancelled/finished")

Expand All @@ -197,6 +194,10 @@ func (s *stream) readData(wg *sync.WaitGroup) {

return
}

if msg != nil || !reflect.ValueOf(msg).IsNil() {
s.queueMessage(msg)
}
}
}

Expand Down
78 changes: 77 additions & 1 deletion grpc/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,21 @@ package grpc_test

import (
"context"
"errors"
"io"
"strings"
"testing"
"time"

"github.com/grafana/xk6-grpc/grpc/testutils/grpcservice"
"github.com/dop251/goja"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/grafana/xk6-grpc/grpc/testdata/wrappers_testing"
"github.com/grafana/xk6-grpc/grpc/testutils/grpcservice"
)

func TestStream_InvalidHeader(t *testing.T) {
Expand Down Expand Up @@ -287,3 +294,72 @@ func (s *featureExplorerStub) ListFeatures(rect *grpcservice.Rectangle, stream g

return status.Errorf(codes.Unimplemented, "method ListFeatures not implemented")
}

func TestStream_Wrappers(t *testing.T) {
t.Parallel()

ts := newTestState(t)

stub := wrappers_testing.Register(ts.httpBin.ServerGRPC)
stub.TestStreamImplementation = func(stream wrappers_testing.Service_TestStreamServer) error {
result := ""

for {
msg, err := stream.Recv()
if errors.Is(err, io.EOF) {
return stream.SendAndClose(&wrappers.StringValue{
Value: strings.TrimRight(result, " "),
})
}

if err != nil {
return err
}

result += msg.Value + " "
}
}

replace := func(code string) (goja.Value, error) {
return ts.VU.Runtime().RunString(ts.httpBin.Replacer.Replace(code))
}

initString := codeBlock{
code: `
var client = new grpc.Client();
client.load([], "../grpc/testdata/wrappers_testing/test.proto");`,
}
vuString := codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
let stream = new grpc.Stream(client, "grpc.wrappers.testing.Service/TestStream");
stream.on('data', function (data) {
call('Result: ' + data);
})

stream.write('Hey');
stream.write('John');
stream.end();

stream.on('error', function (e) {
call('Code: ' + e.code + ' Message: ' + e.message);
});
`,
}

val, err := replace(initString.code)
assertResponse(t, initString, err, val, ts)

ts.ToVUContext()

val, err = replace(vuString.code)

ts.EventLoop.WaitOnRegistered()

assertResponse(t, vuString, err, val, ts)

assert.Equal(t, ts.callRecorder.Recorded(), []string{
"Result: Hey John",
},
)
}
69 changes: 69 additions & 0 deletions grpc/testdata/wrappers_testing/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package wrappers_testing

import (
context "context"

wrappers "github.com/golang/protobuf/ptypes/wrappers"
grpc "google.golang.org/grpc"
)

//go:generate protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative test.proto

// Register registers a test service that could be used for the testing gRPC wrappers
func Register(r grpc.ServiceRegistrar) *service {
s := &service{}

RegisterServiceServer(r, s)

return s
}

type service struct {
UnimplementedServiceServer

TestStringImplementation func(context.Context, *wrappers.StringValue) (*wrappers.StringValue, error)
TestIntegerImplementation func(context.Context, *wrappers.Int64Value) (*wrappers.Int64Value, error)
TestBooleanImplementation func(context.Context, *wrappers.BoolValue) (*wrappers.BoolValue, error)
TestDoubleImplementation func(context.Context, *wrappers.DoubleValue) (*wrappers.DoubleValue, error)
TestStreamImplementation func(Service_TestStreamServer) error
}

func (s *service) TestString(ctx context.Context, in *wrappers.StringValue) (*wrappers.StringValue, error) {
if s.TestStringImplementation != nil {
return s.TestStringImplementation(ctx, in)
}

return s.UnimplementedServiceServer.TestString(ctx, in)
}

func (s *service) TestInteger(ctx context.Context, in *wrappers.Int64Value) (*wrappers.Int64Value, error) {
if s.TestIntegerImplementation != nil {
return s.TestIntegerImplementation(ctx, in)
}

return s.UnimplementedServiceServer.TestInteger(ctx, in)
}

func (s *service) TestBoolean(ctx context.Context, in *wrappers.BoolValue) (*wrappers.BoolValue, error) {
if s.TestBooleanImplementation != nil {
return s.TestBooleanImplementation(ctx, in)
}

return s.UnimplementedServiceServer.TestBoolean(ctx, in)
}

func (s *service) TestDouble(ctx context.Context, in *wrappers.DoubleValue) (*wrappers.DoubleValue, error) {
if s.TestDoubleImplementation != nil {
return s.TestDoubleImplementation(ctx, in)
}

return s.UnimplementedServiceServer.TestDouble(ctx, in)
}

func (s *service) TestStream(stream Service_TestStreamServer) error {
if s.TestStreamImplementation != nil {
return s.TestStreamImplementation(stream)
}

return s.UnimplementedServiceServer.TestStream(stream)
}
Loading