diff --git a/pkg/internal/kube/informer_ip.go b/pkg/internal/kube/informer_ip.go index a9ed77a3a..61ff002c9 100644 --- a/pkg/internal/kube/informer_ip.go +++ b/pkg/internal/kube/informer_ip.go @@ -191,3 +191,14 @@ func (k *Metadata) getHostName(hostIP string) string { } return "" } + +func (k *Metadata) AddServiceIPEventHandler(s cache.ResourceEventHandler) error { + _, err := k.servicesIP.AddEventHandler(s) + // passing a snapshot of the currently stored entities + go func() { + for _, svc := range k.servicesIP.GetStore().List() { + s.OnAdd(svc, true) + } + }() + return err +} diff --git a/pkg/internal/transform/kube/db.go b/pkg/internal/transform/kube/db.go index 2ffaa4031..051a5f22e 100644 --- a/pkg/internal/transform/kube/db.go +++ b/pkg/internal/transform/kube/db.go @@ -37,6 +37,10 @@ type Database struct { // ip to pod name matcher podsMut sync.RWMutex podsByIP map[string]*kube.PodInfo + + // ip to service name matcher + svcMut sync.RWMutex + svcByIP map[string]*kube.ServiceInfo } func CreateDatabase(kubeMetadata *kube.Metadata) Database { @@ -45,6 +49,7 @@ func CreateDatabase(kubeMetadata *kube.Metadata) Database { containerIDs: map[string]*container.Info{}, namespaces: map[uint32]*container.Info{}, podsByIP: map[string]*kube.PodInfo{}, + svcByIP: map[string]*kube.ServiceInfo{}, informer: kubeMetadata, } } @@ -67,6 +72,20 @@ func StartDatabase(kubeMetadata *kube.Metadata) (*Database, error) { }); err != nil { return nil, fmt.Errorf("can't register Database as Pod event handler: %w", err) } + if err := db.informer.AddServiceIPEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + db.UpdateNewServicesByIPIndex(obj.(*kube.ServiceInfo)) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + db.UpdateDeletedServicesByIPIndex(oldObj.(*kube.ServiceInfo)) + db.UpdateNewServicesByIPIndex(newObj.(*kube.ServiceInfo)) + }, + DeleteFunc: func(obj interface{}) { + db.UpdateDeletedServicesByIPIndex(obj.(*kube.ServiceInfo)) + }, + }); err != nil { + return nil, fmt.Errorf("can't register Database as Service event handler: %w", err) + } return &db, nil } @@ -155,3 +174,45 @@ func (id *Database) PodInfoForIP(ip string) *kube.PodInfo { defer id.podsMut.RUnlock() return id.podsByIP[ip] } + +func (id *Database) UpdateNewServicesByIPIndex(svc *kube.ServiceInfo) { + if len(svc.IPInfo.IPs) > 0 { + id.svcMut.Lock() + defer id.svcMut.Unlock() + for _, ip := range svc.IPInfo.IPs { + id.svcByIP[ip] = svc + } + } +} + +func (id *Database) UpdateDeletedServicesByIPIndex(svc *kube.ServiceInfo) { + if len(svc.IPInfo.IPs) > 0 { + id.svcMut.Lock() + defer id.svcMut.Unlock() + for _, ip := range svc.IPInfo.IPs { + delete(id.svcByIP, ip) + } + } +} + +func (id *Database) ServiceInfoForIP(ip string) *kube.ServiceInfo { + id.svcMut.RLock() + defer id.svcMut.RUnlock() + return id.svcByIP[ip] +} + +func (id *Database) HostNameForIP(ip string) string { + id.svcMut.RLock() + svc, ok := id.svcByIP[ip] + id.svcMut.RUnlock() + if ok { + return svc.Name + } + id.podsMut.RLock() + pod, ok := id.podsByIP[ip] + id.podsMut.RUnlock() + if ok { + return pod.Name + } + return "" +} diff --git a/pkg/transform/k8s.go b/pkg/transform/k8s.go index d9ed96bd0..a69da4293 100644 --- a/pkg/transform/k8s.go +++ b/pkg/transform/k8s.go @@ -48,6 +48,7 @@ func KubeDecoratorProvider(ctxInfo *global.ContextInfo) pipe.MiddleProvider[[]re // production implementer: kube.Database type kubeDatabase interface { OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) + HostNameForIP(ip string) string } type metadataDecorator struct { @@ -73,6 +74,13 @@ func (md *metadataDecorator) do(span *request.Span) { // do not leave the service attributes map as nil span.ServiceID.Metadata = map[attr.Name]string{} } + // override the peer and host names from Kubernetes metadata, if found + if hn := md.db.HostNameForIP(span.Host); hn != "" { + span.HostName = hn + } + if pn := md.db.HostNameForIP(span.Peer); pn != "" { + span.PeerName = pn + } } func appendMetadata(span *request.Span, info *kube.PodInfo) { diff --git a/pkg/transform/k8s_test.go b/pkg/transform/k8s_test.go index d83d4d038..fa5c4cb3c 100644 --- a/pkg/transform/k8s_test.go +++ b/pkg/transform/k8s_test.go @@ -19,7 +19,7 @@ const timeout = 5 * time.Second func TestDecoration(t *testing.T) { // pre-populated kubernetes metadata database - dec := metadataDecorator{db: fakeDatabase{ + dec := metadataDecorator{db: &fakeDatabase{pidNSPods: map[uint32]*kube.PodInfo{ 12: &kube.PodInfo{ ObjectMeta: v1.ObjectMeta{ Name: "pod-12", Namespace: "the-ns", UID: "uid-12", @@ -43,7 +43,7 @@ func TestDecoration(t *testing.T) { NodeName: "the-node", StartTimeStr: "2020-01-02 12:56:56", }, - }} + }}} inputCh, outputhCh := make(chan []request.Span, 10), make(chan []request.Span, 10) defer close(inputCh) go dec.nodeLoop(inputCh, outputhCh) @@ -127,9 +127,16 @@ func TestDecoration(t *testing.T) { }) } -type fakeDatabase map[uint32]*kube.PodInfo +type fakeDatabase struct { + pidNSPods map[uint32]*kube.PodInfo + ipNames map[string]string +} + +func (f *fakeDatabase) HostNameForIP(ip string) string { + return f.ipNames[ip] +} -func (f fakeDatabase) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) { - pi, ok := f[pidNamespace] +func (f *fakeDatabase) OwnerPodInfo(pidNamespace uint32) (*kube.PodInfo, bool) { + pi, ok := f.pidNSPods[pidNamespace] return pi, ok } diff --git a/pkg/transform/name_resolver.go b/pkg/transform/name_resolver.go index 584e7ac4c..568139815 100644 --- a/pkg/transform/name_resolver.go +++ b/pkg/transform/name_resolver.go @@ -74,12 +74,21 @@ func trimPrefixIgnoreCase(s, prefix string) string { } func (nr *NameResolver) resolveNames(span *request.Span) { + var hn, pn string if span.IsClientSpan() { - span.HostName, span.OtherNamespace = nr.resolve(&span.ServiceID, span.Host) - span.PeerName, _ = nr.resolve(&span.ServiceID, span.Peer) + hn, span.OtherNamespace = nr.resolve(&span.ServiceID, span.Host) + pn, _ = nr.resolve(&span.ServiceID, span.Peer) } else { - span.PeerName, span.OtherNamespace = nr.resolve(&span.ServiceID, span.Peer) - span.HostName, _ = nr.resolve(&span.ServiceID, span.Host) + pn, span.OtherNamespace = nr.resolve(&span.ServiceID, span.Peer) + hn, _ = nr.resolve(&span.ServiceID, span.Host) + } + // don't set names if the peer and host names have been already decorated + // in a previous stage (e.g. Kubernetes decorator) + if pn != "" { + span.PeerName = pn + } + if hn != "" { + span.HostName = hn } } diff --git a/test/integration/k8s/common/k8s_metrics_testfuncs.go b/test/integration/k8s/common/k8s_metrics_testfuncs.go index 9967bf1a3..13d929f14 100644 --- a/test/integration/k8s/common/k8s_metrics_testfuncs.go +++ b/test/integration/k8s/common/k8s_metrics_testfuncs.go @@ -122,7 +122,7 @@ func FeatureHTTPMetricsDecoration(manifest string) features.Feature { "k8s_replicaset_name": "^testserver-", })). Assess("all the span graph metrics exist", - testMetricsDecoration(spanGraphMetrics, `{server="testserver"}`, map[string]string{ + testMetricsDecoration(spanGraphMetrics, `{server="testserver",client="internal-pinger"}`, map[string]string{ "server_service_namespace": "integration-test", "source": "beyla", }),