From da75b53efcd12533c321aaedce48eabb4e19b6d9 Mon Sep 17 00:00:00 2001 From: Ryan Hall Date: Tue, 30 Nov 2021 09:00:42 -0800 Subject: [PATCH 1/3] [coord] Guard against duplicate __rollup__ tags Previously the tag merge logic could result in duplicate __rollup__ tags if the provided tags already had __rollup__. Additionally refactored to lazily merge the 2 sorted slices when calling Next(). This made it much easier to add deduping logic and I believe makes it easier to follow the code. --- .../m3coordinator/downsample/id_pool_types.go | 172 ++++++++---------- .../downsample/id_pool_types_test.go | 40 ++++ 2 files changed, 117 insertions(+), 95 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/id_pool_types.go b/src/cmd/services/m3coordinator/downsample/id_pool_types.go index b5dfb26802..395a56aaf5 100644 --- a/src/cmd/services/m3coordinator/downsample/id_pool_types.go +++ b/src/cmd/services/m3coordinator/downsample/id_pool_types.go @@ -65,25 +65,21 @@ func isRollupID( // method, it will return the rollup tag in the correct alphabetical order // when progressing through the existing tags. type rollupIDProvider struct { - index int - newName []byte - tagPairs []id.TagPair - nameTagIndex int - rollupTagIndex int - - tagEncoder serialize.TagEncoder - pool *rollupIDProviderPool - nameTag ident.ID - nameTagBytes []byte - nameTagBeforeRollupTag bool - tagNameID *ident.ReusableBytesID - tagValueID *ident.ReusableBytesID - mergeTags []mergeTag -} - -type mergeTag struct { - name []byte - index int + index int + len int + mergeTagsIdx int + newName []byte + tagPairs []id.TagPair + curr id.TagPair + currIdx int + + tagEncoder serialize.TagEncoder + pool *rollupIDProviderPool + nameTag ident.ID + nameTagBytes []byte + tagNameID *ident.ReusableBytesID + tagValueID *ident.ReusableBytesID + mergeTags []id.TagPair } func newRollupIDProvider( @@ -93,29 +89,27 @@ func newRollupIDProvider( ) *rollupIDProvider { nameTagBytes := nameTag.Bytes() nameTagBeforeRollupTag := bytes.Compare(nameTagBytes, rollupTagName) < 0 - mergeTags := []mergeTag{ + mergeTags := []id.TagPair{ { - name: rollupTagName, - index: -1, + Name: rollupTagName, + Value: rollupTagValue, }, { - name: nameTagBytes, - index: -1, + Name: nameTagBytes, + // Value is set in reset }, } if nameTagBeforeRollupTag { - mergeTags[0].name = nameTagBytes - mergeTags[1].name = rollupTagName + mergeTags[0], mergeTags[1] = mergeTags[1], mergeTags[0] } return &rollupIDProvider{ - tagEncoder: tagEncoder, - pool: pool, - nameTag: nameTag, - nameTagBytes: nameTagBytes, - nameTagBeforeRollupTag: nameTagBeforeRollupTag, - tagNameID: ident.NewReusableBytesID(), - tagValueID: ident.NewReusableBytesID(), - mergeTags: mergeTags, + tagEncoder: tagEncoder, + pool: pool, + nameTag: nameTag, + nameTagBytes: nameTagBytes, + tagNameID: ident.NewReusableBytesID(), + tagValueID: ident.NewReusableBytesID(), + mergeTags: mergeTags, } } @@ -143,41 +137,26 @@ func (p *rollupIDProvider) reset( newName []byte, tagPairs []id.TagPair, ) { - p.index = -1 + p.index = 0 + p.mergeTagsIdx = 0 p.newName = newName p.tagPairs = tagPairs - p.mergeTags[0].index = -1 - p.mergeTags[1].index = -1 - - // merge the special tags into the set of tag pairs without allocating extra space. - idx := 0 - for _, pair := range tagPairs { - if p.mergeTags[0].index == -1 && bytes.Compare(p.mergeTags[0].name, pair.Name) < 0 { - p.mergeTags[0].index = idx - idx++ + p.currIdx = -1 + + var dups int + // precompute the length of the "combined" slice for the Len() method. + // mergeTags is small so it's fine to do n^2 instead of a more complicated O(n) merge scan. + for i := range p.tagPairs { + for j := range p.mergeTags { + if bytes.Compare(p.tagPairs[i].Name, p.mergeTags[j].Name) == 0 { + dups++ + } + if bytes.Compare(p.mergeTags[j].Name, p.nameTagBytes) == 0 { + p.mergeTags[j].Value = newName + } } - if p.mergeTags[1].index == -1 && bytes.Compare(p.mergeTags[1].name, pair.Name) < 0 { - p.mergeTags[1].index = idx - // all tags merged, safe to break. - break - } - idx++ - } - - if p.mergeTags[0].index == -1 { - p.mergeTags[0].index = idx - idx++ - } - if p.mergeTags[1].index == -1 { - p.mergeTags[1].index = idx - } - - p.rollupTagIndex = p.mergeTags[0].index - p.nameTagIndex = p.mergeTags[1].index - if p.nameTagBeforeRollupTag { - p.nameTagIndex = p.mergeTags[0].index - p.rollupTagIndex = p.mergeTags[1].index } + p.len = len(p.tagPairs) + len(p.mergeTags) - dups } func (p *rollupIDProvider) finalize() { @@ -186,42 +165,45 @@ func (p *rollupIDProvider) finalize() { } } +// Next takes the smallest element across both sets of tags, removing any duplicates between the lists. func (p *rollupIDProvider) Next() bool { - p.index++ - return p.index < p.Len() + if p.index == len(p.tagPairs) && p.mergeTagsIdx == len(p.mergeTags) { + // at the end of both sets + return false + } + if p.index == len(p.tagPairs) { + // only merged tags left + p.curr = p.mergeTags[p.mergeTagsIdx] + p.mergeTagsIdx++ + } else if p.mergeTagsIdx == len(p.mergeTags) { + // only provided tags left + p.curr = p.tagPairs[p.index] + p.index++ + } else if bytes.Compare(p.tagPairs[p.index].Name, p.mergeTags[p.mergeTagsIdx].Name) == 0 { + // a merge tag exists in the provided tag, advance both to prevent duplicates. + p.curr = p.tagPairs[p.index] + p.index++ + p.mergeTagsIdx++ + } else if bytes.Compare(p.tagPairs[p.index].Name, p.mergeTags[p.mergeTagsIdx].Name) < 0 { + // the next provided tag is less + p.curr = p.tagPairs[p.index] + p.index++ + } else { + // the next merge tag is less + p.curr = p.mergeTags[p.mergeTagsIdx] + p.mergeTagsIdx++ + } + p.currIdx++ + return true } func (p *rollupIDProvider) CurrentIndex() int { - if p.index >= 0 { - return p.index - } - return 0 + return p.currIdx } func (p *rollupIDProvider) Current() ident.Tag { - idx := p.index - if idx == p.nameTagIndex { - p.tagValueID.Reset(p.newName) - return ident.Tag{ - Name: p.nameTag, - Value: p.tagValueID, - } - } - if idx == p.rollupTagIndex { - return rollupTag - } - - if p.index > p.nameTagIndex { - // Effective index is subtracted by 1 - idx-- - } - if p.index > p.rollupTagIndex { - // Effective index is subtracted by 1 - idx-- - } - - p.tagNameID.Reset(p.tagPairs[idx].Name) - p.tagValueID.Reset(p.tagPairs[idx].Value) + p.tagNameID.Reset(p.curr.Name) + p.tagValueID.Reset(p.curr.Value) return ident.Tag{ Name: p.tagNameID, Value: p.tagValueID, @@ -235,7 +217,7 @@ func (p *rollupIDProvider) Err() error { func (p *rollupIDProvider) Close() {} func (p *rollupIDProvider) Len() int { - return len(p.tagPairs) + 2 + return p.len } func (p *rollupIDProvider) Remaining() int { diff --git a/src/cmd/services/m3coordinator/downsample/id_pool_types_test.go b/src/cmd/services/m3coordinator/downsample/id_pool_types_test.go index 8170a7db36..000c765678 100644 --- a/src/cmd/services/m3coordinator/downsample/id_pool_types_test.go +++ b/src/cmd/services/m3coordinator/downsample/id_pool_types_test.go @@ -267,6 +267,39 @@ func TestRollupIdProvider(t *testing.T) { }, }, }, + { + name: "rollup and name already exists", + nameTag: "__name__", + metricName: "http_requests", + tags: []id.TagPair{ + { + Name: []byte("__name__"), + Value: []byte("http_requests"), + }, + { + Name: []byte("__rollup__"), + Value: []byte("true"), + }, + { + Name: []byte("foo"), + Value: []byte("fooValue"), + }, + }, + expectedTags: []id.TagPair{ + { + Name: []byte("__name__"), + Value: []byte("http_requests"), + }, + { + Name: []byte("__rollup__"), + Value: []byte("true"), + }, + { + Name: []byte("foo"), + Value: []byte("fooValue"), + }, + }, + }, } for _, tc := range cases { @@ -277,6 +310,13 @@ func TestRollupIdProvider(t *testing.T) { } encoder := &serialize.FakeTagEncoder{} p := newRollupIDProvider(encoder, nil, ident.BytesID(tc.nameTag)) + p.reset([]byte(tc.metricName), tc.tags) + require.Equal(t, len(tc.expectedTags), p.Len()) + curIdx := 0 + for p.Next() { + require.Equal(t, curIdx, p.CurrentIndex()) + curIdx++ + } rollupID, err := p.provide([]byte(tc.metricName), tc.tags) require.NoError(t, err) encoded, _ := encoder.Data() From aeb82eab257a6432521096a0ba628561cd766268 Mon Sep 17 00:00:00 2001 From: Ryan Hall Date: Tue, 30 Nov 2021 18:51:08 -0800 Subject: [PATCH 2/3] fix tests --- .../m3coordinator/downsample/id_pool_types.go | 21 ++++++++++--------- .../downsample/id_pool_types_test.go | 7 +++++++ 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/id_pool_types.go b/src/cmd/services/m3coordinator/downsample/id_pool_types.go index 395a56aaf5..19dc73ccfe 100644 --- a/src/cmd/services/m3coordinator/downsample/id_pool_types.go +++ b/src/cmd/services/m3coordinator/downsample/id_pool_types.go @@ -137,23 +137,22 @@ func (p *rollupIDProvider) reset( newName []byte, tagPairs []id.TagPair, ) { - p.index = 0 - p.mergeTagsIdx = 0 p.newName = newName p.tagPairs = tagPairs - p.currIdx = -1 + p.Rewind() var dups int // precompute the length of the "combined" slice for the Len() method. // mergeTags is small so it's fine to do n^2 instead of a more complicated O(n) merge scan. - for i := range p.tagPairs { - for j := range p.mergeTags { + for j := range p.mergeTags { + // update the name tag as well. + if bytes.Compare(p.mergeTags[j].Name, p.nameTagBytes) == 0 { + p.mergeTags[j].Value = newName + } + for i := range p.tagPairs { if bytes.Compare(p.tagPairs[i].Name, p.mergeTags[j].Name) == 0 { dups++ } - if bytes.Compare(p.mergeTags[j].Name, p.nameTagBytes) == 0 { - p.mergeTags[j].Value = newName - } } } p.len = len(p.tagPairs) + len(p.mergeTags) - dups @@ -221,7 +220,7 @@ func (p *rollupIDProvider) Len() int { } func (p *rollupIDProvider) Remaining() int { - return p.Len() - p.index - 1 + return p.Len() - p.CurrentIndex() - 1 } func (p *rollupIDProvider) Duplicate() ident.TagIterator { @@ -231,7 +230,9 @@ func (p *rollupIDProvider) Duplicate() ident.TagIterator { } func (p *rollupIDProvider) Rewind() { - p.index = -1 + p.index = 0 + p.mergeTagsIdx = 0 + p.currIdx = -1 } type rollupIDProviderPool struct { diff --git a/src/cmd/services/m3coordinator/downsample/id_pool_types_test.go b/src/cmd/services/m3coordinator/downsample/id_pool_types_test.go index 000c765678..384f467e85 100644 --- a/src/cmd/services/m3coordinator/downsample/id_pool_types_test.go +++ b/src/cmd/services/m3coordinator/downsample/id_pool_types_test.go @@ -313,6 +313,13 @@ func TestRollupIdProvider(t *testing.T) { p.reset([]byte(tc.metricName), tc.tags) require.Equal(t, len(tc.expectedTags), p.Len()) curIdx := 0 + for p.Next() { + require.Equal(t, curIdx, p.CurrentIndex()) + curIdx++ + require.Equal(t, p.Len()-curIdx, p.Remaining()) + } + p.Rewind() + curIdx = 0 for p.Next() { require.Equal(t, curIdx, p.CurrentIndex()) curIdx++ From 90325dd81bbf3ab6cabb6f7102b513c3f60344d6 Mon Sep 17 00:00:00 2001 From: Ryan Hall Date: Tue, 30 Nov 2021 19:09:19 -0800 Subject: [PATCH 3/3] lint --- .../m3coordinator/downsample/id_pool_types.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/id_pool_types.go b/src/cmd/services/m3coordinator/downsample/id_pool_types.go index 19dc73ccfe..85597abe35 100644 --- a/src/cmd/services/m3coordinator/downsample/id_pool_types.go +++ b/src/cmd/services/m3coordinator/downsample/id_pool_types.go @@ -146,11 +146,11 @@ func (p *rollupIDProvider) reset( // mergeTags is small so it's fine to do n^2 instead of a more complicated O(n) merge scan. for j := range p.mergeTags { // update the name tag as well. - if bytes.Compare(p.mergeTags[j].Name, p.nameTagBytes) == 0 { + if bytes.Equal(p.mergeTags[j].Name, p.nameTagBytes) { p.mergeTags[j].Value = newName } for i := range p.tagPairs { - if bytes.Compare(p.tagPairs[i].Name, p.mergeTags[j].Name) == 0 { + if bytes.Equal(p.tagPairs[i].Name, p.mergeTags[j].Name) { dups++ } } @@ -170,24 +170,25 @@ func (p *rollupIDProvider) Next() bool { // at the end of both sets return false } - if p.index == len(p.tagPairs) { + switch { + case p.index == len(p.tagPairs): // only merged tags left p.curr = p.mergeTags[p.mergeTagsIdx] p.mergeTagsIdx++ - } else if p.mergeTagsIdx == len(p.mergeTags) { + case p.mergeTagsIdx == len(p.mergeTags): // only provided tags left p.curr = p.tagPairs[p.index] p.index++ - } else if bytes.Compare(p.tagPairs[p.index].Name, p.mergeTags[p.mergeTagsIdx].Name) == 0 { + case bytes.Equal(p.tagPairs[p.index].Name, p.mergeTags[p.mergeTagsIdx].Name): // a merge tag exists in the provided tag, advance both to prevent duplicates. p.curr = p.tagPairs[p.index] p.index++ p.mergeTagsIdx++ - } else if bytes.Compare(p.tagPairs[p.index].Name, p.mergeTags[p.mergeTagsIdx].Name) < 0 { + case bytes.Compare(p.tagPairs[p.index].Name, p.mergeTags[p.mergeTagsIdx].Name) < 0: // the next provided tag is less p.curr = p.tagPairs[p.index] p.index++ - } else { + default: // the next merge tag is less p.curr = p.mergeTags[p.mergeTagsIdx] p.mergeTagsIdx++