diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index a25a76e4bd4d..6e21b0712eff 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -160,7 +160,7 @@ processor/deltatorateprocessor/ @open-telemetry/collect processor/filterprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @boostchicken processor/groupbyattrsprocessor/ @open-telemetry/collector-contrib-approvers @rnishtala-sumo processor/groupbytraceprocessor/ @open-telemetry/collector-contrib-approvers @jpkrohling -processor/intervalprocessor/ @open-telemetry/collector-contrib-approvers @RichieSams +processor/intervalprocessor/ @open-telemetry/collector-contrib-approvers @RichieSams @sh0rez processor/k8sattributesprocessor/ @open-telemetry/collector-contrib-approvers @dmitryax @rmfitzpatrick @fatsheep9146 @TylerHelmuth processor/logstransformprocessor/ @open-telemetry/collector-contrib-approvers @djaglowski @dehaansa processor/metricsgenerationprocessor/ @open-telemetry/collector-contrib-approvers @Aneurysm9 diff --git a/internal/exp/metrics/go.mod b/internal/exp/metrics/go.mod index f61b7e1e87af..6e54eb33f229 100644 --- a/internal/exp/metrics/go.mod +++ b/internal/exp/metrics/go.mod @@ -1,19 +1,22 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics -replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil +go 1.21 require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.95.0 + github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector/pdata v1.2.0 ) require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect 
go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.18.0 // indirect golang.org/x/sys v0.14.0 // indirect @@ -21,8 +24,7 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect google.golang.org/grpc v1.61.0 // indirect google.golang.org/protobuf v1.32.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) -go 1.21 - -toolchain go1.21.1 +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil diff --git a/internal/exp/metrics/go.sum b/internal/exp/metrics/go.sum index 6f3b578693b2..9a5008b4d916 100644 --- a/internal/exp/metrics/go.sum +++ b/internal/exp/metrics/go.sum @@ -16,6 +16,10 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -23,6 +27,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= @@ -74,5 +80,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/exp/metrics/staleness/map.go b/internal/exp/metrics/staleness/map.go new file mode 100644 index 000000000000..77b29f5febd2 --- /dev/null +++ b/internal/exp/metrics/staleness/map.go @@ -0,0 +1,54 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" + +import ( + "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +// Map is an abstraction over a map 
+type Map[T any] interface { + // Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value + Load(key identity.Stream) (T, bool) + // Store the given key value pair in the map + Store(key identity.Stream, value T) + // Remove the value at key from the map + Delete(key identity.Stream) + // Items returns an iterator function that in future go version can be used with range + // See: https://go.dev/wiki/RangefuncExperiment + Items() func(yield func(identity.Stream, T) bool) bool +} + +// RawMap implementation + +var _ Map[time.Time] = (*RawMap[identity.Stream, time.Time])(nil) + +// RawMap is an implementation of the Map interface using a standard golang map +type RawMap[K comparable, V any] map[K]V + +func (rm *RawMap[K, V]) Load(key K) (V, bool) { + value, ok := (*rm)[key] + return value, ok +} + +func (rm *RawMap[K, V]) Store(key K, value V) { + (*rm)[key] = value +} + +func (rm *RawMap[K, V]) Delete(key K) { + delete(*rm, key) +} + +func (rm *RawMap[K, V]) Items() func(yield func(K, V) bool) bool { + return func(yield func(K, V) bool) bool { + for k, v := range *rm { + if !yield(k, v) { + break + } + } + return false + } +} diff --git a/internal/exp/metrics/staleness/priority_queue.go b/internal/exp/metrics/staleness/priority_queue.go new file mode 100644 index 000000000000..f1b01743f95f --- /dev/null +++ b/internal/exp/metrics/staleness/priority_queue.go @@ -0,0 +1,111 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" + +import ( + "container/heap" + "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +// PriorityQueue represents a way to store entries sorted by their priority. +// Pop() will return the oldest entry of the set. 
+type PriorityQueue interface { + // Update will add or update an entry, and reshuffle the queue internally as needed to keep it sorted + Update(id identity.Stream, newPrio time.Time) + // Peek will return the entry at the HEAD of the queue *without* removing it from the queue + Peek() (identity.Stream, time.Time) + // Pop will remove the entry at the HEAD of the queue and return it + Pop() (identity.Stream, time.Time) + // Len will return the number of entries in the queue + Len() int +} + +// heapQueue implements heap.Interface. +// We use it as the inner implementation of a heap-based sorted queue +type heapQueue []*queueItem + +type queueItem struct { + key identity.Stream + prio time.Time + index int +} + +func (pq heapQueue) Len() int { return len(pq) } + +func (pq heapQueue) Less(i, j int) bool { + // We want Pop to give us the lowest priority + return pq[i].prio.Before(pq[j].prio) +} + +func (pq heapQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *heapQueue) Push(x any) { + n := len(*pq) + item := x.(*queueItem) + item.index = n + *pq = append(*pq, item) +} + +func (pq *heapQueue) Pop() any { + old := *pq + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +type heapPriorityQueue struct { + inner heapQueue + itemLookup map[identity.Stream]*queueItem +} + +func NewPriorityQueue() PriorityQueue { + pq := &heapPriorityQueue{ + inner: heapQueue{}, + itemLookup: map[identity.Stream]*queueItem{}, + } + heap.Init(&pq.inner) + + return pq +} + +func (pq *heapPriorityQueue) Update(id identity.Stream, newPrio time.Time) { + // Check if the entry already exists in the queue + item, ok := pq.itemLookup[id] + if ok { + // If so, we can update it in place + item.prio = newPrio + heap.Fix(&pq.inner, item.index) + } else { + item = &queueItem{ + key: id, + prio: newPrio, + } + heap.Push(&pq.inner, item) + pq.itemLookup[id] = 
item + } +} + +func (pq *heapPriorityQueue) Peek() (identity.Stream, time.Time) { + val := pq.inner[0] + return val.key, val.prio +} + +func (pq *heapPriorityQueue) Pop() (identity.Stream, time.Time) { + val := heap.Pop(&pq.inner).(*queueItem) + delete(pq.itemLookup, val.key) + return val.key, val.prio +} + +func (pq *heapPriorityQueue) Len() int { + return pq.inner.Len() +} diff --git a/internal/exp/metrics/staleness/priority_queue_test.go b/internal/exp/metrics/staleness/priority_queue_test.go new file mode 100644 index 000000000000..b58478e7c1db --- /dev/null +++ b/internal/exp/metrics/staleness/priority_queue_test.go @@ -0,0 +1,116 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package staleness + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +func TestPriorityQueueImpl(t *testing.T) { + t.Parallel() + + pq := NewPriorityQueue() + + idA := generateStreamID(t, map[string]any{ + "aaa": "123", + }) + idB := generateStreamID(t, map[string]any{ + "bbb": "456", + }) + idC := generateStreamID(t, map[string]any{ + "ccc": "789", + }) + + initialTime := time.Time{} + prioA := initialTime.Add(2 * time.Second) + prioB := initialTime.Add(1 * time.Second) + prioC := initialTime.Add(3 * time.Second) + + pq.Update(idA, prioA) + pq.Update(idB, prioB) + pq.Update(idC, prioC) + + // The first item should be B + id, prio := pq.Peek() + require.Equal(t, idB, id) + require.Equal(t, prioB, prio) + + // If we peek again, nothing should change + id, prio = pq.Peek() + require.Equal(t, idB, id) + require.Equal(t, prioB, prio) + + // Pop should return the same thing + id, prio = pq.Pop() + require.Equal(t, idB, id) + require.Equal(t, prioB, prio) + + // Now if we peek again, it should be the next item + id, prio = pq.Peek() + 
require.Equal(t, idA, id) + require.Equal(t, prioA, prio) + + // Pop should return the same thing + id, prio = pq.Pop() + require.Equal(t, idA, id) + require.Equal(t, prioA, prio) + + // One last time + id, prio = pq.Peek() + require.Equal(t, idC, id) + require.Equal(t, prioC, prio) + + // Pop should return the same thing + id, prio = pq.Pop() + require.Equal(t, idC, id) + require.Equal(t, prioC, prio) + + // The queue should now be empty + require.Equal(t, 0, pq.Len()) + + // And the inner lookup map should also be empty + require.IsType(t, &heapPriorityQueue{}, pq) + heapQueue := pq.(*heapPriorityQueue) + require.Len(t, heapQueue.itemLookup, 0) +} + +func generateStreamID(t *testing.T, attributes map[string]any) identity.Stream { + res := pcommon.NewResource() + err := res.Attributes().FromRaw(map[string]any{ + "foo": "bar", + "asdf": "qwer", + }) + require.NoError(t, err) + + scope := pcommon.NewInstrumentationScope() + scope.SetName("TestScope") + scope.SetVersion("v1.2.3") + err = scope.Attributes().FromRaw(map[string]any{ + "aaa": "bbb", + "ccc": "ddd", + }) + require.NoError(t, err) + + metric := pmetric.NewMetric() + + sum := metric.SetEmptySum() + sum.SetIsMonotonic(true) + sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + + dp := sum.DataPoints().AppendEmpty() + dp.SetStartTimestamp(678) + dp.SetTimestamp(789) + dp.SetDoubleValue(123.456) + err = dp.Attributes().FromRaw(attributes) + require.NoError(t, err) + + return identity.OfStream(identity.OfResourceMetric(res, scope, metric), dp) +} diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go new file mode 100644 index 000000000000..f5803ccdeb55 --- /dev/null +++ b/internal/exp/metrics/staleness/staleness.go @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" + +import ( + 
"time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +// We override how Now() is returned, so we can have deterministic tests +var NowFunc = time.Now + +// Staleness is a wrapper over a map that adds an additional "staleness" value to each entry. Users can +// call ExpireOldEntries() to automatically remove all entries from the map whose staleness value is +// older than the `max` +// +// NOTE: Staleness methods are *not* thread-safe. If the user needs to use Staleness in a multi-threaded +// environment, then it is the user's responsibility to properly serialize calls to Staleness methods +type Staleness[T any] struct { + max time.Duration + + items Map[T] + pq PriorityQueue +} + +func NewStaleness[T any](max time.Duration, newMap Map[T]) *Staleness[T] { + return &Staleness[T]{ + max: max, + items: newMap, + pq: NewPriorityQueue(), + } +} + +// Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value +func (s *Staleness[T]) Load(key identity.Stream) (T, bool) { + return s.items.Load(key) +} + +// Store the given key value pair in the map, and update the pair's staleness value to "now" +func (s *Staleness[T]) Store(id identity.Stream, value T) { + s.pq.Update(id, NowFunc()) + s.items.Store(id, value) +} + +// Items returns an iterator function that in future go versions can be used with range +// See: https://go.dev/wiki/RangefuncExperiment +func (s *Staleness[T]) Items() func(yield func(identity.Stream, T) bool) bool { + return s.items.Items() +} + +// ExpireOldEntries will remove all entries whose staleness value is older than `now() - max` +// For example, if an entry has a staleness value of two hours ago, and max == 1 hour, then the entry would +// be removed. But if an entry had a staleness value of 30 minutes, then it *wouldn't* be removed. 
+func (s *Staleness[T]) ExpireOldEntries() { + now := NowFunc() + + for { + _, ts := s.pq.Peek() + if now.Sub(ts) < s.max { + break + } + id, _ := s.pq.Pop() + s.items.Delete(id) + } +} diff --git a/internal/exp/metrics/staleness/staleness_test.go b/internal/exp/metrics/staleness/staleness_test.go new file mode 100644 index 000000000000..4d96b41061e6 --- /dev/null +++ b/internal/exp/metrics/staleness/staleness_test.go @@ -0,0 +1,93 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package staleness + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +func TestStaleness(t *testing.T) { + max := 1 * time.Second + stalenessMap := NewStaleness[int]( + max, + &RawMap[identity.Stream, int]{}, + ) + + idA := generateStreamID(t, map[string]any{ + "aaa": "123", + }) + idB := generateStreamID(t, map[string]any{ + "bbb": "456", + }) + idC := generateStreamID(t, map[string]any{ + "ccc": "789", + }) + idD := generateStreamID(t, map[string]any{ + "ddd": "024", + }) + + initialTime := time.Time{} + timeA := initialTime.Add(2 * time.Second) + timeB := initialTime.Add(1 * time.Second) + timeC := initialTime.Add(3 * time.Second) + timeD := initialTime.Add(4 * time.Second) + + valueA := 1 + valueB := 4 + valueC := 7 + valueD := 0 + + // Add the values to the map + NowFunc = func() time.Time { return timeA } + stalenessMap.Store(idA, valueA) + NowFunc = func() time.Time { return timeB } + stalenessMap.Store(idB, valueB) + NowFunc = func() time.Time { return timeC } + stalenessMap.Store(idC, valueC) + NowFunc = func() time.Time { return timeD } + stalenessMap.Store(idD, valueD) + + // Set the time to 2.5s and run expire + // This should remove B, but the others should remain + // (now == 2.5s, B == 1s, max == 1s) + // now > B + max + NowFunc = func() time.Time { return initialTime.Add(2500 * time.Millisecond) } + 
stalenessMap.ExpireOldEntries() + validateStalenessMapEntries(t, + map[identity.Stream]int{ + idA: valueA, + idC: valueC, + idD: valueD, + }, + stalenessMap, + ) + + // Set the time to 4.5s and run expire + // This should remove A and C, but D should remain + // (now == 2.5s, A == 2s, C == 3s, max == 1s) + // now > A + max AND now > C + max + NowFunc = func() time.Time { return initialTime.Add(4500 * time.Millisecond) } + stalenessMap.ExpireOldEntries() + validateStalenessMapEntries(t, + map[identity.Stream]int{ + idD: valueD, + }, + stalenessMap, + ) +} + +func validateStalenessMapEntries(t *testing.T, expected map[identity.Stream]int, sm *Staleness[int]) { + actual := map[identity.Stream]int{} + + sm.Items()(func(key identity.Stream, value int) bool { + actual[key] = value + return true + }) + require.Equal(t, expected, actual) +} diff --git a/processor/intervalprocessor/README.md b/processor/intervalprocessor/README.md index 90cf86d05e3f..ddfc84e4983e 100644 --- a/processor/intervalprocessor/README.md +++ b/processor/intervalprocessor/README.md @@ -7,7 +7,7 @@ | Distributions | [] | | Warnings | [Statefulness](#warnings) | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Finterval%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Finterval) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Finterval%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Finterval) | -| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | 
[@RichieSams](https://www.github.com/RichieSams) | +| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@RichieSams](https://www.github.com/RichieSams), [@sh0rez](https://www.github.com/sh0rez) | [development]: https://github.com/open-telemetry/opentelemetry-collector#development @@ -31,4 +31,5 @@ The following metric types will *not* be aggregated, and will instead be passed, The following settings can be optionally configured: -- `max_staleness`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0 +* `interval`: The interval in which the processor should re-export the aggregated metrics. Default: 15s +* `max_staleness`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. NOTE: Staleness is only evaluated every 30 minutes. So `max_staleness` values less than 30 minutes will not be effective. Default: 0 diff --git a/processor/intervalprocessor/config.go b/processor/intervalprocessor/config.go index 8a43eca75c00..1393ba9c001a 100644 --- a/processor/intervalprocessor/config.go +++ b/processor/intervalprocessor/config.go @@ -4,15 +4,24 @@ package intervalprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor" import ( + "errors" "time" "go.opentelemetry.io/collector/component" ) +var ( + ErrInvalidIntervalValue = errors.New("invalid interval value") + ErrInvalidMaxStatenessValue = errors.New("invalid max_stateless value") +) + var _ component.Config = (*Config)(nil) // Config defines the configuration for the processor. type Config struct { + // Interval is the time + Interval time.Duration `mapstructure:"interval"` + // MaxStaleness is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. 
MaxStaleness time.Duration `mapstructure:"max_staleness"` } @@ -20,5 +29,13 @@ type Config struct { // Validate checks whether the input configuration has all of the required fields for the processor. // An error is returned if there are any invalid inputs. func (config *Config) Validate() error { + if config.Interval <= 0 { + return ErrInvalidIntervalValue + } + + if config.MaxStaleness < 0 { + return ErrInvalidMaxStatenessValue + } + return nil } diff --git a/processor/intervalprocessor/factory.go b/processor/intervalprocessor/factory.go index 5bef7881dfdf..00ddb2dca253 100644 --- a/processor/intervalprocessor/factory.go +++ b/processor/intervalprocessor/factory.go @@ -6,6 +6,7 @@ package intervalprocessor // import "github.com/open-telemetry/opentelemetry-col import ( "context" "fmt" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -23,7 +24,10 @@ func NewFactory() processor.Factory { } func createDefaultConfig() component.Config { - return &Config{} + return &Config{ + Interval: 15 * time.Second, + MaxStaleness: 0, + } } func createMetricsProcessor(_ context.Context, set processor.CreateSettings, cfg component.Config, nextConsumer consumer.Metrics) (processor.Metrics, error) { diff --git a/processor/intervalprocessor/generated_component_test.go b/processor/intervalprocessor/generated_component_test.go index 55e89c63bcaa..dee3e958229a 100644 --- a/processor/intervalprocessor/generated_component_test.go +++ b/processor/intervalprocessor/generated_component_test.go @@ -9,13 +9,11 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - + "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processortest" - "go.opentelemetry.io/collector/confmap/confmaptest" - 
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" ) diff --git a/processor/intervalprocessor/go.mod b/processor/intervalprocessor/go.mod index 533717deb457..7e425817527e 100644 --- a/processor/intervalprocessor/go.mod +++ b/processor/intervalprocessor/go.mod @@ -4,6 +4,7 @@ go 1.21 require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.95.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.95.0 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector/component v0.95.0 go.opentelemetry.io/collector/confmap v0.95.0 @@ -33,6 +34,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.95.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.18.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect @@ -56,6 +58,8 @@ require ( replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics + replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest diff --git a/processor/intervalprocessor/internal/metrics/metrics.go b/processor/intervalprocessor/internal/metrics/metrics.go new file mode 100644 index 000000000000..1a1624af7536 --- /dev/null +++ b/processor/intervalprocessor/internal/metrics/metrics.go @@ -0,0 +1,101 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +import ( + "time" + + 
"go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" +) + +type Metric struct { + res pcommon.Resource + resSchemaURL string + scope pcommon.InstrumentationScope + scopeSchemaURL string + innerMetric pmetric.Metric +} + +func From(res pcommon.Resource, resSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, metric pmetric.Metric) Metric { + newMetric := Metric{ + res: res, + resSchemaURL: resSchemaURL, + scope: scope, + scopeSchemaURL: scopeSchemaURL, + innerMetric: pmetric.NewMetric(), + } + + // Clone the metric metadata without the datapoints + newMetric.innerMetric.SetName(metric.Name()) + newMetric.innerMetric.SetDescription(metric.Description()) + newMetric.innerMetric.SetUnit(metric.Unit()) + switch metric.Type() { + case pmetric.MetricTypeGauge: + newMetric.innerMetric.SetEmptyGauge() + case pmetric.MetricTypeSum: + sum := metric.Sum() + + newSum := newMetric.innerMetric.SetEmptySum() + newSum.SetAggregationTemporality(sum.AggregationTemporality()) + newSum.SetIsMonotonic(sum.IsMonotonic()) + case pmetric.MetricTypeHistogram: + histogram := metric.Histogram() + + newHistogram := newMetric.innerMetric.SetEmptyHistogram() + newHistogram.SetAggregationTemporality(histogram.AggregationTemporality()) + case pmetric.MetricTypeExponentialHistogram: + expHistogram := metric.ExponentialHistogram() + + newExpHistogram := newMetric.innerMetric.SetEmptyExponentialHistogram() + newExpHistogram.SetAggregationTemporality(expHistogram.AggregationTemporality()) + case pmetric.MetricTypeSummary: + newMetric.innerMetric.SetEmptySummary() + } + + return newMetric +} + +func (m *Metric) Identity() identity.Metric { + return identity.OfResourceMetric(m.res, m.scope, m.innerMetric) +} + +func (m *Metric) CopyToResourceMetric(dest pmetric.ResourceMetrics) { + m.res.CopyTo(dest.Resource()) + dest.SetSchemaUrl(m.resSchemaURL) 
+} + +func (m *Metric) CopyToScopeMetric(dest pmetric.ScopeMetrics) { + m.scope.CopyTo(dest.Scope()) + dest.SetSchemaUrl(m.scopeSchemaURL) +} + +func (m *Metric) CopyToPMetric(dest pmetric.Metric) { + m.innerMetric.CopyTo(dest) +} + +type DataPointSlice[DP DataPoint[DP]] interface { + Len() int + At(i int) DP +} + +type DataPoint[Self any] interface { + pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint + + Timestamp() pcommon.Timestamp + Attributes() pcommon.Map + CopyTo(dest Self) +} + +type StreamDataPoint[DP DataPoint[DP]] struct { + Metric Metric + DataPoint DP + LastUpdated time.Time +} + +func StreamDataPointIdentity[DP DataPoint[DP]](metricID identity.Metric, dataPoint DP) identity.Stream { + return identity.OfStream[DP](metricID, dataPoint) +} diff --git a/processor/intervalprocessor/metadata.yaml b/processor/intervalprocessor/metadata.yaml index d1051807c70e..2a4700c3e069 100644 --- a/processor/intervalprocessor/metadata.yaml +++ b/processor/intervalprocessor/metadata.yaml @@ -7,6 +7,6 @@ status: distributions: [] warnings: [Statefulness] codeowners: - active: [RichieSams] + active: [RichieSams, sh0rez] tests: config: diff --git a/processor/intervalprocessor/processor.go b/processor/intervalprocessor/processor.go index 1ba888a1f7b4..98565200afef 100644 --- a/processor/intervalprocessor/processor.go +++ b/processor/intervalprocessor/processor.go @@ -5,13 +5,21 @@ package intervalprocessor // import "github.com/open-telemetry/opentelemetry-col import ( "context" + "errors" + "fmt" + "sync" "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/processor" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" + 
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor/internal/metrics" ) var _ processor.Metrics = (*Processor)(nil) @@ -19,9 +27,19 @@ var _ processor.Metrics = (*Processor)(nil) type Processor struct { ctx context.Context cancel context.CancelFunc - log *zap.Logger + logger *zap.Logger + + stateLock sync.Mutex + + numbers *staleness.Staleness[metrics.StreamDataPoint[pmetric.NumberDataPoint]] + histograms *staleness.Staleness[metrics.StreamDataPoint[pmetric.HistogramDataPoint]] + expHistograms *staleness.Staleness[metrics.StreamDataPoint[pmetric.ExponentialHistogramDataPoint]] - maxStaleness time.Duration + exportInterval time.Duration + exportTicker *time.Ticker + + expiryEnabled bool + expiryTicker *time.Ticker nextConsumer consumer.Metrics } @@ -30,19 +48,39 @@ func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics ctx, cancel := context.WithCancel(context.Background()) return &Processor{ - ctx: ctx, - cancel: cancel, - log: log, - maxStaleness: config.MaxStaleness, + ctx: ctx, + cancel: cancel, + logger: log, + + stateLock: sync.Mutex{}, + numbers: staleness.NewStaleness(config.MaxStaleness, &staleness.RawMap[identity.Stream, metrics.StreamDataPoint[pmetric.NumberDataPoint]]{}), + histograms: staleness.NewStaleness(config.MaxStaleness, &staleness.RawMap[identity.Stream, metrics.StreamDataPoint[pmetric.HistogramDataPoint]]{}), + expHistograms: staleness.NewStaleness(config.MaxStaleness, &staleness.RawMap[identity.Stream, metrics.StreamDataPoint[pmetric.ExponentialHistogramDataPoint]]{}), + + exportInterval: config.Interval, + + expiryEnabled: config.MaxStaleness > 0, + nextConsumer: nextConsumer, } } func (p *Processor) Start(_ context.Context, _ component.Host) error { + p.exportTicker = time.NewTicker(p.exportInterval) + go p.exportMetricsLoop() + + if p.expiryEnabled { + p.expiryTicker = time.NewTicker(30 * time.Minute) + go p.expireMetricsLoop() + } + return nil } func (p *Processor) Shutdown(_ 
context.Context) error { + if p.exportTicker != nil { + p.exportTicker.Stop() + } p.cancel() return nil } @@ -52,5 +90,209 @@ func (p *Processor) Capabilities() consumer.Capabilities { } func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - return p.nextConsumer.ConsumeMetrics(ctx, md) + var errs error + + p.stateLock.Lock() + defer p.stateLock.Unlock() + + md.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool { + rm.ScopeMetrics().RemoveIf(func(sm pmetric.ScopeMetrics) bool { + sm.Metrics().RemoveIf(func(m pmetric.Metric) bool { + switch m.Type() { + case pmetric.MetricTypeGauge, pmetric.MetricTypeSummary: + return false + case pmetric.MetricTypeSum: + sum := m.Sum() + + if !sum.IsMonotonic() { + return false + } + + if sum.AggregationTemporality() != pmetric.AggregationTemporalityCumulative { + return false + } + + aggregateDataPoints(sum.DataPoints(), p.numbers, rm.Resource(), rm.SchemaUrl(), sm.Scope(), sm.SchemaUrl(), m) + return true + case pmetric.MetricTypeHistogram: + histogram := m.Histogram() + + if histogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative { + return false + } + + aggregateDataPoints(histogram.DataPoints(), p.histograms, rm.Resource(), rm.SchemaUrl(), sm.Scope(), sm.SchemaUrl(), m) + return true + case pmetric.MetricTypeExponentialHistogram: + expHistogram := m.ExponentialHistogram() + + if expHistogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative { + return false + } + + aggregateDataPoints(expHistogram.DataPoints(), p.expHistograms, rm.Resource(), rm.SchemaUrl(), sm.Scope(), sm.SchemaUrl(), m) + return true + default: + errs = errors.Join(fmt.Errorf("invalid MetricType %d", m.Type())) + return false + } + }) + return sm.Metrics().Len() == 0 + }) + return rm.ScopeMetrics().Len() == 0 + }) + + if err := p.nextConsumer.ConsumeMetrics(ctx, md); err != nil { + errs = errors.Join(errs, err) + } + + return errs +} + +func aggregateDataPoints[DPS 
metrics.DataPointSlice[DP], DP metrics.DataPoint[DP]](dataPoints DPS, state *staleness.Staleness[metrics.StreamDataPoint[DP]], res pcommon.Resource, resSchemaURL string, scope pcommon.InstrumentationScope, scopeSchemaURL string, m pmetric.Metric) { + metric := metrics.From(res, resSchemaURL, scope, scopeSchemaURL, m) + metricID := metric.Identity() + + now := time.Now() + + for i := 0; i < dataPoints.Len(); i++ { + dp := dataPoints.At(i) + + streamDataPointID := metrics.StreamDataPointIdentity(metricID, dp) + + existing, ok := state.Load(streamDataPointID) + if !ok { + state.Store(streamDataPointID, metrics.StreamDataPoint[DP]{ + Metric: metric, + DataPoint: dp, + LastUpdated: now, + }) + continue + } + + // Check if the datapoint is newer + if dp.Timestamp().AsTime().After(existing.DataPoint.Timestamp().AsTime()) { + state.Store(streamDataPointID, metrics.StreamDataPoint[DP]{ + Metric: metric, + DataPoint: dp, + LastUpdated: now, + }) + continue + } + + // Otherwise, we leave existing as-is + } +} + +func (p *Processor) exportMetricsLoop() { + for { + select { + case <-p.ctx.Done(): + return + case <-p.exportTicker.C: + p.exportMetrics() + } + } +} + +func (p *Processor) exportMetrics() { + p.stateLock.Lock() + defer p.stateLock.Unlock() + + // We have to generate our own metrics slice to send to the nextConsumer + md := pmetric.NewMetrics() + + // We want to avoid generating duplicate ResourceMetrics, ScopeMetrics, and Metrics + // So we use lookups to only generate what we need + rmLookup := map[identity.Resource]pmetric.ResourceMetrics{} + smLookup := map[identity.Scope]pmetric.ScopeMetrics{} + mLookup := map[identity.Metric]pmetric.Metric{} + + // TODO: Once the upcoming RangeFunc Experiment is fully available, we can switch these to + // use `range` syntax. 
See: https://go.dev/wiki/RangefuncExperiment + + p.numbers.Items()(func(dataID identity.Stream, dp metrics.StreamDataPoint[pmetric.NumberDataPoint]) bool { + m := getOrCreateMetric(dataID, dp.Metric, md, rmLookup, smLookup, mLookup) + + sum := m.Sum() + numDP := sum.DataPoints().AppendEmpty() + dp.DataPoint.CopyTo(numDP) + + return true + }) + + p.histograms.Items()(func(dataID identity.Stream, dp metrics.StreamDataPoint[pmetric.HistogramDataPoint]) bool { + m := getOrCreateMetric(dataID, dp.Metric, md, rmLookup, smLookup, mLookup) + + histogram := m.Histogram() + histogramDP := histogram.DataPoints().AppendEmpty() + dp.DataPoint.CopyTo(histogramDP) + + return true + }) + + p.expHistograms.Items()(func(dataID identity.Stream, dp metrics.StreamDataPoint[pmetric.ExponentialHistogramDataPoint]) bool { + m := getOrCreateMetric(dataID, dp.Metric, md, rmLookup, smLookup, mLookup) + + expHistogram := m.ExponentialHistogram() + expHistogramDP := expHistogram.DataPoints().AppendEmpty() + dp.DataPoint.CopyTo(expHistogramDP) + + return true + }) + + if err := p.nextConsumer.ConsumeMetrics(p.ctx, md); err != nil { + p.logger.Error("Metrics export failed", zap.Error(err)) + } +} + +func getOrCreateMetric( + streamID identity.Stream, metricRef metrics.Metric, + md pmetric.Metrics, + rmLookup map[identity.Resource]pmetric.ResourceMetrics, + smLookup map[identity.Scope]pmetric.ScopeMetrics, + mLookup map[identity.Metric]pmetric.Metric, +) pmetric.Metric { + rm, ok := rmLookup[streamID.Metric().Scope().Resource()] + if !ok { + rm = md.ResourceMetrics().AppendEmpty() + metricRef.CopyToResourceMetric(rm) + rmLookup[streamID.Metric().Scope().Resource()] = rm + } + + sm, ok := smLookup[streamID.Metric().Scope()] + if !ok { + sm = rm.ScopeMetrics().AppendEmpty() + metricRef.CopyToScopeMetric(sm) + smLookup[streamID.Metric().Scope()] = sm + } + + m, ok := mLookup[streamID.Metric()] + if !ok { + m = sm.Metrics().AppendEmpty() + metricRef.CopyToPMetric(m) + mLookup[streamID.Metric()] = m + } + + return m +} + +func (p *Processor) 
expireMetricsLoop() { + for { + select { + case <-p.ctx.Done(): + return + case <-p.expiryTicker.C: + p.expireOldMetrics() + } + } +} + +func (p *Processor) expireOldMetrics() { + p.stateLock.Lock() + defer p.stateLock.Unlock() + + p.numbers.ExpireOldEntries() + p.histograms.ExpireOldEntries() + p.expHistograms.ExpireOldEntries() } diff --git a/processor/intervalprocessor/processor_test.go b/processor/intervalprocessor/processor_test.go new file mode 100644 index 000000000000..5dc7d3cd4cc8 --- /dev/null +++ b/processor/intervalprocessor/processor_test.go @@ -0,0 +1,640 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package intervalprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor" + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/processor/processortest" +) + +// TODO: Add tests for the other data types. Ensuring things like: gauges are passed through unchanged, etc. 
+ +// TODO: Add tests for data expiration + +func TestAggregation(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + inputs []testMetric + next []testMetric + outputs []testMetric + }{ + // TODO: Add many more test cases for all the edge cases + { + name: "BasicAggregation", + inputs: []testMetric{ + { + Name: "test_metric_total", + Type: pmetric.MetricTypeSum, + IsMonotonic: true, + Temporality: pmetric.AggregationTemporalityCumulative, + DataPoints: []any{ + testNumberDataPoint{ + Timestamp: 50, + Value: 333, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 20, + Value: 222, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 80, + Value: 444, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + }, + }, + }, + next: []testMetric{}, + outputs: []testMetric{ + { + Name: "test_metric_total", + Type: pmetric.MetricTypeSum, + IsMonotonic: true, + Temporality: pmetric.AggregationTemporalityCumulative, + DataPoints: []any{ + testNumberDataPoint{ + Timestamp: 80, + Value: 444, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + }, + }, + }, + }, + { + name: "NonMonotonicSumsArePassedThrough", + inputs: []testMetric{ + { + Name: "test_metric_total", + Type: pmetric.MetricTypeSum, + IsMonotonic: false, + Temporality: pmetric.AggregationTemporalityCumulative, + DataPoints: []any{ + testNumberDataPoint{ + Timestamp: 50, + Value: 333, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 20, + Value: 222, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 80, + Value: 111, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + }, + }, + }, + next: []testMetric{ + { + Name: "test_metric_total", + Type: pmetric.MetricTypeSum, + IsMonotonic: false, + Temporality: pmetric.AggregationTemporalityCumulative, + DataPoints: []any{ + testNumberDataPoint{ + Timestamp: 50, + Value: 333, + 
Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 20, + Value: 222, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 80, + Value: 111, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + }, + }, + }, + outputs: []testMetric{}, + }, + { + name: "GaugesArePassedThrough", + inputs: []testMetric{ + { + Name: "test_metric_value", + Type: pmetric.MetricTypeGauge, + DataPoints: []any{ + testNumberDataPoint{ + Timestamp: 50, + Value: 345, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 20, + Value: 258, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 80, + Value: 178, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + }, + }, + }, + next: []testMetric{ + { + Name: "test_metric_value", + Type: pmetric.MetricTypeGauge, + DataPoints: []any{ + testNumberDataPoint{ + Timestamp: 50, + Value: 345, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 20, + Value: 258, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + testNumberDataPoint{ + Timestamp: 80, + Value: 178, + Attributes: map[string]any{ + "aaa": "bbb", + }, + }, + }, + }, + }, + outputs: []testMetric{}, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + config := &Config{Interval: time.Second, MaxStaleness: time.Second} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // next stores the results of the filter metric processor + next := &consumertest.MetricsSink{} + + factory := NewFactory() + mgp, err := factory.CreateMetricsProcessor( + context.Background(), + processortest.NewNopCreateSettings(), + config, + next, + ) + require.NoError(t, err) + + md := generateTestSumMetrics(t, testMetrics{ + { + SchemaURL: "https://test-res-schema.com/schema", + Attributes: map[string]any{ + "asdf": "foo", + }, + ScopeMetrics: 
[]testScopeMetrics{ + { + Name: "MyTestInstrument", + Version: "1.2.3", + Attributes: map[string]any{ + "foo": "bar", + }, + Metrics: tc.inputs, + }, + }, + }, + }) + + // Test that ConsumeMetrics works + err = mgp.ConsumeMetrics(ctx, md) + require.NoError(t, err) + + require.IsType(t, &Processor{}, mgp) + processor := mgp.(*Processor) + + // Pretend we hit the interval timer and call export + processor.exportMetrics() + + // Next should have gotten two data sets: + // 1. Anything left over from ConsumeMetrics() + // 2. Anything exported from exportMetrics() + allMetrics := next.AllMetrics() + require.Len(t, allMetrics, 2) + + nextData := convertMetricsToTestData(t, allMetrics[0]) + exportData := convertMetricsToTestData(t, allMetrics[1]) + + expectedNextData := testMetrics{ + { + SchemaURL: "https://test-res-schema.com/schema", + Attributes: map[string]any{ + "asdf": "foo", + }, + ScopeMetrics: []testScopeMetrics{ + { + Name: "MyTestInstrument", + Version: "1.2.3", + Attributes: map[string]any{ + "foo": "bar", + }, + Metrics: tc.next, + }, + }, + }, + } + if len(tc.next) == 0 { + expectedNextData = testMetrics{} + } + + expectedExportData := testMetrics{ + { + SchemaURL: "https://test-res-schema.com/schema", + Attributes: map[string]any{ + "asdf": "foo", + }, + ScopeMetrics: []testScopeMetrics{ + { + Name: "MyTestInstrument", + Version: "1.2.3", + Attributes: map[string]any{ + "foo": "bar", + }, + Metrics: tc.outputs, + }, + }, + }, + } + if len(tc.outputs) == 0 { + expectedExportData = testMetrics{} + } + + require.Equal(t, expectedNextData, nextData) + require.Equal(t, expectedExportData, exportData) + }) + } +} + +type testMetrics []testResourceMetrics + +type testResourceMetrics struct { + SchemaURL string + Attributes map[string]any + + ScopeMetrics []testScopeMetrics +} + +type testScopeMetrics struct { + Name string + Version string + SchemaURL string + Attributes map[string]any + + Metrics []testMetric +} + +type testMetric struct { + Name string + Type 
pmetric.MetricType + IsMonotonic bool + Temporality pmetric.AggregationTemporality + + DataPoints []any +} + +type testNumberDataPoint struct { + StartTimestamp pcommon.Timestamp + Timestamp pcommon.Timestamp + + Value float64 + Attributes map[string]any +} + +type testSummaryDataPoint struct { + StartTimestamp pcommon.Timestamp + Timestamp pcommon.Timestamp + + QuantileValues []testValueAtQuantile + Attributes map[string]any +} + +type testValueAtQuantile struct { + Value float64 + Quantile float64 +} + +type testHistogramDataPoint struct { + StartTimestamp pcommon.Timestamp + Timestamp pcommon.Timestamp + + ExplicitBounds []float64 + BucketCounts []uint64 + Attributes map[string]any +} + +func generateTestSumMetrics(t *testing.T, tmd testMetrics) pmetric.Metrics { + md := pmetric.NewMetrics() + + for _, trm := range tmd { + rm := md.ResourceMetrics().AppendEmpty() + err := rm.Resource().Attributes().FromRaw(trm.Attributes) + require.NoError(t, err) + rm.SetSchemaUrl(trm.SchemaURL) + + for _, tsm := range trm.ScopeMetrics { + sm := rm.ScopeMetrics().AppendEmpty() + scope := pcommon.NewInstrumentationScope() + scope.SetName(tsm.Name) + scope.SetVersion(tsm.Version) + err = scope.Attributes().FromRaw(tsm.Attributes) + require.NoError(t, err) + scope.MoveTo(sm.Scope()) + sm.SetSchemaUrl(tsm.SchemaURL) + + for _, tm := range tsm.Metrics { + m := sm.Metrics().AppendEmpty() + m.SetName(tm.Name) + + switch tm.Type { + case pmetric.MetricTypeSum: + sum := m.SetEmptySum() + sum.SetIsMonotonic(tm.IsMonotonic) + sum.SetAggregationTemporality(tm.Temporality) + + for _, tdp := range tm.DataPoints { + require.IsType(t, testNumberDataPoint{}, tdp) + tdpp := tdp.(testNumberDataPoint) + + dp := sum.DataPoints().AppendEmpty() + + dp.SetStartTimestamp(tdpp.StartTimestamp) + dp.SetTimestamp(tdpp.Timestamp) + + dp.SetDoubleValue(tdpp.Value) + + err = dp.Attributes().FromRaw(tdpp.Attributes) + require.NoError(t, err) + } + case pmetric.MetricTypeGauge: + gauge := m.SetEmptyGauge() + + 
for _, tdp := range tm.DataPoints { + require.IsType(t, testNumberDataPoint{}, tdp) + tdpp := tdp.(testNumberDataPoint) + + dp := gauge.DataPoints().AppendEmpty() + + dp.SetStartTimestamp(tdpp.StartTimestamp) + dp.SetTimestamp(tdpp.Timestamp) + + dp.SetDoubleValue(tdpp.Value) + + err = dp.Attributes().FromRaw(tdpp.Attributes) + require.NoError(t, err) + } + case pmetric.MetricTypeSummary: + summary := m.SetEmptySummary() + + for _, tdp := range tm.DataPoints { + require.IsType(t, testSummaryDataPoint{}, tdp) + tdpp := tdp.(testSummaryDataPoint) + + dp := summary.DataPoints().AppendEmpty() + + dp.SetStartTimestamp(tdpp.StartTimestamp) + dp.SetTimestamp(tdpp.Timestamp) + + for _, valueAtQuantile := range tdpp.QuantileValues { + qv := dp.QuantileValues().AppendEmpty() + + qv.SetQuantile(valueAtQuantile.Quantile) + qv.SetValue(valueAtQuantile.Value) + } + + err = dp.Attributes().FromRaw(tdpp.Attributes) + require.NoError(t, err) + } + case pmetric.MetricTypeHistogram: + histogram := m.SetEmptyHistogram() + histogram.SetAggregationTemporality(tm.Temporality) + + for _, tdp := range tm.DataPoints { + require.IsType(t, testHistogramDataPoint{}, tdp) + tdpp := tdp.(testHistogramDataPoint) + + dp := histogram.DataPoints().AppendEmpty() + + dp.SetStartTimestamp(tdpp.StartTimestamp) + dp.SetTimestamp(tdpp.Timestamp) + + bucketSum := uint64(0) + for _, bucketCount := range tdpp.BucketCounts { + bucketSum += bucketCount + } + dp.SetCount(bucketSum) + dp.BucketCounts().FromRaw(tdpp.BucketCounts) + dp.ExplicitBounds().FromRaw(tdpp.ExplicitBounds) + + err = dp.Attributes().FromRaw(tdpp.Attributes) + require.NoError(t, err) + } + default: + t.Fatalf("Unsupported metric type %v", m.Type()) + } + } + } + } + + return md +} + +func convertMetricsToTestData(t *testing.T, md pmetric.Metrics) testMetrics { + tmd := testMetrics{} + + for i := 0; i < md.ResourceMetrics().Len(); i++ { + rm := md.ResourceMetrics().At(i) + + trm := testResourceMetrics{ + SchemaURL: rm.SchemaUrl(), + 
Attributes: rm.Resource().Attributes().AsRaw(), + ScopeMetrics: []testScopeMetrics{}, + } + + for j := 0; j < rm.ScopeMetrics().Len(); j++ { + sm := rm.ScopeMetrics().At(j) + + tsm := testScopeMetrics{ + Name: sm.Scope().Name(), + Version: sm.Scope().Version(), + SchemaURL: sm.SchemaUrl(), + Attributes: sm.Scope().Attributes().AsRaw(), + Metrics: []testMetric{}, + } + + for k := 0; k < sm.Metrics().Len(); k++ { + m := sm.Metrics().At(k) + + switch m.Type() { + case pmetric.MetricTypeSum: + sum := m.Sum() + + tm := testMetric{ + Name: m.Name(), + Type: pmetric.MetricTypeSum, + IsMonotonic: sum.IsMonotonic(), + Temporality: sum.AggregationTemporality(), + DataPoints: []any{}, + } + + for r := 0; r < sum.DataPoints().Len(); r++ { + dp := sum.DataPoints().At(r) + + tdp := testNumberDataPoint{ + StartTimestamp: dp.StartTimestamp(), + Timestamp: dp.Timestamp(), + Value: dp.DoubleValue(), + Attributes: dp.Attributes().AsRaw(), + } + + tm.DataPoints = append(tm.DataPoints, tdp) + } + + tsm.Metrics = append(tsm.Metrics, tm) + case pmetric.MetricTypeGauge: + gauge := m.Gauge() + + tm := testMetric{ + Name: m.Name(), + Type: pmetric.MetricTypeGauge, + DataPoints: []any{}, + } + + for r := 0; r < gauge.DataPoints().Len(); r++ { + dp := gauge.DataPoints().At(r) + + tdp := testNumberDataPoint{ + StartTimestamp: dp.StartTimestamp(), + Timestamp: dp.Timestamp(), + Value: dp.DoubleValue(), + Attributes: dp.Attributes().AsRaw(), + } + + tm.DataPoints = append(tm.DataPoints, tdp) + } + + tsm.Metrics = append(tsm.Metrics, tm) + case pmetric.MetricTypeSummary: + summary := m.Summary() + + tm := testMetric{ + Name: m.Name(), + Type: pmetric.MetricTypeSummary, + DataPoints: []any{}, + } + + for r := 0; r < summary.DataPoints().Len(); r++ { + dp := summary.DataPoints().At(r) + + tdp := testSummaryDataPoint{ + StartTimestamp: dp.StartTimestamp(), + Timestamp: dp.Timestamp(), + QuantileValues: []testValueAtQuantile{}, + Attributes: dp.Attributes().AsRaw(), + } + + for s := 0; s < 
dp.QuantileValues().Len(); s++ { + qv := dp.QuantileValues().At(s) + + tqv := testValueAtQuantile{ + Value: qv.Value(), + Quantile: qv.Quantile(), + } + + tdp.QuantileValues = append(tdp.QuantileValues, tqv) + } + + tm.DataPoints = append(tm.DataPoints, tdp) + } + + tsm.Metrics = append(tsm.Metrics, tm) + case pmetric.MetricTypeHistogram: + histogram := m.Histogram() + + tm := testMetric{ + Name: m.Name(), + Type: pmetric.MetricTypeHistogram, + Temporality: histogram.AggregationTemporality(), + DataPoints: []any{}, + } + + for r := 0; r < histogram.DataPoints().Len(); r++ { + dp := histogram.DataPoints().At(r) + + tdp := testHistogramDataPoint{ + StartTimestamp: dp.StartTimestamp(), + Timestamp: dp.Timestamp(), + ExplicitBounds: dp.ExplicitBounds().AsRaw(), + BucketCounts: dp.BucketCounts().AsRaw(), + Attributes: dp.Attributes().AsRaw(), + } + + tm.DataPoints = append(tm.DataPoints, tdp) + } + + tsm.Metrics = append(tsm.Metrics, tm) + default: + t.Fatalf("Invalid metric type %v", m.Type()) + } + } + + trm.ScopeMetrics = append(trm.ScopeMetrics, tsm) + } + + tmd = append(tmd, trm) + } + + return tmd +}