Skip to content

Commit

Permalink
Added receive float histogram support
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Rabenhorst <[email protected]>

Fixed imports

Signed-off-by: Sebastian Rabenhorst <[email protected]>
  • Loading branch information
rabenhorst committed Apr 28, 2023
1 parent f6871f7 commit 5fda4dc
Show file tree
Hide file tree
Showing 10 changed files with 350 additions and 240 deletions.
2 changes: 2 additions & 0 deletions pkg/query/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ func chunkEncoding(e storepb.Chunk_Encoding) chunkenc.Encoding {
return chunkenc.EncXOR
case storepb.Chunk_HISTOGRAM:
return chunkenc.EncHistogram
case storepb.Chunk_FLOAT_HISTOGRAM:
return chunkenc.EncFloatHistogram
}
return 255 // Invalid.
}
Expand Down
15 changes: 13 additions & 2 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
Expand Down Expand Up @@ -168,8 +169,18 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
}

for _, hp := range t.Histograms {
h := prompb.HistogramProtoToHistogram(hp)
ref, err = app.AppendHistogram(ref, lset, hp.Timestamp, h, nil)
var (
h *histogram.Histogram
fh *histogram.FloatHistogram
)

if hp.IsFloatHistogram() {
fh = prompb.FloatHistogramProtoToFloatHistogram(hp)
} else {
h = prompb.HistogramProtoToHistogram(hp)
}

ref, err = app.AppendHistogram(ref, lset, hp.Timestamp, h, fh)
switch err {
case storage.ErrOutOfOrderSample:
numSamplesOutOfOrder++
Expand Down
78 changes: 31 additions & 47 deletions pkg/receive/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ import (
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/tsdbutil"

"github.com/efficientgo/core/testutil"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestWriter(t *testing.T) {
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Histograms: []prompb.Histogram{
histogramToHistogramProto(9, testHistogram()),
prompb.HistogramToHistogramProto(10, tsdbutil.GenerateTestHistogram(0)),
},
},
},
Expand All @@ -261,7 +261,30 @@ func TestWriter(t *testing.T) {
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Histograms: []prompb.Histogram{
histogramToHistogramProto(10, testHistogram()),
prompb.HistogramToHistogramProto(10, tsdbutil.GenerateTestHistogram(0)),
},
},
},
},
"should succeed on float histogram with valid labels": {
reqs: []*prompb.WriteRequest{
{
Timeseries: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Histograms: []prompb.Histogram{
prompb.FloatHistogramToHistogramProto(10, tsdbutil.GenerateTestFloatHistogram(1)),
},
},
},
},
},
expectedErr: nil,
expectedIngested: []prompb.TimeSeries{
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Histograms: []prompb.Histogram{
prompb.FloatHistogramToHistogramProto(10, tsdbutil.GenerateTestFloatHistogram(1)),
},
},
},
Expand All @@ -273,7 +296,7 @@ func TestWriter(t *testing.T) {
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Histograms: []prompb.Histogram{
histogramToHistogramProto(10, testHistogram()),
prompb.HistogramToHistogramProto(10, tsdbutil.GenerateTestHistogram(0)),
},
},
},
Expand All @@ -283,7 +306,7 @@ func TestWriter(t *testing.T) {
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Histograms: []prompb.Histogram{
histogramToHistogramProto(9, testHistogram()),
prompb.HistogramToHistogramProto(9, tsdbutil.GenerateTestHistogram(0)),
},
},
},
Expand All @@ -294,7 +317,7 @@ func TestWriter(t *testing.T) {
{
Labels: append(lbls, labelpb.ZLabel{Name: "a", Value: "1"}, labelpb.ZLabel{Name: "b", Value: "2"}),
Histograms: []prompb.Histogram{
histogramToHistogramProto(10, testHistogram()),
prompb.HistogramToHistogramProto(10, tsdbutil.GenerateTestHistogram(0)),
},
},
},
Expand Down Expand Up @@ -479,7 +502,7 @@ func generateLabelsAndSeries(numLabels int, numSeries int, generateHistograms bo
}

if generateHistograms {
ts[j].Histograms = []prompb.Histogram{histogramToHistogramProto(10, testHistogram())}
ts[j].Histograms = []prompb.Histogram{prompb.HistogramToHistogramProto(10, tsdbutil.GenerateTestHistogram(0))}
continue
}

Expand All @@ -488,42 +511,3 @@ func generateLabelsAndSeries(numLabels int, numSeries int, generateHistograms bo

return ts
}

func testHistogram() *histogram.Histogram {
return &histogram.Histogram{
Count: 5,
ZeroCount: 2,
Sum: 18.4,
ZeroThreshold: 0.1,
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{1, 1, -1, 0}, // counts: 1, 2, 1, 1 (total 5)
}
}

func histogramToHistogramProto(timestamp int64, h *histogram.Histogram) prompb.Histogram {
return prompb.Histogram{
Count: &prompb.Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &prompb.Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
NegativeSpans: spansToSpansProto(h.NegativeSpans),
NegativeDeltas: h.NegativeBuckets,
PositiveSpans: spansToSpansProto(h.PositiveSpans),
PositiveDeltas: h.PositiveBuckets,
Timestamp: timestamp,
}
}

func spansToSpansProto(s []histogram.Span) []*prompb.BucketSpan {
spans := make([]*prompb.BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = &prompb.BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}

return spans
}
9 changes: 9 additions & 0 deletions pkg/store/storepb/prompb/custom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package prompb

func (h Histogram) IsFloatHistogram() bool {
_, ok := h.GetCount().(*Histogram_CountFloat)
return ok
}
118 changes: 83 additions & 35 deletions pkg/store/storepb/prompb/samples.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,71 +62,119 @@ func SamplesFromPromqlSeries(series promql.Series) ([]Sample, []Histogram) {
// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an interger histogram and not a float histogram.
// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L529-L542.
func HistogramProtoToHistogram(hp Histogram) *histogram.Histogram {
if hp.IsFloatHistogram() {
panic("HistogramProtoToHistogram called with a float histogram")
}
return &histogram.Histogram{
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
}
}

// FloatHistogramToHistogramProto converts a float histogram to a protobuf type.
// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L587-L601.
func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) Histogram {
return Histogram{
Count: &Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum,
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets,
ResetHint: Histogram_ResetHint(fh.CounterResetHint),
Timestamp: timestamp,
func FloatHistogramProtoToFloatHistogram(hp Histogram) *histogram.FloatHistogram {
if !hp.IsFloatHistogram() {
panic("FloatHistogramProtoToFloatHistogram called with an integer histogram")
}
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountFloat(),
Count: hp.GetCountFloat(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveCounts(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeCounts(),
}
}

// HistogramProtoToFloatHistogram extracts a (normal integer) Histogram from the
// provided proto message to a Float Histogram. The caller has to make sure that
// the proto message represents an float histogram and not a integer histogram.
// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L547-L560.
func HistogramProtoToFloatHistogram(hp Histogram) *histogram.FloatHistogram {
if hp.IsFloatHistogram() {
panic("HistogramProtoToFloatHistogram called with a float histogram")
}
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountFloat(),
Count: hp.GetCountFloat(),
ZeroCount: float64(hp.GetZeroCountInt()),
Count: float64(hp.GetCountInt()),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveCounts(),
PositiveBuckets: deltasToCounts(hp.GetPositiveDeltas()),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeCounts(),
NegativeBuckets: deltasToCounts(hp.GetNegativeDeltas()),
}
}

func spansToSpansProto(s []histogram.Span) []*BucketSpan {
spans := make([]*BucketSpan, len(s))
func spansProtoToSpans(s []BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = &BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
}

return spans
}

func spansProtoToSpans(s []*BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
func deltasToCounts(deltas []int64) []float64 {
counts := make([]float64, len(deltas))
var cur float64
for i, d := range deltas {
cur += float64(d)
counts[i] = cur
}
return counts
}

func HistogramToHistogramProto(timestamp int64, h *histogram.Histogram) Histogram {
return Histogram{
Count: &Histogram_CountInt{CountInt: h.Count},
Sum: h.Sum,
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
NegativeSpans: spansToSpansProto(h.NegativeSpans),
NegativeDeltas: h.NegativeBuckets,
PositiveSpans: spansToSpansProto(h.PositiveSpans),
PositiveDeltas: h.PositiveBuckets,
ResetHint: Histogram_ResetHint(h.CounterResetHint),
Timestamp: timestamp,
}
}

func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) Histogram {
return Histogram{
Count: &Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum,
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets,
ResetHint: Histogram_ResetHint(fh.CounterResetHint),
Timestamp: timestamp,
}
}

func spansToSpansProto(s []histogram.Span) []BucketSpan {
spans := make([]BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
spans[i] = BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}

return spans
Expand Down
Loading

0 comments on commit 5fda4dc

Please sign in to comment.