Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OTEL: removing metrics from removed entities #948

Merged
merged 2 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pkg/internal/export/expire/expiry_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,18 @@ func (ex *ExpiryMap[T]) DeleteExpired() []T {
return delEntries
}

// DeleteAll cleans the map and returns a slice with its deleted elements
func (ex *ExpiryMap[T]) DeleteAll() []T {
ex.mt.Lock()
defer ex.mt.Unlock()
entries := make([]T, 0, len(ex.entries))
for k, e := range ex.entries {
entries = append(entries, e.val)
delete(ex.entries, k)
}
return entries
}

// All returns an array with all the stored entries. It might contain expired entries
// if DeleteExpired is not invoked before it.
// TODO: use https://tip.golang.org/wiki/RangefuncExperiment when available
Expand Down
70 changes: 56 additions & 14 deletions pkg/internal/export/otel/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"log/slog"
"os"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/hashicorp/golang-lru/v2/simplelru"
Expand All @@ -20,6 +21,7 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.19.0"
"google.golang.org/grpc/credentials"

"github.com/grafana/beyla/pkg/internal/export/expire"
"github.com/grafana/beyla/pkg/internal/svc"
)

Expand Down Expand Up @@ -83,14 +85,25 @@ func getResourceAttrs(service *svc.ID) *resource.Resource {
}

// ReporterPool keeps an LRU cache of different OTEL reporters given a service name.
// TODO: evict reporters after a time without being accessed
type ReporterPool[T any] struct {
pool *simplelru.LRU[svc.UID, T]
pool *simplelru.LRU[svc.UID, *expirable[T]]

itemConstructor func(*svc.ID) (T, error)

lastReporter T
lastReporter *expirable[T]
lastService *svc.ID

// TODO: use cacheable clock for efficiency
clock expire.Clock
ttl time.Duration
lastExpiration time.Time
}

// expirable.NewLRU implementation is pretty undeterministic, so
// we implement our own expiration mechanism on top of simplelru.LRU
type expirable[T any] struct {
lastAccess time.Time
value T
}

// NewReporterPool creates a ReporterPool instance given a cache length,
Expand All @@ -99,20 +112,29 @@ type ReporterPool[T any] struct {
// instantiate the generic OTEL metrics/traces reporter.
func NewReporterPool[T any](
cacheLen int,
callback simplelru.EvictCallback[svc.UID, T],
ttl time.Duration,
clock expire.Clock,
callback simplelru.EvictCallback[svc.UID, *expirable[T]],
itemConstructor func(id *svc.ID) (T, error),
) ReporterPool[T] {
pool, err := simplelru.NewLRU[svc.UID, T](cacheLen, callback)
pool, err := simplelru.NewLRU[svc.UID, *expirable[T]](cacheLen, callback)
if err != nil {
// should never happen: bug!
panic(err)
}
return ReporterPool[T]{pool: pool, itemConstructor: itemConstructor}
return ReporterPool[T]{
pool: pool,
itemConstructor: itemConstructor,
ttl: ttl,
clock: clock,
lastExpiration: clock(),
}
}

// For retrieves the associated item for the given service name, or
// creates a new one if it does not exist
func (rp *ReporterPool[T]) For(service *svc.ID) (T, error) {
rp.expireOldReporters()
// optimization: do not query the resources' cache if the
// previously processed span belongs to the same service name
// as the current.
Expand All @@ -129,20 +151,40 @@ func (rp *ReporterPool[T]) For(service *svc.ID) (T, error) {
rp.lastService = service
rp.lastReporter = lm
}
return rp.lastReporter, nil
// we need to update the last access for that reporter, to avoid it
// being expired after the TTL
rp.lastReporter.lastAccess = rp.clock()
return rp.lastReporter.value, nil
}

// expireOldReporters will remove the metrics reporters that haven't been accessed
// during the last TTL period
func (rp *ReporterPool[T]) expireOldReporters() {
now := rp.clock()
if now.Sub(rp.lastExpiration) < rp.ttl {
return
}
rp.lastExpiration = now
for {
_, v, ok := rp.pool.GetOldest()
if !ok || now.Sub(v.lastAccess) < rp.ttl {
return
}
rp.pool.RemoveOldest()
}
}

func (rp *ReporterPool[T]) get(service *svc.ID) (T, error) {
if m, ok := rp.pool.Get(service.UID); ok {
return m, nil
func (rp *ReporterPool[T]) get(service *svc.ID) (*expirable[T], error) {
if e, ok := rp.pool.Get(service.UID); ok {
return e, nil
}
m, err := rp.itemConstructor(service)
if err != nil {
var t T
return t, fmt.Errorf("creating resource for service %q: %w", service, err)
return nil, fmt.Errorf("creating resource for service %q: %w", service, err)
}
rp.pool.Add(service.UID, m)
return m, nil
e := &expirable[T]{value: m}
rp.pool.Add(service.UID, e)
return e, nil
}

// Intermediate representation of option functions suitable for testing
Expand Down
49 changes: 31 additions & 18 deletions pkg/internal/export/otel/expirer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,14 @@ type Expirer[Record any, Metric removableMetric[ValType], ValType any] struct {
ctx context.Context
attrs []attributes.Field[Record, attribute.KeyValue]
metric Metric
entries *expire.ExpiryMap[*expiryMapEntry[Metric, ValType]]
entries *expire.ExpiryMap[attribute.Set]
log *slog.Logger

clock expire.Clock
lastExpiration time.Time
ttl time.Duration
}

type expiryMapEntry[Metric removableMetric[ValType], ValType any] struct {
metric Metric
attributes attribute.Set
}

// NewExpirer creates an expirer that wraps data points of a given type. Its labeled instances are dropped
// if they haven't been updated during the last timeout period.
// Arguments:
Expand All @@ -64,7 +59,7 @@ func NewExpirer[Record any, Metric removableMetric[ValType], ValType any](
ctx: ctx,
metric: metric,
attrs: attrs,
entries: expire.NewExpiryMap[*expiryMapEntry[Metric, ValType]](clock, ttl),
entries: expire.NewExpiryMap[attribute.Set](clock, ttl),
log: plog().With("type", fmt.Sprintf("%T", metric)),
clock: clock,
lastExpiration: clock(),
Expand All @@ -86,13 +81,10 @@ func (ex *Expirer[Record, Metric, ValType]) ForRecord(r Record, extraAttrs ...at
ex.lastExpiration = now
}
recordAttrs, attrValues := ex.recordAttributes(r, extraAttrs...)
return ex.entries.GetOrCreate(attrValues, func() *expiryMapEntry[Metric, ValType] {
return ex.metric, ex.entries.GetOrCreate(attrValues, func() attribute.Set {
ex.log.With("labelValues", attrValues).Debug("storing new metric label set")
return &expiryMapEntry[Metric, ValType]{
metric: ex.metric,
attributes: recordAttrs,
}
}).metric, recordAttrs
return recordAttrs
})
}

func (ex *Expirer[Record, Metric, ValType]) recordAttributes(m Record, extraAttrs ...attribute.KeyValue) (attribute.Set, []string) {
Expand All @@ -113,10 +105,31 @@ func (ex *Expirer[Record, Metric, ValType]) recordAttributes(m Record, extraAttr
}

func (ex *Expirer[Record, Metric, ValType]) removeOutdated(ctx context.Context) {
if old := ex.entries.DeleteExpired(); len(old) > 0 {
for _, om := range old {
ex.log.Debug("deleting old OTEL metric", "labelValues", om)
om.metric.Remove(ctx, metric.WithAttributeSet(om.attributes))
}
for _, attrs := range ex.entries.DeleteExpired() {
ex.deleteMetricInstance(ctx, attrs)
}
}

func (ex *Expirer[Record, Metric, ValType]) deleteMetricInstance(ctx context.Context, attrs attribute.Set) {
if ex.log.Enabled(ex.ctx, slog.LevelDebug) {
ex.logger(attrs).Debug("deleting old OTEL metric")
}
ex.metric.Remove(ctx, metric.WithAttributeSet(attrs))
}

func (ex *Expirer[Record, Metric, ValType]) logger(attrs attribute.Set) *slog.Logger {
fmtAttrs := make([]any, 0, attrs.Len()*2)
for it := attrs.Iter(); it.Next(); {
a := it.Attribute()
fmtAttrs = append(fmtAttrs, string(a.Key), a.Value.Emit())
}
return ex.log.With(fmtAttrs...)
}

// RemoveAllMetrics is explicitly invoked when the metrics reporter of a given service
// instance needs to be shut down
func (ex *Expirer[Record, Metric, ValType]) RemoveAllMetrics(ctx context.Context) {
for _, attrs := range ex.entries.DeleteAll() {
ex.deleteMetricInstance(ctx, attrs)
}
}
Loading
Loading