Skip to content

Commit

Permalink
Merge 95fff69 into 144209f
Browse files Browse the repository at this point in the history
  • Loading branch information
olegbespalov authored Apr 19, 2024
2 parents 144209f + 95fff69 commit 7f040a8
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 62 deletions.
75 changes: 60 additions & 15 deletions js/modules/k6/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,28 +278,75 @@ func (c *Client) Invoke(
method string,
req goja.Value,
params goja.Value,
) (*grpcext.Response, error) {
) (*grpcext.InvokeResponse, error) {
grpcReq, err := c.buildInvokeRequest(method, req, params)
if err != nil {
return nil, err
}

return c.conn.Invoke(c.vu.Context(), grpcReq)
}

// AsyncInvoke creates and calls a unary RPC by fully qualified method name asynchronously
func (c *Client) AsyncInvoke(
method string,
req goja.Value,
params goja.Value,
) *goja.Promise {
grpcReq, err := c.buildInvokeRequest(method, req, params)

promise, resolve, reject := c.vu.Runtime().NewPromise()
if err != nil {
reject(err)
return promise
}

callback := c.vu.RegisterCallback()
go func() {
res, err := c.conn.Invoke(c.vu.Context(), grpcReq)

callback(func() error {
if err != nil {
reject(err)
return nil //nolint:nilerr // we don't want to return the error
}
resolve(res)
return nil
})
}()

return promise
}

// buildInvokeRequest creates a new InvokeRequest from the given method name, request object and parameters
func (c *Client) buildInvokeRequest(
method string,
req goja.Value,
params goja.Value,
) (grpcext.InvokeRequest, error) {
grpcReq := grpcext.InvokeRequest{}

state := c.vu.State()
if state == nil {
return nil, common.NewInitContextError("invoking RPC methods in the init context is not supported")
return grpcReq, common.NewInitContextError("invoking RPC methods in the init context is not supported")
}
if c.conn == nil {
return nil, errors.New("no gRPC connection, you must call connect first")
return grpcReq, errors.New("no gRPC connection, you must call connect first")
}
if method == "" {
return nil, errors.New("method to invoke cannot be empty")
return grpcReq, errors.New("method to invoke cannot be empty")
}
if method[0] != '/' {
method = "/" + method
}
methodDesc := c.mds[method]
if methodDesc == nil {
return nil, fmt.Errorf("method %q not found in file descriptors", method)
return grpcReq, fmt.Errorf("method %q not found in file descriptors", method)
}

p, err := newCallParams(c.vu, params)
if err != nil {
return nil, fmt.Errorf("invalid GRPC's client.invoke() parameters: %w", err)
return grpcReq, fmt.Errorf("invalid GRPC's client.invoke() parameters: %w", err)
}

// k6 GRPC Invoke's default timeout is 2 minutes
Expand All @@ -308,25 +355,23 @@ func (c *Client) Invoke(
}

if req == nil {
return nil, errors.New("request cannot be nil")
return grpcReq, errors.New("request cannot be nil")
}
b, err := req.ToObject(c.vu.Runtime()).MarshalJSON()
if err != nil {
return nil, fmt.Errorf("unable to serialise request object: %w", err)
return grpcReq, fmt.Errorf("unable to serialise request object: %w", err)
}

ctx, cancel := context.WithTimeout(c.vu.Context(), p.Timeout)
defer cancel()

p.SetSystemTags(state, c.addr, method)

reqmsg := grpcext.Request{
return grpcext.InvokeRequest{
Method: method,
MethodDescriptor: methodDesc,
Timeout: p.Timeout,
Message: b,
TagsAndMeta: &p.TagsAndMeta,
}

return c.conn.Invoke(ctx, method, p.Metadata, reqmsg)
Metadata: p.Metadata,
}, nil
}

// Close will close the client gRPC connection
Expand Down
102 changes: 101 additions & 1 deletion js/modules/k6/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,22 @@ func TestClient(t *testing.T) {
err: `unknown param: "void"`,
},
},
{
name: "AsyncInvokeInvalidParam",
initString: codeBlock{code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`},
vuString: codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
client.asyncInvoke("grpc.testing.TestService/EmptyCall", {}, { void: true }).then(function(resp) {
throw new Error("should not be here")
}, (err) => {
throw new Error(err)
})`,
err: `unknown param: "void"`,
},
},
{
name: "InvokeNilRequest",
initString: codeBlock{code: `
Expand Down Expand Up @@ -317,6 +333,33 @@ func TestClient(t *testing.T) {
},
},
},
{
name: "AsyncInvoke",
initString: codeBlock{code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`},
setup: func(tb *httpmultibin.HTTPMultiBin) {
tb.GRPCStub.EmptyCallFunc = func(context.Context, *grpc_testing.Empty) (*grpc_testing.Empty, error) {
return &grpc_testing.Empty{}, nil
}
},
vuString: codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
client.asyncInvoke("grpc.testing.TestService/EmptyCall", {}).then(function(resp) {
if (resp.status !== grpc.StatusOK) {
throw new Error("unexpected error: " + JSON.stringify(resp.error) + "or status: " + resp.status)
}
}, (err) => {
throw new Error("unexpected error: " + err)
})
`,
asserts: func(t *testing.T, rb *httpmultibin.HTTPMultiBin, samples chan metrics.SampleContainer, _ error) {
samplesBuf := metrics.GetBufferedSamples(samples)
assertMetricEmitted(t, metrics.GRPCReqDurationName, samplesBuf, rb.Replacer.Replace("GRPCBIN_ADDR/grpc.testing.TestService/EmptyCall"))
},
},
},
{
name: "InvokeAnyProto",
initString: codeBlock{code: `
Expand Down Expand Up @@ -387,6 +430,32 @@ func TestClient(t *testing.T) {
throw new Error("server did not receive the correct request message")
}`},
},
{
name: "AsyncRequestMessage",
initString: codeBlock{
code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`,
},
setup: func(tb *httpmultibin.HTTPMultiBin) {
tb.GRPCStub.UnaryCallFunc = func(_ context.Context, req *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
if req.Payload == nil || string(req.Payload.Body) != "负载测试" {
return nil, status.Error(codes.InvalidArgument, "")
}
return &grpc_testing.SimpleResponse{}, nil
}
},
vuString: codeBlock{code: `
client.connect("GRPCBIN_ADDR");
client.asyncInvoke("grpc.testing.TestService/UnaryCall", { payload: { body: "6LSf6L295rWL6K+V"} }).then(function(resp) {
if (resp.status !== grpc.StatusOK) {
throw new Error("server did not receive the correct request message")
}
}, (err) => {
throw new Error("unexpected error: " + err)
});
`},
},
{
name: "RequestHeaders",
initString: codeBlock{
Expand Down Expand Up @@ -464,6 +533,37 @@ func TestClient(t *testing.T) {
},
},
},
{
name: "AsyncResponseMessage",
initString: codeBlock{
code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`,
},
setup: func(tb *httpmultibin.HTTPMultiBin) {
tb.GRPCStub.UnaryCallFunc = func(context.Context, *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
return &grpc_testing.SimpleResponse{
OauthScope: "水",
}, nil
}
},
vuString: codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
client.asyncInvoke("grpc.testing.TestService/UnaryCall", {}).then(function(resp) {
if (!resp.message || resp.message.username !== "" || resp.message.oauthScope !== "水") {
throw new Error("unexpected response message: " + JSON.stringify(resp.message))
}
}, (err) => {
throw new Error("unexpected error: " + err)
});
`,
asserts: func(t *testing.T, rb *httpmultibin.HTTPMultiBin, samples chan metrics.SampleContainer, _ error) {
samplesBuf := metrics.GetBufferedSamples(samples)
assertMetricEmitted(t, metrics.GRPCReqDurationName, samplesBuf, rb.Replacer.Replace("GRPCBIN_ADDR/grpc.testing.TestService/UnaryCall"))
},
},
},
{
name: "ResponseError",
initString: codeBlock{
Expand Down Expand Up @@ -973,7 +1073,7 @@ func TestClient(t *testing.T) {
assertResponse(t, tt.initString, err, val, ts)

ts.ToVUContext()
val, err = ts.Run(tt.vuString.code)
val, err = ts.RunOnEventLoop(tt.vuString.code)
assertResponse(t, tt.vuString, err, val, ts)
})
}
Expand Down
48 changes: 29 additions & 19 deletions lib/netext/grpcext/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net"
"strconv"
"strings"
"time"

"github.com/sirupsen/logrus"
"go.k6.io/k6/lib"
Expand All @@ -27,30 +28,34 @@ import (
"google.golang.org/protobuf/types/dynamicpb"
)

// Request represents a gRPC request.
type Request struct {
MethodDescriptor protoreflect.MethodDescriptor
TagsAndMeta *metrics.TagsAndMeta
Message []byte
}

// StreamRequest represents a gRPC stream request.
type StreamRequest struct {
// InvokeRequest represents a unary gRPC request.
type InvokeRequest struct {
Method string
MethodDescriptor protoreflect.MethodDescriptor
Timeout time.Duration
TagsAndMeta *metrics.TagsAndMeta
Message []byte
Metadata metadata.MD
}

// Response represents a gRPC response.
type Response struct {
// InvokeResponse represents a gRPC response.
type InvokeResponse struct {
Message interface{}
Error interface{}
Headers map[string][]string
Trailers map[string][]string
Status codes.Code
}

// StreamRequest represents a gRPC stream request.
type StreamRequest struct {
Method string
MethodDescriptor protoreflect.MethodDescriptor
Timeout time.Duration
TagsAndMeta *metrics.TagsAndMeta
Metadata metadata.MD
}

type clientConnCloser interface {
grpc.ClientConnInterface
Close() error
Expand Down Expand Up @@ -97,22 +102,27 @@ func (c *Conn) Reflect(ctx context.Context) (*descriptorpb.FileDescriptorSet, er
// Invoke executes a unary gRPC request.
func (c *Conn) Invoke(
ctx context.Context,
url string,
md metadata.MD,
req Request,
req InvokeRequest,
opts ...grpc.CallOption,
) (*Response, error) {
if url == "" {
) (*InvokeResponse, error) {
if req.Method == "" {
return nil, fmt.Errorf("url is required")
}

if req.MethodDescriptor == nil {
return nil, fmt.Errorf("request method descriptor is required")
}
if len(req.Message) == 0 {
return nil, fmt.Errorf("request message is required")
}

ctx = metadata.NewOutgoingContext(ctx, md)
if req.Timeout != time.Duration(0) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, req.Timeout)
defer cancel()
}

ctx = metadata.NewOutgoingContext(ctx, req.Metadata)

reqdm := dynamicpb.NewMessage(req.MethodDescriptor.Input())
if err := protojson.Unmarshal(req.Message, reqdm); err != nil {
Expand All @@ -128,9 +138,9 @@ func (c *Conn) Invoke(
copts = append(copts, opts...)
copts = append(copts, grpc.Header(&header), grpc.Trailer(&trailer))

err := c.raw.Invoke(ctx, url, reqdm, resp, copts...)
err := c.raw.Invoke(ctx, req.Method, reqdm, resp, copts...)

response := Response{
response := InvokeResponse{
Headers: header,
Trailers: trailer,
}
Expand Down
Loading

0 comments on commit 7f040a8

Please sign in to comment.