Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC async invoke #3693

Merged
merged 2 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 60 additions & 15 deletions js/modules/k6/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,28 +278,75 @@ func (c *Client) Invoke(
method string,
req goja.Value,
params goja.Value,
) (*grpcext.Response, error) {
) (*grpcext.InvokeResponse, error) {
grpcReq, err := c.buildInvokeRequest(method, req, params)
if err != nil {
return nil, err
}

return c.conn.Invoke(c.vu.Context(), grpcReq)
}

// AsyncInvoke creates and calls a unary RPC by fully qualified method name asynchronously
func (c *Client) AsyncInvoke(
method string,
req goja.Value,
params goja.Value,
) *goja.Promise {
grpcReq, err := c.buildInvokeRequest(method, req, params)

promise, resolve, reject := c.vu.Runtime().NewPromise()
if err != nil {
reject(err)
return promise
}

callback := c.vu.RegisterCallback()
go func() {
res, err := c.conn.Invoke(c.vu.Context(), grpcReq)

callback(func() error {
if err != nil {
reject(err)
return nil //nolint:nilerr // we don't want to return the error
}
resolve(res)
return nil
})
}()

return promise
}

// buildInvokeRequest creates a new InvokeRequest from the given method name, request object and parameters
func (c *Client) buildInvokeRequest(
method string,
req goja.Value,
params goja.Value,
) (grpcext.InvokeRequest, error) {
grpcReq := grpcext.InvokeRequest{}

state := c.vu.State()
if state == nil {
return nil, common.NewInitContextError("invoking RPC methods in the init context is not supported")
return grpcReq, common.NewInitContextError("invoking RPC methods in the init context is not supported")
}
if c.conn == nil {
return nil, errors.New("no gRPC connection, you must call connect first")
return grpcReq, errors.New("no gRPC connection, you must call connect first")
}
if method == "" {
return nil, errors.New("method to invoke cannot be empty")
return grpcReq, errors.New("method to invoke cannot be empty")
}
if method[0] != '/' {
method = "/" + method
}
methodDesc := c.mds[method]
if methodDesc == nil {
return nil, fmt.Errorf("method %q not found in file descriptors", method)
return grpcReq, fmt.Errorf("method %q not found in file descriptors", method)
}

p, err := newCallParams(c.vu, params)
if err != nil {
return nil, fmt.Errorf("invalid GRPC's client.invoke() parameters: %w", err)
return grpcReq, fmt.Errorf("invalid GRPC's client.invoke() parameters: %w", err)
}

// k6 GRPC Invoke's default timeout is 2 minutes
Expand All @@ -308,25 +355,23 @@ func (c *Client) Invoke(
}

if req == nil {
return nil, errors.New("request cannot be nil")
return grpcReq, errors.New("request cannot be nil")
}
b, err := req.ToObject(c.vu.Runtime()).MarshalJSON()
if err != nil {
return nil, fmt.Errorf("unable to serialise request object: %w", err)
return grpcReq, fmt.Errorf("unable to serialise request object: %w", err)
}

ctx, cancel := context.WithTimeout(c.vu.Context(), p.Timeout)
defer cancel()

p.SetSystemTags(state, c.addr, method)

reqmsg := grpcext.Request{
return grpcext.InvokeRequest{
Method: method,
MethodDescriptor: methodDesc,
Timeout: p.Timeout,
Message: b,
TagsAndMeta: &p.TagsAndMeta,
}

return c.conn.Invoke(ctx, method, p.Metadata, reqmsg)
Metadata: p.Metadata,
}, nil
}

// Close will close the client gRPC connection
Expand Down
102 changes: 101 additions & 1 deletion js/modules/k6/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,22 @@ func TestClient(t *testing.T) {
err: `unknown param: "void"`,
},
},
{
name: "AsyncInvokeInvalidParam",
initString: codeBlock{code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`},
vuString: codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
client.asyncInvoke("grpc.testing.TestService/EmptyCall", {}, { void: true }).then(function(resp) {
throw new Error("should not be here")
}, (err) => {
throw new Error(err)
})`,
err: `unknown param: "void"`,
},
},
{
name: "InvokeNilRequest",
initString: codeBlock{code: `
Expand Down Expand Up @@ -317,6 +333,33 @@ func TestClient(t *testing.T) {
},
},
},
{
name: "AsyncInvoke",
initString: codeBlock{code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`},
setup: func(tb *httpmultibin.HTTPMultiBin) {
tb.GRPCStub.EmptyCallFunc = func(context.Context, *grpc_testing.Empty) (*grpc_testing.Empty, error) {
return &grpc_testing.Empty{}, nil
}
},
vuString: codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
client.asyncInvoke("grpc.testing.TestService/EmptyCall", {}).then(function(resp) {
if (resp.status !== grpc.StatusOK) {
throw new Error("unexpected error: " + JSON.stringify(resp.error) + "or status: " + resp.status)
}
}, (err) => {
throw new Error("unexpected error: " + err)
})
`,
asserts: func(t *testing.T, rb *httpmultibin.HTTPMultiBin, samples chan metrics.SampleContainer, _ error) {
samplesBuf := metrics.GetBufferedSamples(samples)
assertMetricEmitted(t, metrics.GRPCReqDurationName, samplesBuf, rb.Replacer.Replace("GRPCBIN_ADDR/grpc.testing.TestService/EmptyCall"))
},
},
},
{
name: "InvokeAnyProto",
initString: codeBlock{code: `
Expand Down Expand Up @@ -387,6 +430,32 @@ func TestClient(t *testing.T) {
throw new Error("server did not receive the correct request message")
}`},
},
{
name: "AsyncRequestMessage",
initString: codeBlock{
code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`,
},
setup: func(tb *httpmultibin.HTTPMultiBin) {
tb.GRPCStub.UnaryCallFunc = func(_ context.Context, req *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
if req.Payload == nil || string(req.Payload.Body) != "负载测试" {
return nil, status.Error(codes.InvalidArgument, "")
}
return &grpc_testing.SimpleResponse{}, nil
}
},
vuString: codeBlock{code: `
client.connect("GRPCBIN_ADDR");
client.asyncInvoke("grpc.testing.TestService/UnaryCall", { payload: { body: "6LSf6L295rWL6K+V"} }).then(function(resp) {
if (resp.status !== grpc.StatusOK) {
throw new Error("server did not receive the correct request message")
}
}, (err) => {
throw new Error("unexpected error: " + err)
});
`},
},
{
name: "RequestHeaders",
initString: codeBlock{
Expand Down Expand Up @@ -464,6 +533,37 @@ func TestClient(t *testing.T) {
},
},
},
{
name: "AsyncResponseMessage",
initString: codeBlock{
code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`,
},
setup: func(tb *httpmultibin.HTTPMultiBin) {
tb.GRPCStub.UnaryCallFunc = func(context.Context, *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
return &grpc_testing.SimpleResponse{
OauthScope: "水",
}, nil
}
},
vuString: codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
client.asyncInvoke("grpc.testing.TestService/UnaryCall", {}).then(function(resp) {
if (!resp.message || resp.message.username !== "" || resp.message.oauthScope !== "水") {
throw new Error("unexpected response message: " + JSON.stringify(resp.message))
}
}, (err) => {
throw new Error("unexpected error: " + err)
});
`,
asserts: func(t *testing.T, rb *httpmultibin.HTTPMultiBin, samples chan metrics.SampleContainer, _ error) {
samplesBuf := metrics.GetBufferedSamples(samples)
assertMetricEmitted(t, metrics.GRPCReqDurationName, samplesBuf, rb.Replacer.Replace("GRPCBIN_ADDR/grpc.testing.TestService/UnaryCall"))
},
},
},
{
name: "ResponseError",
initString: codeBlock{
Expand Down Expand Up @@ -973,7 +1073,7 @@ func TestClient(t *testing.T) {
assertResponse(t, tt.initString, err, val, ts)

ts.ToVUContext()
val, err = ts.Run(tt.vuString.code)
val, err = ts.RunOnEventLoop(tt.vuString.code)
assertResponse(t, tt.vuString, err, val, ts)
})
}
Expand Down
48 changes: 29 additions & 19 deletions lib/netext/grpcext/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net"
"strconv"
"strings"
"time"

"github.com/sirupsen/logrus"
"go.k6.io/k6/lib"
Expand All @@ -27,30 +28,34 @@ import (
"google.golang.org/protobuf/types/dynamicpb"
)

// Request represents a gRPC request.
type Request struct {
MethodDescriptor protoreflect.MethodDescriptor
TagsAndMeta *metrics.TagsAndMeta
Message []byte
}

// StreamRequest represents a gRPC stream request.
type StreamRequest struct {
// InvokeRequest represents a unary gRPC request.
type InvokeRequest struct {
Method string
MethodDescriptor protoreflect.MethodDescriptor
Timeout time.Duration
TagsAndMeta *metrics.TagsAndMeta
Message []byte
Metadata metadata.MD
}

// Response represents a gRPC response.
type Response struct {
// InvokeResponse represents a gRPC response.
type InvokeResponse struct {
Message interface{}
Error interface{}
Headers map[string][]string
Trailers map[string][]string
Status codes.Code
}

// StreamRequest represents a gRPC stream request.
type StreamRequest struct {
Method string
MethodDescriptor protoreflect.MethodDescriptor
Timeout time.Duration
TagsAndMeta *metrics.TagsAndMeta
Metadata metadata.MD
}

type clientConnCloser interface {
grpc.ClientConnInterface
Close() error
Expand Down Expand Up @@ -97,22 +102,27 @@ func (c *Conn) Reflect(ctx context.Context) (*descriptorpb.FileDescriptorSet, er
// Invoke executes a unary gRPC request.
func (c *Conn) Invoke(
ctx context.Context,
url string,
md metadata.MD,
req Request,
req InvokeRequest,
opts ...grpc.CallOption,
) (*Response, error) {
if url == "" {
) (*InvokeResponse, error) {
if req.Method == "" {
return nil, fmt.Errorf("url is required")
}

if req.MethodDescriptor == nil {
return nil, fmt.Errorf("request method descriptor is required")
}
if len(req.Message) == 0 {
return nil, fmt.Errorf("request message is required")
}

ctx = metadata.NewOutgoingContext(ctx, md)
if req.Timeout != time.Duration(0) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, req.Timeout)
defer cancel()
}

ctx = metadata.NewOutgoingContext(ctx, req.Metadata)

reqdm := dynamicpb.NewMessage(req.MethodDescriptor.Input())
if err := protojson.Unmarshal(req.Message, reqdm); err != nil {
Expand All @@ -128,9 +138,9 @@ func (c *Conn) Invoke(
copts = append(copts, opts...)
copts = append(copts, grpc.Header(&header), grpc.Trailer(&trailer))

err := c.raw.Invoke(ctx, url, reqdm, resp, copts...)
err := c.raw.Invoke(ctx, req.Method, reqdm, resp, copts...)

response := Response{
response := InvokeResponse{
Headers: header,
Trailers: trailer,
}
Expand Down
Loading