diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 904623734..4e44b90c6 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -75,7 +75,7 @@ func configureInformer(cfg *Config, log *logrus.Entry) ifaces.Informer { } -func interfaceListener(ctx context.Context, ifaceEvents <-chan ifaces.Event, slog *logrus.Entry, eventAdded func(iface ifaces.Interface)) { +func interfaceListener(ctx context.Context, ifaceEvents <-chan ifaces.Event, slog *logrus.Entry, processEvent func(iface ifaces.Interface, add bool)) { for { select { case <-ctx.Done(): @@ -85,10 +85,9 @@ func interfaceListener(ctx context.Context, ifaceEvents <-chan ifaces.Event, slo slog.WithField("event", event).Debug("received event") switch event.Type { case ifaces.EventAdded: - eventAdded(event.Interface) + processEvent(event.Interface, true) case ifaces.EventDeleted: - // qdiscs, ingress and egress filters are automatically deleted so we don't need to - // specifically detach them from the ebpfFetcher + processEvent(event.Interface, false) default: slog.WithField("event", event).Warn("unknown event type") } @@ -125,7 +124,9 @@ type Flows struct { type ebpfFlowFetcher interface { io.Closer Register(iface ifaces.Interface) error + UnRegister(iface ifaces.Interface) error AttachTCX(iface ifaces.Interface) error + DetachTCX(iface ifaces.Interface) error LookupAndDeleteMap(*metrics.Metrics) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics DeleteMapsStaleEntries(timeOut time.Duration) @@ -432,7 +433,7 @@ func (f *Flows) Status() Status { // interfacesManager uses an informer to check new/deleted network interfaces. For each running // interface, it registers a flow ebpfFetcher that will forward new flows to the returned channel -// TODO: consider move this method and "onInterfaceAdded" to another type +// TODO: consider move this method and "onInterfaceEvent" to another type func (f *Flows) interfacesManager(ctx context.Context) error { slog := alog.WithField("function", "interfacesManager") @@ -442,7 +443,7 @@ func (f *Flows) interfacesManager(ctx context.Context) error { return fmt.Errorf("instantiating interfaces' informer: %w", err) } - go interfaceListener(ctx, ifaceEvents, slog, f.onInterfaceAdded) + go interfaceListener(ctx, ifaceEvents, slog, f.onInterfaceEvent) return nil } @@ -498,7 +499,7 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl return export, nil } -func (f *Flows) onInterfaceAdded(iface ifaces.Interface) { +func (f *Flows) onInterfaceEvent(iface ifaces.Interface, add bool) { // ignore interfaces that do not match the user configuration acceptance/exclusion lists allowed, err := f.filter.Allowed(iface.Name) if err != nil { @@ -510,14 +511,28 @@ func (f *Flows) onInterfaceAdded(iface ifaces.Interface) { Debug("interface does not match the allow/exclusion filters. Ignoring") return } - alog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook") - if err := f.ebpf.AttachTCX(iface); err != nil { - alog.WithField("interface", iface).WithError(err). - Info("can't attach to TCx hook flow ebpfFetcher. fall back to use legacy TC hook") - if err := f.ebpf.Register(iface); err != nil { + if add { + alog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook") + if err := f.ebpf.AttachTCX(iface); err != nil { alog.WithField("interface", iface).WithError(err). - Warn("can't register flow ebpfFetcher. Ignoring") - return + Info("can't attach to TCx hook flow ebpfFetcher. fall back to use legacy TC hook") + if err := f.ebpf.Register(iface); err != nil { + alog.WithField("interface", iface).WithError(err). + Warn("can't register flow ebpfFetcher. Ignoring") + return + } } + } else { + alog.WithField("interface", iface).Info("interface deleted. trying to detach TCX hook") + if err := f.ebpf.DetachTCX(iface); err != nil { + alog.WithField("interface", iface).WithError(err). + Info("can't detach from TCx hook flow ebpfFetcher. fall back to use legacy TC hook") + if err := f.ebpf.UnRegister(iface); err != nil { + alog.WithField("interface", iface).WithError(err). + Warn("can't unregister flow ebpfFetcher. Ignoring") + return + } + } + } } diff --git a/pkg/agent/packets_agent.go b/pkg/agent/packets_agent.go index 70c89803c..550a8a3fe 100644 --- a/pkg/agent/packets_agent.go +++ b/pkg/agent/packets_agent.go @@ -40,7 +40,9 @@ type Packets struct { type ebpfPacketFetcher interface { io.Closer Register(iface ifaces.Interface) error + UnRegister(iface ifaces.Interface) error AttachTCX(iface ifaces.Interface) error + DetachTCX(iface ifaces.Interface) error LookupAndDeleteMap(*metrics.Metrics) map[int][]*byte ReadPerf() (perf.Record, error) } @@ -234,7 +236,7 @@ func (p *Packets) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]* return export, nil } -func (p *Packets) onInterfaceAdded(iface ifaces.Interface) { +func (p *Packets) onInterfaceAdded(iface ifaces.Interface, add bool) { // ignore interfaces that do not match the user configuration acceptance/exclusion lists allowed, err := p.filter.Allowed(iface.Name) if err != nil { @@ -246,14 +248,28 @@ func (p *Packets) onInterfaceAdded(iface ifaces.Interface) { Debug("[PCA]interface does not match the allow/exclusion filters. Ignoring") return } - plog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook") - if err := p.ebpf.AttachTCX(iface); err != nil { - plog.WithField("[PCA]interface", iface).WithError(err). - Info("can't attach to TCx hook packet ebpfFetcher. fall back to use legacy TC hook") - if err := p.ebpf.Register(iface); err != nil { + if add { + plog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook") + if err := p.ebpf.AttachTCX(iface); err != nil { + plog.WithField("[PCA]interface", iface).WithError(err). + Info("can't attach to TCx hook packet ebpfFetcher. fall back to use legacy TC hook") + if err := p.ebpf.Register(iface); err != nil { + plog.WithField("[PCA]interface", iface).WithError(err). + Warn("can't register packet ebpfFetcher. Ignoring") + return + } + } + } else { + plog.WithField("interface", iface).Info("interface deleted. trying to detach TCX hook") + if err := p.ebpf.DetachTCX(iface); err != nil { plog.WithField("[PCA]interface", iface).WithError(err). - Warn("can't register packet ebpfFetcher. Ignoring") - return + Info("can't detach from TCx hook packet ebpfFetcher. check if there is any legacy TC hook") + if err := p.ebpf.UnRegister(iface); err != nil { + plog.WithField("[PCA]interface", iface).WithError(err). + Warn("can't unregister packet ebpfFetcher. Ignoring") + return + } } + } } diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go index 108bb0f9c..9dbd2bd3c 100644 --- a/pkg/ebpf/tracer.go +++ b/pkg/ebpf/tracer.go @@ -265,6 +265,48 @@ func (m *FlowFetcher) AttachTCX(iface ifaces.Interface) error { return nil } +func (m *FlowFetcher) DetachTCX(iface ifaces.Interface) error { + ilog := log.WithField("iface", iface) + if iface.NetNS != netns.None() { + originalNs, err := netns.Get() + if err != nil { + return fmt.Errorf("failed to get current netns: %w", err) + } + defer func() { + if err := netns.Set(originalNs); err != nil { + ilog.WithError(err).Error("failed to set netns back") + } + originalNs.Close() + }() + if err := unix.Setns(int(iface.NetNS), unix.CLONE_NEWNET); err != nil { + return fmt.Errorf("failed to setns to %s: %w", iface.NetNS, err) + } + } + if m.enableEgress { + if l := m.egressTCXLink[iface]; l != nil { + if err := l.Close(); err != nil { + return fmt.Errorf("TCX: failed to close egress link: %w", err) + } + ilog.WithField("interface", iface.Name).Debug("successfully detach egressTCX hook") + } else { + return fmt.Errorf("egress link does not have a TCX egress hook") + } + } + + if m.enableIngress { + if l := m.ingressTCXLink[iface]; l != nil { + if err := l.Close(); err != nil { + return fmt.Errorf("TCX: failed to close ingress link: %w", err) + } + ilog.WithField("interface", iface.Name).Debug("successfully detach ingressTCX hook") + } else { + return fmt.Errorf("ingress link does not have a TCX ingress hook") + } + } + + return nil +} + func removeTCFilters(ifName string, tcDir uint32) error { link, err := netlink.LinkByName(ifName) if err != nil { @@ -285,7 +327,7 @@ func removeTCFilters(ifName string, tcDir uint32) error { return kerrors.NewAggregate(errs) } -func (m *FlowFetcher) removePreviousFilters(iface ifaces.Interface) error { +func unregister(iface ifaces.Interface) error { ilog := log.WithField("iface", iface) ilog.Debugf("looking for previously installed TC filters on %s", iface.Name) links, err := netlink.LinkList() @@ -343,6 +385,12 @@ func (m *FlowFetcher) removePreviousFilters(iface ifaces.Interface) error { return nil } +func (m *FlowFetcher) UnRegister(iface ifaces.Interface) error { + // qdiscs, ingress and egress filters are automatically deleted so we don't need to + // specifically detach them from the ebpfFetcher + return unregister(iface) +} + // Register and links the eBPF fetcher into the system. The program should invoke Unregister // before exiting. func (m *FlowFetcher) Register(iface ifaces.Interface) error { @@ -380,7 +428,7 @@ func (m *FlowFetcher) Register(iface ifaces.Interface) error { m.qdiscs[iface] = qdisc // Remove previously installed filters - if err := m.removePreviousFilters(iface); err != nil { + if err := unregister(iface); err != nil { return fmt.Errorf("failed to remove previous filters: %w", err) } @@ -912,8 +960,13 @@ func registerInterface(iface ifaces.Interface) (*netlink.GenericQdisc, netlink.L return qdisc, ipvlan, nil } -func (p *PacketFetcher) Register(iface ifaces.Interface) error { +func (p *PacketFetcher) UnRegister(iface ifaces.Interface) error { + // qdiscs, ingress and egress filters are automatically deleted so we don't need to + // specifically detach them from the ebpfFetcher + return unregister(iface) +} +func (p *PacketFetcher) Register(iface ifaces.Interface) error { qdisc, ipvlan, err := registerInterface(iface) if err != nil { return err @@ -926,6 +979,47 @@ func (p *PacketFetcher) Register(iface ifaces.Interface) error { return p.registerIngress(iface, ipvlan) } +func (p *PacketFetcher) DetachTCX(iface ifaces.Interface) error { + ilog := log.WithField("iface", iface) + if iface.NetNS != netns.None() { + originalNs, err := netns.Get() + if err != nil { + return fmt.Errorf("PCA failed to get current netns: %w", err) + } + defer func() { + if err := netns.Set(originalNs); err != nil { + ilog.WithError(err).Error("PCA failed to set netns back") + } + originalNs.Close() + }() + if err := unix.Setns(int(iface.NetNS), unix.CLONE_NEWNET); err != nil { + return fmt.Errorf("PCA failed to setns to %s: %w", iface.NetNS, err) + } + } + if p.enableEgress { + if l := p.egressTCXLink[iface]; l != nil { + if err := l.Close(); err != nil { + return fmt.Errorf("TCX: failed to close egress link: %w", err) + } + ilog.WithField("interface", iface.Name).Debug("successfully detach egressTCX hook") + } else { + return fmt.Errorf("egress link does not support TCX hook") + } + } + + if p.enableIngress { + if l := p.ingressTCXLink[iface]; l != nil { + if err := l.Close(); err != nil { + return fmt.Errorf("TCX: failed to close ingress link: %w", err) + } + ilog.WithField("interface", iface.Name).Debug("successfully detach ingressTCX hook") + } else { + return fmt.Errorf("ingress link does not support TCX hook") + } + } + return nil +} + func (p *PacketFetcher) AttachTCX(iface ifaces.Interface) error { ilog := log.WithField("iface", iface) if iface.NetNS != netns.None() { diff --git a/pkg/ifaces/watcher.go b/pkg/ifaces/watcher.go index b45c8ee41..25274d6d7 100644 --- a/pkg/ifaces/watcher.go +++ b/pkg/ifaces/watcher.go @@ -82,6 +82,9 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) { "netnsHandle": netnsHandle.String(), "error": err, }).Debug("linkSubscribe failed retry") + if err := netnsHandle.Close(); err != nil { + log.WithError(err).Warn("netnsHandle close failed") + } return false, nil } diff --git a/pkg/test/tracer_fake.go b/pkg/test/tracer_fake.go index 08a3dbf54..3d07128e7 100644 --- a/pkg/test/tracer_fake.go +++ b/pkg/test/tracer_fake.go @@ -36,10 +36,19 @@ func (m *TracerFake) Register(iface ifaces.Interface) error { return nil } +func (m *TracerFake) UnRegister(iface ifaces.Interface) error { + m.interfaces[iface] = struct{}{} + return nil +} + func (m *TracerFake) AttachTCX(_ ifaces.Interface) error { return nil } +func (m *TracerFake) DetachTCX(_ ifaces.Interface) error { + return nil +} + func (m *TracerFake) LookupAndDeleteMap(_ *metrics.Metrics) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics { select { case r := <-m.mapLookups: