From 3fb32185f65b88b10077dd6bbee7e5aa3f8844a6 Mon Sep 17 00:00:00 2001 From: Vilius Pranckaitis Date: Fri, 8 Jan 2021 00:39:49 +1100 Subject: [PATCH] [dtest] Add a method for Prom remote writes (#3065) --- .../dtest/docker/harness/query_api_test.go | 2 + .../docker/harness/resources/coordinator.go | 67 ++++++++++++++++++- 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/src/cmd/tools/dtest/docker/harness/query_api_test.go b/src/cmd/tools/dtest/docker/harness/query_api_test.go index 55065dac6a..a1c81cfc5b 100644 --- a/src/cmd/tools/dtest/docker/harness/query_api_test.go +++ b/src/cmd/tools/dtest/docker/harness/query_api_test.go @@ -73,7 +73,9 @@ func testInvalidQueryReturns400(t *testing.T, tests []urlTest) { coord := singleDBNodeDockerResources.Coordinator() for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { + t.Parallel() assert.NoError(t, coord.RunQuery(verifyResponse(400), tt.url), "for query '%v'", tt.url) }) } diff --git a/src/cmd/tools/dtest/docker/harness/resources/coordinator.go b/src/cmd/tools/dtest/docker/harness/resources/coordinator.go index 8d003f7da2..2c7f2cba1c 100644 --- a/src/cmd/tools/dtest/docker/harness/resources/coordinator.go +++ b/src/cmd/tools/dtest/docker/harness/resources/coordinator.go @@ -30,10 +30,14 @@ import ( "time" "github.com/m3db/m3/src/query/generated/proto/admin" + "github.com/m3db/m3/src/query/generated/proto/prompb" + xhttp "github.com/m3db/m3/src/x/net/http" "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" "github.com/ory/dockertest/v3" + "github.com/prometheus/common/model" "go.uber.org/zap" ) @@ -66,6 +70,8 @@ type Coordinator interface { // WriteCarbon writes a carbon metric datapoint at a given time. WriteCarbon(port int, metric string, v float64, t time.Time) error + // WriteProm writes a prometheus metric. + WriteProm(name string, tags map[string]string, samples []prompb.Sample) error // RunQuery runs the given query with a given verification function. RunQuery(verifier ResponseVerifier, query string) error } @@ -323,6 +329,65 @@ func (c *coordinator) WriteCarbon( // return nil } +func (c *coordinator) WriteProm(name string, tags map[string]string, samples []prompb.Sample) error { + if c.resource.closed { + return errClosed + } + + var ( + url = c.resource.getURL(7201, "api/v1/prom/remote/write") + reqLabels = []prompb.Label{{Name: []byte(model.MetricNameLabel), Value: []byte(name)}} + ) + + for tag, value := range tags { + reqLabels = append(reqLabels, prompb.Label{ + Name: []byte(tag), + Value: []byte(value), + }) + } + writeRequest := prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: reqLabels, + Samples: samples, + }, + }, + } + + logger := c.resource.logger.With( + zapMethod("createDatabase"), zap.String("url", url), + zap.String("request", writeRequest.String())) + + body, err := proto.Marshal(&writeRequest) + if err != nil { + logger.Error("failed marshaling request message", zap.Error(err)) + return err + } + data := bytes.NewBuffer(snappy.Encode(nil, body)) + + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, url, data) + if err != nil { + logger.Error("failed constructing request", zap.Error(err)) + return err + } + req.Header.Add(xhttp.HeaderContentType, xhttp.ContentTypeProtobuf) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + logger.Error("failed making a request", zap.Error(err)) + return err + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode > 299 { + logger.Error("status code not 2xx", + zap.Int("status code", resp.StatusCode), + zap.String("status", resp.Status)) + return fmt.Errorf("status code %d", resp.StatusCode) + } + + return nil +} + func makePostRequest(logger *zap.Logger, url string, body proto.Message) (*http.Response, error) { data := bytes.NewBuffer(nil) if err := (&jsonpb.Marshaler{}).Marshal(data, body); err != nil { @@ -338,7 +403,7 @@ func makePostRequest(logger *zap.Logger, url string, body proto.Message) (*http. return nil, fmt.Errorf("failed to construct request: %w", err) } - req.Header.Add("Content-Type", "application/json") + req.Header.Add(xhttp.HeaderContentType, xhttp.ContentTypeJSON) return http.DefaultClient.Do(req) }