Skip to content

Commit

Permalink
[processor/deltatocumulative]: expire stale series (#31337)
Browse files Browse the repository at this point in the history
**Description:** Removes stale series from tracking (and thus frees
their memory) using staleness logic from
#31089

**Link to tracking Issue:**
#30705,
#31016

**Testing:** `TestExpiry`
**Documentation:** README updated
  • Loading branch information
sh0rez authored Mar 5, 2024
1 parent 17a0f2c commit f003e0e
Show file tree
Hide file tree
Showing 24 changed files with 509 additions and 257 deletions.
30 changes: 30 additions & 0 deletions .chloggen/deltatocumulative-stale.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "enhancement"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: "deltatocumulativeprocessor"

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: expire stale series

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30705, 31016]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Adds `max_stale` option that allows to set an interval (default = `5min`)
after which a series that no longer receives new samples is removed from
tracking.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 0 additions & 2 deletions internal/exp/metrics/identity/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"hash"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
)
Expand All @@ -32,6 +31,5 @@ func OfStream[DataPoint attrPoint](m Metric, dp DataPoint) Stream {
}

type attrPoint interface {
pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint | pmetric.SummaryDataPoint
Attributes() pcommon.Map
}
54 changes: 0 additions & 54 deletions internal/exp/metrics/staleness/map.go

This file was deleted.

39 changes: 28 additions & 11 deletions internal/exp/metrics/staleness/staleness.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
)

// We override how Now() is returned, so we can have deterministic tests
Expand All @@ -19,29 +20,34 @@ var NowFunc = time.Now
// NOTE: Staleness methods are *not* thread-safe. If the user needs to use Staleness in a multi-threaded
// environment, then it is the user's responsibility to properly serialize calls to Staleness methods
type Staleness[T any] struct {
max time.Duration
Max time.Duration

items Map[T]
items streams.Map[T]
pq PriorityQueue
}

func NewStaleness[T any](max time.Duration, newMap Map[T]) *Staleness[T] {
func NewStaleness[T any](max time.Duration, items streams.Map[T]) *Staleness[T] {
return &Staleness[T]{
max: max,
items: newMap,
Max: max,

items: items,
pq: NewPriorityQueue(),
}
}

// Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value
func (s *Staleness[T]) Load(key identity.Stream) (T, bool) {
return s.items.Load(key)
func (s *Staleness[T]) Load(id identity.Stream) (T, bool) {
return s.items.Load(id)
}

// Store the given key value pair in the map, and update the pair's staleness value to "now"
func (s *Staleness[T]) Store(id identity.Stream, value T) {
func (s *Staleness[T]) Store(id identity.Stream, v T) error {
s.pq.Update(id, NowFunc())
s.items.Store(id, value)
return s.items.Store(id, v)
}

func (s *Staleness[T]) Delete(id identity.Stream) {
s.items.Delete(id)
}

// Items returns an iterator function that in future go version can be used with range
Expand All @@ -55,13 +61,24 @@ func (s *Staleness[T]) Items() func(yield func(identity.Stream, T) bool) bool {
// be removed. But if an entry had a stalness value of 30 minutes, then it *wouldn't* be removed.
func (s *Staleness[T]) ExpireOldEntries() {
now := NowFunc()

for {
if s.Len() == 0 {
return
}
_, ts := s.pq.Peek()
if now.Sub(ts) < s.max {
if now.Sub(ts) < s.Max {
break
}
id, _ := s.pq.Pop()
s.items.Delete(id)
}
}

func (s *Staleness[T]) Len() int {
return s.items.Len()
}

func (s *Staleness[T]) Next() time.Time {
_, ts := s.pq.Peek()
return ts
}
11 changes: 6 additions & 5 deletions internal/exp/metrics/staleness/staleness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import (
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
)

func TestStaleness(t *testing.T) {
max := 1 * time.Second
stalenessMap := NewStaleness[int](
max,
&RawMap[identity.Stream, int]{},
make(streams.HashMap[int]),
)

idA := generateStreamID(t, map[string]any{
Expand Down Expand Up @@ -45,13 +46,13 @@ func TestStaleness(t *testing.T) {

// Add the values to the map
NowFunc = func() time.Time { return timeA }
stalenessMap.Store(idA, valueA)
_ = stalenessMap.Store(idA, valueA)
NowFunc = func() time.Time { return timeB }
stalenessMap.Store(idB, valueB)
_ = stalenessMap.Store(idB, valueB)
NowFunc = func() time.Time { return timeC }
stalenessMap.Store(idC, valueC)
_ = stalenessMap.Store(idC, valueC)
NowFunc = func() time.Time { return timeD }
stalenessMap.Store(idD, valueD)
_ = stalenessMap.Store(idD, valueD)

// Set the time to 2.5s and run expire
// This should remove B, but the others should remain
Expand Down
52 changes: 52 additions & 0 deletions internal/exp/metrics/streams/streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"

import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"

// Sequence of streams that can be iterated upon
type Seq[T any] func(yield func(identity.Stream, T) bool) bool

// Map defines a collection of items tracked by a stream-id and the operations
// on it
type Map[T any] interface {
Load(identity.Stream) (T, bool)
Store(identity.Stream, T) error
Delete(identity.Stream)
Items() func(yield func(identity.Stream, T) bool) bool
Len() int
}

var _ Map[any] = HashMap[any](nil)

type HashMap[T any] map[identity.Stream]T

func (m HashMap[T]) Load(id identity.Stream) (T, bool) {
v, ok := (map[identity.Stream]T)(m)[id]
return v, ok
}

func (m HashMap[T]) Store(id identity.Stream, v T) error {
(map[identity.Stream]T)(m)[id] = v
return nil
}

func (m HashMap[T]) Delete(id identity.Stream) {
delete((map[identity.Stream]T)(m), id)
}

func (m HashMap[T]) Items() func(yield func(identity.Stream, T) bool) bool {
return func(yield func(identity.Stream, T) bool) bool {
for id, v := range (map[identity.Stream]T)(m) {
if !yield(id, v) {
break
}
}
return false
}
}

func (m HashMap[T]) Len() int {
return len((map[identity.Stream]T)(m))
}
2 changes: 2 additions & 0 deletions processor/deltatocumulativeprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ metrics from delta temporality to cumulative, by accumulating samples in memory.
``` yaml
processors:
deltatocumulative:
# how long until a series not receiving new samples is removed
[ max_stale: <duration> | default = 5m ]
```
There is no further configuration required. All delta samples are converted to cumulative.
10 changes: 9 additions & 1 deletion processor/deltatocumulativeprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,21 @@
package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor"

import (
"fmt"
"time"

"go.opentelemetry.io/collector/component"
)

var _ component.ConfigValidator = (*Config)(nil)

type Config struct{}
type Config struct {
MaxStale time.Duration `json:"max_stale"`
}

func (c *Config) Validate() error {
if c.MaxStale <= 0 {
return fmt.Errorf("max_stale must be a positive duration (got %s)", c.MaxStale)
}
return nil
}
3 changes: 2 additions & 1 deletion processor/deltatocumulativeprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentele
import (
"context"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -23,7 +24,7 @@ func NewFactory() processor.Factory {
}

func createDefaultConfig() component.Config {
return &Config{}
return &Config{MaxStale: 5 * time.Minute}
}

func createMetricsProcessor(_ context.Context, set processor.CreateSettings, cfg component.Config, next consumer.Metrics) (processor.Metrics, error) {
Expand Down
5 changes: 4 additions & 1 deletion processor/deltatocumulativeprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/delta
go 1.21

require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.96.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.0.0-00010101000000-000000000000
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/component v0.96.0
go.opentelemetry.io/collector/confmap v0.96.0
Expand Down Expand Up @@ -32,6 +32,7 @@ require (
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.96.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_model v0.6.0 // indirect
Expand All @@ -54,3 +55,5 @@ require (
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics
Loading

0 comments on commit f003e0e

Please sign in to comment.