Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-1805: make sure to clean up pod veth hooks when pods are deleted #399

Merged
merged 1 commit into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 29 additions & 14 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func configureInformer(cfg *Config, log *logrus.Entry) ifaces.Informer {

}

func interfaceListener(ctx context.Context, ifaceEvents <-chan ifaces.Event, slog *logrus.Entry, eventAdded func(iface ifaces.Interface)) {
func interfaceListener(ctx context.Context, ifaceEvents <-chan ifaces.Event, slog *logrus.Entry, processEvent func(iface ifaces.Interface, add bool)) {
for {
select {
case <-ctx.Done():
Expand All @@ -85,10 +85,9 @@ func interfaceListener(ctx context.Context, ifaceEvents <-chan ifaces.Event, slo
slog.WithField("event", event).Debug("received event")
switch event.Type {
case ifaces.EventAdded:
eventAdded(event.Interface)
processEvent(event.Interface, true)
case ifaces.EventDeleted:
// qdiscs, ingress and egress filters are automatically deleted so we don't need to
// specifically detach them from the ebpfFetcher
processEvent(event.Interface, false)
default:
slog.WithField("event", event).Warn("unknown event type")
}
Expand Down Expand Up @@ -125,7 +124,9 @@ type Flows struct {
type ebpfFlowFetcher interface {
io.Closer
Register(iface ifaces.Interface) error
UnRegister(iface ifaces.Interface) error
AttachTCX(iface ifaces.Interface) error
DetachTCX(iface ifaces.Interface) error

LookupAndDeleteMap(*metrics.Metrics) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics
DeleteMapsStaleEntries(timeOut time.Duration)
Expand Down Expand Up @@ -434,7 +435,7 @@ func (f *Flows) Status() Status {

// interfacesManager uses an informer to check new/deleted network interfaces. For each running
// interface, it registers a flow ebpfFetcher that will forward new flows to the returned channel
// TODO: consider move this method and "onInterfaceAdded" to another type
// TODO: consider move this method and "onInterfaceEvent" to another type
func (f *Flows) interfacesManager(ctx context.Context) error {
slog := alog.WithField("function", "interfacesManager")

Expand All @@ -444,7 +445,7 @@ func (f *Flows) interfacesManager(ctx context.Context) error {
return fmt.Errorf("instantiating interfaces' informer: %w", err)
}

go interfaceListener(ctx, ifaceEvents, slog, f.onInterfaceAdded)
go interfaceListener(ctx, ifaceEvents, slog, f.onInterfaceEvent)

return nil
}
Expand Down Expand Up @@ -500,7 +501,7 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*fl
return export, nil
}

func (f *Flows) onInterfaceAdded(iface ifaces.Interface) {
func (f *Flows) onInterfaceEvent(iface ifaces.Interface, add bool) {
// ignore interfaces that do not match the user configuration acceptance/exclusion lists
allowed, err := f.filter.Allowed(iface.Name)
if err != nil {
Expand All @@ -512,14 +513,28 @@ func (f *Flows) onInterfaceAdded(iface ifaces.Interface) {
Debug("interface does not match the allow/exclusion filters. Ignoring")
return
}
alog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook")
if err := f.ebpf.AttachTCX(iface); err != nil {
alog.WithField("interface", iface).WithError(err).
Info("can't attach to TCx hook flow ebpfFetcher. fall back to use legacy TC hook")
if err := f.ebpf.Register(iface); err != nil {
if add {
alog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook")
if err := f.ebpf.AttachTCX(iface); err != nil {
alog.WithField("interface", iface).WithError(err).
Warn("can't register flow ebpfFetcher. Ignoring")
return
Info("can't attach to TCx hook flow ebpfFetcher. fall back to use legacy TC hook")
if err := f.ebpf.Register(iface); err != nil {
alog.WithField("interface", iface).WithError(err).
Warn("can't register flow ebpfFetcher. Ignoring")
return
}
}
} else {
alog.WithField("interface", iface).Info("interface deleted. trying to detach TCX hook")
if err := f.ebpf.DetachTCX(iface); err != nil {
alog.WithField("interface", iface).WithError(err).
Info("can't detach from TCx hook flow ebpfFetcher. fall back to use legacy TC hook")
if err := f.ebpf.UnRegister(iface); err != nil {
alog.WithField("interface", iface).WithError(err).
Warn("can't unregister flow ebpfFetcher. Ignoring")
return
}
}

}
}
32 changes: 24 additions & 8 deletions pkg/agent/packets_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ type Packets struct {
// ebpfPacketFetcher abstracts the eBPF packet fetcher used by the Packets agent, so the
// agent can hook and unhook network interfaces and read captured data through it.
type ebpfPacketFetcher interface {
io.Closer
// Register is the legacy TC fallback the agent uses when AttachTCX fails for an interface.
Register(iface ifaces.Interface) error
// UnRegister is the legacy TC fallback the agent uses when DetachTCX fails for an interface.
UnRegister(iface ifaces.Interface) error
// AttachTCX is tried first by the agent when an interface is added.
AttachTCX(iface ifaces.Interface) error
// DetachTCX is tried first by the agent when an interface is deleted.
DetachTCX(iface ifaces.Interface) error
// LookupAndDeleteMap returns byte data grouped under integer keys.
// NOTE(review): presumably it drains the underlying eBPF map, as the name suggests — confirm.
LookupAndDeleteMap(*metrics.Metrics) map[int][]*byte
// ReadPerf returns one perf.Record, or an error.
ReadPerf() (perf.Record, error)
}
Expand Down Expand Up @@ -267,7 +269,7 @@ func (p *Packets) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*
return export, nil
}

func (p *Packets) onInterfaceAdded(iface ifaces.Interface) {
func (p *Packets) onInterfaceAdded(iface ifaces.Interface, add bool) {
// ignore interfaces that do not match the user configuration acceptance/exclusion lists
allowed, err := p.filter.Allowed(iface.Name)
if err != nil {
Expand All @@ -279,14 +281,28 @@ func (p *Packets) onInterfaceAdded(iface ifaces.Interface) {
Debug("[PCA]interface does not match the allow/exclusion filters. Ignoring")
return
}
plog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook")
if err := p.ebpf.AttachTCX(iface); err != nil {
plog.WithField("[PCA]interface", iface).WithError(err).
Info("can't attach to TCx hook packet ebpfFetcher. fall back to use legacy TC hook")
if err := p.ebpf.Register(iface); err != nil {
if add {
plog.WithField("interface", iface).Info("interface detected. trying to attach TCX hook")
if err := p.ebpf.AttachTCX(iface); err != nil {
plog.WithField("[PCA]interface", iface).WithError(err).
Info("can't attach to TCx hook packet ebpfFetcher. fall back to use legacy TC hook")
if err := p.ebpf.Register(iface); err != nil {
plog.WithField("[PCA]interface", iface).WithError(err).
Warn("can't register packet ebpfFetcher. Ignoring")
return
}
}
} else {
plog.WithField("interface", iface).Info("interface deleted. trying to detach TCX hook")
if err := p.ebpf.DetachTCX(iface); err != nil {
plog.WithField("[PCA]interface", iface).WithError(err).
Warn("can't register packet ebpfFetcher. Ignoring")
return
Info("can't detach from TCx hook packet ebpfFetcher. check if there is any legacy TC hook")
if err := p.ebpf.UnRegister(iface); err != nil {
plog.WithField("[PCA]interface", iface).WithError(err).
Warn("can't unregister packet ebpfFetcher. Ignoring")
return
}
}

}
}
100 changes: 97 additions & 3 deletions pkg/ebpf/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,48 @@ func (m *FlowFetcher) AttachTCX(iface ifaces.Interface) error {
return nil
}

// DetachTCX closes the TCX links recorded for iface, for every direction enabled on the
// fetcher, and removes them from the egress/ingress link maps so that closed links are
// not kept around (or closed a second time) afterwards.
//
// When iface belongs to a network namespace other than netns.None(), the call switches
// into that namespace first and switches back before returning.
//
// It returns an error when the namespace switch fails, when an enabled direction has no
// recorded TCX link for iface, or when closing a link fails. Processing stops at the
// first error: a failure on the egress side leaves the ingress link untouched, and a
// link whose Close failed stays in its map.
func (m *FlowFetcher) DetachTCX(iface ifaces.Interface) error {
	ilog := log.WithField("iface", iface)
	if iface.NetNS != netns.None() {
		originalNs, err := netns.Get()
		if err != nil {
			return fmt.Errorf("failed to get current netns: %w", err)
		}
		// Restore the namespace we started in, whatever happens below.
		// NOTE(review): Setns applies to the calling OS thread; confirm the caller pins
		// the goroutine to its thread while the namespace is switched.
		defer func() {
			if err := netns.Set(originalNs); err != nil {
				ilog.WithError(err).Error("failed to set netns back")
			}
			originalNs.Close()
		}()
		if err := unix.Setns(int(iface.NetNS), unix.CLONE_NEWNET); err != nil {
			return fmt.Errorf("failed to setns to %s: %w", iface.NetNS, err)
		}
	}

	if m.enableEgress {
		l := m.egressTCXLink[iface]
		if l == nil {
			return fmt.Errorf("egress link does not have a TCX egress hook")
		}
		if err := l.Close(); err != nil {
			return fmt.Errorf("TCX: failed to close egress link: %w", err)
		}
		// Forget the closed link so the map does not accumulate stale entries.
		delete(m.egressTCXLink, iface)
		ilog.WithField("interface", iface.Name).Debug("successfully detach egressTCX hook")
	}

	if m.enableIngress {
		l := m.ingressTCXLink[iface]
		if l == nil {
			return fmt.Errorf("ingress link does not have a TCX ingress hook")
		}
		if err := l.Close(); err != nil {
			return fmt.Errorf("TCX: failed to close ingress link: %w", err)
		}
		// Forget the closed link so the map does not accumulate stale entries.
		delete(m.ingressTCXLink, iface)
		ilog.WithField("interface", iface.Name).Debug("successfully detach ingressTCX hook")
	}

	return nil
}

func removeTCFilters(ifName string, tcDir uint32) error {
link, err := netlink.LinkByName(ifName)
if err != nil {
Expand All @@ -299,7 +341,7 @@ func removeTCFilters(ifName string, tcDir uint32) error {
return kerrors.NewAggregate(errs)
}

func (m *FlowFetcher) removePreviousFilters(iface ifaces.Interface) error {
func unregister(iface ifaces.Interface) error {
ilog := log.WithField("iface", iface)
ilog.Debugf("looking for previously installed TC filters on %s", iface.Name)
links, err := netlink.LinkList()
Expand Down Expand Up @@ -357,6 +399,12 @@ func (m *FlowFetcher) removePreviousFilters(iface ifaces.Interface) error {
return nil
}

// UnRegister cleans up the legacy TC hook state for iface by delegating to unregister,
// the same helper Register uses to remove previously installed filters.
func (m *FlowFetcher) UnRegister(iface ifaces.Interface) error {
// NOTE(review): qdiscs, ingress and egress filters are expected to be deleted
// automatically, so nothing is specifically detached from the ebpfFetcher's own
// bookkeeping here; unregister is still invoked explicitly — confirm this is intended.
return unregister(iface)
}

// Register registers and links the eBPF fetcher into the system. The program should invoke
// UnRegister before exiting.
func (m *FlowFetcher) Register(iface ifaces.Interface) error {
Expand Down Expand Up @@ -394,7 +442,7 @@ func (m *FlowFetcher) Register(iface ifaces.Interface) error {
m.qdiscs[iface] = qdisc

// Remove previously installed filters
if err := m.removePreviousFilters(iface); err != nil {
if err := unregister(iface); err != nil {
return fmt.Errorf("failed to remove previous filters: %w", err)
}

Expand Down Expand Up @@ -917,8 +965,13 @@ func registerInterface(iface ifaces.Interface) (*netlink.GenericQdisc, netlink.L
return qdisc, ipvlan, nil
}

func (p *PacketFetcher) Register(iface ifaces.Interface) error {
// UnRegister cleans up the legacy TC hook state for iface by delegating to unregister.
func (p *PacketFetcher) UnRegister(iface ifaces.Interface) error {
// NOTE(review): qdiscs, ingress and egress filters are expected to be deleted
// automatically, so nothing is specifically detached from the ebpfFetcher's own
// bookkeeping here; unregister is still invoked explicitly — confirm this is intended.
return unregister(iface)
}

func (p *PacketFetcher) Register(iface ifaces.Interface) error {
qdisc, ipvlan, err := registerInterface(iface)
if err != nil {
return err
Expand All @@ -931,6 +984,47 @@ func (p *PacketFetcher) Register(iface ifaces.Interface) error {
return p.registerIngress(iface, ipvlan)
}

// DetachTCX closes the TCX links recorded for iface, for every direction enabled on the
// packet fetcher, and removes them from the egress/ingress link maps so that closed
// links are not kept around (or closed a second time) afterwards.
//
// When iface belongs to a network namespace other than netns.None(), the call switches
// into that namespace first and switches back before returning.
//
// It returns an error when the namespace switch fails, when an enabled direction has no
// recorded TCX link for iface, or when closing a link fails. Processing stops at the
// first error: a failure on the egress side leaves the ingress link untouched, and a
// link whose Close failed stays in its map.
func (p *PacketFetcher) DetachTCX(iface ifaces.Interface) error {
	ilog := log.WithField("iface", iface)
	if iface.NetNS != netns.None() {
		originalNs, err := netns.Get()
		if err != nil {
			return fmt.Errorf("PCA failed to get current netns: %w", err)
		}
		// Restore the namespace we started in, whatever happens below.
		// NOTE(review): Setns applies to the calling OS thread; confirm the caller pins
		// the goroutine to its thread while the namespace is switched.
		defer func() {
			if err := netns.Set(originalNs); err != nil {
				ilog.WithError(err).Error("PCA failed to set netns back")
			}
			originalNs.Close()
		}()
		if err := unix.Setns(int(iface.NetNS), unix.CLONE_NEWNET); err != nil {
			return fmt.Errorf("PCA failed to setns to %s: %w", iface.NetNS, err)
		}
	}

	if p.enableEgress {
		l := p.egressTCXLink[iface]
		if l == nil {
			return fmt.Errorf("egress link does not support TCX hook")
		}
		if err := l.Close(); err != nil {
			return fmt.Errorf("TCX: failed to close egress link: %w", err)
		}
		// Forget the closed link so the map does not accumulate stale entries.
		delete(p.egressTCXLink, iface)
		ilog.WithField("interface", iface.Name).Debug("successfully detach egressTCX hook")
	}

	if p.enableIngress {
		l := p.ingressTCXLink[iface]
		if l == nil {
			return fmt.Errorf("ingress link does not support TCX hook")
		}
		if err := l.Close(); err != nil {
			return fmt.Errorf("TCX: failed to close ingress link: %w", err)
		}
		// Forget the closed link so the map does not accumulate stale entries.
		delete(p.ingressTCXLink, iface)
		ilog.WithField("interface", iface.Name).Debug("successfully detach ingressTCX hook")
	}

	return nil
}

func (p *PacketFetcher) AttachTCX(iface ifaces.Interface) error {
ilog := log.WithField("iface", iface)
if iface.NetNS != netns.None() {
Expand Down
3 changes: 3 additions & 0 deletions pkg/ifaces/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) {
"netnsHandle": netnsHandle.String(),
"error": err,
}).Debug("linkSubscribe failed retry")
if err := netnsHandle.Close(); err != nil {
log.WithError(err).Warn("netnsHandle close failed")
}
return false, nil
}

Expand Down
9 changes: 9 additions & 0 deletions pkg/test/tracer_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,19 @@ func (m *TracerFake) Register(iface ifaces.Interface) error {
return nil
}

// UnRegister removes iface from the fake's set of known interfaces, so tests that
// inspect m.interfaces no longer see an interface after it has been unregistered.
// Removing an interface that was never recorded is a no-op. It always returns nil.
func (m *TracerFake) UnRegister(iface ifaces.Interface) error {
	// The previous implementation inserted iface here, which made unregistering
	// indistinguishable from adding the interface to the set.
	delete(m.interfaces, iface)
	return nil
}

// AttachTCX is a no-op in the fake tracer: it ignores the interface and always returns nil.
func (m *TracerFake) AttachTCX(_ ifaces.Interface) error {
return nil
}

// DetachTCX is a no-op in the fake tracer: it ignores the interface and always returns nil.
func (m *TracerFake) DetachTCX(_ ifaces.Interface) error {
return nil
}

func (m *TracerFake) LookupAndDeleteMap(_ *metrics.Metrics) map[ebpf.BpfFlowId][]ebpf.BpfFlowMetrics {
select {
case r := <-m.mapLookups:
Expand Down
Loading