Skip to content

Commit

Permalink
fix exemplars implementation
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <[email protected]>
  • Loading branch information
yeya24 committed Feb 27, 2021
1 parent 47ef813 commit 1b6abfc
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 478 deletions.
22 changes: 12 additions & 10 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"
"time"

cortexutil "github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
Expand Down Expand Up @@ -159,7 +160,7 @@ func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logge

r.Get("/metadata", instr("metadata", NewMetricMetadataHandler(qapi.metadatas, qapi.enableMetricMetadataPartialResponse)))

r.Get("/exemplars", instr("exemplars", NewExemplarsHandler(qapi.exemplars, qapi.enableExemplarPartialResponse)))
r.Get("/query_exemplars", instr("exemplars", NewExemplarsHandler(qapi.exemplars, qapi.enableExemplarPartialResponse)))
}

type queryData struct {
Expand Down Expand Up @@ -696,17 +697,18 @@ func NewExemplarsHandler(client exemplars.UnaryClient, enablePartialResponse boo
}

return func(r *http.Request) (interface{}, []error, *api.ApiError) {
typeParam := r.URL.Query().Get("type")
typ, ok := exemplarspb.ExemplarsRequest_Type_value[strings.ToUpper(typeParam)]
if !ok {
if typeParam != "" {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("invalid exemplars parameter type='%v'", typeParam)}
}
typ = int32(exemplarspb.ExemplarsRequest_ALL)
start, err := cortexutil.ParseTime(r.FormValue("start"))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
end, err := cortexutil.ParseTime(r.FormValue("end"))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}

req := &exemplarspb.ExemplarsRequest{
Type: exemplarspb.ExemplarsRequest_Type(typ),
Start: start,
End: end,
Query: r.FormValue("query"),
PartialResponseStrategy: ps,
}
exemplarsData, warnings, err := client.Exemplars(r.Context(), req)
Expand Down
20 changes: 20 additions & 0 deletions pkg/exemplars/exemplars.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,26 @@ type exemplarsServer struct {
data []*exemplarspb.ExemplarData
}


func (srv *exemplarsServer) Send(res *exemplarspb.ExemplarsResponse) error {
if res.GetWarning() != "" {
srv.warnings = append(srv.warnings, errors.New(res.GetWarning()))
return nil
}

if res.GetData() == nil {
return errors.New("no data")
}

srv.data = append(srv.data, res.GetData())
return nil
}

func (srv *exemplarsServer) Context() context.Context {
return srv.ctx
}


func NewGRPCClient(es exemplarspb.ExemplarsServer) *GRPCClient {
return NewGRPCClientWithDedup(es, nil)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/exemplars/exemplars_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package exemplars

import (
"fmt"
"testing"
"time"

"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand All @@ -15,6 +17,12 @@ func TestMain(m *testing.M) {
testutil.TolerantVerifyLeakMain(m)
}

func TestA(t *testing.T) {
tt, err := time.Parse(time.RFC3339, "1614395157.954")
fmt.Printf("%v\n", err)
fmt.Printf("%v\n", tt)
}

func TestDedupExemplarsData(t *testing.T) {
for _, tc := range []struct {
name string
Expand Down
52 changes: 40 additions & 12 deletions pkg/exemplars/exemplarspb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,46 @@
package exemplarspb

import (
"encoding/json"
"math/big"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"math/big"

"encoding/json"
)

// UnmarshalJSON implements json.Unmarshaler.
func (m *Exemplar) UnmarshalJSON(b []byte) error {
v := struct {
Labels labelpb.ZLabelSet
TimeStamp model.Time
Value model.SampleValue
}{}

if err := json.Unmarshal(b, &v); err != nil {
return err
}
m.Labels = v.Labels
m.Ts = int64(v.TimeStamp)
m.Value = float64(v.Value)

return nil
}

// UnmarshalJSON implements json.Unmarshaler.
func (m *Exemplar) MarshalJSON() ([]byte, error) {
v := struct {
Labels labels.Labels `json:"labels"`
TimeStamp model.Time `json:"timestamp"`
Value model.SampleValue `json:"value"`
}{
Labels: labelpb.ZLabelsToPromLabels(m.Labels.Labels),
TimeStamp: model.Time(m.Ts),
Value: model.SampleValue(m.Value),
}
return json.Marshal(v)
}

func NewExemplarsResponse(e *ExemplarData) *ExemplarsResponse {
return &ExemplarsResponse{
Result: &ExemplarsResponse_Data{
Expand All @@ -27,12 +60,7 @@ func NewWarningExemplarsResponse(warning error) *ExemplarsResponse {
}
}

func (s *ExemplarData) MarshalJSON() ([]byte, error) {
if s == nil {
return []byte("[]"), nil
}
return json.Marshal(s)
}


func (s1 *ExemplarData) Compare(s2 *ExemplarData) int {
if d := labels.Compare(s1.SeriesLabels.PromLabels(), s2.SeriesLabels.PromLabels()); d != 0 {
Expand Down Expand Up @@ -66,11 +94,11 @@ func (e1 *Exemplar) Compare(e2 *Exemplar) int {
return d
}

if e1.Hasts || e2.Hasts {
if e1.Ts.Before(e2.Ts) {
if e1.Hasts && e2.Hasts {
if e1.Ts < e2.Ts {
return 1
}
if e1.Ts.After(e2.Ts) {
if e1.Ts > e2.Ts {
return -1
}
}
Expand Down
Loading

0 comments on commit 1b6abfc

Please sign in to comment.