From cce9d19647379fa606c63538d63a3b67026f39e4 Mon Sep 17 00:00:00 2001 From: Jesus Vazquez Date: Mon, 25 Sep 2023 19:41:52 +0200 Subject: [PATCH 01/19] fix: skip points outside of archive retention on first pass --- .../convert/whisperconverter/whisper.go | 34 ++++++++++++++++++- .../convert/whisperconverter/whisper_test.go | 5 +-- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index 65d9916..9077bbe 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -54,6 +54,7 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { return nil, fmt.Errorf("whisper file contains no archives for metric: %q", name) } + // TODO We can probably get rid of this once we finalize changes totalPoints := 0 for _, a := range w.GetArchives() { totalPoints += int(a.Points) @@ -63,15 +64,46 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { allPoints := make([]pointWithPrecision, totalPoints) pIdx := 0 // Dump one precision level at a time and write into the output slice. + // Its important to remember that the archive with index 0 (first archive) + // has the raw data and the highest precision https://graphite.readthedocs.io/en/latest/whisper.html#archives-retention-and-precision + + // TODO (sort archives on seconds per point) then keep maxTs per archive and drop all points seen in the previous archive + for i, a := range w.GetArchives() { archivePoints, err := w.DumpArchive(i) if err != nil { return nil, fmt.Errorf("failed to dump archive %d from whisper metric %s", i, name) } + + var minArchiveTs, maxArchiveTs uint32 + for _, p := range archivePoints { // Double pass to find maxTs, inefficient + // We want to track the max timestamp of the archive because we know + // it virtually represents now() for the archive and in prometheus + // block terms it would give us the max timestamp of the archive. + // Then the min timestamp of the archive would be maxTs - the archive + // retention. + if p.Timestamp > maxArchiveTs { + maxArchiveTs = p.Timestamp + } + } + + if a.Retention() > maxArchiveTs { + minArchiveTs = 0 + } else { + minArchiveTs = maxArchiveTs - a.Retention() + } + + addedPoints := 0 for j, p := range archivePoints { + // If the point is older than minArchiveTs then we want to skip it. + if p.Timestamp < minArchiveTs { + continue + } allPoints[pIdx+j] = pointWithPrecision{p, a.SecondsPerPoint} + addedPoints++ } - pIdx += len(archivePoints) + pIdx += addedPoints + // TODO sort allpoints by timestamp } // Points must be in time order. diff --git a/pkg/graphite/convert/whisperconverter/whisper_test.go b/pkg/graphite/convert/whisperconverter/whisper_test.go index a277c34..4454774 100644 --- a/pkg/graphite/convert/whisperconverter/whisper_test.go +++ b/pkg/graphite/convert/whisperconverter/whisper_test.go @@ -9,14 +9,15 @@ import ( "time" "github.com/go-kit/log" - "github.com/grafana/mimir-proxies/pkg/graphite/convert" - "github.com/grafana/mimir-proxies/pkg/graphite/writeproxy" "github.com/grafana/mimir/pkg/mimirpb" "github.com/kisielk/whisper-go/whisper" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/require" + + "github.com/grafana/mimir-proxies/pkg/graphite/convert" + "github.com/grafana/mimir-proxies/pkg/graphite/writeproxy" ) func simpleArchiveInfo(points, secondsPerPoint int) whisper.ArchiveInfo { From 41a3dd9e59921db24d1505f2901e0d7a31bdfa4f Mon Sep 17 00:00:00 2001 From: Jesus Vazquez Date: Mon, 25 Sep 2023 22:15:02 +0200 Subject: [PATCH 02/19] Fix tests --- .../convert/whisperconverter/whisper.go | 11 +++----- .../convert/whisperconverter/whisper_test.go | 26 +++++++++---------- 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index 9077bbe..a45eb0d 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -62,13 +62,12 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { // Preallocate space for all allPoints in one slice. allPoints := make([]pointWithPrecision, totalPoints) - pIdx := 0 // Dump one precision level at a time and write into the output slice. // Its important to remember that the archive with index 0 (first archive) // has the raw data and the highest precision https://graphite.readthedocs.io/en/latest/whisper.html#archives-retention-and-precision // TODO (sort archives on seconds per point) then keep maxTs per archive and drop all points seen in the previous archive - + idxPoint := 0 for i, a := range w.GetArchives() { archivePoints, err := w.DumpArchive(i) if err != nil { @@ -93,16 +92,14 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { minArchiveTs = maxArchiveTs - a.Retention() } - addedPoints := 0 - for j, p := range archivePoints { + for _, p := range archivePoints { // If the point is older than minArchiveTs then we want to skip it. if p.Timestamp < minArchiveTs { continue } - allPoints[pIdx+j] = pointWithPrecision{p, a.SecondsPerPoint} - addedPoints++ + allPoints[idxPoint] = pointWithPrecision{p, a.SecondsPerPoint} + idxPoint++ } - pIdx += addedPoints // TODO sort allpoints by timestamp } diff --git a/pkg/graphite/convert/whisperconverter/whisper_test.go b/pkg/graphite/convert/whisperconverter/whisper_test.go index 4454774..ab58194 100644 --- a/pkg/graphite/convert/whisperconverter/whisper_test.go +++ b/pkg/graphite/convert/whisperconverter/whisper_test.go @@ -85,36 +85,36 @@ func TestExtractWhisperPoints(t *testing.T) { { whisper.NewPoint(time.Unix(0, 0), 12), whisper.NewPoint(time.Unix(0, 0), 12), - whisper.NewPoint(time.Unix(1000, 0), 12), - whisper.NewPoint(time.Unix(1001, 0), 42), + whisper.NewPoint(time.Unix(1054, 0), 12), + whisper.NewPoint(time.Unix(1055, 0), 42), whisper.NewPoint(time.Unix(1060, 0), 2), - whisper.NewPoint(time.Unix(1002, 0), 27.5), + whisper.NewPoint(time.Unix(1056, 0), 27.5), }, { whisper.NewPoint(time.Unix(0, 0), 12), - whisper.NewPoint(time.Unix(1004, 0), 1), + whisper.NewPoint(time.Unix(1058, 0), 1), whisper.NewPoint(time.Unix(1060, 0), 102), whisper.NewPoint(time.Unix(1121, 0), 4), whisper.NewPoint(time.Unix(0, 0), 0), - whisper.NewPoint(time.Unix(1001, 0), 5), + whisper.NewPoint(time.Unix(1055, 0), 5), }, }, }, want: []whisper.Point{ { - Timestamp: 1000, + Timestamp: 1054, Value: 12, }, { - Timestamp: 1001, + Timestamp: 1055, Value: 42, }, { - Timestamp: 1002, + Timestamp: 1056, Value: 27.5, }, { - Timestamp: 1004, + Timestamp: 1058, Value: 1, }, { @@ -138,18 +138,18 @@ func TestExtractWhisperPoints(t *testing.T) { points: [][]whisper.Point{ { whisper.NewPoint(time.Unix(2002, 0), 4), - whisper.NewPoint(time.Unix(1000, 0), 87), - whisper.NewPoint(time.Unix(1501, 0), 112), + whisper.NewPoint(time.Unix(2000, 0), 87), + whisper.NewPoint(time.Unix(2001, 0), 112), }, }, }, want: []whisper.Point{ { - Timestamp: 1000, + Timestamp: 2000, Value: 87, }, { - Timestamp: 1501, + Timestamp: 2001, Value: 112, }, { From d1bb9b58a114e2ed066f5fe4aa9c22bffb48459e Mon Sep 17 00:00:00 2001 From: Jesus Vazquez Date: Tue, 26 Sep 2023 11:30:09 +0200 Subject: [PATCH 03/19] Optimize slice allocation --- .../convert/whisperconverter/whisper.go | 72 +++++++++---------- 1 file changed, 33 insertions(+), 39 deletions(-) diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index a45eb0d..dcf498b 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -54,20 +54,11 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { return nil, fmt.Errorf("whisper file contains no archives for metric: %q", name) } - // TODO We can probably get rid of this once we finalize changes - totalPoints := 0 - for _, a := range w.GetArchives() { - totalPoints += int(a.Points) - } - - // Preallocate space for all allPoints in one slice. - allPoints := make([]pointWithPrecision, totalPoints) // Dump one precision level at a time and write into the output slice. // Its important to remember that the archive with index 0 (first archive) // has the raw data and the highest precision https://graphite.readthedocs.io/en/latest/whisper.html#archives-retention-and-precision - - // TODO (sort archives on seconds per point) then keep maxTs per archive and drop all points seen in the previous archive - idxPoint := 0 + seenTs := map[uint32]struct{}{} + var allKeptPoints [][]pointWithPrecision for i, a := range w.GetArchives() { archivePoints, err := w.DumpArchive(i) if err != nil { @@ -75,10 +66,9 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { } var minArchiveTs, maxArchiveTs uint32 - for _, p := range archivePoints { // Double pass to find maxTs, inefficient + for _, p := range archivePoints { // We want to track the max timestamp of the archive because we know - // it virtually represents now() for the archive and in prometheus - // block terms it would give us the max timestamp of the archive. + // it virtually represents now() and we wont have newer points. // Then the min timestamp of the archive would be maxTs - the archive // retention. if p.Timestamp > maxArchiveTs { @@ -92,43 +82,47 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { minArchiveTs = maxArchiveTs - a.Retention() } + var keptPoints []pointWithPrecision for _, p := range archivePoints { - // If the point is older than minArchiveTs then we want to skip it. if p.Timestamp < minArchiveTs { continue } - allPoints[idxPoint] = pointWithPrecision{p, a.SecondsPerPoint} - idxPoint++ + // If we have already seen a point with the same timestamp it means + // we already have a point from an archive with higher precision that + // we want to keep. So we skip this point. + if _, ok := seenTs[p.Timestamp]; ok { + continue + } + keptPoints = append(keptPoints, pointWithPrecision{p, a.SecondsPerPoint}) + seenTs[p.Timestamp] = struct{}{} } - // TODO sort allpoints by timestamp - } - // Points must be in time order. - sort.Slice(allPoints, func(i, j int) bool { - return allPoints[i].Timestamp < allPoints[j].Timestamp - }) + // Points are not necessarily in order because the archive is a ring buffer + // so we order the slice + sort.Slice(keptPoints, func(i, j int) bool { + return keptPoints[i].Timestamp < keptPoints[j].Timestamp + }) + + allKeptPoints = append(allKeptPoints, keptPoints) + } trimmedPoints := []whisper.Point{} - for i := 0; i < len(allPoints); i++ { - // Remove all points of time = 0. - if allPoints[i].Timestamp == 0 { - continue - } - // There might be duplicate timestamps in different archives. Take the - // higher-precision archive value since it's unaggregated. - if i > 0 && allPoints[i].Timestamp == allPoints[i-1].Timestamp { - if allPoints[i].secondsPerPoint == allPoints[i-1].secondsPerPoint { - return nil, fmt.Errorf("duplicate timestamp at same precision in archive %s: %d", name, allPoints[i].Timestamp) - } - if allPoints[i].secondsPerPoint < allPoints[i-1].secondsPerPoint { - trimmedPoints[len(trimmedPoints)-1] = allPoints[i].Point + for _, points := range allKeptPoints { + for _, p := range points { + // Remove all points of time = 0. + if p.Timestamp == 0 { + continue } - // If the previous point is higher precision, just continue. - continue + trimmedPoints = append(trimmedPoints, p.Point) } - trimmedPoints = append(trimmedPoints, allPoints[i].Point) } + // We need to finally sort the trimmed points again because different archives + // may overlap and have older points + sort.Slice(trimmedPoints, func(i, j int) bool { + return trimmedPoints[i].Timestamp < trimmedPoints[j].Timestamp + }) + return trimmedPoints, nil } From ceb63488914ba8700ba829d70911bd4d9f73a98e Mon Sep 17 00:00:00 2001 From: Jesus Vazquez Date: Tue, 26 Sep 2023 12:13:37 +0200 Subject: [PATCH 04/19] Only sort once, use single slice --- .../convert/whisperconverter/whisper.go | 32 +++++-------------- 1 file changed, 8 insertions(+), 24 deletions(-) diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index dcf498b..61bdaea 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -58,7 +58,7 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { // Its important to remember that the archive with index 0 (first archive) // has the raw data and the highest precision https://graphite.readthedocs.io/en/latest/whisper.html#archives-retention-and-precision seenTs := map[uint32]struct{}{} - var allKeptPoints [][]pointWithPrecision + var keptPoints []whisper.Point for i, a := range w.GetArchives() { archivePoints, err := w.DumpArchive(i) if err != nil { @@ -82,7 +82,6 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { minArchiveTs = maxArchiveTs - a.Retention() } - var keptPoints []pointWithPrecision for _, p := range archivePoints { if p.Timestamp < minArchiveTs { continue @@ -93,37 +92,22 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { if _, ok := seenTs[p.Timestamp]; ok { continue } - keptPoints = append(keptPoints, pointWithPrecision{p, a.SecondsPerPoint}) - seenTs[p.Timestamp] = struct{}{} - } - - // Points are not necessarily in order because the archive is a ring buffer - // so we order the slice - sort.Slice(keptPoints, func(i, j int) bool { - return keptPoints[i].Timestamp < keptPoints[j].Timestamp - }) - - allKeptPoints = append(allKeptPoints, keptPoints) - } - - trimmedPoints := []whisper.Point{} - for _, points := range allKeptPoints { - for _, p := range points { - // Remove all points of time = 0. + // Skip points with time = 0 if p.Timestamp == 0 { continue } - trimmedPoints = append(trimmedPoints, p.Point) + keptPoints = append(keptPoints, whisper.Point{Timestamp: p.Timestamp, Value: p.Value}) + seenTs[p.Timestamp] = struct{}{} } } - // We need to finally sort the trimmed points again because different archives + // We need to finally sort the kept points again because different archives // may overlap and have older points - sort.Slice(trimmedPoints, func(i, j int) bool { - return trimmedPoints[i].Timestamp < trimmedPoints[j].Timestamp + sort.Slice(keptPoints, func(i, j int) bool { + return keptPoints[i].Timestamp < keptPoints[j].Timestamp }) - return trimmedPoints, nil + return keptPoints, nil } // ToMimirSamples converts a Whisper metric with the given name to a slice of From 647f350c49f1438ff3bfd3dbb881290ecd52ab64 Mon Sep 17 00:00:00 2001 From: Jesus Vazquez Date: Tue, 26 Sep 2023 13:29:11 +0200 Subject: [PATCH 05/19] Track previous archive max timestamp and use it to skip points Signed-off-by: Jesus Vazquez --- .../convert/whisperconverter/whisper.go | 9 +- .../convert/whisperconverter/whisper_test.go | 114 ++++++++++++++++++ 2 files changed, 120 insertions(+), 3 deletions(-) diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index 61bdaea..928a53e 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -58,6 +58,7 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { // Its important to remember that the archive with index 0 (first archive) // has the raw data and the highest precision https://graphite.readthedocs.io/en/latest/whisper.html#archives-retention-and-precision seenTs := map[uint32]struct{}{} + previousMaxTs := uint32(0) var keptPoints []whisper.Point for i, a := range w.GetArchives() { archivePoints, err := w.DumpArchive(i) @@ -76,10 +77,12 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { } } - if a.Retention() > maxArchiveTs { - minArchiveTs = 0 - } else { + if previousMaxTs == 0 { + // this is archive 0 minArchiveTs = maxArchiveTs - a.Retention() + } else { + // this is archive 1+ + minArchiveTs = previousMaxTs } for _, p := range archivePoints { diff --git a/pkg/graphite/convert/whisperconverter/whisper_test.go b/pkg/graphite/convert/whisperconverter/whisper_test.go index ab58194..ee85b69 100644 --- a/pkg/graphite/convert/whisperconverter/whisper_test.go +++ b/pkg/graphite/convert/whisperconverter/whisper_test.go @@ -128,6 +128,120 @@ func TestExtractWhisperPoints(t *testing.T) { }, }, }, + { + name: "single series, multiple archives and retentions, with duplicates and points beyond retention", + metricName: "mymetric", + archive: &testArchive{ + infos: []whisper.ArchiveInfo{ + // This is what the test will define + // 1000 1010 1020 1030 + // [ ] + // [ ] + // [ ] + // And this is what the test will expect + // 1000 1010 1020 1030 + // [XXXXXXXXX] + // [ XXXXXXXXX] + // [ XXXXXXXX] + simpleArchiveInfo(10, 1), + simpleArchiveInfo(7, 3), + simpleArchiveInfo(6, 6), + }, + points: [][]whisper.Point{ + { + whisper.NewPoint(time.Unix(1000, 0), 0), + whisper.NewPoint(time.Unix(1001, 0), 1), + whisper.NewPoint(time.Unix(1002, 0), 2), + whisper.NewPoint(time.Unix(1003, 0), 3), + whisper.NewPoint(time.Unix(1004, 0), 4), + whisper.NewPoint(time.Unix(1005, 0), 5), + whisper.NewPoint(time.Unix(1006, 0), 6), + whisper.NewPoint(time.Unix(1007, 0), 7), + whisper.NewPoint(time.Unix(1008, 0), 8), + whisper.NewPoint(time.Unix(1009, 0), 9), + }, + { + whisper.NewPoint(time.Unix(1000, 0), 0), // skipped + whisper.NewPoint(time.Unix(1003, 0), 3), // skipped + whisper.NewPoint(time.Unix(1006, 0), 6), // skipped + whisper.NewPoint(time.Unix(1009, 0), 9), // skipped + whisper.NewPoint(time.Unix(1012, 0), 12), + whisper.NewPoint(time.Unix(1015, 0), 15), + whisper.NewPoint(time.Unix(1018, 0), 18), + }, + { + whisper.NewPoint(time.Unix(1000, 0), 0), // skipped + whisper.NewPoint(time.Unix(1006, 0), 6), // skipped + whisper.NewPoint(time.Unix(1012, 0), 12), // skipped + whisper.NewPoint(time.Unix(1018, 0), 18), // skipped + whisper.NewPoint(time.Unix(1024, 0), 24), + whisper.NewPoint(time.Unix(1030, 0), 30), + }, + }, + }, + want: []whisper.Point{ + { + Timestamp: 1000, + Value: 0, + }, + { + Timestamp: 1001, + Value: 1, + }, + { + Timestamp: 1002, + Value: 2, + }, + { + Timestamp: 1003, + Value: 3, + }, + { + Timestamp: 1004, + Value: 4, + }, + { + Timestamp: 1005, + Value: 5, + }, + { + Timestamp: 1006, + Value: 6, + }, + { + Timestamp: 1007, + Value: 7, + }, + { + Timestamp: 1008, + Value: 8, + }, + { + Timestamp: 1009, + Value: 9, + }, + { + Timestamp: 1012, + Value: 12, + }, + { + Timestamp: 1015, + Value: 15, + }, + { + Timestamp: 1018, + Value: 18, + }, + { + Timestamp: 1024, + Value: 24, + }, + { + Timestamp: 1030, + Value: 30, + }, + }, + }, { name: "simple series, ordering is fixed", metricName: "mymetric", From 8170e45466ee0242ed3487c1a4a04d7c6023b424 Mon Sep 17 00:00:00 2001 From: Jesus Vazquez Date: Tue, 26 Sep 2023 18:02:07 +0200 Subject: [PATCH 06/19] Track previous archive min ts I'm trying to achieve something like this MaxT MinT 0: [XXXXXXXXXX] 1d, 1m 1: [ XXXXXXXXXXXXXXXXX] 1w, 10m 2: [ XXXXXXXXXXXXX] 1y, 60m So we process archive 0 first and then we track archive 0 min ts and while processing archive 1 we discard all samples up to that mint and grab the remaining ones. --- .../convert/whisperconverter/whisper.go | 15 ++-- .../convert/whisperconverter/whisper_test.go | 90 +++++++++---------- 2 files changed, 53 insertions(+), 52 deletions(-) diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index 928a53e..8e211cf 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -58,7 +58,7 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { // Its important to remember that the archive with index 0 (first archive) // has the raw data and the highest precision https://graphite.readthedocs.io/en/latest/whisper.html#archives-retention-and-precision seenTs := map[uint32]struct{}{} - previousMaxTs := uint32(0) + lastMinTs := uint32(0) var keptPoints []whisper.Point for i, a := range w.GetArchives() { archivePoints, err := w.DumpArchive(i) @@ -77,14 +77,15 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { } } - if previousMaxTs == 0 { - // this is archive 0 - minArchiveTs = maxArchiveTs - a.Retention() - } else { - // this is archive 1+ - minArchiveTs = previousMaxTs + minArchiveTs = maxArchiveTs - a.Retention() + if lastMinTs > 0 { + // if we are already in the second or subsequent archive, we want to skip + // samples in the previous archives + maxArchiveTs = lastMinTs } + lastMinTs = minArchiveTs + for _, p := range archivePoints { if p.Timestamp < minArchiveTs { continue diff --git a/pkg/graphite/convert/whisperconverter/whisper_test.go b/pkg/graphite/convert/whisperconverter/whisper_test.go index ee85b69..381af68 100644 --- a/pkg/graphite/convert/whisperconverter/whisper_test.go +++ b/pkg/graphite/convert/whisperconverter/whisper_test.go @@ -134,7 +134,7 @@ func TestExtractWhisperPoints(t *testing.T) { archive: &testArchive{ infos: []whisper.ArchiveInfo{ // This is what the test will define - // 1000 1010 1020 1030 + // 1030 1020 1010 1000 // [ ] // [ ] // [ ] @@ -149,33 +149,33 @@ func TestExtractWhisperPoints(t *testing.T) { }, points: [][]whisper.Point{ { - whisper.NewPoint(time.Unix(1000, 0), 0), - whisper.NewPoint(time.Unix(1001, 0), 1), - whisper.NewPoint(time.Unix(1002, 0), 2), - whisper.NewPoint(time.Unix(1003, 0), 3), - whisper.NewPoint(time.Unix(1004, 0), 4), - whisper.NewPoint(time.Unix(1005, 0), 5), - whisper.NewPoint(time.Unix(1006, 0), 6), - whisper.NewPoint(time.Unix(1007, 0), 7), - whisper.NewPoint(time.Unix(1008, 0), 8), - whisper.NewPoint(time.Unix(1009, 0), 9), + whisper.NewPoint(time.Unix(1021, 0), 21), + whisper.NewPoint(time.Unix(1022, 0), 22), + whisper.NewPoint(time.Unix(1023, 0), 23), + whisper.NewPoint(time.Unix(1024, 0), 24), + whisper.NewPoint(time.Unix(1025, 0), 25), + whisper.NewPoint(time.Unix(1026, 0), 26), + whisper.NewPoint(time.Unix(1027, 0), 27), + whisper.NewPoint(time.Unix(1028, 0), 28), + whisper.NewPoint(time.Unix(1029, 0), 29), + whisper.NewPoint(time.Unix(1030, 0), 30), }, { - whisper.NewPoint(time.Unix(1000, 0), 0), // skipped - whisper.NewPoint(time.Unix(1003, 0), 3), // skipped - whisper.NewPoint(time.Unix(1006, 0), 6), // skipped - whisper.NewPoint(time.Unix(1009, 0), 9), // skipped whisper.NewPoint(time.Unix(1012, 0), 12), whisper.NewPoint(time.Unix(1015, 0), 15), whisper.NewPoint(time.Unix(1018, 0), 18), + whisper.NewPoint(time.Unix(1021, 0), 21), // skipped + whisper.NewPoint(time.Unix(1024, 0), 24), // skipped + whisper.NewPoint(time.Unix(1027, 0), 27), // skipped + whisper.NewPoint(time.Unix(1030, 0), 30), // skipped }, { - whisper.NewPoint(time.Unix(1000, 0), 0), // skipped - whisper.NewPoint(time.Unix(1006, 0), 6), // skipped + whisper.NewPoint(time.Unix(1000, 0), 0), + whisper.NewPoint(time.Unix(1006, 0), 6), whisper.NewPoint(time.Unix(1012, 0), 12), // skipped whisper.NewPoint(time.Unix(1018, 0), 18), // skipped - whisper.NewPoint(time.Unix(1024, 0), 24), - whisper.NewPoint(time.Unix(1030, 0), 30), + whisper.NewPoint(time.Unix(1024, 0), 24), // skipped + whisper.NewPoint(time.Unix(1030, 0), 30), // skipped }, }, }, @@ -185,56 +185,56 @@ func TestExtractWhisperPoints(t *testing.T) { Value: 0, }, { - Timestamp: 1001, - Value: 1, + Timestamp: 1006, + Value: 6, }, { - Timestamp: 1002, - Value: 2, + Timestamp: 1012, + Value: 12, }, { - Timestamp: 1003, - Value: 3, + Timestamp: 1015, + Value: 15, }, { - Timestamp: 1004, - Value: 4, + Timestamp: 1018, + Value: 18, }, { - Timestamp: 1005, - Value: 5, + Timestamp: 1021, + Value: 21, }, { - Timestamp: 1006, - Value: 6, + Timestamp: 1022, + Value: 22, }, { - Timestamp: 1007, - Value: 7, + Timestamp: 1023, + Value: 23, }, { - Timestamp: 1008, - Value: 8, + Timestamp: 1024, + Value: 24, }, { - Timestamp: 1009, - Value: 9, + Timestamp: 1025, + Value: 25, }, { - Timestamp: 1012, - Value: 12, + Timestamp: 1026, + Value: 26, }, { - Timestamp: 1015, - Value: 15, + Timestamp: 1027, + Value: 27, }, { - Timestamp: 1018, - Value: 18, + Timestamp: 1028, + Value: 28, }, { - Timestamp: 1024, - Value: 24, + Timestamp: 1029, + Value: 29, }, { Timestamp: 1030, From 095071d2801a560dc5d9c4e88173c0c089096a52 Mon Sep 17 00:00:00 2001 From: Jesus Vazquez Date: Tue, 26 Sep 2023 20:02:29 +0200 Subject: [PATCH 07/19] fix daterange test by protecting from overflow Signed-off-by: Jesus Vazquez --- .../convert/whisperconverter/whisper.go | 14 +++- .../convert/whisperconverter/whisper_test.go | 76 ++++++++++++++++--- 2 files changed, 77 insertions(+), 13 deletions(-) diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index 8e211cf..6621430 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -67,6 +67,7 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { } var minArchiveTs, maxArchiveTs uint32 + // calculate maxArchiveTs for _, p := range archivePoints { // We want to track the max timestamp of the archive because we know // it virtually represents now() and we wont have newer points. @@ -77,13 +78,20 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { } } - minArchiveTs = maxArchiveTs - a.Retention() + // calculate minArchiveTs + if maxArchiveTs-a.Retention() > maxArchiveTs { // overflow, very big retention + minArchiveTs = 0 + } else { + minArchiveTs = maxArchiveTs - a.Retention() + } + + // For subsequent archives if lastMinTs > 0 { - // if we are already in the second or subsequent archive, we want to skip + // if we are already in the second or subsequent archive and we had + // some points in the prior archives, we want to skip // samples in the previous archives maxArchiveTs = lastMinTs } - lastMinTs = minArchiveTs for _, p := range archivePoints { diff --git a/pkg/graphite/convert/whisperconverter/whisper_test.go b/pkg/graphite/convert/whisperconverter/whisper_test.go index 381af68..73a860b 100644 --- a/pkg/graphite/convert/whisperconverter/whisper_test.go +++ b/pkg/graphite/convert/whisperconverter/whisper_test.go @@ -83,19 +83,19 @@ func TestExtractWhisperPoints(t *testing.T) { }, points: [][]whisper.Point{ { - whisper.NewPoint(time.Unix(0, 0), 12), - whisper.NewPoint(time.Unix(0, 0), 12), + whisper.NewPoint(time.Unix(0, 0), 12), // skipped + whisper.NewPoint(time.Unix(900, 0), 12), // skipped due to being out of retention whisper.NewPoint(time.Unix(1054, 0), 12), whisper.NewPoint(time.Unix(1055, 0), 42), whisper.NewPoint(time.Unix(1060, 0), 2), whisper.NewPoint(time.Unix(1056, 0), 27.5), }, { - whisper.NewPoint(time.Unix(0, 0), 12), + whisper.NewPoint(time.Unix(0, 0), 12), // skipped whisper.NewPoint(time.Unix(1058, 0), 1), - whisper.NewPoint(time.Unix(1060, 0), 102), + whisper.NewPoint(time.Unix(1060, 0), 102), // duplicate, the one in the archive above should be kept and this one skipped whisper.NewPoint(time.Unix(1121, 0), 4), - whisper.NewPoint(time.Unix(0, 0), 0), + whisper.NewPoint(time.Unix(650, 0), 50), // skipped due to being out of retention whisper.NewPoint(time.Unix(1055, 0), 5), }, }, @@ -134,21 +134,24 @@ func TestExtractWhisperPoints(t *testing.T) { archive: &testArchive{ infos: []whisper.ArchiveInfo{ // This is what the test will define - // 1030 1020 1010 1000 + // Maxts Mints + // 1030 1020 1009 994 // [ ] // [ ] // [ ] // And this is what the test will expect - // 1000 1010 1020 1030 + // 1030 1020 1009 994 // [XXXXXXXXX] // [ XXXXXXXXX] // [ XXXXXXXX] - simpleArchiveInfo(10, 1), - simpleArchiveInfo(7, 3), - simpleArchiveInfo(6, 6), + + simpleArchiveInfo(11, 1), + simpleArchiveInfo(8, 3), + simpleArchiveInfo(7, 6), }, points: [][]whisper.Point{ { + whisper.NewPoint(time.Unix(1020, 0), 20), whisper.NewPoint(time.Unix(1021, 0), 21), whisper.NewPoint(time.Unix(1022, 0), 22), whisper.NewPoint(time.Unix(1023, 0), 23), @@ -161,6 +164,7 @@ func TestExtractWhisperPoints(t *testing.T) { whisper.NewPoint(time.Unix(1030, 0), 30), }, { + whisper.NewPoint(time.Unix(1009, 0), 9), // This is the lower border of this archive, this point should be kept whisper.NewPoint(time.Unix(1012, 0), 12), whisper.NewPoint(time.Unix(1015, 0), 15), whisper.NewPoint(time.Unix(1018, 0), 18), @@ -172,6 +176,7 @@ func TestExtractWhisperPoints(t *testing.T) { { whisper.NewPoint(time.Unix(1000, 0), 0), whisper.NewPoint(time.Unix(1006, 0), 6), + whisper.NewPoint(time.Unix(1009, 0), 99), // skipped because archive 1 has a point at this time whisper.NewPoint(time.Unix(1012, 0), 12), // skipped whisper.NewPoint(time.Unix(1018, 0), 18), // skipped whisper.NewPoint(time.Unix(1024, 0), 24), // skipped @@ -188,6 +193,10 @@ func TestExtractWhisperPoints(t *testing.T) { Timestamp: 1006, Value: 6, }, + { + Timestamp: 1009, + Value: 9, + }, { Timestamp: 1012, Value: 12, @@ -200,6 +209,10 @@ func TestExtractWhisperPoints(t *testing.T) { Timestamp: 1018, Value: 18, }, + { + Timestamp: 1020, + Value: 20, + }, { Timestamp: 1021, Value: 21, @@ -242,6 +255,49 @@ func TestExtractWhisperPoints(t *testing.T) { }, }, }, + { + name: "test retention when first archives are empty", + metricName: "mymetric", + archive: &testArchive{ + infos: []whisper.ArchiveInfo{ + // This is what the test will define + // Maxts Mints + // 1030 1020 1009 994 + // [] + // [ ] + // [ ] + // And this is what the test will expect + // 1030 1020 1009 994 + // [XXXXXXXXX] + // [ XXXXXXXXX] + // [ XXXXXXXX] + + simpleArchiveInfo(11, 1), + simpleArchiveInfo(8, 3), + simpleArchiveInfo(7, 6), + }, + points: [][]whisper.Point{ + {}, + { + whisper.NewPoint(time.Unix(1009, 0), 9), // This is the lower border of this archive, this point should be kept + }, + { + whisper.NewPoint(time.Unix(1009, 0), 99), // skipped because archive 1 has a point at this time + whisper.NewPoint(time.Unix(1012, 0), 12), + }, + }, + }, + want: []whisper.Point{ + { + Timestamp: 1009, + Value: 9, + }, + { + Timestamp: 1012, + Value: 12, + }, + }, + }, { name: "simple series, ordering is fixed", metricName: "mymetric", From 87e31434bfb2c86a85074289b330fb3b4557cf01 Mon Sep 17 00:00:00 2001 From: Owen Williams Date: Wed, 27 Sep 2023 10:20:22 -0400 Subject: [PATCH 08/19] multi-array method --- .../convert/whisperconverter/whisper.go | 92 ++++++++++++------- 1 file changed, 57 insertions(+), 35 deletions(-) diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index 6621430..7fd6c23 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -50,66 +50,88 @@ type pointWithPrecision struct { // ReadPoints reads and concatenates all of the points in a whisper Archive. func ReadPoints(w Archive, name string) ([]whisper.Point, error) { - if len(w.GetArchives()) == 0 { + archives := w.GetArchives() + if len(archives) == 0 { return nil, fmt.Errorf("whisper file contains no archives for metric: %q", name) } + // Ensure archives are sorted by precision just in case. + sort.Slice(archives, func(i, j int) bool { + return archives[i].SecondsPerPoint < archives[j].SecondsPerPoint + }) + // Dump one precision level at a time and write into the output slice. // Its important to remember that the archive with index 0 (first archive) // has the raw data and the highest precision https://graphite.readthedocs.io/en/latest/whisper.html#archives-retention-and-precision - seenTs := map[uint32]struct{}{} - lastMinTs := uint32(0) + archivePoints := make([][]whisper.Point, len(archives)) var keptPoints []whisper.Point - for i, a := range w.GetArchives() { - archivePoints, err := w.DumpArchive(i) + // We want to track the max timestamp of the archives because we know + // it virtually represents now() and we wont have newer points. + // Then the min timestamp of the archive would be maxTs - each archive + // retention. + var maxTs uint32 + for i, a := range archives { + points, err := w.DumpArchive(i) if err != nil { return nil, fmt.Errorf("failed to dump archive %d from whisper metric %s", i, name) } - var minArchiveTs, maxArchiveTs uint32 - // calculate maxArchiveTs - for _, p := range archivePoints { - // We want to track the max timestamp of the archive because we know - // it virtually represents now() and we wont have newer points. - // Then the min timestamp of the archive would be maxTs - the archive - // retention. - if p.Timestamp > maxArchiveTs { - maxArchiveTs = p.Timestamp + // All archives share the same maxArchiveTs, so only calculate it once. + if i == 0 { + for _, p := range points { + if p.Timestamp > maxTs { + maxTs = p.Timestamp + } } } - // calculate minArchiveTs - if maxArchiveTs-a.Retention() > maxArchiveTs { // overflow, very big retention + var minArchiveTs uint32 + if maxTs < a.Retention() { // very big retention minArchiveTs = 0 } else { - minArchiveTs = maxArchiveTs - a.Retention() + minArchiveTs = maxTs - a.Retention() } - // For subsequent archives - if lastMinTs > 0 { - // if we are already in the second or subsequent archive and we had - // some points in the prior archives, we want to skip - // samples in the previous archives - maxArchiveTs = lastMinTs - } - lastMinTs = minArchiveTs + // Sort this archive. + sort.Slice(points, func(i, j int) bool { + return points[i].Timestamp < points[j].Timestamp + }) + archivePoints[i] = points - for _, p := range archivePoints { - if p.Timestamp < minArchiveTs { + // Store a number of indexes so we can look for duplicate points efficiently. + archiveIdx := make([]int, i) + +POINTLOOP: + for _, p := range archivePoints[i] { + // Skip points with time = 0 + if p.Timestamp == 0 { continue } - // If we have already seen a point with the same timestamp it means - // we already have a point from an archive with higher precision that - // we want to keep. So we skip this point. - if _, ok := seenTs[p.Timestamp]; ok { + if p.Timestamp < minArchiveTs { continue } - // Skip points with time = 0 - if p.Timestamp == 0 { - continue + + // For each of the previous archives, check to see if a point already + // exists at this timestamp. If it does, we don't add this point and + // keep the higher resolution point. + for x := range archiveIdx { + for { + if archiveIdx[x] >= len(archivePoints[x]) { + break + } + // We found a match, so skip this point + if archivePoints[x][archiveIdx[x]].Timestamp == p.Timestamp { + continue POINTLOOP + } + // The previous archive does not have this point, so stop. + if archivePoints[x][archiveIdx[x]].Timestamp > p.Timestamp { + break + } + archiveIdx[x]++ + } } + keptPoints = append(keptPoints, whisper.Point{Timestamp: p.Timestamp, Value: p.Value}) - seenTs[p.Timestamp] = struct{}{} } } From 383b6ea2bcf3df5a0013479a97b16877e4fa21ea Mon Sep 17 00:00:00 2001 From: Owen Williams Date: Wed, 27 Sep 2023 10:28:41 -0400 Subject: [PATCH 09/19] more space efficient --- .../convert/whisperconverter/whisper.go | 55 ++++++++----------- 1 file changed, 23 insertions(+), 32 deletions(-) diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index 7fd6c23..06af1bd 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -63,13 +63,14 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { // Dump one precision level at a time and write into the output slice. // Its important to remember that the archive with index 0 (first archive) // has the raw data and the highest precision https://graphite.readthedocs.io/en/latest/whisper.html#archives-retention-and-precision - archivePoints := make([][]whisper.Point, len(archives)) var keptPoints []whisper.Point + // We want to track the max timestamp of the archives because we know // it virtually represents now() and we wont have newer points. // Then the min timestamp of the archive would be maxTs - each archive // retention. var maxTs uint32 + for i, a := range archives { points, err := w.DumpArchive(i) if err != nil { @@ -86,7 +87,7 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { } var minArchiveTs uint32 - if maxTs < a.Retention() { // very big retention + if maxTs < a.Retention() { // very big retention, invalid. minArchiveTs = 0 } else { minArchiveTs = maxTs - a.Retention() @@ -96,14 +97,14 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { sort.Slice(points, func(i, j int) bool { return points[i].Timestamp < points[j].Timestamp }) - archivePoints[i] = points - // Store a number of indexes so we can look for duplicate points efficiently. - archiveIdx := make([]int, i) - -POINTLOOP: - for _, p := range archivePoints[i] { - // Skip points with time = 0 + // We are going to append to keptPoints, so we store the original length so + // when we check for seen points, we don't include the ones we've been + // adding. + keptPointIdx := 0 + keptPointLen := len(keptPoints) + POINTLOOP: + for _, p := range points { if p.Timestamp == 0 { continue } @@ -111,36 +112,26 @@ POINTLOOP: continue } - // For each of the previous archives, check to see if a point already - // exists at this timestamp. If it does, we don't add this point and - // keep the higher resolution point. - for x := range archiveIdx { - for { - if archiveIdx[x] >= len(archivePoints[x]) { - break - } - // We found a match, so skip this point - if archivePoints[x][archiveIdx[x]].Timestamp == p.Timestamp { - continue POINTLOOP - } - // The previous archive does not have this point, so stop. - if archivePoints[x][archiveIdx[x]].Timestamp > p.Timestamp { - break - } - archiveIdx[x]++ + // Check to see if a point is already kept at this timestamp. If it is, + // we don't add this point and keep the higher resolution point. + for ; keptPointIdx < keptPointLen; keptPointIdx++ { + if keptPoints[keptPointIdx].Timestamp == p.Timestamp { + continue POINTLOOP + } + if keptPoints[keptPointIdx].Timestamp > p.Timestamp { + break } } keptPoints = append(keptPoints, whisper.Point{Timestamp: p.Timestamp, Value: p.Value}) } + // We need to sort the kept points because different archives may overlap + // and have older points + sort.Slice(keptPoints, func(i, j int) bool { + return keptPoints[i].Timestamp < keptPoints[j].Timestamp + }) } - // We need to finally sort the kept points again because different archives - // may overlap and have older points - sort.Slice(keptPoints, func(i, j int) bool { - return keptPoints[i].Timestamp < keptPoints[j].Timestamp - }) - return keptPoints, nil } From 249c2fb987d7150285c71ef5a080780ecd3c90f2 Mon Sep 17 00:00:00 2001 From: Owen Williams Date: Wed, 27 Sep 2023 12:25:59 -0400 Subject: [PATCH 10/19] Don't include *any* points covered by another retention level --- pkg/graphite/convert/whisperconverter/whisper.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index 06af1bd..54cce9a 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -3,6 +3,7 @@ package whisperconverter import ( "fmt" "io" + "math" "os" "sort" "time" @@ -70,6 +71,7 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { // Then the min timestamp of the archive would be maxTs - each archive // retention. var maxTs uint32 + lastMinTs := uint32(math.MaxUint32) for i, a := range archives { points, err := w.DumpArchive(i) @@ -105,12 +107,25 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { keptPointLen := len(keptPoints) POINTLOOP: for _, p := range points { + // turn the unix timestamp into a string + timeStr := time.Unix(int64(p.Timestamp), 0).Format("2006-01-02 15:04:05") + + if p.Value > 400 { + fmt.Println("archive", i, timeStr, p.Value) + } if p.Timestamp == 0 { continue } + // Don't include any points in this archive that are past the retention + // period. if p.Timestamp < minArchiveTs { continue } + // Don't include any points in this archive that were covered in a higher + // resolution archive. + if p.Timestamp >= lastMinTs { + continue + } // Check to see if a point is already kept at this timestamp. If it is, // we don't add this point and keep the higher resolution point. @@ -130,6 +145,7 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { sort.Slice(keptPoints, func(i, j int) bool { return keptPoints[i].Timestamp < keptPoints[j].Timestamp }) + lastMinTs = minArchiveTs } return keptPoints, nil From 2c109201550c0409fbf523eaf17fc5669d29c343 Mon Sep 17 00:00:00 2001 From: Owen Williams Date: Wed, 27 Sep 2023 12:45:41 -0400 Subject: [PATCH 11/19] fix tests and edge cases --- .../convert/whisperconverter/whisper.go | 12 ++--- .../convert/whisperconverter/whisper_test.go | 48 +++++++++---------- 2 files changed, 27 insertions(+), 33 deletions(-) diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index 54cce9a..e37aeb4 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -79,8 +79,12 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { return nil, fmt.Errorf("failed to dump archive %d from whisper metric %s", i, name) } + if len(points) == 0 { + continue + } + // All archives share the same maxArchiveTs, so only calculate it once. - if i == 0 { + if maxTs == 0 { for _, p := range points { if p.Timestamp > maxTs { maxTs = p.Timestamp @@ -107,12 +111,6 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { keptPointLen := len(keptPoints) POINTLOOP: for _, p := range points { - // turn the unix timestamp into a string - timeStr := time.Unix(int64(p.Timestamp), 0).Format("2006-01-02 15:04:05") - - if p.Value > 400 { - fmt.Println("archive", i, timeStr, p.Value) - } if p.Timestamp == 0 { continue } diff --git a/pkg/graphite/convert/whisperconverter/whisper_test.go b/pkg/graphite/convert/whisperconverter/whisper_test.go index 73a860b..57a79eb 100644 --- a/pkg/graphite/convert/whisperconverter/whisper_test.go +++ b/pkg/graphite/convert/whisperconverter/whisper_test.go @@ -78,8 +78,8 @@ func TestExtractWhisperPoints(t *testing.T) { metricName: "mymetric", archive: &testArchive{ infos: []whisper.ArchiveInfo{ - simpleArchiveInfo(6, 1), - simpleArchiveInfo(6, 60), + simpleArchiveInfo(120, 1), + simpleArchiveInfo(4, 60), }, points: [][]whisper.Point{ { @@ -91,16 +91,21 @@ func TestExtractWhisperPoints(t *testing.T) { whisper.NewPoint(time.Unix(1056, 0), 27.5), }, { - whisper.NewPoint(time.Unix(0, 0), 12), // skipped - whisper.NewPoint(time.Unix(1058, 0), 1), + whisper.NewPoint(time.Unix(0, 0), 12), // skipped + whisper.NewPoint(time.Unix(1058, 0), 1), // skipped, covered by other archive whisper.NewPoint(time.Unix(1060, 0), 102), // duplicate, the one in the archive above should be kept and this one skipped - whisper.NewPoint(time.Unix(1121, 0), 4), - whisper.NewPoint(time.Unix(650, 0), 50), // skipped due to being out of retention + whisper.NewPoint(time.Unix(650, 0), 50), // skipped due to being out of retention whisper.NewPoint(time.Unix(1055, 0), 5), + whisper.NewPoint(time.Unix(901, 0), 4), }, }, }, want: []whisper.Point{ + // We do not to any rounding / conversion of time values. + { + Timestamp: 901, + Value: 4, + }, { Timestamp: 1054, Value: 12, @@ -113,19 +118,10 @@ func TestExtractWhisperPoints(t *testing.T) { Timestamp: 1056, Value: 27.5, }, - { - Timestamp: 1058, - Value: 1, - }, { Timestamp: 1060, Value: 2, }, - // We do not to any rounding / conversion of time values. - { - Timestamp: 1121, - Value: 4, - }, }, }, { @@ -145,9 +141,9 @@ func TestExtractWhisperPoints(t *testing.T) { // [ XXXXXXXXX] // [ XXXXXXXX] - simpleArchiveInfo(11, 1), - simpleArchiveInfo(8, 3), - simpleArchiveInfo(7, 6), + simpleArchiveInfo(10, 1), + simpleArchiveInfo(7, 3), + simpleArchiveInfo(6, 6), }, points: [][]whisper.Point{ { @@ -272,9 +268,9 @@ func TestExtractWhisperPoints(t *testing.T) { // [ XXXXXXXXX] // [ XXXXXXXX] - simpleArchiveInfo(11, 1), - simpleArchiveInfo(8, 3), - simpleArchiveInfo(7, 6), + simpleArchiveInfo(10, 1), + simpleArchiveInfo(7, 3), + simpleArchiveInfo(6, 6), }, points: [][]whisper.Point{ {}, @@ -283,18 +279,18 @@ func TestExtractWhisperPoints(t *testing.T) { }, { whisper.NewPoint(time.Unix(1009, 0), 99), // skipped because archive 1 has a point at this time - whisper.NewPoint(time.Unix(1012, 0), 12), + whisper.NewPoint(time.Unix(975, 0), 12), }, }, }, want: []whisper.Point{ { - Timestamp: 1009, - Value: 9, + Timestamp: 975, + Value: 12, }, { - Timestamp: 1012, - Value: 12, + Timestamp: 1009, + Value: 9, }, }, }, From 0eaaa93491fa99407ef536d4ff762aec303d8b7c Mon Sep 17 00:00:00 2001 From: Owen Williams Date: Wed, 27 Sep 2023 13:09:36 -0400 Subject: [PATCH 12/19] even more simplification --- .../convert/whisperconverter/whisper.go | 36 ++++++------------- .../convert/whisperconverter/whisper_test.go | 2 +- 2 files changed, 12 insertions(+), 26 deletions(-) diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index e37aeb4..b04f82d 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -104,13 +104,9 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { return points[i].Timestamp < points[j].Timestamp }) - // We are going to append to keptPoints, so we store the original length so - // when we check for seen points, we don't include the ones we've been - // adding. - keptPointIdx := 0 - keptPointLen := len(keptPoints) - POINTLOOP: - for _, p := range points { + startIdx := -1 + endIdx := len(points) - 1 + for j, p := range points { if p.Timestamp == 0 { continue } @@ -122,27 +118,17 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { // Don't include any points in this archive that were covered in a higher // resolution archive. if p.Timestamp >= lastMinTs { - continue + break } - - // Check to see if a point is already kept at this timestamp. If it is, - // we don't add this point and keep the higher resolution point. - for ; keptPointIdx < keptPointLen; keptPointIdx++ { - if keptPoints[keptPointIdx].Timestamp == p.Timestamp { - continue POINTLOOP - } - if keptPoints[keptPointIdx].Timestamp > p.Timestamp { - break - } + endIdx = j + if startIdx == -1 { + startIdx = j } - - keptPoints = append(keptPoints, whisper.Point{Timestamp: p.Timestamp, Value: p.Value}) } - // We need to sort the kept points because different archives may overlap - // and have older points - sort.Slice(keptPoints, func(i, j int) bool { - return keptPoints[i].Timestamp < keptPoints[j].Timestamp - }) + // if startIdx is -1, we did not find any valid points. + if startIdx != -1 { + keptPoints = append(points[startIdx:endIdx+1], keptPoints...) + } lastMinTs = minArchiveTs } diff --git a/pkg/graphite/convert/whisperconverter/whisper_test.go b/pkg/graphite/convert/whisperconverter/whisper_test.go index 57a79eb..ca79686 100644 --- a/pkg/graphite/convert/whisperconverter/whisper_test.go +++ b/pkg/graphite/convert/whisperconverter/whisper_test.go @@ -95,7 +95,7 @@ func TestExtractWhisperPoints(t *testing.T) { whisper.NewPoint(time.Unix(1058, 0), 1), // skipped, covered by other archive whisper.NewPoint(time.Unix(1060, 0), 102), // duplicate, the one in the archive above should be kept and this one skipped whisper.NewPoint(time.Unix(650, 0), 50), // skipped due to being out of retention - whisper.NewPoint(time.Unix(1055, 0), 5), + whisper.NewPoint(time.Unix(1055, 0), 5), // skipped, covered by other archive whisper.NewPoint(time.Unix(901, 0), 4), }, }, From 800cc20567f2ab2e8182e1dfcb6cafebac19b432 Mon Sep 17 00:00:00 2001 From: Owen Williams Date: Wed, 27 Sep 2023 13:56:44 -0400 Subject: [PATCH 13/19] unneeded type --- pkg/graphite/convert/whisperconverter/whisper.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index b04f82d..ca64ec6 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -41,14 +41,6 @@ type Archive interface { DumpArchive(int) ([]whisper.Point, error) } -// pointWithPrecision is a whisper Point with the precision of the archive it -// came from. This is used to differentiate when we have duplicate timestamps at -// different precisions. -type pointWithPrecision struct { - whisper.Point - secondsPerPoint uint32 -} - // ReadPoints reads and concatenates all of the points in a whisper Archive. func ReadPoints(w Archive, name string) ([]whisper.Point, error) { archives := w.GetArchives() From 9577a279938d431538639c800f5dc04067b97a18 Mon Sep 17 00:00:00 2001 From: Owen Williams Date: Wed, 27 Sep 2023 14:17:11 -0400 Subject: [PATCH 14/19] remove archive sorting, because that was just sorting the archiveinfos, not the archives themselves --- pkg/graphite/convert/whisperconverter/whisper.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index ca64ec6..ac55656 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -48,11 +48,6 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { return nil, fmt.Errorf("whisper file contains no archives for metric: %q", name) } - // Ensure archives are sorted by precision just in case. - sort.Slice(archives, func(i, j int) bool { - return archives[i].SecondsPerPoint < archives[j].SecondsPerPoint - }) - // Dump one precision level at a time and write into the output slice. // Its important to remember that the archive with index 0 (first archive) // has the raw data and the highest precision https://graphite.readthedocs.io/en/latest/whisper.html#archives-retention-and-precision From d2f4800d95e93fd68bd767999ff08cf85c383b87 Mon Sep 17 00:00:00 2001 From: Owen Williams Date: Thu, 28 Sep 2023 09:20:09 -0400 Subject: [PATCH 15/19] Fix bounds checking --- pkg/graphite/convert/whisperconverter/whisper.go | 4 ++-- .../convert/whisperconverter/whisper_test.go | 12 ++++-------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index ac55656..202f656 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -99,12 +99,12 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { } // Don't include any points in this archive that are past the retention // period. - if p.Timestamp < minArchiveTs { + if p.Timestamp <= minArchiveTs { continue } // Don't include any points in this archive that were covered in a higher // resolution archive. - if p.Timestamp >= lastMinTs { + if p.Timestamp > lastMinTs { break } endIdx = j diff --git a/pkg/graphite/convert/whisperconverter/whisper_test.go b/pkg/graphite/convert/whisperconverter/whisper_test.go index ca79686..96f7cb6 100644 --- a/pkg/graphite/convert/whisperconverter/whisper_test.go +++ b/pkg/graphite/convert/whisperconverter/whisper_test.go @@ -147,7 +147,7 @@ func TestExtractWhisperPoints(t *testing.T) { }, points: [][]whisper.Point{ { - whisper.NewPoint(time.Unix(1020, 0), 20), + whisper.NewPoint(time.Unix(1020, 0), 20), // Skipped, this is past the lower bound of this archive. whisper.NewPoint(time.Unix(1021, 0), 21), whisper.NewPoint(time.Unix(1022, 0), 22), whisper.NewPoint(time.Unix(1023, 0), 23), @@ -160,7 +160,7 @@ func TestExtractWhisperPoints(t *testing.T) { whisper.NewPoint(time.Unix(1030, 0), 30), }, { - whisper.NewPoint(time.Unix(1009, 0), 9), // This is the lower border of this archive, this point should be kept + whisper.NewPoint(time.Unix(1009, 0), 9), // Skipped, this is past the lower bound of this archive. whisper.NewPoint(time.Unix(1012, 0), 12), whisper.NewPoint(time.Unix(1015, 0), 15), whisper.NewPoint(time.Unix(1018, 0), 18), @@ -172,7 +172,7 @@ func TestExtractWhisperPoints(t *testing.T) { { whisper.NewPoint(time.Unix(1000, 0), 0), whisper.NewPoint(time.Unix(1006, 0), 6), - whisper.NewPoint(time.Unix(1009, 0), 99), // skipped because archive 1 has a point at this time + whisper.NewPoint(time.Unix(1009, 0), 99), // This is the upper bound of this archive whisper.NewPoint(time.Unix(1012, 0), 12), // skipped whisper.NewPoint(time.Unix(1018, 0), 18), // skipped whisper.NewPoint(time.Unix(1024, 0), 24), // skipped @@ -191,7 +191,7 @@ func TestExtractWhisperPoints(t *testing.T) { }, { Timestamp: 1009, - Value: 9, + Value: 99, }, { Timestamp: 1012, @@ -205,10 +205,6 @@ func TestExtractWhisperPoints(t *testing.T) { Timestamp: 1018, Value: 18, }, - { - Timestamp: 1020, - Value: 20, - }, { Timestamp: 1021, Value: 21, From f1d2e64e66ea407cee7369029f752b23fc5603f4 Mon Sep 17 00:00:00 2001 From: Owen Williams Date: Thu, 28 Sep 2023 10:38:54 -0400 Subject: [PATCH 16/19] Even more efficient -- use a more idiomatic append --- .../convert/whisperconverter/whisper.go | 56 +++++++++++-------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index 202f656..2d524f0 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -53,25 +53,21 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { // has the raw data and the highest precision https://graphite.readthedocs.io/en/latest/whisper.html#archives-retention-and-precision var keptPoints []whisper.Point - // We want to track the max timestamp of the archives because we know - // it virtually represents now() and we wont have newer points. - // Then the min timestamp of the archive would be maxTs - each archive - // retention. + // We want to track the max timestamp of the archives because we know it + // virtually represents now() and we wont have newer points. Then the min + // timestamp of the archive would be maxTs - each archive retention. var maxTs uint32 - lastMinTs := uint32(math.MaxUint32) + // Also determine the lower bound of each archive, which is the upper bound of + // the next archive. + minArchiveTs := make([]uint32, len(archives)) for i, a := range archives { - points, err := w.DumpArchive(i) - if err != nil { - return nil, fmt.Errorf("failed to dump archive %d from whisper metric %s", i, name) - } - - if len(points) == 0 { - continue - } - // All archives share the same maxArchiveTs, so only calculate it once. if maxTs == 0 { + points, err := w.DumpArchive(i) + if err != nil { + return nil, fmt.Errorf("failed to dump archive %d from whisper metric %s", i, name) + } for _, p := range points { if p.Timestamp > maxTs { maxTs = p.Timestamp @@ -79,11 +75,24 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { } } - var minArchiveTs uint32 if maxTs < a.Retention() { // very big retention, invalid. - minArchiveTs = 0 + minArchiveTs[i] = uint32(math.MaxUint32) } else { - minArchiveTs = maxTs - a.Retention() + minArchiveTs[i] = maxTs - a.Retention() + } + } + + // Iterate over archives backwards so we process oldest points first. Sort the + // points, then determine the slice that is between the bounds for this + // archive, and append those to the output array. + for i := len(archives) - 1; i >= 0; i-- { + points, err := w.DumpArchive(i) + if err != nil { + return nil, fmt.Errorf("failed to dump archive %d from whisper metric %s", i, name) + } + + if len(points) == 0 { + continue } // Sort this archive. @@ -97,14 +106,14 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { if p.Timestamp == 0 { continue } - // Don't include any points in this archive that are past the retention - // period. - if p.Timestamp <= minArchiveTs { + // Don't include any points in this archive that are older than the + // retention period. + if p.Timestamp <= minArchiveTs[i] { continue } - // Don't include any points in this archive that were covered in a higher + // Don't include any points in this archive that are covered in a higher // resolution archive. - if p.Timestamp > lastMinTs { + if i > 0 && p.Timestamp > minArchiveTs[i-1] { break } endIdx = j @@ -114,9 +123,8 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { } // if startIdx is -1, we did not find any valid points. if startIdx != -1 { - keptPoints = append(points[startIdx:endIdx+1], keptPoints...) + keptPoints = append(keptPoints, points[startIdx:endIdx+1]...) } - lastMinTs = minArchiveTs } return keptPoints, nil From 2e17bb65c6ac29a1c198cccce2a22514e16d47f1 Mon Sep 17 00:00:00 2001 From: Owen Williams Date: Thu, 28 Sep 2023 12:14:39 -0400 Subject: [PATCH 17/19] ugh we have to account for a new edge case --- .../convert/whisperconverter/daterange.go | 13 ++- .../whisperconverter/daterange_test.go | 16 +++- .../convert/whisperconverter/test_utils.go | 12 +++ .../convert/whisperconverter/whisper.go | 57 ++++++++---- .../convert/whisperconverter/whisper_test.go | 90 +++++++++++++++---- 5 files changed, 152 insertions(+), 36 deletions(-) diff --git a/pkg/graphite/convert/whisperconverter/daterange.go b/pkg/graphite/convert/whisperconverter/daterange.go index af8bc68..6a34c4c 100644 --- a/pkg/graphite/convert/whisperconverter/daterange.go +++ b/pkg/graphite/convert/whisperconverter/daterange.go @@ -3,6 +3,7 @@ package whisperconverter import ( "fmt" "math" + "strings" "sync" "time" @@ -89,6 +90,16 @@ READLOOP: } } - fmt.Printf("--start-date %s --end-date %s\n", time.UnixMilli(minTS).Format("2006-01-02"), time.UnixMilli(maxTS).Format("2006-01-02")) + terms := []string{} + + if minTS != int64(math.MaxInt64) { + terms = append(terms, fmt.Sprintf("--start-date %s", time.UnixMilli(minTS).Format("2006-01-02"))) + } + if maxTS != int64(math.MinInt64) { + terms = append(terms, fmt.Sprintf("--end-date %s", time.UnixMilli(maxTS).Format("2006-01-02"))) + } + terms = append(terms, "\n") + fmt.Printf(strings.Join(terms, " ")) + wg.Done() } diff --git a/pkg/graphite/convert/whisperconverter/daterange_test.go b/pkg/graphite/convert/whisperconverter/daterange_test.go index bf06f04..c2e1e35 100644 --- a/pkg/graphite/convert/whisperconverter/daterange_test.go +++ b/pkg/graphite/convert/whisperconverter/daterange_test.go @@ -8,7 +8,9 @@ import ( "os" "regexp" "testing" + "time" + "github.com/go-graphite/go-whisper" "github.com/go-kit/log" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" @@ -16,9 +18,17 @@ import ( // This test contains a data due to how we take over STDOUT but it should be harmless. func TestCommandDateRange(t *testing.T) { - tmpDir, err := os.MkdirTemp("/tmp", "testCommandDateRange*") - require.NoError(t, err) - defer os.RemoveAll(tmpDir) + // tmpDir, err := os.MkdirTemp("/tmp", "testCommandDateRange*") + // require.NoError(t, err) + // defer os.RemoveAll(tmpDir) + whisper.Now = func() time.Time { + t, err := time.Parse("2006-01-02", "2022-06-01") + if err != nil { + panic(err) + } + return t + } + tmpDir := "/tmp" fooTimes, err := ToTimes([]string{ "2022-05-01", diff --git a/pkg/graphite/convert/whisperconverter/test_utils.go b/pkg/graphite/convert/whisperconverter/test_utils.go index c64a976..b1579f9 100644 --- a/pkg/graphite/convert/whisperconverter/test_utils.go +++ b/pkg/graphite/convert/whisperconverter/test_utils.go @@ -1,6 +1,7 @@ package whisperconverter import ( + "fmt" "os" "time" @@ -22,14 +23,25 @@ func CreateWhisperFile(path string, timestamps []*time.Time) error { if err != nil { return err } + // defer wsp.Close() for _, t := range timestamps { + fmt.Println("yes?") err = wsp.Update(1.0, int(t.Unix())) if err != nil { + fmt.Println(err) return err } } + wsp.Close() + + // wsp, err = whisper.Open(path) + // if err != nil { + // return err + // } + // wsp.Dump(true, false) + return nil } diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index 2d524f0..dcea372 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -3,7 +3,6 @@ package whisperconverter import ( "fmt" "io" - "math" "os" "sort" "time" @@ -51,19 +50,24 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { // Dump one precision level at a time and write into the output slice. // Its important to remember that the archive with index 0 (first archive) // has the raw data and the highest precision https://graphite.readthedocs.io/en/latest/whisper.html#archives-retention-and-precision - var keptPoints []whisper.Point + keptPoints := []whisper.Point{} // We want to track the max timestamp of the archives because we know it - // virtually represents now() and we wont have newer points. Then the min - // timestamp of the archive would be maxTs - each archive retention. - var maxTs uint32 - - // Also determine the lower bound of each archive, which is the upper bound of - // the next archive. - minArchiveTs := make([]uint32, len(archives)) - for i, a := range archives { - // All archives share the same maxArchiveTs, so only calculate it once. + // virtually represents now() and we won't have newer points. + var maxTs, maxTsOffset uint32 + + // Also determine the boundaries between archives. + lowerBoundTs := make([]uint32, len(archives)) + for i := range archives { + fmt.Println("retention", archives[i].Retention()) + // All archives share the same maxTs, so only calculate it once. if maxTs == 0 { + if i > 0 { + // If there are no points in the high-res archives, we have to bump up maxTs + // by the retention of the next-highest archive so that this point is validly + // in the retention for this archive. + maxTsOffset = archives[i].Retention() - archives[0].Retention() + } points, err := w.DumpArchive(i) if err != nil { return nil, fmt.Errorf("failed to dump archive %d from whisper metric %s", i, name) @@ -74,14 +78,29 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { } } } + } + fmt.Println("max ts", maxTs, maxTsOffset) + maxTs += maxTsOffset + fmt.Println("max ts adjusted", maxTs) - if maxTs < a.Retention() { // very big retention, invalid. - minArchiveTs[i] = uint32(math.MaxUint32) + for i, a := range archives { + if maxTs < a.Retention() { + // very big retention, boundary would be < 0, therefore all points are + // covered by this archive. + lowerBoundTs[i] = 0 } else { - minArchiveTs[i] = maxTs - a.Retention() + fmt.Println("lower bound is maxts-retent", maxTs, a.Retention()) + lowerBoundTs[i] = maxTs - a.Retention() } } + fmt.Println("bounds", lowerBoundTs) + + // no maxTs means no points. This is not an error. + if maxTs == 0 { + return []whisper.Point{}, nil + } + // Iterate over archives backwards so we process oldest points first. Sort the // points, then determine the slice that is between the bounds for this // archive, and append those to the output array. @@ -103,17 +122,21 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { startIdx := -1 endIdx := len(points) - 1 for j, p := range points { + fmt.Println("point", p.Timestamp, p.Value) if p.Timestamp == 0 { continue } // Don't include any points in this archive that are older than the // retention period. - if p.Timestamp <= minArchiveTs[i] { + if p.Timestamp <= lowerBoundTs[i] { + fmt.Println("older than lower bound", p.Timestamp, lowerBoundTs[i], p.Value) continue } // Don't include any points in this archive that are covered in a higher - // resolution archive. - if i > 0 && p.Timestamp > minArchiveTs[i-1] { + // resolution archive. If the other boundary is zero, it is invalid + // so we keep the point. + if i > 0 && p.Timestamp > lowerBoundTs[i-1] { + fmt.Println("too new for lower bound, skip", p.Timestamp, lowerBoundTs[i-1]) break } endIdx = j diff --git a/pkg/graphite/convert/whisperconverter/whisper_test.go b/pkg/graphite/convert/whisperconverter/whisper_test.go index 96f7cb6..fded451 100644 --- a/pkg/graphite/convert/whisperconverter/whisper_test.go +++ b/pkg/graphite/convert/whisperconverter/whisper_test.go @@ -73,6 +73,32 @@ func TestExtractWhisperPoints(t *testing.T) { }, }, }, + { + name: "simple series, large retention", + metricName: "mymetric", + archive: &testArchive{ + infos: []whisper.ArchiveInfo{ + simpleArchiveInfo(6000, 1), + simpleArchiveInfo(6000, 60), + }, + points: [][]whisper.Point{ + { + whisper.NewPoint(time.Unix(1000, 0), 1), + }, + // None of the points in this archive are valid because all points + // are covered by the first archive. + { + whisper.NewPoint(time.Unix(940, 0), 2), + }, + }, + }, + want: []whisper.Point{ + { + Timestamp: 1000, + Value: 1, + }, + }, + }, { name: "multiple series, different intervals, zeros, duplicate points", metricName: "mymetric", @@ -247,23 +273,28 @@ func TestExtractWhisperPoints(t *testing.T) { }, }, }, + { + name: "test retention when archives are empty", + metricName: "mymetric", + archive: &testArchive{ + infos: []whisper.ArchiveInfo{ + simpleArchiveInfo(10, 1), + simpleArchiveInfo(7, 3), + simpleArchiveInfo(6, 6), + }, + points: [][]whisper.Point{ + {}, + {}, + {}, + }, + }, + want: []whisper.Point{}, + }, { name: "test retention when first archives are empty", metricName: "mymetric", archive: &testArchive{ infos: []whisper.ArchiveInfo{ - // This is what the test will define - // Maxts Mints - // 1030 1020 1009 994 - // [] - // [ ] - // [ ] - // And this is what the test will expect - // 1030 1020 1009 994 - // [XXXXXXXXX] - // [ XXXXXXXXX] - // [ XXXXXXXX] - simpleArchiveInfo(10, 1), simpleArchiveInfo(7, 3), simpleArchiveInfo(6, 6), @@ -271,17 +302,17 @@ func TestExtractWhisperPoints(t *testing.T) { points: [][]whisper.Point{ {}, { - whisper.NewPoint(time.Unix(1009, 0), 9), // This is the lower border of this archive, this point should be kept + whisper.NewPoint(time.Unix(1009, 0), 9), }, { whisper.NewPoint(time.Unix(1009, 0), 99), // skipped because archive 1 has a point at this time - whisper.NewPoint(time.Unix(975, 0), 12), + whisper.NewPoint(time.Unix(998, 0), 12), }, }, }, want: []whisper.Point{ { - Timestamp: 975, + Timestamp: 998, Value: 12, }, { @@ -290,6 +321,35 @@ func TestExtractWhisperPoints(t *testing.T) { }, }, }, + { + name: "test retention when first archives are empty", + metricName: "mymetric", + archive: &testArchive{ + infos: []whisper.ArchiveInfo{ + simpleArchiveInfo(10, 1), + simpleArchiveInfo(7, 3), + simpleArchiveInfo(6, 6), + }, + points: [][]whisper.Point{ + {}, + {}, + { + whisper.NewPoint(time.Unix(1009, 0), 99), // skipped because archive 1 has a point at this time + whisper.NewPoint(time.Unix(1000, 0), 12), + }, + }, + }, + want: []whisper.Point{ + { + Timestamp: 1000, + Value: 12, + }, + { + Timestamp: 1009, + Value: 99, + }, + }, + }, { name: "simple series, ordering is fixed", metricName: "mymetric", From b4df7066d7e2c0727a44000794ef326a143fd1c3 Mon Sep 17 00:00:00 2001 From: Owen Williams Date: Thu, 28 Sep 2023 12:19:05 -0400 Subject: [PATCH 18/19] cleanup --- .../convert/whisperconverter/whisper.go | 23 +++++++------------ 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index dcea372..1ed0915 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -55,17 +55,16 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { // We want to track the max timestamp of the archives because we know it // virtually represents now() and we won't have newer points. var maxTs, maxTsOffset uint32 - - // Also determine the boundaries between archives. - lowerBoundTs := make([]uint32, len(archives)) for i := range archives { - fmt.Println("retention", archives[i].Retention()) // All archives share the same maxTs, so only calculate it once. if maxTs == 0 { if i > 0 { - // If there are no points in the high-res archives, we have to bump up maxTs - // by the retention of the next-highest archive so that this point is validly - // in the retention for this archive. + // If there are no points in the high-res archives, we have to bump up + // maxTs by the difference in retention to the highest archive so that + // this point is validly in the retention for this archive. This can + // happen when the only points added to a whisper archive are + // significantly older than "Now()" at the time of writing, as happens + // during our e2e test. maxTsOffset = archives[i].Retention() - archives[0].Retention() } points, err := w.DumpArchive(i) @@ -79,23 +78,20 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { } } } - fmt.Println("max ts", maxTs, maxTsOffset) maxTs += maxTsOffset - fmt.Println("max ts adjusted", maxTs) + // Also determine the boundaries between archives. + lowerBoundTs := make([]uint32, len(archives)) for i, a := range archives { if maxTs < a.Retention() { // very big retention, boundary would be < 0, therefore all points are // covered by this archive. lowerBoundTs[i] = 0 } else { - fmt.Println("lower bound is maxts-retent", maxTs, a.Retention()) lowerBoundTs[i] = maxTs - a.Retention() } } - fmt.Println("bounds", lowerBoundTs) - // no maxTs means no points. This is not an error. if maxTs == 0 { return []whisper.Point{}, nil @@ -122,21 +118,18 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { startIdx := -1 endIdx := len(points) - 1 for j, p := range points { - fmt.Println("point", p.Timestamp, p.Value) if p.Timestamp == 0 { continue } // Don't include any points in this archive that are older than the // retention period. if p.Timestamp <= lowerBoundTs[i] { - fmt.Println("older than lower bound", p.Timestamp, lowerBoundTs[i], p.Value) continue } // Don't include any points in this archive that are covered in a higher // resolution archive. If the other boundary is zero, it is invalid // so we keep the point. if i > 0 && p.Timestamp > lowerBoundTs[i-1] { - fmt.Println("too new for lower bound, skip", p.Timestamp, lowerBoundTs[i-1]) break } endIdx = j From 6bf054863f3cae6f92599227992889f0ae4977a5 Mon Sep 17 00:00:00 2001 From: Owen Williams Date: Thu, 28 Sep 2023 12:38:01 -0400 Subject: [PATCH 19/19] proper fix --- pkg/graphite/convert/whisperconverter/whisper.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/graphite/convert/whisperconverter/whisper.go b/pkg/graphite/convert/whisperconverter/whisper.go index 1ed0915..cb2d959 100644 --- a/pkg/graphite/convert/whisperconverter/whisper.go +++ b/pkg/graphite/convert/whisperconverter/whisper.go @@ -60,12 +60,12 @@ func ReadPoints(w Archive, name string) ([]whisper.Point, error) { if maxTs == 0 { if i > 0 { // If there are no points in the high-res archives, we have to bump up - // maxTs by the difference in retention to the highest archive so that - // this point is validly in the retention for this archive. This can - // happen when the only points added to a whisper archive are + // maxTs by the difference in retention to the next higher archive so + // that this point is validly in the retention for this archive. This + // can happen when the only points added to a whisper archive are // significantly older than "Now()" at the time of writing, as happens // during our e2e test. - maxTsOffset = archives[i].Retention() - archives[0].Retention() + maxTsOffset = archives[i-1].Retention() } points, err := w.DumpArchive(i) if err != nil {