Skip to content

Commit

Permalink
Send K8s metadata as resource metadata (#548)
Browse files Browse the repository at this point in the history
* Send K8s metadata as resource metadata

* fixing integration tests
  • Loading branch information
mariomac authored Jan 15, 2024
1 parent 99088fd commit 58b6cfb
Show file tree
Hide file tree
Showing 14 changed files with 111 additions and 89 deletions.
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ linters-settings:
go: "1.21"
gocritic:
enabled-checks:
- hugeParam
- rangeExprCopy
- rangeValCopy
- indexAlloc
Expand Down
14 changes: 9 additions & 5 deletions pkg/internal/export/otel/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,17 @@ func otelResource(service svc.ID) *resource.Resource {
attrs = append(attrs, semconv.ServiceNamespace(service.Namespace))
}

for k, v := range service.Metadata {
attrs = append(attrs, attribute.String(k, v))
}

return resource.NewWithAttributes(semconv.SchemaURL, attrs...)
}

// ReporterPool keeps an LRU cache of different OTEL reporters given a service name.
// TODO: evict reporters after a time without being accessed
type ReporterPool[T any] struct {
pool *simplelru.LRU[svc.ID, T]
pool *simplelru.LRU[svc.UID, T]

itemConstructor func(svc.ID) (T, error)
}
Expand All @@ -89,25 +93,25 @@ type ReporterPool[T any] struct {
// instantiate the generic OTEL metrics/traces reporter.
func NewReporterPool[T any](
cacheLen int,
callback simplelru.EvictCallback[svc.ID, T],
callback simplelru.EvictCallback[svc.UID, T],
itemConstructor func(id svc.ID) (T, error),
) ReporterPool[T] {
pool, _ := simplelru.NewLRU[svc.ID, T](cacheLen, callback)
pool, _ := simplelru.NewLRU[svc.UID, T](cacheLen, callback)
return ReporterPool[T]{pool: pool, itemConstructor: itemConstructor}
}

// For retrieves the associated item for the given service name, or
// creates a new one if it does not exist
func (rp *ReporterPool[T]) For(service svc.ID) (T, error) {
if m, ok := rp.pool.Get(service); ok {
if m, ok := rp.pool.Get(service.UID); ok {
return m, nil
}
m, err := rp.itemConstructor(service)
if err != nil {
var t T
return t, fmt.Errorf("creating resource for service %q: %w", &service, err)
}
rp.pool.Add(service, m)
rp.pool.Add(service.UID, m)
return m, nil
}

Expand Down
12 changes: 4 additions & 8 deletions pkg/internal/export/otel/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func newMetricsReporter(ctx context.Context, cfg *MetricsConfig, ctxInfo *global
cfg: cfg,
}
mr.reporters = NewReporterPool[*Metrics](cfg.ReportersCacheLen,
func(id svc.ID, v *Metrics) {
func(id svc.UID, v *Metrics) {
llog := log.With("service", id)
llog.Debug("evicting metrics reporter from cache")
go func() {
Expand Down Expand Up @@ -370,10 +370,6 @@ func (mr *MetricsReporter) metricAttributes(span *request.Span) attribute.Set {
attrs = append(attrs, semconv.ServiceName(span.ServiceID.Name))
}

for key, val := range span.Metadata {
attrs = append(attrs, attribute.String(key, val))
}

return attribute.NewSet(attrs...)
}

Expand All @@ -399,7 +395,7 @@ func (r *Metrics) record(span *request.Span, attrs attribute.Set) {
}

func (mr *MetricsReporter) reportMetrics(input <-chan []request.Span) {
var lastSvc svc.ID
var lastSvcUID svc.UID
var reporter *Metrics
for spans := range input {
for i := range spans {
Expand All @@ -417,14 +413,14 @@ func (mr *MetricsReporter) reportMetrics(input <-chan []request.Span) {
// only a single instrumented process.
// In multi-process tracing, this is likely to happen as most
// tracers group traces belonging to the same service in the same slice.
if s.ServiceID != lastSvc || reporter == nil {
if s.ServiceID.UID != lastSvcUID || reporter == nil {
lm, err := mr.reporters.For(s.ServiceID)
if err != nil {
mlog().Error("unexpected error creating OTEL resource. Ignoring metric",
err, "service", s.ServiceID)
continue
}
lastSvc = s.ServiceID
lastSvcUID = s.ServiceID.UID
reporter = lm
}
reporter.record(s, mr.metricAttributes(s))
Expand Down
13 changes: 4 additions & 9 deletions pkg/internal/export/otel/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func newTracesReporter(ctx context.Context, cfg *TracesConfig, ctxInfo *global.C
log := tlog()
r := TracesReporter{ctx: ctx, cfg: cfg}
r.reporters = NewReporterPool[*Tracers](cfg.ReportersCacheLen,
func(k svc.ID, v *Tracers) {
func(k svc.UID, v *Tracers) {
llog := log.With("service", k)
llog.Debug("evicting traces reporter from cache")
go func() {
Expand Down Expand Up @@ -338,11 +338,6 @@ func (r *TracesReporter) traceAttributes(span *request.Span) []attribute.KeyValu
}
}

// append extra metadata
for key, val := range span.Metadata {
attrs = append(attrs, attribute.String(key, val))
}

return attrs
}

Expand Down Expand Up @@ -445,7 +440,7 @@ func (r *TracesReporter) makeSpan(parentCtx context.Context, tracer trace2.Trace
}

func (r *TracesReporter) reportTraces(input <-chan []request.Span) {
var lastSvc svc.ID
var lastSvcUID svc.UID
var reporter trace2.Tracer
for spans := range input {
for i := range spans {
Expand All @@ -457,14 +452,14 @@ func (r *TracesReporter) reportTraces(input <-chan []request.Span) {
}

// small optimization: read explanation in MetricsReporter.reportMetrics
if span.ServiceID != lastSvc || reporter == nil {
if span.ServiceID.UID != lastSvcUID || reporter == nil {
lm, err := r.reporters.For(span.ServiceID)
if err != nil {
mlog().Error("unexpected error creating OTEL resource. Ignoring trace",
err, "service", span.ServiceID)
continue
}
lastSvc = span.ServiceID
lastSvcUID = span.ServiceID.UID
reporter = lm.tracer
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/internal/export/prom/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,12 +312,12 @@ func appendK8sLabelNames(names []string) []string {
func appendK8sLabelValues(values []string, span *request.Span) []string {
// must follow the order in appendK8sLabelNames
values = append(values,
span.Metadata[transform.NamespaceName],
span.Metadata[transform.DeploymentName],
span.Metadata[transform.PodName],
span.Metadata[transform.NodeName],
span.Metadata[transform.PodUID],
span.Metadata[transform.PodStartTime],
span.ServiceID.Metadata[transform.NamespaceName],
span.ServiceID.Metadata[transform.DeploymentName],
span.ServiceID.Metadata[transform.PodName],
span.ServiceID.Metadata[transform.NodeName],
span.ServiceID.Metadata[transform.PodUID],
span.ServiceID.Metadata[transform.PodStartTime],
)
return values
}
3 changes: 1 addition & 2 deletions pkg/internal/request/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ type Span struct {
RequestStart int64
Start int64
End int64
ServiceID svc.ID
Metadata map[string]string
ServiceID svc.ID // TODO: rename to Service or ResourceAttrs
TraceID trace2.TraceID
SpanID trace2.SpanID
ParentSpanID trace2.SpanID
Expand Down
15 changes: 13 additions & 2 deletions pkg/internal/svc/svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,18 @@ func (it InstrumentableType) String() string {
}
}

// ID stores the coordinates that uniquely identifies a service:
// its name and optionally a namespace
// UID uniquely identifies a service instance
type UID string

// ID stores the metadata attributes of a service/resource
// TODO: rename to svc.Attributes
type ID struct {
// UID might coincide with other fields (usually, Instance), but UID
// can't be overriden by the user, so it's the only field that can be
// used for internal differentiation of the users.
// UID is not exported in the metrics or traces.
UID UID

Name string
// AutoName is true if the Name has been automatically set by Beyla (e.g. executable name when
// the Name is empty). This will allow later refinement of the Name value (e.g. to override it
Expand All @@ -49,6 +58,8 @@ type ID struct {
Namespace string
SDKLanguage InstrumentableType
Instance string

Metadata map[string]string
}

func (i *ID) String() string {
Expand Down
18 changes: 12 additions & 6 deletions pkg/internal/traces/read_decorator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/mariomac/pipes/pkg/node"

"github.com/grafana/beyla/pkg/internal/request"
"github.com/grafana/beyla/pkg/internal/svc"
"github.com/grafana/beyla/pkg/internal/traces/hostname"
)

Expand Down Expand Up @@ -76,14 +77,18 @@ func ReadFromChannel(ctx context.Context, r ReadDecorator) (node.StartFunc[[]req
}

func getDecorator(cfg *InstanceIDConfig) decorator {
if cfg.OverrideInstanceID != "" {
return func(spans []request.Span) {
for i := range spans {
spans[i].ServiceID.Instance = cfg.OverrideInstanceID
}
hnPidDecorator := hostNamePIDDecorator(cfg)
if cfg.OverrideInstanceID == "" {
return hnPidDecorator
}
return func(spans []request.Span) {
// first decorate normally
hnPidDecorator(spans)
// later, override instance IDs
for i := range spans {
spans[i].ServiceID.Instance = cfg.OverrideInstanceID
}
}
return hostNamePIDDecorator(cfg)
}

func hostNamePIDDecorator(cfg *InstanceIDConfig) decorator {
Expand Down Expand Up @@ -114,6 +119,7 @@ func hostNamePIDDecorator(cfg *InstanceIDConfig) decorator {
idsCache.Add(spans[i].Pid.HostPID, instanceID)
}
spans[i].ServiceID.Instance = instanceID
spans[i].ServiceID.UID = svc.UID(instanceID)
}
}
}
39 changes: 23 additions & 16 deletions pkg/internal/traces/read_decorator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,32 @@ func TestReadDecorator(t *testing.T) {
require.NotEmpty(t, dnsHostname)

type testCase struct {
desc string
cfg ReadDecorator
expected string
desc string
cfg ReadDecorator
expectedID string
expectedUID svc.UID
}
for _, tc := range []testCase{{
desc: "dns",
cfg: ReadDecorator{InstanceID: InstanceIDConfig{HostnameDNSResolution: true}},
expected: dnsHostname + "-1234",
desc: "dns",
cfg: ReadDecorator{InstanceID: InstanceIDConfig{HostnameDNSResolution: true}},
expectedID: dnsHostname + "-1234",
expectedUID: svc.UID(dnsHostname + "-1234"),
}, {
desc: "no-dns",
expected: localHostname + "-1234",
desc: "no-dns",
expectedID: localHostname + "-1234",
expectedUID: svc.UID(localHostname + "-1234"),
}, {
desc: "override hostname",
cfg: ReadDecorator{InstanceID: InstanceIDConfig{OverrideHostname: "foooo"}},
expected: "foooo-1234",
desc: "override hostname",
cfg: ReadDecorator{InstanceID: InstanceIDConfig{OverrideHostname: "foooo"}},
expectedID: "foooo-1234",
expectedUID: "foooo-1234",
}, {
desc: "override HN",
cfg: ReadDecorator{InstanceID: InstanceIDConfig{OverrideInstanceID: "instanceee"}},
expected: "instanceee",
desc: "override HN",
cfg: ReadDecorator{InstanceID: InstanceIDConfig{OverrideInstanceID: "instanceee"}},
expectedID: "instanceee",
// even if we override instance ID, the UID should be set to a really unique value
// (same as the automatic instanceID value)
expectedUID: svc.UID(localHostname + "-1234"),
}} {
t.Run(tc.desc, func(t *testing.T) {
cfg := tc.cfg
Expand All @@ -61,8 +68,8 @@ func TestReadDecorator(t *testing.T) {
}
outSpans := testutil.ReadChannel(t, decoratedOutput, testTimeout)
assert.Equal(t, []request.Span{
{ServiceID: svc.ID{Instance: tc.expected}, Path: "/foo", Pid: request.PidInfo{HostPID: 1234}},
{ServiceID: svc.ID{Instance: tc.expected}, Path: "/bar", Pid: request.PidInfo{HostPID: 1234}},
{ServiceID: svc.ID{Instance: tc.expectedID, UID: tc.expectedUID}, Path: "/foo", Pid: request.PidInfo{HostPID: 1234}},
{ServiceID: svc.ID{Instance: tc.expectedID, UID: tc.expectedUID}, Path: "/bar", Pid: request.PidInfo{HostPID: 1234}},
}, outSpans)
})
}
Expand Down
26 changes: 16 additions & 10 deletions pkg/internal/transform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/grafana/beyla/pkg/internal/kube"
"github.com/grafana/beyla/pkg/internal/pipe/global"
"github.com/grafana/beyla/pkg/internal/request"
"github.com/grafana/beyla/pkg/internal/svc"
)

type KubeEnableFlag string
Expand Down Expand Up @@ -93,11 +94,11 @@ func (md *metadataDecorator) nodeLoop(in <-chan []request.Span, out chan<- []req
}

func (md *metadataDecorator) do(span *request.Span) {
if span.Metadata == nil {
span.Metadata = make(map[string]string, 5)
}
if podInfo, ok := md.db.OwnerPodInfo(span.Pid.Namespace); ok {
appendMetadata(span, podInfo)
} else {
// do not leave the service attributes map as nil
span.ServiceID.Metadata = map[string]string{}
}
}

Expand All @@ -117,13 +118,18 @@ func appendMetadata(span *request.Span, info *kube.PodInfo) {
if span.ServiceID.Namespace == "" {
span.ServiceID.Namespace = info.Namespace
}

span.Metadata[NamespaceName] = info.Namespace
span.Metadata[PodName] = info.Name
span.Metadata[NodeName] = info.NodeName
span.Metadata[PodUID] = string(info.UID)
span.Metadata[PodStartTime] = info.StartTimeStr
span.ServiceID.UID = svc.UID(info.UID)

// if, in the future, other pipeline steps modify the service metadata, we should
// replace the map literal by individual entry insertions
span.ServiceID.Metadata = map[string]string{
NamespaceName: info.Namespace,
PodName: info.Name,
NodeName: info.NodeName,
PodUID: string(info.UID),
PodStartTime: info.StartTimeStr,
}
if info.DeploymentName != "" {
span.Metadata[DeploymentName] = info.DeploymentName
span.ServiceID.Metadata[DeploymentName] = info.DeploymentName
}
}
Loading

0 comments on commit 58b6cfb

Please sign in to comment.