Skip to content

Commit

Permalink
Relay histograms from the gRPC Query API (#6178)
Browse files Browse the repository at this point in the history
Native histograms are currently not returned by the gRPC Query API.

This commit fixes that.

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski authored Mar 8, 2023
1 parent 80d9847 commit 63a57ac
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 41 deletions.
12 changes: 8 additions & 4 deletions pkg/api/query/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,11 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer
}
case promql.Vector:
for _, sample := range vector {
floats, histograms := prompb.SamplesFromPromqlPoints([]promql.Point{sample.Point})
series := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(sample.Metric),
Samples: prompb.SamplesFromPromqlPoints([]promql.Point{sample.Point}),
Labels: labelpb.ZLabelsFromPromLabels(sample.Metric),
Samples: floats,
Histograms: histograms,
}
if err := server.Send(querypb.NewQueryResponse(series)); err != nil {
return err
Expand Down Expand Up @@ -205,9 +207,11 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que
switch matrix := result.Value.(type) {
case promql.Matrix:
for _, series := range matrix {
floats, histograms := prompb.SamplesFromPromqlPoints(series.Points)
series := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(series.Metric),
Samples: prompb.SamplesFromPromqlPoints(series.Points),
Labels: labelpb.ZLabelsFromPromLabels(series.Metric),
Samples: floats,
Histograms: histograms,
}
if err := srv.Send(querypb.NewQueryRangeResponse(series)); err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletions pkg/query/remote_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

// Opts are the options for a PromQL query.
Expand Down Expand Up @@ -192,6 +193,12 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
V: s.Value,
})
}
for _, h := range ts.Histograms {
series.Points = append(series.Points, promql.Point{
T: h.Timestamp,
H: prompb.HistogramProtoToFloatHistogram(h),
})
}
result = append(result, series)
}
level.Debug(r.logger).Log("Executed query", "query", r.qs, "time", time.Since(start))
Expand Down
3 changes: 1 addition & 2 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

Expand Down Expand Up @@ -134,7 +133,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
}

for _, hp := range t.Histograms {
h := storepb.HistogramProtoToHistogram(hp)
h := prompb.HistogramProtoToHistogram(hp)
ref, err = app.AppendHistogram(ref, lset, hp.Timestamp, h, nil)
switch err {
case storage.ErrOutOfOrderSample:
Expand Down
28 changes: 0 additions & 28 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ import (

"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc/codes"

"github.com/thanos-io/thanos/pkg/store/labelpb"
prompb "github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

var PartialResponseStrategyValues = func() []string {
Expand Down Expand Up @@ -554,29 +552,3 @@ func (m *QueryHints) IsSafeToExecute() bool {

return false
}

// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an interger histogram and not a float histogram.
func HistogramProtoToHistogram(hp prompb.Histogram) *histogram.Histogram {
return &histogram.Histogram{
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
}
}

func spansProtoToSpans(s []*prompb.BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
}

return spans
}
93 changes: 86 additions & 7 deletions pkg/store/storepb/prompb/samples.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package prompb

import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/promql"
)

Expand All @@ -24,14 +25,92 @@ func SamplesFromSamplePairs(samples []model.SamplePair) []Sample {

// SamplesFromPromqlPoints converts a slice of promql.Point
// to a slice of Sample.
func SamplesFromPromqlPoints(samples []promql.Point) []Sample {
result := make([]Sample, 0, len(samples))
func SamplesFromPromqlPoints(samples []promql.Point) ([]Sample, []Histogram) {
floats := make([]Sample, 0, len(samples))
histograms := make([]Histogram, 0, len(samples))
for _, s := range samples {
result = append(result, Sample{
Value: s.V,
Timestamp: s.T,
})
if s.H == nil {
floats = append(floats, Sample{
Value: s.V,
Timestamp: s.T,
})
} else {
histograms = append(histograms, FloatHistogramToHistogramProto(s.T, s.H))
}
}

return result
return floats, histograms
}

// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an interger histogram and not a float histogram.
// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L529-L542.
func HistogramProtoToHistogram(hp Histogram) *histogram.Histogram {
return &histogram.Histogram{
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
}
}

// FloatHistogramToHistogramProto converts a float histogram to a protobuf type.
// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L587-L601.
func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) Histogram {
return Histogram{
Count: &Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum,
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets,
ResetHint: Histogram_ResetHint(fh.CounterResetHint),
Timestamp: timestamp,
}
}

// HistogramProtoToFloatHistogram extracts a (normal integer) Histogram from the
// provided proto message to a Float Histogram. The caller has to make sure that
// the proto message represents an float histogram and not a integer histogram.
// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L547-L560.
func HistogramProtoToFloatHistogram(hp Histogram) *histogram.FloatHistogram {
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountFloat(),
Count: hp.GetCountFloat(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveCounts(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeCounts(),
}
}

func spansToSpansProto(s []histogram.Span) []*BucketSpan {
spans := make([]*BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = &BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}

return spans
}

func spansProtoToSpans(s []*BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
}

return spans
}

0 comments on commit 63a57ac

Please sign in to comment.