From 8bfaeca2977eda806620ac3cf85cfdc777078c2e Mon Sep 17 00:00:00 2001
From: Mario Macias
Date: Wed, 19 Jun 2024 12:47:06 +0200
Subject: [PATCH 1/2] improving logging of OTEL expirer

---
 pkg/internal/export/otel/expirer.go | 35 ++++++++++++++++-------------
 1 file changed, 19 insertions(+), 16 deletions(-)

diff --git a/pkg/internal/export/otel/expirer.go b/pkg/internal/export/otel/expirer.go
index b5471ac30..02777873f 100644
--- a/pkg/internal/export/otel/expirer.go
+++ b/pkg/internal/export/otel/expirer.go
@@ -33,7 +33,7 @@ type Expirer[Record any, Metric removableMetric[ValType], ValType any] struct {
 	ctx    context.Context
 	attrs  []attributes.Field[Record, attribute.KeyValue]
 	metric Metric
-	entries *expire.ExpiryMap[*expiryMapEntry[Metric, ValType]]
+	entries *expire.ExpiryMap[attribute.Set]
 	log    *slog.Logger
 	clock  expire.Clock

 	ttl time.Duration
 }

-type expiryMapEntry[Metric removableMetric[ValType], ValType any] struct {
-	metric     Metric
-	attributes attribute.Set
-}
-
 // NewExpirer creates an expirer that wraps data points of a given type. Its labeled instances are dropped
 // if they haven't been updated during the last timeout period.
 // Arguments:
@@ -64,7 +59,7 @@ func NewExpirer[Record any, Metric removableMetric[ValType], ValType any](
 		ctx:     ctx,
 		metric:  metric,
 		attrs:   attrs,
-		entries: expire.NewExpiryMap[*expiryMapEntry[Metric, ValType]](clock, ttl),
+		entries: expire.NewExpiryMap[attribute.Set](clock, ttl),
 		log:     plog().With("type", fmt.Sprintf("%T", metric)),
 		clock:   clock,
 		lastExpiration: clock(),
@@ -86,13 +81,10 @@ func (ex *Expirer[Record, Metric, ValType]) ForRecord(r Record, extraAttrs ...at
 		ex.lastExpiration = now
 	}
 	recordAttrs, attrValues := ex.recordAttributes(r, extraAttrs...)
-	return ex.entries.GetOrCreate(attrValues, func() *expiryMapEntry[Metric, ValType] {
+	return ex.metric, ex.entries.GetOrCreate(attrValues, func() attribute.Set {
 		ex.log.With("labelValues", attrValues).Debug("storing new metric label set")
-		return &expiryMapEntry[Metric, ValType]{
-			metric:     ex.metric,
-			attributes: recordAttrs,
-		}
-	}).metric, recordAttrs
+		return recordAttrs
+	})
 }

 func (ex *Expirer[Record, Metric, ValType]) recordAttributes(m Record, extraAttrs ...attribute.KeyValue) (attribute.Set, []string) {
@@ -114,9 +106,20 @@ func (ex *Expirer[Record, Metric, ValType]) recordAttributes(m Record, extraAttr
 func (ex *Expirer[Record, Metric, ValType]) removeOutdated(ctx context.Context) {
 	if old := ex.entries.DeleteExpired(); len(old) > 0 {
-		for _, om := range old {
-			ex.log.Debug("deleting old OTEL metric", "labelValues", om)
-			om.metric.Remove(ctx, metric.WithAttributeSet(om.attributes))
+		for _, attrs := range old {
+			if ex.log.Enabled(ex.ctx, slog.LevelDebug) {
+				ex.logger(attrs).Debug("deleting old OTEL metric")
+			}
+			ex.metric.Remove(ctx, metric.WithAttributeSet(attrs))
 		}
 	}
 }
+
+func (ex *Expirer[Record, Metric, ValType]) logger(attrs attribute.Set) *slog.Logger {
+	fmtAttrs := make([]any, 0, attrs.Len()*2)
+	for it := attrs.Iter(); it.Next(); {
+		a := it.Attribute()
+		fmtAttrs = append(fmtAttrs, string(a.Key), a.Value.Emit())
+	}
+	return ex.log.With(fmtAttrs...)
+}

From b156dd65d4807c6061169ed73a7bc511931935ea Mon Sep 17 00:00:00 2001
From: Mario Macias
Date: Wed, 19 Jun 2024 17:56:09 +0200
Subject: [PATCH 2/2] OTEL: removing metrics from removed entities

---
 pkg/internal/export/expire/expiry_map.go |  12 +++
 pkg/internal/export/otel/common.go       |  70 +++++++++---
 pkg/internal/export/otel/expirer.go      |  24 +++--
 pkg/internal/export/otel/expirer_test.go | 130 +++++++++++++++++++++--
 pkg/internal/export/otel/metrics.go      |  24 ++++-
 pkg/internal/export/otel/metrics_proc.go |  17 ++-
 pkg/internal/pipe/instrumenter_test.go   |  11 +-
 7 files changed, 250 insertions(+), 38 deletions(-)

diff --git a/pkg/internal/export/expire/expiry_map.go b/pkg/internal/export/expire/expiry_map.go
index 5c6686c04..d452ddf5f 100644
--- a/pkg/internal/export/expire/expiry_map.go
+++ b/pkg/internal/export/expire/expiry_map.go
@@ -85,6 +85,18 @@ func (ex *ExpiryMap[T]) DeleteExpired() []T {
 	return delEntries
 }

+// DeleteAll cleans the map and returns a slice with its deleted elements
+func (ex *ExpiryMap[T]) DeleteAll() []T {
+	ex.mt.Lock()
+	defer ex.mt.Unlock()
+	entries := make([]T, 0, len(ex.entries))
+	for k, e := range ex.entries {
+		entries = append(entries, e.val)
+		delete(ex.entries, k)
+	}
+	return entries
+}
+
 // All returns an array with all the stored entries. It might contain expired entries
 // if DeleteExpired is not invoked before it.
 // TODO: use https://tip.golang.org/wiki/RangefuncExperiment when available
diff --git a/pkg/internal/export/otel/common.go b/pkg/internal/export/otel/common.go
index 8df64d5da..c495e84b9 100644
--- a/pkg/internal/export/otel/common.go
+++ b/pkg/internal/export/otel/common.go
@@ -7,6 +7,7 @@ import (
 	"log/slog"
 	"os"
 	"strings"
+	"time"

 	"github.com/go-logr/logr"
 	"github.com/hashicorp/golang-lru/v2/simplelru"
@@ -20,6 +21,7 @@ import (
 	semconv "go.opentelemetry.io/otel/semconv/v1.19.0"
 	"google.golang.org/grpc/credentials"

+	"github.com/grafana/beyla/pkg/internal/export/expire"
 	"github.com/grafana/beyla/pkg/internal/svc"
 )

@@ -83,14 +85,25 @@ func getResourceAttrs(service *svc.ID) *resource.Resource {
 }

 // ReporterPool keeps an LRU cache of different OTEL reporters given a service name.
-// TODO: evict reporters after a time without being accessed
 type ReporterPool[T any] struct {
-	pool *simplelru.LRU[svc.UID, T]
+	pool *simplelru.LRU[svc.UID, *expirable[T]]
 	itemConstructor func(*svc.ID) (T, error)

-	lastReporter T
+	lastReporter *expirable[T]
 	lastService  *svc.ID
+
+	// TODO: use cacheable clock for efficiency
+	clock          expire.Clock
+	ttl            time.Duration
+	lastExpiration time.Time
+}
+
+// expirable.NewLRU implementation is pretty nondeterministic, so
+// we implement our own expiration mechanism on top of simplelru.LRU
+type expirable[T any] struct {
+	lastAccess time.Time
+	value      T
 }

 // NewReporterPool creates a ReporterPool instance given a cache length,
 // a callback when an old reporter is evicted, and a constructor function to
 // instantiate the generic OTEL metrics/traces reporter.
 func NewReporterPool[T any](
 	cacheLen int,
-	callback simplelru.EvictCallback[svc.UID, T],
+	ttl time.Duration,
+	clock expire.Clock,
+	callback simplelru.EvictCallback[svc.UID, *expirable[T]],
 	itemConstructor func(id *svc.ID) (T, error),
 ) ReporterPool[T] {
-	pool, err := simplelru.NewLRU[svc.UID, T](cacheLen, callback)
+	pool, err := simplelru.NewLRU[svc.UID, *expirable[T]](cacheLen, callback)
 	if err != nil {
 		// should never happen: bug!
 		panic(err)
 	}
-	return ReporterPool[T]{pool: pool, itemConstructor: itemConstructor}
+	return ReporterPool[T]{
+		pool:            pool,
+		itemConstructor: itemConstructor,
+		ttl:             ttl,
+		clock:           clock,
+		lastExpiration:  clock(),
+	}
 }

 // For retrieves the associated item for the given service name, or
 // creates a new one if it does not exist
 func (rp *ReporterPool[T]) For(service *svc.ID) (T, error) {
+	rp.expireOldReporters()
 	// optimization: do not query the resources' cache if the
 	// previously processed span belongs to the same service name
 	// as the current.
@@ -129,20 +151,40 @@ func (rp *ReporterPool[T]) For(service *svc.ID) (T, error) {
 		rp.lastService = service
 		rp.lastReporter = lm
 	}
-	return rp.lastReporter, nil
+	// we need to update the last access for that reporter, to avoid it
+	// being expired after the TTL
+	rp.lastReporter.lastAccess = rp.clock()
+	return rp.lastReporter.value, nil
+}
+
+// expireOldReporters will remove the metrics reporters that haven't been accessed
+// during the last TTL period
+func (rp *ReporterPool[T]) expireOldReporters() {
+	now := rp.clock()
+	if now.Sub(rp.lastExpiration) < rp.ttl {
+		return
+	}
+	rp.lastExpiration = now
+	for {
+		_, v, ok := rp.pool.GetOldest()
+		if !ok || now.Sub(v.lastAccess) < rp.ttl {
+			return
+		}
+		rp.pool.RemoveOldest()
+	}
 }

-func (rp *ReporterPool[T]) get(service *svc.ID) (T, error) {
-	if m, ok := rp.pool.Get(service.UID); ok {
-		return m, nil
+func (rp *ReporterPool[T]) get(service *svc.ID) (*expirable[T], error) {
+	if e, ok := rp.pool.Get(service.UID); ok {
+		return e, nil
 	}
 	m, err := rp.itemConstructor(service)
 	if err != nil {
-		var t T
-		return t, fmt.Errorf("creating resource for service %q: %w", service, err)
+		return nil, fmt.Errorf("creating resource for service %q: %w", service, err)
 	}
-	rp.pool.Add(service.UID, m)
-	return m, nil
+	e := &expirable[T]{value: m}
+	rp.pool.Add(service.UID, e)
+	return e, nil
 }

 // Intermediate representation of option functions suitable for testing
diff --git a/pkg/internal/export/otel/expirer.go b/pkg/internal/export/otel/expirer.go
index 02777873f..3f695b750 100644
--- a/pkg/internal/export/otel/expirer.go
+++ b/pkg/internal/export/otel/expirer.go
@@ -105,16 +105,18 @@ func (ex *Expirer[Record, Metric, ValType]) recordAttributes(m Record, extraAttr
 }

 func (ex *Expirer[Record, Metric, ValType]) removeOutdated(ctx context.Context) {
-	if old := ex.entries.DeleteExpired(); len(old) > 0 {
-		for _, attrs := range old {
-			if ex.log.Enabled(ex.ctx, slog.LevelDebug) {
-				ex.logger(attrs).Debug("deleting old OTEL metric")
-			}
-			ex.metric.Remove(ctx, metric.WithAttributeSet(attrs))
-		}
+	for _, attrs := range ex.entries.DeleteExpired() {
+		ex.deleteMetricInstance(ctx, attrs)
 	}
 }

+func (ex *Expirer[Record, Metric, ValType]) deleteMetricInstance(ctx context.Context, attrs attribute.Set) {
+	if ex.log.Enabled(ex.ctx, slog.LevelDebug) {
+		ex.logger(attrs).Debug("deleting old OTEL metric")
+	}
+	ex.metric.Remove(ctx, metric.WithAttributeSet(attrs))
+}
+
 func (ex *Expirer[Record, Metric, ValType]) logger(attrs attribute.Set) *slog.Logger {
 	fmtAttrs := make([]any, 0, attrs.Len()*2)
 	for it := attrs.Iter(); it.Next(); {
 		a := it.Attribute()
 		fmtAttrs = append(fmtAttrs, string(a.Key), a.Value.Emit())
 	}
 	return ex.log.With(fmtAttrs...)
 }
+
+// RemoveAllMetrics is explicitly invoked when the metrics reporter of a given service
+// instance needs to be shut down
+func (ex *Expirer[Record, Metric, ValType]) RemoveAllMetrics(ctx context.Context) {
+	for _, attrs := range ex.entries.DeleteAll() {
+		ex.deleteMetricInstance(ctx, attrs)
+	}
+}
diff --git a/pkg/internal/export/otel/expirer_test.go b/pkg/internal/export/otel/expirer_test.go
index e29de368f..3cbf09064 100644
--- a/pkg/internal/export/otel/expirer_test.go
+++ b/pkg/internal/export/otel/expirer_test.go
@@ -14,6 +14,7 @@ import (
 	"github.com/grafana/beyla/pkg/internal/netolly/ebpf"
 	"github.com/grafana/beyla/pkg/internal/pipe/global"
 	"github.com/grafana/beyla/pkg/internal/request"
+	"github.com/grafana/beyla/pkg/internal/svc"
 	"github.com/grafana/beyla/test/collector"
 )

@@ -120,7 +121,11 @@ func TestNetMetricsExpiration(t *testing.T) {
 	})
 }

-func TestAppMetricsExpiration(t *testing.T) {
+// the expiration logic is held at two levels:
+// (1) by group of attributes within the same service ID,
+// (2) by metric set of a given service ID
+// this test verifies case 1
+func TestAppMetricsExpiration_ByMetricAttrs(t *testing.T) {
 	defer restoreEnvAfterExecution()()
 	ctx, cancelCtx := context.WithCancel(context.Background())
 	defer cancelCtx()
@@ -153,8 +158,8 @@ func TestAppMetricsExpiration(t *testing.T) {
 	// WHEN it receives metrics
 	metrics <- []request.Span{
-		{Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200},
-		{Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 150, End: 175},
+		{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200},
+		{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 150, End: 175},
 	}

 	// THEN the metrics are exported
@@ -175,7 +180,7 @@ func TestAppMetricsExpiration(t *testing.T) {
 	// AND WHEN it keeps receiving a subset of the initial metrics during the TTL
 	now.Advance(2 * time.Minute)
 	metrics <- []request.Span{
-		{Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 250, End: 280},
+		{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 250, End: 280},
 	}

 	// THEN THE metrics that have been received during the TTL period are still visible
@@ -188,7 +193,7 @@ func TestAppMetricsExpiration(t *testing.T) {
 	now.Advance(2 * time.Minute)
 	metrics <- []request.Span{
-		{Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 300, End: 310},
+		{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 300, End: 310},
 	}

 	// makes sure that the records channel is emptied and any remaining
@@ -208,15 +213,126 @@ func TestAppMetricsExpiration(t *testing.T) {
 			i--
 			continue
 		}
+		require.Equal(t, map[string]string{"url.path": "/foo"}, metric.Attributes)
+		require.EqualValues(t, 140/float64(time.Second), metric.FloatVal)
+	}
+
+	// AND WHEN the metrics labels that disappeared are received again
+	now.Advance(2 * time.Minute)
+	metrics <- []request.Span{
+		{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 450, End: 520},
+	}
+
+	// THEN they are reported again, starting from zero in the case of counters
+	test.Eventually(t, timeout, func(t require.TestingT) {
+		metric := readChan(t, otlp.Records(), timeout)
 		require.Equal(t, "http.server.request.duration", metric.Name)
+		assert.Equal(t, map[string]string{"url.path": "/bar"}, metric.Attributes)
+		assert.EqualValues(t, 70/float64(time.Second), metric.FloatVal)
+	})
+}
+
+// the expiration logic is held at two levels:
+// (1) by group of attributes within the same service ID,
+// (2) by metric set of a given service ID
+// this test verifies case 2
+func TestAppMetricsExpiration_BySvcID(t *testing.T) {
+	defer restoreEnvAfterExecution()()
+	ctx, cancelCtx := context.WithCancel(context.Background())
+	defer cancelCtx()
+
+	otlp, err := collector.Start(ctx)
+	require.NoError(t, err)
+
+	now := syncedClock{now: time.Now()}
+	timeNow = now.Now
+
+	otelExporter, err := ReportMetrics(
+		ctx,
+		&global.ContextInfo{}, &MetricsConfig{
+			Interval: 50 * time.Millisecond,
+			CommonEndpoint: otlp.ServerEndpoint,
+			MetricsProtocol: ProtocolHTTPProtobuf,
+			Features: []string{FeatureApplication},
+			TTL: 3 * time.Minute,
+			ReportersCacheLen: 100,
+		}, attributes.Selection{
+			attributes.HTTPServerDuration.Section: attributes.InclusionLists{
+				Include: []string{"url.path"},
+			},
+		})()
+
+	require.NoError(t, err)
+
+	metrics := make(chan []request.Span, 20)
+	go otelExporter(metrics)
+
+	// WHEN it receives metrics
+	metrics <- []request.Span{
+		{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 100, End: 200},
+		{ServiceID: svc.ID{UID: "bar"}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 150, End: 175},
+	}
+
+	// THEN the metrics are exported
+	test.Eventually(t, timeout, func(t require.TestingT) {
+		metric := readChan(t, otlp.Records(), timeout)
+		assert.Equal(t, "http.server.request.duration", metric.Name)
+		assert.Equal(t, map[string]string{"url.path": "/foo"}, metric.Attributes)
+		assert.EqualValues(t, 100/float64(time.Second), metric.FloatVal)
+	})
+
+	test.Eventually(t, timeout, func(t require.TestingT) {
+		metric := readChan(t, otlp.Records(), timeout)
+		require.Equal(t, "http.server.request.duration", metric.Name)
+		assert.Equal(t, map[string]string{"url.path": "/bar"}, metric.Attributes)
+		assert.EqualValues(t, 25/float64(time.Second), metric.FloatVal)
+	})
+
+	// AND WHEN it keeps receiving a subset of the initial metrics during the TTL
+	now.Advance(2 * time.Minute)
+	metrics <- []request.Span{
+		{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 250, End: 280},
+	}
+
+	// THEN THE metrics that have been received during the TTL period are still visible
+	test.Eventually(t, timeout, func(t require.TestingT) {
+		metric := readChan(t, otlp.Records(), timeout)
+		require.Equal(t, "http.server.request.duration", metric.Name)
+		require.Equal(t, map[string]string{"url.path": "/foo"}, metric.Attributes)
+		assert.EqualValues(t, 130/float64(time.Second), metric.FloatVal)
+	})
+
+	now.Advance(2 * time.Minute)
+	metrics <- []request.Span{
+		{ServiceID: svc.ID{UID: "foo"}, Type: request.EventTypeHTTP, Path: "/foo", RequestStart: 300, End: 310},
+	}
+
+	// BUT not the metrics that haven't been received during that time.
+	// We just know it because OTEL will only send the foo/bar metric.
+	// If this test is flaky, it means it is actually failing
+	// repeating 10 times to make sure that only this metric is forwarded
+	// need to wait until expireCache internal goroutine removes all the expired entries
+	test.Eventually(t, timeout, func(t require.TestingT) {
+		// makes sure that the records channel is emptied and any remaining
+		// old metric is sent and then the channel is re-emptied
+		otlp.ResetRecords()
+		readChan(t, otlp.Records(), timeout)
+		otlp.ResetRecords()
+		for i := 0; i < 10; i++ {
+			metric := readChan(t, otlp.Records(), timeout)
+			if metric.Name != "http.server.request.duration" {
+				// ignore other HTTP metrics (e.g. request size)
+				i--
+				continue
+			}
+			require.Equal(t, map[string]string{"url.path": "/foo"}, metric.Attributes)
+			require.EqualValues(t, 140/float64(time.Second), metric.FloatVal)
+		}
+	})

 	// AND WHEN the metrics labels that disappeared are received again
 	now.Advance(2 * time.Minute)
 	metrics <- []request.Span{
-		{Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 450, End: 520},
+		{ServiceID: svc.ID{UID: "bar"}, Type: request.EventTypeHTTP, Path: "/bar", RequestStart: 450, End: 520},
 	}

 	// THEN they are reported again, starting from zero in the case of counters
diff --git a/pkg/internal/export/otel/metrics.go b/pkg/internal/export/otel/metrics.go
index 0fa4e39c2..a4cbf1405 100644
--- a/pkg/internal/export/otel/metrics.go
+++ b/pkg/internal/export/otel/metrics.go
@@ -179,6 +179,7 @@ type Metrics struct {
 	service  *svc.ID
 	provider *metric.MeterProvider

+	// IMPORTANT! Don't forget to clean each Expirer in cleanupAllMetricsInstances method
 	httpDuration       *Expirer[*request.Span, instrument.Float64Histogram, float64]
 	httpClientDuration *Expirer[*request.Span, instrument.Float64Histogram, float64]
 	grpcDuration       *Expirer[*request.Span, instrument.Float64Histogram, float64]
@@ -256,17 +257,18 @@ func newMetricsReporter(
 	mr.attrMessagingProcess = attributes.OpenTelemetryGetters(
 		request.SpanOTELGetters, mr.attributes.For(attributes.MessagingProcessDuration))

-	mr.reporters = NewReporterPool(cfg.ReportersCacheLen,
-		func(id svc.UID, v *Metrics) {
+	mr.reporters = NewReporterPool(cfg.ReportersCacheLen, cfg.TTL, timeNow,
+		func(id svc.UID, v *expirable[*Metrics]) {
 			if mr.cfg.SpanMetricsEnabled() {
-				attrOpt := instrument.WithAttributeSet(mr.metricResourceAttributes(v.service))
-				v.tracesTargetInfo.Add(mr.ctx, 1, attrOpt)
+				attrOpt := instrument.WithAttributeSet(mr.metricResourceAttributes(v.value.service))
+				v.value.tracesTargetInfo.Add(mr.ctx, 1, attrOpt)
 			}
 			llog := log.With("service", id)
 			llog.Debug("evicting metrics reporter from cache")
+			v.value.cleanupAllMetricsInstances()
 			go func() {
-				if err := v.provider.ForceFlush(ctx); err != nil {
+				if err := v.value.provider.ForceFlush(ctx); err != nil {
 					llog.Warn("error flushing evicted metrics provider", "error", err)
 				}
 			}()
@@ -875,3 +877,15 @@ func setMetricsProtocol(cfg *MetricsConfig) {
 	// unset. Guessing it
 	os.Setenv(envMetricsProtocol, string(cfg.GuessProtocol()))
 }
+
+func (r *Metrics) cleanupAllMetricsInstances() {
+	r.httpDuration.RemoveAllMetrics(r.ctx)
+	r.httpClientDuration.RemoveAllMetrics(r.ctx)
+	r.grpcDuration.RemoveAllMetrics(r.ctx)
+	r.grpcClientDuration.RemoveAllMetrics(r.ctx)
+	r.dbClientDuration.RemoveAllMetrics(r.ctx)
+	r.msgPublishDuration.RemoveAllMetrics(r.ctx)
+	r.msgProcessDuration.RemoveAllMetrics(r.ctx)
+	r.httpRequestSize.RemoveAllMetrics(r.ctx)
+	r.httpClientRequestSize.RemoveAllMetrics(r.ctx)
+}
diff --git a/pkg/internal/export/otel/metrics_proc.go b/pkg/internal/export/otel/metrics_proc.go
index 88be1b1a3..06ab78d87 100644
--- a/pkg/internal/export/otel/metrics_proc.go
+++ b/pkg/internal/export/otel/metrics_proc.go
@@ -79,6 +79,7 @@ type procMetrics struct {
 	service  *svc.ID
 	provider *metric.MeterProvider

+	// don't forget to add the cleanup code in cleanupAllMetricsInstances function
 	cpuTime        *Expirer[*process.Status, metric2.Float64Counter, float64]
 	cpuUtilisation *Expirer[*process.Status, metric2.Float64Gauge, float64]
 	memory         *Expirer[*process.Status, metric2.Int64UpDownCounter, int64]
@@ -164,12 +165,13 @@ func newProcMetricsExporter(
 		mr.netObserver = netAggregatedObserver
 	}

-	mr.reporters = NewReporterPool[*procMetrics](cfg.Metrics.ReportersCacheLen,
-		func(id svc.UID, v *procMetrics) {
+	mr.reporters = NewReporterPool[*procMetrics](cfg.Metrics.ReportersCacheLen, cfg.Metrics.TTL, timeNow,
+		func(id svc.UID, v *expirable[*procMetrics]) {
 			llog := log.With("service", id)
 			llog.Debug("evicting metrics reporter from cache")
+			v.value.cleanupAllMetricsInstances()
 			go func() {
-				if err := v.provider.ForceFlush(ctx); err != nil {
+				if err := v.value.provider.ForceFlush(ctx); err != nil {
 					llog.Warn("error flushing evicted metrics provider", "error", err)
 				}
 			}()
@@ -367,3 +369,12 @@ func netDisaggregatedObserver(ctx context.Context, reporter *procMetrics, record
 	net, attrs = reporter.net.ForRecord(record, netIODirRcv)
 	net.Add(ctx, record.NetRcvBytesDelta, metric2.WithAttributeSet(attrs))
 }
+
+func (r *procMetrics) cleanupAllMetricsInstances() {
+	r.cpuTime.RemoveAllMetrics(r.ctx)
+	r.cpuUtilisation.RemoveAllMetrics(r.ctx)
+	r.memory.RemoveAllMetrics(r.ctx)
+	r.memoryVirtual.RemoveAllMetrics(r.ctx)
+	r.disk.RemoveAllMetrics(r.ctx)
+	r.net.RemoveAllMetrics(r.ctx)
+}
diff --git a/pkg/internal/pipe/instrumenter_test.go b/pkg/internal/pipe/instrumenter_test.go
index a13e89294..eb2ff6ae6 100644
--- a/pkg/internal/pipe/instrumenter_test.go
+++ b/pkg/internal/pipe/instrumenter_test.go
@@ -63,6 +63,7 @@ func TestBasicPipeline(t *testing.T) {
 			Features: []string{otel.FeatureApplication},
 			MetricsEndpoint: tc.ServerEndpoint, Interval: 10 * time.Millisecond,
 			ReportersCacheLen: 16,
+			TTL: 5 * time.Minute,
 		},
 		Attributes: beyla.Attributes{Select: allMetrics},
 	}, gctx(0), make(<-chan []request.Span))
@@ -250,6 +251,7 @@ func TestRouteConsolidation(t *testing.T) {
 			Features: []string{otel.FeatureApplication},
 			MetricsEndpoint: tc.ServerEndpoint, Interval: 10 * time.Millisecond,
 			ReportersCacheLen: 16,
+			TTL: 5 * time.Minute,
 		},
 		Routes: &transform.RoutesConfig{Patterns: []string{"/user/{id}", "/products/{id}/push"}},
 		Attributes: beyla.Attributes{Select: allMetricsBut("client.address", "url.path")},
@@ -293,7 +295,8 @@ func TestRouteConsolidation(t *testing.T) {
 		string(semconv.TelemetrySDKLanguageKey): "go",
 		string(semconv.TelemetrySDKNameKey):     "beyla",
 	},
-		Type: pmetric.MetricTypeHistogram,
+		Type:     pmetric.MetricTypeHistogram,
+		FloatVal: 2 / float64(time.Second),
 	}, events["/user/{id}"])

 	assert.Equal(t, collector.MetricRecord{
@@ -310,7 +313,8 @@ func TestRouteConsolidation(t *testing.T) {
 		string(semconv.TelemetrySDKLanguageKey): "go",
 		string(semconv.TelemetrySDKNameKey):     "beyla",
 	},
-		Type: pmetric.MetricTypeHistogram,
+		Type:     pmetric.MetricTypeHistogram,
+		FloatVal: 2 / float64(time.Second),
 	}, events["/products/{id}/push"])

 	assert.Equal(t, collector.MetricRecord{
@@ -344,6 +348,7 @@ func TestGRPCPipeline(t *testing.T) {
 			Features: []string{otel.FeatureApplication},
 			MetricsEndpoint: tc.ServerEndpoint, Interval: time.Millisecond,
 			ReportersCacheLen: 16,
+			TTL: 5 * time.Minute,
 		},
 		Attributes: beyla.Attributes{Select: allMetrics},
 	}, gctx(0), make(<-chan []request.Span))
@@ -430,6 +435,7 @@ func TestBasicPipelineInfo(t *testing.T) {
 			Features: []string{otel.FeatureApplication},
 			MetricsEndpoint: tc.ServerEndpoint, Interval: 10 * time.Millisecond,
 			ReportersCacheLen: 16,
+			TTL: 5 * time.Minute,
 		},
 		Attributes: beyla.Attributes{Select: allMetrics},
 	}, gctx(0), tracesInput)
@@ -500,6 +506,7 @@ func TestSpanAttributeFilterNode(t *testing.T) {
 			Features: []string{otel.FeatureApplication},
 			MetricsEndpoint: tc.ServerEndpoint, Interval: 10 * time.Millisecond,
 			ReportersCacheLen: 16,
+			TTL: 5 * time.Minute,
 		},
 		Filters: filter.AttributesConfig{
 			Application: map[string]filter.MatchDefinition{"url.path": {Match: "/user/*"}},
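Outside the patches themselves, the expiration pattern both commits rely on can be summarized in a minimal, self-contained sketch: a clock-driven map hands back the label sets that went stale so the caller can drop each instrument series, and on reporter shutdown everything left is flushed at once. The names toyExpiryMap and removableSeries below are hypothetical stand-ins, not Beyla code; the real counterparts are ExpiryMap in pkg/internal/export/expire and the Expirer/removableMetric pair in pkg/internal/export/otel.

package main

import (
	"fmt"
	"time"
)

// removableSeries stands in for an OTEL instrument whose labeled series can be
// dropped (the role of removableMetric.Remove in the patches).
type removableSeries struct{ series map[string]float64 }

func (m *removableSeries) Remove(labels string) { delete(m.series, labels) }

// toyExpiryMap mimics ExpiryMap: it tracks the last access time of every label
// set and returns the stale ones on DeleteExpired, or everything on DeleteAll.
type toyExpiryMap struct {
	clock   func() time.Time
	ttl     time.Duration
	entries map[string]time.Time
}

func (em *toyExpiryMap) Touch(labels string) { em.entries[labels] = em.clock() }

func (em *toyExpiryMap) DeleteExpired() []string {
	var old []string
	now := em.clock()
	for labels, lastAccess := range em.entries {
		if now.Sub(lastAccess) >= em.ttl {
			old = append(old, labels)
			delete(em.entries, labels)
		}
	}
	return old
}

func (em *toyExpiryMap) DeleteAll() []string {
	all := make([]string, 0, len(em.entries))
	for labels := range em.entries {
		all = append(all, labels)
		delete(em.entries, labels)
	}
	return all
}

func main() {
	now := time.Now()
	clock := func() time.Time { return now } // stand-in for the injectable expire.Clock
	metric := &removableSeries{series: map[string]float64{"url.path=/foo": 1, "url.path=/bar": 1}}
	entries := &toyExpiryMap{clock: clock, ttl: 3 * time.Minute, entries: map[string]time.Time{}}
	entries.Touch("url.path=/foo")
	entries.Touch("url.path=/bar")

	now = now.Add(4 * time.Minute)
	entries.Touch("url.path=/foo") // only /foo keeps being reported

	now = now.Add(2 * time.Minute) // /bar is now older than the TTL
	for _, labels := range entries.DeleteExpired() {
		metric.Remove(labels) // same move as Expirer.removeOutdated
	}
	fmt.Println(metric.series) // map[url.path=/foo:1]

	// reporter evicted from the pool: drop whatever is left, as RemoveAllMetrics does
	for _, labels := range entries.DeleteAll() {
		metric.Remove(labels)
	}
	fmt.Println(metric.series) // map[]
}

Storing only the attribute.Set in the map (rather than the metric plus its attributes, as before the first patch) is what keeps both DeleteExpired and the new DeleteAll cheap: the Expirer already holds the single metric instance, so the map only has to remember which label sets are alive.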