diff --git a/.chloggen/deltatocumulative-stale.yaml b/.chloggen/deltatocumulative-stale.yaml new file mode 100644 index 000000000000..5285f8173b1c --- /dev/null +++ b/.chloggen/deltatocumulative-stale.yaml @@ -0,0 +1,30 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: "deltatocumulativeprocessor" + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: expire stale series + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30705, 31016] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Adds `max_stale` option that allows to set an interval (default = `5min`) + after which a series that no longer receives new samples is removed from + tracking. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/internal/exp/metrics/identity/stream.go b/internal/exp/metrics/identity/stream.go index 8deb50f9b317..19988f7730dc 100644 --- a/internal/exp/metrics/identity/stream.go +++ b/internal/exp/metrics/identity/stream.go @@ -7,7 +7,6 @@ import ( "hash" "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" ) @@ -32,6 +31,5 @@ func OfStream[DataPoint attrPoint](m Metric, dp DataPoint) Stream { } type attrPoint interface { - pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint | pmetric.SummaryDataPoint Attributes() pcommon.Map } diff --git a/internal/exp/metrics/staleness/map.go b/internal/exp/metrics/staleness/map.go deleted file mode 100644 index 77b29f5febd2..000000000000 --- a/internal/exp/metrics/staleness/map.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" - -import ( - "time" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" -) - -// Map is an abstraction over a map -type Map[T any] interface { - // Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value - Load(key identity.Stream) (T, bool) - // Store the given key value pair in the map - Store(key identity.Stream, value T) - // Remove the value at key from the map - Delete(key identity.Stream) - // Items returns an iterator function that in future go version can be used with range - // See: https://go.dev/wiki/RangefuncExperiment - Items() func(yield func(identity.Stream, T) bool) bool -} - -// RawMap implementation - -var _ Map[time.Time] = (*RawMap[identity.Stream, time.Time])(nil) - -// RawMap is an implementation of the Map interface using a standard golang map -type RawMap[K comparable, V any] map[K]V - -func (rm *RawMap[K, V]) Load(key K) (V, bool) { - value, ok := (*rm)[key] - return value, ok -} - -func (rm *RawMap[K, V]) Store(key K, value V) { - (*rm)[key] = value -} - -func (rm *RawMap[K, V]) Delete(key K) { - delete(*rm, key) -} - -func (rm *RawMap[K, V]) Items() func(yield func(K, V) bool) bool { - return func(yield func(K, V) bool) bool { - for k, v := range *rm { - if !yield(k, v) { - break - } - } - return false - } -} diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index f5803ccdeb55..9e5f989495f8 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -7,6 +7,7 @@ import ( "time" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" ) // We override how Now() is returned, so we can have deterministic tests @@ -19,29 +20,34 @@ var NowFunc = time.Now // NOTE: Staleness methods are *not* thread-safe. If the user needs to use Staleness in a multi-threaded // environment, then it is the user's responsibility to properly serialize calls to Staleness methods type Staleness[T any] struct { - max time.Duration + Max time.Duration - items Map[T] + items streams.Map[T] pq PriorityQueue } -func NewStaleness[T any](max time.Duration, newMap Map[T]) *Staleness[T] { +func NewStaleness[T any](max time.Duration, items streams.Map[T]) *Staleness[T] { return &Staleness[T]{ - max: max, - items: newMap, + Max: max, + + items: items, pq: NewPriorityQueue(), } } // Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value -func (s *Staleness[T]) Load(key identity.Stream) (T, bool) { - return s.items.Load(key) +func (s *Staleness[T]) Load(id identity.Stream) (T, bool) { + return s.items.Load(id) } // Store the given key value pair in the map, and update the pair's staleness value to "now" -func (s *Staleness[T]) Store(id identity.Stream, value T) { +func (s *Staleness[T]) Store(id identity.Stream, v T) error { s.pq.Update(id, NowFunc()) - s.items.Store(id, value) + return s.items.Store(id, v) +} + +func (s *Staleness[T]) Delete(id identity.Stream) { + s.items.Delete(id) } // Items returns an iterator function that in future go version can be used with range @@ -55,13 +61,24 @@ func (s *Staleness[T]) Items() func(yield func(identity.Stream, T) bool) bool { // be removed. But if an entry had a stalness value of 30 minutes, then it *wouldn't* be removed. func (s *Staleness[T]) ExpireOldEntries() { now := NowFunc() - for { + if s.Len() == 0 { + return + } _, ts := s.pq.Peek() - if now.Sub(ts) < s.max { + if now.Sub(ts) < s.Max { break } id, _ := s.pq.Pop() s.items.Delete(id) } } + +func (s *Staleness[T]) Len() int { + return s.items.Len() +} + +func (s *Staleness[T]) Next() time.Time { + _, ts := s.pq.Peek() + return ts +} diff --git a/internal/exp/metrics/staleness/staleness_test.go b/internal/exp/metrics/staleness/staleness_test.go index 4d96b41061e6..98fec276238e 100644 --- a/internal/exp/metrics/staleness/staleness_test.go +++ b/internal/exp/metrics/staleness/staleness_test.go @@ -10,13 +10,14 @@ import ( "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" ) func TestStaleness(t *testing.T) { max := 1 * time.Second stalenessMap := NewStaleness[int]( max, - &RawMap[identity.Stream, int]{}, + make(streams.HashMap[int]), ) idA := generateStreamID(t, map[string]any{ @@ -45,13 +46,13 @@ func TestStaleness(t *testing.T) { // Add the values to the map NowFunc = func() time.Time { return timeA } - stalenessMap.Store(idA, valueA) + _ = stalenessMap.Store(idA, valueA) NowFunc = func() time.Time { return timeB } - stalenessMap.Store(idB, valueB) + _ = stalenessMap.Store(idB, valueB) NowFunc = func() time.Time { return timeC } - stalenessMap.Store(idC, valueC) + _ = stalenessMap.Store(idC, valueC) NowFunc = func() time.Time { return timeD } - stalenessMap.Store(idD, valueD) + _ = stalenessMap.Store(idD, valueD) // Set the time to 2.5s and run expire // This should remove B, but the others should remain diff --git a/internal/exp/metrics/streams/streams.go b/internal/exp/metrics/streams/streams.go new file mode 100644 index 000000000000..1e25102b3b47 --- /dev/null +++ b/internal/exp/metrics/streams/streams.go @@ -0,0 +1,52 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" + +import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + +// Sequence of streams that can be iterated upon +type Seq[T any] func(yield func(identity.Stream, T) bool) bool + +// Map defines a collection of items tracked by a stream-id and the operations +// on it +type Map[T any] interface { + Load(identity.Stream) (T, bool) + Store(identity.Stream, T) error + Delete(identity.Stream) + Items() func(yield func(identity.Stream, T) bool) bool + Len() int +} + +var _ Map[any] = HashMap[any](nil) + +type HashMap[T any] map[identity.Stream]T + +func (m HashMap[T]) Load(id identity.Stream) (T, bool) { + v, ok := (map[identity.Stream]T)(m)[id] + return v, ok +} + +func (m HashMap[T]) Store(id identity.Stream, v T) error { + (map[identity.Stream]T)(m)[id] = v + return nil +} + +func (m HashMap[T]) Delete(id identity.Stream) { + delete((map[identity.Stream]T)(m), id) +} + +func (m HashMap[T]) Items() func(yield func(identity.Stream, T) bool) bool { + return func(yield func(identity.Stream, T) bool) bool { + for id, v := range (map[identity.Stream]T)(m) { + if !yield(id, v) { + break + } + } + return false + } +} + +func (m HashMap[T]) Len() int { + return len((map[identity.Stream]T)(m)) +} diff --git a/processor/deltatocumulativeprocessor/README.md b/processor/deltatocumulativeprocessor/README.md index 60432e3b593b..1a639128fca8 100644 --- a/processor/deltatocumulativeprocessor/README.md +++ b/processor/deltatocumulativeprocessor/README.md @@ -23,6 +23,8 @@ metrics from delta temporality to cumulative, by accumulating samples in memory. ``` yaml processors: deltatocumulative: + # how long until a series not receiving new samples is removed + [ max_stale: | default = 5m ] ``` There is no further configuration required. All delta samples are converted to cumulative. diff --git a/processor/deltatocumulativeprocessor/config.go b/processor/deltatocumulativeprocessor/config.go index 8e46c69bc9b8..b5744a9779b7 100644 --- a/processor/deltatocumulativeprocessor/config.go +++ b/processor/deltatocumulativeprocessor/config.go @@ -4,13 +4,21 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" import ( + "fmt" + "time" + "go.opentelemetry.io/collector/component" ) var _ component.ConfigValidator = (*Config)(nil) -type Config struct{} +type Config struct { + MaxStale time.Duration `json:"max_stale"` +} func (c *Config) Validate() error { + if c.MaxStale <= 0 { + return fmt.Errorf("max_stale must be a positive duration (got %s)", c.MaxStale) + } return nil } diff --git a/processor/deltatocumulativeprocessor/factory.go b/processor/deltatocumulativeprocessor/factory.go index f28165ac66cd..b2fba4e00fc2 100644 --- a/processor/deltatocumulativeprocessor/factory.go +++ b/processor/deltatocumulativeprocessor/factory.go @@ -6,6 +6,7 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentele import ( "context" "fmt" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -23,7 +24,7 @@ func NewFactory() processor.Factory { } func createDefaultConfig() component.Config { - return &Config{} + return &Config{MaxStale: 5 * time.Minute} } func createMetricsProcessor(_ context.Context, set processor.CreateSettings, cfg component.Config, next consumer.Metrics) (processor.Metrics, error) { diff --git a/processor/deltatocumulativeprocessor/go.mod b/processor/deltatocumulativeprocessor/go.mod index 703f7816a0bf..c7fc08ab881a 100644 --- a/processor/deltatocumulativeprocessor/go.mod +++ b/processor/deltatocumulativeprocessor/go.mod @@ -3,7 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/delta go 1.21 require ( - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.96.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.0.0-00010101000000-000000000000 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector/component v0.96.0 go.opentelemetry.io/collector/confmap v0.96.0 @@ -32,6 +32,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.96.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.19.0 // indirect github.com/prometheus/client_model v0.6.0 // indirect @@ -54,3 +55,5 @@ require ( ) replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics diff --git a/processor/deltatocumulativeprocessor/internal/clock/clock.go b/processor/deltatocumulativeprocessor/internal/clock/clock.go new file mode 100644 index 000000000000..3c010136e97f --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/clock/clock.go @@ -0,0 +1,94 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package clock // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/clock" + +import ( + "sync" + "time" +) + +var clock Clock = Real{} + +func Change(c Clock) { + clock = c +} + +type Clock interface { + Now() time.Time + After(d time.Duration) <-chan time.Time +} + +func Now() time.Time { + return clock.Now() +} + +func After(d time.Duration) <-chan time.Time { + return clock.After(d) +} + +func Until(t time.Time) time.Duration { + return t.Sub(Now()) +} + +func Sleep(d time.Duration) { + <-After(d) +} + +type Real struct{} + +func (r Real) Now() time.Time { + return time.Now() +} + +func (r Real) After(d time.Duration) <-chan time.Time { + return time.After(d) +} + +type Settable interface { + Clock + Set(now time.Time) +} + +func Fake() Settable { + clock := &fake{} + clock.Set(time.Time{}) + return clock +} + +type fake struct { + mtx sync.RWMutex + ts time.Time +} + +func (f *fake) Set(now time.Time) { + f.mtx.Lock() + f.ts = now + f.mtx.Unlock() +} + +func (f *fake) Now() time.Time { + f.mtx.RLock() + defer f.mtx.RUnlock() + return f.ts +} + +func (f *fake) After(d time.Duration) <-chan time.Time { + var ( + end = f.Now().Add(d) + done = make(chan time.Time) + wait = make(chan struct{}) + ) + + go func() { + close(wait) + for f.Now().Before(end) { + time.Sleep(time.Millisecond / 10) + } + done <- f.Now() + close(done) + }() + <-wait + + return done +} diff --git a/processor/deltatocumulativeprocessor/internal/clock/clock_test.go b/processor/deltatocumulativeprocessor/internal/clock/clock_test.go new file mode 100644 index 000000000000..d1671a683ff3 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/clock/clock_test.go @@ -0,0 +1,32 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package clock + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestFakeAfter(t *testing.T) { + fake := Fake() + + ch := fake.After(10 * time.Second) + now := fake.Now() + + fake.Set(now.Add(10 * time.Second)) + done := <-ch + + require.Equal(t, 10*time.Second, done.Sub(now)) +} + +func TestFakeSet(t *testing.T) { + fake := Fake() + require.Equal(t, time.Time{}, fake.Now()) + + ts := time.Time{}.Add(10 * time.Minute) + fake.Set(ts) + require.Equal(t, ts, fake.Now()) +} diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta.go b/processor/deltatocumulativeprocessor/internal/delta/delta.go index fda1e8149a84..fac787e9ece4 100644 --- a/processor/deltatocumulativeprocessor/internal/delta/delta.go +++ b/processor/deltatocumulativeprocessor/internal/delta/delta.go @@ -8,52 +8,44 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" + exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" ) -func construct[D data.Point[D]]() streams.Aggregator[D] { - acc := &Accumulator[D]{dps: make(map[streams.Ident]D)} - return &Lock[D]{next: acc} -} - -func Numbers() streams.Aggregator[data.Number] { - return construct[data.Number]() -} - -func Histograms() streams.Aggregator[data.Histogram] { - return construct[data.Histogram]() +func New[D data.Point[D]]() Accumulator[D] { + return Accumulator[D]{ + Map: make(exp.HashMap[D]), + } } -var _ streams.Aggregator[data.Number] = (*Accumulator[data.Number])(nil) +var _ streams.Map[data.Number] = (*Accumulator[data.Number])(nil) type Accumulator[D data.Point[D]] struct { - dps map[streams.Ident]D + streams.Map[D] } -// Aggregate implements delta-to-cumulative aggregation as per spec: -// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#sums-delta-to-cumulative -func (a *Accumulator[D]) Aggregate(id streams.Ident, dp D) (D, error) { - aggr, ok := a.dps[id] +func (a Accumulator[D]) Store(id streams.Ident, dp D) error { + aggr, ok := a.Map.Load(id) // new series: initialize with current sample if !ok { - a.dps[id] = dp.Clone() - return a.dps[id], nil + clone := dp.Clone() + return a.Map.Store(id, clone) } // drop bad samples switch { case dp.StartTimestamp() < aggr.StartTimestamp(): // belongs to older series - return aggr, ErrOlderStart{Start: aggr.StartTimestamp(), Sample: dp.StartTimestamp()} + return ErrOlderStart{Start: aggr.StartTimestamp(), Sample: dp.StartTimestamp()} case dp.Timestamp() <= aggr.Timestamp(): // out of order - return aggr, ErrOutOfOrder{Last: aggr.Timestamp(), Sample: dp.Timestamp()} + return ErrOutOfOrder{Last: aggr.Timestamp(), Sample: dp.Timestamp()} } - a.dps[id] = aggr.Add(dp) - return a.dps[id], nil + res := aggr.Add(dp) + return a.Map.Store(id, res) } type ErrOlderStart struct { diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta_test.go b/processor/deltatocumulativeprocessor/internal/delta/delta_test.go index f298d174977c..919d4b852251 100644 --- a/processor/deltatocumulativeprocessor/internal/delta/delta_test.go +++ b/processor/deltatocumulativeprocessor/internal/delta/delta_test.go @@ -21,8 +21,12 @@ import ( var result any +func aggr() streams.Aggregator[data.Number] { + return streams.IntoAggregator(delta.New[data.Number]()) +} + func BenchmarkAccumulator(b *testing.B) { - acc := delta.Numbers() + acc := aggr() sum := random.Sum() bench := func(b *testing.B, nstreams int) { @@ -65,7 +69,7 @@ func BenchmarkAccumulator(b *testing.B) { // verify the distinction between streams and the accumulated value func TestAddition(t *testing.T) { - acc := delta.Numbers() + acc := aggr() sum := random.Sum() type Idx int @@ -104,7 +108,7 @@ func TestAddition(t *testing.T) { // verify that start + last times are updated func TestTimes(t *testing.T) { - acc := delta.Numbers() + acc := aggr() id, base := random.Sum().Stream() point := func(start, last pcommon.Timestamp) data.Number { dp := base.Clone() @@ -174,7 +178,7 @@ func TestErrs(t *testing.T) { for _, c := range cases { c := c t.Run(fmt.Sprintf("%T", c.Err), func(t *testing.T) { - acc := delta.Numbers() + acc := aggr() id, data := random.Sum().Stream() good := data.Clone() diff --git a/processor/deltatocumulativeprocessor/internal/delta/lock.go b/processor/deltatocumulativeprocessor/internal/delta/lock.go deleted file mode 100644 index aeb8eb00d30f..000000000000 --- a/processor/deltatocumulativeprocessor/internal/delta/lock.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package delta // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" - -import ( - "sync" - - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" -) - -var _ streams.Aggregator[data.Number] = (*Lock[data.Number])(nil) - -type Lock[D data.Point[D]] struct { - sync.Mutex - next streams.Aggregator[D] -} - -func (l *Lock[D]) Aggregate(id streams.Ident, dp D) (D, error) { - l.Lock() - dp, err := l.next.Aggregate(id, dp) - l.Unlock() - return dp, err -} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/ident.go b/processor/deltatocumulativeprocessor/internal/metrics/ident.go deleted file mode 100644 index 9076dc4918fa..000000000000 --- a/processor/deltatocumulativeprocessor/internal/metrics/ident.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" - -import ( - "hash" - "hash/fnv" - - "go.opentelemetry.io/collector/pdata/pmetric" -) - -type Ident struct { - ScopeIdent - - name string - unit string - ty string - - monotonic bool - temporality pmetric.AggregationTemporality -} - -func (i Ident) Hash() hash.Hash64 { - sum := i.ScopeIdent.Hash() - sum.Write([]byte(i.name)) - sum.Write([]byte(i.unit)) - sum.Write([]byte(i.ty)) - - var mono byte - if i.monotonic { - mono = 1 - } - sum.Write([]byte{mono, byte(i.temporality)}) - return sum -} - -type ScopeIdent struct { - ResourceIdent - - name string - version string - attrs [16]byte -} - -func (s ScopeIdent) Hash() hash.Hash64 { - sum := s.ResourceIdent.Hash() - sum.Write([]byte(s.name)) - sum.Write([]byte(s.version)) - sum.Write(s.attrs[:]) - return sum -} - -type ResourceIdent struct { - attrs [16]byte -} - -func (r ResourceIdent) Hash() hash.Hash64 { - sum := fnv.New64a() - sum.Write(r.attrs[:]) - return sum -} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go index 53b1f42b4997..6b705f5a7d24 100644 --- a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go @@ -7,9 +7,11 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" ) +type Ident = identity.Metric + type Metric struct { res pcommon.Resource scope pcommon.InstrumentationScope @@ -17,36 +19,7 @@ type Metric struct { } func (m *Metric) Ident() Ident { - id := Ident{ - ScopeIdent: ScopeIdent{ - ResourceIdent: ResourceIdent{ - attrs: pdatautil.MapHash(m.res.Attributes()), - }, - name: m.scope.Name(), - version: m.scope.Version(), - attrs: pdatautil.MapHash(m.scope.Attributes()), - }, - name: m.Metric.Name(), - unit: m.Metric.Unit(), - ty: m.Metric.Type().String(), - } - - switch m.Type() { - case pmetric.MetricTypeSum: - sum := m.Sum() - id.monotonic = sum.IsMonotonic() - id.temporality = sum.AggregationTemporality() - case pmetric.MetricTypeExponentialHistogram: - exp := m.ExponentialHistogram() - id.monotonic = true - id.temporality = exp.AggregationTemporality() - case pmetric.MetricTypeHistogram: - hist := m.Histogram() - id.monotonic = true - id.temporality = hist.AggregationTemporality() - } - - return id + return identity.OfResourceMetric(m.res, m.scope, m.Metric) } func From(res pcommon.Resource, scope pcommon.InstrumentationScope, metric pmetric.Metric) Metric { diff --git a/processor/deltatocumulativeprocessor/internal/streams/data.go b/processor/deltatocumulativeprocessor/internal/streams/data.go index 89073123eba3..435202cffabe 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/data.go +++ b/processor/deltatocumulativeprocessor/internal/streams/data.go @@ -6,28 +6,24 @@ package streams // import "github.com/open-telemetry/opentelemetry-collector-con import ( "errors" - "go.opentelemetry.io/collector/pdata/pcommon" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" ) -// Iterator as per https://go.dev/wiki/RangefuncExperiment -type Iter[V any] func(yield func(Ident, V) bool) - // Samples returns an Iterator over each sample of all streams in the metric -func Samples[D data.Point[D]](m metrics.Data[D]) Iter[D] { +func Samples[D data.Point[D]](m metrics.Data[D]) Seq[D] { mid := m.Ident() - return func(yield func(Ident, D) bool) { + return func(yield func(Ident, D) bool) bool { for i := 0; i < m.Len(); i++ { dp := m.At(i) - id := Identify(mid, dp.Attributes()) + id := identity.OfStream(mid, dp) if !yield(id, dp) { break } } + return false } } @@ -48,7 +44,3 @@ func Aggregate[D data.Point[D]](m metrics.Data[D], aggr Aggregator[D]) error { return errs } - -func Identify(metric metrics.Ident, attrs pcommon.Map) Ident { - return Ident{metric: metric, attrs: pdatautil.MapHash(attrs)} -} diff --git a/processor/deltatocumulativeprocessor/internal/streams/data_test.go b/processor/deltatocumulativeprocessor/internal/streams/data_test.go index f7c4dc077781..d0a042b5b60a 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/data_test.go +++ b/processor/deltatocumulativeprocessor/internal/streams/data_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" @@ -36,7 +37,7 @@ func BenchmarkSamples(b *testing.B) { for i := 0; i < dps.Len(); i++ { dp := dps.At(i) - rid = streams.Identify(mid, dp.Attributes()) + rid = identity.OfStream(mid, dp) rdp = dp } }) @@ -48,7 +49,7 @@ func BenchmarkSamples(b *testing.B) { for i := range dps.dps { dp := dps.dps[i] - rid = streams.Identify(mid, dp.Attributes()) + rid = identity.OfStream(mid, dp) rdp = dp } }) diff --git a/processor/deltatocumulativeprocessor/internal/streams/expiry.go b/processor/deltatocumulativeprocessor/internal/streams/expiry.go new file mode 100644 index 000000000000..37d2c97ede28 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/streams/expiry.go @@ -0,0 +1,60 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + +import ( + "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/clock" +) + +func ExpireAfter[T any](items streams.Map[T], ttl time.Duration) Expiry[T] { + exp := Expiry[T]{ + Staleness: *staleness.NewStaleness[T](ttl, items), + sig: make(chan struct{}), + } + return exp +} + +var _ streams.Map[any] = (*Expiry[any])(nil) + +type Expiry[T any] struct { + staleness.Staleness[T] + sig chan struct{} +} + +func (e Expiry[T]) ExpireOldEntries() <-chan struct{} { + e.Staleness.ExpireOldEntries() + + n := e.Staleness.Len() + sig := make(chan struct{}) + + go func() { + switch { + case n == 0: + <-e.sig + case n > 0: + expires := e.Staleness.Next().Add(e.Max) + until := clock.Until(expires) + clock.Sleep(until) + } + close(sig) + }() + return sig +} + +func (e Expiry[T]) Store(id identity.Stream, v T) error { + err := e.Staleness.Store(id, v) + + // "try-send" to notify possibly sleeping expiry routine + select { + case e.sig <- struct{}{}: + default: + } + + return err +} diff --git a/processor/deltatocumulativeprocessor/internal/streams/expiry_test.go b/processor/deltatocumulativeprocessor/internal/streams/expiry_test.go new file mode 100644 index 000000000000..c415f6baf2df --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/streams/expiry_test.go @@ -0,0 +1,100 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package streams_test + +import ( + "math/rand" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" + exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/clock" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random" +) + +func TestExpiry(t *testing.T) { + fake := clock.Fake() + clock.Change(fake) + staleness.NowFunc = func() time.Time { return fake.Now() } + + tm := TimeMap[data.Number]{ + Map: make(exp.HashMap[data.Number]), + add: make(map[identity.Stream]time.Time), + del: make(map[identity.Stream]time.Time), + } + const maxStale = time.Minute + exp := streams.ExpireAfter(tm, maxStale) + + var mtx sync.Mutex + go func() { + for { + mtx.Lock() + next := exp.ExpireOldEntries() + mtx.Unlock() + <-next + } + }() + + sum := random.Sum() + mtx.Lock() + now := fake.Now() + for i := 0; i < 10; i++ { + r := rand.Intn(10) + now = now.Add(time.Duration(r) * time.Second) + fake.Set(now) + + id, dp := sum.Stream() + err := exp.Store(id, dp) + require.NoError(t, err) + } + mtx.Unlock() + + go func() { + for { + now = now.Add(time.Second) + fake.Set(now) + time.Sleep(2 * time.Millisecond) + } + }() + + for { + mtx.Lock() + n := tm.Len() + mtx.Unlock() + if n == 0 { + break + } + time.Sleep(10 * time.Millisecond) + } + + for id := range tm.add { + add := tm.add[id] + del := tm.del[id] + require.Equal(t, maxStale, del.Sub(add)) + } +} + +type TimeMap[T any] struct { + streams.Map[T] + + add map[streams.Ident]time.Time + del map[streams.Ident]time.Time +} + +func (t TimeMap[T]) Store(id streams.Ident, v T) error { + t.add[id] = clock.Now() + return t.Map.Store(id, v) +} + +func (t TimeMap[T]) Delete(id streams.Ident) { + t.del[id] = clock.Now() + t.Map.Delete(id) +} diff --git a/processor/deltatocumulativeprocessor/internal/streams/streams.go b/processor/deltatocumulativeprocessor/internal/streams/streams.go index 3cd99a760dd8..c6f725c7bfa2 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/streams.go +++ b/processor/deltatocumulativeprocessor/internal/streams/streams.go @@ -4,32 +4,32 @@ package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" import ( - "hash" - "strconv" - + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" +) + +type Ident = identity.Stream + +type ( + Seq[T any] streams.Seq[T] + Map[T any] streams.Map[T] ) type Aggregator[D data.Point[D]] interface { Aggregate(Ident, D) (D, error) } -type Ident struct { - metric metrics.Ident - attrs [16]byte -} - -func (i Ident) Hash() hash.Hash64 { - sum := i.metric.Hash() - sum.Write(i.attrs[:]) - return sum +func IntoAggregator[D data.Point[D]](m Map[D]) MapAggr[D] { + return MapAggr[D]{Map: m} } -func (i Ident) String() string { - return strconv.FormatUint(i.Hash().Sum64(), 16) +type MapAggr[D data.Point[D]] struct { + Map[D] } -func (i Ident) Metric() metrics.Ident { - return i.metric +func (a MapAggr[D]) Aggregate(id Ident, dp D) (D, error) { + err := a.Map.Store(id, dp) + v, _ := a.Map.Load(id) + return v, err } diff --git a/processor/deltatocumulativeprocessor/internal/testdata/random/random.go b/processor/deltatocumulativeprocessor/internal/testdata/random/random.go index e526ad08e28d..58ec3c2488c8 100644 --- a/processor/deltatocumulativeprocessor/internal/testdata/random/random.go +++ b/processor/deltatocumulativeprocessor/internal/testdata/random/random.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" @@ -38,7 +39,7 @@ func (m Metric) Stream() (streams.Ident, data.Number) { for i := 0; i < 10; i++ { dp.Attributes().PutStr(randStr(), randStr()) } - id := streams.Identify(m.Ident(), dp.Attributes()) + id := identity.OfStream(m.Ident(), dp) return id, data.Number{NumberDataPoint: dp} } diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go index 057f3bcc3b37..8f5941ce84b1 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -6,6 +6,7 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentele import ( "context" "errors" + "sync" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -28,10 +29,13 @@ type Processor struct { ctx context.Context cancel context.CancelFunc - nums streams.Aggregator[data.Number] + aggr streams.Aggregator[data.Number] + exp *streams.Expiry[data.Number] + + mtx sync.Mutex } -func newProcessor(_ *Config, log *zap.Logger, next consumer.Metrics) *Processor { +func newProcessor(cfg *Config, log *zap.Logger, next consumer.Metrics) *Processor { ctx, cancel := context.WithCancel(context.Background()) proc := Processor{ @@ -39,13 +43,38 @@ func newProcessor(_ *Config, log *zap.Logger, next consumer.Metrics) *Processor ctx: ctx, cancel: cancel, next: next, - nums: delta.Numbers(), } + var dps streams.Map[data.Number] + dps = delta.New[data.Number]() + + if cfg.MaxStale > 0 { + exp := streams.ExpireAfter(dps, cfg.MaxStale) + proc.exp = &exp + dps = &exp + } + + proc.aggr = streams.IntoAggregator(dps) return &proc } func (p *Processor) Start(_ context.Context, _ component.Host) error { + if p.exp != nil { + go func() { + for { + p.mtx.Lock() + next := p.exp.ExpireOldEntries() + p.mtx.Unlock() + + select { + case <-next: + case <-p.ctx.Done(): + return + } + } + }() + } + return nil } @@ -59,6 +88,9 @@ func (p *Processor) Capabilities() consumer.Capabilities { } func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + p.mtx.Lock() + defer p.mtx.Unlock() + var errs error metrics.Each(md, func(m metrics.Metric) { @@ -66,7 +98,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro case pmetric.MetricTypeSum: sum := m.Sum() if sum.AggregationTemporality() == pmetric.AggregationTemporalityDelta { - err := streams.Aggregate[data.Number](metrics.Sum(m), p.nums) + err := streams.Aggregate[data.Number](metrics.Sum(m), p.aggr) errs = errors.Join(errs, err) sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) }