Skip to content

Commit

Permalink
Feature: add /api/v1/labels support
Browse files Browse the repository at this point in the history
Signed-off-by: jojohappy <[email protected]>
  • Loading branch information
jojohappy committed Mar 11, 2019
1 parent 432785e commit dc1498f
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 9 deletions.
34 changes: 34 additions & 0 deletions pkg/query/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log.
r.Get("/label/:name/values", instr("label_values", api.labelValues))

r.Get("/series", instr("series", api.series))

r.Get("/labels", instr("label_names", api.labelNames))
}

type queryData struct {
Expand Down Expand Up @@ -611,3 +613,35 @@ func parseDuration(s string) (time.Duration, error) {
}
return 0, fmt.Errorf("cannot parse %q to a valid duration", s)
}

func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) {
ctx := r.Context()

enablePartialResponse, apiErr := api.parsePartialResponseParam(r)
if apiErr != nil {
return nil, nil, apiErr
}

var (
warnmtx sync.Mutex
warnings []error
)
warningReporter := func(err error) {
warnmtx.Lock()
warnings = append(warnings, err)
warnmtx.Unlock()
}

q, err := api.queryableCreate(true, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
defer runutil.CloseWithLogOnErr(api.logger, q, "queryable labelNames")

names, err := q.LabelNames()
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}

return names, warnings, nil
}
15 changes: 13 additions & 2 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,20 @@ func (q *querier) LabelValues(name string) ([]string, error) {
}

// LabelNames returns all the unique label names present in the block in sorted order.
// TODO(bwplotka): Consider adding labelNames to thanos Query API https://github.com/improbable-eng/thanos/issues/702.
func (q *querier) LabelNames() ([]string, error) {
return nil, errors.New("not implemented")
span, ctx := tracing.StartSpan(q.ctx, "querier_label_names")
defer span.Finish()

resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{PartialResponseDisabled: !q.partialResponse})
if err != nil {
return nil, errors.Wrap(err, "proxy LabelNames()")
}

for _, w := range resp.Warnings {
q.warningReporter(errors.New(w))
}

return resp.Names, nil
}

func (q *querier) Close() error {
Expand Down
42 changes: 40 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,8 +804,37 @@ func chunksSize(chks []storepb.AggrChunk) (size int) {
}

// LabelNames implements the storepb.StoreServer interface.
func (s *BucketStore) LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
return nil, status.Error(codes.Unimplemented, "not implemented")
func (s *BucketStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
g, gctx := errgroup.WithContext(ctx)

s.mtx.RLock()

var mtx sync.Mutex
var sets [][]string

for _, b := range s.blocks {
indexr := b.indexReader(gctx)
g.Go(func() error {
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names")

res := indexr.LabelNames()

mtx.Lock()
sets = append(sets, res)
mtx.Unlock()

return nil
})
}

s.mtx.RUnlock()

if err := g.Wait(); err != nil {
return nil, status.Error(codes.Aborted, err.Error())
}
return &storepb.LabelNamesResponse{
Names: strutil.MergeSlices(sets...),
}, nil
}

// LabelValues implements the storepb.StoreServer interface.
Expand Down Expand Up @@ -1537,6 +1566,15 @@ func (r *bucketIndexReader) LabelValues(name string) []string {
return res
}

// LabelNames returns a list of label names.
func (r *bucketIndexReader) LabelNames() []string {
res := make([]string, 0, len(r.block.lvals))
for ln, _ := range r.block.lvals {
res = append(res, ln)
}
return res
}

// Close released the underlying resources of the reader.
func (r *bucketIndexReader) Close() error {
r.block.pendingReaders.Done()
Expand Down
27 changes: 26 additions & 1 deletion pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,32 @@ func extendLset(lset []storepb.Label, extend labels.Labels) []storepb.Label {
func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (
*storepb.LabelNamesResponse, error,
) {
return nil, status.Error(codes.Unimplemented, "not implemented")
u := *p.base
u.Path = path.Join(u.Path, "/api/v1/labels")

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
}

span, ctx := tracing.StartSpan(ctx, "/prom_label_names HTTP[client]")
defer span.Finish()

resp, err := p.client.Do(req.WithContext(ctx))
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
}
defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label names request body")

var m struct {
Data []string `json:"data"`
}
if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
}
sort.Strings(m.Data)

return &storepb.LabelNamesResponse{Names: m.Data}, nil
}

// LabelValues returns all known label values for a given label name.
Expand Down
35 changes: 35 additions & 0 deletions pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,41 @@ func TestPrometheusStore_ExternalLabelValues_e2e(t *testing.T) {
testutil.Equals(t, []string{"b"}, resp.Values)
}

func TestPrometheusStore_LabelNames_e2e(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

p, err := testutil.NewPrometheus()
testutil.Ok(t, err)

a := p.Appender()
_, err = a.Add(labels.FromStrings("a", "b"), 0, 1)
testutil.Ok(t, err)
_, err = a.Add(labels.FromStrings("b", "c"), 0, 1)
testutil.Ok(t, err)
_, err = a.Add(labels.FromStrings("c", "a"), 0, 1)
testutil.Ok(t, err)
_, err = a.Add(labels.FromStrings("a", "d"), 0, 1)
testutil.Ok(t, err)
testutil.Ok(t, a.Commit())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

testutil.Ok(t, p.Start())
defer func() { testutil.Ok(t, p.Stop()) }()

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

proxy, err := NewPrometheusStore(nil, nil, u, component.Sidecar, getExternalLabels, nil)
testutil.Ok(t, err)

resp, err := proxy.LabelNames(ctx, &storepb.LabelNamesRequest{})
testutil.Ok(t, err)

testutil.Equals(t, []string{"a", "b", "c"}, resp.Names)
}

func TestPrometheusStore_Series_MatchExternalLabel_e2e(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

Expand Down
47 changes: 46 additions & 1 deletion pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,52 @@ func storeMatches(s Client, mint, maxt int64, matchers ...storepb.LabelMatcher)
func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (
*storepb.LabelNamesResponse, error,
) {
return nil, status.Error(codes.Unimplemented, "not implemented")
var (
warnings []string
names [][]string
mtx sync.Mutex
g, gctx = errgroup.WithContext(ctx)
)

stores, err := s.stores(ctx)
if err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
}
for _, st := range stores {
st := st
g.Go(func() error {
resp, err := st.LabelNames(gctx, &storepb.LabelNamesRequest{
PartialResponseDisabled: r.PartialResponseDisabled,
})
if err != nil {
err = errors.Wrapf(err, "fetch label names from store %s", st)
if r.PartialResponseDisabled {
return err
}

mtx.Lock()
warnings = append(warnings, errors.Wrap(err, "fetch label names").Error())
mtx.Unlock()
return nil
}

mtx.Lock()
warnings = append(warnings, resp.Warnings...)
names = append(names, resp.Names)
mtx.Unlock()

return nil
})
}

if err := g.Wait(); err != nil {
return nil, err
}

return &storepb.LabelNamesResponse{
Names: strutil.MergeUnsortedSlices(names...),
Warnings: warnings,
}, nil
}

// LabelValues returns all known label values for a given label name.
Expand Down
45 changes: 44 additions & 1 deletion pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,45 @@ func TestProxyStore_LabelValues(t *testing.T) {
testutil.Equals(t, 1, len(resp.Warnings))
}

func TestProxyStore_LabelNames(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

m1 := &mockedStoreAPI{
RespLabelNames: &storepb.LabelNamesResponse{
Names: []string{"a", "b"},
Warnings: []string{"warning"},
},
}

m2 := &mockedStoreAPI{
RespLabelNames: &storepb.LabelNamesResponse{
Names: []string{"a", "c", "d"},
},
}

cls := []Client{
&testClient{StoreClient: m1},
&testClient{StoreClient: m2},
}

q := NewProxyStore(nil,
func(context.Context) ([]Client, error) { return cls, nil },
component.Query,
nil,
)

ctx := context.Background()
req := &storepb.LabelNamesRequest{
PartialResponseDisabled: true,
}
resp, err := q.LabelValues(ctx, req)
testutil.Ok(t, err)
testutil.Assert(t, proto.Equal(req, m1.LastLabelNamesReq), "request was not proxied properly to underlying storeAPI: %s vs %s", req, m1.LastLabelNamesReq)

testutil.Equals(t, []string{"a", "b", "c", "d"}, resp.Names)
testutil.Equals(t, 1, len(resp.Warnings))
}

type rawSeries struct {
lset []storepb.Label
samples []sample
Expand Down Expand Up @@ -716,10 +755,12 @@ func (s *storeSeriesServer) Context() context.Context {
type mockedStoreAPI struct {
RespSeries []*storepb.SeriesResponse
RespLabelValues *storepb.LabelValuesResponse
RespLabelNames *storepb.LabelNamesResponse
RespError error

LastSeriesReq *storepb.SeriesRequest
LastLabelValuesReq *storepb.LabelValuesRequest
LastLabelNamesReq *storepb.LabelNamesRequest
}

func (s *mockedStoreAPI) Info(ctx context.Context, req *storepb.InfoRequest, _ ...grpc.CallOption) (*storepb.InfoResponse, error) {
Expand All @@ -733,7 +774,9 @@ func (s *mockedStoreAPI) Series(ctx context.Context, req *storepb.SeriesRequest,
}

func (s *mockedStoreAPI) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest, _ ...grpc.CallOption) (*storepb.LabelNamesResponse, error) {
return nil, status.Error(codes.Unimplemented, "not implemented")
s.LastLabelNamesReq = req

return s.RespLabelNames, s.RespError
}

func (s *mockedStoreAPI) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest, _ ...grpc.CallOption) (*storepb.LabelValuesResponse, error) {
Expand Down
14 changes: 12 additions & 2 deletions pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,20 @@ func (s *TSDBStore) translateAndExtendLabels(m, extend labels.Labels) []storepb.
}

// LabelNames returns all known label names.
func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (
func (s *TSDBStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) (
*storepb.LabelNamesResponse, error,
) {
return nil, status.Error(codes.Unimplemented, "not implemented")
q, err := s.db.Querier(math.MinInt64, math.MaxInt64)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer runutil.CloseWithLogOnErr(s.logger, q, "close tsdb querier label names")

res, err := q.LabelNames()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &storepb.LabelNamesResponse{Names: res}, nil
}

// LabelValues returns all known label values for a given label name.
Expand Down

0 comments on commit dc1498f

Please sign in to comment.