Skip to content

Commit

Permalink
receiver/prometheus: add store to track stale metrics
Browse files Browse the repository at this point in the history
Implements a simple store that will be used to record stale metrics;
those that are NOT present in the current scrape, but were in the prior
scrape. This logic will then be attached to our receiver logic
to then complete the mechanism and then issue stale markers.

Updates open-telemetry#3413
Updates open-telemetry#2961
  • Loading branch information
odeke-em committed Jun 15, 2021
1 parent 92d3c6f commit eb85405
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 0 deletions.
80 changes: 80 additions & 0 deletions receiver/prometheusreceiver/internal/staleness_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"github.com/prometheus/prometheus/pkg/labels"
)

type stalenessStore struct {
currentHashes map[uint64]bool
previousHashes map[uint64]bool
previous []labels.Labels
current []labels.Labels
}

func newStalenessStore() *stalenessStore {
return &stalenessStore{
previousHashes: make(map[uint64]bool),
currentHashes: make(map[uint64]bool),
}
}

// refresh copies over all the current values to previous, and prepares.
// refresh must be called before every new scrape.
func (ss *stalenessStore) refresh() {
// 1. Clear ss.previousHashes firstly. Please don't edit
// this map clearing idiom as it ensures speed.
for hash := range ss.previousHashes {
delete(ss.previousHashes, hash)
}
// 2. Copy over ss.currentHashes to ss.previousHashes.
for hash := range ss.currentHashes {
ss.previousHashes[hash] = ss.currentHashes[hash]
}
// 3. Clear ss.currentHashes, with the map clearing idiom for speed.
for hash := range ss.currentHashes {
delete(ss.currentHashes, hash)
}
// 4. Copy all the prior labels from what was previously ss.current.
ss.previous = ss.current
// 5. Clear ss.current to make for another cycle.
ss.current = nil
}

// isStale returns whether lbl was seen only in the previous scrape and not the current.
func (ss *stalenessStore) isStale(lbl labels.Labels) bool {
hash := lbl.Hash()
return ss.previousHashes[hash] && !ss.currentHashes[hash]
}

// markAsCurrentlySeen adds lbl to the manifest of labels seen in the current scrape.
// This method should be called before refresh, but during a scrape whenever labels are encountered.
func (ss *stalenessStore) markAsCurrentlySeen(lbl labels.Labels) {
ss.currentHashes[lbl.Hash()] = true
ss.current = append(ss.current, lbl)
}

// emitStaleLabels returns the labels that were previously seen in
// the prior scrape, but are not currently present in this scrape cycle.
func (ss *stalenessStore) emitStaleLabels() (stale []labels.Labels) {
for _, labels := range ss.previous {
hash := labels.Hash()
if ok := ss.currentHashes[hash]; !ok {
stale = append(stale, labels)
}
}
return stale
}
57 changes: 57 additions & 0 deletions receiver/prometheusreceiver/internal/staleness_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"testing"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/require"
)

func TestStalenessStore(t *testing.T) {
ss := newStalenessStore()
require.NotNil(t, ss.previousHashes)
require.Zero(t, len(ss.previousHashes))
require.NotNil(t, ss.currentHashes)
require.Zero(t, len(ss.currentHashes))

lbl1 := labels.Labels{
{Name: "__name__", Value: "lbl1"},
{Name: "a", Value: "1"},
}
lbl2 := labels.Labels{
{Name: "__name__", Value: "lbl2"},
{Name: "b", Value: "1"},
}
ss.markAsCurrentlySeen(lbl1)
require.Nil(t, ss.emitStaleLabels())
require.False(t, ss.isStale(lbl1))
require.False(t, ss.isStale(lbl2))

// Now refresh, the case of a new scrape.
// Without having marked lbl1 as being current, it should be reported as stale.
ss.refresh()
require.True(t, ss.isStale(lbl1))
require.False(t, ss.isStale(lbl2))
// .previous should have been the prior contents of current and current should be nil.
require.Equal(t, ss.previous, []labels.Labels{lbl1})
require.Nil(t, ss.current)

// After the next refresh cycle, we shouldn't have any stale labels.
ss.refresh()
require.False(t, ss.isStale(lbl1))
require.False(t, ss.isStale(lbl2))
}

0 comments on commit eb85405

Please sign in to comment.