Skip to content

Commit

Permalink
[tests] Add conversion of prometheus docker integration test - part 1
Browse files Browse the repository at this point in the history
  • Loading branch information
nbroyles committed Nov 15, 2021
1 parent af07892 commit 72e9b8a
Show file tree
Hide file tree
Showing 12 changed files with 1,087 additions and 35 deletions.
831 changes: 831 additions & 0 deletions src/integration/prometheus/prometheus.go

Large diffs are not rendered by default.

73 changes: 73 additions & 0 deletions src/integration/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// +build cluster_integration
//
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package prometheus

import (
"path"
"runtime"
"testing"

"github.com/m3db/m3/src/integration/resources"
"github.com/m3db/m3/src/integration/resources/docker"
"github.com/m3db/m3/src/integration/resources/inprocess"

"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestPrometheus(t *testing.T) {
m3, prom, closer := testSetup(t)
defer closer()

RunTest(t, m3, prom)
}

func testSetup(t *testing.T) (resources.M3Resources, resources.ExternalResources, func()) {
cfgs, err := inprocess.NewClusterConfigsFromYAML(
TestPrometheusDBNodeConfig, TestPrometheusCoordinatorConfig, "",
)
require.NoError(t, err)

m3, err := inprocess.NewCluster(cfgs,
resources.ClusterOptions{
DBNode: resources.NewDBNodeClusterOptions(),
},
)
require.NoError(t, err)

pool, err := dockertest.NewPool("")
require.NoError(t, err)

_, filename, _, _ := runtime.Caller(0)
prom := docker.NewPrometheus(docker.PrometheusOptions{
Pool: pool,
PathToCfg: path.Join(path.Dir(filename), "../resources/docker/config/prometheus.yml"),
})
require.NoError(t, prom.Setup())

return m3, prom, func() {
assert.NoError(t, prom.Close())
assert.NoError(t, m3.Cleanup())
}
}
2 changes: 1 addition & 1 deletion src/integration/resources/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

const (
retryMaxInterval = 5 * time.Second
retryMaxTime = 3 * time.Minute
retryMaxTime = 1 * time.Minute
)

// Retry is a function for retrying an operation in integration tests.
Expand Down
51 changes: 37 additions & 14 deletions src/integration/resources/coordinator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,19 +588,42 @@ func (c *CoordinatorClient) WriteCarbon(
return con.Close()
}

// WriteProm writes a prometheus metric.
func (c *CoordinatorClient) WriteProm(name string, tags map[string]string, samples []prompb.Sample) error {
var (
url = c.makeURL("api/v1/prom/remote/write")
reqLabels = []prompb.Label{{Name: []byte(model.MetricNameLabel), Value: []byte(name)}}
)
// WriteProm writes a prometheus metric. Takes tags/labels as a map for convenience.
func (c *CoordinatorClient) WriteProm(
name string,
tags map[string]string,
samples []prompb.Sample,
headers Headers,
) error {
labels := make([]prompb.Label, 0, len(tags))

for tag, value := range tags {
reqLabels = append(reqLabels, prompb.Label{
labels = append(labels, prompb.Label{
Name: []byte(tag),
Value: []byte(value),
})
}

return c.WritePromWithLabels(name, labels, samples, headers)
}

// WritePromWithLabels writes a prometheus metric. Allows you to provide the labels for the write
// directly instead of conveniently converting them from a map.
func (c *CoordinatorClient) WritePromWithLabels(
name string,
labels []prompb.Label,
samples []prompb.Sample,
headers Headers,
) error {
var (
url = c.makeURL("api/v1/prom/remote/write")
reqLabels = []prompb.Label{{Name: []byte(model.MetricNameLabel), Value: []byte(name)}}
)

for _, label := range labels {
reqLabels = append(reqLabels, label)
}

writeRequest := prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Expand All @@ -626,6 +649,11 @@ func (c *CoordinatorClient) WriteProm(name string, tags map[string]string, sampl
logger.Error("failed constructing request", zap.Error(err))
return err
}
for key, vals := range headers {
for _, val := range vals {
req.Header.Add(key, val)
}
}
req.Header.Add(xhttp.HeaderContentType, xhttp.ContentTypeProtobuf)

resp, err := c.client.Do(req)
Expand Down Expand Up @@ -769,12 +797,6 @@ type vectorResult struct {

// RangeQuery runs a range query with provided headers
func (c *CoordinatorClient) RangeQuery(req RangeQueryRequest, headers Headers) (model.Matrix, error) {
if req.Start.IsZero() {
req.Start = time.Now()
}
if req.End.IsZero() {
req.End = time.Now()
}
if req.Step == 0 {
req.Step = 15 * time.Second // default step is 15 seconds.
}
Expand Down Expand Up @@ -923,7 +945,8 @@ func (c *CoordinatorClient) runQuery(
b, err := ioutil.ReadAll(resp.Body)

if status := resp.StatusCode; status != http.StatusOK {
return "", fmt.Errorf("query response status not OK, received %v", status)
return "", fmt.Errorf("query response status not OK, received %v. error=%v",
status, string(b))
}

if contentType, ok := resp.Header["Content-Type"]; !ok {
Expand Down
8 changes: 4 additions & 4 deletions src/integration/resources/docker/config/prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ scrape_configs:

- job_name: 'coordinator'
static_configs:
- targets: ['coordinator01:7203']
- targets: ['host.docker.internal:7203']

- job_name: 'dbnode'
static_configs:
- targets: ['dbnode01:9004']
- targets: ['host.docker.internal:9004']

remote_read:
- url: http://coordinator01:7201/api/v1/prom/remote/read
- url: http://host.docker.internal:7201/api/v1/prom/remote/read

remote_write:
- url: http://coordinator01:7201/api/v1/prom/remote/write
- url: http://host.docker.internal:7201/api/v1/prom/remote/write
write_relabel_configs:
- target_label: metrics_storage
replacement: m3db_remote
22 changes: 20 additions & 2 deletions src/integration/resources/docker/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,30 @@ func (c *coordinator) WriteCarbon(
return c.client.WriteCarbon(url, metric, v, t)
}

func (c *coordinator) WriteProm(name string, tags map[string]string, samples []prompb.Sample) error {
func (c *coordinator) WriteProm(
name string,
tags map[string]string,
samples []prompb.Sample,
headers resources.Headers,
) error {
if c.resource.closed {
return errClosed
}

return c.client.WriteProm(name, tags, samples, headers)
}

func (c *coordinator) WritePromWithLabels(
name string,
labels []prompb.Label,
samples []prompb.Sample,
headers resources.Headers,
) error {
if c.resource.closed {
return errClosed
}

return c.client.WriteProm(name, tags, samples)
return c.client.WritePromWithLabels(name, labels, samples, headers)
}

func (c *coordinator) ApplyKVUpdate(update string) error {
Expand Down
86 changes: 81 additions & 5 deletions src/integration/resources/docker/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,22 @@ package docker

import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"time"

"github.com/ory/dockertest/v3"
"github.com/prometheus/common/model"

"github.com/m3db/m3/src/integration/resources"
"github.com/m3db/m3/src/x/instrument"
)

type prometheus struct {
// Prometheus is a docker-backed instantiation of Prometheus.
type Prometheus struct {
pool *dockertest.Pool
pathToCfg string
iOpts instrument.Options
Expand All @@ -59,14 +64,14 @@ func NewPrometheus(opts PrometheusOptions) resources.ExternalResources {
if opts.InstrumentOptions == nil {
opts.InstrumentOptions = instrument.NewOptions()
}
return &prometheus{
return &Prometheus{
pool: opts.Pool,
pathToCfg: opts.PathToCfg,
iOpts: opts.InstrumentOptions,
}
}

func (p *prometheus) Setup() error {
func (p *Prometheus) Setup() error {
if p.resource != nil {
return errors.New("prometheus already setup. must close resource " +
"before attempting to setup again")
Expand Down Expand Up @@ -97,7 +102,7 @@ func (p *prometheus) Setup() error {
return p.waitForHealthy()
}

func (p *prometheus) waitForHealthy() error {
func (p *Prometheus) waitForHealthy() error {
return resources.Retry(func() error {
req, err := http.NewRequestWithContext(
context.Background(),
Expand All @@ -123,7 +128,78 @@ func (p *prometheus) waitForHealthy() error {
})
}

func (p *prometheus) Close() error {
// PrometheusQueryRequest contains the parameters for making a query request.
type PrometheusQueryRequest struct {
// Query is the prometheus query to execute
Query string
// Time is the time to execute the query at
Time time.Time
}

// String converts the query request into a string suitable for use
// in the url params or request body
func (p *PrometheusQueryRequest) String() string {
str := fmt.Sprintf("query=%v", p.Query)

if !p.Time.IsZero() {
str += fmt.Sprintf("&time=%v", p.Time.Unix())
}

return str
}

// Query executes a query request against the prometheus resource.
func (p *Prometheus) Query(req PrometheusQueryRequest) (model.Vector, error) {
if p.resource.Closed() {
return nil, errClosed
}

r, err := http.NewRequestWithContext(
context.Background(),
http.MethodGet,
fmt.Sprintf("http://0.0.0.0:9090/api/v1/query?%s", req.String()),
nil,
)
if err != nil {
return nil, err
}

client := http.Client{}
res, err := client.Do(r)
if err != nil {
return nil, err
}
defer res.Body.Close()

body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, err
}

if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("non-200 status code received. "+
"status=%v responseBody=%v", res.StatusCode, string(body))
}

var parsedResp jsonInstantQueryResponse
if err := json.Unmarshal(body, &parsedResp); err != nil {
return nil, err
}

return parsedResp.Data.Result, nil
}

type jsonInstantQueryResponse struct {
Status string
Data vectorResult
}

type vectorResult struct {
ResultType model.ValueType
Result model.Vector
}

func (p *Prometheus) Close() error {
if p.resource.Closed() {
return errClosed
}
Expand Down
4 changes: 2 additions & 2 deletions src/integration/resources/inprocess/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func testAggMetrics(t *testing.T, coord resources.Coordinator) {
expectedValue = model.SampleValue(6)
)
assert.NoError(t, resources.Retry(func() error {
return coord.WriteProm("cpu", map[string]string{"host": "host1"}, samples)
return coord.WriteProm("cpu", map[string]string{"host": "host1"}, samples, nil)
}))

queryHeaders := resources.Headers{"M3-Metrics-Type": {"aggregated"}, "M3-Storage-Policy": {"10s:6h"}}
Expand All @@ -315,7 +315,7 @@ func testAggMetrics(t *testing.T, coord resources.Coordinator) {
Query: "cpu",
Start: time.Now().Add(-30 * time.Second),
End: time.Now(),
Step: 1 * time.Second,
Step: 1 * time.Second,
},
queryHeaders,
)
Expand Down
Loading

0 comments on commit 72e9b8a

Please sign in to comment.