Skip to content

Commit

Permalink
Added float histogram support for receive (thanos-io#146)
Browse files Browse the repository at this point in the history
* Added float histogram support for receiver

* Fixed lint

* Fixed docs

* Fixed docs
  • Loading branch information
rabenhorst authored Apr 28, 2023
1 parent 7de34fe commit a45abcb
Show file tree
Hide file tree
Showing 9 changed files with 301 additions and 199 deletions.
4 changes: 2 additions & 2 deletions docs/contributing/mentorship.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ Both Thanos and Prometheus projects participate in various, periodic mentoring p

Programs we participated / are participating:

- [LFX Mentorship (previously Community Bridge)](https://github.com/cncf/mentoring/tree/master/lfx-mentorship)
- [Google Summer of Code](https://github.com/cncf/mentoring/tree/master/summerofcode)
- [LFX Mentorship (previously Community Bridge)](https://github.com/cncf/mentoring/tree/main/programs/lfx-mentorship)
- [Google Summer of Code](https://github.com/cncf/mentoring/tree/main/programs/summerofcode)
- [Red Hat Beyond](https://research.redhat.com/blog/2020/05/24/open-source-development-course-and-devops-methodology/)

## For Mentees
Expand Down
2 changes: 1 addition & 1 deletion docs/tracing.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ config:

### Jaeger

Client for https://github.com/jaegertracing/jaeger tracing. Options can be provided also via environment variables. For more details see the Jaeger [exporter specification](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/sdk-environment-variables.md#jaeger-exporter).
Client for https://github.com/jaegertracing/jaeger tracing. Options can be provided also via environment variables. For more details see the Jaeger [exporter specification](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/configuration/sdk-environment-variables.md#jaeger-exporter).

*WARNING: Options `RPC Metrics`, `Gen128Bit` and `Disabled` are now deprecated and won't have any effect when set*

Expand Down
15 changes: 13 additions & 2 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"

Expand Down Expand Up @@ -134,8 +135,18 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
}

for _, hp := range t.Histograms {
h := prompb.HistogramProtoToHistogram(hp)
ref, err = app.AppendHistogram(ref, lset, hp.Timestamp, h, nil)
var (
h *histogram.Histogram
fh *histogram.FloatHistogram
)

if hp.IsFloatHistogram() {
fh = prompb.FloatHistogramProtoToFloatHistogram(hp)
} else {
h = prompb.HistogramProtoToHistogram(hp)
}

ref, err = app.AppendHistogram(ref, lset, hp.Timestamp, h, fh)
switch err {
case storage.ErrOutOfOrderSample:
numSamplesOutOfOrder++
Expand Down
78 changes: 31 additions & 47 deletions pkg/receive/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ import (
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/tsdbutil"

"github.com/efficientgo/core/testutil"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand Down Expand Up @@ -212,7 +212,7 @@ func TestWriter(t *testing.T) {
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Histograms: []prompb.Histogram{
histogramToHistogramProto(9, testHistogram()),
prompb.HistogramToHistogramProto(10, tsdbutil.GenerateTestHistogram(0)),
},
},
},
Expand All @@ -223,7 +223,30 @@ func TestWriter(t *testing.T) {
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Histograms: []prompb.Histogram{
histogramToHistogramProto(10, testHistogram()),
prompb.HistogramToHistogramProto(10, tsdbutil.GenerateTestHistogram(0)),
},
},
},
},
"should succeed on float histogram with valid labels": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Histograms: []prompb.Histogram{
prompb.FloatHistogramToHistogramProto(10, tsdbutil.GenerateTestFloatHistogram(1)),
},
},
},
},
},
expectedErr: nil,
expectedIngested: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Histograms: []prompb.Histogram{
prompb.FloatHistogramToHistogramProto(10, tsdbutil.GenerateTestFloatHistogram(1)),
},
},
},
Expand All @@ -235,7 +258,7 @@ func TestWriter(t *testing.T) {
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Histograms: []prompb.Histogram{
histogramToHistogramProto(10, testHistogram()),
prompb.HistogramToHistogramProto(10, tsdbutil.GenerateTestHistogram(0)),
},
},
},
Expand All @@ -245,7 +268,7 @@ func TestWriter(t *testing.T) {
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Histograms: []prompb.Histogram{
histogramToHistogramProto(9, testHistogram()),
prompb.HistogramToHistogramProto(9, tsdbutil.GenerateTestHistogram(0)),
},
},
},
Expand All @@ -256,7 +279,7 @@ func TestWriter(t *testing.T) {
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Histograms: []prompb.Histogram{
histogramToHistogramProto(10, testHistogram()),
prompb.HistogramToHistogramProto(10, tsdbutil.GenerateTestHistogram(0)),
},
},
},
Expand Down Expand Up @@ -441,7 +464,7 @@ func generateLabelsAndSeries(numLabels int, numSeries int, generateHistograms bo
}

if generateHistograms {
ts[j].Histograms = []prompb.Histogram{histogramToHistogramProto(10, testHistogram())}
ts[j].Histograms = []prompb.Histogram{prompb.HistogramToHistogramProto(10, tsdbutil.GenerateTestHistogram(0))}
continue
}

Expand All @@ -450,42 +473,3 @@ func generateLabelsAndSeries(numLabels int, numSeries int, generateHistograms bo

return ts
}

func testHistogram() *histogram.Histogram {
return &histogram.Histogram{
Count: 5,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 0.1,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{1, 1, -1, 0}, // counts: 1, 2, 1, 1 (total 5)
}
}

func histogramToHistogramProto(timestamp int64, h *histogram.Histogram) prompb.Histogram {
return prompb.Histogram{
Count: &prompb.Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
NegativeSpans: spansToSpansProto(h.NegativeSpans),
NegativeDeltas: h.NegativeBuckets,
PositiveSpans: spansToSpansProto(h.PositiveSpans),
PositiveDeltas: h.PositiveBuckets,
Timestamp: timestamp,
}
}

func spansToSpansProto(s []histogram.Span) []*prompb.BucketSpan {
spans := make([]*prompb.BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = &prompb.BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}

return spans
}
9 changes: 9 additions & 0 deletions pkg/store/storepb/prompb/custom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package prompb

func (h Histogram) IsFloatHistogram() bool {
_, ok := h.GetCount().(*Histogram_CountFloat)
return ok
}
118 changes: 83 additions & 35 deletions pkg/store/storepb/prompb/samples.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,71 +45,119 @@ func SamplesFromPromqlPoints(samples ...promql.Point) ([]Sample, []Histogram) {
// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an interger histogram and not a float histogram.
// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L529-L542.
func HistogramProtoToHistogram(hp Histogram) *histogram.Histogram {
if hp.IsFloatHistogram() {
panic("HistogramProtoToHistogram called with a float histogram")
}
return &histogram.Histogram{
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
}
}

// FloatHistogramToHistogramProto converts a float histogram to a protobuf type.
// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L587-L601.
func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) Histogram {
return Histogram{
Count: &Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum,
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets,
ResetHint: Histogram_ResetHint(fh.CounterResetHint),
Timestamp: timestamp,
func FloatHistogramProtoToFloatHistogram(hp Histogram) *histogram.FloatHistogram {
if !hp.IsFloatHistogram() {
panic("FloatHistogramProtoToFloatHistogram called with an integer histogram")
}
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountFloat(),
Count: hp.GetCountFloat(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveCounts(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeCounts(),
}
}

// HistogramProtoToFloatHistogram extracts a (normal integer) Histogram from the
// provided proto message to a Float Histogram. The caller has to make sure that
// the proto message represents an float histogram and not a integer histogram.
// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L547-L560.
func HistogramProtoToFloatHistogram(hp Histogram) *histogram.FloatHistogram {
if hp.IsFloatHistogram() {
panic("HistogramProtoToFloatHistogram called with a float histogram")
}
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountFloat(),
Count: hp.GetCountFloat(),
ZeroCount: float64(hp.GetZeroCountInt()),
Count: float64(hp.GetCountInt()),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveCounts(),
PositiveBuckets: deltasToCounts(hp.GetPositiveDeltas()),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeCounts(),
NegativeBuckets: deltasToCounts(hp.GetNegativeDeltas()),
}
}

func spansToSpansProto(s []histogram.Span) []*BucketSpan {
spans := make([]*BucketSpan, len(s))
func spansProtoToSpans(s []BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = &BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
}

return spans
}

func spansProtoToSpans(s []*BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
func deltasToCounts(deltas []int64) []float64 {
counts := make([]float64, len(deltas))
var cur float64
for i, d := range deltas {
cur += float64(d)
counts[i] = cur
}
return counts
}

func HistogramToHistogramProto(timestamp int64, h *histogram.Histogram) Histogram {
return Histogram{
Count: &Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
NegativeSpans: spansToSpansProto(h.NegativeSpans),
NegativeDeltas: h.NegativeBuckets,
PositiveSpans: spansToSpansProto(h.PositiveSpans),
PositiveDeltas: h.PositiveBuckets,
ResetHint: Histogram_ResetHint(h.CounterResetHint),
Timestamp: timestamp,
}
}

func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) Histogram {
return Histogram{
Count: &Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum,
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets,
ResetHint: Histogram_ResetHint(fh.CounterResetHint),
Timestamp: timestamp,
}
}

func spansToSpansProto(s []histogram.Span) []BucketSpan {
spans := make([]BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
spans[i] = BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}

return spans
Expand Down
Loading

0 comments on commit a45abcb

Please sign in to comment.