Skip to content

Commit

Permalink
[dtest] Add a method for Prom remote writes (#3065)
Browse files Browse the repository at this point in the history
  • Loading branch information
vpranckaitis authored Jan 7, 2021
1 parent 14e4c08 commit 3fb3218
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 1 deletion.
2 changes: 2 additions & 0 deletions src/cmd/tools/dtest/docker/harness/query_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ func testInvalidQueryReturns400(t *testing.T, tests []urlTest) {
coord := singleDBNodeDockerResources.Coordinator()

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
assert.NoError(t, coord.RunQuery(verifyResponse(400), tt.url), "for query '%v'", tt.url)
})
}
Expand Down
67 changes: 66 additions & 1 deletion src/cmd/tools/dtest/docker/harness/resources/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@ import (
"time"

"github.com/m3db/m3/src/query/generated/proto/admin"
"github.com/m3db/m3/src/query/generated/proto/prompb"
xhttp "github.com/m3db/m3/src/x/net/http"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/ory/dockertest/v3"
"github.com/prometheus/common/model"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -66,6 +70,8 @@ type Coordinator interface {

// WriteCarbon writes a carbon metric datapoint at a given time.
WriteCarbon(port int, metric string, v float64, t time.Time) error
// WriteProm writes a prometheus metric.
WriteProm(name string, tags map[string]string, samples []prompb.Sample) error
// RunQuery runs the given query with a given verification function.
RunQuery(verifier ResponseVerifier, query string) error
}
Expand Down Expand Up @@ -323,6 +329,65 @@ func (c *coordinator) WriteCarbon(
// return nil
}

func (c *coordinator) WriteProm(name string, tags map[string]string, samples []prompb.Sample) error {
if c.resource.closed {
return errClosed
}

var (
url = c.resource.getURL(7201, "api/v1/prom/remote/write")
reqLabels = []prompb.Label{{Name: []byte(model.MetricNameLabel), Value: []byte(name)}}
)

for tag, value := range tags {
reqLabels = append(reqLabels, prompb.Label{
Name: []byte(tag),
Value: []byte(value),
})
}
writeRequest := prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: reqLabels,
Samples: samples,
},
},
}

logger := c.resource.logger.With(
zapMethod("createDatabase"), zap.String("url", url),
zap.String("request", writeRequest.String()))

body, err := proto.Marshal(&writeRequest)
if err != nil {
logger.Error("failed marshaling request message", zap.Error(err))
return err
}
data := bytes.NewBuffer(snappy.Encode(nil, body))

req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, url, data)
if err != nil {
logger.Error("failed constructing request", zap.Error(err))
return err
}
req.Header.Add(xhttp.HeaderContentType, xhttp.ContentTypeProtobuf)

resp, err := http.DefaultClient.Do(req)
if err != nil {
logger.Error("failed making a request", zap.Error(err))
return err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
logger.Error("status code not 2xx",
zap.Int("status code", resp.StatusCode),
zap.String("status", resp.Status))
return fmt.Errorf("status code %d", resp.StatusCode)
}

return nil
}

func makePostRequest(logger *zap.Logger, url string, body proto.Message) (*http.Response, error) {
data := bytes.NewBuffer(nil)
if err := (&jsonpb.Marshaler{}).Marshal(data, body); err != nil {
Expand All @@ -338,7 +403,7 @@ func makePostRequest(logger *zap.Logger, url string, body proto.Message) (*http.
return nil, fmt.Errorf("failed to construct request: %w", err)
}

req.Header.Add("Content-Type", "application/json")
req.Header.Add(xhttp.HeaderContentType, xhttp.ContentTypeJSON)

return http.DefaultClient.Do(req)
}
Expand Down

0 comments on commit 3fb3218

Please sign in to comment.