From 84af0d18d3ed7365b1a49468682c9fa79f3da84d Mon Sep 17 00:00:00 2001 From: Mario Macias Date: Wed, 3 Jul 2024 15:51:36 +0200 Subject: [PATCH] Differentiation between metric and resource attributes (#984) * start to split attributes in resource/metric * separate metric and resource attributes * add comment * Fix k8s test --- pkg/internal/export/attributes/attr_defs.go | 69 +++++------ .../export/attributes/attr_getters.go | 19 +++- .../export/attributes/attr_selector.go | 107 +++++++++++++----- .../export/attributes/attr_selector_test.go | 54 +++++---- pkg/internal/export/otel/metrics.go | 36 +++--- pkg/internal/export/otel/metrics_net.go | 2 +- pkg/internal/export/otel/metrics_proc.go | 33 +++--- pkg/internal/export/otel/traces.go | 5 +- pkg/internal/export/prom/prom_proc.go | 3 +- pkg/internal/pipe/instrumenter_test.go | 86 ++++++++++---- .../daemonset/k8s_daemonset_z_metrics_test.go | 1 - 11 files changed, 269 insertions(+), 146 deletions(-) diff --git a/pkg/internal/export/attributes/attr_defs.go b/pkg/internal/export/attributes/attr_defs.go index 5b390e757..ac9c79b96 100644 --- a/pkg/internal/export/attributes/attr_defs.go +++ b/pkg/internal/export/attributes/attr_defs.go @@ -42,17 +42,17 @@ func getDefinitions(groups AttrGroups) map[Section]AttrReportGroup { // attributes to be reported exclusively for prometheus exporters var prometheusAttributes = AttrReportGroup{ Disabled: !promEnabled, - Attributes: map[attr.Name]Default{ + MetricAttributes: map[attr.Name]Default{ attr.TargetInstance: true, attr.ServiceNamespace: true, }, } - // ServiceName is reported both as resource and metrics attribute, as - // the OTEL definition requires that it is reported as resource attribute - // but Grafana Cloud takes int from the metric + // ServiceName and ServiceNamespace are reported both as resource and metric attributes, as + // the OTEL definition requires that it is reported as resource attribute, + // but Grafana Cloud takes it from the metric var appAttributes = AttrReportGroup{ SubGroups: []*AttrReportGroup{&prometheusAttributes}, - Attributes: map[attr.Name]Default{ + MetricAttributes: map[attr.Name]Default{ attr.ServiceName: true, attr.ServiceNamespace: true, }, @@ -62,7 +62,7 @@ func getDefinitions(groups AttrGroups) map[Section]AttrReportGroup { // kubernetes metadata is enabled var networkKubeAttributes = AttrReportGroup{ Disabled: !kubeEnabled, - Attributes: map[attr.Name]Default{ + MetricAttributes: map[attr.Name]Default{ attr.K8sSrcOwnerName: true, attr.K8sSrcNamespace: true, attr.K8sDstOwnerName: true, @@ -85,7 +85,7 @@ func getDefinitions(groups AttrGroups) map[Section]AttrReportGroup { // is defined var networkCIDR = AttrReportGroup{ Disabled: !cidrEnabled, - Attributes: map[attr.Name]Default{ + MetricAttributes: map[attr.Name]Default{ attr.DstCIDR: true, attr.SrcCIDR: true, }, @@ -95,7 +95,7 @@ func getDefinitions(groups AttrGroups) map[Section]AttrReportGroup { // kubernetes metadata is enabled var appKubeAttributes = AttrReportGroup{ Disabled: !kubeEnabled, - Attributes: map[attr.Name]Default{ + ResourceAttributes: map[attr.Name]Default{ attr.K8sNamespaceName: true, attr.K8sPodName: true, attr.K8sDeploymentName: true, @@ -111,24 +111,24 @@ func getDefinitions(groups AttrGroups) map[Section]AttrReportGroup { var httpRoutes = AttrReportGroup{ Disabled: !groups.Has(GroupHTTPRoutes), - Attributes: map[attr.Name]Default{ + MetricAttributes: map[attr.Name]Default{ attr.HTTPRoute: true, }, } var serverInfo = AttrReportGroup{ - Attributes: map[attr.Name]Default{ + MetricAttributes: map[attr.Name]Default{ attr.ClientAddr: Default(peerInfoEnabled), }, } var httpClientInfo = AttrReportGroup{ - Attributes: map[attr.Name]Default{ + MetricAttributes: map[attr.Name]Default{ attr.ServerAddr: Default(peerInfoEnabled), attr.ServerPort: Default(peerInfoEnabled), }, } var grpcClientInfo = AttrReportGroup{ - Attributes: map[attr.Name]Default{ + MetricAttributes: map[attr.Name]Default{ attr.ServerAddr: Default(peerInfoEnabled), }, } @@ -138,14 +138,14 @@ func getDefinitions(groups AttrGroups) map[Section]AttrReportGroup { // via the deprecated BEYLA_METRICS_REPORT_PEER config option var deprecatedHTTPPath = AttrReportGroup{ Disabled: !groups.Has(GroupTarget), - Attributes: map[attr.Name]Default{ + MetricAttributes: map[attr.Name]Default{ attr.HTTPUrlPath: true, }, } var httpCommon = AttrReportGroup{ SubGroups: []*AttrReportGroup{&httpRoutes, &deprecatedHTTPPath}, - Attributes: map[attr.Name]Default{ + MetricAttributes: map[attr.Name]Default{ attr.HTTPRequestMethod: true, attr.HTTPResponseStatusCode: true, attr.HTTPUrlPath: false, @@ -154,14 +154,17 @@ func getDefinitions(groups AttrGroups) map[Section]AttrReportGroup { // TODO: populate it with host resource attributes in https://opentelemetry.io/docs/specs/semconv/resource/host/ var hostAttributes = AttrReportGroup{ - Attributes: map[attr.Name]Default{ + ResourceAttributes: map[attr.Name]Default{ attr.HostName: true, }, } var processAttributes = AttrReportGroup{ SubGroups: []*AttrReportGroup{&appKubeAttributes, &hostAttributes}, - Attributes: map[attr.Name]Default{ + // TODO: attributes below are resource-level, but in App O11y we don't treat processes as resources, + // but applications. Let's first consider how to match processes and Applications before marking this spec + // as stable + MetricAttributes: map[attr.Name]Default{ attr.ProcCommand: true, attr.ProcCPUState: true, attr.ProcOwner: true, @@ -176,10 +179,18 @@ func getDefinitions(groups AttrGroups) map[Section]AttrReportGroup { }, } + var messagingAttributes = AttrReportGroup{ + SubGroups: []*AttrReportGroup{&appAttributes, &appKubeAttributes}, + MetricAttributes: map[attr.Name]Default{ + attr.MessagingSystem: true, + attr.MessagingDestination: true, + }, + } + return map[Section]AttrReportGroup{ BeylaNetworkFlow.Section: { SubGroups: []*AttrReportGroup{&networkCIDR, &networkKubeAttributes}, - Attributes: map[attr.Name]Default{ + MetricAttributes: map[attr.Name]Default{ attr.BeylaIP: false, attr.Transport: false, attr.SrcAddress: false, @@ -206,7 +217,7 @@ func getDefinitions(groups AttrGroups) map[Section]AttrReportGroup { }, RPCClientDuration.Section: { SubGroups: []*AttrReportGroup{&appAttributes, &appKubeAttributes, &grpcClientInfo}, - Attributes: map[attr.Name]Default{ + MetricAttributes: map[attr.Name]Default{ attr.RPCMethod: true, attr.RPCSystem: true, attr.RPCGRPCStatusCode: true, @@ -214,7 +225,7 @@ func getDefinitions(groups AttrGroups) map[Section]AttrReportGroup { }, RPCServerDuration.Section: { SubGroups: []*AttrReportGroup{&appAttributes, &appKubeAttributes, &serverInfo}, - Attributes: map[attr.Name]Default{ + MetricAttributes: map[attr.Name]Default{ attr.RPCMethod: true, attr.RPCSystem: true, attr.RPCGRPCStatusCode: true, @@ -225,28 +236,20 @@ func getDefinitions(groups AttrGroups) map[Section]AttrReportGroup { }, DBClientDuration.Section: { SubGroups: []*AttrReportGroup{&appAttributes, &appKubeAttributes}, - Attributes: map[attr.Name]Default{ + MetricAttributes: map[attr.Name]Default{ attr.DBOperation: true, attr.DBSystem: true, attr.ErrorType: true, }, }, MessagingPublishDuration.Section: { - SubGroups: []*AttrReportGroup{&appAttributes, &appKubeAttributes}, - Attributes: map[attr.Name]Default{ - attr.MessagingSystem: true, - attr.MessagingDestination: true, - }, + SubGroups: []*AttrReportGroup{&messagingAttributes}, }, MessagingProcessDuration.Section: { - SubGroups: []*AttrReportGroup{&appAttributes, &appKubeAttributes}, - Attributes: map[attr.Name]Default{ - attr.MessagingSystem: true, - attr.MessagingDestination: true, - }, + SubGroups: []*AttrReportGroup{&messagingAttributes}, }, Traces.Section: { - Attributes: map[attr.Name]Default{ + MetricAttributes: map[attr.Name]Default{ attr.DBQueryText: false, }, }, @@ -265,7 +268,9 @@ func AllAttributeNames() map[attr.Name]struct{} { names := map[attr.Name]struct{}{} // -1 to enable all the metric group flags for _, section := range getDefinitions(-1) { - maps.Copy(names, section.All()) + allNames := section.All() + maps.Copy(names, allNames.Metric) + maps.Copy(names, allNames.Resource) } return names } diff --git a/pkg/internal/export/attributes/attr_getters.go b/pkg/internal/export/attributes/attr_getters.go index bfc535f81..bdfa65e65 100644 --- a/pkg/internal/export/attributes/attr_getters.go +++ b/pkg/internal/export/attributes/attr_getters.go @@ -24,16 +24,25 @@ type NamedGetters[T, O any] func(name attr.Name) (Getter[T, O], bool) // It differentiates two name formats: the exposed name for the attribute (uses _ for word separation, as // required by Prometheus); and the internal name of the attribute (uses . for word separation, as internally Beyla // stores the metadata). -func PrometheusGetters[T, O any](getter NamedGetters[T, O], names []attr.Name) []Field[T, O] { - return buildGetterList(getter, names, attr.Name.Prom) +func PrometheusGetters[T, O any](getter NamedGetters[T, O], names Sections[[]attr.Name]) []Field[T, O] { + // at the moment we will still keep prometheus attributes as metric-level + // TODO: move resource-level attributes to target_info metrics + attrNames := make([]attr.Name, 0, len(names.Metric)+len(names.Resource)) + attrNames = append(attrNames, names.Metric...) + attrNames = append(attrNames, names.Resource...) + return buildGetterList(getter, attrNames, attr.Name.Prom) } // OpenTelemetryGetters builds a list of Getter getters for the names provided by the // user configuration, ready to be passed to an OpenTelemetry exporter. -func OpenTelemetryGetters[T, O any](getter NamedGetters[T, O], names []attr.Name) []Field[T, O] { - return buildGetterList(getter, names, func(name attr.Name) string { +func OpenTelemetryGetters[T, O any](getter NamedGetters[T, O], names Sections[[]attr.Name]) Sections[[]Field[T, O]] { + otelToStr := func(name attr.Name) string { return string(name.OTEL()) - }) + } + return Sections[[]Field[T, O]]{ + Resource: buildGetterList(getter, names.Resource, otelToStr), + Metric: buildGetterList(getter, names.Metric, otelToStr), + } } func buildGetterList[T, O any]( diff --git a/pkg/internal/export/attributes/attr_selector.go b/pkg/internal/export/attributes/attr_selector.go index 2dc9ee073..af1ab0301 100644 --- a/pkg/internal/export/attributes/attr_selector.go +++ b/pkg/internal/export/attributes/attr_selector.go @@ -21,8 +21,16 @@ type AttrReportGroup struct { // SubGroups are attribute groups related to this instance. If this instance is // enabled, they might be also enabled (unless they are explicitly disabled) SubGroups []*AttrReportGroup - // Attributes map of name: enabled for this group - Attributes map[attr.Name]Default + // MetricAttributes map of name: enabled for this group. It refers to metric-level attributes. + MetricAttributes map[attr.Name]Default + // ResourceAttributes is like MetricAttributes but for resources (OTEL) or target_info (Prometheus) + ResourceAttributes map[attr.Name]Default +} + +// Sections classifies some attribute-related groups between Metric and Resource attributes +type Sections[T any] struct { + Metric T + Resource T } // AttrSelector returns, for each metric, the attributes that have to be reported @@ -44,68 +52,107 @@ func NewAttrSelector(groups AttrGroups, selectorCfg Selection) (*AttrSelector, e } // For returns the list of attribute names for a given metric -func (p *AttrSelector) For(metricName Name) []attr.Name { - metricAttributes, ok := p.definition[metricName.Section] +func (p *AttrSelector) For(metricName Name) Sections[[]attr.Name] { + attributeNames, ok := p.definition[metricName.Section] if !ok { panic(fmt.Sprintf("BUG! metric not found %+v", metricName)) } inclusionLists, ok := p.selector[metricName.Section] if !ok { + attrs := attributeNames.Default() // if the user did not provide any selector, return the default attributes for that metric - attrs := helpers.SetToSlice(metricAttributes.Default()) - slices.Sort(attrs) - return attrs + sas := Sections[[]attr.Name]{ + Metric: helpers.SetToSlice(attrs.Metric), + Resource: helpers.SetToSlice(attrs.Resource), + } + slices.Sort(sas.Metric) + slices.Sort(sas.Resource) + return sas } - var addAttributes map[attr.Name]struct{} + var addAttributes Sections[map[attr.Name]struct{}] // if the "include" list is empty, we use the default attributes // as included if len(inclusionLists.Include) == 0 { - addAttributes = metricAttributes.Default() + addAttributes = attributeNames.Default() } else { - addAttributes = map[attr.Name]struct{}{} - for attrName := range metricAttributes.All() { + addAttributes = Sections[map[attr.Name]struct{}]{ + Metric: map[attr.Name]struct{}{}, + Resource: map[attr.Name]struct{}{}, + } + allAttributes := attributeNames.All() + for attrName := range allAttributes.Metric { if inclusionLists.includes(attrName) { - addAttributes[attrName] = struct{}{} + addAttributes.Metric[attrName] = struct{}{} + } + } + for attrName := range allAttributes.Resource { + if inclusionLists.includes(attrName) { + addAttributes.Resource[attrName] = struct{}{} } } } // now remove any attribute specified in the "exclude" list - maps.DeleteFunc(addAttributes, func(attr attr.Name, _ struct{}) bool { + maps.DeleteFunc(addAttributes.Metric, func(attr attr.Name, _ struct{}) bool { + return inclusionLists.excludes(attr) + }) + maps.DeleteFunc(addAttributes.Resource, func(attr attr.Name, _ struct{}) bool { return inclusionLists.excludes(attr) }) - attrs := helpers.SetToSlice(addAttributes) - slices.Sort(attrs) - return attrs + sas := Sections[[]attr.Name]{ + Metric: helpers.SetToSlice(addAttributes.Metric), + Resource: helpers.SetToSlice(addAttributes.Resource), + } + slices.Sort(sas.Metric) + slices.Sort(sas.Resource) + return sas } // All te attributes for this group and their subgroups, unless they are disabled. -func (p *AttrReportGroup) All() map[attr.Name]struct{} { +func (p *AttrReportGroup) All() Sections[map[attr.Name]struct{}] { + sas := Sections[map[attr.Name]struct{}]{ + Metric: map[attr.Name]struct{}{}, + Resource: map[attr.Name]struct{}{}, + } if p.Disabled { - return map[attr.Name]struct{}{} + return sas } - attrs := map[attr.Name]struct{}{} for _, parent := range p.SubGroups { - maps.Copy(attrs, parent.All()) + psas := parent.All() + maps.Copy(sas.Metric, psas.Metric) + maps.Copy(sas.Resource, psas.Resource) + } + for k := range p.MetricAttributes { + sas.Metric[k] = struct{}{} } - for k := range p.Attributes { - attrs[k] = struct{}{} + for k := range p.ResourceAttributes { + sas.Resource[k] = struct{}{} } - return attrs + return sas } // Default attributes for this group and their subgroups, unless they are disabled. -func (p *AttrReportGroup) Default() map[attr.Name]struct{} { +func (p *AttrReportGroup) Default() Sections[map[attr.Name]struct{}] { + sas := Sections[map[attr.Name]struct{}]{ + Metric: map[attr.Name]struct{}{}, + Resource: map[attr.Name]struct{}{}, + } if p.Disabled { - return map[attr.Name]struct{}{} + return sas } - attrs := map[attr.Name]struct{}{} for _, parent := range p.SubGroups { - maps.Copy(attrs, parent.Default()) + psas := parent.Default() + maps.Copy(sas.Metric, psas.Metric) + maps.Copy(sas.Resource, psas.Resource) + } + for k, def := range p.MetricAttributes { + if def { + sas.Metric[k] = struct{}{} + } } - for k, def := range p.Attributes { + for k, def := range p.ResourceAttributes { if def { - attrs[k] = struct{}{} + sas.Resource[k] = struct{}{} } } - return attrs + return sas } diff --git a/pkg/internal/export/attributes/attr_selector_test.go b/pkg/internal/export/attributes/attr_selector_test.go index b2ecaef2b..4b2c84ea8 100644 --- a/pkg/internal/export/attributes/attr_selector_test.go +++ b/pkg/internal/export/attributes/attr_selector_test.go @@ -31,15 +31,18 @@ func TestFor(t *testing.T) { }, }) require.NoError(t, err) - assert.Equal(t, []attr.Name{ - "beyla.ip", - "k8s.dst.namespace", - "k8s.dst.node.ip", - "k8s.src.namespace", - "k8s.src.node.ip", - "src.address", - "src.name", - "src.port", + assert.Equal(t, Sections[[]attr.Name]{ + Metric: []attr.Name{ + "beyla.ip", + "k8s.dst.namespace", + "k8s.dst.node.ip", + "k8s.src.namespace", + "k8s.src.node.ip", + "src.address", + "src.name", + "src.port", + }, + Resource: []attr.Name{}, }, p.For(BeylaNetworkFlow)) } @@ -51,10 +54,13 @@ func TestFor_KubeDisabled(t *testing.T) { }, }) require.NoError(t, err) - assert.Equal(t, []attr.Name{ - "beyla.ip", - "src.address", - "src.name", + assert.Equal(t, Sections[[]attr.Name]{ + Metric: []attr.Name{ + "beyla.ip", + "src.address", + "src.name", + }, + Resource: []attr.Name{}, }, p.For(BeylaNetworkFlow)) } @@ -69,12 +75,15 @@ func TestNilDoesNotCrash(t *testing.T) { func TestDefault(t *testing.T) { p, err := NewAttrSelector(GroupKubernetes, nil) require.NoError(t, err) - assert.Equal(t, []attr.Name{ - "k8s.cluster.name", - "k8s.dst.namespace", - "k8s.dst.owner.name", - "k8s.src.namespace", - "k8s.src.owner.name", + assert.Equal(t, Sections[[]attr.Name]{ + Metric: []attr.Name{ + "k8s.cluster.name", + "k8s.dst.namespace", + "k8s.dst.owner.name", + "k8s.src.namespace", + "k8s.src.owner.name", + }, + Resource: []attr.Name{}, }, p.For(BeylaNetworkFlow)) } @@ -85,7 +94,10 @@ func TestTraces(t *testing.T) { }, }) require.NoError(t, err) - assert.Equal(t, []attr.Name{ - "db.query.text", + assert.Equal(t, Sections[[]attr.Name]{ + Metric: []attr.Name{ + "db.query.text", + }, + Resource: []attr.Name{}, }, p.For(Traces)) } diff --git a/pkg/internal/export/otel/metrics.go b/pkg/internal/export/otel/metrics.go index 217ace807..9862fdfcb 100644 --- a/pkg/internal/export/otel/metrics.go +++ b/pkg/internal/export/otel/metrics.go @@ -166,15 +166,15 @@ type MetricsReporter struct { is instrumentations.InstrumentationSelection // user-selected fields for each of the reported metrics - attrHTTPDuration []attributes.Field[*request.Span, attribute.KeyValue] - attrHTTPClientDuration []attributes.Field[*request.Span, attribute.KeyValue] - attrGRPCServer []attributes.Field[*request.Span, attribute.KeyValue] - attrGRPCClient []attributes.Field[*request.Span, attribute.KeyValue] - attrDBClient []attributes.Field[*request.Span, attribute.KeyValue] - attrMessagingPublish []attributes.Field[*request.Span, attribute.KeyValue] - attrMessagingProcess []attributes.Field[*request.Span, attribute.KeyValue] - attrHTTPRequestSize []attributes.Field[*request.Span, attribute.KeyValue] - attrHTTPClientRequestSize []attributes.Field[*request.Span, attribute.KeyValue] + attrHTTPDuration attributes.Sections[[]attributes.Field[*request.Span, attribute.KeyValue]] + attrHTTPClientDuration attributes.Sections[[]attributes.Field[*request.Span, attribute.KeyValue]] + attrGRPCServer attributes.Sections[[]attributes.Field[*request.Span, attribute.KeyValue]] + attrGRPCClient attributes.Sections[[]attributes.Field[*request.Span, attribute.KeyValue]] + attrDBClient attributes.Sections[[]attributes.Field[*request.Span, attribute.KeyValue]] + attrMessagingPublish attributes.Sections[[]attributes.Field[*request.Span, attribute.KeyValue]] + attrMessagingProcess attributes.Sections[[]attributes.Field[*request.Span, attribute.KeyValue]] + attrHTTPRequestSize attributes.Sections[[]attributes.Field[*request.Span, attribute.KeyValue]] + attrHTTPClientRequestSize attributes.Sections[[]attributes.Field[*request.Span, attribute.KeyValue]] } // Metrics is a set of metrics associated to a given OTEL MeterProvider. @@ -379,28 +379,28 @@ func (mr *MetricsReporter) setupOtelMeters(m *Metrics, meter instrument.Meter) e return fmt.Errorf("creating http duration histogram metric: %w", err) } m.httpDuration = NewExpirer[*request.Span, instrument.Float64Histogram, float64]( - m.ctx, httpDuration, mr.attrHTTPDuration, timeNow, mr.cfg.TTL) + m.ctx, httpDuration, mr.attrHTTPDuration.Metric, timeNow, mr.cfg.TTL) httpClientDuration, err := meter.Float64Histogram(attributes.HTTPClientDuration.OTEL, instrument.WithUnit("s")) if err != nil { return fmt.Errorf("creating http duration histogram metric: %w", err) } m.httpClientDuration = NewExpirer[*request.Span, instrument.Float64Histogram, float64]( - m.ctx, httpClientDuration, mr.attrHTTPClientDuration, timeNow, mr.cfg.TTL) + m.ctx, httpClientDuration, mr.attrHTTPClientDuration.Metric, timeNow, mr.cfg.TTL) httpRequestSize, err := meter.Float64Histogram(attributes.HTTPServerRequestSize.OTEL, instrument.WithUnit("By")) if err != nil { return fmt.Errorf("creating http size histogram metric: %w", err) } m.httpRequestSize = NewExpirer[*request.Span, instrument.Float64Histogram, float64]( - m.ctx, httpRequestSize, mr.attrHTTPRequestSize, timeNow, mr.cfg.TTL) + m.ctx, httpRequestSize, mr.attrHTTPRequestSize.Metric, timeNow, mr.cfg.TTL) httpClientRequestSize, err := meter.Float64Histogram(attributes.HTTPClientRequestSize.OTEL, instrument.WithUnit("By")) if err != nil { return fmt.Errorf("creating http size histogram metric: %w", err) } m.httpClientRequestSize = NewExpirer[*request.Span, instrument.Float64Histogram, float64]( - m.ctx, httpClientRequestSize, mr.attrHTTPClientRequestSize, timeNow, mr.cfg.TTL) + m.ctx, httpClientRequestSize, mr.attrHTTPClientRequestSize.Metric, timeNow, mr.cfg.TTL) } if mr.is.GRPCEnabled() { @@ -409,14 +409,14 @@ func (mr *MetricsReporter) setupOtelMeters(m *Metrics, meter instrument.Meter) e return fmt.Errorf("creating grpc duration histogram metric: %w", err) } m.grpcDuration = NewExpirer[*request.Span, instrument.Float64Histogram, float64]( - m.ctx, grpcDuration, mr.attrGRPCServer, timeNow, mr.cfg.TTL) + m.ctx, grpcDuration, mr.attrGRPCServer.Metric, timeNow, mr.cfg.TTL) grpcClientDuration, err := meter.Float64Histogram(attributes.RPCClientDuration.OTEL, instrument.WithUnit("s")) if err != nil { return fmt.Errorf("creating grpc duration histogram metric: %w", err) } m.grpcClientDuration = NewExpirer[*request.Span, instrument.Float64Histogram, float64]( - m.ctx, grpcClientDuration, mr.attrGRPCClient, timeNow, mr.cfg.TTL) + m.ctx, grpcClientDuration, mr.attrGRPCClient.Metric, timeNow, mr.cfg.TTL) } if mr.is.DBEnabled() { @@ -425,7 +425,7 @@ func (mr *MetricsReporter) setupOtelMeters(m *Metrics, meter instrument.Meter) e return fmt.Errorf("creating db client duration histogram metric: %w", err) } m.dbClientDuration = NewExpirer[*request.Span, instrument.Float64Histogram, float64]( - m.ctx, dbClientDuration, mr.attrDBClient, timeNow, mr.cfg.TTL) + m.ctx, dbClientDuration, mr.attrDBClient.Metric, timeNow, mr.cfg.TTL) } if mr.is.MQEnabled() { @@ -434,14 +434,14 @@ func (mr *MetricsReporter) setupOtelMeters(m *Metrics, meter instrument.Meter) e return fmt.Errorf("creating messaging client publish duration histogram metric: %w", err) } m.msgPublishDuration = NewExpirer[*request.Span, instrument.Float64Histogram, float64]( - m.ctx, msgPublishDuration, mr.attrMessagingPublish, timeNow, mr.cfg.TTL) + m.ctx, msgPublishDuration, mr.attrMessagingPublish.Metric, timeNow, mr.cfg.TTL) msgProcessDuration, err := meter.Float64Histogram(attributes.MessagingProcessDuration.OTEL, instrument.WithUnit("s")) if err != nil { return fmt.Errorf("creating messaging client process duration histogram metric: %w", err) } m.msgProcessDuration = NewExpirer[*request.Span, instrument.Float64Histogram, float64]( - m.ctx, msgProcessDuration, mr.attrMessagingProcess, timeNow, mr.cfg.TTL) + m.ctx, msgProcessDuration, mr.attrMessagingProcess.Metric, timeNow, mr.cfg.TTL) } return nil diff --git a/pkg/internal/export/otel/metrics_net.go b/pkg/internal/export/otel/metrics_net.go index e7ee62a5d..af8dac60a 100644 --- a/pkg/internal/export/otel/metrics_net.go +++ b/pkg/internal/export/otel/metrics_net.go @@ -113,7 +113,7 @@ func newMetricsExporter(ctx context.Context, ctxInfo *global.ContextInfo, cfg *N log.Error("creating observable counter", "error", err) return nil, err } - expirer := NewExpirer[*ebpf.Record, metric2.Int64Counter, float64](ctx, bytesMetric, attrs, clock.Time, cfg.Metrics.TTL) + expirer := NewExpirer[*ebpf.Record, metric2.Int64Counter, float64](ctx, bytesMetric, attrs.Metric, clock.Time, cfg.Metrics.TTL) log.Debug("restricting attributes not in this list", "attributes", cfg.AttributeSelectors) return &netMetricsExporter{ ctx: ctx, diff --git a/pkg/internal/export/otel/metrics_proc.go b/pkg/internal/export/otel/metrics_proc.go index 86253fb07..e1bf0e042 100644 --- a/pkg/internal/export/otel/metrics_proc.go +++ b/pkg/internal/export/otel/metrics_proc.go @@ -56,12 +56,12 @@ type procMetricsExporter struct { log *slog.Logger - attrCPUTime []attributes.Field[*process.Status, attribute.KeyValue] - attrCPUUtil []attributes.Field[*process.Status, attribute.KeyValue] - attrMemory []attributes.Field[*process.Status, attribute.KeyValue] - attrMemoryVirtual []attributes.Field[*process.Status, attribute.KeyValue] - attrDisk []attributes.Field[*process.Status, attribute.KeyValue] - attrNet []attributes.Field[*process.Status, attribute.KeyValue] + attrCPUTime attributes.Sections[[]attributes.Field[*process.Status, attribute.KeyValue]] + attrCPUUtil attributes.Sections[[]attributes.Field[*process.Status, attribute.KeyValue]] + attrMemory attributes.Sections[[]attributes.Field[*process.Status, attribute.KeyValue]] + attrMemoryVirtual attributes.Sections[[]attributes.Field[*process.Status, attribute.KeyValue]] + attrDisk attributes.Sections[[]attributes.Field[*process.Status, attribute.KeyValue]] + attrNet attributes.Sections[[]attributes.Field[*process.Status, attribute.KeyValue]] // the observation code for CPU metrics will be different depending on // the "process.cpu.state" attribute being selected or not @@ -144,22 +144,23 @@ func newProcMetricsExporter( attrDisk: attrDisk, attrNet: attrNet, } - if slices.Contains(cpuTimeNames, attr2.ProcCPUState) { + // TODO: replace .Metric by .Resource when we move process attributes to the resource + if slices.Contains(cpuTimeNames.Metric, attr2.ProcCPUState) { mr.cpuTimeObserver = cpuTimeDisaggregatedObserver } else { mr.cpuTimeObserver = cpuTimeAggregatedObserver } - if slices.Contains(cpuUtilNames, attr2.ProcCPUState) { + if slices.Contains(cpuUtilNames.Metric, attr2.ProcCPUState) { mr.cpuUtilisationObserver = cpuUtilisationDisaggregatedObserver } else { mr.cpuUtilisationObserver = cpuUtilisationAggregatedObserver } - if slices.Contains(diskNames, attr2.ProcDiskIODir) { + if slices.Contains(diskNames.Metric, attr2.ProcDiskIODir) { mr.diskObserver = diskDisaggregatedObserver } else { mr.diskObserver = diskAggregatedObserver } - if slices.Contains(netNames, attr2.ProcNetIODir) { + if slices.Contains(netNames.Metric, attr2.ProcNetIODir) { mr.netObserver = netDisaggregatedObserver } else { mr.netObserver = netAggregatedObserver @@ -212,7 +213,7 @@ func (me *procMetricsExporter) newMetricSet(service *svc.ID) (*procMetrics, erro return nil, err } else { m.cpuTime = NewExpirer[*process.Status, metric2.Float64Counter, float64]( - me.ctx, cpuTime, me.attrCPUTime, timeNow, me.cfg.Metrics.TTL) + me.ctx, cpuTime, me.attrCPUTime.Metric, timeNow, me.cfg.Metrics.TTL) } if cpuUtilisation, err := meter.Float64Gauge( @@ -224,7 +225,7 @@ func (me *procMetricsExporter) newMetricSet(service *svc.ID) (*procMetrics, erro return nil, err } else { m.cpuUtilisation = NewExpirer[*process.Status, metric2.Float64Gauge, float64]( - me.ctx, cpuUtilisation, me.attrCPUUtil, timeNow, me.cfg.Metrics.TTL) + me.ctx, cpuUtilisation, me.attrCPUUtil.Metric, timeNow, me.cfg.Metrics.TTL) } // memory metrics are defined as UpDownCounters in the Otel specification, but we @@ -239,7 +240,7 @@ func (me *procMetricsExporter) newMetricSet(service *svc.ID) (*procMetrics, erro return nil, err } else { m.memory = NewExpirer[*process.Status, metric2.Int64UpDownCounter, int64]( - me.ctx, memory, me.attrMemory, timeNow, me.cfg.Metrics.TTL) + me.ctx, memory, me.attrMemory.Metric, timeNow, me.cfg.Metrics.TTL) } if memoryVirtual, err := meter.Int64UpDownCounter( @@ -251,7 +252,7 @@ func (me *procMetricsExporter) newMetricSet(service *svc.ID) (*procMetrics, erro return nil, err } else { m.memoryVirtual = NewExpirer[*process.Status, metric2.Int64UpDownCounter, int64]( - me.ctx, memoryVirtual, me.attrMemoryVirtual, timeNow, me.cfg.Metrics.TTL) + me.ctx, memoryVirtual, me.attrMemoryVirtual.Metric, timeNow, me.cfg.Metrics.TTL) } if disk, err := meter.Int64Counter( @@ -263,7 +264,7 @@ func (me *procMetricsExporter) newMetricSet(service *svc.ID) (*procMetrics, erro return nil, err } else { m.disk = NewExpirer[*process.Status, metric2.Int64Counter, int64]( - me.ctx, disk, me.attrDisk, timeNow, me.cfg.Metrics.TTL) + me.ctx, disk, me.attrDisk.Metric, timeNow, me.cfg.Metrics.TTL) } if net, err := meter.Int64Counter( @@ -275,7 +276,7 @@ func (me *procMetricsExporter) newMetricSet(service *svc.ID) (*procMetrics, erro return nil, err } else { m.net = NewExpirer[*process.Status, metric2.Int64Counter, int64]( - me.ctx, net, me.attrNet, timeNow, me.cfg.Metrics.TTL) + me.ctx, net, me.attrNet.Metric, timeNow, me.cfg.Metrics.TTL) } return &m, nil } diff --git a/pkg/internal/export/otel/traces.go b/pkg/internal/export/otel/traces.go index 87b7c9ebd..463f00263 100644 --- a/pkg/internal/export/otel/traces.go +++ b/pkg/internal/export/otel/traces.go @@ -154,7 +154,10 @@ func GetUserSelectedAttributes(attrs attributes.Selection) (map[attr.Name]struct } traceAttrsArr := attribProvider.For(attributes.Traces) traceAttrs := make(map[attr.Name]struct{}) - for _, a := range traceAttrsArr { + for _, a := range traceAttrsArr.Metric { + traceAttrs[a] = struct{}{} + } + for _, a := range traceAttrsArr.Resource { traceAttrs[a] = struct{}{} } diff --git a/pkg/internal/export/prom/prom_proc.go b/pkg/internal/export/prom/prom_proc.go index 069a94ff2..a2e4f71a4 100644 --- a/pkg/internal/export/prom/prom_proc.go +++ b/pkg/internal/export/prom/prom_proc.go @@ -291,7 +291,8 @@ func attributesWithExplicit( // we need to be aware of the user willing to add it to explicitly choose between // observeAggregatedCPU and observeDisaggregatedCPU // Similar for "process_disk_io" or "process_network_io" - containsExplicit = slices.Contains(attrNames, explicitAttribute) + containsExplicit = slices.Contains(attrNames.Metric, explicitAttribute) || + slices.Contains(attrNames.Resource, explicitAttribute) getters = attributes.PrometheusGetters(process.PromGetters, attrNames) if containsExplicit { diff --git a/pkg/internal/pipe/instrumenter_test.go b/pkg/internal/pipe/instrumenter_test.go index 81c875572..e31c73557 100644 --- a/pkg/internal/pipe/instrumenter_test.go +++ b/pkg/internal/pipe/instrumenter_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/mariomac/guara/pkg/test" "github.com/mariomac/pipes/pipe" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -100,9 +101,11 @@ func TestBasicPipeline(t *testing.T) { string(attr.HTTPUrlPath): "/foo/bar", string(attr.ClientAddr): "1.1.1.1", string(semconv.ServiceNameKey): "foo-svc", + string(semconv.ServiceNamespaceKey): "ns", }, ResourceAttributes: map[string]string{ string(semconv.ServiceNameKey): "foo-svc", + string(semconv.ServiceNamespaceKey): "ns", string(semconv.TelemetrySDKLanguageKey): "go", string(semconv.TelemetrySDKNameKey): "beyla", }, @@ -298,12 +301,14 @@ func TestRouteConsolidation(t *testing.T) { Unit: "s", Attributes: map[string]string{ string(semconv.ServiceNameKey): "svc-1", + string(semconv.ServiceNamespaceKey): "ns", string(attr.HTTPRequestMethod): "GET", string(attr.HTTPResponseStatusCode): "200", string(semconv.HTTPRouteKey): "/user/{id}", }, ResourceAttributes: map[string]string{ string(semconv.ServiceNameKey): "svc-1", + string(semconv.ServiceNamespaceKey): "ns", string(semconv.TelemetrySDKLanguageKey): "go", string(semconv.TelemetrySDKNameKey): "beyla", }, @@ -316,12 +321,14 @@ func TestRouteConsolidation(t *testing.T) { Unit: "s", Attributes: map[string]string{ string(semconv.ServiceNameKey): "svc-1", + string(semconv.ServiceNamespaceKey): "ns", string(attr.HTTPRequestMethod): "GET", string(attr.HTTPResponseStatusCode): "200", string(semconv.HTTPRouteKey): "/products/{id}/push", }, ResourceAttributes: map[string]string{ string(semconv.ServiceNameKey): "svc-1", + string(semconv.ServiceNamespaceKey): "ns", string(semconv.TelemetrySDKLanguageKey): "go", string(semconv.TelemetrySDKNameKey): "beyla", }, @@ -334,12 +341,14 @@ func TestRouteConsolidation(t *testing.T) { Unit: "s", Attributes: map[string]string{ string(semconv.ServiceNameKey): "svc-1", + string(semconv.ServiceNamespaceKey): "ns", string(attr.HTTPRequestMethod): "GET", string(attr.HTTPResponseStatusCode): "200", string(semconv.HTTPRouteKey): "/**", }, ResourceAttributes: map[string]string{ string(semconv.ServiceNameKey): "svc-1", + string(semconv.ServiceNamespaceKey): "ns", string(semconv.TelemetrySDKLanguageKey): "go", string(semconv.TelemetrySDKNameKey): "beyla", }, @@ -388,6 +397,7 @@ func TestGRPCPipeline(t *testing.T) { Unit: "s", Attributes: map[string]string{ string(semconv.ServiceNameKey): "grpc-svc", + string(semconv.ServiceNamespaceKey): "", string(semconv.RPCSystemKey): "grpc", string(semconv.RPCGRPCStatusCodeKey): "3", string(semconv.RPCMethodKey): "/foo/bar", @@ -477,6 +487,7 @@ func TestBasicPipelineInfo(t *testing.T) { string(attr.HTTPUrlPath): "/aaa/bbb", string(attr.ClientAddr): "1.1.1.1", string(semconv.ServiceNameKey): "comm", + string(semconv.ServiceNamespaceKey): "", }, ResourceAttributes: map[string]string{ string(semconv.ServiceNameKey): "comm", @@ -550,28 +561,61 @@ func TestSpanAttributeFilterNode(t *testing.T) { go pipe.Run(ctx) // expect to receive only the records matching the Filters criteria - events := map[string]map[string]string{} - event := testutil.ReadChannel(t, tc.Records(), testTimeout) - assert.Equal(t, "http.server.request.duration", event.Name) - events[event.Attributes["url.path"]] = event.Attributes - event = testutil.ReadChannel(t, tc.Records(), testTimeout) - assert.Equal(t, "http.server.request.duration", event.Name) - events[event.Attributes["url.path"]] = event.Attributes + events := map[string]attributes.Sections[map[string]string]{} + var event collector.MetricRecord + test.Eventually(t, testTimeout, func(it require.TestingT) { + event = testutil.ReadChannel(t, tc.Records(), testTimeout) + require.Equal(it, "http.server.request.duration", event.Name) + require.Equal(it, "/user/1234", event.Attributes["url.path"]) + }) + events[event.Attributes["url.path"]] = attributes.Sections[map[string]string]{ + Metric: event.Attributes, + Resource: event.ResourceAttributes, + } + test.Eventually(t, testTimeout, func(it require.TestingT) { + event = testutil.ReadChannel(t, tc.Records(), testTimeout) + require.Equal(it, "http.server.request.duration", event.Name) + require.Equal(it, "/user/4321", event.Attributes["url.path"]) + }) + events[event.Attributes["url.path"]] = attributes.Sections[map[string]string]{ + Metric: event.Attributes, + Resource: event.ResourceAttributes, + } - assert.Equal(t, map[string]map[string]string{ + assert.Equal(t, map[string]attributes.Sections[map[string]string]{ "/user/1234": { - string(semconv.ServiceNameKey): "svc-1", - string(attr.ClientAddr): "1.1.1.1", - string(attr.HTTPRequestMethod): "GET", - string(attr.HTTPResponseStatusCode): "201", - string(attr.HTTPUrlPath): "/user/1234", + Metric: map[string]string{ + string(attr.ClientAddr): "1.1.1.1", + string(attr.HTTPRequestMethod): "GET", + string(attr.HTTPResponseStatusCode): "201", + string(attr.HTTPUrlPath): "/user/1234", + string(semconv.ServiceNameKey): "svc-1", + string(semconv.ServiceNamespaceKey): "ns", + }, + Resource: map[string]string{ + string(semconv.ServiceNameKey): "svc-1", + string(semconv.ServiceNamespaceKey): "ns", + string(semconv.TelemetrySDKLanguageKey): "go", + string(semconv.TelemetrySDKNameKey): "beyla", + string(semconv.ServiceInstanceIDKey): "", + }, }, "/user/4321": { - string(semconv.ServiceNameKey): "svc-3", - string(attr.ClientAddr): "1.1.1.1", - string(attr.HTTPRequestMethod): "GET", - string(attr.HTTPResponseStatusCode): "203", - string(attr.HTTPUrlPath): "/user/4321", + Metric: map[string]string{ + string(semconv.ServiceNameKey): "svc-3", + string(semconv.ServiceNamespaceKey): "ns", + string(attr.ClientAddr): "1.1.1.1", + string(attr.HTTPRequestMethod): "GET", + string(attr.HTTPResponseStatusCode): "203", + string(attr.HTTPUrlPath): "/user/4321", + }, + Resource: map[string]string{ + string(semconv.ServiceNameKey): "svc-3", + string(semconv.ServiceNamespaceKey): "ns", + string(semconv.TelemetrySDKLanguageKey): "go", + string(semconv.TelemetrySDKNameKey): "beyla", + string(semconv.ServiceInstanceIDKey): "", + }, }, }, events) } @@ -588,7 +632,7 @@ func newRequest(serviceName string, method, path, peer string, status int) []req Start: 2, RequestStart: 1, End: 3, - ServiceID: svc.ID{Name: serviceName}, + ServiceID: svc.ID{Namespace: "ns", Name: serviceName, UID: svc.UID(serviceName)}, }} } @@ -604,7 +648,7 @@ func newRequestWithTiming(svcName string, kind request.EventType, method, path, RequestStart: int64(goStart), Start: int64(start), End: int64(end), - ServiceID: svc.ID{Name: svcName}, + ServiceID: svc.ID{Name: svcName, UID: svc.UID(svcName)}, }} } @@ -648,6 +692,7 @@ func matchTraceEvent(t require.TestingT, name string, event collector.TraceRecor }, ResourceAttributes: map[string]string{ string(semconv.ServiceNameKey): "bar-svc", + string(semconv.ServiceNamespaceKey): "ns", string(semconv.TelemetrySDKLanguageKey): "go", string(semconv.TelemetrySDKNameKey): "beyla", string(semconv.OTelLibraryNameKey): "github.com/grafana/beyla", @@ -666,6 +711,7 @@ func matchInnerTraceEvent(t require.TestingT, name string, event collector.Trace }, ResourceAttributes: map[string]string{ string(semconv.ServiceNameKey): "bar-svc", + string(semconv.ServiceNamespaceKey): "ns", string(semconv.TelemetrySDKLanguageKey): "go", string(semconv.TelemetrySDKNameKey): "beyla", string(semconv.OTelLibraryNameKey): "github.com/grafana/beyla", diff --git a/test/integration/k8s/daemonset/k8s_daemonset_z_metrics_test.go b/test/integration/k8s/daemonset/k8s_daemonset_z_metrics_test.go index 727ac19d1..f6d718f30 100644 --- a/test/integration/k8s/daemonset/k8s_daemonset_z_metrics_test.go +++ b/test/integration/k8s/daemonset/k8s_daemonset_z_metrics_test.go @@ -16,6 +16,5 @@ func TestProcessMetrics(t *testing.T) { "k8s_deployment_name": "^otherinstance$", "k8s_replicaset_name": "^otherinstance-.*", "k8s_pod_name": "^otherinstance-.*", - "host_name": "^otherinstance-.*", })) }