diff --git a/docs/sources/configure/options.md b/docs/sources/configure/options.md index 8053d62a6..3b940c12e 100644 --- a/docs/sources/configure/options.md +++ b/docs/sources/configure/options.md @@ -199,7 +199,7 @@ This section allows specifying different selection criteria for different servic as well as overriding some of their metadata, such as their reported name or namespace. -For more details about this section, please go to the [discovery services section](#discovery-services-section) +For more details about this section, go to the [discovery services section](#discovery-services-section) of this document. | YAML | Environment variable | Type | Default | @@ -447,8 +447,8 @@ a 'Traceparent' header value, it will use the provided 'trace id' to create its This option does not have an effect on Go applications, where the 'Traceparent' field is always processed, without additional tracking of the request headers. -Enabling this option may increase Beyla's performance overhead in high request volume scenarios. -Please note that this option is only useful when generating Beyla traces, it does not affect +Enabling this option may increase the performance overhead in high request volume scenarios. +This option is only useful when generating Beyla traces, it does not affect generation of Beyla metrics. ## Configuration of metrics and traces attributes @@ -522,7 +522,7 @@ attributes: ``` It is IMPORTANT to consider that enabling this feature requires a previous step of -providing some extra permissions to the Beyla Pod. Please check the +providing some extra permissions to the Beyla Pod. Consult the ["Configuring Kubernetes metadata decoration section" in the "Running Beyla in Kubernetes"]({{< relref "../setup/kubernetes.md" >}}) page. | YAML | Environment variable | Type | Default | @@ -643,7 +643,7 @@ Possible values for the `ignore_mode` property are: Selectively ignoring only certain type of events might be useful in certain scenarios. For example, you may want to know the performance metrics of your health check API, but you wouldn't want the overhead of those trace records in -your target traces database. In this this example scenario, you would set the `ignore_mode` property to `traces`, such +your target traces database. In this example scenario, you would set the `ignore_mode` property to `traces`, such that only traces matching the `ignored_patterns` will be discarded, while metrics will still be recorded. | YAML | Environment variable | Type | Default | @@ -688,7 +688,7 @@ document/d/*/edit ## OTEL metrics exporter > ℹ️ If you plan to use Beyla to send metrics to Grafana Cloud, -> please check the [Grafana Cloud OTEL exporter for metrics and traces](#using-the-grafana-cloud-otel-endpoint-to-ingest-metrics-and-traces) +> consult the [Grafana Cloud OTEL exporter for metrics and traces](#using-the-grafana-cloud-otel-endpoint-to-ingest-metrics-and-traces) > section for easier configuration. YAML section `otel_metrics_export`. @@ -785,7 +785,13 @@ of Beyla: application-level metrics or network metrics. process matching the entries in the `discovery` section. - If the list contains `application_span`, the Beyla OpenTelemetry exporter exports application-level trace span metrics; but only if there is defined an OpenTelemetry endpoint, and Beyla was able to discover any - process matching the entries in the `discovery` section. + process matching the entries in the `discovery` section. 
+- If the list contains `application_service_graph`, the Beyla OpenTelemetry exporter exports application-level service graph metrics; + but only if there is defined an OpenTelemetry endpoint, and Beyla was able to discover any + process matching the entries in the `discovery` section. + For best experience with generating service graph metrics, use a DNS for service discovery and make sure the DNS names match + the OpenTelemetry service names used in Beyla. In Kubernetes environments, the OpenTelemetry service name set by the service name + discovery is the best choice for service graph metrics. - If the list contains `network`, the Beyla OpenTelemetry exporter exports network-level metrics; but only if there is defined an OpenTelemetry endpoint and the [network metrics are enabled]({{< relref "../network" >}}). @@ -867,7 +873,7 @@ for more information. ## OTEL traces exporter > ℹ️ If you plan to use Beyla to send metrics to Grafana Cloud, -> please check the [Grafana Cloud OTEL exporter for metrics and traces](#using-the-grafana-cloud-otel-endpoint-to-ingest-metrics-and-traces) +> consult the [Grafana Cloud OTEL exporter for metrics and traces](#using-the-grafana-cloud-otel-endpoint-to-ingest-metrics-and-traces) > section for easier configuration. YAML section `otel_traces_export`. @@ -1096,6 +1102,12 @@ of Beyla: application-level metrics or network metrics. - If the list contains `application_span`, the Beyla Prometheus exporter exports application-level metrics in traces span metrics format; but only if the Prometheus `port` property is defined, and Beyla was able to discover any process matching the entries in the `discovery` section. +- If the list contains `application_service_graph`, the Beyla Prometheus exporter exports application-level service graph metrics; + but only if the Prometheus `port` property is defined, and Beyla was able to discover any + process matching the entries in the `discovery` section. + For best experience with generating service graph metrics, use a DNS for service discovery and make sure the DNS names match + the OpenTelemetry service names used in Beyla. In Kubernetes environments, the OpenTelemetry service name set by the service name + discovery is the best choice for service graph metrics. - If the list contains `network`, the Beyla Prometheus exporter exports network-level metrics; but only if the Prometheus `port` property is defined and the [network metrics are enabled]({{< relref "../network" >}}). diff --git a/pkg/beyla/config.go b/pkg/beyla/config.go index fcb1f59e3..b954c2d45 100644 --- a/pkg/beyla/config.go +++ b/pkg/beyla/config.go @@ -49,6 +49,10 @@ var DefaultConfig = Config{ Submit: []string{"traces"}, }, }, + NameResolver: &transform.NameResolverConfig{ + CacheLen: 1024, + CacheTTL: 5 * time.Minute, + }, Metrics: otel.MetricsConfig{ Protocol: otel.ProtocolUnset, MetricsProtocol: otel.ProtocolUnset, @@ -106,11 +110,12 @@ type Config struct { Attributes Attributes `yaml:"attributes"` // Routes is an optional node. If not set, data will be directly forwarded to exporters. 
- Routes *transform.RoutesConfig `yaml:"routes"` - Metrics otel.MetricsConfig `yaml:"otel_metrics_export"` - Traces otel.TracesConfig `yaml:"otel_traces_export"` - Prometheus prom.PrometheusConfig `yaml:"prometheus_export"` - Printer debug.PrintEnabled `yaml:"print_traces" env:"BEYLA_PRINT_TRACES"` + Routes *transform.RoutesConfig `yaml:"routes"` + NameResolver *transform.NameResolverConfig `yaml:"name_resolver"` + Metrics otel.MetricsConfig `yaml:"otel_metrics_export"` + Traces otel.TracesConfig `yaml:"otel_traces_export"` + Prometheus prom.PrometheusConfig `yaml:"prometheus_export"` + Printer debug.PrintEnabled `yaml:"print_traces" env:"BEYLA_PRINT_TRACES"` // Exec allows selecting the instrumented executable whose complete path contains the Exec value. Exec services.RegexpAttr `yaml:"executable_name" env:"BEYLA_EXECUTABLE_NAME"` diff --git a/pkg/beyla/config_test.go b/pkg/beyla/config_test.go index 4708636bd..2eba696b2 100644 --- a/pkg/beyla/config_test.go +++ b/pkg/beyla/config_test.go @@ -149,6 +149,10 @@ network: }, }, Routes: &transform.RoutesConfig{}, + NameResolver: &transform.NameResolverConfig{ + CacheLen: 1024, + CacheTTL: 5 * time.Minute, + }, }, cfg) } diff --git a/pkg/beyla/network_cfg.go b/pkg/beyla/network_cfg.go index c2e7c0c3d..ce1e936a6 100644 --- a/pkg/beyla/network_cfg.go +++ b/pkg/beyla/network_cfg.go @@ -156,7 +156,7 @@ var defaultNetworkConfig = NetworkConfig{ func (nc *NetworkConfig) Validate(isKubeEnabled bool) error { if len(nc.AllowedAttributes) == 0 { - return errors.New("you must define some attributes in the allowed_attributes section. Please ceck documentation") + return errors.New("you must define some attributes in the allowed_attributes section. Please check documentation") } if isKubeEnabled { return nil diff --git a/pkg/internal/export/debug/debug.go b/pkg/internal/export/debug/debug.go index 485f2659d..4fa7f2d99 100644 --- a/pkg/internal/export/debug/debug.go +++ b/pkg/internal/export/debug/debug.go @@ -38,8 +38,8 @@ func printFunc() (pipe.FinalFunc[[]request.Span], error) { spans[i].Status, spans[i].Method, spans[i].Path, - spans[i].Peer, - spans[i].Host, + spans[i].Peer+" as "+spans[i].PeerName, + spans[i].Host+" as "+spans[i].HostName, spans[i].HostPort, spans[i].ContentLength, &spans[i].ServiceID, diff --git a/pkg/internal/export/otel/common.go b/pkg/internal/export/otel/common.go index 39f72c970..9923f82bd 100644 --- a/pkg/internal/export/otel/common.go +++ b/pkg/internal/export/otel/common.go @@ -262,6 +262,11 @@ const ( SourceKey = attribute.Key("source") ServiceKey = attribute.Key("service") InstanceKey = attribute.Key("instance") + ClientKey = attribute.Key("client") + ClientNamespaceKey = attribute.Key("client_service_namespace") + ServerKey = attribute.Key("server") + ServerNamespaceKey = attribute.Key("server_service_namespace") + ConnectionTypeKey = attribute.Key("connection_type") ) func HTTPRequestMethod(val string) attribute.KeyValue { @@ -327,3 +332,23 @@ func StatusCodeMetric(val int) attribute.KeyValue { func ServiceInstanceMetric(val string) attribute.KeyValue { return InstanceKey.String(val) } + +func ClientMetric(val string) attribute.KeyValue { + return ClientKey.String(val) +} + +func ClientNamespaceMetric(val string) attribute.KeyValue { + return ClientNamespaceKey.String(val) +} + +func ServerMetric(val string) attribute.KeyValue { + return ServerKey.String(val) +} + +func ServerNamespaceMetric(val string) attribute.KeyValue { + return ServerNamespaceKey.String(val) +} + +func ConnectionTypeMetric(val string) 
attribute.KeyValue { + return ConnectionTypeKey.String(val) +} diff --git a/pkg/internal/export/otel/metrics.go b/pkg/internal/export/otel/metrics.go index fa7c90cb9..aa836f9d8 100644 --- a/pkg/internal/export/otel/metrics.go +++ b/pkg/internal/export/otel/metrics.go @@ -12,6 +12,7 @@ import ( "github.com/mariomac/pipes/pipe" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" instrument "go.opentelemetry.io/otel/metric" @@ -41,6 +42,10 @@ const ( SpanMetricsCalls = "traces_spanmetrics_calls_total" SpanMetricsSizes = "traces_spanmetrics_size_total" TracesTargetInfo = "traces_target_info" + ServiceGraphClient = "traces_service_graph_request_client" + ServiceGraphServer = "traces_service_graph_request_server" + ServiceGraphFailed = "traces_service_graph_request_failed_total" + ServiceGraphTotal = "traces_service_graph_request_total" UsualPortGRPC = "4317" UsualPortHTTP = "4318" @@ -51,6 +56,7 @@ const ( FeatureNetwork = "network" FeatureApplication = "application" FeatureSpan = "application_span" + FeatureGraph = "application_service_graph" ) type MetricsConfig struct { @@ -132,12 +138,16 @@ func (m MetricsConfig) SpanMetricsEnabled() bool { return slices.Contains(m.Features, FeatureSpan) } +func (m MetricsConfig) ServiceGraphMetricsEnabled() bool { + return slices.Contains(m.Features, FeatureGraph) +} + func (m MetricsConfig) OTelMetricsEnabled() bool { return slices.Contains(m.Features, FeatureApplication) } func (m MetricsConfig) Enabled() bool { - return m.EndpointEnabled() && (m.OTelMetricsEnabled() || m.SpanMetricsEnabled()) + return m.EndpointEnabled() && (m.OTelMetricsEnabled() || m.SpanMetricsEnabled() || m.ServiceGraphMetricsEnabled()) } // MetricsReporter implements the graph node that receives request.Span @@ -167,6 +177,10 @@ type Metrics struct { spanMetricsCallsTotal instrument.Int64Counter spanMetricsSizeTotal instrument.Float64Counter tracesTargetInfo instrument.Int64UpDownCounter + serviceGraphClient instrument.Float64Histogram + serviceGraphServer instrument.Float64Histogram + serviceGraphFailed instrument.Int64Counter + serviceGraphTotal instrument.Int64Counter } func ReportMetrics( @@ -247,6 +261,19 @@ func (mr *MetricsReporter) spanMetricOptions(mlog *slog.Logger) []metric.Option } } +func (mr *MetricsReporter) graphMetricOptions(mlog *slog.Logger) []metric.Option { + if !mr.cfg.ServiceGraphMetricsEnabled() { + return []metric.Option{} + } + + useExponentialHistograms := isExponentialAggregation(mr.cfg, mlog) + + return []metric.Option{ + metric.WithView(otelHistogramConfig(ServiceGraphClient, mr.cfg.Buckets.DurationHistogram, useExponentialHistograms)), + metric.WithView(otelHistogramConfig(ServiceGraphServer, mr.cfg.Buckets.DurationHistogram, useExponentialHistograms)), + } +} + func (mr *MetricsReporter) setupOtelMeters(m *Metrics, meter instrument.Meter) error { if !mr.cfg.OTelMetricsEnabled() { return nil @@ -315,6 +342,36 @@ func (mr *MetricsReporter) setupSpanMeters(m *Metrics, meter instrument.Meter) e return nil } +func (mr *MetricsReporter) setupGraphMeters(m *Metrics, meter instrument.Meter) error { + if !mr.cfg.ServiceGraphMetricsEnabled() { + return nil + } + + var err error + + m.serviceGraphClient, err = meter.Float64Histogram(ServiceGraphClient, instrument.WithUnit("s")) + if err != nil { + return fmt.Errorf("creating service graph client histogram: %w", err) + } + + m.serviceGraphServer, err = 
meter.Float64Histogram(ServiceGraphServer, instrument.WithUnit("s")) + if err != nil { + return fmt.Errorf("creating service graph server histogram: %w", err) + } + + m.serviceGraphFailed, err = meter.Int64Counter(ServiceGraphFailed) + if err != nil { + return fmt.Errorf("creating service graph failed total: %w", err) + } + + m.serviceGraphTotal, err = meter.Int64Counter(ServiceGraphTotal) + if err != nil { + return fmt.Errorf("creating service graph total: %w", err) + } + + return nil +} + func (mr *MetricsReporter) newMetricSet(service svc.ID) (*Metrics, error) { mlog := mlog().With("service", service) mlog.Debug("creating new Metrics reporter") @@ -328,6 +385,7 @@ func (mr *MetricsReporter) newMetricSet(service svc.ID) (*Metrics, error) { opts = append(opts, mr.otelMetricOptions(mlog)...) opts = append(opts, mr.spanMetricOptions(mlog)...) + opts = append(opts, mr.graphMetricOptions(mlog)...) m := Metrics{ ctx: mr.ctx, @@ -357,6 +415,15 @@ func (mr *MetricsReporter) newMetricSet(service svc.ID) (*Metrics, error) { m.tracesTargetInfo.Add(mr.ctx, 1, attrOpt) } + if mr.cfg.ServiceGraphMetricsEnabled() { + err = mr.setupGraphMeters(&m, meter) + if err != nil { + return nil, err + } + attrOpt := instrument.WithAttributeSet(mr.metricResourceAttributes(service)) + m.tracesTargetInfo.Add(mr.ctx, 1, attrOpt) + } + return &m, nil } @@ -573,6 +640,30 @@ func (mr *MetricsReporter) spanMetricAttributes(span *request.Span) attribute.Se return attribute.NewSet(attrs...) } +func (mr *MetricsReporter) serviceGraphAttributes(span *request.Span) attribute.Set { + var attrs []attribute.KeyValue + if span.IsClientSpan() { + attrs = []attribute.KeyValue{ + ClientMetric(span.PeerName), + ClientNamespaceMetric(span.ServiceID.Namespace), + ServerMetric(span.HostName), + ServerNamespaceMetric(span.OtherNamespace), + ConnectionTypeMetric("virtual_node"), + SourceMetric("beyla"), + } + } else { + attrs = []attribute.KeyValue{ + ClientMetric(span.PeerName), + ClientNamespaceMetric(span.OtherNamespace), + ServerMetric(span.HostName), + ServerNamespaceMetric(span.ServiceID.Namespace), + ConnectionTypeMetric("virtual_node"), + SourceMetric("beyla"), + } + } + return attribute.NewSet(attrs...) 
+} + func (r *Metrics) record(span *request.Span, mr *MetricsReporter) { t := span.Timings() duration := t.End.Sub(t.RequestStart).Seconds() @@ -602,6 +693,19 @@ func (r *Metrics) record(span *request.Span, mr *MetricsReporter) { r.spanMetricsCallsTotal.Add(r.ctx, 1, attrOpt) r.spanMetricsSizeTotal.Add(r.ctx, float64(span.ContentLength), attrOpt) } + + if mr.cfg.ServiceGraphMetricsEnabled() { + attrOpt := instrument.WithAttributeSet(mr.serviceGraphAttributes(span)) + if span.IsClientSpan() { + r.serviceGraphClient.Record(r.ctx, duration, attrOpt) + } else { + r.serviceGraphServer.Record(r.ctx, duration, attrOpt) + } + r.serviceGraphTotal.Add(r.ctx, 1, attrOpt) + if SpanStatusCode(span) == codes.Error { + r.serviceGraphFailed.Add(r.ctx, 1, attrOpt) + } + } } func (mr *MetricsReporter) reportMetrics(input <-chan []request.Span) { diff --git a/pkg/internal/export/otel/traces.go b/pkg/internal/export/otel/traces.go index fb411de54..46f344d07 100644 --- a/pkg/internal/export/otel/traces.go +++ b/pkg/internal/export/otel/traces.go @@ -296,6 +296,22 @@ func SpanKindString(span *request.Span) string { return "SPAN_KIND_INTERNAL" } +func spanHost(span *request.Span) string { + if span.HostName != "" { + return span.HostName + } + + return span.Host +} + +func spanPeer(span *request.Span) string { + if span.PeerName != "" { + return span.PeerName + } + + return span.Peer +} + func TraceAttributes(span *request.Span) []attribute.KeyValue { var attrs []attribute.KeyValue @@ -305,8 +321,8 @@ func TraceAttributes(span *request.Span) []attribute.KeyValue { HTTPRequestMethod(span.Method), HTTPResponseStatusCode(span.Status), HTTPUrlPath(span.Path), - ClientAddr(span.Peer), - ServerAddr(span.Host), + ClientAddr(spanPeer(span)), + ServerAddr(spanHost(span)), ServerPort(span.HostPort), HTTPRequestBodySize(int(span.ContentLength)), } @@ -318,8 +334,8 @@ func TraceAttributes(span *request.Span) []attribute.KeyValue { semconv.RPCMethod(span.Path), semconv.RPCSystemGRPC, semconv.RPCGRPCStatusCodeKey.Int(span.Status), - ClientAddr(span.Peer), - ServerAddr(span.Host), + ClientAddr(spanPeer(span)), + ServerAddr(spanHost(span)), ServerPort(span.HostPort), } case request.EventTypeHTTPClient: @@ -327,7 +343,7 @@ func TraceAttributes(span *request.Span) []attribute.KeyValue { HTTPRequestMethod(span.Method), HTTPResponseStatusCode(span.Status), HTTPUrlFull(span.Path), - ServerAddr(span.Host), + ServerAddr(spanHost(span)), ServerPort(span.HostPort), HTTPRequestBodySize(int(span.ContentLength)), } @@ -336,7 +352,7 @@ func TraceAttributes(span *request.Span) []attribute.KeyValue { semconv.RPCMethod(span.Path), semconv.RPCSystemGRPC, semconv.RPCGRPCStatusCodeKey.Int(span.Status), - ServerAddr(span.Host), + ServerAddr(spanHost(span)), ServerPort(span.HostPort), } case request.EventTypeSQLClient: diff --git a/pkg/internal/export/otel/traces_test.go b/pkg/internal/export/otel/traces_test.go index 7f8df4aa4..4e1a4f8ef 100644 --- a/pkg/internal/export/otel/traces_test.go +++ b/pkg/internal/export/otel/traces_test.go @@ -480,6 +480,31 @@ func TestTracesIdGenerator(t *testing.T) { }) } +func TestSpanHostPeer(t *testing.T) { + sp := request.Span{ + HostName: "localhost", + Host: "127.0.0.1", + PeerName: "peerhost", + Peer: "127.0.0.2", + } + + assert.Equal(t, "localhost", spanHost(&sp)) + assert.Equal(t, "peerhost", spanPeer(&sp)) + + sp = request.Span{ + Host: "127.0.0.1", + Peer: "127.0.0.2", + } + + assert.Equal(t, "127.0.0.1", spanHost(&sp)) + assert.Equal(t, "127.0.0.2", spanPeer(&sp)) + + sp = request.Span{} + + 
assert.Equal(t, "", spanHost(&sp)) + assert.Equal(t, "", spanPeer(&sp)) +} + type fakeInternalTraces struct { imetrics.NoopReporter sum atomic.Int32 diff --git a/pkg/internal/export/prom/prom.go b/pkg/internal/export/prom/prom.go index 6016a40bd..99eba4628 100644 --- a/pkg/internal/export/prom/prom.go +++ b/pkg/internal/export/prom/prom.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/golang-lru/v2/expirable" "github.com/mariomac/pipes/pipe" "github.com/prometheus/client_golang/prometheus" + "go.opentelemetry.io/otel/codes" "github.com/grafana/beyla/pkg/buildinfo" "github.com/grafana/beyla/pkg/internal/connector" @@ -36,6 +37,11 @@ const ( SpanMetricsSizes = "traces_spanmetrics_size_total" TracesTargetInfo = "traces_target_info" + ServiceGraphClient = "traces_service_graph_request_client_seconds" + ServiceGraphServer = "traces_service_graph_request_server_seconds" + ServiceGraphFailed = "traces_service_graph_request_failed_total" + ServiceGraphTotal = "traces_service_graph_request_total" + // target will expose the process hostname-pid (or K8s Pod). // It is advised for users that to use relabeling rules to // override the "instance" attribute with "target" in the @@ -76,6 +82,12 @@ const ( telemetryLanguageKey = "telemetry_sdk_language" telemetrySDKKey = "telemetry_sdk_name" + clientKey = "client" + clientNamespaceKey = "client_service_namespace" + serverKey = "server" + serverNamespaceKey = "server_service_namespace" + connectionTypeKey = "connection_type" + // default values for the histogram configuration // from https://grafana.com/docs/mimir/latest/send/native-histograms/#migrate-from-classic-histograms defaultHistogramBucketFactor = 1.1 @@ -125,9 +137,13 @@ func (p PrometheusConfig) OTelMetricsEnabled() bool { return slices.Contains(p.Features, otel.FeatureApplication) } +func (p PrometheusConfig) ServiceGraphMetricsEnabled() bool { + return slices.Contains(p.Features, otel.FeatureGraph) +} + // nolint:gocritic func (p PrometheusConfig) Enabled() bool { - return (p.Port != 0 || p.Registry != nil) && (p.OTelMetricsEnabled() || p.SpanMetricsEnabled()) + return (p.Port != 0 || p.Registry != nil) && (p.OTelMetricsEnabled() || p.SpanMetricsEnabled() || p.ServiceGraphMetricsEnabled()) } type metricsReporter struct { @@ -148,6 +164,12 @@ type metricsReporter struct { spanMetricsSizeTotal *prometheus.CounterVec tracesTargetInfo *prometheus.GaugeVec + // trace service graph + serviceGraphClient *prometheus.HistogramVec + serviceGraphServer *prometheus.HistogramVec + serviceGraphFailed *prometheus.CounterVec + serviceGraphTotal *prometheus.CounterVec + promConnect *connector.PrometheusManager bgCtx context.Context @@ -266,6 +288,30 @@ func newReporter(ctx context.Context, cfg *PrometheusConfig, ctxInfo *global.Con Name: TracesTargetInfo, Help: "target service information in trace span metric format", }, labelNamesTargetInfo(ctxInfo)), + serviceGraphClient: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: ServiceGraphClient, + Help: "duration of client service calls, in seconds, in trace service graph metrics format", + Buckets: cfg.Buckets.DurationHistogram, + NativeHistogramBucketFactor: defaultHistogramBucketFactor, + NativeHistogramMaxBucketNumber: defaultHistogramMaxBucketNumber, + NativeHistogramMinResetDuration: defaultHistogramMinResetDuration, + }, labelNamesServiceGraph()), + serviceGraphServer: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: ServiceGraphServer, + Help: "duration of server service calls, in seconds, in trace service graph metrics format", 
+ Buckets: cfg.Buckets.DurationHistogram, + NativeHistogramBucketFactor: defaultHistogramBucketFactor, + NativeHistogramMaxBucketNumber: defaultHistogramMaxBucketNumber, + NativeHistogramMinResetDuration: defaultHistogramMinResetDuration, + }, labelNamesServiceGraph()), + serviceGraphFailed: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: ServiceGraphFailed, + Help: "number of failed service calls in trace service graph metrics format", + }, labelNamesServiceGraph()), + serviceGraphTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: ServiceGraphTotal, + Help: "number of service calls in trace service graph metrics format", + }, labelNamesServiceGraph()), } if cfg.SpanMetricsEnabled() { @@ -300,6 +346,15 @@ func newReporter(ctx context.Context, cfg *PrometheusConfig, ctxInfo *global.Con ) } + if cfg.ServiceGraphMetricsEnabled() { + registeredMetrics = append(registeredMetrics, + mr.serviceGraphClient, + mr.serviceGraphServer, + mr.serviceGraphFailed, + mr.serviceGraphTotal, + ) + } + if mr.cfg.Registry != nil { mr.cfg.Registry.MustRegister(registeredMetrics...) } else { @@ -357,6 +412,19 @@ func (r *metricsReporter) observe(span *request.Span) { r.tracesTargetInfo.WithLabelValues(lv...).Add(1) } } + + if r.cfg.ServiceGraphMetricsEnabled() { + lvg := r.labelValuesServiceGraph(span) + if span.IsClientSpan() { + r.serviceGraphClient.WithLabelValues(lvg...).Observe(duration) + } else { + r.serviceGraphServer.WithLabelValues(lvg...).Observe(duration) + } + r.serviceGraphTotal.WithLabelValues(lvg...).Add(1) + if otel.SpanStatusCode(span) == codes.Error { + r.serviceGraphFailed.WithLabelValues(lvg...).Add(1) + } + } } // labelNamesSQL must return the label names in the same order as would be returned @@ -519,6 +587,7 @@ func appendK8sLabelValuesService(values []string, service svc.ID) []string { ) return values } + func labelNamesSpans() []string { return []string{serviceKey, serviceNamespaceKey, spanNameKey, statusCodeKey, spanKindKey, serviceInstanceKey, serviceJobKey, sourceKey} } @@ -571,3 +640,28 @@ func (r *metricsReporter) labelValuesTargetInfo(service svc.ID) []string { return values } + +func labelNamesServiceGraph() []string { + return []string{clientKey, clientNamespaceKey, serverKey, serverNamespaceKey, connectionTypeKey, sourceKey} +} + +func (r *metricsReporter) labelValuesServiceGraph(span *request.Span) []string { + if span.IsClientSpan() { + return []string{ + span.PeerName, + span.ServiceID.Namespace, + span.HostName, + span.OtherNamespace, + "virtual_node", + "beyla", + } + } + return []string{ + span.PeerName, + span.OtherNamespace, + span.HostName, + span.ServiceID.Namespace, + "virtual_node", + "beyla", + } +} diff --git a/pkg/internal/kube/informer.go b/pkg/internal/kube/informer.go index 8d05764f7..551c04e95 100644 --- a/pkg/internal/kube/informer.go +++ b/pkg/internal/kube/informer.go @@ -60,6 +60,7 @@ type PodInfo struct { // StartTimeStr caches value of ObjectMeta.StartTimestamp.String() StartTimeStr string ContainerIDs []string + IPs []string } type ReplicaSetInfo struct { @@ -138,6 +139,14 @@ func (k *Metadata) initPodInformer(informerFactory informers.SharedInformerFacto rmContainerIDSchema(pod.Status.EphemeralContainerStatuses[i].ContainerID)) } + ips := make([]string, 0, len(pod.Status.PodIPs)) + for _, ip := range pod.Status.PodIPs { + // ignoring host-networked Pod IPs + if ip.IP != pod.Status.HostIP { + ips = append(ips, ip.IP) + } + } + owner := OwnerFromPodInfo(pod) startTime := pod.GetCreationTimestamp().String() if 
log.Enabled(context.TODO(), slog.LevelDebug) { @@ -157,6 +166,7 @@ func (k *Metadata) initPodInformer(informerFactory informers.SharedInformerFacto NodeName: pod.Spec.NodeName, StartTimeStr: startTime, ContainerIDs: containerIDs, + IPs: ips, }, nil }); err != nil { return fmt.Errorf("can't set pods transform: %w", err) @@ -350,3 +360,16 @@ func (k *Metadata) AddReplicaSetEventHandler(h cache.ResourceEventHandler) error }() return err } + +func (i *PodInfo) ServiceName() string { + if i.Owner != nil { + // we have two levels of ownership at most + if i.Owner.Owner != nil { + return i.Owner.Owner.Name + } + + return i.Owner.Name + } + + return i.Name +} diff --git a/pkg/internal/kube/informer_test.go b/pkg/internal/kube/informer_test.go new file mode 100644 index 000000000..7f675c4b9 --- /dev/null +++ b/pkg/internal/kube/informer_test.go @@ -0,0 +1,51 @@ +package kube + +import ( + "testing" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestServiceName(t *testing.T) { + pod := PodInfo{ + Owner: &Owner{ + Name: "nested_one", + }, + } + + pod2 := PodInfo{ + Owner: &Owner{ + Owner: &Owner{ + Name: "nested_two", + }, + }, + } + + pod3 := PodInfo{ + Owner: &Owner{ + Owner: &Owner{ + Owner: &Owner{ + Name: "nested_three", + }, + }, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "too_nested", + }, + } + + pod4 := PodInfo{ + ObjectMeta: metav1.ObjectMeta{ + Name: "not_nested", + }, + } + + pod5 := PodInfo{} + + assert.Equal(t, "nested_one", pod.ServiceName()) + assert.Equal(t, "nested_two", pod2.ServiceName()) + assert.Equal(t, "", pod3.ServiceName()) + assert.Equal(t, "not_nested", pod4.ServiceName()) + assert.Equal(t, "", pod5.ServiceName()) +} diff --git a/pkg/internal/pipe/instrumenter.go b/pkg/internal/pipe/instrumenter.go index 799f99ca5..6ca58eae5 100644 --- a/pkg/internal/pipe/instrumenter.go +++ b/pkg/internal/pipe/instrumenter.go @@ -22,12 +22,14 @@ import ( type nodesMap struct { TracesReader pipe.Start[[]request.Span] - // Routes is an optional pipe. If not set, data will be bypassed to the next stage in the pipeline. + // Routes is an optional pipe. If not enabled, data will be bypassed to the next stage in the pipeline. Routes pipe.Middle[[]request.Span, []request.Span] - // Kubernetes is an optional pipe. If not set, data will be bypassed to the exporters. + // Kubernetes is an optional pipe. If not enabled, data will be bypassed to the exporters. Kubernetes pipe.Middle[[]request.Span, []request.Span] + NameResolver pipe.Middle[[]request.Span, []request.Span] + AlloyTraces pipe.Final[[]request.Span] Metrics pipe.Final[[]request.Span] Traces pipe.Final[[]request.Span] @@ -42,19 +44,21 @@ type nodesMap struct { func (n *nodesMap) Connect() { n.TracesReader.SendTo(n.Routes) n.Routes.SendTo(n.Kubernetes) - n.Kubernetes.SendTo(n.AlloyTraces, n.Metrics, n.Traces, n.Prometheus, n.Printer, n.Noop) + n.Kubernetes.SendTo(n.NameResolver) + n.NameResolver.SendTo(n.AlloyTraces, n.Metrics, n.Traces, n.Prometheus, n.Printer, n.Noop) } // accessor functions to each field. 
Grouped here for code brevity during the pipeline build -func tracesReader(n *nodesMap) *pipe.Start[[]request.Span] { return &n.TracesReader } -func router(n *nodesMap) *pipe.Middle[[]request.Span, []request.Span] { return &n.Routes } -func kubernetes(n *nodesMap) *pipe.Middle[[]request.Span, []request.Span] { return &n.Kubernetes } -func alloyTraces(n *nodesMap) *pipe.Final[[]request.Span] { return &n.AlloyTraces } -func otelMetrics(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Metrics } -func otelTraces(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Traces } -func printer(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Printer } -func prometheus(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Prometheus } -func noop(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Noop } +func tracesReader(n *nodesMap) *pipe.Start[[]request.Span] { return &n.TracesReader } +func router(n *nodesMap) *pipe.Middle[[]request.Span, []request.Span] { return &n.Routes } +func kubernetes(n *nodesMap) *pipe.Middle[[]request.Span, []request.Span] { return &n.Kubernetes } +func nameResolver(n *nodesMap) *pipe.Middle[[]request.Span, []request.Span] { return &n.NameResolver } +func alloyTraces(n *nodesMap) *pipe.Final[[]request.Span] { return &n.AlloyTraces } +func otelMetrics(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Metrics } +func otelTraces(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Traces } +func printer(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Printer } +func prometheus(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Prometheus } +func noop(n *nodesMap) *pipe.Final[[]request.Span] { return &n.Noop } // builder with injectable instantiators for unit testing type graphFunctions struct { @@ -96,7 +100,7 @@ func newGraphBuilder(ctx context.Context, config *beyla.Config, ctxInfo *global. pipe.AddMiddleProvider(gnb, router, transform.RoutesProvider(config.Routes)) pipe.AddMiddleProvider(gnb, kubernetes, transform.KubeDecoratorProvider(ctxInfo, &config.Attributes.Kubernetes)) - + pipe.AddMiddleProvider(gnb, nameResolver, transform.NameResolutionProvider(gb.ctxInfo, config.NameResolver)) config.Metrics.Grafana = &gb.config.Grafana.OTLP pipe.AddFinalProvider(gnb, otelMetrics, otel.ReportMetrics(ctx, &config.Metrics, gb.ctxInfo)) config.Traces.Grafana = &gb.config.Grafana.OTLP diff --git a/pkg/internal/request/span.go b/pkg/internal/request/span.go index 4d3bdf59d..ccf13dfff 100644 --- a/pkg/internal/request/span.go +++ b/pkg/internal/request/span.go @@ -50,26 +50,29 @@ type PidInfo struct { // Span contains the information being submitted by the following nodes in the graph. // It enables comfortable handling of data from Go. 
type Span struct { - Type EventType - IgnoreSpan IgnoreMode - ID uint64 - Method string - Path string - Route string - Peer string - Host string - HostPort int - Status int - ContentLength int64 - RequestStart int64 - Start int64 - End int64 - ServiceID svc.ID // TODO: rename to Service or ResourceAttrs - TraceID trace2.TraceID - SpanID trace2.SpanID - ParentSpanID trace2.SpanID - Flags uint8 - Pid PidInfo + Type EventType + IgnoreSpan IgnoreMode + ID uint64 + Method string + Path string + Route string + Peer string + Host string + HostPort int + Status int + ContentLength int64 + RequestStart int64 + Start int64 + End int64 + ServiceID svc.ID // TODO: rename to Service or ResourceAttrs + TraceID trace2.TraceID + SpanID trace2.SpanID + ParentSpanID trace2.SpanID + Flags uint8 + Pid PidInfo + PeerName string + HostName string + OtherNamespace string } func (s *Span) Inside(parent *Span) bool { @@ -108,3 +111,16 @@ func (s *Span) IsValid() bool { return true } + +func (s *Span) IsClientSpan() bool { + switch s.Type { + case EventTypeGRPCClient: + fallthrough + case EventTypeHTTPClient: + fallthrough + case EventTypeSQLClient: + return true + } + + return false +} diff --git a/pkg/internal/request/span_test.go b/pkg/internal/request/span_test.go new file mode 100644 index 000000000..94e842441 --- /dev/null +++ b/pkg/internal/request/span_test.go @@ -0,0 +1,23 @@ +package request + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSpanClientServer(t *testing.T) { + for _, st := range []EventType{EventTypeHTTP, EventTypeGRPC} { + span := &Span{ + Type: st, + } + assert.False(t, span.IsClientSpan()) + } + + for _, st := range []EventType{EventTypeHTTPClient, EventTypeGRPCClient, EventTypeSQLClient} { + span := &Span{ + Type: st, + } + assert.True(t, span.IsClientSpan()) + } +} diff --git a/pkg/internal/transform/kube/db.go b/pkg/internal/transform/kube/db.go index 5a845e9d1..e98c542e8 100644 --- a/pkg/internal/transform/kube/db.go +++ b/pkg/internal/transform/kube/db.go @@ -1,8 +1,11 @@ package kube import ( + "fmt" "log/slog" + "k8s.io/client-go/tools/cache" + "github.com/grafana/beyla/pkg/internal/helpers/container" "github.com/grafana/beyla/pkg/internal/kube" ) @@ -25,17 +28,40 @@ type Database struct { // key: pid namespace fetchedPodsCache map[uint32]*kube.PodInfo + + // ip to pod name matcher + podsByIP map[string]*kube.PodInfo } -func StartDatabase(kubeMetadata *kube.Metadata) (*Database, error) { - db := Database{ +func CreateDatabase(kubeMetadata *kube.Metadata) Database { + return Database{ fetchedPodsCache: map[uint32]*kube.PodInfo{}, containerIDs: map[string]*container.Info{}, namespaces: map[uint32]*container.Info{}, + podsByIP: map[string]*kube.PodInfo{}, informer: kubeMetadata, } +} + +func StartDatabase(kubeMetadata *kube.Metadata) (*Database, error) { + db := CreateDatabase(kubeMetadata) db.informer.AddContainerEventHandler(&db) + if err := db.informer.AddPodEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + db.UpdateNewPodsByIPIndex(obj.(*kube.PodInfo)) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + db.UpdateDeletedPodsByIPIndex(oldObj.(*kube.PodInfo)) + db.UpdateNewPodsByIPIndex(newObj.(*kube.PodInfo)) + }, + DeleteFunc: func(obj interface{}) { + db.UpdateDeletedPodsByIPIndex(obj.(*kube.PodInfo)) + }, + }); err != nil { + return nil, fmt.Errorf("can't register Database as Pod event handler: %w", err) + } + return &db, nil } @@ -80,3 +106,23 @@ func (id *Database) OwnerPodInfo(pidNamespace uint32) 
(*kube.PodInfo, bool) { id.informer.FetchPodOwnerInfo(pod) return pod, true } + +func (id *Database) UpdateNewPodsByIPIndex(pod *kube.PodInfo) { + if len(pod.IPs) > 0 { + for _, ip := range pod.IPs { + id.podsByIP[ip] = pod + } + } +} + +func (id *Database) UpdateDeletedPodsByIPIndex(pod *kube.PodInfo) { + if len(pod.IPs) > 0 { + for _, ip := range pod.IPs { + delete(id.podsByIP, ip) + } + } +} + +func (id *Database) PodInfoForIP(ip string) *kube.PodInfo { + return id.podsByIP[ip] +} diff --git a/pkg/transform/k8s.go b/pkg/transform/k8s.go index 5e542b0c0..523c31a4e 100644 --- a/pkg/transform/k8s.go +++ b/pkg/transform/k8s.go @@ -113,16 +113,7 @@ func appendMetadata(span *request.Span, info *kube.PodInfo) { // service name and namespace, we will automatically set it from // the kubernetes metadata if span.ServiceID.AutoName { - if info.Owner != nil { - // we have two levels of ownership at most - if info.Owner.Owner != nil { - span.ServiceID.Name = info.Owner.Owner.Name - } else { - span.ServiceID.Name = info.Owner.Name - } - } else { - span.ServiceID.Name = info.Name - } + span.ServiceID.Name = info.ServiceName() } if span.ServiceID.Namespace == "" { span.ServiceID.Namespace = info.Namespace diff --git a/pkg/transform/name_resolver.go b/pkg/transform/name_resolver.go new file mode 100644 index 000000000..b10ed025f --- /dev/null +++ b/pkg/transform/name_resolver.go @@ -0,0 +1,190 @@ +package transform + +import ( + "context" + "net" + "strings" + "time" + + "github.com/hashicorp/golang-lru/v2/expirable" + "github.com/mariomac/pipes/pipe" + + "github.com/grafana/beyla/pkg/internal/kube" + "github.com/grafana/beyla/pkg/internal/pipe/global" + "github.com/grafana/beyla/pkg/internal/request" + "github.com/grafana/beyla/pkg/internal/svc" + kube2 "github.com/grafana/beyla/pkg/internal/transform/kube" +) + +type NameResolverConfig struct { + // CacheLen specifies the max size of the LRU cache that is checked before + // performing the name lookup. Default: 256 + CacheLen int `yaml:"cache_len" env:"BEYLA_NAME_RESOLVER_CACHE_LEN"` + // CacheTTL specifies the time-to-live of a cached IP->hostname entry. After the + // cached entry becomes older than this time, the IP->hostname entry will be looked + // up again. 
+ CacheTTL time.Duration `yaml:"cache_expiry" env:"BEYLA_NAME_RESOLVER_CACHE_TTL"` +} + +type NameResolver struct { + cache *expirable.LRU[string, string] + sCache *expirable.LRU[string, svc.ID] + cfg *NameResolverConfig + db *kube2.Database +} + +func NameResolutionProvider(ctxInfo *global.ContextInfo, cfg *NameResolverConfig) pipe.MiddleProvider[[]request.Span, []request.Span] { + return func() (pipe.MiddleFunc[[]request.Span, []request.Span], error) { + if cfg == nil { + return pipe.Bypass[[]request.Span](), nil + } + return nameResolver(ctxInfo, cfg) + } +} + +func nameResolver(ctxInfo *global.ContextInfo, cfg *NameResolverConfig) (pipe.MiddleFunc[[]request.Span, []request.Span], error) { + nr := NameResolver{ + cfg: cfg, + db: ctxInfo.AppO11y.K8sDatabase, + cache: expirable.NewLRU[string, string](cfg.CacheLen, nil, cfg.CacheTTL), + sCache: expirable.NewLRU[string, svc.ID](cfg.CacheLen, nil, cfg.CacheTTL), + } + + return func(in <-chan []request.Span, out chan<- []request.Span) { + for spans := range in { + for i := range spans { + s := &spans[i] + nr.resolveNames(s) + } + out <- spans + } + }, nil +} + +func trimSuffixIgnoreCase(s, suffix string) string { + if len(s) >= len(suffix) && strings.EqualFold(s[len(s)-len(suffix):], suffix) { + return s[:len(s)-len(suffix)] + } + return s +} + +func trimPrefixIgnoreCase(s, prefix string) string { + if len(s) >= len(prefix) && strings.EqualFold(s[0:len(prefix)], prefix) { + return s[len(prefix):] + } + return s +} + +func (nr *NameResolver) resolveNames(span *request.Span) { + if span.IsClientSpan() { + span.HostName, span.OtherNamespace = nr.resolve(&span.ServiceID, span.Host) + span.PeerName = span.ServiceID.Name + if len(span.Peer) > 0 { + nr.sCache.Add(span.Peer, span.ServiceID) + } + } else { + span.PeerName, span.OtherNamespace = nr.resolve(&span.ServiceID, span.Peer) + span.HostName = span.ServiceID.Name + if len(span.Host) > 0 { + nr.sCache.Add(span.Host, span.ServiceID) + } + } +} + +func (nr *NameResolver) resolve(svc *svc.ID, ip string) (string, string) { + var name, ns string + + if len(ip) > 0 { + peerSvc, ok := nr.sCache.Get(ip) + if ok { + name = peerSvc.Name + ns = peerSvc.Namespace + } else { + var peer string + peer, ns = nr.dnsResolve(svc, ip) + if len(peer) > 0 { + name = peer + } else { + name = ip + } + } + } + + return name, ns +} + +func (nr *NameResolver) cleanName(svc *svc.ID, ip, n string) string { + n = strings.TrimSuffix(n, ".") + n = trimSuffixIgnoreCase(n, ".svc.cluster.local") + n = trimSuffixIgnoreCase(n, "."+svc.Namespace) + + kubeNamespace, ok := svc.Metadata[kube.NamespaceName] + if ok && kubeNamespace != "" && kubeNamespace != svc.Namespace { + n = trimSuffixIgnoreCase(n, "."+kubeNamespace) + } + + dashIP := strings.ReplaceAll(ip, ".", "-") + "." 
+ n = trimPrefixIgnoreCase(n, dashIP) + + return n +} + +func (nr *NameResolver) dnsResolve(svc *svc.ID, ip string) (string, string) { + if ip == "" { + return "", "" + } + + if nr.db != nil { + ipAddr := net.ParseIP(ip) + + if ipAddr != nil && !ipAddr.IsLoopback() { + n, ns := nr.resolveFromK8s(ip) + + if n != "" { + return n, ns + } + } + } + + n := nr.resolveIP(ip) + if n == ip { + return n, svc.Namespace + } + + n = nr.cleanName(svc, ip, n) + + // fmt.Printf("%s -> %s\n", ip, n) + + return n, svc.Namespace +} + +func (nr *NameResolver) resolveFromK8s(ip string) (string, string) { + info := nr.db.PodInfoForIP(ip) + if info == nil { + return "", "" + } + + return info.ServiceName(), info.Namespace +} + +func (nr *NameResolver) resolveIP(ip string) string { + if host, ok := nr.cache.Get(ip); ok { + return host + } + + var r *net.Resolver + addr, err := r.LookupAddr(context.Background(), ip) + + if err != nil { + nr.cache.Add(ip, ip) + return ip + } + + for _, a := range addr { + nr.cache.Add(ip, a) + return a + } + + nr.cache.Add(ip, ip) + return ip +} diff --git a/pkg/transform/name_resolver_test.go b/pkg/transform/name_resolver_test.go new file mode 100644 index 000000000..921fc0dcd --- /dev/null +++ b/pkg/transform/name_resolver_test.go @@ -0,0 +1,133 @@ +package transform + +import ( + "testing" + "time" + + "github.com/hashicorp/golang-lru/v2/expirable" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + kube2 "github.com/grafana/beyla/pkg/internal/kube" + "github.com/grafana/beyla/pkg/internal/request" + "github.com/grafana/beyla/pkg/internal/svc" + "github.com/grafana/beyla/pkg/internal/transform/kube" +) + +func TestSuffixPrefix(t *testing.T) { + assert.Equal(t, "super", trimSuffixIgnoreCase("superDuper", "DUPER")) + assert.Equal(t, "superDup", trimSuffixIgnoreCase("superDuper", "ER")) + assert.Equal(t, "superDuper", trimSuffixIgnoreCase("superDuper", "Not matching")) + assert.Equal(t, "superDuper", trimSuffixIgnoreCase("superDuper", "SuperDuperDuper")) + assert.Equal(t, "", trimSuffixIgnoreCase("superDuper", "SuperDuper")) + assert.Equal(t, "superDuper", trimSuffixIgnoreCase("superDuper", "")) + + assert.Equal(t, "super", trimPrefixIgnoreCase("Dupersuper", "DUPER")) + assert.Equal(t, "super", trimPrefixIgnoreCase("Ersuper", "ER")) + assert.Equal(t, "superDuper", trimPrefixIgnoreCase("superDuper", "Not matching")) + assert.Equal(t, "superDuper", trimPrefixIgnoreCase("superDuper", "SuperDuperDuper")) + assert.Equal(t, "", trimPrefixIgnoreCase("superDuper", "SuperDuper")) + assert.Equal(t, "superDuper", trimPrefixIgnoreCase("superDuper", "")) +} + +func TestResolveFromK8s(t *testing.T) { + db := kube.CreateDatabase(nil) + + pod1 := kube2.PodInfo{ + ObjectMeta: metav1.ObjectMeta{Name: "pod1"}, + IPs: []string{"10.0.0.1", "10.1.0.1"}, + } + + pod2 := kube2.PodInfo{ + ObjectMeta: metav1.ObjectMeta{Name: "pod2", Namespace: "something"}, + IPs: []string{"10.0.0.2", "10.1.0.2"}, + } + + pod3 := kube2.PodInfo{ + ObjectMeta: metav1.ObjectMeta{Name: "pod3"}, + IPs: []string{"10.0.0.3", "10.1.0.3"}, + } + + db.UpdateNewPodsByIPIndex(&pod1) + db.UpdateNewPodsByIPIndex(&pod2) + db.UpdateNewPodsByIPIndex(&pod3) + + assert.Equal(t, &pod1, db.PodInfoForIP("10.0.0.1")) + assert.Equal(t, &pod1, db.PodInfoForIP("10.1.0.1")) + assert.Equal(t, &pod2, db.PodInfoForIP("10.0.0.2")) + assert.Equal(t, &pod2, db.PodInfoForIP("10.1.0.2")) + assert.Equal(t, &pod3, db.PodInfoForIP("10.1.0.3")) + db.UpdateDeletedPodsByIPIndex(&pod3) + assert.Nil(t, 
db.PodInfoForIP("10.1.0.3")) + + nr := NameResolver{ + db: &db, + cache: expirable.NewLRU[string, string](10, nil, 5*time.Hour), + sCache: expirable.NewLRU[string, svc.ID](10, nil, 5*time.Hour), + } + + name, namespace := nr.resolveFromK8s("10.0.0.1") + assert.Equal(t, "pod1", name) + assert.Equal(t, "", namespace) + + name, namespace = nr.resolveFromK8s("10.0.0.2") + assert.Equal(t, "pod2", name) + assert.Equal(t, "something", namespace) + + name, namespace = nr.resolveFromK8s("10.0.0.3") + assert.Equal(t, "", name) + assert.Equal(t, "", namespace) + + clientSpan := request.Span{ + Type: request.EventTypeHTTPClient, + Peer: "10.0.0.1", + Host: "10.0.0.2", + ServiceID: svc.ID{ + Name: "pod1", + Namespace: "", + }, + } + + serverSpan := request.Span{ + Type: request.EventTypeHTTP, + Peer: "10.0.0.1", + Host: "10.0.0.2", + ServiceID: svc.ID{ + Name: "pod2", + Namespace: "something", + }, + } + + nr.resolveNames(&clientSpan) + + assert.Equal(t, "pod1", clientSpan.PeerName) + assert.Equal(t, "", clientSpan.ServiceID.Namespace) + assert.Equal(t, "pod2", clientSpan.HostName) + assert.Equal(t, "something", clientSpan.OtherNamespace) + + nr.resolveNames(&serverSpan) + + assert.Equal(t, "pod1", serverSpan.PeerName) + assert.Equal(t, "", serverSpan.OtherNamespace) + assert.Equal(t, "pod2", serverSpan.HostName) + assert.Equal(t, "something", serverSpan.ServiceID.Namespace) +} + +func TestCleanName(t *testing.T) { + s := svc.ID{ + Name: "service", + Namespace: "special.namespace", + Metadata: map[string]string{ + kube2.NamespaceName: "k8snamespace", + }, + } + + nr := NameResolver{} + + assert.Equal(t, "service", nr.cleanName(&s, "127.0.0.1", "127-0-0-1.service")) + assert.Equal(t, "1.service", nr.cleanName(&s, "127.0.0.1", "1.service")) + assert.Equal(t, "service", nr.cleanName(&s, "127.0.0.1", "service.")) + assert.Equal(t, "service", nr.cleanName(&s, "127.0.0.1", "service.svc.cluster.local.")) + assert.Equal(t, "service", nr.cleanName(&s, "127.0.0.1", "service.special.namespace.svc.cluster.local.")) + assert.Equal(t, "service", nr.cleanName(&s, "127.0.0.1", "service.k8snamespace.svc.cluster.local.")) +} diff --git a/test/integration/k8s/common/k8s_metrics_testfuncs.go b/test/integration/k8s/common/k8s_metrics_testfuncs.go index 77ffcc31a..3aef3b63e 100644 --- a/test/integration/k8s/common/k8s_metrics_testfuncs.go +++ b/test/integration/k8s/common/k8s_metrics_testfuncs.go @@ -55,6 +55,12 @@ var ( "rpc_client_duration_seconds_sum", "rpc_client_duration_seconds_bucket", } + spanGraphMetrics = []string{ + "traces_service_graph_request_server_seconds_count", + "traces_service_graph_request_server_seconds_bucket", + "traces_service_graph_request_server_seconds_sum", + "traces_service_graph_request_total", + } ) func DoWaitForComponentsAvailable(t *testing.T) { @@ -106,6 +112,11 @@ func FeatureHTTPMetricsDecoration() features.Feature { "k8s_pod_start_time": TimeRegex, "k8s_deployment_name": "^testserver$", "k8s_replicaset_name": "^testserver-", + })). 
+ Assess("all the span graph metrics exist", + testMetricsDecoration(spanGraphMetrics, `{connection_type="virtual_node",server="testserver"}`, map[string]string{ + "server_service_namespace": "integration-test", + "source": "beyla", }), ).Feature() } diff --git a/test/integration/k8s/manifests/05-instrumented-service-otel.yml b/test/integration/k8s/manifests/05-instrumented-service-otel.yml index a8ee89a95..a54c09d85 100644 --- a/test/integration/k8s/manifests/05-instrumented-service-otel.yml +++ b/test/integration/k8s/manifests/05-instrumented-service-otel.yml @@ -114,3 +114,6 @@ spec: value: "8999" - name: BEYLA_KUBE_METADATA_ENABLE value: "autodetect" + - name: BEYLA_OTEL_METRIC_FEATURES + value: "application,application_span,application_service_graph" + diff --git a/test/integration/k8s/manifests/05-instrumented-service-prometheus.yml b/test/integration/k8s/manifests/05-instrumented-service-prometheus.yml index 72eeba708..135e76cbd 100644 --- a/test/integration/k8s/manifests/05-instrumented-service-prometheus.yml +++ b/test/integration/k8s/manifests/05-instrumented-service-prometheus.yml @@ -124,6 +124,8 @@ spec: value: "true" - name: BEYLA_KUBE_METADATA_ENABLE value: "autodetect" + - name: BEYLA_PROMETHEUS_FEATURES + value: "application,application_span,application_service_graph" ports: - containerPort: 8999 hostPort: 8999
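
For reference, a minimal configuration sketch that exercises the pieces added in this patch: the `application_service_graph` feature on both metrics exporters and the new `name_resolver` stage. This is an illustrative sketch, not part of the change itself. The `features` key name and the endpoint/port values are assumptions (the diff only shows the feature names and the `BEYLA_OTEL_METRIC_FEATURES`/`BEYLA_PROMETHEUS_FEATURES` environment variables used in the test manifests); the `name_resolver` keys and values come from the `NameResolverConfig` YAML tags and the defaults set in `DefaultConfig`.

```yaml
# Illustrative sketch only, not part of the patch.
# Assumes the exporter feature list is configured under a `features` key,
# mirroring the BEYLA_OTEL_METRIC_FEATURES / BEYLA_PROMETHEUS_FEATURES
# environment variables used in the integration-test manifests above.
otel_metrics_export:
  endpoint: http://localhost:4318   # hypothetical OTLP endpoint
  features:
    - application
    - application_span
    - application_service_graph
prometheus_export:
  port: 8999                        # illustrative; matches the test manifests
  features:
    - application
    - application_span
    - application_service_graph
# New name resolution stage (keys from the NameResolverConfig yaml tags);
# the values shown are the defaults from DefaultConfig.
name_resolver:
  cache_len: 1024
  cache_expiry: 5m
```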