Skip to content

Commit

Permalink
Fix metrics for GRPC streams do not correctly include tags when strea…
Browse files Browse the repository at this point in the history
…m ends
  • Loading branch information
cchamplin authored and olegbespalov committed Jul 15, 2024
1 parent f63c445 commit e79ee2c
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 2 deletions.
3 changes: 1 addition & 2 deletions js/modules/k6/grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,10 @@ func defineStream(rt *sobek.Runtime, s *stream) {
}

func (s *stream) beginStream(p *callParams) error {
tags := s.vu.State().Tags.GetCurrentValues()
req := &grpcext.StreamRequest{
Method: s.method,
MethodDescriptor: s.methodDescriptor,
TagsAndMeta: &tags,
TagsAndMeta: &p.TagsAndMeta,
Metadata: p.Metadata,
}

Expand Down
106 changes: 106 additions & 0 deletions js/modules/k6/grpc/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"go.k6.io/k6/lib/testutils/grpcservice"
"go.k6.io/k6/lib/testutils/httpmultibin/grpc_wrappers_testing"
"go.k6.io/k6/metrics"

"github.com/golang/protobuf/ptypes/wrappers"
"github.com/grafana/sobek"
Expand Down Expand Up @@ -406,3 +407,108 @@ func TestStream_UndefinedHandler(t *testing.T) {

require.ErrorContains(t, err, "handler for \"data\" event isn't a callable function")
}

// TestStream_MetricsTagsMetadata tests that the metrics tags are correctly
// added to samples.
func TestStream_MetricsTagsMetadata(t *testing.T) {
t.Parallel()

ts := newTestState(t)

stub := &featureExplorerStub{}

savedFeatures := []*grpcservice.Feature{
{
Name: "foo",
Location: &grpcservice.Point{
Latitude: 1,
Longitude: 2,
},
},
{
Name: "bar",
Location: &grpcservice.Point{
Latitude: 3,
Longitude: 4,
},
},
}

stub.listFeatures = func(_ *grpcservice.Rectangle, stream grpcservice.FeatureExplorer_ListFeaturesServer) error {
for _, feature := range savedFeatures {
// adding a delay to make server response "slower"
time.Sleep(200 * time.Millisecond)

if err := stream.Send(feature); err != nil {
return err
}
}

return nil
}

grpcservice.RegisterFeatureExplorerServer(ts.httpBin.ServerGRPC, stub)

initString := codeBlock{
code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/grpcservice/route_guide.proto");`,
}
vuString := codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
let params = {
tags: { "tag1": "value1" },
};
let stream = new grpc.Stream(client, "main.FeatureExplorer/ListFeatures", params)
stream.on('data', function (data) {
call('Feature:' + data.name);
});
stream.on('end', function () {
call('End called');
});
stream.write({
lo: {
latitude: 1,
longitude: 2,
},
hi: {
latitude: 1,
longitude: 2,
},
});
stream.end();
`,
}

val, err := ts.Run(initString.code)
assertResponse(t, initString, err, val, ts)

ts.ToVUContext()

val, err = ts.RunOnEventLoop(vuString.code)

assertResponse(t, vuString, err, val, ts)

expTags := map[string]string{"tag1": "value1"}

samplesBuf := metrics.GetBufferedSamples(ts.samples)

assert.Len(t, samplesBuf, 5)
for _, samples := range samplesBuf {
for _, sample := range samples.GetSamples() {
assertTags(t, sample, expTags)
}
}
}

func assertTags(t *testing.T, sample metrics.Sample, tags map[string]string) {
for k, v := range tags {
tag, ok := sample.Tags.Get(k)
assert.True(t, ok)
assert.Equal(t, tag, v)
}
}

0 comments on commit e79ee2c

Please sign in to comment.