From ae750f71d07a31534c579806b3a1160f42262493 Mon Sep 17 00:00:00 2001 From: Nikola Grcevski <6207777+grcevski@users.noreply.github.com> Date: Fri, 30 Aug 2024 15:17:27 -0400 Subject: [PATCH] Resolve node ips (#1121) --- pkg/internal/kube/informer.go | 14 +++++ pkg/internal/transform/kube/db.go | 73 +++++++++++++++++++++++++ pkg/transform/name_resolver.go | 11 +--- pkg/transform/name_resolver_test.go | 83 +++++++++++++++++++++++++++++ 4 files changed, 171 insertions(+), 10 deletions(-) diff --git a/pkg/internal/kube/informer.go b/pkg/internal/kube/informer.go index 067fc4126..f05f77794 100644 --- a/pkg/internal/kube/informer.go +++ b/pkg/internal/kube/informer.go @@ -415,6 +415,20 @@ func (k *Metadata) AddReplicaSetEventHandler(h cache.ResourceEventHandler) error return err } +func (k *Metadata) AddNodeEventHandler(h cache.ResourceEventHandler) error { + if k.disabledInformers.Has(InformerNode) { + return nil + } + _, err := k.nodesIP.AddEventHandler(h) + // passing a snapshot of the currently stored entities + go func() { + for _, node := range k.nodesIP.GetStore().List() { + h.OnAdd(node, true) + } + }() + return err +} + func (i *PodInfo) ServiceName() string { if i.Owner != nil { // we have two levels of ownership at most diff --git a/pkg/internal/transform/kube/db.go b/pkg/internal/transform/kube/db.go index 438480a02..fbba34a24 100644 --- a/pkg/internal/transform/kube/db.go +++ b/pkg/internal/transform/kube/db.go @@ -41,6 +41,10 @@ type Database struct { // ip to service name matcher svcMut sync.RWMutex svcByIP map[string]*kube.ServiceInfo + + // ip to node name matcher + nodeMut sync.RWMutex + nodeByIP map[string]*kube.NodeInfo } func CreateDatabase(kubeMetadata *kube.Metadata) Database { @@ -50,6 +54,7 @@ func CreateDatabase(kubeMetadata *kube.Metadata) Database { namespaces: map[uint32]*container.Info{}, podsByIP: map[string]*kube.PodInfo{}, svcByIP: map[string]*kube.ServiceInfo{}, + nodeByIP: map[string]*kube.NodeInfo{}, informer: kubeMetadata, } } @@ -86,6 +91,20 @@ func StartDatabase(kubeMetadata *kube.Metadata) (*Database, error) { }); err != nil { return nil, fmt.Errorf("can't register Database as Service event handler: %w", err) } + if err := db.informer.AddNodeEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + db.UpdateNewNodesByIPIndex(obj.(*kube.NodeInfo)) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + db.UpdateDeletedNodesByIPIndex(oldObj.(*kube.NodeInfo)) + db.UpdateNewNodesByIPIndex(newObj.(*kube.NodeInfo)) + }, + DeleteFunc: func(obj interface{}) { + db.UpdateDeletedNodesByIPIndex(obj.(*kube.NodeInfo)) + }, + }); err != nil { + return nil, fmt.Errorf("can't register Database as Node event handler: %w", err) + } return &db, nil } @@ -217,6 +236,32 @@ func (id *Database) ServiceInfoForIP(ip string) *kube.ServiceInfo { return id.svcByIP[ip] } +func (id *Database) UpdateNewNodesByIPIndex(svc *kube.NodeInfo) { + id.nodeMut.Lock() + defer id.nodeMut.Unlock() + if len(svc.IPInfo.IPs) > 0 { + for _, ip := range svc.IPInfo.IPs { + id.nodeByIP[ip] = svc + } + } +} + +func (id *Database) UpdateDeletedNodesByIPIndex(svc *kube.NodeInfo) { + id.nodeMut.Lock() + defer id.nodeMut.Unlock() + if len(svc.IPInfo.IPs) > 0 { + for _, ip := range svc.IPInfo.IPs { + delete(id.nodeByIP, ip) + } + } +} + +func (id *Database) NodeInfoForIP(ip string) *kube.NodeInfo { + id.nodeMut.RLock() + defer id.nodeMut.RUnlock() + return id.nodeByIP[ip] +} + func (id *Database) HostNameForIP(ip string) string { id.svcMut.RLock() svc, ok := id.svcByIP[ip] @@ -230,5 +275,33 @@ func (id *Database) HostNameForIP(ip string) string { if ok { return pod.Name } + id.nodeMut.RLock() + node, ok := id.nodeByIP[ip] + id.nodeMut.RUnlock() + if ok { + return node.Name + } return "" } + +func (id *Database) ServiceNameNamespaceForIP(ip string) (string, string) { + id.svcMut.RLock() + svc, ok := id.svcByIP[ip] + id.svcMut.RUnlock() + if ok { + return svc.Name, svc.Namespace + } + id.podsMut.RLock() + pod, ok := id.podsByIP[ip] + id.podsMut.RUnlock() + if ok { + return pod.ServiceName(), pod.Namespace + } + id.nodeMut.RLock() + node, ok := id.nodeByIP[ip] + id.nodeMut.RUnlock() + if ok { + return node.Name, node.Namespace + } + return "", "" +} diff --git a/pkg/transform/name_resolver.go b/pkg/transform/name_resolver.go index 5fa005f9d..1d8cd8696 100644 --- a/pkg/transform/name_resolver.go +++ b/pkg/transform/name_resolver.go @@ -173,16 +173,7 @@ func (nr *NameResolver) dnsResolve(svc *svc.ID, ip string) (string, string) { } func (nr *NameResolver) resolveFromK8s(ip string) (string, string) { - svcInfo := nr.db.ServiceInfoForIP(ip) - if svcInfo == nil { - podInfo := nr.db.PodInfoForIP(ip) - if podInfo == nil { - return "", "" - } - return podInfo.ServiceName(), podInfo.Namespace - } - - return svcInfo.Name, svcInfo.Namespace + return nr.db.ServiceNameNamespaceForIP(ip) } func (nr *NameResolver) resolveIP(ip string) string { diff --git a/pkg/transform/name_resolver_test.go b/pkg/transform/name_resolver_test.go index 43e8e6a6a..70fd54b0c 100644 --- a/pkg/transform/name_resolver_test.go +++ b/pkg/transform/name_resolver_test.go @@ -215,3 +215,86 @@ func TestCleanName(t *testing.T) { assert.Equal(t, "service", nr.cleanName(&s, "127.0.0.1", "service.special.namespace.svc.cluster.local.")) assert.Equal(t, "service", nr.cleanName(&s, "127.0.0.1", "service.k8snamespace.svc.cluster.local.")) } + +func TestResolveNodesFromK8s(t *testing.T) { + db := kube.CreateDatabase(nil) + + node1 := kube2.NodeInfo{ + ObjectMeta: metav1.ObjectMeta{Name: "node1"}, + IPInfo: kube2.IPInfo{IPs: []string{"10.0.0.1", "10.1.0.1"}}, + } + + node2 := kube2.NodeInfo{ + ObjectMeta: metav1.ObjectMeta{Name: "node2", Namespace: "something"}, + IPInfo: kube2.IPInfo{IPs: []string{"10.0.0.2", "10.1.0.2"}}, + } + + node3 := kube2.NodeInfo{ + ObjectMeta: metav1.ObjectMeta{Name: "node3"}, + IPInfo: kube2.IPInfo{IPs: []string{"10.0.0.3", "10.1.0.3"}}, + } + + db.UpdateNewNodesByIPIndex(&node1) + db.UpdateNewNodesByIPIndex(&node2) + db.UpdateNewNodesByIPIndex(&node3) + + assert.Equal(t, &node1, db.NodeInfoForIP("10.0.0.1")) + assert.Equal(t, &node1, db.NodeInfoForIP("10.1.0.1")) + assert.Equal(t, &node2, db.NodeInfoForIP("10.0.0.2")) + assert.Equal(t, &node2, db.NodeInfoForIP("10.1.0.2")) + assert.Equal(t, &node3, db.NodeInfoForIP("10.1.0.3")) + db.UpdateDeletedNodesByIPIndex(&node3) + assert.Nil(t, db.NodeInfoForIP("10.1.0.3")) + + nr := NameResolver{ + db: &db, + cache: expirable.NewLRU[string, string](10, nil, 5*time.Hour), + sources: resolverSources([]string{"dns", "k8s"}), + } + + name, namespace := nr.resolveFromK8s("10.0.0.1") + assert.Equal(t, "node1", name) + assert.Equal(t, "", namespace) + + name, namespace = nr.resolveFromK8s("10.0.0.2") + assert.Equal(t, "node2", name) + assert.Equal(t, "something", namespace) + + name, namespace = nr.resolveFromK8s("10.0.0.3") + assert.Equal(t, "", name) + assert.Equal(t, "", namespace) + + clientSpan := request.Span{ + Type: request.EventTypeHTTPClient, + Peer: "10.0.0.1", + Host: "10.0.0.2", + ServiceID: svc.ID{ + Name: "node1", + Namespace: "", + }, + } + + serverSpan := request.Span{ + Type: request.EventTypeHTTP, + Peer: "10.0.0.1", + Host: "10.0.0.2", + ServiceID: svc.ID{ + Name: "node2", + Namespace: "something", + }, + } + + nr.resolveNames(&clientSpan) + + assert.Equal(t, "node1", clientSpan.PeerName) + assert.Equal(t, "", clientSpan.ServiceID.Namespace) + assert.Equal(t, "node2", clientSpan.HostName) + assert.Equal(t, "something", clientSpan.OtherNamespace) + + nr.resolveNames(&serverSpan) + + assert.Equal(t, "node1", serverSpan.PeerName) + assert.Equal(t, "", serverSpan.OtherNamespace) + assert.Equal(t, "node2", serverSpan.HostName) + assert.Equal(t, "something", serverSpan.ServiceID.Namespace) +}