From a98a06585a165b1e4e68f405f52a9952b18fc6ba Mon Sep 17 00:00:00 2001 From: Oleg Bespalov Date: Fri, 19 Apr 2024 08:34:37 +0200 Subject: [PATCH] refactor: gRPC Invoke's refactor Moving things around and renaming, as a pre-requisition for the async Invoke, it also moves a timeout inicialisation logic close to the moment where it should be used. --- js/modules/k6/grpc/client.go | 44 +++++++++++++++++++----------- lib/netext/grpcext/conn.go | 48 ++++++++++++++++++++------------- lib/netext/grpcext/conn_test.go | 45 +++++++++++++------------------ 3 files changed, 76 insertions(+), 61 deletions(-) diff --git a/js/modules/k6/grpc/client.go b/js/modules/k6/grpc/client.go index 8b2b37e5e7e..796e3e6130c 100644 --- a/js/modules/k6/grpc/client.go +++ b/js/modules/k6/grpc/client.go @@ -278,28 +278,44 @@ func (c *Client) Invoke( method string, req goja.Value, params goja.Value, -) (*grpcext.Response, error) { +) (*grpcext.InvokeResponse, error) { + grpcReq, err := c.buildInvokeRequest(method, req, params) + if err != nil { + return nil, err + } + + return c.conn.Invoke(c.vu.Context(), grpcReq) +} + +// buildInvokeRequest creates a new InvokeRequest from the given method name, request object and parameters +func (c *Client) buildInvokeRequest( + method string, + req goja.Value, + params goja.Value, +) (grpcext.InvokeRequest, error) { + grpcReq := grpcext.InvokeRequest{} + state := c.vu.State() if state == nil { - return nil, common.NewInitContextError("invoking RPC methods in the init context is not supported") + return grpcReq, common.NewInitContextError("invoking RPC methods in the init context is not supported") } if c.conn == nil { - return nil, errors.New("no gRPC connection, you must call connect first") + return grpcReq, errors.New("no gRPC connection, you must call connect first") } if method == "" { - return nil, errors.New("method to invoke cannot be empty") + return grpcReq, errors.New("method to invoke cannot be empty") } if method[0] != '/' { method = "/" + method } methodDesc := c.mds[method] if methodDesc == nil { - return nil, fmt.Errorf("method %q not found in file descriptors", method) + return grpcReq, fmt.Errorf("method %q not found in file descriptors", method) } p, err := newCallParams(c.vu, params) if err != nil { - return nil, fmt.Errorf("invalid GRPC's client.invoke() parameters: %w", err) + return grpcReq, fmt.Errorf("invalid GRPC's client.invoke() parameters: %w", err) } // k6 GRPC Invoke's default timeout is 2 minutes @@ -308,25 +324,23 @@ func (c *Client) Invoke( } if req == nil { - return nil, errors.New("request cannot be nil") + return grpcReq, errors.New("request cannot be nil") } b, err := req.ToObject(c.vu.Runtime()).MarshalJSON() if err != nil { - return nil, fmt.Errorf("unable to serialise request object: %w", err) + return grpcReq, fmt.Errorf("unable to serialise request object: %w", err) } - ctx, cancel := context.WithTimeout(c.vu.Context(), p.Timeout) - defer cancel() - p.SetSystemTags(state, c.addr, method) - reqmsg := grpcext.Request{ + return grpcext.InvokeRequest{ + Method: method, MethodDescriptor: methodDesc, + Timeout: p.Timeout, Message: b, TagsAndMeta: &p.TagsAndMeta, - } - - return c.conn.Invoke(ctx, method, p.Metadata, reqmsg) + Metadata: p.Metadata, + }, nil } // Close will close the client gRPC connection diff --git a/lib/netext/grpcext/conn.go b/lib/netext/grpcext/conn.go index 5e9f9e0fc0b..89fdcba763f 100644 --- a/lib/netext/grpcext/conn.go +++ b/lib/netext/grpcext/conn.go @@ -8,6 +8,7 @@ import ( "net" "strconv" "strings" + "time" "github.com/sirupsen/logrus" "go.k6.io/k6/lib" @@ -27,23 +28,18 @@ import ( "google.golang.org/protobuf/types/dynamicpb" ) -// Request represents a gRPC request. -type Request struct { - MethodDescriptor protoreflect.MethodDescriptor - TagsAndMeta *metrics.TagsAndMeta - Message []byte -} - -// StreamRequest represents a gRPC stream request. -type StreamRequest struct { +// InvokeRequest represents a unary gRPC request. +type InvokeRequest struct { Method string MethodDescriptor protoreflect.MethodDescriptor + Timeout time.Duration TagsAndMeta *metrics.TagsAndMeta + Message []byte Metadata metadata.MD } -// Response represents a gRPC response. -type Response struct { +// InvokeResponse represents a gRPC response. +type InvokeResponse struct { Message interface{} Error interface{} Headers map[string][]string @@ -51,6 +47,15 @@ type Response struct { Status codes.Code } +// StreamRequest represents a gRPC stream request. +type StreamRequest struct { + Method string + MethodDescriptor protoreflect.MethodDescriptor + Timeout time.Duration + TagsAndMeta *metrics.TagsAndMeta + Metadata metadata.MD +} + type clientConnCloser interface { grpc.ClientConnInterface Close() error @@ -97,14 +102,13 @@ func (c *Conn) Reflect(ctx context.Context) (*descriptorpb.FileDescriptorSet, er // Invoke executes a unary gRPC request. func (c *Conn) Invoke( ctx context.Context, - url string, - md metadata.MD, - req Request, + req InvokeRequest, opts ...grpc.CallOption, -) (*Response, error) { - if url == "" { +) (*InvokeResponse, error) { + if req.Method == "" { return nil, fmt.Errorf("url is required") } + if req.MethodDescriptor == nil { return nil, fmt.Errorf("request method descriptor is required") } @@ -112,7 +116,13 @@ func (c *Conn) Invoke( return nil, fmt.Errorf("request message is required") } - ctx = metadata.NewOutgoingContext(ctx, md) + if req.Timeout != time.Duration(0) { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, req.Timeout) + defer cancel() + } + + ctx = metadata.NewOutgoingContext(ctx, req.Metadata) reqdm := dynamicpb.NewMessage(req.MethodDescriptor.Input()) if err := protojson.Unmarshal(req.Message, reqdm); err != nil { @@ -128,9 +138,9 @@ func (c *Conn) Invoke( copts = append(copts, opts...) copts = append(copts, grpc.Header(&header), grpc.Trailer(&trailer)) - err := c.raw.Invoke(ctx, url, reqdm, resp, copts...) + err := c.raw.Invoke(ctx, req.Method, reqdm, resp, copts...) - response := Response{ + response := InvokeResponse{ Headers: header, Trailers: trailer, } diff --git a/lib/netext/grpcext/conn_test.go b/lib/netext/grpcext/conn_test.go index 6fb86a8af6c..42bbb829f92 100644 --- a/lib/netext/grpcext/conn_test.go +++ b/lib/netext/grpcext/conn_test.go @@ -30,11 +30,13 @@ func TestInvoke(t *testing.T) { } c := Conn{raw: invokemock(helloReply)} - r := Request{ + r := InvokeRequest{ + Method: "/hello.HelloService/SayHello", MethodDescriptor: methodFromProto("SayHello"), Message: []byte(`{"greeting":"text request"}`), + Metadata: metadata.New(nil), } - res, err := c.Invoke(context.Background(), "/hello.HelloService/SayHello", metadata.New(nil), r) + res, err := c.Invoke(context.Background(), r) require.NoError(t, err) assert.Equal(t, codes.OK, res.Status) @@ -51,11 +53,13 @@ func TestInvokeWithCallOptions(t *testing.T) { } c := Conn{raw: invokemock(reply)} - r := Request{ + r := InvokeRequest{ + Method: "/hello.HelloService/NoOp", MethodDescriptor: methodFromProto("NoOp"), Message: []byte(`{}`), + Metadata: metadata.New(nil), } - res, err := c.Invoke(context.Background(), "/hello.HelloService/NoOp", metadata.New(nil), r, grpc.UseCompressor("fakeone")) + res, err := c.Invoke(context.Background(), r, grpc.UseCompressor("fakeone")) require.NoError(t, err) assert.NotNil(t, res) } @@ -68,11 +72,13 @@ func TestInvokeReturnError(t *testing.T) { } c := Conn{raw: invokemock(helloReply)} - r := Request{ + r := InvokeRequest{ + Method: "/hello.HelloService/SayHello", MethodDescriptor: methodFromProto("SayHello"), Message: []byte(`{"greeting":"text request"}`), + Metadata: metadata.New(nil), } - res, err := c.Invoke(context.Background(), "/hello.HelloService/SayHello", metadata.New(nil), r) + res, err := c.Invoke(context.Background(), r) require.NoError(t, err) assert.Equal(t, codes.Unknown, res.Status) @@ -92,49 +98,34 @@ func TestConnInvokeInvalid(t *testing.T) { payload = []byte(`{"greeting":"test"}`) ) - req := Request{ - MethodDescriptor: methodDesc, - Message: payload, - } - tests := []struct { name string ctx context.Context - md metadata.MD - url string - req Request + req InvokeRequest experr string }{ { name: "EmptyMethod", ctx: ctx, - url: "", - md: md, - req: req, + req: InvokeRequest{MethodDescriptor: methodDesc, Message: payload, Metadata: md, Method: ""}, experr: "url is required", }, { name: "NullMethodDescriptor", ctx: ctx, - url: url, - md: nil, - req: Request{Message: payload}, + req: InvokeRequest{Message: payload, Metadata: nil, Method: url}, experr: "method descriptor is required", }, { name: "NullMessage", ctx: ctx, - url: url, - md: nil, - req: Request{MethodDescriptor: methodDesc}, + req: InvokeRequest{MethodDescriptor: methodDesc, Metadata: nil, Method: url}, experr: "message is required", }, { name: "EmptyMessage", ctx: ctx, - url: url, - md: nil, - req: Request{MethodDescriptor: methodDesc, Message: []byte{}}, + req: InvokeRequest{MethodDescriptor: methodDesc, Message: []byte{}, Metadata: nil, Method: url}, experr: "message is required", }, } @@ -145,7 +136,7 @@ func TestConnInvokeInvalid(t *testing.T) { t.Parallel() c := Conn{} - res, err := c.Invoke(tt.ctx, tt.url, tt.md, tt.req) + res, err := c.Invoke(tt.ctx, tt.req) require.Error(t, err) require.Nil(t, res) assert.Contains(t, err.Error(), tt.experr)