From 6a5afe64751932724d5302d3d5a4ac826b6f7d63 Mon Sep 17 00:00:00 2001 From: Mohamed Mahmoud Date: Tue, 27 Aug 2024 14:40:26 -0400 Subject: [PATCH] NETOBSERV-1805: make sure to clean up pod veth's hooks when pods are deleted Today the ebpf agent only cleans up pods when the pod restarts, but in this issue pods were continuously recreated but their hooks were never deleted, causing the fd leak Signed-off-by: Mohamed Mahmoud --- pkg/agent/agent.go | 43 ++++++++++------ pkg/agent/packets_agent.go | 32 +++++++++--- pkg/ebpf/tracer.go | 100 +++++++++++++++++++++++++++++++++++-- pkg/ifaces/watcher.go | 3 ++ pkg/test/tracer_fake.go | 9 ++++ 5 files changed, 162 insertions(+), 25 deletions(-) diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 5b5bcb099..642e5d5ba 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -75,7 +75,7 @@ func configureInformer(cfg *Config, log *logrus.Entry) ifaces.Informer { } -func interfaceListener(ctx context.Context, ifaceEvents <-chan ifaces.Event, slog *logrus.Entry, eventAdded func(iface ifaces.Interface)) { +func interfaceListener(ctx context.Context, ifaceEvents <-chan ifaces.Event, slog *logrus.Entry, processEvent func(iface ifaces.Interface, add bool)) { for { select { case <-ctx.Done(): @@ -85,10 +85,9 @@ func interfaceListener(ctx context.Context, ifaceEvents <-chan ifaces.Event, slo slog.WithField("event", event).Debug("received event") switch event.Type { case ifaces.EventAdded: - eventAdded(event.Interface) + processEvent(event.Interface, true) case ifaces.EventDeleted: - // qdiscs, ingress and egress filters are automatically deleted so we don't need to - // specifically detach them from the ebpfFetcher + processEvent(event.Interface, false) default: slog.WithField("event", event).Warn("unknown event type") } @@ -125,7 +124,9 @@ type Flows struct { type ebpfFlowFetcher interface { io.Closer Register(iface ifaces.Interface) error + UnRegister(iface ifaces.Interface) error AttachTCX(iface ifaces.Interface) error + DetachTCX(iface 
ifaces.Interface) error LookupAndDeleteMap(*metrics.Metrics) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics DeleteMapsStaleEntries(timeOut time.Duration) @@ -434,7 +435,7 @@ func (f *Flows) Status() Status { // interfacesManager uses an informer to check new/deleted network interfaces. For each running // interface, it registers a flow ebpfFetcher that will forward new flows to the returned channel -// TODO: consider move this method and "onInterfaceAdded" to another type +// TODO: consider move this method and "onInterfaceEvent" to another type func (f *Flows) interfacesManager(ctx context.Context) error { slog := alog.WithField("function", "interfacesManager") @@ -444,7 +445,7 @@ func (f *Flows) interfacesManager(ctx context.Context) error { return fmt.Errorf("instantiating interfaces' informer: %w", err) } - go interfaceListener(ctx, ifaceEvents, slog, f.onInterfaceAdded) + go interfaceListener(ctx, ifaceEvents, slog, f.onInterfaceEvent) return nil } @@ -500,7 +501,7 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl return export, nil } -func (f *Flows) onInterfaceAdded(iface ifaces.Interface) { +func (f *Flows) onInterfaceEvent(iface ifaces.Interface, add bool) { // ignore interfaces that do not match the user configuration acceptance/exclusion lists allowed, err := f.filter.Allowed(iface.Name) if err != nil { @@ -512,14 +513,28 @@ func (f *Flows) onInterfaceAdded(iface ifaces.Interface) { Debug("interface does not match the allow/exclusion filters. Ignoring") return } - alog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook") - if err := f.ebpf.AttachTCX(iface); err != nil { - alog.WithField("interface", iface).WithError(err). - Info("can't attach to TCx hook flow ebpfFetcher. fall back to use legacy TC hook") - if err := f.ebpf.Register(iface); err != nil { + if add { + alog.WithField("interface", iface).Info("interface detected. 
trying to attach TCX hook") + if err := f.ebpf.AttachTCX(iface); err != nil { alog.WithField("interface", iface).WithError(err). - Warn("can't register flow ebpfFetcher. Ignoring") - return + Info("can't attach to TCx hook flow ebpfFetcher. fall back to use legacy TC hook") + if err := f.ebpf.Register(iface); err != nil { + alog.WithField("interface", iface).WithError(err). + Warn("can't register flow ebpfFetcher. Ignoring") + return + } } + } else { + alog.WithField("interface", iface).Info("interface deleted. trying to detach TCX hook") + if err := f.ebpf.DetachTCX(iface); err != nil { + alog.WithField("interface", iface).WithError(err). + Info("can't detach from TCx hook flow ebpfFetcher. fall back to use legacy TC hook") + if err := f.ebpf.UnRegister(iface); err != nil { + alog.WithField("interface", iface).WithError(err). + Warn("can't unregister flow ebpfFetcher. Ignoring") + return + } + } + } } diff --git a/pkg/agent/packets_agent.go b/pkg/agent/packets_agent.go index 7635925db..dc4373e4f 100644 --- a/pkg/agent/packets_agent.go +++ b/pkg/agent/packets_agent.go @@ -41,7 +41,9 @@ type Packets struct { type ebpfPacketFetcher interface { io.Closer Register(iface ifaces.Interface) error + UnRegister(iface ifaces.Interface) error AttachTCX(iface ifaces.Interface) error + DetachTCX(iface ifaces.Interface) error LookupAndDeleteMap(*metrics.Metrics) map[int][]*byte ReadPerf() (perf.Record, error) } @@ -267,7 +269,7 @@ func (p *Packets) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]* return export, nil } -func (p *Packets) onInterfaceAdded(iface ifaces.Interface) { +func (p *Packets) onInterfaceAdded(iface ifaces.Interface, add bool) { // ignore interfaces that do not match the user configuration acceptance/exclusion lists allowed, err := p.filter.Allowed(iface.Name) if err != nil { @@ -279,14 +281,28 @@ func (p *Packets) onInterfaceAdded(iface ifaces.Interface) { Debug("[PCA]interface does not match the allow/exclusion filters. 
Ignoring") return } - plog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook") - if err := p.ebpf.AttachTCX(iface); err != nil { - plog.WithField("[PCA]interface", iface).WithError(err). - Info("can't attach to TCx hook packet ebpfFetcher. fall back to use legacy TC hook") - if err := p.ebpf.Register(iface); err != nil { + if add { + plog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook") + if err := p.ebpf.AttachTCX(iface); err != nil { + plog.WithField("[PCA]interface", iface).WithError(err). + Info("can't attach to TCx hook packet ebpfFetcher. fall back to use legacy TC hook") + if err := p.ebpf.Register(iface); err != nil { + plog.WithField("[PCA]interface", iface).WithError(err). + Warn("can't register packet ebpfFetcher. Ignoring") + return + } + } + } else { + plog.WithField("interface", iface).Info("interface deleted. trying to detach TCX hook") + if err := p.ebpf.DetachTCX(iface); err != nil { plog.WithField("[PCA]interface", iface).WithError(err). - Warn("can't register packet ebpfFetcher. Ignoring") - return + Info("can't detach from TCx hook packet ebpfFetcher. check if there is any legacy TC hook") + if err := p.ebpf.UnRegister(iface); err != nil { + plog.WithField("[PCA]interface", iface).WithError(err). + Warn("can't unregister packet ebpfFetcher. 
Ignoring") + return + } } + } } diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go index 8924e1b6d..e6f8baf32 100644 --- a/pkg/ebpf/tracer.go +++ b/pkg/ebpf/tracer.go @@ -279,6 +279,48 @@ func (m *FlowFetcher) AttachTCX(iface ifaces.Interface) error { return nil } +func (m *FlowFetcher) DetachTCX(iface ifaces.Interface) error { + ilog := log.WithField("iface", iface) + if iface.NetNS != netns.None() { + originalNs, err := netns.Get() + if err != nil { + return fmt.Errorf("failed to get current netns: %w", err) + } + defer func() { + if err := netns.Set(originalNs); err != nil { + ilog.WithError(err).Error("failed to set netns back") + } + originalNs.Close() + }() + if err := unix.Setns(int(iface.NetNS), unix.CLONE_NEWNET); err != nil { + return fmt.Errorf("failed to setns to %s: %w", iface.NetNS, err) + } + } + if m.enableEgress { + if l := m.egressTCXLink[iface]; l != nil { + if err := l.Close(); err != nil { + return fmt.Errorf("TCX: failed to close egress link: %w", err) + } + ilog.WithField("interface", iface.Name).Debug("successfully detach egressTCX hook") + } else { + return fmt.Errorf("egress link does not have a TCX egress hook") + } + } + + if m.enableIngress { + if l := m.ingressTCXLink[iface]; l != nil { + if err := l.Close(); err != nil { + return fmt.Errorf("TCX: failed to close ingress link: %w", err) + } + ilog.WithField("interface", iface.Name).Debug("successfully detach ingressTCX hook") + } else { + return fmt.Errorf("ingress link does not have a TCX ingress hook") + } + } + + return nil +} + func removeTCFilters(ifName string, tcDir uint32) error { link, err := netlink.LinkByName(ifName) if err != nil { @@ -299,7 +341,7 @@ func removeTCFilters(ifName string, tcDir uint32) error { return kerrors.NewAggregate(errs) } -func (m *FlowFetcher) removePreviousFilters(iface ifaces.Interface) error { +func unregister(iface ifaces.Interface) error { ilog := log.WithField("iface", iface) ilog.Debugf("looking for previously installed TC filters on %s", 
iface.Name) links, err := netlink.LinkList() @@ -357,6 +399,12 @@ func (m *FlowFetcher) removePreviousFilters(iface ifaces.Interface) error { return nil } +func (m *FlowFetcher) UnRegister(iface ifaces.Interface) error { + // qdiscs, ingress and egress filters are automatically deleted so we don't need to + // specifically detach them from the ebpfFetcher + return unregister(iface) +} + // Register and links the eBPF fetcher into the system. The program should invoke Unregister // before exiting. func (m *FlowFetcher) Register(iface ifaces.Interface) error { @@ -394,7 +442,7 @@ func (m *FlowFetcher) Register(iface ifaces.Interface) error { m.qdiscs[iface] = qdisc // Remove previously installed filters - if err := m.removePreviousFilters(iface); err != nil { + if err := unregister(iface); err != nil { return fmt.Errorf("failed to remove previous filters: %w", err) } @@ -917,8 +965,13 @@ func registerInterface(iface ifaces.Interface) (*netlink.GenericQdisc, netlink.L return qdisc, ipvlan, nil } -func (p *PacketFetcher) Register(iface ifaces.Interface) error { +func (p *PacketFetcher) UnRegister(iface ifaces.Interface) error { + // qdiscs, ingress and egress filters are automatically deleted so we don't need to + // specifically detach them from the ebpfFetcher + return unregister(iface) +} +func (p *PacketFetcher) Register(iface ifaces.Interface) error { qdisc, ipvlan, err := registerInterface(iface) if err != nil { return err @@ -931,6 +984,47 @@ func (p *PacketFetcher) Register(iface ifaces.Interface) error { return p.registerIngress(iface, ipvlan) } +func (p *PacketFetcher) DetachTCX(iface ifaces.Interface) error { + ilog := log.WithField("iface", iface) + if iface.NetNS != netns.None() { + originalNs, err := netns.Get() + if err != nil { + return fmt.Errorf("PCA failed to get current netns: %w", err) + } + defer func() { + if err := netns.Set(originalNs); err != nil { + ilog.WithError(err).Error("PCA failed to set netns back") + } + originalNs.Close() + }() + if 
err := unix.Setns(int(iface.NetNS), unix.CLONE_NEWNET); err != nil { + return fmt.Errorf("PCA failed to setns to %s: %w", iface.NetNS, err) + } + } + if p.enableEgress { + if l := p.egressTCXLink[iface]; l != nil { + if err := l.Close(); err != nil { + return fmt.Errorf("TCX: failed to close egress link: %w", err) + } + ilog.WithField("interface", iface.Name).Debug("successfully detach egressTCX hook") + } else { + return fmt.Errorf("egress link does not support TCX hook") + } + } + + if p.enableIngress { + if l := p.ingressTCXLink[iface]; l != nil { + if err := l.Close(); err != nil { + return fmt.Errorf("TCX: failed to close ingress link: %w", err) + } + ilog.WithField("interface", iface.Name).Debug("successfully detach ingressTCX hook") + } else { + return fmt.Errorf("ingress link does not support TCX hook") + } + } + return nil +} + func (p *PacketFetcher) AttachTCX(iface ifaces.Interface) error { ilog := log.WithField("iface", iface) if iface.NetNS != netns.None() { diff --git a/pkg/ifaces/watcher.go b/pkg/ifaces/watcher.go index b45c8ee41..25274d6d7 100644 --- a/pkg/ifaces/watcher.go +++ b/pkg/ifaces/watcher.go @@ -82,6 +82,9 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) { "netnsHandle": netnsHandle.String(), "error": err, }).Debug("linkSubscribe failed retry") + if err := netnsHandle.Close(); err != nil { + log.WithError(err).Warn("netnsHandle close failed") + } return false, nil } diff --git a/pkg/test/tracer_fake.go b/pkg/test/tracer_fake.go index 08a3dbf54..3d07128e7 100644 --- a/pkg/test/tracer_fake.go +++ b/pkg/test/tracer_fake.go @@ -36,10 +36,19 @@ func (m *TracerFake) Register(iface ifaces.Interface) error { return nil } +func (m *TracerFake) UnRegister(iface ifaces.Interface) error { + m.interfaces[iface] = struct{}{} + return nil +} + func (m *TracerFake) AttachTCX(_ ifaces.Interface) error { return nil } +func (m *TracerFake) DetachTCX(_ ifaces.Interface) error { + return nil +} + func (m *TracerFake) 
LookupAndDeleteMap(_ *metrics.Metrics) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics { select { case r := <-m.mapLookups: