Skip to content

Commit

Permalink
[tests] Add label API endpoints to coordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
nbroyles committed Nov 10, 2021
1 parent 0504226 commit c9db6db
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 33 deletions.
110 changes: 107 additions & 3 deletions src/integration/resources/coordinator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"io/ioutil"
"net"
"net/http"
"path"
"strings"
"time"

Expand Down Expand Up @@ -324,7 +325,22 @@ func (c *CoordinatorClient) WaitForClusterReady() error {
logger.Error("failed checking cluster readiness", zap.Error(err))
return err
}
resp.Body.Close()
defer resp.Body.Close()

if resp.StatusCode != 200 {
err = errors.New("non-200 status code received")

body, rerr := ioutil.ReadAll(resp.Body)
if rerr != nil {
logger.Warn("failed parse response body", zap.Error(rerr))
body = []byte("")
}

logger.Error("failed to check cluster readiness", zap.Error(err),
zap.String("responseBody", string(body)),
)
return err
}

logger.Info("cluster ready to receive reads and writes")

Expand Down Expand Up @@ -722,7 +738,7 @@ func (c *CoordinatorClient) query(
}

// InstantQuery runs an instant query with provided headers
func (c *CoordinatorClient) InstantQuery(req QueryRequest, headers map[string][]string) (model.Vector, error) {
func (c *CoordinatorClient) InstantQuery(req QueryRequest, headers Headers) (model.Vector, error) {
queryStr := fmt.Sprintf("%s?query=%s", route.QueryURL, req.Query)
if req.Time != nil {
queryStr = fmt.Sprintf("%s&time=%d", queryStr, req.Time.Unix())
Expand Down Expand Up @@ -752,7 +768,7 @@ type vectorResult struct {
}

// RangeQuery runs a range query with provided headers
func (c *CoordinatorClient) RangeQuery(req RangeQueryRequest, headers map[string][]string) (model.Matrix, error) {
func (c *CoordinatorClient) RangeQuery(req RangeQueryRequest, headers Headers) (model.Matrix, error) {
if req.Start.IsZero() {
req.Start = time.Now()
}
Expand Down Expand Up @@ -783,6 +799,84 @@ func (c *CoordinatorClient) RangeQuery(req RangeQueryRequest, headers map[string
return parsedResp.Data.Result, nil
}

// LabelNames return matching label names based on the request.
func (c *CoordinatorClient) LabelNames(
req LabelNamesRequest,
headers Headers,
) (model.LabelNames, error) {
urlPathAndQuery := fmt.Sprintf("%s?%s", route.LabelNamesURL, req.String())
resp, err := c.runQuery(urlPathAndQuery, headers)
if err != nil {
return nil, err
}

var parsedResp labelResponse
if err := json.Unmarshal([]byte(resp), &parsedResp); err != nil {
return nil, err
}

labelNames := make(model.LabelNames, 0, len(parsedResp.Data))
for _, label := range parsedResp.Data {
labelNames = append(labelNames, model.LabelName(label))
}

return labelNames, nil
}

// LabelValues return matching label values based on the request.
func (c *CoordinatorClient) LabelValues(
req LabelValuesRequest,
headers Headers,
) (model.LabelValues, error) {
urlPathAndQuery := fmt.Sprintf("%s?%s",
path.Join(route.Prefix, "label", req.LabelName, "values"),
req.String())
resp, err := c.runQuery(urlPathAndQuery, headers)
if err != nil {
return nil, err
}

var parsedResp labelResponse
if err := json.Unmarshal([]byte(resp), &parsedResp); err != nil {
return nil, err
}

labelValues := make(model.LabelValues, 0, len(parsedResp.Data))
for _, label := range parsedResp.Data {
labelValues = append(labelValues, model.LabelValue(label))
}

return labelValues, nil
}

// Series returns matching series based on the request.
func (c *CoordinatorClient) Series(
req SeriesRequest,
headers Headers,
) ([]model.Metric, error) {
urlPathAndQuery := fmt.Sprintf("%s?%s", route.SeriesMatchURL, req.String())
resp, err := c.runQuery(urlPathAndQuery, headers)
if err != nil {
return nil, err
}

var parsedResp seriesResponse
if err := json.Unmarshal([]byte(resp), &parsedResp); err != nil {
return nil, err
}

series := make([]model.Metric, 0, len(parsedResp.Data))
for _, labels := range parsedResp.Data {
labelSet := make(model.LabelSet)
for name, val := range labels {
labelSet[model.LabelName(name)] = model.LabelValue(val)
}
series = append(series, model.Metric(labelSet))
}

return series, nil
}

type jsonRangeQueryResponse struct {
Status string
Data matrixResult
Expand All @@ -793,6 +887,16 @@ type matrixResult struct {
Result model.Matrix
}

type labelResponse struct {
Status string
Data []string
}

type seriesResponse struct {
Status string
Data []map[string]string
}

func (c *CoordinatorClient) runQuery(
query string, headers map[string][]string,
) (string, error) {
Expand Down
33 changes: 33 additions & 0 deletions src/integration/resources/docker/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,39 @@ func (c *coordinator) RangeQuery(
return c.client.RangeQuery(req, headers)
}

// LabelNames return matching label names based on the request.
func (c *coordinator) LabelNames(
req resources.LabelNamesRequest,
headers resources.Headers,
) (model.LabelNames, error) {
if c.resource.closed {
return nil, errClosed
}
return c.client.LabelNames(req, headers)
}

// LabelValues returns matching label values based on the request.
func (c *coordinator) LabelValues(
req resources.LabelValuesRequest,
headers resources.Headers,
) (model.LabelValues, error) {
if c.resource.closed {
return nil, errClosed
}
return c.client.LabelValues(req, headers)
}

// Series returns matching series based on the request.
func (c *coordinator) Series(
req resources.SeriesRequest,
headers resources.Headers,
) ([]model.Metric, error) {
if c.resource.closed {
return nil, errClosed
}
return c.client.Series(req, headers)
}

func (c *coordinator) Close() error {
if c.resource.closed {
return errClosed
Expand Down
24 changes: 24 additions & 0 deletions src/integration/resources/inprocess/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,30 @@ func (c *Coordinator) RangeQuery(
return c.client.RangeQuery(req, headers)
}

// LabelNames return matching label names based on the request.
func (c *Coordinator) LabelNames(
req resources.LabelNamesRequest,
headers resources.Headers,
) (model.LabelNames, error) {
return c.client.LabelNames(req, headers)
}

// LabelValues returns matching label values based on the request.
func (c *Coordinator) LabelValues(
req resources.LabelValuesRequest,
headers resources.Headers,
) (model.LabelValues, error) {
return c.client.LabelValues(req, headers)
}

// Series returns matching series based on the request.
func (c *Coordinator) Series(
req resources.SeriesRequest,
headers resources.Headers,
) ([]model.Metric, error) {
return c.client.Series(req, headers)
}

func updateCoordinatorConfig(
cfg config.Configuration,
opts CoordinatorOptions,
Expand Down
62 changes: 43 additions & 19 deletions src/integration/resources/inprocess/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ import (
"github.com/m3db/m3/src/msg/generated/proto/topicpb"
"github.com/m3db/m3/src/msg/topic"
"github.com/m3db/m3/src/query/generated/proto/admin"
"github.com/m3db/m3/src/query/generated/proto/prompb"
"github.com/m3db/m3/src/query/storage"
xtime "github.com/m3db/m3/src/x/time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -73,15 +77,49 @@ func TestNewEmbeddedCoordinatorNotStarted(t *testing.T) {
require.Error(t, err)
}

func TestM3msgTopicFunctions(t *testing.T) {
dbnode, err := NewDBNodeFromYAML(defaultDBNodeConfig, DBNodeOptions{})
func TestCoordinatorAPIs(t *testing.T) {
_, coord, closer := setupNodeAndCoordinator(t)
defer closer()

testM3msgTopicFunctions(t, coord)
testAggPlacementFunctions(t, coord)
testMetadataAPIs(t, coord)
}

func testMetadataAPIs(t *testing.T, coordinator resources.Coordinator) {
err := coordinator.WriteProm("cpu", map[string]string{"pod": "foo-1234"}, []prompb.Sample{
{Value: 1, Timestamp: storage.TimeToPromTimestamp(xtime.Now())},
})
require.NoError(t, err)

coord, err := NewCoordinatorFromYAML(defaultCoordConfig, CoordinatorOptions{})
names, err := coordinator.LabelNames(resources.LabelNamesRequest{}, nil)
require.NoError(t, err)
require.Equal(t, model.LabelNames{
"__name__",
"pod",
}, names)

values, err := coordinator.LabelValues(resources.LabelValuesRequest{
LabelName: "__name__",
}, nil)
require.NoError(t, err)
require.Equal(t, model.LabelValues{"cpu"}, values)

require.NoError(t, coord.WaitForNamespace(""))
series, err := coordinator.Series(resources.SeriesRequest{
MetadataRequest: resources.MetadataRequest{
Match: "cpu",
},
}, nil)
require.NoError(t, err)
require.Equal(t, []model.Metric{
{
"__name__": "cpu",
"pod": "foo-1234",
},
}, series)
}

func testM3msgTopicFunctions(t *testing.T, coord resources.Coordinator) {
// init an m3msg topic
m3msgTopicOpts := resources.M3msgTopicOptions{
Zone: "embedded",
Expand Down Expand Up @@ -134,9 +172,6 @@ func TestM3msgTopicFunctions(t *testing.T) {
getResp, err := coord.GetM3msgTopic(m3msgTopicOpts)
require.NoError(t, err)
validateEqualTopicResp(t, expectedAddResp, getResp)

assert.NoError(t, coord.Close())
assert.NoError(t, dbnode.Close())
}

func validateEqualTopicResp(t *testing.T, expected, actual admin.TopicGetResponse) {
Expand All @@ -149,15 +184,7 @@ func validateEqualTopicResp(t *testing.T, expected, actual admin.TopicGetRespons
require.Equal(t, t1, t2)
}

func TestAggPlacementFunctions(t *testing.T) {
dbnode, err := NewDBNodeFromYAML(defaultDBNodeConfig, DBNodeOptions{})
require.NoError(t, err)

coord, err := NewCoordinatorFromYAML(defaultCoordConfig, CoordinatorOptions{})
require.NoError(t, err)

require.NoError(t, coord.WaitForNamespace(""))

func testAggPlacementFunctions(t *testing.T, coord resources.Coordinator) {
placementOpts := resources.PlacementRequestOptions{
Service: resources.ServiceTypeM3Aggregator,
Env: "default_env",
Expand Down Expand Up @@ -235,9 +262,6 @@ func TestAggPlacementFunctions(t *testing.T) {
}
_, err = coord.GetPlacement(wrongPlacementOpts)
require.NotNil(t, err)

assert.NoError(t, coord.Close())
assert.NoError(t, dbnode.Close())
}

func validateEqualAggPlacement(t *testing.T, expected, actual *placementpb.Placement) {
Expand Down
9 changes: 5 additions & 4 deletions src/integration/resources/inprocess/dbnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestNewDBNodeNoSetup(t *testing.T) {
}

func TestDBNode(t *testing.T) {
dbnode, closer := setupNode(t)
dbnode, _, closer := setupNodeAndCoordinator(t)
defer closer()

testHealth(t, dbnode)
Expand Down Expand Up @@ -168,7 +168,7 @@ func validateTag(t *testing.T, tag ident.Tag, name string, value string) {
require.Equal(t, value, tag.Value.String())
}

func setupNode(t *testing.T) (resources.Node, func()) {
func setupNodeAndCoordinator(t *testing.T) (resources.Node, resources.Coordinator, func()) {
dbnode, err := NewDBNodeFromYAML(defaultDBNodeConfig, DBNodeOptions{GenerateHostID: true})
require.NoError(t, err)

Expand All @@ -190,9 +190,10 @@ func setupNode(t *testing.T) (resources.Node, func()) {
})
require.NoError(t, err)

require.NoError(t, dbnode.WaitForBootstrap())
require.NoError(t, coord.WaitForShardsReady())
require.NoError(t, coord.WaitForClusterReady())

return dbnode, func() {
return dbnode, coord, func() {
assert.NoError(t, coord.Close())
assert.NoError(t, dbnode.Close())
}
Expand Down
Loading

0 comments on commit c9db6db

Please sign in to comment.