Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split labels/series API endpoints in query frontend #3276

Merged
merged 16 commits into from
Oct 13, 2020
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pkg/errors v0.9.1
github.com/pmezard/go-difflib v1.0.0
github.com/prometheus/alertmanager v0.21.0
github.com/prometheus/alertmanager v0.21.1-0.20200911160112-1fdff6b3f939
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.13.0
github.com/prometheus/prometheus v1.8.2-0.20200819132913-cb830b0a9c78
github.com/prometheus/common v0.14.0
github.com/prometheus/prometheus v1.8.2-0.20200923143134-7e2db3d092f3
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible
github.com/weaveworks/common v0.0.0-20200914083218-61ffdd448099
Expand Down
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -865,8 +865,9 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/prometheus/alertmanager v0.19.0/go.mod h1:Eyp94Yi/T+kdeb2qvq66E3RGuph5T/jm/RBVh4yz1xo=
github.com/prometheus/alertmanager v0.21.0 h1:qK51JcUR9l/unhawGA9F9B64OCYfcGewhPNprem/Acc=
github.com/prometheus/alertmanager v0.21.0/go.mod h1:h7tJ81NA0VLWvWEayi1QltevFkLF3KxmC/malTcT8Go=
github.com/prometheus/alertmanager v0.21.1-0.20200911160112-1fdff6b3f939 h1:/gGoc4W45469qMuGGEMArYEs8wsk31/5oE56NUGjEN0=
github.com/prometheus/alertmanager v0.21.1-0.20200911160112-1fdff6b3f939/go.mod h1:imXRHOP6QTsE0fFsIsAV/cXimS32m7gVZOiUj11m6Ig=
github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
Expand Down Expand Up @@ -898,8 +899,10 @@ github.com/prometheus/common v0.8.0/go.mod h1:PC/OgXc+UN7B4ALwvn1yzVZmVwvhXp5Jsb
github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.11.1/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/common v0.13.0 h1:vJlpe9wPgDRM1Z+7Wj3zUUjY1nr6/1jNKyl7llliccg=
github.com/prometheus/common v0.12.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/common v0.13.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/common v0.14.0 h1:RHRyE8UocrbjU+6UvRzwi6HjiDfxrrBU91TtbKzkGp4=
github.com/prometheus/common v0.14.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/node_exporter v1.0.0-rc.0.0.20200428091818-01054558c289 h1:dTUS1vaLWq+Y6XKOTnrFpoVsQKLCbCp1OLj24TDi7oM=
github.com/prometheus/node_exporter v1.0.0-rc.0.0.20200428091818-01054558c289/go.mod h1:FGbBv5OPKjch+jNUJmEQpMZytIdyW0NdBtWFcfSKusc=
github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
Expand Down
5 changes: 3 additions & 2 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
PartialResponseParam = "partial_response"
MaxSourceResolutionParam = "max_source_resolution"
ReplicaLabelsParam = "replicaLabels[]"
MatcherParam = "match[]"
StoreMatcherParam = "storeMatch[]"
)

Expand Down Expand Up @@ -459,7 +460,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "parse form")}
}

if len(r.Form["match[]"]) == 0 {
if len(r.Form[MatcherParam]) == 0 {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.New("no match[] parameter provided")}
}

Expand All @@ -469,7 +470,7 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
}

var matcherSets [][]*labels.Matcher
for _, s := range r.Form["match[]"] {
for _, s := range r.Form[MatcherParam] {
matchers, err := parser.ParseMetricSelector(s)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
Expand Down
327 changes: 327 additions & 0 deletions pkg/queryfrontend/metadata_codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,327 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package queryfrontend

import (
"bytes"
"context"
"encoding/json"
"io/ioutil"
"net/http"
"net/url"
"sort"
"strconv"
"strings"

"github.com/cortexproject/cortex/pkg/querier/queryrange"
cortexutil "github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/weaveworks/common/httpgrpc"

queryv1 "github.com/thanos-io/thanos/pkg/api/query"
)

type metadataCodec struct {
queryrange.Codec
partialResponse bool
}

func NewThanosMetadataCodec(partialResponse bool) *metadataCodec {
return &metadataCodec{
Codec: queryrange.PrometheusCodec,
partialResponse: partialResponse,
}
}

func (c metadataCodec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) {
if len(responses) == 0 {
return &ThanosLabelsResponse{
Status: queryrange.StatusSuccess,
Data: []string{},
}, nil
}

switch responses[0].(type) {
case *ThanosLabelsResponse:
set := make(map[string]struct{})

for _, res := range responses {
for _, value := range res.(*ThanosLabelsResponse).Data {
if _, ok := set[value]; !ok {
set[value] = struct{}{}
}
}
}
labels := make([]string, 0, len(set))
for label := range set {
labels = append(labels, label)
}

sort.Strings(labels)
return &ThanosLabelsResponse{
Status: queryrange.StatusSuccess,
Data: labels,
}, nil
case *ThanosSeriesResponse:
metadataResponses := make([]*ThanosSeriesResponse, 0, len(responses))

for _, res := range responses {
metadataResponses = append(metadataResponses, res.(*ThanosSeriesResponse))
}

return &ThanosSeriesResponse{
Status: queryrange.StatusSuccess,
// TODO: fix this
Data: metadataResponses[0].Data,
}, nil
default:
return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid response format")
}
}

func (c metadataCodec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Request, error) {
if err := r.ParseForm(); err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

var (
req queryrange.Request
err error
)
switch op := getOperation(r); op {
case labelNamesOp, labelValuesOp:
req, err = c.parseLabelsRequest(r, op)
case seriesOp:
req, err = c.parseSeriesRequest(r)
}
if err != nil {
return nil, err
}

return req, nil
}

func (c metadataCodec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) {
var u *url.URL
switch thanosReq := r.(type) {
case *ThanosLabelsRequest:
var params = url.Values{
"start": []string{encodeTime(thanosReq.Start)},
"end": []string{encodeTime(thanosReq.End)},
queryv1.PartialResponseParam: []string{strconv.FormatBool(thanosReq.PartialResponse)},
}
if len(thanosReq.StoreMatchers) > 0 {
params[queryv1.StoreMatcherParam] = matchersToStringSlice(thanosReq.StoreMatchers)
}
u = &url.URL{
Path: thanosReq.Path,
RawQuery: params.Encode(),
}
case *ThanosSeriesRequest:
var params = url.Values{
"start": []string{encodeTime(thanosReq.Start)},
"end": []string{encodeTime(thanosReq.End)},
queryv1.DedupParam: []string{strconv.FormatBool(thanosReq.Dedup)},
queryv1.PartialResponseParam: []string{strconv.FormatBool(thanosReq.PartialResponse)},
queryv1.ReplicaLabelsParam: thanosReq.ReplicaLabels,
}
if len(thanosReq.Matchers) > 0 {
params[queryv1.MatcherParam] = matchersToStringSlice(thanosReq.Matchers)
}
if len(thanosReq.StoreMatchers) > 0 {
params[queryv1.StoreMatcherParam] = matchersToStringSlice(thanosReq.StoreMatchers)
}
u = &url.URL{
Path: thanosReq.Path,
RawQuery: params.Encode(),
}
default:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid request format")
}

req := &http.Request{
Method: "GET",
RequestURI: u.String(), // This is what the httpgrpc code looks at.
URL: u,
Body: http.NoBody,
Header: http.Header{},
}

return req.WithContext(ctx), nil
}

func (c metadataCodec) DecodeResponse(ctx context.Context, r *http.Response, req queryrange.Request) (queryrange.Response, error) {
if r.StatusCode/100 != 2 {
body, _ := ioutil.ReadAll(r.Body)
return nil, httpgrpc.Errorf(r.StatusCode, string(body))
}
log, ctx := spanlogger.New(ctx, "ParseQueryResponse") //nolint:ineffassign,staticcheck
defer log.Finish()

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Error(err)
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
}

log.LogFields(otlog.Int("bytes", len(buf)))

switch req.(type) {
case *ThanosLabelsRequest:
var resp ThanosLabelsResponse
if err := json.Unmarshal(buf, &resp); err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
}
return &resp, nil
case *ThanosSeriesRequest:
var resp ThanosSeriesResponse
if err := json.Unmarshal(buf, &resp); err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
}
return &resp, nil
default:
return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid response format")
}
}

func (c metadataCodec) EncodeResponse(ctx context.Context, res queryrange.Response) (*http.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse")
defer sp.Finish()

var (
b []byte
err error
)
switch resp := res.(type) {
case *ThanosLabelsResponse:
sp.LogFields(otlog.Int("labels", len(resp.Data)))
b, err = json.Marshal(resp)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error encoding response: %v", err)
}
case *ThanosSeriesResponse:
sp.LogFields(otlog.Int("series", len(resp.Data)))
b, err = json.Marshal(resp)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error encoding response: %v", err)
}
default:
return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid response format")
}

sp.LogFields(otlog.Int("bytes", len(b)))
resp := http.Response{
Header: http.Header{
"Content-Type": []string{"application/json"},
},
Body: ioutil.NopCloser(bytes.NewBuffer(b)),
StatusCode: http.StatusOK,
}
return &resp, nil
}

func (c metadataCodec) parseLabelsRequest(r *http.Request, op string) (queryrange.Request, error) {
var (
result ThanosLabelsRequest
err error
)
result.Start, err = cortexutil.ParseTime(r.FormValue("start"))
if err != nil {
return nil, err
}

result.End, err = cortexutil.ParseTime(r.FormValue("end"))
if err != nil {
return nil, err
}

if result.End < result.Start {
return nil, errEndBeforeStart
}

result.PartialResponse, err = parsePartialResponseParam(r.FormValue(queryv1.PartialResponseParam), c.partialResponse)
if err != nil {
return nil, err
}

result.StoreMatchers, err = parseMatchersParam(r.Form[queryv1.StoreMatcherParam])
if err != nil {
return nil, err
}

result.Path = r.URL.Path

if op == labelValuesOp {
parts := strings.Split(r.URL.Path, "/")
if len(parts) > 1 {
result.Label = parts[len(parts)-2]
}
}

for _, value := range r.Header.Values(cacheControlHeader) {
if strings.Contains(value, noStoreValue) {
result.CachingOptions.Disabled = true
break
}
}

return &result, nil
}

func (c metadataCodec) parseSeriesRequest(r *http.Request) (queryrange.Request, error) {
var (
result ThanosSeriesRequest
err error
)
result.Start, err = cortexutil.ParseTime(r.FormValue("start"))
if err != nil {
return nil, err
}

result.End, err = cortexutil.ParseTime(r.FormValue("end"))
if err != nil {
return nil, err
}

if result.End < result.Start {
return nil, errEndBeforeStart
}

result.Matchers, err = parseMatchersParam(r.Form[queryv1.MatcherParam])
if err != nil {
return nil, err
}

result.Dedup, err = parseEnableDedupParam(r.FormValue(queryv1.DedupParam))
if err != nil {
return nil, err
}

result.PartialResponse, err = parsePartialResponseParam(r.FormValue(queryv1.PartialResponseParam), c.partialResponse)
if err != nil {
return nil, err
}

if len(r.Form[queryv1.ReplicaLabelsParam]) > 0 {
result.ReplicaLabels = r.Form[queryv1.ReplicaLabelsParam]
}

result.StoreMatchers, err = parseMatchersParam(r.Form[queryv1.StoreMatcherParam])
if err != nil {
return nil, err
}

result.Path = r.URL.Path

for _, value := range r.Header.Values(cacheControlHeader) {
if strings.Contains(value, noStoreValue) {
result.CachingOptions.Disabled = true
break
}
}

return &result, nil
}
Loading