Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support label matchers in labels API #3566

Merged
merged 4 commits into from
Dec 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 103 additions & 5 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"math"
"net/http"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -439,16 +440,39 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, false).
var matcherSets [][]*labels.Matcher
for _, s := range r.Form[MatcherParam] {
matchers, err := parser.ParseMetricSelector(s)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
matcherSets = append(matcherSets, matchers)
}

q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, true).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a nit, just wanted to ask a question. Why did you flip the last argument (skipChunks) to true, and what it does exactly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A query result contains two parts, the series metadata like label sets, and the sample values. In this case, since we are just querying labels, we can set skipChunks to not return samples, only labels are fine.

Copy link
Contributor Author

@yeya24 yeya24 Dec 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the original issue about this option #1822.
Also this Prometheus pr prometheus/prometheus#8050 has the same idea to improve the performance.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, thanks for the answer. :)

Querier(ctx, timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
defer runutil.CloseWithLogOnErr(qapi.logger, q, "queryable labelValues")

// TODO(fabxc): add back request context.
var (
vals []string
warnings storage.Warnings
)
// TODO(yeya24): push down matchers to Store level.
if len(matcherSets) > 0 {
// Get all series which match matchers.
var sets []storage.SeriesSet
for _, mset := range matcherSets {
s := q.Select(false, nil, mset...)
sets = append(sets, s)
}
vals, warnings, err = labelValuesByMatchers(sets, name)
} else {
vals, warnings, err = q.LabelValues(name)
}

vals, warnings, err := q.LabelValues(name)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
Expand Down Expand Up @@ -544,14 +568,39 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr
}

q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, false).
var matcherSets [][]*labels.Matcher
for _, s := range r.Form[MatcherParam] {
matchers, err := parser.ParseMetricSelector(s)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
matcherSets = append(matcherSets, matchers)
}

q, err := qapi.queryableCreate(true, nil, storeDebugMatchers, 0, enablePartialResponse, true).
Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
defer runutil.CloseWithLogOnErr(qapi.logger, q, "queryable labelNames")

names, warnings, err := q.LabelNames()
var (
names []string
warnings storage.Warnings
)
// TODO(yeya24): push down matchers to Store level.
if len(matcherSets) > 0 {
// Get all series which match matchers.
var sets []storage.SeriesSet
for _, mset := range matcherSets {
s := q.Select(false, nil, mset...)
sets = append(sets, s)
}
names, warnings, err = labelNamesByMatchers(sets)
} else {
names, warnings, err = q.LabelNames()
}

if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
Expand Down Expand Up @@ -673,3 +722,52 @@ func parseDuration(s string) (time.Duration, error) {
}
return 0, errors.Errorf("cannot parse %q to a valid duration", s)
}

// Modified from https://github.com/eklockare/prometheus/blob/6178-matchers-with-label-values/web/api/v1/api.go#L571-L591.
// labelNamesByMatchers uses matchers to filter out matching series, then label names are extracted.
func labelNamesByMatchers(sets []storage.SeriesSet) ([]string, storage.Warnings, error) {
set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
labelNamesSet := make(map[string]struct{})
for set.Next() {
series := set.At()
for _, lb := range series.Labels() {
labelNamesSet[lb.Name] = struct{}{}
}
}

warnings := set.Warnings()
if set.Err() != nil {
return nil, warnings, set.Err()
}
// Convert the map to an array.
labelNames := make([]string, 0, len(labelNamesSet))
for key := range labelNamesSet {
labelNames = append(labelNames, key)
}
sort.Strings(labelNames)
return labelNames, warnings, nil
}

// Modified from https://github.com/eklockare/prometheus/blob/6178-matchers-with-label-values/web/api/v1/api.go#L571-L591.
// LabelValuesByMatchers uses matchers to filter out matching series, then label values are extracted.
func labelValuesByMatchers(sets []storage.SeriesSet, name string) ([]string, storage.Warnings, error) {
set := storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
labelValuesSet := make(map[string]struct{})
for set.Next() {
series := set.At()
labelValue := series.Labels().Get(name)
labelValuesSet[labelValue] = struct{}{}
}

warnings := set.Warnings()
if set.Err() != nil {
return nil, warnings, set.Err()
}
// Convert the map to an array.
labelValues := make([]string, 0, len(labelValuesSet))
for key := range labelValuesSet {
labelValues = append(labelValues, key)
}
sort.Strings(labelValues)
return labelValues, warnings, nil
}
79 changes: 67 additions & 12 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,9 +745,6 @@ func TestMetadataEndpoints(t *testing.T) {
},
{
endpoint: api.labelNames,
params: map[string]string{
"name": "__name__",
},
response: []string{
"__name__",
"foo",
Expand All @@ -757,9 +754,6 @@ func TestMetadataEndpoints(t *testing.T) {
},
{
endpoint: apiWithLabelLookback.labelNames,
params: map[string]string{
"name": "foo",
},
response: []string{
"__name__",
"foo",
Expand All @@ -773,9 +767,6 @@ func TestMetadataEndpoints(t *testing.T) {
"start": []string{"1970-01-01T00:00:00Z"},
"end": []string{"1970-01-01T00:09:00Z"},
},
params: map[string]string{
"name": "foo",
},
response: []string{
"__name__",
"foo",
Expand All @@ -787,14 +778,78 @@ func TestMetadataEndpoints(t *testing.T) {
"start": []string{"1970-01-01T00:00:00Z"},
"end": []string{"1970-01-01T00:09:00Z"},
},
params: map[string]string{
"name": "foo",
},
response: []string{
"__name__",
"foo",
},
},
// Failed, to parse matchers.
{
endpoint: api.labelNames,
query: url.Values{
"match[]": []string{`{xxxx`},
},
errType: baseAPI.ErrorBadData,
},
// Failed to parse matchers.
{
endpoint: api.labelValues,
query: url.Values{
"match[]": []string{`{xxxx`},
},
params: map[string]string{
"name": "__name__",
},
errType: baseAPI.ErrorBadData,
},
{
endpoint: api.labelNames,
query: url.Values{
"match[]": []string{`test_metric_replica2`},
},
response: []string{"__name__", "foo", "replica1"},
},
{
endpoint: api.labelValues,
query: url.Values{
"match[]": []string{`test_metric_replica2`},
},
params: map[string]string{
"name": "__name__",
},
response: []string{"test_metric_replica2"},
},
{
endpoint: api.labelValues,
query: url.Values{
"match[]": []string{`{foo="bar"}`, `{foo="boo"}`},
},
params: map[string]string{
"name": "__name__",
},
response: []string{"test_metric1", "test_metric2", "test_metric_replica1", "test_metric_replica2"},
},
// No matched series.
{
endpoint: api.labelValues,
query: url.Values{
"match[]": []string{`{foo="yolo"}`},
},
params: map[string]string{
"name": "__name__",
},
response: []string{},
},
{
endpoint: api.labelValues,
query: url.Values{
"match[]": []string{`test_metric_replica2`},
},
params: map[string]string{
"name": "replica1",
},
response: []string{"a"},
},
// Bad name parameter.
{
endpoint: api.labelValues,
Expand Down
10 changes: 8 additions & 2 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,11 +672,14 @@ func (c *Client) SeriesInGRPC(ctx context.Context, base *url.URL, matchers []sto

// LabelNames returns all known label names. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, startTime, endTime int64) ([]string, error) {
func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, matchers []storepb.LabelMatcher, startTime, endTime int64) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/labels")
q := u.Query()

if len(matchers) > 0 {
q.Add("match[]", storepb.MatchersToString(matchers...))
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
u.RawQuery = q.Encode()
Expand All @@ -689,11 +692,14 @@ func (c *Client) LabelNamesInGRPC(ctx context.Context, base *url.URL, startTime,

// LabelValuesInGRPC returns all known label values for a given label name. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, startTime, endTime int64) ([]string, error) {
func (c *Client) LabelValuesInGRPC(ctx context.Context, base *url.URL, label string, matchers []storepb.LabelMatcher, startTime, endTime int64) ([]string, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/label/", label, "/values")
q := u.Query()

if len(matchers) > 0 {
q.Add("match[]", storepb.MatchersToString(matchers...))
}
q.Add("start", formatTime(timestamp.Time(startTime)))
q.Add("end", formatTime(timestamp.Time(endTime)))
u.RawQuery = q.Encode()
Expand Down
4 changes: 2 additions & 2 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encodin

// LabelNames returns all known label names.
func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
lbls, err := p.client.LabelNamesInGRPC(ctx, p.base, r.Start, r.End)
lbls, err := p.client.LabelNamesInGRPC(ctx, p.base, nil, r.Start, r.End)
if err != nil {
return nil, err
}
Expand All @@ -499,7 +499,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
return &storepb.LabelValuesResponse{Values: []string{l}}, nil
}

vals, err := p.client.LabelValuesInGRPC(ctx, p.base, r.Label, r.Start, r.End)
vals, err := p.client.LabelValuesInGRPC(ctx, p.base, r.Label, nil, r.Start, r.End)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions test/e2e/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func TestQueryFrontend(t *testing.T) {

t.Run("query frontend splitting works for labels names API", func(t *testing.T) {
// LabelNames and LabelValues API should still work via query frontend.
labelNames(t, ctx, queryFrontend.HTTPEndpoint(), timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
labelNames(t, ctx, queryFrontend.HTTPEndpoint(), nil, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
return len(res) > 0
})
testutil.Ok(t, q.WaitSumMetricsWithOptions(
Expand All @@ -241,7 +241,7 @@ func TestQueryFrontend(t *testing.T) {
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "tripperware", "labels"))),
)

labelNames(t, ctx, queryFrontend.HTTPEndpoint(), timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
labelNames(t, ctx, queryFrontend.HTTPEndpoint(), nil, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
return len(res) > 0
})
testutil.Ok(t, q.WaitSumMetricsWithOptions(
Expand All @@ -262,7 +262,7 @@ func TestQueryFrontend(t *testing.T) {
})

t.Run("query frontend splitting works for labels values API", func(t *testing.T) {
labelValues(t, ctx, queryFrontend.HTTPEndpoint(), "instance", timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
labelValues(t, ctx, queryFrontend.HTTPEndpoint(), "instance", nil, timestamp.FromTime(now.Add(-time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
return len(res) == 1 && res[0] == "localhost:9090"
})
testutil.Ok(t, q.WaitSumMetricsWithOptions(
Expand All @@ -281,7 +281,7 @@ func TestQueryFrontend(t *testing.T) {
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "tripperware", "labels"))),
)

labelValues(t, ctx, queryFrontend.HTTPEndpoint(), "instance", timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
labelValues(t, ctx, queryFrontend.HTTPEndpoint(), "instance", nil, timestamp.FromTime(now.Add(-24*time.Hour)), timestamp.FromTime(now.Add(time.Hour)), func(res []string) bool {
return len(res) == 1 && res[0] == "localhost:9090"
})
testutil.Ok(t, q.WaitSumMetricsWithOptions(
Expand Down
Loading