From 63a57acd6cf116bf805ae168e6d52bd8055c5782 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Wed, 8 Mar 2023 09:05:04 +0100 Subject: [PATCH] Relay histograms from the gRPC Query API (#6178) Native histograms are currently not returned by the gRPC Query API. This commit fixes that. Signed-off-by: Filip Petkovski --- pkg/api/query/grpc.go | 12 ++-- pkg/query/remote_engine.go | 7 +++ pkg/receive/writer.go | 3 +- pkg/store/storepb/custom.go | 28 --------- pkg/store/storepb/prompb/samples.go | 93 ++++++++++++++++++++++++++--- 5 files changed, 102 insertions(+), 41 deletions(-) diff --git a/pkg/api/query/grpc.go b/pkg/api/query/grpc.go index e85fb179d5..8a11804b99 100644 --- a/pkg/api/query/grpc.go +++ b/pkg/api/query/grpc.go @@ -126,9 +126,11 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer } case promql.Vector: for _, sample := range vector { + floats, histograms := prompb.SamplesFromPromqlPoints([]promql.Point{sample.Point}) series := &prompb.TimeSeries{ - Labels: labelpb.ZLabelsFromPromLabels(sample.Metric), - Samples: prompb.SamplesFromPromqlPoints([]promql.Point{sample.Point}), + Labels: labelpb.ZLabelsFromPromLabels(sample.Metric), + Samples: floats, + Histograms: histograms, } if err := server.Send(querypb.NewQueryResponse(series)); err != nil { return err @@ -205,9 +207,11 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que switch matrix := result.Value.(type) { case promql.Matrix: for _, series := range matrix { + floats, histograms := prompb.SamplesFromPromqlPoints(series.Points) series := &prompb.TimeSeries{ - Labels: labelpb.ZLabelsFromPromLabels(series.Metric), - Samples: prompb.SamplesFromPromqlPoints(series.Points), + Labels: labelpb.ZLabelsFromPromLabels(series.Metric), + Samples: floats, + Histograms: histograms, } if err := srv.Send(querypb.NewQueryRangeResponse(series)); err != nil { return err diff --git a/pkg/query/remote_engine.go b/pkg/query/remote_engine.go index 0030dc8b14..1c52fd9858 100644 --- a/pkg/query/remote_engine.go +++ b/pkg/query/remote_engine.go @@ -19,6 +19,7 @@ import ( "github.com/thanos-io/thanos/pkg/api/query/querypb" "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) // Opts are the options for a PromQL query. @@ -192,6 +193,12 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result { V: s.Value, }) } + for _, h := range ts.Histograms { + series.Points = append(series.Points, promql.Point{ + T: h.Timestamp, + H: prompb.HistogramProtoToFloatHistogram(h), + }) + } result = append(result, series) } level.Debug(r.logger).Log("Executed query", "query", r.qs, "time", time.Since(start)) diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index 004dfd82d4..4bf8c3e1e9 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -14,7 +14,6 @@ import ( "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/thanos/pkg/store/labelpb" - "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) @@ -134,7 +133,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR } for _, hp := range t.Histograms { - h := storepb.HistogramProtoToHistogram(hp) + h := prompb.HistogramProtoToHistogram(hp) ref, err = app.AppendHistogram(ref, lset, hp.Timestamp, h, nil) switch err { case storage.ErrOutOfOrderSample: diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index 85f858562b..c1f4b9b8bf 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -13,12 +13,10 @@ import ( "github.com/gogo/protobuf/types" "github.com/pkg/errors" - "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "google.golang.org/grpc/codes" "github.com/thanos-io/thanos/pkg/store/labelpb" - prompb "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) var PartialResponseStrategyValues = func() []string { @@ -554,29 +552,3 @@ func (m *QueryHints) IsSafeToExecute() bool { return false } - -// HistogramProtoToHistogram extracts a (normal integer) Histogram from the -// provided proto message. The caller has to make sure that the proto message -// represents an interger histogram and not a float histogram. -func HistogramProtoToHistogram(hp prompb.Histogram) *histogram.Histogram { - return &histogram.Histogram{ - Schema: hp.Schema, - ZeroThreshold: hp.ZeroThreshold, - ZeroCount: hp.GetZeroCountInt(), - Count: hp.GetCountInt(), - Sum: hp.Sum, - PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()), - PositiveBuckets: hp.GetPositiveDeltas(), - NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()), - NegativeBuckets: hp.GetNegativeDeltas(), - } -} - -func spansProtoToSpans(s []*prompb.BucketSpan) []histogram.Span { - spans := make([]histogram.Span, len(s)) - for i := 0; i < len(s); i++ { - spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length} - } - - return spans -} diff --git a/pkg/store/storepb/prompb/samples.go b/pkg/store/storepb/prompb/samples.go index 6ec77d58e6..ac0aa57e16 100644 --- a/pkg/store/storepb/prompb/samples.go +++ b/pkg/store/storepb/prompb/samples.go @@ -5,6 +5,7 @@ package prompb import ( "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/promql" ) @@ -24,14 +25,92 @@ func SamplesFromSamplePairs(samples []model.SamplePair) []Sample { // SamplesFromPromqlPoints converts a slice of promql.Point // to a slice of Sample. -func SamplesFromPromqlPoints(samples []promql.Point) []Sample { - result := make([]Sample, 0, len(samples)) +func SamplesFromPromqlPoints(samples []promql.Point) ([]Sample, []Histogram) { + floats := make([]Sample, 0, len(samples)) + histograms := make([]Histogram, 0, len(samples)) for _, s := range samples { - result = append(result, Sample{ - Value: s.V, - Timestamp: s.T, - }) + if s.H == nil { + floats = append(floats, Sample{ + Value: s.V, + Timestamp: s.T, + }) + } else { + histograms = append(histograms, FloatHistogramToHistogramProto(s.T, s.H)) + } } - return result + return floats, histograms +} + +// HistogramProtoToHistogram extracts a (normal integer) Histogram from the +// provided proto message. The caller has to make sure that the proto message +// represents an interger histogram and not a float histogram. +// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L529-L542. +func HistogramProtoToHistogram(hp Histogram) *histogram.Histogram { + return &histogram.Histogram{ + Schema: hp.Schema, + ZeroThreshold: hp.ZeroThreshold, + ZeroCount: hp.GetZeroCountInt(), + Count: hp.GetCountInt(), + Sum: hp.Sum, + PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()), + PositiveBuckets: hp.GetPositiveDeltas(), + NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()), + NegativeBuckets: hp.GetNegativeDeltas(), + } +} + +// FloatHistogramToHistogramProto converts a float histogram to a protobuf type. +// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L587-L601. +func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) Histogram { + return Histogram{ + Count: &Histogram_CountFloat{CountFloat: fh.Count}, + Sum: fh.Sum, + Schema: fh.Schema, + ZeroThreshold: fh.ZeroThreshold, + ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount}, + NegativeSpans: spansToSpansProto(fh.NegativeSpans), + NegativeCounts: fh.NegativeBuckets, + PositiveSpans: spansToSpansProto(fh.PositiveSpans), + PositiveCounts: fh.PositiveBuckets, + ResetHint: Histogram_ResetHint(fh.CounterResetHint), + Timestamp: timestamp, + } +} + +// HistogramProtoToFloatHistogram extracts a (normal integer) Histogram from the +// provided proto message to a Float Histogram. The caller has to make sure that +// the proto message represents an float histogram and not a integer histogram. +// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L547-L560. +func HistogramProtoToFloatHistogram(hp Histogram) *histogram.FloatHistogram { + return &histogram.FloatHistogram{ + CounterResetHint: histogram.CounterResetHint(hp.ResetHint), + Schema: hp.Schema, + ZeroThreshold: hp.ZeroThreshold, + ZeroCount: hp.GetZeroCountFloat(), + Count: hp.GetCountFloat(), + Sum: hp.Sum, + PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()), + PositiveBuckets: hp.GetPositiveCounts(), + NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()), + NegativeBuckets: hp.GetNegativeCounts(), + } +} + +func spansToSpansProto(s []histogram.Span) []*BucketSpan { + spans := make([]*BucketSpan, len(s)) + for i := 0; i < len(s); i++ { + spans[i] = &BucketSpan{Offset: s[i].Offset, Length: s[i].Length} + } + + return spans +} + +func spansProtoToSpans(s []*BucketSpan) []histogram.Span { + spans := make([]histogram.Span, len(s)) + for i := 0; i < len(s); i++ { + spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length} + } + + return spans }