Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

App metrics/traces: decorate peer service IPs with service names #968

Merged
merged 3 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/internal/kube/informer_ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,14 @@ func (k *Metadata) getHostName(hostIP string) string {
}
return ""
}

func (k *Metadata) AddServiceIPEventHandler(s cache.ResourceEventHandler) error {
_, err := k.servicesIP.AddEventHandler(s)
// passing a snapshot of the currently stored entities
go func() {
for _, svc := range k.servicesIP.GetStore().List() {
s.OnAdd(svc, true)
}
}()
return err
}
61 changes: 61 additions & 0 deletions pkg/internal/transform/kube/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type Database struct {
// ip to pod name matcher
podsMut sync.RWMutex
podsByIP map[string]*kube.PodInfo

// ip to service name matcher
svcMut sync.RWMutex
svcByIP map[string]*kube.ServiceInfo
}

func CreateDatabase(kubeMetadata *kube.Metadata) Database {
Expand All @@ -45,6 +49,7 @@ func CreateDatabase(kubeMetadata *kube.Metadata) Database {
containerIDs: map[string]*container.Info{},
namespaces: map[uint32]*container.Info{},
podsByIP: map[string]*kube.PodInfo{},
svcByIP: map[string]*kube.ServiceInfo{},
informer: kubeMetadata,
}
}
Expand All @@ -67,6 +72,20 @@ func StartDatabase(kubeMetadata *kube.Metadata) (*Database, error) {
}); err != nil {
return nil, fmt.Errorf("can't register Database as Pod event handler: %w", err)
}
if err := db.informer.AddServiceIPEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
db.UpdateNewServicesByIPIndex(obj.(*kube.ServiceInfo))
},
UpdateFunc: func(oldObj, newObj interface{}) {
db.UpdateDeletedServicesByIPIndex(oldObj.(*kube.ServiceInfo))
db.UpdateNewServicesByIPIndex(newObj.(*kube.ServiceInfo))
},
DeleteFunc: func(obj interface{}) {
db.UpdateDeletedServicesByIPIndex(obj.(*kube.ServiceInfo))
},
}); err != nil {
return nil, fmt.Errorf("can't register Database as Service event handler: %w", err)
}

return &db, nil
}
Expand Down Expand Up @@ -155,3 +174,45 @@ func (id *Database) PodInfoForIP(ip string) *kube.PodInfo {
defer id.podsMut.RUnlock()
return id.podsByIP[ip]
}

func (id *Database) UpdateNewServicesByIPIndex(svc *kube.ServiceInfo) {
if len(svc.IPInfo.IPs) > 0 {
id.svcMut.Lock()
defer id.svcMut.Unlock()
for _, ip := range svc.IPInfo.IPs {
id.svcByIP[ip] = svc
}
}
}

func (id *Database) UpdateDeletedServicesByIPIndex(svc *kube.ServiceInfo) {
if len(svc.IPInfo.IPs) > 0 {
id.svcMut.Lock()
defer id.svcMut.Unlock()
for _, ip := range svc.IPInfo.IPs {
delete(id.svcByIP, ip)
}
}
}

func (id *Database) ServiceInfoForIP(ip string) *kube.ServiceInfo {
id.svcMut.RLock()
defer id.svcMut.RUnlock()
return id.svcByIP[ip]
}

func (id *Database) HostNameForIP(ip string) string {
id.svcMut.RLock()
svc, ok := id.svcByIP[ip]
id.svcMut.RUnlock()
if ok {
return svc.Name
}
id.podsMut.RLock()
pod, ok := id.podsByIP[ip]
id.podsMut.RUnlock()
if ok {
return pod.Name
}
return ""
}
8 changes: 8 additions & 0 deletions pkg/transform/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func KubeDecoratorProvider(ctxInfo *global.ContextInfo) pipe.MiddleProvider[[]re
// production implementer: kube.Database
type kubeDatabase interface {
OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool)
HostNameForIP(ip string) string
}

type metadataDecorator struct {
Expand All @@ -73,6 +74,13 @@ func (md *metadataDecorator) do(span *request.Span) {
// do not leave the service attributes map as nil
span.ServiceID.Metadata = map[attr.Name]string{}
}
// override the peer and host names from Kubernetes metadata, if found
if hn := md.db.HostNameForIP(span.Host); hn != "" {
span.HostName = hn
}
if pn := md.db.HostNameForIP(span.Peer); pn != "" {
span.PeerName = pn
}
}

func appendMetadata(span *request.Span, info *kube.PodInfo) {
Expand Down
17 changes: 12 additions & 5 deletions pkg/transform/k8s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const timeout = 5 * time.Second

func TestDecoration(t *testing.T) {
// pre-populated kubernetes metadata database
dec := metadataDecorator{db: fakeDatabase{
dec := metadataDecorator{db: &fakeDatabase{pidNSPods: map[uint32]*kube.PodInfo{
12: &kube.PodInfo{
ObjectMeta: v1.ObjectMeta{
Name: "pod-12", Namespace: "the-ns", UID: "uid-12",
Expand All @@ -43,7 +43,7 @@ func TestDecoration(t *testing.T) {
NodeName: "the-node",
StartTimeStr: "2020-01-02 12:56:56",
},
}}
}}}
inputCh, outputhCh := make(chan []request.Span, 10), make(chan []request.Span, 10)
defer close(inputCh)
go dec.nodeLoop(inputCh, outputhCh)
Expand Down Expand Up @@ -127,9 +127,16 @@ func TestDecoration(t *testing.T) {
})
}

type fakeDatabase map[uint32]*kube.PodInfo
type fakeDatabase struct {
pidNSPods map[uint32]*kube.PodInfo
ipNames map[string]string
}

func (f *fakeDatabase) HostNameForIP(ip string) string {
return f.ipNames[ip]
}

func (f fakeDatabase) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) {
pi, ok := f[pidNamespace]
func (f *fakeDatabase) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) {
pi, ok := f.pidNSPods[pidNamespace]
return pi, ok
}
17 changes: 13 additions & 4 deletions pkg/transform/name_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,21 @@ func trimPrefixIgnoreCase(s, prefix string) string {
}

func (nr *NameResolver) resolveNames(span *request.Span) {
var hn, pn string
if span.IsClientSpan() {
span.HostName, span.OtherNamespace = nr.resolve(&span.ServiceID, span.Host)
span.PeerName, _ = nr.resolve(&span.ServiceID, span.Peer)
hn, span.OtherNamespace = nr.resolve(&span.ServiceID, span.Host)
pn, _ = nr.resolve(&span.ServiceID, span.Peer)
} else {
span.PeerName, span.OtherNamespace = nr.resolve(&span.ServiceID, span.Peer)
span.HostName, _ = nr.resolve(&span.ServiceID, span.Host)
pn, span.OtherNamespace = nr.resolve(&span.ServiceID, span.Peer)
hn, _ = nr.resolve(&span.ServiceID, span.Host)
}
// don't set names if the peer and host names have been already decorated
// in a previous stage (e.g. Kubernetes decorator)
if pn != "" {
span.PeerName = pn
}
if hn != "" {
span.HostName = hn
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/integration/k8s/common/k8s_metrics_testfuncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func FeatureHTTPMetricsDecoration(manifest string) features.Feature {
"k8s_replicaset_name": "^testserver-",
})).
Assess("all the span graph metrics exist",
testMetricsDecoration(spanGraphMetrics, `{server="testserver"}`, map[string]string{
testMetricsDecoration(spanGraphMetrics, `{server="testserver",client="internal-pinger"}`, map[string]string{
"server_service_namespace": "integration-test",
"source": "beyla",
}),
Expand Down
Loading