fix: skip points outside of archive retention on first pass #82

Merged
merged 19 commits into from
Sep 28, 2023
Changes from 7 commits
83 changes: 51 additions & 32 deletions pkg/graphite/convert/whisperconverter/whisper.go
@@ -54,53 +54,72 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) {
return nil, fmt.Errorf("whisper file contains no archives for metric: %q", name)
}

totalPoints := 0
for _, a := range w.GetArchives() {
totalPoints += int(a.Points)
}

// Preallocate space for all allPoints in one slice.
allPoints := make([]pointWithPrecision, totalPoints)
pIdx := 0
// Dump one precision level at a time and write into the output slice.
// It's important to remember that the archive with index 0 (first archive)
// has the raw data and the highest precision https://graphite.readthedocs.io/en/latest/whisper.html#archives-retention-and-precision
seenTs := map[uint32]struct{}{}
lastMinTs := uint32(0)
var keptPoints []whisper.Point
for i, a := range w.GetArchives() {
archivePoints, err := w.DumpArchive(i)
if err != nil {
return nil, fmt.Errorf("failed to dump archive %d from whisper metric %s", i, name)
}
for j, p := range archivePoints {
allPoints[pIdx+j] = pointWithPrecision{p, a.SecondsPerPoint}

var minArchiveTs, maxArchiveTs uint32
// calculate maxArchiveTs
for _, p := range archivePoints {
// We want to track the max timestamp of the archive because we know
// it virtually represents now() and we won't have newer points.
// Then the min timestamp of the archive would be maxTs - the archive
// retention.
if p.Timestamp > maxArchiveTs {
maxArchiveTs = p.Timestamp
}
}
Member

Why do we do this for each archive again and again?

Don't all archives have the same "now" and they only differ in their retention period?

Member

Ah, you're right. We had seen the "offset" value in the database dump but that is a byte offset, not a timestamp offset.
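
A minimal sketch of the point made in this thread, assuming every archive shares the same "now": the newest timestamp can be computed once over all archives instead of once per archive. The `point` type and `newestTimestamp` helper are hypothetical stand-ins and are not part of this PR.

```go
package main

import "fmt"

// point is a hypothetical stand-in for whisper.Point, reduced to the
// fields this sketch needs.
type point struct {
	Timestamp uint32
	Value     float64
}

// newestTimestamp returns the largest timestamp found in any archive.
// Empty ring-buffer slots (Timestamp == 0) can never win the comparison,
// so they need no special handling here.
func newestTimestamp(archives [][]point) uint32 {
	var newest uint32
	for _, pts := range archives {
		for _, p := range pts {
			if p.Timestamp > newest {
				newest = p.Timestamp
			}
		}
	}
	return newest
}

func main() {
	archives := [][]point{
		{{Timestamp: 1020, Value: 20}, {Timestamp: 1030, Value: 30}},
		{{Timestamp: 0}, {Timestamp: 1009, Value: 9}},
	}
	fmt.Println(newestTimestamp(archives))
}
```

Each archive's lower bound would then be derived from this single value and that archive's own retention.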

pIdx += len(archivePoints)
}

// Points must be in time order.
sort.Slice(allPoints, func(i, j int) bool {
return allPoints[i].Timestamp < allPoints[j].Timestamp
})
// calculate minArchiveTs
if maxArchiveTs-a.Retention() > maxArchiveTs { // overflow, very big retention
ywwg marked this conversation as resolved.
minArchiveTs = 0
} else {
minArchiveTs = maxArchiveTs - a.Retention()
}

trimmedPoints := []whisper.Point{}
for i := 0; i < len(allPoints); i++ {
// Remove all points of time = 0.
if allPoints[i].Timestamp == 0 {
continue
// For subsequent archives
if lastMinTs > 0 {
// if we are already in the second or subsequent archive and we had
// some points in the prior archives, we want to skip
// samples in the previous archives
maxArchiveTs = lastMinTs
Member

We update `maxArchiveTs` here, but the variable is not used afterwards.

}
// There might be duplicate timestamps in different archives. Take the
// higher-precision archive value since it's unaggregated.
if i > 0 && allPoints[i].Timestamp == allPoints[i-1].Timestamp {
if allPoints[i].secondsPerPoint == allPoints[i-1].secondsPerPoint {
return nil, fmt.Errorf("duplicate timestamp at same precision in archive %s: %d", name, allPoints[i].Timestamp)
lastMinTs = minArchiveTs

for _, p := range archivePoints {
if p.Timestamp < minArchiveTs {
Member

Shall we check if `minArchiveTs <= p.Timestamp && p.Timestamp <= maxArchiveTs` holds here?

If we do that, we don't need to keep `seenTs`.

Member

Actually, the check against `minArchiveTs` should be exclusive, i.e. `minArchiveTs < p.Timestamp` must hold.

Member

Because points are written to all archives at once, all archives will contain points from timestamps that are within the retentions, so we do need a check like this. I think we can do it more efficiently if we pre-sort the points and then do a double-index comparison.
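
A small sketch of the window check proposed in this thread, with the exclusive lower bound from the follow-up comment. `inRetention` is a hypothetical helper, and the sketch assumes an archive's retention equals points × seconds per point. The merged code may use a different bound.

```go
package main

import "fmt"

// inRetention reports whether ts lies in the half-open window (minTs, maxTs],
// i.e. the lower bound is exclusive, as suggested in the review.
func inRetention(ts, minTs, maxTs uint32) bool {
	return minTs < ts && ts <= maxTs
}

func main() {
	// Numbers borrowed from the second archive of the new test case:
	// 8 points at 3 seconds per point, newest timestamp 1030.
	const maxTs, retention = uint32(1030), uint32(8 * 3)
	minTs := maxTs - retention // 1006
	for _, ts := range []uint32{1006, 1009, 1030, 1033} {
		fmt.Println(ts, inRetention(ts, minTs, maxTs))
	}
}
```

With these numbers, the point at 1009 falls inside the window, while a point exactly at 1006 would be rejected by the exclusive bound.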

ywwg marked this conversation as resolved.
continue
Member Author

@pstibrany and @ywwg This solves problem 1: we drop all points that are beyond the archive's retention but are still present in the ring buffer.
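
For illustration, a self-contained sketch of the "problem 1" filter. `lowerBound` and `dropExpired` are hypothetical names, and `point` stands in for `whisper.Point`. The clamp is written as `retention > maxTs`, which for `uint32` is equivalent to the wraparound test `maxTs-retention > maxTs` used in the diff.

```go
package main

import "fmt"

// point is a hypothetical stand-in for whisper.Point.
type point struct {
	Timestamp uint32
	Value     float64
}

// lowerBound returns maxTs - retention, clamped to 0 when the unsigned
// subtraction would otherwise wrap around (a very large retention).
func lowerBound(maxTs, retention uint32) uint32 {
	if retention > maxTs {
		return 0
	}
	return maxTs - retention
}

// dropExpired keeps only the points that are not older than minTs.
// Stale ring-buffer entries and empty slots (Timestamp == 0) are dropped
// whenever minTs is greater than their timestamp.
func dropExpired(pts []point, minTs uint32) []point {
	kept := make([]point, 0, len(pts))
	for _, p := range pts {
		if p.Timestamp < minTs {
			continue
		}
		kept = append(kept, p)
	}
	return kept
}

func main() {
	pts := []point{{Timestamp: 900, Value: 12}, {Timestamp: 1054, Value: 12}, {Timestamp: 1060, Value: 2}}
	// Assumed retention of 60 seconds, newest timestamp 1060.
	fmt.Println(dropExpired(pts, lowerBound(1060, 60)))
}
```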

}
// If we have already seen a point with the same timestamp it means
// we already have a point from an archive with higher precision that
// we want to keep. So we skip this point.
if _, ok := seenTs[p.Timestamp]; ok {
Member Author

@jesusvazquez jesusvazquez Sep 26, 2023

@pstibrany and @ywwg This solves problem 2 that we discussed yesterday where we want to keep points from prior archives because they have higher resolutions.
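
The "problem 2" rule can be sketched in isolation as below, assuming archives are visited from highest to lowest precision as in the loop above. `mergeByPrecision` and `point` are hypothetical and only illustrate the seen-set idea.

```go
package main

import "fmt"

// point is a hypothetical stand-in for whisper.Point.
type point struct {
	Timestamp uint32
	Value     float64
}

// mergeByPrecision walks the archives in order (index 0 is assumed to be the
// highest precision) and keeps the first point seen for every timestamp, so a
// lower-precision duplicate never replaces a higher-precision value.
func mergeByPrecision(archives [][]point) []point {
	seen := map[uint32]struct{}{}
	var kept []point
	for _, pts := range archives {
		for _, p := range pts {
			if p.Timestamp == 0 {
				continue // empty ring-buffer slot
			}
			if _, dup := seen[p.Timestamp]; dup {
				continue // already kept from a higher-precision archive
			}
			seen[p.Timestamp] = struct{}{}
			kept = append(kept, p)
		}
	}
	return kept
}

func main() {
	archives := [][]point{
		{{Timestamp: 1060, Value: 2}},
		{{Timestamp: 1058, Value: 1}, {Timestamp: 1060, Value: 102}},
	}
	fmt.Println(mergeByPrecision(archives))
}
```

The result is in archive order rather than time order, which is why a final sort is still needed.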

continue
}
if allPoints[i].secondsPerPoint < allPoints[i-1].secondsPerPoint {
trimmedPoints[len(trimmedPoints)-1] = allPoints[i].Point
// Skip points with time = 0
if p.Timestamp == 0 {
continue
}
ywwg marked this conversation as resolved.
// If the previous point is higher precision, just continue.
continue
keptPoints = append(keptPoints, whisper.Point{Timestamp: p.Timestamp, Value: p.Value})
seenTs[p.Timestamp] = struct{}{}
}
trimmedPoints = append(trimmedPoints, allPoints[i].Point)
}

return trimmedPoints, nil
// We need to finally sort the kept points again because different archives
// may overlap and have older points
sort.Slice(keptPoints, func(i, j int) bool {
return keptPoints[i].Timestamp < keptPoints[j].Timestamp
})
Member Author

With the two additions above, we now only need to sort once.
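
As a sketch of that single final sort, assuming the kept points already have unique timestamps after the two filters: `sortByTimestamp` and `point` are hypothetical names used only for this example.

```go
package main

import (
	"fmt"
	"sort"
)

// point is a hypothetical stand-in for whisper.Point.
type point struct {
	Timestamp uint32
	Value     float64
}

// sortByTimestamp orders points by ascending timestamp in place.
// Timestamps are assumed to be unique, so stability does not matter.
func sortByTimestamp(pts []point) {
	sort.Slice(pts, func(i, j int) bool {
		return pts[i].Timestamp < pts[j].Timestamp
	})
}

func main() {
	// Points appended archive by archive: newer, high-precision points
	// first, then older points from lower-precision archives.
	kept := []point{{Timestamp: 1020, Value: 20}, {Timestamp: 1030, Value: 30}, {Timestamp: 1009, Value: 9}, {Timestamp: 1000, Value: 0}}
	sortByTimestamp(kept)
	fmt.Println(kept)
}
```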


return keptPoints, nil
}

// ToMimirSamples converts a Whisper metric with the given name to a slice of
211 changes: 191 additions & 20 deletions pkg/graphite/convert/whisperconverter/whisper_test.go
@@ -9,14 +9,15 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/mimir-proxies/pkg/graphite/convert"
"github.com/grafana/mimir-proxies/pkg/graphite/writeproxy"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/kisielk/whisper-go/whisper"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/require"

"github.com/grafana/mimir-proxies/pkg/graphite/convert"
"github.com/grafana/mimir-proxies/pkg/graphite/writeproxy"
)

func simpleArchiveInfo(points, secondsPerPoint int) whisper.ArchiveInfo {
@@ -82,38 +83,38 @@ func TestExtractWhisperPoints(t *testing.T) {
},
points: [][]whisper.Point{
{
whisper.NewPoint(time.Unix(0, 0), 12),
whisper.NewPoint(time.Unix(0, 0), 12),
whisper.NewPoint(time.Unix(1000, 0), 12),
whisper.NewPoint(time.Unix(1001, 0), 42),
whisper.NewPoint(time.Unix(0, 0), 12), // skipped
whisper.NewPoint(time.Unix(900, 0), 12), // skipped due to being out of retention
whisper.NewPoint(time.Unix(1054, 0), 12),
whisper.NewPoint(time.Unix(1055, 0), 42),
whisper.NewPoint(time.Unix(1060, 0), 2),
whisper.NewPoint(time.Unix(1002, 0), 27.5),
whisper.NewPoint(time.Unix(1056, 0), 27.5),
},
{
whisper.NewPoint(time.Unix(0, 0), 12),
whisper.NewPoint(time.Unix(1004, 0), 1),
whisper.NewPoint(time.Unix(1060, 0), 102),
whisper.NewPoint(time.Unix(0, 0), 12), // skipped
whisper.NewPoint(time.Unix(1058, 0), 1),
whisper.NewPoint(time.Unix(1060, 0), 102), // duplicate, the one in the archive above should be kept and this one skipped
whisper.NewPoint(time.Unix(1121, 0), 4),
whisper.NewPoint(time.Unix(0, 0), 0),
whisper.NewPoint(time.Unix(1001, 0), 5),
whisper.NewPoint(time.Unix(650, 0), 50), // skipped due to being out of retention
whisper.NewPoint(time.Unix(1055, 0), 5),
},
},
},
want: []whisper.Point{
{
Timestamp: 1000,
Timestamp: 1054,
Value: 12,
},
{
Timestamp: 1001,
Timestamp: 1055,
Value: 42,
},
{
Timestamp: 1002,
Timestamp: 1056,
Value: 27.5,
},
{
Timestamp: 1004,
Timestamp: 1058,
Value: 1,
},
{
@@ -127,6 +128,176 @@ func TestExtractWhisperPoints(t *testing.T) {
},
},
},
{
name: "single series, multiple archives and retentions, with duplicates and points beyond retention",
metricName: "mymetric",
archive: &testArchive{
infos: []whisper.ArchiveInfo{
// This is what the test will define
// Maxts Mints
// 1030 1020 1009 994
// [ ]
// [ ]
// [ ]
// And this is what the test will expect
// 1030 1020 1009 994
// [XXXXXXXXX]
// [ XXXXXXXXX]
// [ XXXXXXXX]

simpleArchiveInfo(11, 1),
simpleArchiveInfo(8, 3),
simpleArchiveInfo(7, 6),
},
points: [][]whisper.Point{
{
whisper.NewPoint(time.Unix(1020, 0), 20),
whisper.NewPoint(time.Unix(1021, 0), 21),
whisper.NewPoint(time.Unix(1022, 0), 22),
whisper.NewPoint(time.Unix(1023, 0), 23),
whisper.NewPoint(time.Unix(1024, 0), 24),
whisper.NewPoint(time.Unix(1025, 0), 25),
whisper.NewPoint(time.Unix(1026, 0), 26),
whisper.NewPoint(time.Unix(1027, 0), 27),
whisper.NewPoint(time.Unix(1028, 0), 28),
whisper.NewPoint(time.Unix(1029, 0), 29),
whisper.NewPoint(time.Unix(1030, 0), 30),
},
{
whisper.NewPoint(time.Unix(1009, 0), 9), // This is the lower border of this archive, this point should be kept
whisper.NewPoint(time.Unix(1012, 0), 12),
whisper.NewPoint(time.Unix(1015, 0), 15),
whisper.NewPoint(time.Unix(1018, 0), 18),
whisper.NewPoint(time.Unix(1021, 0), 21), // skipped
whisper.NewPoint(time.Unix(1024, 0), 24), // skipped
whisper.NewPoint(time.Unix(1027, 0), 27), // skipped
whisper.NewPoint(time.Unix(1030, 0), 30), // skipped
},
{
whisper.NewPoint(time.Unix(1000, 0), 0),
whisper.NewPoint(time.Unix(1006, 0), 6),
whisper.NewPoint(time.Unix(1009, 0), 99), // skipped because archive 1 has a point at this time
whisper.NewPoint(time.Unix(1012, 0), 12), // skipped
whisper.NewPoint(time.Unix(1018, 0), 18), // skipped
whisper.NewPoint(time.Unix(1024, 0), 24), // skipped
whisper.NewPoint(time.Unix(1030, 0), 30), // skipped
},
},
},
want: []whisper.Point{
{
Timestamp: 1000,
Value: 0,
},
{
Timestamp: 1006,
Value: 6,
},
{
Timestamp: 1009,
Value: 9,
},
{
Timestamp: 1012,
Value: 12,
},
{
Timestamp: 1015,
Value: 15,
},
{
Timestamp: 1018,
Value: 18,
},
{
Timestamp: 1020,
Value: 20,
},
{
Timestamp: 1021,
Value: 21,
},
{
Timestamp: 1022,
Value: 22,
},
{
Timestamp: 1023,
Value: 23,
},
{
Timestamp: 1024,
Value: 24,
},
{
Timestamp: 1025,
Value: 25,
},
{
Timestamp: 1026,
Value: 26,
},
{
Timestamp: 1027,
Value: 27,
},
{
Timestamp: 1028,
Value: 28,
},
{
Timestamp: 1029,
Value: 29,
},
{
Timestamp: 1030,
Value: 30,
},
},
},
{
name: "test retention when first archives are empty",
metricName: "mymetric",
Comment on lines +294 to +295
Member

This seems to be an unrealistic scenario to me. If the first archive (with raw data) is empty, how can there be any aggregations at all?

Member

An end-to-end test broke, so we added this test to cover that edge case.

Member

👍

archive: &testArchive{
infos: []whisper.ArchiveInfo{
// This is what the test will define
// Maxts Mints
// 1030 1020 1009 994
// []
// [ ]
// [ ]
// And this is what the test will expect
// 1030 1020 1009 994
// [XXXXXXXXX]
// [ XXXXXXXXX]
// [ XXXXXXXX]

simpleArchiveInfo(11, 1),
simpleArchiveInfo(8, 3),
simpleArchiveInfo(7, 6),
},
points: [][]whisper.Point{
{},
{
whisper.NewPoint(time.Unix(1009, 0), 9), // This is the lower border of this archive, this point should be kept
},
{
whisper.NewPoint(time.Unix(1009, 0), 99), // skipped because archive 1 has a point at this time
whisper.NewPoint(time.Unix(1012, 0), 12),
},
},
},
want: []whisper.Point{
{
Timestamp: 1009,
Value: 9,
},
{
Timestamp: 1012,
Value: 12,
},
},
},
{
name: "simple series, ordering is fixed",
metricName: "mymetric",
@@ -137,18 +308,18 @@ func TestExtractWhisperPoints(t *testing.T) {
points: [][]whisper.Point{
{
whisper.NewPoint(time.Unix(2002, 0), 4),
whisper.NewPoint(time.Unix(1000, 0), 87),
whisper.NewPoint(time.Unix(1501, 0), 112),
whisper.NewPoint(time.Unix(2000, 0), 87),
whisper.NewPoint(time.Unix(2001, 0), 112),
},
},
},
want: []whisper.Point{
{
Timestamp: 1000,
Timestamp: 2000,
Value: 87,
},
{
Timestamp: 1501,
Timestamp: 2001,
Value: 112,
},
{