forked from open-telemetry/opentelemetry-collector-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[internal/exp/metrics] Add a new internal package for handling metric…
… staleness (open-telemetry#31089) **Description:** It's a glorified wrapper over a Map type, which allows values to be expired based on a pre-supplied interval. **Link to tracking Issue:** open-telemetry#31016 **Testing:** I added some basic tests of the PriorityQueue implementation as well as the expiry behaviour of Staleness **Documentation:** All the new structs are documented
- Loading branch information
1 parent
8fcf507
commit f533df0
Showing
7 changed files
with
456 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,28 +1,30 @@ | ||
module github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics | ||
|
||
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil | ||
go 1.21 | ||
|
||
require ( | ||
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.95.0 | ||
github.com/stretchr/testify v1.8.4 | ||
go.opentelemetry.io/collector/pdata v1.2.0 | ||
) | ||
|
||
require ( | ||
github.com/cespare/xxhash/v2 v2.2.0 // indirect | ||
github.com/davecgh/go-spew v1.1.1 // indirect | ||
github.com/gogo/protobuf v1.3.2 // indirect | ||
github.com/golang/protobuf v1.5.3 // indirect | ||
github.com/json-iterator/go v1.1.12 // indirect | ||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect | ||
github.com/modern-go/reflect2 v1.0.2 // indirect | ||
github.com/pmezard/go-difflib v1.0.0 // indirect | ||
go.uber.org/multierr v1.11.0 // indirect | ||
golang.org/x/net v0.18.0 // indirect | ||
golang.org/x/sys v0.14.0 // indirect | ||
golang.org/x/text v0.14.0 // indirect | ||
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect | ||
google.golang.org/grpc v1.61.0 // indirect | ||
google.golang.org/protobuf v1.32.0 // indirect | ||
gopkg.in/yaml.v3 v3.0.1 // indirect | ||
) | ||
|
||
go 1.21 | ||
|
||
toolchain go1.21.1 | ||
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" | ||
) | ||
|
||
// Map is an abstraction over a map | ||
type Map[T any] interface { | ||
// Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value | ||
Load(key identity.Stream) (T, bool) | ||
// Store the given key value pair in the map | ||
Store(key identity.Stream, value T) | ||
// Remove the value at key from the map | ||
Delete(key identity.Stream) | ||
// Items returns an iterator function that in future go version can be used with range | ||
// See: https://go.dev/wiki/RangefuncExperiment | ||
Items() func(yield func(identity.Stream, T) bool) bool | ||
} | ||
|
||
// RawMap implementation | ||
|
||
var _ Map[time.Time] = (*RawMap[identity.Stream, time.Time])(nil) | ||
|
||
// RawMap is an implementation of the Map interface using a standard golang map | ||
type RawMap[K comparable, V any] map[K]V | ||
|
||
func (rm *RawMap[K, V]) Load(key K) (V, bool) { | ||
value, ok := (*rm)[key] | ||
return value, ok | ||
} | ||
|
||
func (rm *RawMap[K, V]) Store(key K, value V) { | ||
(*rm)[key] = value | ||
} | ||
|
||
func (rm *RawMap[K, V]) Delete(key K) { | ||
delete(*rm, key) | ||
} | ||
|
||
func (rm *RawMap[K, V]) Items() func(yield func(K, V) bool) bool { | ||
return func(yield func(K, V) bool) bool { | ||
for k, v := range *rm { | ||
if !yield(k, v) { | ||
break | ||
} | ||
} | ||
return false | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,111 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" | ||
|
||
import ( | ||
"container/heap" | ||
"time" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" | ||
) | ||
|
||
// PriorityQueue represents a way to store entries sorted by their priority. | ||
// Pop() will return the oldest entry of the set. | ||
type PriorityQueue interface { | ||
// Update will add or update an entry, and reshuffle the queue internally as needed to keep it sorted | ||
Update(id identity.Stream, newPrio time.Time) | ||
// Peek will return the entry at the HEAD of the queue *without* removing it from the queue | ||
Peek() (identity.Stream, time.Time) | ||
// Pop will remove the entry at the HEAD of the queue and return it | ||
Pop() (identity.Stream, time.Time) | ||
// Len will return the number of entries in the queue | ||
Len() int | ||
} | ||
|
||
// heapQueue implements heap.Interface. | ||
// We use it as the inner implementation of a heap-based sorted queue | ||
type heapQueue []*queueItem | ||
|
||
type queueItem struct { | ||
key identity.Stream | ||
prio time.Time | ||
index int | ||
} | ||
|
||
func (pq heapQueue) Len() int { return len(pq) } | ||
|
||
func (pq heapQueue) Less(i, j int) bool { | ||
// We want Pop to give us the lowest priority | ||
return pq[i].prio.Before(pq[j].prio) | ||
} | ||
|
||
func (pq heapQueue) Swap(i, j int) { | ||
pq[i], pq[j] = pq[j], pq[i] | ||
pq[i].index = i | ||
pq[j].index = j | ||
} | ||
|
||
func (pq *heapQueue) Push(x any) { | ||
n := len(*pq) | ||
item := x.(*queueItem) | ||
item.index = n | ||
*pq = append(*pq, item) | ||
} | ||
|
||
func (pq *heapQueue) Pop() any { | ||
old := *pq | ||
n := len(old) | ||
item := old[n-1] | ||
old[n-1] = nil // avoid memory leak | ||
item.index = -1 // for safety | ||
*pq = old[0 : n-1] | ||
return item | ||
} | ||
|
||
type heapPriorityQueue struct { | ||
inner heapQueue | ||
itemLookup map[identity.Stream]*queueItem | ||
} | ||
|
||
func NewPriorityQueue() PriorityQueue { | ||
pq := &heapPriorityQueue{ | ||
inner: heapQueue{}, | ||
itemLookup: map[identity.Stream]*queueItem{}, | ||
} | ||
heap.Init(&pq.inner) | ||
|
||
return pq | ||
} | ||
|
||
func (pq *heapPriorityQueue) Update(id identity.Stream, newPrio time.Time) { | ||
// Check if the entry already exists in the queue | ||
item, ok := pq.itemLookup[id] | ||
if ok { | ||
// If so, we can update it in place | ||
item.prio = newPrio | ||
heap.Fix(&pq.inner, item.index) | ||
} else { | ||
item = &queueItem{ | ||
key: id, | ||
prio: newPrio, | ||
} | ||
heap.Push(&pq.inner, item) | ||
pq.itemLookup[id] = item | ||
} | ||
} | ||
|
||
func (pq *heapPriorityQueue) Peek() (identity.Stream, time.Time) { | ||
val := pq.inner[0] | ||
return val.key, val.prio | ||
} | ||
|
||
func (pq *heapPriorityQueue) Pop() (identity.Stream, time.Time) { | ||
val := heap.Pop(&pq.inner).(*queueItem) | ||
delete(pq.itemLookup, val.key) | ||
return val.key, val.prio | ||
} | ||
|
||
func (pq *heapPriorityQueue) Len() int { | ||
return pq.inner.Len() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package staleness | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/require" | ||
"go.opentelemetry.io/collector/pdata/pcommon" | ||
"go.opentelemetry.io/collector/pdata/pmetric" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" | ||
) | ||
|
||
func TestPriorityQueueImpl(t *testing.T) { | ||
t.Parallel() | ||
|
||
pq := NewPriorityQueue() | ||
|
||
idA := generateStreamID(t, map[string]any{ | ||
"aaa": "123", | ||
}) | ||
idB := generateStreamID(t, map[string]any{ | ||
"bbb": "456", | ||
}) | ||
idC := generateStreamID(t, map[string]any{ | ||
"ccc": "789", | ||
}) | ||
|
||
initialTime := time.Time{} | ||
prioA := initialTime.Add(2 * time.Second) | ||
prioB := initialTime.Add(1 * time.Second) | ||
prioC := initialTime.Add(3 * time.Second) | ||
|
||
pq.Update(idA, prioA) | ||
pq.Update(idB, prioB) | ||
pq.Update(idC, prioC) | ||
|
||
// The first item should be B | ||
id, prio := pq.Peek() | ||
require.Equal(t, idB, id) | ||
require.Equal(t, prioB, prio) | ||
|
||
// If we peek again, nothing should change | ||
id, prio = pq.Peek() | ||
require.Equal(t, idB, id) | ||
require.Equal(t, prioB, prio) | ||
|
||
// Pop should return the same thing | ||
id, prio = pq.Pop() | ||
require.Equal(t, idB, id) | ||
require.Equal(t, prioB, prio) | ||
|
||
// Now if we peek again, it should be the next item | ||
id, prio = pq.Peek() | ||
require.Equal(t, idA, id) | ||
require.Equal(t, prioA, prio) | ||
|
||
// Pop should return the same thing | ||
id, prio = pq.Pop() | ||
require.Equal(t, idA, id) | ||
require.Equal(t, prioA, prio) | ||
|
||
// One last time | ||
id, prio = pq.Peek() | ||
require.Equal(t, idC, id) | ||
require.Equal(t, prioC, prio) | ||
|
||
// Pop should return the same thing | ||
id, prio = pq.Pop() | ||
require.Equal(t, idC, id) | ||
require.Equal(t, prioC, prio) | ||
|
||
// The queue should now be empty | ||
require.Equal(t, 0, pq.Len()) | ||
|
||
// And the inner lookup map should also be empty | ||
require.IsType(t, &heapPriorityQueue{}, pq) | ||
heapQueue := pq.(*heapPriorityQueue) | ||
require.Len(t, heapQueue.itemLookup, 0) | ||
} | ||
|
||
func generateStreamID(t *testing.T, attributes map[string]any) identity.Stream { | ||
res := pcommon.NewResource() | ||
err := res.Attributes().FromRaw(map[string]any{ | ||
"foo": "bar", | ||
"asdf": "qwer", | ||
}) | ||
require.NoError(t, err) | ||
|
||
scope := pcommon.NewInstrumentationScope() | ||
scope.SetName("TestScope") | ||
scope.SetVersion("v1.2.3") | ||
err = scope.Attributes().FromRaw(map[string]any{ | ||
"aaa": "bbb", | ||
"ccc": "ddd", | ||
}) | ||
require.NoError(t, err) | ||
|
||
metric := pmetric.NewMetric() | ||
|
||
sum := metric.SetEmptySum() | ||
sum.SetIsMonotonic(true) | ||
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) | ||
|
||
dp := sum.DataPoints().AppendEmpty() | ||
dp.SetStartTimestamp(678) | ||
dp.SetTimestamp(789) | ||
dp.SetDoubleValue(123.456) | ||
err = dp.Attributes().FromRaw(attributes) | ||
require.NoError(t, err) | ||
|
||
return identity.OfStream(identity.OfResourceMetric(res, scope, metric), dp) | ||
} |
Oops, something went wrong.