From 1a8917bac2cea8d44a3259f5d3b1b87f068a9a91 Mon Sep 17 00:00:00 2001 From: Oleg Bespalov Date: Fri, 19 Apr 2024 08:34:37 +0200 Subject: [PATCH 1/2] refactor: gRPC Invoke's refactor Moving things around and renaming, as a pre-requisition for the async Invoke, it also moves a timeout inicialisation logic close to the moment where it should be used. --- js/modules/k6/grpc/client.go | 44 +++++++++++++++++++----------- lib/netext/grpcext/conn.go | 48 ++++++++++++++++++++------------- lib/netext/grpcext/conn_test.go | 45 +++++++++++++------------------ 3 files changed, 76 insertions(+), 61 deletions(-) diff --git a/js/modules/k6/grpc/client.go b/js/modules/k6/grpc/client.go index 8b2b37e5e7e..796e3e6130c 100644 --- a/js/modules/k6/grpc/client.go +++ b/js/modules/k6/grpc/client.go @@ -278,28 +278,44 @@ func (c *Client) Invoke( method string, req goja.Value, params goja.Value, -) (*grpcext.Response, error) { +) (*grpcext.InvokeResponse, error) { + grpcReq, err := c.buildInvokeRequest(method, req, params) + if err != nil { + return nil, err + } + + return c.conn.Invoke(c.vu.Context(), grpcReq) +} + +// buildInvokeRequest creates a new InvokeRequest from the given method name, request object and parameters +func (c *Client) buildInvokeRequest( + method string, + req goja.Value, + params goja.Value, +) (grpcext.InvokeRequest, error) { + grpcReq := grpcext.InvokeRequest{} + state := c.vu.State() if state == nil { - return nil, common.NewInitContextError("invoking RPC methods in the init context is not supported") + return grpcReq, common.NewInitContextError("invoking RPC methods in the init context is not supported") } if c.conn == nil { - return nil, errors.New("no gRPC connection, you must call connect first") + return grpcReq, errors.New("no gRPC connection, you must call connect first") } if method == "" { - return nil, errors.New("method to invoke cannot be empty") + return grpcReq, errors.New("method to invoke cannot be empty") } if method[0] != '/' { method = "/" + method } methodDesc := c.mds[method] if methodDesc == nil { - return nil, fmt.Errorf("method %q not found in file descriptors", method) + return grpcReq, fmt.Errorf("method %q not found in file descriptors", method) } p, err := newCallParams(c.vu, params) if err != nil { - return nil, fmt.Errorf("invalid GRPC's client.invoke() parameters: %w", err) + return grpcReq, fmt.Errorf("invalid GRPC's client.invoke() parameters: %w", err) } // k6 GRPC Invoke's default timeout is 2 minutes @@ -308,25 +324,23 @@ func (c *Client) Invoke( } if req == nil { - return nil, errors.New("request cannot be nil") + return grpcReq, errors.New("request cannot be nil") } b, err := req.ToObject(c.vu.Runtime()).MarshalJSON() if err != nil { - return nil, fmt.Errorf("unable to serialise request object: %w", err) + return grpcReq, fmt.Errorf("unable to serialise request object: %w", err) } - ctx, cancel := context.WithTimeout(c.vu.Context(), p.Timeout) - defer cancel() - p.SetSystemTags(state, c.addr, method) - reqmsg := grpcext.Request{ + return grpcext.InvokeRequest{ + Method: method, MethodDescriptor: methodDesc, + Timeout: p.Timeout, Message: b, TagsAndMeta: &p.TagsAndMeta, - } - - return c.conn.Invoke(ctx, method, p.Metadata, reqmsg) + Metadata: p.Metadata, + }, nil } // Close will close the client gRPC connection diff --git a/lib/netext/grpcext/conn.go b/lib/netext/grpcext/conn.go index 5e9f9e0fc0b..89fdcba763f 100644 --- a/lib/netext/grpcext/conn.go +++ b/lib/netext/grpcext/conn.go @@ -8,6 +8,7 @@ import ( "net" "strconv" "strings" + "time" "github.com/sirupsen/logrus" "go.k6.io/k6/lib" @@ -27,23 +28,18 @@ import ( "google.golang.org/protobuf/types/dynamicpb" ) -// Request represents a gRPC request. -type Request struct { - MethodDescriptor protoreflect.MethodDescriptor - TagsAndMeta *metrics.TagsAndMeta - Message []byte -} - -// StreamRequest represents a gRPC stream request. -type StreamRequest struct { +// InvokeRequest represents a unary gRPC request. +type InvokeRequest struct { Method string MethodDescriptor protoreflect.MethodDescriptor + Timeout time.Duration TagsAndMeta *metrics.TagsAndMeta + Message []byte Metadata metadata.MD } -// Response represents a gRPC response. -type Response struct { +// InvokeResponse represents a gRPC response. +type InvokeResponse struct { Message interface{} Error interface{} Headers map[string][]string @@ -51,6 +47,15 @@ type Response struct { Status codes.Code } +// StreamRequest represents a gRPC stream request. +type StreamRequest struct { + Method string + MethodDescriptor protoreflect.MethodDescriptor + Timeout time.Duration + TagsAndMeta *metrics.TagsAndMeta + Metadata metadata.MD +} + type clientConnCloser interface { grpc.ClientConnInterface Close() error @@ -97,14 +102,13 @@ func (c *Conn) Reflect(ctx context.Context) (*descriptorpb.FileDescriptorSet, er // Invoke executes a unary gRPC request. func (c *Conn) Invoke( ctx context.Context, - url string, - md metadata.MD, - req Request, + req InvokeRequest, opts ...grpc.CallOption, -) (*Response, error) { - if url == "" { +) (*InvokeResponse, error) { + if req.Method == "" { return nil, fmt.Errorf("url is required") } + if req.MethodDescriptor == nil { return nil, fmt.Errorf("request method descriptor is required") } @@ -112,7 +116,13 @@ func (c *Conn) Invoke( return nil, fmt.Errorf("request message is required") } - ctx = metadata.NewOutgoingContext(ctx, md) + if req.Timeout != time.Duration(0) { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, req.Timeout) + defer cancel() + } + + ctx = metadata.NewOutgoingContext(ctx, req.Metadata) reqdm := dynamicpb.NewMessage(req.MethodDescriptor.Input()) if err := protojson.Unmarshal(req.Message, reqdm); err != nil { @@ -128,9 +138,9 @@ func (c *Conn) Invoke( copts = append(copts, opts...) copts = append(copts, grpc.Header(&header), grpc.Trailer(&trailer)) - err := c.raw.Invoke(ctx, url, reqdm, resp, copts...) + err := c.raw.Invoke(ctx, req.Method, reqdm, resp, copts...) - response := Response{ + response := InvokeResponse{ Headers: header, Trailers: trailer, } diff --git a/lib/netext/grpcext/conn_test.go b/lib/netext/grpcext/conn_test.go index fe418d79a34..a72a3bb0f77 100644 --- a/lib/netext/grpcext/conn_test.go +++ b/lib/netext/grpcext/conn_test.go @@ -30,11 +30,13 @@ func TestInvoke(t *testing.T) { } c := Conn{raw: invokemock(helloReply)} - r := Request{ + r := InvokeRequest{ + Method: "/hello.HelloService/SayHello", MethodDescriptor: methodFromProto("SayHello"), Message: []byte(`{"greeting":"text request"}`), + Metadata: metadata.New(nil), } - res, err := c.Invoke(context.Background(), "/hello.HelloService/SayHello", metadata.New(nil), r) + res, err := c.Invoke(context.Background(), r) require.NoError(t, err) assert.Equal(t, codes.OK, res.Status) @@ -51,11 +53,13 @@ func TestInvokeWithCallOptions(t *testing.T) { } c := Conn{raw: invokemock(reply)} - r := Request{ + r := InvokeRequest{ + Method: "/hello.HelloService/NoOp", MethodDescriptor: methodFromProto("NoOp"), Message: []byte(`{}`), + Metadata: metadata.New(nil), } - res, err := c.Invoke(context.Background(), "/hello.HelloService/NoOp", metadata.New(nil), r, grpc.UseCompressor("fakeone")) + res, err := c.Invoke(context.Background(), r, grpc.UseCompressor("fakeone")) require.NoError(t, err) assert.NotNil(t, res) } @@ -68,11 +72,13 @@ func TestInvokeReturnError(t *testing.T) { } c := Conn{raw: invokemock(helloReply)} - r := Request{ + r := InvokeRequest{ + Method: "/hello.HelloService/SayHello", MethodDescriptor: methodFromProto("SayHello"), Message: []byte(`{"greeting":"text request"}`), + Metadata: metadata.New(nil), } - res, err := c.Invoke(context.Background(), "/hello.HelloService/SayHello", metadata.New(nil), r) + res, err := c.Invoke(context.Background(), r) require.NoError(t, err) assert.Equal(t, codes.Unknown, res.Status) @@ -92,49 +98,34 @@ func TestConnInvokeInvalid(t *testing.T) { payload = []byte(`{"greeting":"test"}`) ) - req := Request{ - MethodDescriptor: methodDesc, - Message: payload, - } - tests := []struct { name string ctx context.Context - md metadata.MD - url string - req Request + req InvokeRequest experr string }{ { name: "EmptyMethod", ctx: ctx, - url: "", - md: md, - req: req, + req: InvokeRequest{MethodDescriptor: methodDesc, Message: payload, Metadata: md, Method: ""}, experr: "url is required", }, { name: "NullMethodDescriptor", ctx: ctx, - url: url, - md: nil, - req: Request{Message: payload}, + req: InvokeRequest{Message: payload, Metadata: nil, Method: url}, experr: "method descriptor is required", }, { name: "NullMessage", ctx: ctx, - url: url, - md: nil, - req: Request{MethodDescriptor: methodDesc}, + req: InvokeRequest{MethodDescriptor: methodDesc, Metadata: nil, Method: url}, experr: "message is required", }, { name: "EmptyMessage", ctx: ctx, - url: url, - md: nil, - req: Request{MethodDescriptor: methodDesc, Message: []byte{}}, + req: InvokeRequest{MethodDescriptor: methodDesc, Message: []byte{}, Metadata: nil, Method: url}, experr: "message is required", }, } @@ -145,7 +136,7 @@ func TestConnInvokeInvalid(t *testing.T) { t.Parallel() c := Conn{} - res, err := c.Invoke(tt.ctx, tt.url, tt.md, tt.req) + res, err := c.Invoke(tt.ctx, tt.req) require.Error(t, err) require.Nil(t, res) assert.Contains(t, err.Error(), tt.experr) From 95fff69833fd058303754a184aa2f4d00258ca61 Mon Sep 17 00:00:00 2001 From: Oleg Bespalov Date: Fri, 19 Apr 2024 11:11:39 +0200 Subject: [PATCH 2/2] grpc: AsyncInvoke implementation --- js/modules/k6/grpc/client.go | 31 +++++++++ js/modules/k6/grpc/client_test.go | 102 +++++++++++++++++++++++++++++- 2 files changed, 132 insertions(+), 1 deletion(-) diff --git a/js/modules/k6/grpc/client.go b/js/modules/k6/grpc/client.go index 796e3e6130c..8dd31804c31 100644 --- a/js/modules/k6/grpc/client.go +++ b/js/modules/k6/grpc/client.go @@ -287,6 +287,37 @@ func (c *Client) Invoke( return c.conn.Invoke(c.vu.Context(), grpcReq) } +// AsyncInvoke creates and calls a unary RPC by fully qualified method name asynchronously +func (c *Client) AsyncInvoke( + method string, + req goja.Value, + params goja.Value, +) *goja.Promise { + grpcReq, err := c.buildInvokeRequest(method, req, params) + + promise, resolve, reject := c.vu.Runtime().NewPromise() + if err != nil { + reject(err) + return promise + } + + callback := c.vu.RegisterCallback() + go func() { + res, err := c.conn.Invoke(c.vu.Context(), grpcReq) + + callback(func() error { + if err != nil { + reject(err) + return nil //nolint:nilerr // we don't want to return the error + } + resolve(res) + return nil + }) + }() + + return promise +} + // buildInvokeRequest creates a new InvokeRequest from the given method name, request object and parameters func (c *Client) buildInvokeRequest( method string, diff --git a/js/modules/k6/grpc/client_test.go b/js/modules/k6/grpc/client_test.go index 0f52caced11..663689a249d 100644 --- a/js/modules/k6/grpc/client_test.go +++ b/js/modules/k6/grpc/client_test.go @@ -223,6 +223,22 @@ func TestClient(t *testing.T) { err: `unknown param: "void"`, }, }, + { + name: "AsyncInvokeInvalidParam", + initString: codeBlock{code: ` + var client = new grpc.Client(); + client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`}, + vuString: codeBlock{ + code: ` + client.connect("GRPCBIN_ADDR"); + client.asyncInvoke("grpc.testing.TestService/EmptyCall", {}, { void: true }).then(function(resp) { + throw new Error("should not be here") + }, (err) => { + throw new Error(err) + })`, + err: `unknown param: "void"`, + }, + }, { name: "InvokeNilRequest", initString: codeBlock{code: ` @@ -317,6 +333,33 @@ func TestClient(t *testing.T) { }, }, }, + { + name: "AsyncInvoke", + initString: codeBlock{code: ` + var client = new grpc.Client(); + client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`}, + setup: func(tb *httpmultibin.HTTPMultiBin) { + tb.GRPCStub.EmptyCallFunc = func(context.Context, *grpc_testing.Empty) (*grpc_testing.Empty, error) { + return &grpc_testing.Empty{}, nil + } + }, + vuString: codeBlock{ + code: ` + client.connect("GRPCBIN_ADDR"); + client.asyncInvoke("grpc.testing.TestService/EmptyCall", {}).then(function(resp) { + if (resp.status !== grpc.StatusOK) { + throw new Error("unexpected error: " + JSON.stringify(resp.error) + "or status: " + resp.status) + } + }, (err) => { + throw new Error("unexpected error: " + err) + }) + `, + asserts: func(t *testing.T, rb *httpmultibin.HTTPMultiBin, samples chan metrics.SampleContainer, _ error) { + samplesBuf := metrics.GetBufferedSamples(samples) + assertMetricEmitted(t, metrics.GRPCReqDurationName, samplesBuf, rb.Replacer.Replace("GRPCBIN_ADDR/grpc.testing.TestService/EmptyCall")) + }, + }, + }, { name: "InvokeAnyProto", initString: codeBlock{code: ` @@ -387,6 +430,32 @@ func TestClient(t *testing.T) { throw new Error("server did not receive the correct request message") }`}, }, + { + name: "AsyncRequestMessage", + initString: codeBlock{ + code: ` + var client = new grpc.Client(); + client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`, + }, + setup: func(tb *httpmultibin.HTTPMultiBin) { + tb.GRPCStub.UnaryCallFunc = func(_ context.Context, req *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { + if req.Payload == nil || string(req.Payload.Body) != "负载测试" { + return nil, status.Error(codes.InvalidArgument, "") + } + return &grpc_testing.SimpleResponse{}, nil + } + }, + vuString: codeBlock{code: ` + client.connect("GRPCBIN_ADDR"); + client.asyncInvoke("grpc.testing.TestService/UnaryCall", { payload: { body: "6LSf6L295rWL6K+V"} }).then(function(resp) { + if (resp.status !== grpc.StatusOK) { + throw new Error("server did not receive the correct request message") + } + }, (err) => { + throw new Error("unexpected error: " + err) + }); + `}, + }, { name: "RequestHeaders", initString: codeBlock{ @@ -464,6 +533,37 @@ func TestClient(t *testing.T) { }, }, }, + { + name: "AsyncResponseMessage", + initString: codeBlock{ + code: ` + var client = new grpc.Client(); + client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`, + }, + setup: func(tb *httpmultibin.HTTPMultiBin) { + tb.GRPCStub.UnaryCallFunc = func(context.Context, *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) { + return &grpc_testing.SimpleResponse{ + OauthScope: "水", + }, nil + } + }, + vuString: codeBlock{ + code: ` + client.connect("GRPCBIN_ADDR"); + client.asyncInvoke("grpc.testing.TestService/UnaryCall", {}).then(function(resp) { + if (!resp.message || resp.message.username !== "" || resp.message.oauthScope !== "水") { + throw new Error("unexpected response message: " + JSON.stringify(resp.message)) + } + }, (err) => { + throw new Error("unexpected error: " + err) + }); + `, + asserts: func(t *testing.T, rb *httpmultibin.HTTPMultiBin, samples chan metrics.SampleContainer, _ error) { + samplesBuf := metrics.GetBufferedSamples(samples) + assertMetricEmitted(t, metrics.GRPCReqDurationName, samplesBuf, rb.Replacer.Replace("GRPCBIN_ADDR/grpc.testing.TestService/UnaryCall")) + }, + }, + }, { name: "ResponseError", initString: codeBlock{ @@ -973,7 +1073,7 @@ func TestClient(t *testing.T) { assertResponse(t, tt.initString, err, val, ts) ts.ToVUContext() - val, err = ts.Run(tt.vuString.code) + val, err = ts.RunOnEventLoop(tt.vuString.code) assertResponse(t, tt.vuString, err, val, ts) }) }