From 064769e01e931af7bb08fa218da151613c98815f Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Tue, 6 Feb 2024 17:13:54 -0500 Subject: [PATCH 01/11] [internal/exp/metrics] Add a new internal package for handling metric staleness It's a glorified wrapper over a Map type, which allows values to be expired based on a pre-supplied interval. --- internal/exp/metrics/go.mod | 8 +- internal/exp/metrics/go.sum | 5 + .../exp/metrics/staleness/priority_queue.go | 97 ++++++++++++++ .../metrics/staleness/priority_queue_test.go | 111 ++++++++++++++++ internal/exp/metrics/staleness/staleness.go | 55 ++++++++ .../exp/metrics/staleness/staleness_test.go | 121 ++++++++++++++++++ 6 files changed, 395 insertions(+), 2 deletions(-) create mode 100644 internal/exp/metrics/staleness/priority_queue.go create mode 100644 internal/exp/metrics/staleness/priority_queue_test.go create mode 100644 internal/exp/metrics/staleness/staleness.go create mode 100644 internal/exp/metrics/staleness/staleness_test.go diff --git a/internal/exp/metrics/go.mod b/internal/exp/metrics/go.mod index f61b7e1e87af..ca0a23cb1e86 100644 --- a/internal/exp/metrics/go.mod +++ b/internal/exp/metrics/go.mod @@ -1,19 +1,22 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics -replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil +go 1.20 require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.95.0 + github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector/pdata v1.2.0 ) require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.18.0 // indirect golang.org/x/sys v0.14.0 // indirect @@ -21,8 +24,9 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect google.golang.org/grpc v1.61.0 // indirect google.golang.org/protobuf v1.32.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) -go 1.21 +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil toolchain go1.21.1 diff --git a/internal/exp/metrics/go.sum b/internal/exp/metrics/go.sum index 6f3b578693b2..1c3e08163635 100644 --- a/internal/exp/metrics/go.sum +++ b/internal/exp/metrics/go.sum @@ -16,6 +16,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -23,6 
+25,7 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= @@ -74,5 +77,7 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/exp/metrics/staleness/priority_queue.go b/internal/exp/metrics/staleness/priority_queue.go new file mode 100644 index 000000000000..8a26aabc06b8 --- /dev/null +++ b/internal/exp/metrics/staleness/priority_queue.go @@ -0,0 +1,97 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package staleness + +import ( + "container/heap" + "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +type priorityQueueImpl []*queueItem + +type queueItem struct { + key identity.Stream + prio time.Time + index int +} + +func (pq priorityQueueImpl) Len() int { return len(pq) } + +func (pq priorityQueueImpl) Less(i, j int) bool { + // We want Pop to give us the lowest priority + return pq[i].prio.Before(pq[j].prio) +} + +func (pq priorityQueueImpl) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *priorityQueueImpl) Push(x any) { + n := len(*pq) + item := x.(*queueItem) + item.index = n + *pq = append(*pq, item) +} + +func (pq *priorityQueueImpl) Pop() any { + old := *pq + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +func (pq *priorityQueueImpl) Update(item *queueItem, newPrio time.Time) { + item.prio = newPrio + heap.Fix(pq, item.index) +} + +type PriorityQueue struct { + inner priorityQueueImpl + itemLookup map[identity.Stream]*queueItem +} + +func NewPriorityQueue() *PriorityQueue { + pq := &PriorityQueue{ + inner: priorityQueueImpl{}, + itemLookup: map[identity.Stream]*queueItem{}, + } + heap.Init(&pq.inner) + + return pq +} + +func (pq *PriorityQueue) Update(id identity.Stream, newPrio time.Time) { + item, ok := pq.itemLookup[id] + if !ok { + item = &queueItem{ + key: id, + prio: newPrio, + } + heap.Push(&pq.inner, item) + pq.itemLookup[id] = item + } else { + pq.inner.Update(item, newPrio) + } +} + +func (pq *PriorityQueue) Peek() (identity.Stream, time.Time) { + val := pq.inner[0] + return val.key, val.prio +} + +func (pq *PriorityQueue) Pop() 
(identity.Stream, time.Time) { + val := heap.Pop(&pq.inner).(*queueItem) + return val.key, val.prio +} + +func (pq *PriorityQueue) Len() int { + return pq.inner.Len() +} diff --git a/internal/exp/metrics/staleness/priority_queue_test.go b/internal/exp/metrics/staleness/priority_queue_test.go new file mode 100644 index 000000000000..373c4189bb61 --- /dev/null +++ b/internal/exp/metrics/staleness/priority_queue_test.go @@ -0,0 +1,111 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package staleness + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +func TestPriorityQueueImpl(t *testing.T) { + t.Parallel() + + pq := NewPriorityQueue() + + idA := generateStreamID(t, map[string]any{ + "aaa": "123", + }) + idB := generateStreamID(t, map[string]any{ + "bbb": "456", + }) + idC := generateStreamID(t, map[string]any{ + "ccc": "789", + }) + + initialTime := time.Time{} + prioA := initialTime.Add(2 * time.Second) + prioB := initialTime.Add(1 * time.Second) + prioC := initialTime.Add(3 * time.Second) + + pq.Update(idA, prioA) + pq.Update(idB, prioB) + pq.Update(idC, prioC) + + // The first item should be B + id, prio := pq.Peek() + require.Equal(t, idB, id) + require.Equal(t, prioB, prio) + + // If we peek again, nothing should change + id, prio = pq.Peek() + require.Equal(t, idB, id) + require.Equal(t, prioB, prio) + + // Pop should return the same thing + id, prio = pq.Pop() + require.Equal(t, idB, id) + require.Equal(t, prioB, prio) + + // Now if we peek again, it should be the next item + id, prio = pq.Peek() + require.Equal(t, idA, id) + require.Equal(t, prioA, prio) + + // Pop should return the same thing + id, prio = pq.Pop() + require.Equal(t, idA, id) + require.Equal(t, prioA, prio) + + // One last time + id, prio = pq.Peek() + require.Equal(t, idC, id) + require.Equal(t, prioC, prio) + + // Pop should return the same thing + id, prio = pq.Pop() + require.Equal(t, idC, id) + require.Equal(t, prioC, prio) + + // The queue should now be empty + require.Equal(t, 0, pq.Len()) +} + +func generateStreamID(t *testing.T, attributes map[string]any) identity.Stream { + res := pcommon.NewResource() + err := res.Attributes().FromRaw(map[string]any{ + "foo": "bar", + "asdf": "qwer", + }) + require.NoError(t, err) + + scope := pcommon.NewInstrumentationScope() + scope.SetName("TestScope") + scope.SetVersion("v1.2.3") + err = scope.Attributes().FromRaw(map[string]any{ + "aaa": "bbb", + "ccc": "ddd", + }) + require.NoError(t, err) + + metric := pmetric.NewMetric() + + sum := metric.SetEmptySum() + sum.SetIsMonotonic(true) + sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + + dp := sum.DataPoints().AppendEmpty() + dp.SetStartTimestamp(678) + dp.SetTimestamp(789) + dp.SetDoubleValue(123.456) + err = dp.Attributes().FromRaw(attributes) + require.NoError(t, err) + + return identity.OfStream(identity.OfResourceMetric(res, scope, metric), dp) +} diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go new file mode 100644 index 000000000000..9e760c82f6cd --- /dev/null +++ b/internal/exp/metrics/staleness/staleness.go @@ -0,0 +1,55 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package staleness + +import ( + "time" + + 
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +// We override how Now() is returned, so we can have deterministic tests +var nowFunc = time.Now + +type Map[T any] interface { + Load(key identity.Stream) (T, bool) + Store(key identity.Stream, value T) + Delete(key identity.Stream) + // Range calls f sequentially for each key and value present in the map. + // If f returns false, range stops the iteration. + Range(f func(key identity.Stream, value T) bool) +} + +type Staleness[T any] struct { + max time.Duration + + Map[T] + pq *PriorityQueue +} + +func NewStaleness[T any](max time.Duration, newMap Map[T]) *Staleness[T] { + return &Staleness[T]{ + max: max, + Map: newMap, + pq: NewPriorityQueue(), + } +} + +func (s *Staleness[T]) Store(id identity.Stream, value T) { + s.pq.Update(id, nowFunc()) + s.Map.Store(id, value) +} + +func (s *Staleness[T]) ExpireOldEntries() { + now := nowFunc() + + for { + _, ts := s.pq.Peek() + if now.Sub(ts) < s.max { + break + } + id, _ := s.pq.Pop() + s.Map.Delete(id) + } +} diff --git a/internal/exp/metrics/staleness/staleness_test.go b/internal/exp/metrics/staleness/staleness_test.go new file mode 100644 index 000000000000..b1fd20b1fdfa --- /dev/null +++ b/internal/exp/metrics/staleness/staleness_test.go @@ -0,0 +1,121 @@ +package staleness + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +// RawMap +var _ Map[time.Time] = (*RawMap[identity.Stream, time.Time])(nil) + +type RawMap[K comparable, V any] map[K]V + +func (rm *RawMap[K, V]) Load(key K) (V, bool) { + value, ok := (*rm)[key] + return value, ok +} + +func (rm *RawMap[K, V]) Store(key K, value V) { + (*rm)[key] = value +} + +func (rm *RawMap[K, V]) Delete(key K) { + delete(*rm, key) +} + +func (rm *RawMap[K, V]) Range(f func(key K, value V) bool) { + for key, value := range *rm { + if !f(key, value) { + return + } + } +} + +// Tests + +func TestStaleness(t *testing.T) { + t.Parallel() + + max := 1 * time.Second + stalenessMap := NewStaleness[int]( + max, + &RawMap[identity.Stream, int]{}, + ) + + idA := generateStreamID(t, map[string]any{ + "aaa": "123", + }) + idB := generateStreamID(t, map[string]any{ + "bbb": "456", + }) + idC := generateStreamID(t, map[string]any{ + "ccc": "789", + }) + idD := generateStreamID(t, map[string]any{ + "ddd": "024", + }) + + initialTime := time.Time{} + timeA := initialTime.Add(2 * time.Second) + timeB := initialTime.Add(1 * time.Second) + timeC := initialTime.Add(3 * time.Second) + timeD := initialTime.Add(4 * time.Second) + + valueA := 1 + valueB := 4 + valueC := 7 + valueD := 0 + + // Add the values to the map + nowFunc = func() time.Time { return timeA } + stalenessMap.Store(idA, valueA) + nowFunc = func() time.Time { return timeB } + stalenessMap.Store(idB, valueB) + nowFunc = func() time.Time { return timeC } + stalenessMap.Store(idC, valueC) + nowFunc = func() time.Time { return timeD } + stalenessMap.Store(idD, valueD) + + // Set the time to 2.5s and run expire + // This should remove B, but the others should remain + // (now == 2.5s, B == 1s, max == 1s) + // now > B + max + nowFunc = func() time.Time { return initialTime.Add(2500 * time.Millisecond) } + stalenessMap.ExpireOldEntries() + validateStalenessMapEntries(t, + map[identity.Stream]int{ + idA: valueA, + idC: valueC, + idD: valueD, + }, + stalenessMap, + ) + + // Set the time to 4.5s and run expire + // This should remove A and C, but 
D should remain + // (now == 2.5s, A == 2s, C == 3s, max == 1s) + // now > A + max AND now > C + max + nowFunc = func() time.Time { return initialTime.Add(4500 * time.Millisecond) } + stalenessMap.ExpireOldEntries() + validateStalenessMapEntries(t, + map[identity.Stream]int{ + idD: valueD, + }, + stalenessMap, + ) +} + +func validateStalenessMapEntries(t *testing.T, expected map[identity.Stream]int, sm *Staleness[int]) { + actual := map[identity.Stream]int{} + + sm.Range(func(key identity.Stream, value int) bool { + actual[key] = value + return true + }) + + require.Equal(t, expected, actual) +} From 8ccd51f934b26bbe14d88234aba5f30c8b8af8a5 Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Tue, 13 Feb 2024 10:32:12 -0500 Subject: [PATCH 02/11] Properly clean up lookup map when popping --- internal/exp/metrics/staleness/priority_queue.go | 1 + internal/exp/metrics/staleness/priority_queue_test.go | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/internal/exp/metrics/staleness/priority_queue.go b/internal/exp/metrics/staleness/priority_queue.go index 8a26aabc06b8..5e334be7b57d 100644 --- a/internal/exp/metrics/staleness/priority_queue.go +++ b/internal/exp/metrics/staleness/priority_queue.go @@ -89,6 +89,7 @@ func (pq *PriorityQueue) Peek() (identity.Stream, time.Time) { func (pq *PriorityQueue) Pop() (identity.Stream, time.Time) { val := heap.Pop(&pq.inner).(*queueItem) + delete(pq.itemLookup, val.key) return val.key, val.prio } diff --git a/internal/exp/metrics/staleness/priority_queue_test.go b/internal/exp/metrics/staleness/priority_queue_test.go index 373c4189bb61..b58478e7c1db 100644 --- a/internal/exp/metrics/staleness/priority_queue_test.go +++ b/internal/exp/metrics/staleness/priority_queue_test.go @@ -75,6 +75,11 @@ func TestPriorityQueueImpl(t *testing.T) { // The queue should now be empty require.Equal(t, 0, pq.Len()) + + // And the inner lookup map should also be empty + require.IsType(t, &heapPriorityQueue{}, pq) + heapQueue := pq.(*heapPriorityQueue) + require.Len(t, heapQueue.itemLookup, 0) } func generateStreamID(t *testing.T, attributes map[string]any) identity.Stream { From 932874ae2a8e8023750680daf7521f2013abe48b Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Tue, 13 Feb 2024 10:32:53 -0500 Subject: [PATCH 03/11] Make PriorityMap an interface So Staleness isn't so tightly tied to it --- .../exp/metrics/staleness/priority_queue.go | 39 +++++++++++-------- internal/exp/metrics/staleness/staleness.go | 2 +- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/internal/exp/metrics/staleness/priority_queue.go b/internal/exp/metrics/staleness/priority_queue.go index 5e334be7b57d..e524ee112e88 100644 --- a/internal/exp/metrics/staleness/priority_queue.go +++ b/internal/exp/metrics/staleness/priority_queue.go @@ -10,7 +10,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" ) -type priorityQueueImpl []*queueItem +type heapQueue []*queueItem type queueItem struct { key identity.Stream @@ -18,27 +18,27 @@ type queueItem struct { index int } -func (pq priorityQueueImpl) Len() int { return len(pq) } +func (pq heapQueue) Len() int { return len(pq) } -func (pq priorityQueueImpl) Less(i, j int) bool { +func (pq heapQueue) Less(i, j int) bool { // We want Pop to give us the lowest priority return pq[i].prio.Before(pq[j].prio) } -func (pq priorityQueueImpl) Swap(i, j int) { +func (pq heapQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] pq[i].index = i pq[j].index = j } -func (pq *priorityQueueImpl) Push(x any) { 
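// A minimal usage sketch for the PriorityQueue interface being extracted in this commit,
// assuming only the methods defined in this file; `streamID` and `cutoff` are hypothetical
// placeholders supplied by the caller.
//
//	pq := NewPriorityQueue()
//	pq.Update(streamID, time.Now()) // insert the entry, or refresh its priority in place
//	for pq.Len() > 0 {
//	    _, ts := pq.Peek() // oldest entry, without removing it
//	    if ts.After(cutoff) {
//	        break
//	    }
//	    expired, _ := pq.Pop() // removes the oldest entry and its lookup-map slot
//	    _ = expired
//	}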
+func (pq *heapQueue) Push(x any) { n := len(*pq) item := x.(*queueItem) item.index = n *pq = append(*pq, item) } -func (pq *priorityQueueImpl) Pop() any { +func (pq *heapQueue) Pop() any { old := *pq n := len(old) item := old[n-1] @@ -48,19 +48,26 @@ func (pq *priorityQueueImpl) Pop() any { return item } -func (pq *priorityQueueImpl) Update(item *queueItem, newPrio time.Time) { +func (pq *heapQueue) Update(item *queueItem, newPrio time.Time) { item.prio = newPrio heap.Fix(pq, item.index) } -type PriorityQueue struct { - inner priorityQueueImpl +type PriorityQueue interface { + Update(id identity.Stream, newPrio time.Time) + Peek() (identity.Stream, time.Time) + Pop() (identity.Stream, time.Time) + Len() int +} + +type heapPriorityQueue struct { + inner heapQueue itemLookup map[identity.Stream]*queueItem } -func NewPriorityQueue() *PriorityQueue { - pq := &PriorityQueue{ - inner: priorityQueueImpl{}, +func NewPriorityQueue() PriorityQueue { + pq := &heapPriorityQueue{ + inner: heapQueue{}, itemLookup: map[identity.Stream]*queueItem{}, } heap.Init(&pq.inner) @@ -68,7 +75,7 @@ func NewPriorityQueue() *PriorityQueue { return pq } -func (pq *PriorityQueue) Update(id identity.Stream, newPrio time.Time) { +func (pq *heapPriorityQueue) Update(id identity.Stream, newPrio time.Time) { item, ok := pq.itemLookup[id] if !ok { item = &queueItem{ @@ -82,17 +89,17 @@ func (pq *PriorityQueue) Update(id identity.Stream, newPrio time.Time) { } } -func (pq *PriorityQueue) Peek() (identity.Stream, time.Time) { +func (pq *heapPriorityQueue) Peek() (identity.Stream, time.Time) { val := pq.inner[0] return val.key, val.prio } -func (pq *PriorityQueue) Pop() (identity.Stream, time.Time) { +func (pq *heapPriorityQueue) Pop() (identity.Stream, time.Time) { val := heap.Pop(&pq.inner).(*queueItem) delete(pq.itemLookup, val.key) return val.key, val.prio } -func (pq *PriorityQueue) Len() int { +func (pq *heapPriorityQueue) Len() int { return pq.inner.Len() } diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index 9e760c82f6cd..038ffceb76eb 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -25,7 +25,7 @@ type Staleness[T any] struct { max time.Duration Map[T] - pq *PriorityQueue + pq PriorityQueue } func NewStaleness[T any](max time.Duration, newMap Map[T]) *Staleness[T] { From eb9521681f5d626e968d5cf72ecf5fcc158b661e Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Tue, 13 Feb 2024 10:37:51 -0500 Subject: [PATCH 04/11] Implement Map.LoadOrStore() --- internal/exp/metrics/staleness/staleness.go | 6 ++++++ internal/exp/metrics/staleness/staleness_test.go | 10 ++++++++++ 2 files changed, 16 insertions(+) diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index 038ffceb76eb..cf68946b9c59 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -13,8 +13,14 @@ import ( var nowFunc = time.Now type Map[T any] interface { + // Load the value at key. 
If it does not exist, the boolean will be false and the value returned will be the zero value Load(key identity.Stream) (T, bool) + // Store the given key value pair in the map Store(key identity.Stream, value T) + // LoadOrStore will either load the value from the map and return it and the boolean `true` + // or if it doesn't exist in the Map yet, the value passed in will be stored and then returned with the boolean `false` + LoadOrStore(key identity.Stream, value T) (T, bool) + // Remove the value at key from the map Delete(key identity.Stream) // Range calls f sequentially for each key and value present in the map. // If f returns false, range stops the iteration. diff --git a/internal/exp/metrics/staleness/staleness_test.go b/internal/exp/metrics/staleness/staleness_test.go index b1fd20b1fdfa..e2e4d009d156 100644 --- a/internal/exp/metrics/staleness/staleness_test.go +++ b/internal/exp/metrics/staleness/staleness_test.go @@ -19,6 +19,16 @@ func (rm *RawMap[K, V]) Load(key K) (V, bool) { return value, ok } +func (rm *RawMap[K, V]) LoadOrStore(key K, value V) (V, bool) { + returnedVal, ok := (*rm)[key] + if !ok { + (*rm)[key] = value + returnedVal = value + } + + return returnedVal, ok +} + func (rm *RawMap[K, V]) Store(key K, value V) { (*rm)[key] = value } From f16534264faa7d4320d982681a175cffb280f77f Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Wed, 14 Feb 2024 16:20:07 -0500 Subject: [PATCH 05/11] Use the new upcoming iterator style instead of our custom Range() --- internal/exp/metrics/staleness/staleness.go | 6 +++--- internal/exp/metrics/staleness/staleness_test.go | 14 ++++++++------ 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index cf68946b9c59..afbcaf37a522 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -22,9 +22,9 @@ type Map[T any] interface { LoadOrStore(key identity.Stream, value T) (T, bool) // Remove the value at key from the map Delete(key identity.Stream) - // Range calls f sequentially for each key and value present in the map. - // If f returns false, range stops the iteration. 
- Range(f func(key identity.Stream, value T) bool) + // Items returns an iterator function that in future go version can be used with range + // See: https://go.dev/wiki/RangefuncExperiment + Items() func(yield func(identity.Stream, T) bool) bool } type Staleness[T any] struct { diff --git a/internal/exp/metrics/staleness/staleness_test.go b/internal/exp/metrics/staleness/staleness_test.go index e2e4d009d156..363378ce8063 100644 --- a/internal/exp/metrics/staleness/staleness_test.go +++ b/internal/exp/metrics/staleness/staleness_test.go @@ -37,11 +37,14 @@ func (rm *RawMap[K, V]) Delete(key K) { delete(*rm, key) } -func (rm *RawMap[K, V]) Range(f func(key K, value V) bool) { - for key, value := range *rm { - if !f(key, value) { - return +func (rm *RawMap[K, V]) Items() func(yield func(K, V) bool) bool { + return func(yield func(K, V) bool) bool { + for k, v := range *rm { + if !yield(k, v) { + break + } } + return false } } @@ -122,10 +125,9 @@ func TestStaleness(t *testing.T) { func validateStalenessMapEntries(t *testing.T, expected map[identity.Stream]int, sm *Staleness[int]) { actual := map[identity.Stream]int{} - sm.Range(func(key identity.Stream, value int) bool { + sm.Items()(func(key identity.Stream, value int) bool { actual[key] = value return true }) - require.Equal(t, expected, actual) } From 2d75b1fc886058ba5573f29f6a62dbe3f38d4519 Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Tue, 20 Feb 2024 15:21:34 -0500 Subject: [PATCH 06/11] Move Staleness map to be internal So users are forced to use the correct methods. Also adds lots of documentation --- internal/exp/metrics/staleness/map.go | 51 +++++++++++++++++++ .../exp/metrics/staleness/priority_queue.go | 36 +++++++------ internal/exp/metrics/staleness/staleness.go | 50 ++++++++++-------- .../exp/metrics/staleness/staleness_test.go | 41 --------------- 4 files changed, 100 insertions(+), 78 deletions(-) create mode 100644 internal/exp/metrics/staleness/map.go diff --git a/internal/exp/metrics/staleness/map.go b/internal/exp/metrics/staleness/map.go new file mode 100644 index 000000000000..622f27ef1a21 --- /dev/null +++ b/internal/exp/metrics/staleness/map.go @@ -0,0 +1,51 @@ +package staleness + +import ( + "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +// Map is an abstraction over a map +type Map[T any] interface { + // Load the value at key. 
If it does not exist, the boolean will be false and the value returned will be the zero value + Load(key identity.Stream) (T, bool) + // Store the given key value pair in the map + Store(key identity.Stream, value T) + // Remove the value at key from the map + Delete(key identity.Stream) + // Items returns an iterator function that in future go version can be used with range + // See: https://go.dev/wiki/RangefuncExperiment + Items() func(yield func(identity.Stream, T) bool) bool +} + +// RawMap implementation + +var _ Map[time.Time] = (*RawMap[identity.Stream, time.Time])(nil) + +// RawMap is an implementation of the Map interface using a standard golang map +type RawMap[K comparable, V any] map[K]V + +func (rm *RawMap[K, V]) Load(key K) (V, bool) { + value, ok := (*rm)[key] + return value, ok +} + +func (rm *RawMap[K, V]) Store(key K, value V) { + (*rm)[key] = value +} + +func (rm *RawMap[K, V]) Delete(key K) { + delete(*rm, key) +} + +func (rm *RawMap[K, V]) Items() func(yield func(K, V) bool) bool { + return func(yield func(K, V) bool) bool { + for k, v := range *rm { + if !yield(k, v) { + break + } + } + return false + } +} diff --git a/internal/exp/metrics/staleness/priority_queue.go b/internal/exp/metrics/staleness/priority_queue.go index e524ee112e88..d43676f15416 100644 --- a/internal/exp/metrics/staleness/priority_queue.go +++ b/internal/exp/metrics/staleness/priority_queue.go @@ -10,6 +10,21 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" ) +// PriorityQueue represents a way to store entries sorted by their priority. +// Pop() will return the oldest entry of the set. +type PriorityQueue interface { + // Update will add or update an entry, and reshuffle the queue internally as needed to keep it sorted + Update(id identity.Stream, newPrio time.Time) + // Peek will return the entry at the HEAD of the queue *without* removing it from the queue + Peek() (identity.Stream, time.Time) + // Pop will remove the entry at the HEAD of the queue and return it + Pop() (identity.Stream, time.Time) + // Len will return the number of entries in the queue + Len() int +} + +// heapQueue implements heap.Interface. 
+// We use it as the inner implementation of a heap-based sorted queue type heapQueue []*queueItem type queueItem struct { @@ -48,18 +63,6 @@ func (pq *heapQueue) Pop() any { return item } -func (pq *heapQueue) Update(item *queueItem, newPrio time.Time) { - item.prio = newPrio - heap.Fix(pq, item.index) -} - -type PriorityQueue interface { - Update(id identity.Stream, newPrio time.Time) - Peek() (identity.Stream, time.Time) - Pop() (identity.Stream, time.Time) - Len() int -} - type heapPriorityQueue struct { inner heapQueue itemLookup map[identity.Stream]*queueItem @@ -76,16 +79,19 @@ func NewPriorityQueue() PriorityQueue { } func (pq *heapPriorityQueue) Update(id identity.Stream, newPrio time.Time) { + // Check if the entry already exists in the queue item, ok := pq.itemLookup[id] - if !ok { + if ok { + // If so, we can update it in place + item.prio = newPrio + heap.Fix(&pq.inner, item.index) + } else { item = &queueItem{ key: id, prio: newPrio, } heap.Push(&pq.inner, item) pq.itemLookup[id] = item - } else { - pq.inner.Update(item, newPrio) } } diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index afbcaf37a522..481149b2f7b8 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -12,41 +12,47 @@ import ( // We override how Now() is returned, so we can have deterministic tests var nowFunc = time.Now -type Map[T any] interface { - // Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value - Load(key identity.Stream) (T, bool) - // Store the given key value pair in the map - Store(key identity.Stream, value T) - // LoadOrStore will either load the value from the map and return it and the boolean `true` - // or if it doesn't exist in the Map yet, the value passed in will be stored and then returned with the boolean `false` - LoadOrStore(key identity.Stream, value T) (T, bool) - // Remove the value at key from the map - Delete(key identity.Stream) - // Items returns an iterator function that in future go version can be used with range - // See: https://go.dev/wiki/RangefuncExperiment - Items() func(yield func(identity.Stream, T) bool) bool -} - +// Staleness a a wrapper over a map that adds an additional "staleness" value to each entry. Users can +// call ExpireOldEntries() to automatically remove all entries from the map whole staleness value is +// older than the `max` +// +// NOTE: Staleness methods are *not* thread-safe. If the user needs to use Staleness in a multi-threaded +// environment, then it is the user's responsibility to properly serialize calls to Staleness methods type Staleness[T any] struct { max time.Duration - Map[T] - pq PriorityQueue + items Map[T] + pq PriorityQueue } func NewStaleness[T any](max time.Duration, newMap Map[T]) *Staleness[T] { return &Staleness[T]{ - max: max, - Map: newMap, - pq: NewPriorityQueue(), + max: max, + items: newMap, + pq: NewPriorityQueue(), } } +// Load the value at key. 
If it does not exist, the boolean will be false and the value returned will be the zero value +func (s *Staleness[T]) Load(key identity.Stream) (T, bool) { + return s.items.Load(key) +} + +// Store the given key value pair in the map, and update the pair's staleness value to "now" func (s *Staleness[T]) Store(id identity.Stream, value T) { s.pq.Update(id, nowFunc()) - s.Map.Store(id, value) + s.items.Store(id, value) +} + +// Items returns an iterator function that in future go version can be used with range +// See: https://go.dev/wiki/RangefuncExperiment +func (s *Staleness[T]) Items() func(yield func(identity.Stream, T) bool) bool { + return s.items.Items() } +// ExpireOldEntries will remove all entries whose staleness value is older than `now() - max` +// For example, if an entry has a staleness value of two hours ago, and max == 1 hour, then the entry would +// be removed. But if an entry had a stalness value of 30 minutes, then it *wouldn't* be removed. func (s *Staleness[T]) ExpireOldEntries() { now := nowFunc() @@ -56,6 +62,6 @@ func (s *Staleness[T]) ExpireOldEntries() { break } id, _ := s.pq.Pop() - s.Map.Delete(id) + s.items.Delete(id) } } diff --git a/internal/exp/metrics/staleness/staleness_test.go b/internal/exp/metrics/staleness/staleness_test.go index 363378ce8063..00137d6e1ad5 100644 --- a/internal/exp/metrics/staleness/staleness_test.go +++ b/internal/exp/metrics/staleness/staleness_test.go @@ -9,47 +9,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" ) -// RawMap -var _ Map[time.Time] = (*RawMap[identity.Stream, time.Time])(nil) - -type RawMap[K comparable, V any] map[K]V - -func (rm *RawMap[K, V]) Load(key K) (V, bool) { - value, ok := (*rm)[key] - return value, ok -} - -func (rm *RawMap[K, V]) LoadOrStore(key K, value V) (V, bool) { - returnedVal, ok := (*rm)[key] - if !ok { - (*rm)[key] = value - returnedVal = value - } - - return returnedVal, ok -} - -func (rm *RawMap[K, V]) Store(key K, value V) { - (*rm)[key] = value -} - -func (rm *RawMap[K, V]) Delete(key K) { - delete(*rm, key) -} - -func (rm *RawMap[K, V]) Items() func(yield func(K, V) bool) bool { - return func(yield func(K, V) bool) bool { - for k, v := range *rm { - if !yield(k, v) { - break - } - } - return false - } -} - -// Tests - func TestStaleness(t *testing.T) { t.Parallel() From a3d08a896b03fd196283017e647592d3444a3b8c Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Tue, 20 Feb 2024 15:37:26 -0500 Subject: [PATCH 07/11] Fix linter / check errors --- internal/exp/metrics/go.mod | 4 +--- internal/exp/metrics/go.sum | 4 ++++ internal/exp/metrics/staleness/map.go | 5 ++++- internal/exp/metrics/staleness/priority_queue.go | 2 +- internal/exp/metrics/staleness/staleness.go | 2 +- internal/exp/metrics/staleness/staleness_test.go | 3 +++ 6 files changed, 14 insertions(+), 6 deletions(-) diff --git a/internal/exp/metrics/go.mod b/internal/exp/metrics/go.mod index ca0a23cb1e86..6e54eb33f229 100644 --- a/internal/exp/metrics/go.mod +++ b/internal/exp/metrics/go.mod @@ -1,6 +1,6 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics -go 1.20 +go 1.21 require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.95.0 @@ -28,5 +28,3 @@ require ( ) replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil - -toolchain go1.21.1 diff --git a/internal/exp/metrics/go.sum b/internal/exp/metrics/go.sum index 1c3e08163635..9a5008b4d916 100644 --- 
a/internal/exp/metrics/go.sum +++ b/internal/exp/metrics/go.sum @@ -17,7 +17,9 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -26,6 +28,7 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= @@ -79,5 +82,6 @@ google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7 google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/exp/metrics/staleness/map.go b/internal/exp/metrics/staleness/map.go index 622f27ef1a21..77b29f5febd2 100644 --- a/internal/exp/metrics/staleness/map.go +++ b/internal/exp/metrics/staleness/map.go @@ -1,4 +1,7 @@ -package staleness +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" import ( "time" diff --git a/internal/exp/metrics/staleness/priority_queue.go b/internal/exp/metrics/staleness/priority_queue.go index d43676f15416..f1b01743f95f 100644 --- a/internal/exp/metrics/staleness/priority_queue.go +++ b/internal/exp/metrics/staleness/priority_queue.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package staleness +package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" import ( "container/heap" diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index 481149b2f7b8..34c53e3702cd 100644 --- 
a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package staleness +package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" import ( "time" diff --git a/internal/exp/metrics/staleness/staleness_test.go b/internal/exp/metrics/staleness/staleness_test.go index 00137d6e1ad5..688a6c7b3c0b 100644 --- a/internal/exp/metrics/staleness/staleness_test.go +++ b/internal/exp/metrics/staleness/staleness_test.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package staleness import ( From 6a41d35b2a5800cf7862748e14dec5315a7497be Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Wed, 21 Feb 2024 12:28:59 -0500 Subject: [PATCH 08/11] The Staleness tests can't be parallel, because they use the global `NowFunc` --- internal/exp/metrics/staleness/staleness_test.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/internal/exp/metrics/staleness/staleness_test.go b/internal/exp/metrics/staleness/staleness_test.go index 688a6c7b3c0b..4d96b41061e6 100644 --- a/internal/exp/metrics/staleness/staleness_test.go +++ b/internal/exp/metrics/staleness/staleness_test.go @@ -13,8 +13,6 @@ import ( ) func TestStaleness(t *testing.T) { - t.Parallel() - max := 1 * time.Second stalenessMap := NewStaleness[int]( max, @@ -46,20 +44,20 @@ func TestStaleness(t *testing.T) { valueD := 0 // Add the values to the map - nowFunc = func() time.Time { return timeA } + NowFunc = func() time.Time { return timeA } stalenessMap.Store(idA, valueA) - nowFunc = func() time.Time { return timeB } + NowFunc = func() time.Time { return timeB } stalenessMap.Store(idB, valueB) - nowFunc = func() time.Time { return timeC } + NowFunc = func() time.Time { return timeC } stalenessMap.Store(idC, valueC) - nowFunc = func() time.Time { return timeD } + NowFunc = func() time.Time { return timeD } stalenessMap.Store(idD, valueD) // Set the time to 2.5s and run expire // This should remove B, but the others should remain // (now == 2.5s, B == 1s, max == 1s) // now > B + max - nowFunc = func() time.Time { return initialTime.Add(2500 * time.Millisecond) } + NowFunc = func() time.Time { return initialTime.Add(2500 * time.Millisecond) } stalenessMap.ExpireOldEntries() validateStalenessMapEntries(t, map[identity.Stream]int{ @@ -74,7 +72,7 @@ func TestStaleness(t *testing.T) { // This should remove A and C, but D should remain // (now == 2.5s, A == 2s, C == 3s, max == 1s) // now > A + max AND now > C + max - nowFunc = func() time.Time { return initialTime.Add(4500 * time.Millisecond) } + NowFunc = func() time.Time { return initialTime.Add(4500 * time.Millisecond) } stalenessMap.ExpireOldEntries() validateStalenessMapEntries(t, map[identity.Stream]int{ From cde1ec7d7591d842ff07808b6674648a1dc69699 Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Wed, 21 Feb 2024 12:29:28 -0500 Subject: [PATCH 09/11] Expose `NowFunc` so other modules can do testing --- internal/exp/metrics/staleness/staleness.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index 34c53e3702cd..f5803ccdeb55 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -10,7 +10,7 @@ import ( ) // We override how Now() is returned, so we can have 
deterministic tests -var nowFunc = time.Now +var NowFunc = time.Now // Staleness a a wrapper over a map that adds an additional "staleness" value to each entry. Users can // call ExpireOldEntries() to automatically remove all entries from the map whole staleness value is @@ -40,7 +40,7 @@ func (s *Staleness[T]) Load(key identity.Stream) (T, bool) { // Store the given key value pair in the map, and update the pair's staleness value to "now" func (s *Staleness[T]) Store(id identity.Stream, value T) { - s.pq.Update(id, nowFunc()) + s.pq.Update(id, NowFunc()) s.items.Store(id, value) } @@ -54,7 +54,7 @@ func (s *Staleness[T]) Items() func(yield func(identity.Stream, T) bool) bool { // For example, if an entry has a staleness value of two hours ago, and max == 1 hour, then the entry would // be removed. But if an entry had a stalness value of 30 minutes, then it *wouldn't* be removed. func (s *Staleness[T]) ExpireOldEntries() { - now := nowFunc() + now := NowFunc() for { _, ts := s.pq.Peek() From c53a0867bea36a221dd269419b83dd9e970704a5 Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Tue, 20 Feb 2024 14:18:17 -0500 Subject: [PATCH 10/11] [processor/interval] Implement the main logic --- .github/CODEOWNERS | 2 +- processor/intervalprocessor/README.md | 5 +- processor/intervalprocessor/config.go | 17 + processor/intervalprocessor/factory.go | 6 +- .../generated_component_test.go | 4 +- processor/intervalprocessor/go.mod | 4 + .../internal/metrics/metrics.go | 101 +++ processor/intervalprocessor/metadata.yaml | 2 +- processor/intervalprocessor/processor.go | 243 ++++++- processor/intervalprocessor/processor_test.go | 640 ++++++++++++++++++ 10 files changed, 1010 insertions(+), 14 deletions(-) create mode 100644 processor/intervalprocessor/internal/metrics/metrics.go create mode 100644 processor/intervalprocessor/processor_test.go diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index a25a76e4bd4d..6e21b0712eff 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -160,7 +160,7 @@ processor/deltatorateprocessor/ @open-telemetry/collect processor/filterprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @boostchicken processor/groupbyattrsprocessor/ @open-telemetry/collector-contrib-approvers @rnishtala-sumo processor/groupbytraceprocessor/ @open-telemetry/collector-contrib-approvers @jpkrohling -processor/intervalprocessor/ @open-telemetry/collector-contrib-approvers @RichieSams +processor/intervalprocessor/ @open-telemetry/collector-contrib-approvers @RichieSams @sh0rez processor/k8sattributesprocessor/ @open-telemetry/collector-contrib-approvers @dmitryax @rmfitzpatrick @fatsheep9146 @TylerHelmuth processor/logstransformprocessor/ @open-telemetry/collector-contrib-approvers @djaglowski @dehaansa processor/metricsgenerationprocessor/ @open-telemetry/collector-contrib-approvers @Aneurysm9 diff --git a/processor/intervalprocessor/README.md b/processor/intervalprocessor/README.md index 90cf86d05e3f..c8d5473fa307 100644 --- a/processor/intervalprocessor/README.md +++ b/processor/intervalprocessor/README.md @@ -7,7 +7,7 @@ | Distributions | [] | | Warnings | [Statefulness](#warnings) | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Finterval%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Finterval) [![Closed 
issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Finterval%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Finterval) | -| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@RichieSams](https://www.github.com/RichieSams) | +| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@RichieSams](https://www.github.com/RichieSams), [@sh0rez](https://www.github.com/sh0rez) | [development]: https://github.com/open-telemetry/opentelemetry-collector#development @@ -31,4 +31,5 @@ The following metric types will *not* be aggregated, and will instead be passed, The following settings can be optionally configured: -- `max_staleness`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0 +* `interval`: The interval in which the processor should re-export the aggregated metrics. Default: 15s +* `max_staleness`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0 diff --git a/processor/intervalprocessor/config.go b/processor/intervalprocessor/config.go index 8a43eca75c00..1393ba9c001a 100644 --- a/processor/intervalprocessor/config.go +++ b/processor/intervalprocessor/config.go @@ -4,15 +4,24 @@ package intervalprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor" import ( + "errors" "time" "go.opentelemetry.io/collector/component" ) +var ( + ErrInvalidIntervalValue = errors.New("invalid interval value") + ErrInvalidMaxStatenessValue = errors.New("invalid max_stateless value") +) + var _ component.Config = (*Config)(nil) // Config defines the configuration for the processor. type Config struct { + // Interval is the time + Interval time.Duration `mapstructure:"interval"` + // MaxStaleness is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. MaxStaleness time.Duration `mapstructure:"max_staleness"` } @@ -20,5 +29,13 @@ type Config struct { // Validate checks whether the input configuration has all of the required fields for the processor. // An error is returned if there are any invalid inputs. 
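// As a sketch of how the README settings above are wired up, a collector configuration
// for this processor would look roughly like the following (the `interval` component id
// is assumed from the processor's directory and badge names, and the values shown are
// the documented defaults):
//
//	processors:
//	  interval:
//	    interval: 15s
//	    max_staleness: 0
//
// Validate below keeps those values in range: the interval must be positive, and
// max_staleness must not be negative.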
func (config *Config) Validate() error { + if config.Interval <= 0 { + return ErrInvalidIntervalValue + } + + if config.MaxStaleness < 0 { + return ErrInvalidMaxStatenessValue + } + return nil } diff --git a/processor/intervalprocessor/factory.go b/processor/intervalprocessor/factory.go index 5bef7881dfdf..00ddb2dca253 100644 --- a/processor/intervalprocessor/factory.go +++ b/processor/intervalprocessor/factory.go @@ -6,6 +6,7 @@ package intervalprocessor // import "github.com/open-telemetry/opentelemetry-col import ( "context" "fmt" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -23,7 +24,10 @@ func NewFactory() processor.Factory { } func createDefaultConfig() component.Config { - return &Config{} + return &Config{ + Interval: 15 * time.Second, + MaxStaleness: 0, + } } func createMetricsProcessor(_ context.Context, set processor.CreateSettings, cfg component.Config, nextConsumer consumer.Metrics) (processor.Metrics, error) { diff --git a/processor/intervalprocessor/generated_component_test.go b/processor/intervalprocessor/generated_component_test.go index 55e89c63bcaa..dee3e958229a 100644 --- a/processor/intervalprocessor/generated_component_test.go +++ b/processor/intervalprocessor/generated_component_test.go @@ -9,13 +9,11 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - + "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processortest" - "go.opentelemetry.io/collector/confmap/confmaptest" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" ) diff --git a/processor/intervalprocessor/go.mod b/processor/intervalprocessor/go.mod index 533717deb457..7e425817527e 100644 --- a/processor/intervalprocessor/go.mod +++ b/processor/intervalprocessor/go.mod @@ -4,6 +4,7 @@ go 1.21 require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.95.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.95.0 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector/component v0.95.0 go.opentelemetry.io/collector/confmap v0.95.0 @@ -33,6 +34,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.95.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.18.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect @@ -56,6 +58,8 @@ require ( replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics + replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest diff --git a/processor/intervalprocessor/internal/metrics/metrics.go b/processor/intervalprocessor/internal/metrics/metrics.go new file mode 100644 index 000000000000..1a1624af7536 --- /dev/null +++ b/processor/intervalprocessor/internal/metrics/metrics.go @@ -0,0 +1,101 @@ 
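// A sketch of how the processor later in this patch is expected to drive the helper file
// below: keep a metadata-only copy of each metric, key stored data points by stream
// identity, and copy the metadata back out at export time. `state`, `res`, `scope`, `pm`,
// `dp` and the schema-URL strings are placeholder names for values the processor already
// has in hand.
//
//	m := metrics.From(res, resSchemaURL, scope, scopeSchemaURL, pm) // metadata only, no data points
//	id := metrics.StreamDataPointIdentity(m.Identity(), dp)
//	state[id] = metrics.StreamDataPoint[pmetric.NumberDataPoint]{Metric: m, DataPoint: dp, LastUpdated: time.Now()}
//
// At export time, CopyToResourceMetric, CopyToScopeMetric and CopyToPMetric rebuild the
// enclosing pmetric structures for each stored stream.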
+// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +import ( + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +type Metric struct { + res pcommon.Resource + resSchemaURL string + scope pcommon.InstrumentationScope + scopeSchemaURL string + innerMetric pmetric.Metric +} + +func From(res pcommon.Resource, resSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, metric pmetric.Metric) Metric { + newMetric := Metric{ + res: res, + resSchemaURL: resSchemaURL, + scope: scope, + scopeSchemaURL: scopeSchemaURL, + innerMetric: pmetric.NewMetric(), + } + + // Clone the metric metadata without the datapoints + newMetric.innerMetric.SetName(metric.Name()) + newMetric.innerMetric.SetDescription(metric.Description()) + newMetric.innerMetric.SetUnit(metric.Unit()) + switch metric.Type() { + case pmetric.MetricTypeGauge: + newMetric.innerMetric.SetEmptyGauge() + case pmetric.MetricTypeSum: + sum := metric.Sum() + + newSum := newMetric.innerMetric.SetEmptySum() + newSum.SetAggregationTemporality(sum.AggregationTemporality()) + newSum.SetIsMonotonic(sum.IsMonotonic()) + case pmetric.MetricTypeHistogram: + histogram := metric.Histogram() + + newHistogram := newMetric.innerMetric.SetEmptyHistogram() + newHistogram.SetAggregationTemporality(histogram.AggregationTemporality()) + case pmetric.MetricTypeExponentialHistogram: + expHistogram := metric.ExponentialHistogram() + + newExpHistogram := newMetric.innerMetric.SetEmptyExponentialHistogram() + newExpHistogram.SetAggregationTemporality(expHistogram.AggregationTemporality()) + case pmetric.MetricTypeSummary: + newMetric.innerMetric.SetEmptySummary() + } + + return newMetric +} + +func (m *Metric) Identity() identity.Metric { + return identity.OfResourceMetric(m.res, m.scope, m.innerMetric) +} + +func (m *Metric) CopyToResourceMetric(dest pmetric.ResourceMetrics) { + m.res.CopyTo(dest.Resource()) + dest.SetSchemaUrl(m.resSchemaURL) +} + +func (m *Metric) CopyToScopeMetric(dest pmetric.ScopeMetrics) { + m.scope.CopyTo(dest.Scope()) + dest.SetSchemaUrl(m.scopeSchemaURL) +} + +func (m *Metric) CopyToPMetric(dest pmetric.Metric) { + m.innerMetric.CopyTo(dest) +} + +type DataPointSlice[DP DataPoint[DP]] interface { + Len() int + At(i int) DP +} + +type DataPoint[Self any] interface { + pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint + + Timestamp() pcommon.Timestamp + Attributes() pcommon.Map + CopyTo(dest Self) +} + +type StreamDataPoint[DP DataPoint[DP]] struct { + Metric Metric + DataPoint DP + LastUpdated time.Time +} + +func StreamDataPointIdentity[DP DataPoint[DP]](metricID identity.Metric, dataPoint DP) identity.Stream { + return identity.OfStream[DP](metricID, dataPoint) +} diff --git a/processor/intervalprocessor/metadata.yaml b/processor/intervalprocessor/metadata.yaml index d1051807c70e..2a4700c3e069 100644 --- a/processor/intervalprocessor/metadata.yaml +++ b/processor/intervalprocessor/metadata.yaml @@ -7,6 +7,6 @@ status: distributions: [] warnings: [Statefulness] codeowners: - active: [RichieSams] + active: [RichieSams, sh0rez] tests: config: diff --git a/processor/intervalprocessor/processor.go b/processor/intervalprocessor/processor.go index 1ba888a1f7b4..bb7c9d05b959 100644 --- a/processor/intervalprocessor/processor.go +++ b/processor/intervalprocessor/processor.go @@ -5,13 
+5,20 @@ package intervalprocessor // import "github.com/open-telemetry/opentelemetry-col import ( "context" + "errors" + "fmt" + "sync" "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/processor" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor/internal/metrics" ) var _ processor.Metrics = (*Processor)(nil) @@ -19,9 +26,16 @@ var _ processor.Metrics = (*Processor)(nil) type Processor struct { ctx context.Context cancel context.CancelFunc - log *zap.Logger + logger *zap.Logger + + stateLock sync.Mutex + numbers map[identity.Stream]metrics.StreamDataPoint[pmetric.NumberDataPoint] + histograms map[identity.Stream]metrics.StreamDataPoint[pmetric.HistogramDataPoint] + expHistograms map[identity.Stream]metrics.StreamDataPoint[pmetric.ExponentialHistogramDataPoint] - maxStaleness time.Duration + interval time.Duration + maxStaleness time.Duration + intervalTicker *time.Ticker nextConsumer consumer.Metrics } @@ -30,19 +44,32 @@ func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics ctx, cancel := context.WithCancel(context.Background()) return &Processor{ - ctx: ctx, - cancel: cancel, - log: log, + ctx: ctx, + cancel: cancel, + logger: log, + + stateLock: sync.Mutex{}, + numbers: map[identity.Stream]metrics.StreamDataPoint[pmetric.NumberDataPoint]{}, + histograms: map[identity.Stream]metrics.StreamDataPoint[pmetric.HistogramDataPoint]{}, + expHistograms: map[identity.Stream]metrics.StreamDataPoint[pmetric.ExponentialHistogramDataPoint]{}, + + interval: config.Interval, maxStaleness: config.MaxStaleness, + nextConsumer: nextConsumer, } } func (p *Processor) Start(_ context.Context, _ component.Host) error { + p.intervalTicker = time.NewTicker(p.interval) + go p.exportMetricsLoop() return nil } func (p *Processor) Shutdown(_ context.Context) error { + if p.intervalTicker != nil { + p.intervalTicker.Stop() + } p.cancel() return nil } @@ -52,5 +79,209 @@ func (p *Processor) Capabilities() consumer.Capabilities { } func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - return p.nextConsumer.ConsumeMetrics(ctx, md) + var errs error + + p.stateLock.Lock() + defer p.stateLock.Unlock() + + md.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool { + rm.ScopeMetrics().RemoveIf(func(sm pmetric.ScopeMetrics) bool { + sm.Metrics().RemoveIf(func(m pmetric.Metric) bool { + switch m.Type() { + case pmetric.MetricTypeGauge, pmetric.MetricTypeSummary: + return false + case pmetric.MetricTypeSum: + sum := m.Sum() + + if !sum.IsMonotonic() { + return false + } + + if sum.AggregationTemporality() != pmetric.AggregationTemporalityCumulative { + return false + } + + aggregateDataPoints(sum.DataPoints(), p.numbers, rm.Resource(), rm.SchemaUrl(), sm.Scope(), sm.SchemaUrl(), m) + return true + case pmetric.MetricTypeHistogram: + histogram := m.Histogram() + + if histogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative { + return false + } + + aggregateDataPoints(histogram.DataPoints(), p.histograms, rm.Resource(), rm.SchemaUrl(), sm.Scope(), sm.SchemaUrl(), m) + return true + case pmetric.MetricTypeExponentialHistogram: + expHistogram := m.ExponentialHistogram() + + if expHistogram.AggregationTemporality() != 
pmetric.AggregationTemporalityCumulative { + return false + } + + aggregateDataPoints(expHistogram.DataPoints(), p.expHistograms, rm.Resource(), rm.SchemaUrl(), sm.Scope(), sm.SchemaUrl(), m) + return true + default: + errs = errors.Join(fmt.Errorf("invalid MetricType %d", m.Type())) + return false + } + }) + return sm.Metrics().Len() == 0 + }) + return rm.ScopeMetrics().Len() == 0 + }) + + if err := p.nextConsumer.ConsumeMetrics(ctx, md); err != nil { + errs = errors.Join(errs, err) + } + + return errs +} + +func aggregateDataPoints[DPS metrics.DataPointSlice[DP], DP metrics.DataPoint[DP]](dataPoints DPS, state map[identity.Stream]metrics.StreamDataPoint[DP], res pcommon.Resource, resSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, m pmetric.Metric) { + metric := metrics.From(res, resSchemaURL, scope, scopeSchemaURL, m) + metricID := metric.Identity() + + now := time.Now() + + for i := 0; i < dataPoints.Len(); i++ { + dp := dataPoints.At(i) + + streamDataPointID := metrics.StreamDataPointIdentity(metricID, dp) + + existing, ok := state[streamDataPointID] + if !ok { + state[streamDataPointID] = metrics.StreamDataPoint[DP]{ + Metric: metric, + DataPoint: dp, + LastUpdated: now, + } + continue + } + + // Check if the datapoint is newer + if dp.Timestamp().AsTime().After(existing.DataPoint.Timestamp().AsTime()) { + state[streamDataPointID] = metrics.StreamDataPoint[DP]{ + Metric: metric, + DataPoint: dp, + LastUpdated: now, + } + continue + } + + // Otherwise, we leave existing as-is + } +} + +func (p *Processor) exportMetricsLoop() { + for { + select { + case <-p.ctx.Done(): + return + case <-p.intervalTicker.C: + p.expireOldMetrics() + p.exportMetrics() + } + } +} + +func (p *Processor) exportMetrics() { + p.stateLock.Lock() + defer p.stateLock.Unlock() + + // We have to generate our own metrics slice to send to the nextConsumer + md := pmetric.NewMetrics() + + // We want to avoid generating duplicate ResourceMetrics, ScopeMetrics, and Metrics + // So we use lookups to only generate what we need + rmLookup := map[identity.Resource]pmetric.ResourceMetrics{} + smLookup := map[identity.Scope]pmetric.ScopeMetrics{} + mLookup := map[identity.Metric]pmetric.Metric{} + + for dataID, dp := range p.numbers { + m := getOrCreateMetric(dataID, dp.Metric, md, rmLookup, smLookup, mLookup) + + sum := m.Sum() + numDP := sum.DataPoints().AppendEmpty() + dp.DataPoint.CopyTo(numDP) + } + + for dataID, dp := range p.histograms { + m := getOrCreateMetric(dataID, dp.Metric, md, rmLookup, smLookup, mLookup) + + histogram := m.Histogram() + histogramDP := histogram.DataPoints().AppendEmpty() + dp.DataPoint.CopyTo(histogramDP) + } + + for dataID, dp := range p.expHistograms { + m := getOrCreateMetric(dataID, dp.Metric, md, rmLookup, smLookup, mLookup) + + expHistogram := m.ExponentialHistogram() + expHistogramDP := expHistogram.DataPoints().AppendEmpty() + dp.DataPoint.CopyTo(expHistogramDP) + } + + if err := p.nextConsumer.ConsumeMetrics(p.ctx, md); err != nil { + p.logger.Error("Metrics export failed", zap.Error(err)) + } +} + +func getOrCreateMetric( + streamID identity.Stream, metricRef metrics.Metric, + md pmetric.Metrics, + rmLookup map[identity.Resource]pmetric.ResourceMetrics, + smLookup map[identity.Scope]pmetric.ScopeMetrics, + mLookup map[identity.Metric]pmetric.Metric, +) pmetric.Metric { + // Find the ResourceMetrics + rm, ok := rmLookup[streamID.Metric().Scope().Resource()] + if !ok { + // We need to create it + rm = md.ResourceMetrics().AppendEmpty() + 
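// Copy the stored resource attributes and schema URL onto the newly appended ResourceMetrics. +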
metricRef.CopyToResourceMetric(rm) + } + + // Find the ScopeMetrics + sm, ok := smLookup[streamID.Metric().Scope()] + if !ok { + sm = rm.ScopeMetrics().AppendEmpty() + metricRef.CopyToScopeMetric(sm) + } + + m, ok := mLookup[streamID.Metric()] + if !ok { + m = sm.Metrics().AppendEmpty() + metricRef.CopyToPMetric(m) + } + + return m +} + +func (p *Processor) expireOldMetrics() { + // Fast-out + if p.maxStaleness == 0 { + return + } + + p.stateLock.Lock() + defer p.stateLock.Unlock() + + now := time.Now() + + for key, dp := range p.numbers { + if dp.LastUpdated.Add(p.maxStaleness).Before(now) { + delete(p.numbers, key) + } + } + for key, dp := range p.histograms { + if dp.LastUpdated.Add(p.maxStaleness).Before(now) { + delete(p.numbers, key) + } + } + for key, dp := range p.expHistograms { + if dp.LastUpdated.Add(p.maxStaleness).Before(now) { + delete(p.numbers, key) + } + } } diff --git a/processor/intervalprocessor/processor_test.go b/processor/intervalprocessor/processor_test.go new file mode 100644 index 000000000000..5dc7d3cd4cc8 --- /dev/null +++ b/processor/intervalprocessor/processor_test.go @@ -0,0 +1,640 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package intervalprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor" + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/processor/processortest" +) + +// TODO: Add tests for the other data types. Ensuring things like: gauges are passed through unchanged, etc. + +// TODO: Add tests for data expiration + +func TestAggregation(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + inputs []testMetric + next []testMetric + outputs []testMetric + }{ + // TODO: Add many more test cases for all the edge cases + { + name: "BasicAggregation", + inputs: []testMetric{ + { + Name: "test_metric_total", + Type: pmetric.MetricTypeSum, + IsMonotonic: true, + Temporality: pmetric.AggregationTemporalityCumulative, + DataPoints: []any{ + testNumberDataPoint{ + Timestamp: 50, + Value: 333, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 20, + Value: 222, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 80, + Value: 444, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + }, + }, + }, + next: []testMetric{}, + outputs: []testMetric{ + { + Name: "test_metric_total", + Type: pmetric.MetricTypeSum, + IsMonotonic: true, + Temporality: pmetric.AggregationTemporalityCumulative, + DataPoints: []any{ + testNumberDataPoint{ + Timestamp: 80, + Value: 444, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + }, + }, + }, + }, + { + name: "NonMonotonicSumsArePassedThrough", + inputs: []testMetric{ + { + Name: "test_metric_total", + Type: pmetric.MetricTypeSum, + IsMonotonic: false, + Temporality: pmetric.AggregationTemporalityCumulative, + DataPoints: []any{ + testNumberDataPoint{ + Timestamp: 50, + Value: 333, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 20, + Value: 222, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 80, + Value: 111, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + }, + }, + }, + next: []testMetric{ + { + Name: 
"test_metric_total", + Type: pmetric.MetricTypeSum, + IsMonotonic: false, + Temporality: pmetric.AggregationTemporalityCumulative, + DataPoints: []any{ + testNumberDataPoint{ + Timestamp: 50, + Value: 333, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 20, + Value: 222, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 80, + Value: 111, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + }, + }, + }, + outputs: []testMetric{}, + }, + { + name: "GaugesArePassedThrough", + inputs: []testMetric{ + { + Name: "test_metric_value", + Type: pmetric.MetricTypeGauge, + DataPoints: []any{ + testNumberDataPoint{ + Timestamp: 50, + Value: 345, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 20, + Value: 258, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 80, + Value: 178, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + }, + }, + }, + next: []testMetric{ + { + Name: "test_metric_value", + Type: pmetric.MetricTypeGauge, + DataPoints: []any{ + testNumberDataPoint{ + Timestamp: 50, + Value: 345, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 20, + Value: 258, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 80, + Value: 178, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + }, + }, + }, + outputs: []testMetric{}, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + config := &Config{Interval: time.Second, MaxStaleness: time.Second} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // next stores the results of the filter metric processor + next := &consumertest.MetricsSink{} + + factory := NewFactory() + mgp, err := factory.CreateMetricsProcessor( + context.Background(), + processortest.NewNopCreateSettings(), + config, + next, + ) + require.NoError(t, err) + + md := generateTestSumMetrics(t, testMetrics{ + { + SchemaURL: "https://test-res-schema.com/schema", + Attributes: map[string]any{ + "asdf": "foo", + }, + ScopeMetrics: []testScopeMetrics{ + { + Name: "MyTestInstrument", + Version: "1.2.3", + Attributes: map[string]any{ + "foo": "bar", + }, + Metrics: tc.inputs, + }, + }, + }, + }) + + // Test that ConsumeMetrics works + err = mgp.ConsumeMetrics(ctx, md) + require.NoError(t, err) + + require.IsType(t, &Processor{}, mgp) + processor := mgp.(*Processor) + + // Pretend we hit the interval timer and call export + processor.exportMetrics() + + // Next should have gotten two data sets: + // 1. Anything left over from ConsumeMetrics() + // 2. 
Anything exported from exportMetrics() + allMetrics := next.AllMetrics() + require.Len(t, allMetrics, 2) + + nextData := convertMetricsToTestData(t, allMetrics[0]) + exportData := convertMetricsToTestData(t, allMetrics[1]) + + expectedNextData := testMetrics{ + { + SchemaURL: "https://test-res-schema.com/schema", + Attributes: map[string]any{ + "asdf": "foo", + }, + ScopeMetrics: []testScopeMetrics{ + { + Name: "MyTestInstrument", + Version: "1.2.3", + Attributes: map[string]any{ + "foo": "bar", + }, + Metrics: tc.next, + }, + }, + }, + } + if len(tc.next) == 0 { + expectedNextData = testMetrics{} + } + + expectedExportData := testMetrics{ + { + SchemaURL: "https://test-res-schema.com/schema", + Attributes: map[string]any{ + "asdf": "foo", + }, + ScopeMetrics: []testScopeMetrics{ + { + Name: "MyTestInstrument", + Version: "1.2.3", + Attributes: map[string]any{ + "foo": "bar", + }, + Metrics: tc.outputs, + }, + }, + }, + } + if len(tc.outputs) == 0 { + expectedExportData = testMetrics{} + } + + require.Equal(t, expectedNextData, nextData) + require.Equal(t, expectedExportData, exportData) + }) + } +} + +type testMetrics []testResourceMetrics + +type testResourceMetrics struct { + SchemaURL string + Attributes map[string]any + + ScopeMetrics []testScopeMetrics +} + +type testScopeMetrics struct { + Name string + Version string + SchemaURL string + Attributes map[string]any + + Metrics []testMetric +} + +type testMetric struct { + Name string + Type pmetric.MetricType + IsMonotonic bool + Temporality pmetric.AggregationTemporality + + DataPoints []any +} + +type testNumberDataPoint struct { + StartTimestamp pcommon.Timestamp + Timestamp pcommon.Timestamp + + Value float64 + Attributes map[string]any +} + +type testSummaryDataPoint struct { + StartTimestamp pcommon.Timestamp + Timestamp pcommon.Timestamp + + QuantileValues []testValueAtQuantile + Attributes map[string]any +} + +type testValueAtQuantile struct { + Value float64 + Quantile float64 +} + +type testHistogramDataPoint struct { + StartTimestamp pcommon.Timestamp + Timestamp pcommon.Timestamp + + ExplicitBounds []float64 + BucketCounts []uint64 + Attributes map[string]any +} + +func generateTestSumMetrics(t *testing.T, tmd testMetrics) pmetric.Metrics { + md := pmetric.NewMetrics() + + for _, trm := range tmd { + rm := md.ResourceMetrics().AppendEmpty() + err := rm.Resource().Attributes().FromRaw(trm.Attributes) + require.NoError(t, err) + rm.SetSchemaUrl(trm.SchemaURL) + + for _, tsm := range trm.ScopeMetrics { + sm := rm.ScopeMetrics().AppendEmpty() + scope := pcommon.NewInstrumentationScope() + scope.SetName(tsm.Name) + scope.SetVersion(tsm.Version) + err = scope.Attributes().FromRaw(tsm.Attributes) + require.NoError(t, err) + scope.MoveTo(sm.Scope()) + sm.SetSchemaUrl(tsm.SchemaURL) + + for _, tm := range tsm.Metrics { + m := sm.Metrics().AppendEmpty() + m.SetName(tm.Name) + + switch tm.Type { + case pmetric.MetricTypeSum: + sum := m.SetEmptySum() + sum.SetIsMonotonic(tm.IsMonotonic) + sum.SetAggregationTemporality(tm.Temporality) + + for _, tdp := range tm.DataPoints { + require.IsType(t, testNumberDataPoint{}, tdp) + tdpp := tdp.(testNumberDataPoint) + + dp := sum.DataPoints().AppendEmpty() + + dp.SetStartTimestamp(tdpp.StartTimestamp) + dp.SetTimestamp(tdpp.Timestamp) + + dp.SetDoubleValue(tdpp.Value) + + err = dp.Attributes().FromRaw(tdpp.Attributes) + require.NoError(t, err) + } + case pmetric.MetricTypeGauge: + gauge := m.SetEmptyGauge() + + for _, tdp := range tm.DataPoints { + require.IsType(t, testNumberDataPoint{}, tdp) 
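+ // The test datapoints are declared as []any, so assert the concrete testNumberDataPoint type before building each gauge point.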
+ tdpp := tdp.(testNumberDataPoint) + + dp := gauge.DataPoints().AppendEmpty() + + dp.SetStartTimestamp(tdpp.StartTimestamp) + dp.SetTimestamp(tdpp.Timestamp) + + dp.SetDoubleValue(tdpp.Value) + + err = dp.Attributes().FromRaw(tdpp.Attributes) + require.NoError(t, err) + } + case pmetric.MetricTypeSummary: + summary := m.SetEmptySummary() + + for _, tdp := range tm.DataPoints { + require.IsType(t, testSummaryDataPoint{}, tdp) + tdpp := tdp.(testSummaryDataPoint) + + dp := summary.DataPoints().AppendEmpty() + + dp.SetStartTimestamp(tdpp.StartTimestamp) + dp.SetTimestamp(tdpp.Timestamp) + + for _, valueAtQuantile := range tdpp.QuantileValues { + qv := dp.QuantileValues().AppendEmpty() + + qv.SetQuantile(valueAtQuantile.Quantile) + qv.SetValue(valueAtQuantile.Value) + } + + err = dp.Attributes().FromRaw(tdpp.Attributes) + require.NoError(t, err) + } + case pmetric.MetricTypeHistogram: + histogram := m.SetEmptyHistogram() + histogram.SetAggregationTemporality(tm.Temporality) + + for _, tdp := range tm.DataPoints { + require.IsType(t, testHistogramDataPoint{}, tdp) + tdpp := tdp.(testHistogramDataPoint) + + dp := histogram.DataPoints().AppendEmpty() + + dp.SetStartTimestamp(tdpp.StartTimestamp) + dp.SetTimestamp(tdpp.Timestamp) + + bucketSum := uint64(0) + for _, bucketCount := range tdpp.BucketCounts { + bucketSum += bucketCount + } + dp.SetCount(bucketSum) + dp.BucketCounts().FromRaw(tdpp.BucketCounts) + dp.ExplicitBounds().FromRaw(tdpp.ExplicitBounds) + + err = dp.Attributes().FromRaw(tdpp.Attributes) + require.NoError(t, err) + } + default: + t.Fatalf("Unsupported metric type %v", m.Type()) + } + } + } + } + + return md +} + +func convertMetricsToTestData(t *testing.T, md pmetric.Metrics) testMetrics { + tmd := testMetrics{} + + for i := 0; i < md.ResourceMetrics().Len(); i++ { + rm := md.ResourceMetrics().At(i) + + trm := testResourceMetrics{ + SchemaURL: rm.SchemaUrl(), + Attributes: rm.Resource().Attributes().AsRaw(), + ScopeMetrics: []testScopeMetrics{}, + } + + for j := 0; j < rm.ScopeMetrics().Len(); j++ { + sm := rm.ScopeMetrics().At(j) + + tsm := testScopeMetrics{ + Name: sm.Scope().Name(), + Version: sm.Scope().Version(), + SchemaURL: sm.SchemaUrl(), + Attributes: sm.Scope().Attributes().AsRaw(), + Metrics: []testMetric{}, + } + + for k := 0; k < sm.Metrics().Len(); k++ { + m := sm.Metrics().At(k) + + switch m.Type() { + case pmetric.MetricTypeSum: + sum := m.Sum() + + tm := testMetric{ + Name: m.Name(), + Type: pmetric.MetricTypeSum, + IsMonotonic: sum.IsMonotonic(), + Temporality: sum.AggregationTemporality(), + DataPoints: []any{}, + } + + for r := 0; r < sum.DataPoints().Len(); r++ { + dp := sum.DataPoints().At(r) + + tdp := testNumberDataPoint{ + StartTimestamp: dp.StartTimestamp(), + Timestamp: dp.Timestamp(), + Value: dp.DoubleValue(), + Attributes: dp.Attributes().AsRaw(), + } + + tm.DataPoints = append(tm.DataPoints, tdp) + } + + tsm.Metrics = append(tsm.Metrics, tm) + case pmetric.MetricTypeGauge: + gauge := m.Gauge() + + tm := testMetric{ + Name: m.Name(), + Type: pmetric.MetricTypeGauge, + DataPoints: []any{}, + } + + for r := 0; r < gauge.DataPoints().Len(); r++ { + dp := gauge.DataPoints().At(r) + + tdp := testNumberDataPoint{ + StartTimestamp: dp.StartTimestamp(), + Timestamp: dp.Timestamp(), + Value: dp.DoubleValue(), + Attributes: dp.Attributes().AsRaw(), + } + + tm.DataPoints = append(tm.DataPoints, tdp) + } + + tsm.Metrics = append(tsm.Metrics, tm) + case pmetric.MetricTypeSummary: + summary := m.Summary() + + tm := testMetric{ + Name: m.Name(), + Type: 
pmetric.MetricTypeSummary, + DataPoints: []any{}, + } + + for r := 0; r < summary.DataPoints().Len(); r++ { + dp := summary.DataPoints().At(r) + + tdp := testSummaryDataPoint{ + StartTimestamp: dp.StartTimestamp(), + Timestamp: dp.Timestamp(), + QuantileValues: []testValueAtQuantile{}, + Attributes: dp.Attributes().AsRaw(), + } + + for s := 0; s < dp.QuantileValues().Len(); s++ { + qv := dp.QuantileValues().At(s) + + tqv := testValueAtQuantile{ + Value: qv.Value(), + Quantile: qv.Quantile(), + } + + tdp.QuantileValues = append(tdp.QuantileValues, tqv) + } + + tm.DataPoints = append(tm.DataPoints, tdp) + } + + tsm.Metrics = append(tsm.Metrics, tm) + case pmetric.MetricTypeHistogram: + histogram := m.Histogram() + + tm := testMetric{ + Name: m.Name(), + Type: pmetric.MetricTypeHistogram, + Temporality: histogram.AggregationTemporality(), + DataPoints: []any{}, + } + + for r := 0; r < histogram.DataPoints().Len(); r++ { + dp := histogram.DataPoints().At(r) + + tdp := testHistogramDataPoint{ + StartTimestamp: dp.StartTimestamp(), + Timestamp: dp.Timestamp(), + ExplicitBounds: dp.ExplicitBounds().AsRaw(), + BucketCounts: dp.BucketCounts().AsRaw(), + Attributes: dp.Attributes().AsRaw(), + } + + tm.DataPoints = append(tm.DataPoints, tdp) + } + + tsm.Metrics = append(tsm.Metrics, tm) + default: + t.Fatalf("Invalid metric type %v", m.Type()) + } + } + + trm.ScopeMetrics = append(trm.ScopeMetrics, tsm) + } + + tmd = append(tmd, trm) + } + + return tmd +} From 303390ea7207d69f8857d0b7373cb350473bb90c Mon Sep 17 00:00:00 2001 From: Adrian Astley Date: Wed, 21 Feb 2024 12:30:51 -0500 Subject: [PATCH 11/11] Utilize the new Staleness map --- processor/intervalprocessor/README.md | 2 +- processor/intervalprocessor/processor.go | 111 +++++++++++++---------- 2 files changed, 62 insertions(+), 51 deletions(-) diff --git a/processor/intervalprocessor/README.md b/processor/intervalprocessor/README.md index c8d5473fa307..ddfc84e4983e 100644 --- a/processor/intervalprocessor/README.md +++ b/processor/intervalprocessor/README.md @@ -32,4 +32,4 @@ The following metric types will *not* be aggregated, and will instead be passed, The following settings can be optionally configured: * `interval`: The interval in which the processor should re-export the aggregated metrics. Default: 15s -* `max_staleness`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0 +* `max_staleness`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. NOTE: Staleness is only evaluated every 30 minutes. So `max_staleness` values less than 30 minutes will not be effective. 
Default: 0 diff --git a/processor/intervalprocessor/processor.go b/processor/intervalprocessor/processor.go index bb7c9d05b959..98565200afef 100644 --- a/processor/intervalprocessor/processor.go +++ b/processor/intervalprocessor/processor.go @@ -18,6 +18,7 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor/internal/metrics" ) @@ -28,14 +29,17 @@ type Processor struct { cancel context.CancelFunc logger *zap.Logger - stateLock sync.Mutex - numbers map[identity.Stream]metrics.StreamDataPoint[pmetric.NumberDataPoint] - histograms map[identity.Stream]metrics.StreamDataPoint[pmetric.HistogramDataPoint] - expHistograms map[identity.Stream]metrics.StreamDataPoint[pmetric.ExponentialHistogramDataPoint] + stateLock sync.Mutex - interval time.Duration - maxStaleness time.Duration - intervalTicker *time.Ticker + numbers *staleness.Staleness[metrics.StreamDataPoint[pmetric.NumberDataPoint]] + histograms *staleness.Staleness[metrics.StreamDataPoint[pmetric.HistogramDataPoint]] + expHistograms *staleness.Staleness[metrics.StreamDataPoint[pmetric.ExponentialHistogramDataPoint]] + + exportInterval time.Duration + exportTicker *time.Ticker + + expiryEnabled bool + expiryTicker *time.Ticker nextConsumer consumer.Metrics } @@ -49,26 +53,33 @@ func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics logger: log, stateLock: sync.Mutex{}, - numbers: map[identity.Stream]metrics.StreamDataPoint[pmetric.NumberDataPoint]{}, - histograms: map[identity.Stream]metrics.StreamDataPoint[pmetric.HistogramDataPoint]{}, - expHistograms: map[identity.Stream]metrics.StreamDataPoint[pmetric.ExponentialHistogramDataPoint]{}, + numbers: staleness.NewStaleness(config.MaxStaleness, &staleness.RawMap[identity.Stream, metrics.StreamDataPoint[pmetric.NumberDataPoint]]{}), + histograms: staleness.NewStaleness(config.MaxStaleness, &staleness.RawMap[identity.Stream, metrics.StreamDataPoint[pmetric.HistogramDataPoint]]{}), + expHistograms: staleness.NewStaleness(config.MaxStaleness, &staleness.RawMap[identity.Stream, metrics.StreamDataPoint[pmetric.ExponentialHistogramDataPoint]]{}), + + exportInterval: config.Interval, - interval: config.Interval, - maxStaleness: config.MaxStaleness, + expiryEnabled: config.MaxStaleness > 0, nextConsumer: nextConsumer, } } func (p *Processor) Start(_ context.Context, _ component.Host) error { - p.intervalTicker = time.NewTicker(p.interval) + p.exportTicker = time.NewTicker(p.exportInterval) go p.exportMetricsLoop() + + if p.expiryEnabled { + p.expiryTicker = time.NewTicker(30 * time.Minute) + go p.expireMetricsLoop() + } + return nil } func (p *Processor) Shutdown(_ context.Context) error { - if p.intervalTicker != nil { - p.intervalTicker.Stop() + if p.exportTicker != nil { + p.exportTicker.Stop() } p.cancel() return nil @@ -138,7 +149,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro return errs } -func aggregateDataPoints[DPS metrics.DataPointSlice[DP], DP metrics.DataPoint[DP]](dataPoints DPS, state map[identity.Stream]metrics.StreamDataPoint[DP], res pcommon.Resource, resSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, m pmetric.Metric) { +func aggregateDataPoints[DPS metrics.DataPointSlice[DP], DP metrics.DataPoint[DP]](dataPoints DPS, state 
*staleness.Staleness[metrics.StreamDataPoint[DP]], res pcommon.Resource, resSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, m pmetric.Metric) { metric := metrics.From(res, resSchemaURL, scope, scopeSchemaURL, m) metricID := metric.Identity() @@ -149,23 +160,23 @@ func aggregateDataPoints[DPS metrics.DataPointSlice[DP], DP metrics.DataPoint[DP streamDataPointID := metrics.StreamDataPointIdentity(metricID, dp) - existing, ok := state[streamDataPointID] + existing, ok := state.Load(streamDataPointID) if !ok { - state[streamDataPointID] = metrics.StreamDataPoint[DP]{ + state.Store(streamDataPointID, metrics.StreamDataPoint[DP]{ Metric: metric, DataPoint: dp, LastUpdated: now, - } + }) continue } // Check if the datapoint is newer if dp.Timestamp().AsTime().After(existing.DataPoint.Timestamp().AsTime()) { - state[streamDataPointID] = metrics.StreamDataPoint[DP]{ + state.Store(streamDataPointID, metrics.StreamDataPoint[DP]{ Metric: metric, DataPoint: dp, LastUpdated: now, - } + }) continue } @@ -178,8 +189,7 @@ func (p *Processor) exportMetricsLoop() { select { case <-p.ctx.Done(): return - case <-p.intervalTicker.C: - p.expireOldMetrics() + case <-p.exportTicker.C: p.exportMetrics() } } @@ -198,29 +208,38 @@ func (p *Processor) exportMetrics() { smLookup := map[identity.Scope]pmetric.ScopeMetrics{} mLookup := map[identity.Metric]pmetric.Metric{} - for dataID, dp := range p.numbers { + // TODO: Once the upcoming RangeFunc Experiment is fully available, we can switch these to + // use `range` syntax. See: ttps://go.dev/wiki/RangefuncExperiment + + p.numbers.Items()(func(dataID identity.Stream, dp metrics.StreamDataPoint[pmetric.NumberDataPoint]) bool { m := getOrCreateMetric(dataID, dp.Metric, md, rmLookup, smLookup, mLookup) sum := m.Sum() numDP := sum.DataPoints().AppendEmpty() dp.DataPoint.CopyTo(numDP) - } - for dataID, dp := range p.histograms { + return true + }) + + p.histograms.Items()(func(dataID identity.Stream, dp metrics.StreamDataPoint[pmetric.HistogramDataPoint]) bool { m := getOrCreateMetric(dataID, dp.Metric, md, rmLookup, smLookup, mLookup) histogram := m.Histogram() histogramDP := histogram.DataPoints().AppendEmpty() dp.DataPoint.CopyTo(histogramDP) - } - for dataID, dp := range p.expHistograms { + return true + }) + + p.expHistograms.Items()(func(dataID identity.Stream, dp metrics.StreamDataPoint[pmetric.ExponentialHistogramDataPoint]) bool { m := getOrCreateMetric(dataID, dp.Metric, md, rmLookup, smLookup, mLookup) expHistogram := m.ExponentialHistogram() expHistogramDP := expHistogram.DataPoints().AppendEmpty() dp.DataPoint.CopyTo(expHistogramDP) - } + + return true + }) if err := p.nextConsumer.ConsumeMetrics(p.ctx, md); err != nil { p.logger.Error("Metrics export failed", zap.Error(err)) @@ -258,30 +277,22 @@ func getOrCreateMetric( return m } -func (p *Processor) expireOldMetrics() { - // Fast-out - if p.maxStaleness == 0 { - return +func (p *Processor) expireMetricsLoop() { + for { + select { + case <-p.ctx.Done(): + return + case <-p.expiryTicker.C: + p.expireOldMetrics() + } } +} +func (p *Processor) expireOldMetrics() { p.stateLock.Lock() defer p.stateLock.Unlock() - now := time.Now() - - for key, dp := range p.numbers { - if dp.LastUpdated.Add(p.maxStaleness).Before(now) { - delete(p.numbers, key) - } - } - for key, dp := range p.histograms { - if dp.LastUpdated.Add(p.maxStaleness).Before(now) { - delete(p.numbers, key) - } - } - for key, dp := range p.expHistograms { - if dp.LastUpdated.Add(p.maxStaleness).Before(now) { - 
delete(p.numbers, key) - } - } + p.numbers.ExpireOldEntries() + p.histograms.ExpireOldEntries() + p.expHistograms.ExpireOldEntries() }
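As a rough mental model (an assumption-level sketch, not the actual internal/exp/metrics/staleness implementation), the state handling above boils down to a map that remembers when each entry was last written and can drop everything older than a configured window in one pass. In the sketch below, staleMap, newStaleMap, and entry are invented names; only the Store/Load/ExpireOldEntries verbs mirror the calls visible in the patch.

package main

import (
	"fmt"
	"time"
)

// entry pairs a stored value with the wall-clock time it was last written.
type entry[V any] struct {
	val  V
	seen time.Time
}

// staleMap is a simplified stand-in for the staleness wrapper: Store refreshes
// the timestamp, and ExpireOldEntries drops anything older than max.
type staleMap[K comparable, V any] struct {
	max   time.Duration
	items map[K]entry[V]
}

func newStaleMap[K comparable, V any](max time.Duration) *staleMap[K, V] {
	return &staleMap[K, V]{max: max, items: map[K]entry[V]{}}
}

func (m *staleMap[K, V]) Store(k K, v V) {
	m.items[k] = entry[V]{val: v, seen: time.Now()}
}

func (m *staleMap[K, V]) Load(k K) (V, bool) {
	e, ok := m.items[k]
	return e.val, ok
}

func (m *staleMap[K, V]) ExpireOldEntries() {
	now := time.Now()
	for k, e := range m.items {
		if e.seen.Add(m.max).Before(now) {
			delete(m.items, k)
		}
	}
}

func main() {
	streams := newStaleMap[string, float64](time.Hour)
	streams.Store("stream-a", 42)

	// Nothing is older than an hour yet, so the entry survives the sweep.
	streams.ExpireOldEntries()
	if v, ok := streams.Load("stream-a"); ok {
		fmt.Println("still fresh:", v)
	}
}

In the processor itself there is one such wrapper per data-point type (numbers, histograms, exponential histograms), expiry is enabled only when max_staleness is greater than zero, and the sweep runs on its own 30-minute ticker rather than on the export ticker.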