From 2bc64c6383044c2d52f725ffbb5c3557273df1c1 Mon Sep 17 00:00:00 2001 From: Oleg Bespalov Date: Wed, 15 May 2024 15:49:19 +0200 Subject: [PATCH] Minimal working version It has limited support of metric types (count, gauge, trend(?)). Only gRPC receiver. Limited configuration. --- README.md | 23 +++- examples/script.js | 17 +++ go.mod | 22 ++-- go.sum | 44 +++---- pkg/opentelemetry/config.go | 68 +++++++++-- pkg/opentelemetry/config_test.go | 41 ++++--- pkg/opentelemetry/output.go | 189 +++++++++++++++++++++++++++++-- pkg/opentelemetry/tags.go | 26 +++++ 8 files changed, 359 insertions(+), 71 deletions(-) create mode 100644 examples/script.js create mode 100644 pkg/opentelemetry/tags.go diff --git a/README.md b/README.md index e14385b..9aa9740 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,17 @@ A work in progress k6 extension to output real-time test metrics in [OpenTelemetry metrics format](https://opentelemetry.io/docs/specs/otel/metrics/). +> [!WARNING] +> It's work in progress implementation and not ready for production use. + +Configuration options (currently environment variables only): + +* `K6_OTEL_RECEIVER_TYPE` - OpenTelemetry receiver type, currently only `grpc` is supported. Default is `grpc`. +* `K6_OTEL_RECEIVER_ENDPOINT` - OpenTelemetry receiver endpoint. Default is `localhost:4317`. +* `K6_OTEL_METRIC_PREFIX` - Metric prefix. Default is empty. +* `K6_OTEL_FLUSH_INTERVAL` - How frequently to flush metrics to the receiver from k6. Default is `1s`. +* `K6_OTEL_PUSH_INTERVAL` - How frequently to push metrics to the receiver from k6. Default is `1s`. + ## Build To build a `k6` binary with this extension, first ensure you have the prerequisites: @@ -10,16 +21,22 @@ To build a `k6` binary with this extension, first ensure you have the prerequisi - Git - [xk6](https://github.com/grafana/xk6) -1. Build with `xk6`: - ```bash make build ``` This will result in a `k6` binary in the current directory. +## Local usage + +1. You could run a local environment with OpenTelemetry collector ([Grafana Alloy](https://github.com/grafana/alloy)), Prometheus backend and Grafana (http://localhost:3000/): + +```bash +docker-compose up -d +``` + 2. Run with the just build `k6: ```bash -./k6 run -o xk6-opentelemetry +K6_OTEL_METRIC_PREFIX=k6_ ./k6 run -o xk6-opentelemetry examples/script.js ``` diff --git a/examples/script.js b/examples/script.js new file mode 100644 index 0000000..a31a897 --- /dev/null +++ b/examples/script.js @@ -0,0 +1,17 @@ +import http from "k6/http"; +import { check } from "k6"; + +export const options = { + vus: 10, + duration: '3m', + thresholds: { + 'http_reqs{expected_response:true}': ['rate>10'], + }, +}; + +export default function () { + check(http.get("https://test-api.k6.io/"), { + "status is 200": (r) => r.status == 200, + "protocol is HTTP/2": (r) => r.proto == "HTTP/2.0", + }); +} diff --git a/go.mod b/go.mod index e3fb0db..de64dd1 100644 --- a/go.mod +++ b/go.mod @@ -3,39 +3,41 @@ module github.com/grafana/xk6-output-opentelemetry go 1.21 require ( + github.com/mstoykov/atlas v0.0.0-20220811071828-388f114305dd github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.9.0 go.k6.io/k6 v0.51.0 + go.opentelemetry.io/otel v1.26.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.26.0 + go.opentelemetry.io/otel/metric v1.26.0 + go.opentelemetry.io/otel/sdk v1.26.0 + go.opentelemetry.io/otel/sdk/metric v1.26.0 ) require ( - github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/fatih/color v1.16.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/mstoykov/atlas v0.0.0-20220811071828-388f114305dd // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spf13/afero v1.9.5 // indirect - go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0 // indirect - go.opentelemetry.io/otel/metric v1.24.0 // indirect - go.opentelemetry.io/otel/sdk v1.24.0 // indirect - go.opentelemetry.io/otel/trace v1.24.0 // indirect - go.opentelemetry.io/proto/otlp v1.1.0 // indirect + go.opentelemetry.io/otel/trace v1.26.0 // indirect + go.opentelemetry.io/proto/otlp v1.2.0 // indirect golang.org/x/net v0.23.0 // indirect - golang.org/x/sys v0.18.0 // indirect + golang.org/x/sys v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect google.golang.org/grpc v1.63.2 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/guregu/null.v3 v3.5.0 // indirect diff --git a/go.sum b/go.sum index d7a8b45..08bcae4 100644 --- a/go.sum +++ b/go.sum @@ -40,8 +40,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= -github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= -github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= @@ -127,8 +127,8 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 h1:Wqo399gCIufwto+VfwCSvsnfGpF/w5E9CNxSwbpD6No= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0/go.mod h1:qmOFXW2epJhM0qSnUUYpldc7gVz2KMQwJ/QYCDIa7XU= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0QDGLKzqOmktBjT+Is= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -167,8 +167,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= -github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/spf13/afero v1.9.5 h1:stMpOSZFs//0Lv29HduCmli3GUfpFoF3Y1Q/aXj/wVM= @@ -191,22 +191,26 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= -go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= -go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo= +go.opentelemetry.io/otel v1.26.0 h1:LQwgL5s/1W7YiiRwxf03QGnWLb2HW4pLiAhaA5cZXBs= +go.opentelemetry.io/otel v1.26.0/go.mod h1:UmLkJHUAidDval2EICqBMbnAd0/m2vmpf/dAM+fvFs4= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.26.0 h1:+hm+I+KigBy3M24/h1p/NHkUx/evbLH0PNcjpMyCHc4= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.26.0/go.mod h1:NjC8142mLvvNT6biDpaMjyz78kyEHIwAJlSX0N9P5KI= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 h1:t6wl9SPayj+c7lEIFgm4ooDBZVb01IhLB4InpomhRw8= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0/go.mod h1:iSDOcsnSA5INXzZtwaBPrKp/lWu/V14Dd+llD0oI2EA= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 h1:Mw5xcxMwlqoJd97vwPxA8isEaIoxsta9/Q51+TTJLGE= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0/go.mod h1:CQNu9bj7o7mC6U7+CA/schKEYakYXWr79ucDHTMGhCM= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0 h1:Xw8U6u2f8DK2XAkGRFV7BBLENgnTGX9i4rQRxJf+/vs= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0/go.mod h1:6KW1Fm6R/s6Z3PGXwSJN2K4eT6wQB3vXX6CVnYX9NmM= -go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI= -go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco= -go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= -go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= -go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI= -go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU= -go.opentelemetry.io/proto/otlp v1.1.0 h1:2Di21piLrCqJ3U3eXGCTPHE9R8Nh+0uglSnOyxikMeI= -go.opentelemetry.io/proto/otlp v1.1.0/go.mod h1:GpBHCBWiqvVLDqmHZsoMM3C5ySeKTC7ej/RNTae6MdY= +go.opentelemetry.io/otel/metric v1.26.0 h1:7S39CLuY5Jgg9CrnA9HHiEjGMF/X2VHvoXGgSllRz30= +go.opentelemetry.io/otel/metric v1.26.0/go.mod h1:SY+rHOI4cEawI9a7N1A4nIg/nTQXe1ccCNWYOJUrpX4= +go.opentelemetry.io/otel/sdk v1.26.0 h1:Y7bumHf5tAiDlRYFmGqetNcLaVUZmh4iYfmGxtmz7F8= +go.opentelemetry.io/otel/sdk v1.26.0/go.mod h1:0p8MXpqLeJ0pzcszQQN4F0S5FVjBLgypeGSngLsmirs= +go.opentelemetry.io/otel/sdk/metric v1.26.0 h1:cWSks5tfriHPdWFnl+qpX3P681aAYqlZHcAyHw5aU9Y= +go.opentelemetry.io/otel/sdk/metric v1.26.0/go.mod h1:ClMFFknnThJCksebJwz7KIyEDHO+nTB6gK8obLy8RyE= +go.opentelemetry.io/otel/trace v1.26.0 h1:1ieeAUb4y0TE26jUFrCIXKpTuVK7uJGN9/Z/2LP5sQA= +go.opentelemetry.io/otel/trace v1.26.0/go.mod h1:4iDxvGDQuUkHve82hJJ8UqrwswHYsZuWCBllGV2U2y0= +go.opentelemetry.io/proto/otlp v1.2.0 h1:pVeZGk7nXDC9O2hncA6nHldxEjm6LByfA2aN8IOkz94= +go.opentelemetry.io/proto/otlp v1.2.0/go.mod h1:gGpR8txAl5M03pDhMC79G6SdqNV26naRm/KDsgaHD8A= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -342,8 +346,8 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -477,8 +481,8 @@ google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUE google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:VUhTRKeHn9wwcdrk73nvdC9gF178Tzhmt/qyaFcPLSo= google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de h1:jFNzHPIeuzhdRwVhbZdiym9q0ory/xY3sA+v2wPg8I0= google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:5iCWqnniDlqZHrd3neWVTOwvh/v6s3232omMecelax8= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de h1:cZGRis4/ot9uVm639a+rHCUaG0JJHEsdyzSQTMX+suY= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= diff --git a/pkg/opentelemetry/config.go b/pkg/opentelemetry/config.go index a2bc0a7..33b16dc 100644 --- a/pkg/opentelemetry/config.go +++ b/pkg/opentelemetry/config.go @@ -1,36 +1,86 @@ package opentelemetry import ( + "errors" "fmt" "time" "go.k6.io/k6/output" ) +const ( + grpcReceiverType = "grpc" +) + // Config is the config for the template collector type Config struct { - Address string + // MetricPrefix is the prefix to use for the metrics + MetricPrefix string + // ReceiverType is the type of the receiver to use + ReceiverType string + // GRPCReceiverEndpoint is the endpoint of the gRPC receiver + GRPCReceiverEndpoint string + // PushInterval is the interval at which to push metrics to the receiver PushInterval time.Duration + // FlushInterval is the interval at which to flush metrics from the k6 + FlushInterval time.Duration } -// NewConfig creates a new Config instance from the provided output.Params +// NewConfig creates and validates a new config func NewConfig(p output.Params) (Config, error) { cfg := Config{ - Address: "template", - PushInterval: 1 * time.Second, + MetricPrefix: "", + ReceiverType: grpcReceiverType, + GRPCReceiverEndpoint: "localhost:4317", + PushInterval: 1 * time.Second, + FlushInterval: 1 * time.Second, } + var err error for k, v := range p.Environment { switch k { - case "K6_TEMPLATE_PUSH_INTERVAL": - var err error + case "K6_OTEL_PUSH_INTERVAL": cfg.PushInterval, err = time.ParseDuration(v) if err != nil { - return cfg, fmt.Errorf("error parsing environment variable 'K6_TEMPLATE_PUSH_INTERVAL': %w", err) + return cfg, fmt.Errorf("error parsing environment variable 'K6_OTEL_PUSH_INTERVAL': %w", err) + } + case "K6_OTEL_METRIC_PREFIX": + cfg.MetricPrefix = v + case "K6_OTEL_FLUSH_INTERVAL": + cfg.FlushInterval, err = time.ParseDuration(v) + if err != nil { + return cfg, fmt.Errorf("error parsing environment variable 'K6_OTEL_FLUSH_INTERVAL': %w", err) } - case "K6_TEMPLATE_ADDRESS": - cfg.Address = v + case "K6_OTEL_RECEIVER_TYPE": + cfg.ReceiverType = v + case "K6_OTEL_GRPC_RECEIVER_ENDPOINT": + cfg.GRPCReceiverEndpoint = v } } + + // TDOO: consolidated config + + if err = cfg.Validate(); err != nil { + return cfg, fmt.Errorf("error validating config: %w", err) + } + return cfg, nil } + +// Validate validates the config +func (c Config) Validate() error { + if c.ReceiverType != grpcReceiverType { + return fmt.Errorf("unsupported receiver type %q, currently only %q supported", c.ReceiverType, grpcReceiverType) + } + + if c.GRPCReceiverEndpoint == "" { + return errors.New("gRPC receiver endpoint is required") + } + + return nil +} + +// String returns a string representation of the config +func (c Config) String() string { + return fmt.Sprintf("%s, %s", c.ReceiverType, c.GRPCReceiverEndpoint) +} diff --git a/pkg/opentelemetry/config_test.go b/pkg/opentelemetry/config_test.go index e1a12b8..7cd2c4b 100644 --- a/pkg/opentelemetry/config_test.go +++ b/pkg/opentelemetry/config_test.go @@ -13,35 +13,40 @@ func TestConfig(t *testing.T) { t.Parallel() // TODO: add more cases testCases := map[string]struct { - jsonRaw json.RawMessage - env map[string]string - arg string - config Config - err string + jsonRaw json.RawMessage + env map[string]string + arg string + expectedConfig Config + err string }{ "default": { - config: Config{ - Address: "template", - PushInterval: 1 * time.Second, + expectedConfig: Config{ + ReceiverType: grpcReceiverType, + GRPCReceiverEndpoint: "localhost:4317", + PushInterval: 1 * time.Second, + FlushInterval: 1 * time.Second, }, }, "overwrite": { - env: map[string]string{"K6_TEMPLATE_ADDRESS": "else", "K6_TEMPLATE_PUSH_INTERVAL": "4ms"}, - config: Config{ - Address: "else", - PushInterval: 4 * time.Millisecond, + env: map[string]string{"K6_OTEL_GRPC_RECEIVER_ENDPOINT": "else", "K6_OTEL_PUSH_INTERVAL": "4ms"}, + expectedConfig: Config{ + ReceiverType: grpcReceiverType, + GRPCReceiverEndpoint: "else", + PushInterval: 4 * time.Millisecond, + FlushInterval: 1 * time.Second, }, }, "early error": { - env: map[string]string{"K6_TEMPLATE_ADDRESS": "else", "K6_TEMPLATE_PUSH_INTERVAL": "4something"}, - config: Config{ - Address: "else", - PushInterval: 1 * time.Second, - }, + env: map[string]string{"K6_OTEL_GRPC_RECEIVER_ENDPOINT": "else", "K6_OTEL_PUSH_INTERVAL": "4something"}, err: `time: unknown unit "something" in duration "4something"`, }, + + "unsupported receiver type": { + env: map[string]string{"K6_OTEL_GRPC_RECEIVER_ENDPOINT": "else", "K6_OTEL_PUSH_INTERVAL": "4m", "K6_OTEL_RECEIVER_TYPE": "http"}, + err: `error validating config: unsupported receiver type "http", currently only "grpc" supported`, + }, } for name, testCase := range testCases { @@ -55,7 +60,7 @@ func TestConfig(t *testing.T) { return } require.NoError(t, err) - require.Equal(t, testCase.config, config) + require.Equal(t, testCase.expectedConfig, config) }) } } diff --git a/pkg/opentelemetry/output.go b/pkg/opentelemetry/output.go index 5724055..791d4e8 100644 --- a/pkg/opentelemetry/output.go +++ b/pkg/opentelemetry/output.go @@ -2,10 +2,20 @@ package opentelemetry import ( + "context" + "fmt" + "sync" "time" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" + otelMetric "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.24.0" + k6Const "go.k6.io/k6/lib/consts" + "go.k6.io/k6/metrics" "go.k6.io/k6/output" ) @@ -16,6 +26,13 @@ type Output struct { config Config periodicFlusher *output.PeriodicFlusher logger logrus.FieldLogger + + meterProvider *metric.MeterProvider + meter otelMetric.Meter + + counters sync.Map + upDownCounters sync.Map + histograms sync.Map } var _ output.Output = new(Output) @@ -26,7 +43,6 @@ func New(p output.Params) (*Output, error) { if err != nil { return nil, err } - // Some setupping code return &Output{ config: conf, @@ -36,29 +52,67 @@ func New(p output.Params) (*Output, error) { // Description returns a human-readable description of the output that will be shown in `k6 run` func (o *Output) Description() string { - return "OpenTelemetry: " + o.config.Address + return fmt.Sprintf("OpenTelemetry (%s)", o.config) } // Stop flushes all remaining metrics and finalizes the test run func (o *Output) Stop() error { o.logger.Debug("Stopping...") defer o.logger.Debug("Stopped!") + + if err := o.meterProvider.Shutdown(context.Background()); err != nil { + o.logger.WithError(err).Error("Error shutting down metric provider") + } + o.periodicFlusher.Stop() + return nil } // Start performs initialization tasks prior to Engine using the output func (o *Output) Start() error { - o.logger.Debug("Starting...") + o.logger.Debug("Starting output...") + + ctx := context.Background() + + // TODO: support different exporters (e.g. OTLP/HTTP), authentication, etc. + exp, err := otlpmetricgrpc.New( + ctx, + otlpmetricgrpc.WithInsecure(), + otlpmetricgrpc.WithEndpoint(o.config.GRPCReceiverEndpoint), + ) + if err != nil { + return fmt.Errorf("failed to create OpenTelemetry exporter: %w", err) + } + + res, err := resource.Merge(resource.Default(), + resource.NewWithAttributes(semconv.SchemaURL, + semconv.ServiceName("k6"), + semconv.ServiceVersion(k6Const.Version), + )) + if err != nil { + return fmt.Errorf("failed to create OpenTelemetry resource: %w", err) + } - // Here we should connect to a service, open a file or w/e else we decided we need to do + meterProvider := metric.NewMeterProvider( + metric.WithResource(res), + metric.WithReader( + metric.NewPeriodicReader( + exp, + metric.WithInterval(o.config.PushInterval), + ), + ), + ) - pf, err := output.NewPeriodicFlusher(o.config.PushInterval, o.flushMetrics) + pf, err := output.NewPeriodicFlusher(o.config.FlushInterval, o.flushMetrics) if err != nil { return err } + o.logger.Debug("Started!") o.periodicFlusher = pf + o.meterProvider = meterProvider + o.meter = meterProvider.Meter("k6") return nil } @@ -66,17 +120,130 @@ func (o *Output) Start() error { func (o *Output) flushMetrics() { samples := o.GetBufferedSamples() start := time.Now() - var count int + var count, errCount int for _, sc := range samples { samples := sc.GetSamples() - count += len(samples) + for _, sample := range samples { - // Here we actually write or accumulate to then write in batches - // for the template code we just ... dump some parts of it on the screen - o.logger.Infof("%s=%.5f,%s\n", sample.Metric.Name, sample.Value, sample.GetTags().Map()) + if err := o.dispatch(sample); err != nil { + o.logger.WithError(err).Error("Error dispatching sample") + errCount++ + + continue + } + count++ } } + if count > 0 { - o.logger.WithField("t", time.Since(start)).WithField("count", count).Debug("Wrote metrics to stdout") + o.logger. + WithField("t", time.Since(start)). + WithField("count", count). + Debug("registered metrics in OpenTelemetry metric provider") + } + + if errCount > 0 { + o.logger. + WithField("t", time.Since(start)). + WithField("count", errCount). + Warn("can't flush some metrics") + } +} + +func (o *Output) dispatch(entry metrics.Sample) error { + ctx := context.Background() + name := normalizeMetricName(o.config, entry.Metric.Name) + + switch entry.Metric.Type { + case metrics.Counter: + counter, err := o.getOrCreateCounter(name) + if err != nil { + return err + } + + counter.Add(ctx, entry.Value, otelMetric.WithAttributes(MapTagSet(entry.Tags)...)) + case metrics.Gauge: + gauge, err := o.getOrCreateUpDownCounter(name) + if err != nil { + return err + } + + gauge.Add(ctx, entry.Value, otelMetric.WithAttributes(MapTagSet(entry.Tags)...)) + case metrics.Trend: + trend, err := o.getOrCreateHistogram(name) + if err != nil { + return err + } + + trend.Record(ctx, entry.Value, otelMetric.WithAttributes(MapTagSet(entry.Tags)...)) + default: + // TODO: add support for other metric types + o.logger.Debugf("Drop unsupported metric type: %s", entry.Metric.Name) } + + return nil +} + +func normalizeMetricName(cfg Config, name string) string { + return cfg.MetricPrefix + name +} + +func (o *Output) getOrCreateCounter(name string) (otelMetric.Float64Counter, error) { + if counter, ok := o.counters.Load(name); ok { + if v, ok := counter.(otelMetric.Float64Counter); ok { + return v, nil + } + + return nil, fmt.Errorf("metric %q is not a counter", name) + } + + c, err := o.meter.Float64Counter(name) + if err != nil { + return nil, fmt.Errorf("failed to create counter for %q: %w", name, err) + } + + o.logger.Debugf("registered counter metric %q", name) + + o.counters.Store(name, c) + return c, nil +} + +func (o *Output) getOrCreateHistogram(name string) (otelMetric.Float64Histogram, error) { + if histogram, ok := o.histograms.Load(name); ok { + if v, ok := histogram.(otelMetric.Float64Histogram); ok { + return v, nil + } + + return nil, fmt.Errorf("metric %q is not a histogram", name) + } + + h, err := o.meter.Float64Histogram(name) + if err != nil { + return nil, fmt.Errorf("failed to create histogram for %q: %w", name, err) + } + + o.logger.Debugf("registered histogram metric %q", name) + + o.histograms.Store(name, h) + return h, nil +} + +func (o *Output) getOrCreateUpDownCounter(name string) (otelMetric.Float64UpDownCounter, error) { + if counter, ok := o.upDownCounters.Load(name); ok { + if v, ok := counter.(otelMetric.Float64UpDownCounter); ok { + return v, nil + } + + return nil, fmt.Errorf("metric %q is not an up/down counter", name) + } + + c, err := o.meter.Float64UpDownCounter(name) + if err != nil { + return nil, fmt.Errorf("failed to create up/down counter for %q: %w", name, err) + } + + o.logger.Debugf("registered up/down counter (gauge) metric %q ", name) + + o.upDownCounters.Store(name, c) + return c, nil } diff --git a/pkg/opentelemetry/tags.go b/pkg/opentelemetry/tags.go new file mode 100644 index 0000000..c18375c --- /dev/null +++ b/pkg/opentelemetry/tags.go @@ -0,0 +1,26 @@ +package opentelemetry + +import ( + "github.com/mstoykov/atlas" + "go.k6.io/k6/metrics" + "go.opentelemetry.io/otel/attribute" +) + +// MapTagSet converts a k6 tag set into +// the equivalent set of opentelemetry attributes +func MapTagSet(t *metrics.TagSet) []attribute.KeyValue { + n := (*atlas.Node)(t) + if n.Len() < 1 { + return nil + } + labels := make([]attribute.KeyValue, 0, n.Len()) + for !n.IsRoot() { + prev, key, value := n.Data() + n = prev + if key == "" || value == "" { + continue + } + labels = append(labels, attribute.String(key, value)) + } + return labels +}