Skip to content

Commit

Permalink
add metadata codec implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 committed Oct 5, 2020
1 parent 109b18e commit 3e84336
Show file tree
Hide file tree
Showing 10 changed files with 653 additions and 77 deletions.
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pkg/errors v0.9.1
github.com/pmezard/go-difflib v1.0.0
github.com/prometheus/alertmanager v0.21.0
github.com/prometheus/alertmanager v0.21.1-0.20200911160112-1fdff6b3f939
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.13.0
github.com/prometheus/prometheus v1.8.2-0.20200819132913-cb830b0a9c78
github.com/prometheus/common v0.14.0
github.com/prometheus/prometheus v1.8.2-0.20200923143134-7e2db3d092f3
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible
github.com/weaveworks/common v0.0.0-20200914083218-61ffdd448099
Expand All @@ -74,6 +74,7 @@ replace (
// Using a 3rd-party branch for custom dialer - see https://github.com/bradfitz/gomemcache/pull/86.
// Required by Cortex https://github.com/cortexproject/cortex/pull/3051.
github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab
//github.com/cortexproject/cortex => github.com/yeya24/cortex v0.2.0-rc.0.0.20200928222317-4caeb2cae5ab
// Update to v1.1.1 to make sure windows CI pass.
github.com/elastic/go-sysinfo => github.com/elastic/go-sysinfo v1.1.1
// Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs.
Expand Down
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -865,8 +865,9 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/prometheus/alertmanager v0.19.0/go.mod h1:Eyp94Yi/T+kdeb2qvq66E3RGuph5T/jm/RBVh4yz1xo=
github.com/prometheus/alertmanager v0.21.0 h1:qK51JcUR9l/unhawGA9F9B64OCYfcGewhPNprem/Acc=
github.com/prometheus/alertmanager v0.21.0/go.mod h1:h7tJ81NA0VLWvWEayi1QltevFkLF3KxmC/malTcT8Go=
github.com/prometheus/alertmanager v0.21.1-0.20200911160112-1fdff6b3f939 h1:/gGoc4W45469qMuGGEMArYEs8wsk31/5oE56NUGjEN0=
github.com/prometheus/alertmanager v0.21.1-0.20200911160112-1fdff6b3f939/go.mod h1:imXRHOP6QTsE0fFsIsAV/cXimS32m7gVZOiUj11m6Ig=
github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM=
Expand Down Expand Up @@ -898,8 +899,10 @@ github.com/prometheus/common v0.8.0/go.mod h1:PC/OgXc+UN7B4ALwvn1yzVZmVwvhXp5Jsb
github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.11.1/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/common v0.13.0 h1:vJlpe9wPgDRM1Z+7Wj3zUUjY1nr6/1jNKyl7llliccg=
github.com/prometheus/common v0.12.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/common v0.13.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/common v0.14.0 h1:RHRyE8UocrbjU+6UvRzwi6HjiDfxrrBU91TtbKzkGp4=
github.com/prometheus/common v0.14.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/node_exporter v1.0.0-rc.0.0.20200428091818-01054558c289 h1:dTUS1vaLWq+Y6XKOTnrFpoVsQKLCbCp1OLj24TDi7oM=
github.com/prometheus/node_exporter v1.0.0-rc.0.0.20200428091818-01054558c289/go.mod h1:FGbBv5OPKjch+jNUJmEQpMZytIdyW0NdBtWFcfSKusc=
github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
Expand Down
1 change: 1 addition & 0 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
PartialResponseParam = "partial_response"
MaxSourceResolutionParam = "max_source_resolution"
ReplicaLabelsParam = "replicaLabels[]"
MatcherParam = "match[]"
StoreMatcherParam = "storeMatch[]"
)

Expand Down
321 changes: 321 additions & 0 deletions pkg/queryfrontend/metadata_codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,321 @@
package queryfrontend

import (
"bytes"
"context"
"encoding/json"
"io/ioutil"
"net/http"
"net/url"
"sort"
"strconv"
"strings"

"github.com/cortexproject/cortex/pkg/querier/queryrange"
cortexutil "github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/weaveworks/common/httpgrpc"

queryv1 "github.com/thanos-io/thanos/pkg/api/query"
)

type metadataCodec struct {
queryrange.Codec
partialResponse bool
}

func NewThanosMetadataCodec(partialResponse bool) *metadataCodec {
return &metadataCodec{
Codec: queryrange.PrometheusCodec,
partialResponse: partialResponse,
}
}

func (c metadataCodec) MergeResponse(responses ...queryrange.Response) (queryrange.Response, error) {
if len(responses) == 0 {
return &ThanosLabelsResponse{
Status: queryrange.StatusSuccess,
Data: []string{},
}, nil
}

switch responses[0].(type) {
case *ThanosLabelsResponse:
set := make(map[string]struct{})

for _, res := range responses {
for _, value := range res.(*ThanosLabelsResponse).Data {
set[value] = struct{}{}
}
}
labels := make([]string, 0, len(set))
for label := range set {
labels = append(labels, label)
}

sort.Strings(labels)
return &ThanosLabelsResponse{
Status: queryrange.StatusSuccess,
Data: labels,
}, nil
case *ThanosSeriesResponse:
metadataResponses := make([]*ThanosSeriesResponse, 0, len(responses))

for _, res := range responses {
metadataResponses = append(metadataResponses, res.(*ThanosSeriesResponse))
}

return &ThanosSeriesResponse{
Status: queryrange.StatusSuccess,
Data: metadataResponses[0].Data,
}, nil
default:
return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid response format")
}
}

func (c metadataCodec) DecodeRequest(_ context.Context, r *http.Request) (queryrange.Request, error) {
if err := r.ParseForm(); err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

var (
req queryrange.Request
err error
)
switch op := getOperation(r); op {
case labelNamesOp, labelValuesOp:
req, err = c.parseLabelsRequest(r, op)
case seriesOp:
req, err = c.parseSeriesRequest(r)
}
if err != nil {
return nil, err
}

return req, nil
}

func (c metadataCodec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Request, error) {
var u *url.URL
switch thanosReq := r.(type) {
case *ThanosLabelsRequest:
var params = url.Values{
"start": []string{encodeTime(thanosReq.Start)},
"end": []string{encodeTime(thanosReq.End)},
queryv1.PartialResponseParam: []string{strconv.FormatBool(thanosReq.PartialResponse)},
}
if len(thanosReq.StoreMatchers) > 0 {
params[queryv1.StoreMatcherParam] = matchersToStringSlice(thanosReq.StoreMatchers)
}
u = &url.URL{
Path: thanosReq.Path,
RawQuery: params.Encode(),
}
case *ThanosSeriesRequest:
var params = url.Values{
"start": []string{encodeTime(thanosReq.Start)},
"end": []string{encodeTime(thanosReq.End)},
queryv1.DedupParam: []string{strconv.FormatBool(thanosReq.Dedup)},
queryv1.PartialResponseParam: []string{strconv.FormatBool(thanosReq.PartialResponse)},
queryv1.ReplicaLabelsParam: thanosReq.ReplicaLabels,
}
if len(thanosReq.Matchers) > 0 {
params[queryv1.MatcherParam] = matchersToStringSlice(thanosReq.Matchers)
}
if len(thanosReq.StoreMatchers) > 0 {
params[queryv1.StoreMatcherParam] = matchersToStringSlice(thanosReq.StoreMatchers)
}
u = &url.URL{
Path: thanosReq.Path,
RawQuery: params.Encode(),
}
default:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid request format")
}

req := &http.Request{
Method: "GET",
RequestURI: u.String(), // This is what the httpgrpc code looks at.
URL: u,
Body: http.NoBody,
Header: http.Header{},
}

return req.WithContext(ctx), nil
}

func (c metadataCodec) DecodeResponse(ctx context.Context, r *http.Response, req queryrange.Request) (queryrange.Response, error) {
if r.StatusCode/100 != 2 {
body, _ := ioutil.ReadAll(r.Body)
return nil, httpgrpc.Errorf(r.StatusCode, string(body))
}
log, ctx := spanlogger.New(ctx, "ParseQueryResponse") //nolint:ineffassign,staticcheck
defer log.Finish()

buf, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Error(err)
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
}

log.LogFields(otlog.Int("bytes", len(buf)))

switch req.(type) {
case *ThanosLabelsRequest:
var resp ThanosLabelsResponse
if err := json.Unmarshal(buf, &resp); err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
}
return &resp, nil
case *ThanosSeriesRequest:
var resp ThanosSeriesResponse
if err := json.Unmarshal(buf, &resp); err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
}
return &resp, nil
default:
return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid response format")
}
}

func (c metadataCodec) EncodeResponse(ctx context.Context, res queryrange.Response) (*http.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse")
defer sp.Finish()

var (
b []byte
err error
)
switch resp := res.(type) {
case *ThanosLabelsResponse:
sp.LogFields(otlog.Int("labels", len(resp.Data)))
b, err = json.Marshal(resp)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error encoding response: %v", err)
}
case *ThanosSeriesResponse:
sp.LogFields(otlog.Int("series", len(resp.Data)))
b, err = json.Marshal(resp)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error encoding response: %v", err)
}
default:
return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid response format")
}

sp.LogFields(otlog.Int("bytes", len(b)))
resp := http.Response{
Header: http.Header{
"Content-Type": []string{"application/json"},
},
Body: ioutil.NopCloser(bytes.NewBuffer(b)),
StatusCode: http.StatusOK,
}
return &resp, nil
}

func (c metadataCodec) parseLabelsRequest(r *http.Request, op string) (queryrange.Request, error) {
var (
result ThanosLabelsRequest
err error
)
result.Start, err = cortexutil.ParseTime(r.FormValue("start"))
if err != nil {
return nil, err
}

result.End, err = cortexutil.ParseTime(r.FormValue("end"))
if err != nil {
return nil, err
}

if result.End < result.Start {
return nil, errEndBeforeStart
}

result.PartialResponse, err = parsePartialResponseParam(r.FormValue(queryv1.PartialResponseParam), c.partialResponse)
if err != nil {
return nil, err
}

result.StoreMatchers, err = parseMatchersParam(r.Form[queryv1.StoreMatcherParam])
if err != nil {
return nil, err
}

result.Path = r.URL.Path

if op == seriesOp {
parts := strings.Split(r.URL.Path, "/")
if len(parts) > 1 {
result.Label = parts[len(parts)-2]
}
}

for _, value := range r.Header.Values(cacheControlHeader) {
if strings.Contains(value, noStoreValue) {
result.CachingOptions.Disabled = true
break
}
}

return &result, nil
}

func (c metadataCodec) parseSeriesRequest(r *http.Request) (queryrange.Request, error) {
var (
result ThanosSeriesRequest
err error
)
result.Start, err = cortexutil.ParseTime(r.FormValue("start"))
if err != nil {
return nil, err
}

result.End, err = cortexutil.ParseTime(r.FormValue("end"))
if err != nil {
return nil, err
}

if result.End < result.Start {
return nil, errEndBeforeStart
}

result.Matchers, err = parseMatchersParam(r.Form[queryv1.MatcherParam])
if err != nil {
return nil, err
}

result.Dedup, err = parseEnableDedupParam(r.FormValue(queryv1.DedupParam))
if err != nil {
return nil, err
}

result.PartialResponse, err = parsePartialResponseParam(r.FormValue(queryv1.PartialResponseParam), c.partialResponse)
if err != nil {
return nil, err
}

if len(r.Form[queryv1.ReplicaLabelsParam]) > 0 {
result.ReplicaLabels = r.Form[queryv1.ReplicaLabelsParam]
}

result.StoreMatchers, err = parseMatchersParam(r.Form[queryv1.StoreMatcherParam])
if err != nil {
return nil, err
}

result.Path = r.URL.Path

for _, value := range r.Header.Values(cacheControlHeader) {
if strings.Contains(value, noStoreValue) {
result.CachingOptions.Disabled = true
break
}
}

return &result, nil
}
Loading

0 comments on commit 3e84336

Please sign in to comment.