Skip to content

Commit

Permalink
Resolve node ips (#1121)
Browse files Browse the repository at this point in the history
  • Loading branch information
grcevski authored Aug 30, 2024
1 parent 4494fae commit ae750f7
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 10 deletions.
14 changes: 14 additions & 0 deletions pkg/internal/kube/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,20 @@ func (k *Metadata) AddReplicaSetEventHandler(h cache.ResourceEventHandler) error
return err
}

func (k *Metadata) AddNodeEventHandler(h cache.ResourceEventHandler) error {
if k.disabledInformers.Has(InformerNode) {
return nil
}
_, err := k.nodesIP.AddEventHandler(h)
// passing a snapshot of the currently stored entities
go func() {
for _, node := range k.nodesIP.GetStore().List() {
h.OnAdd(node, true)
}
}()
return err
}

func (i *PodInfo) ServiceName() string {
if i.Owner != nil {
// we have two levels of ownership at most
Expand Down
73 changes: 73 additions & 0 deletions pkg/internal/transform/kube/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ type Database struct {
// ip to service name matcher
svcMut sync.RWMutex
svcByIP map[string]*kube.ServiceInfo

// ip to node name matcher
nodeMut sync.RWMutex
nodeByIP map[string]*kube.NodeInfo
}

func CreateDatabase(kubeMetadata *kube.Metadata) Database {
Expand All @@ -50,6 +54,7 @@ func CreateDatabase(kubeMetadata *kube.Metadata) Database {
namespaces: map[uint32]*container.Info{},
podsByIP: map[string]*kube.PodInfo{},
svcByIP: map[string]*kube.ServiceInfo{},
nodeByIP: map[string]*kube.NodeInfo{},
informer: kubeMetadata,
}
}
Expand Down Expand Up @@ -86,6 +91,20 @@ func StartDatabase(kubeMetadata *kube.Metadata) (*Database, error) {
}); err != nil {
return nil, fmt.Errorf("can't register Database as Service event handler: %w", err)
}
if err := db.informer.AddNodeEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
db.UpdateNewNodesByIPIndex(obj.(*kube.NodeInfo))
},
UpdateFunc: func(oldObj, newObj interface{}) {
db.UpdateDeletedNodesByIPIndex(oldObj.(*kube.NodeInfo))
db.UpdateNewNodesByIPIndex(newObj.(*kube.NodeInfo))
},
DeleteFunc: func(obj interface{}) {
db.UpdateDeletedNodesByIPIndex(obj.(*kube.NodeInfo))
},
}); err != nil {
return nil, fmt.Errorf("can't register Database as Node event handler: %w", err)
}

return &db, nil
}
Expand Down Expand Up @@ -217,6 +236,32 @@ func (id *Database) ServiceInfoForIP(ip string) *kube.ServiceInfo {
return id.svcByIP[ip]
}

func (id *Database) UpdateNewNodesByIPIndex(svc *kube.NodeInfo) {
id.nodeMut.Lock()
defer id.nodeMut.Unlock()
if len(svc.IPInfo.IPs) > 0 {
for _, ip := range svc.IPInfo.IPs {
id.nodeByIP[ip] = svc
}
}
}

func (id *Database) UpdateDeletedNodesByIPIndex(svc *kube.NodeInfo) {
id.nodeMut.Lock()
defer id.nodeMut.Unlock()
if len(svc.IPInfo.IPs) > 0 {
for _, ip := range svc.IPInfo.IPs {
delete(id.nodeByIP, ip)
}
}
}

func (id *Database) NodeInfoForIP(ip string) *kube.NodeInfo {
id.nodeMut.RLock()
defer id.nodeMut.RUnlock()
return id.nodeByIP[ip]
}

func (id *Database) HostNameForIP(ip string) string {
id.svcMut.RLock()
svc, ok := id.svcByIP[ip]
Expand All @@ -230,5 +275,33 @@ func (id *Database) HostNameForIP(ip string) string {
if ok {
return pod.Name
}
id.nodeMut.RLock()
node, ok := id.nodeByIP[ip]
id.nodeMut.RUnlock()
if ok {
return node.Name
}
return ""
}

func (id *Database) ServiceNameNamespaceForIP(ip string) (string, string) {
id.svcMut.RLock()
svc, ok := id.svcByIP[ip]
id.svcMut.RUnlock()
if ok {
return svc.Name, svc.Namespace
}
id.podsMut.RLock()
pod, ok := id.podsByIP[ip]
id.podsMut.RUnlock()
if ok {
return pod.ServiceName(), pod.Namespace
}
id.nodeMut.RLock()
node, ok := id.nodeByIP[ip]
id.nodeMut.RUnlock()
if ok {
return node.Name, node.Namespace
}
return "", ""
}
11 changes: 1 addition & 10 deletions pkg/transform/name_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,16 +173,7 @@ func (nr *NameResolver) dnsResolve(svc *svc.ID, ip string) (string, string) {
}

func (nr *NameResolver) resolveFromK8s(ip string) (string, string) {
svcInfo := nr.db.ServiceInfoForIP(ip)
if svcInfo == nil {
podInfo := nr.db.PodInfoForIP(ip)
if podInfo == nil {
return "", ""
}
return podInfo.ServiceName(), podInfo.Namespace
}

return svcInfo.Name, svcInfo.Namespace
return nr.db.ServiceNameNamespaceForIP(ip)
}

func (nr *NameResolver) resolveIP(ip string) string {
Expand Down
83 changes: 83 additions & 0 deletions pkg/transform/name_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,86 @@ func TestCleanName(t *testing.T) {
assert.Equal(t, "service", nr.cleanName(&s, "127.0.0.1", "service.special.namespace.svc.cluster.local."))
assert.Equal(t, "service", nr.cleanName(&s, "127.0.0.1", "service.k8snamespace.svc.cluster.local."))
}

func TestResolveNodesFromK8s(t *testing.T) {
db := kube.CreateDatabase(nil)

node1 := kube2.NodeInfo{
ObjectMeta: metav1.ObjectMeta{Name: "node1"},
IPInfo: kube2.IPInfo{IPs: []string{"10.0.0.1", "10.1.0.1"}},
}

node2 := kube2.NodeInfo{
ObjectMeta: metav1.ObjectMeta{Name: "node2", Namespace: "something"},
IPInfo: kube2.IPInfo{IPs: []string{"10.0.0.2", "10.1.0.2"}},
}

node3 := kube2.NodeInfo{
ObjectMeta: metav1.ObjectMeta{Name: "node3"},
IPInfo: kube2.IPInfo{IPs: []string{"10.0.0.3", "10.1.0.3"}},
}

db.UpdateNewNodesByIPIndex(&node1)
db.UpdateNewNodesByIPIndex(&node2)
db.UpdateNewNodesByIPIndex(&node3)

assert.Equal(t, &node1, db.NodeInfoForIP("10.0.0.1"))
assert.Equal(t, &node1, db.NodeInfoForIP("10.1.0.1"))
assert.Equal(t, &node2, db.NodeInfoForIP("10.0.0.2"))
assert.Equal(t, &node2, db.NodeInfoForIP("10.1.0.2"))
assert.Equal(t, &node3, db.NodeInfoForIP("10.1.0.3"))
db.UpdateDeletedNodesByIPIndex(&node3)
assert.Nil(t, db.NodeInfoForIP("10.1.0.3"))

nr := NameResolver{
db: &db,
cache: expirable.NewLRU[string, string](10, nil, 5*time.Hour),
sources: resolverSources([]string{"dns", "k8s"}),
}

name, namespace := nr.resolveFromK8s("10.0.0.1")
assert.Equal(t, "node1", name)
assert.Equal(t, "", namespace)

name, namespace = nr.resolveFromK8s("10.0.0.2")
assert.Equal(t, "node2", name)
assert.Equal(t, "something", namespace)

name, namespace = nr.resolveFromK8s("10.0.0.3")
assert.Equal(t, "", name)
assert.Equal(t, "", namespace)

clientSpan := request.Span{
Type: request.EventTypeHTTPClient,
Peer: "10.0.0.1",
Host: "10.0.0.2",
ServiceID: svc.ID{
Name: "node1",
Namespace: "",
},
}

serverSpan := request.Span{
Type: request.EventTypeHTTP,
Peer: "10.0.0.1",
Host: "10.0.0.2",
ServiceID: svc.ID{
Name: "node2",
Namespace: "something",
},
}

nr.resolveNames(&clientSpan)

assert.Equal(t, "node1", clientSpan.PeerName)
assert.Equal(t, "", clientSpan.ServiceID.Namespace)
assert.Equal(t, "node2", clientSpan.HostName)
assert.Equal(t, "something", clientSpan.OtherNamespace)

nr.resolveNames(&serverSpan)

assert.Equal(t, "node1", serverSpan.PeerName)
assert.Equal(t, "", serverSpan.OtherNamespace)
assert.Equal(t, "node2", serverSpan.HostName)
assert.Equal(t, "something", serverSpan.ServiceID.Namespace)
}

0 comments on commit ae750f7

Please sign in to comment.