From d00a4f156fad5a7ba6518b2f7b8ace45f577944b Mon Sep 17 00:00:00 2001 From: Tomas Hruby Date: Fri, 18 Nov 2022 09:47:51 -0800 Subject: [PATCH] [BPF] detach cali programs from devices no longer in regex It can happen that the device regexp changes and calico programs then remain attached to devices that we do not observe anymore. For instance, user has the default ethX in the regexp, but those devices are bonded and the user does not have bondX in the regexp. The user fixes the regexp to include bondX only, but then the programs stay attached to ethX. User has to remove the programs manually or restart the node. We figure out that the programs on devices not in regexp are ours when those programs have a jump map pinned in the calico bpffs hierarchy. This is not really a problem for workload devices as they usually disappear and programs get detached then. --- felix/bpf/tc/attach.go | 63 ++++++++++------- felix/dataplane/linux/bpf_ep_mgr.go | 78 ++++++++++++++++++++- felix/dataplane/linux/bpf_ep_mgr_test.go | 89 ++++++++++++++++++++++-- 3 files changed, 195 insertions(+), 35 deletions(-) diff --git a/felix/bpf/tc/attach.go b/felix/bpf/tc/attach.go index 1911ec2d369..fa686336e7c 100644 --- a/felix/bpf/tc/attach.go +++ b/felix/bpf/tc/attach.go @@ -187,30 +187,8 @@ func (ap *AttachPoint) AttachProgram() (int, error) { } logCxt.Info("Program attached to TC.") - var progErrs []error - for _, p := range progsToClean { - log.WithField("prog", p).Debug("Cleaning up old calico program") - attemptCleanup := func() error { - _, err := ExecTC("filter", "del", "dev", ap.Iface, string(ap.Hook), "pref", p.pref, "handle", p.handle, "bpf") - return err - } - err = attemptCleanup() - if errors.Is(err, ErrInterrupted) { - // This happens if the interface is deleted in the middle of calling tc. 
- log.Debug("First cleanup hit 'Dump was interrupted', retrying (once).") - err = attemptCleanup() - } - if errors.Is(err, ErrDeviceNotFound) { - continue - } - if err != nil { - log.WithError(err).WithField("prog", p).Warn("Failed to clean up old calico program.") - progErrs = append(progErrs, err) - } - } - - if len(progErrs) != 0 { - return -1, fmt.Errorf("failed to clean up one or more old calico programs: %v", progErrs) + if err := ap.detachPrograms(progsToClean); err != nil { + return -1, err } // Store information of object in a json file so in future we can skip reattaching it. @@ -240,8 +218,41 @@ func (ap *AttachPoint) patchLogPrefix(logCtx *log.Entry, ifile, ofile string) er } func (ap *AttachPoint) DetachProgram() error { - // We never detach TC programs, so this should not be called. - ap.Log().Panic("DetachProgram is not implemented for TC") + progsToClean, err := ap.listAttachedPrograms() + if err != nil { + return err + } + + return ap.detachPrograms(progsToClean) +} + +func (ap *AttachPoint) detachPrograms(progsToClean []attachedProg) error { + var progErrs []error + for _, p := range progsToClean { + log.WithField("prog", p).Debug("Cleaning up old calico program") + attemptCleanup := func() error { + _, err := ExecTC("filter", "del", "dev", ap.Iface, string(ap.Hook), "pref", p.pref, "handle", p.handle, "bpf") + return err + } + err := attemptCleanup() + if errors.Is(err, ErrInterrupted) { + // This happens if the interface is deleted in the middle of calling tc. 
+ log.Debug("First cleanup hit 'Dump was interrupted', retrying (once).") + err = attemptCleanup() + } + if errors.Is(err, ErrDeviceNotFound) { + continue + } + if err != nil { + log.WithError(err).WithField("prog", p).Warn("Failed to clean up old calico program.") + progErrs = append(progErrs, err) + } + } + + if len(progErrs) != 0 { + return fmt.Errorf("failed to clean up one or more old calico programs: %v", progErrs) + } + return nil } diff --git a/felix/dataplane/linux/bpf_ep_mgr.go b/felix/dataplane/linux/bpf_ep_mgr.go index 0d8cde0fb89..747d0e695c1 100644 --- a/felix/dataplane/linux/bpf_ep_mgr.go +++ b/felix/dataplane/linux/bpf_ep_mgr.go @@ -166,6 +166,9 @@ const ( ) type bpfEndpointManager struct { + initAttaches map[string]bpf.EPAttachInfo + initUnknownIfaces set.Set[string] + // Main store of information about interfaces; indexed on interface name. ifacesLock sync.Mutex nameToIface map[string]bpfInterface @@ -284,6 +287,7 @@ func newBPFEndpointManager( livenessCallback = func() {} } m := &bpfEndpointManager{ + initUnknownIfaces: set.New[string](), dp: dp, allWEPs: map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint{}, happyWEPs: map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint{}, @@ -555,6 +559,38 @@ func (m *bpfEndpointManager) updateIfaceStateMap(name string, iface *bpfInterfac } } +func (m *bpfEndpointManager) cleanupOldAttach(iface string, ai bpf.EPAttachInfo) error { + if ai.XDPId != 0 { + ap := xdp.AttachPoint{ + Iface: iface, + // Try all modes in this order + Modes: []bpf.XDPMode{bpf.XDPGeneric, bpf.XDPDriver, bpf.XDPOffload}, + } + + if err := m.dp.ensureNoProgram(&ap); err != nil { + return fmt.Errorf("xdp: %w", err) + } + } + if ai.TCId != 0 { + ap := tc.AttachPoint{ + Iface: iface, + Hook: bpf.HookEgress, + } + + if err := m.dp.ensureNoProgram(&ap); err != nil { + return fmt.Errorf("tc egress: %w", err) + } + + ap.Hook = bpf.HookIngress + + if err := m.dp.ensureNoProgram(&ap); err != nil { + return fmt.Errorf("tc ingress: %w", err) + } + 
} + + return nil +} + func (m *bpfEndpointManager) onInterfaceUpdate(update *ifaceUpdate) { log.Debugf("Interface update for %v, state %v", update.Name, update.State) // Should be safe without the lock since there shouldn't be any active background threads @@ -569,6 +605,18 @@ func (m *bpfEndpointManager) onInterfaceUpdate(update *ifaceUpdate) { } if !m.isDataIface(update.Name) && !m.isWorkloadIface(update.Name) && !m.isL3Iface(update.Name) { + if update.State == ifacemonitor.StateUp { + if ai, ok := m.initAttaches[update.Name]; ok { + if err := m.cleanupOldAttach(update.Name, ai); err != nil { + log.WithError(err).Warnf("Failed to detach old programs from now unused device '%s'", update.Name) + } else { + delete(m.initAttaches, update.Name) + } + } + } + if m.initUnknownIfaces != nil { + m.initUnknownIfaces.Add(update.Name) + } log.WithField("update", update).Debug("Ignoring interface that's neither data nor workload nor L3.") return } @@ -579,6 +627,7 @@ func (m *bpfEndpointManager) onInterfaceUpdate(update *ifaceUpdate) { // For specific host endpoints OnHEPUpdate doesn't depend on iface state, and has // already stored and mapped as needed. if ifaceIsUp { + delete(m.initAttaches, update.Name) // We require host interfaces to be in non-strict RPF mode so that // packets can return straight to host for services bypassing CTLB. switch update.Name { @@ -752,7 +801,23 @@ func (m *bpfEndpointManager) markExistingWEPDirty(wlID proto.WorkloadEndpointID, func (m *bpfEndpointManager) CompleteDeferredWork() error { // Do one-off initialisation. 
- m.startupOnce.Do(m.dp.ensureStarted()) + m.startupOnce.Do(func() { + m.dp.ensureStarted() + + m.initUnknownIfaces.Iter(func(iface string) error { + if ai, ok := m.initAttaches[iface]; ok { + if err := m.cleanupOldAttach(iface, ai); err != nil { + log.WithError(err).Warnf("Failed to detach old programs from now unused device '%s'", iface) + } else { + delete(m.initAttaches, iface) + return set.RemoveItem + } + } + return nil + }) + + m.initUnknownIfaces = nil + }) m.applyProgramsToDirtyDataInterfaces() m.updateWEPsInDataplane() @@ -1681,6 +1746,13 @@ func (m *bpfEndpointManager) setRPFilter(iface string, val int) error { func (m *bpfEndpointManager) ensureStarted() { log.Info("Starting map cleanup runner.") m.mapCleanupRunner.Start(context.Background()) + + var err error + + m.initAttaches, err = bpf.ListCalicoAttached() + if err != nil { + log.WithError(err).Warn("Failed to list previously attached programs. We may not clean up some.") + } } func (m *bpfEndpointManager) ensureBPFDevices() error { @@ -1840,11 +1912,11 @@ func (m *bpfEndpointManager) ensureNoProgram(ap attachPoint) error { } } - // Ensure interface does not have our XDP program attached in any mode. + // Ensure interface does not have our program attached. 
err := ap.DetachProgram() // Forget the policy debug info - m.removePolicyDebugInfo(ap.IfaceName(), 4, bpf.HookXDP) + m.removePolicyDebugInfo(ap.IfaceName(), 4, ap.HookName()) return err } diff --git a/felix/dataplane/linux/bpf_ep_mgr_test.go b/felix/dataplane/linux/bpf_ep_mgr_test.go index 0afc490022e..4bd6fae724e 100644 --- a/felix/dataplane/linux/bpf_ep_mgr_test.go +++ b/felix/dataplane/linux/bpf_ep_mgr_test.go @@ -53,6 +53,8 @@ type mockDataplane struct { fds map[string]uint32 state map[uint32]polprog.Rules routes map[ip.V4CIDR]struct{} + + ensureStartedFn func() } func newMockDataplane() *mockDataplane { @@ -65,6 +67,9 @@ func newMockDataplane() *mockDataplane { } func (m *mockDataplane) ensureStarted() { + if m.ensureStartedFn != nil { + m.ensureStartedFn() + } } func (m *mockDataplane) ensureBPFDevices() error { @@ -140,6 +145,12 @@ func (m *mockDataplane) setAndReturn(vari **polprog.Rules, key string) func() *p } } +func (m *mockDataplane) programAttached(key string) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + return m.fds[key] != 0 +} + func (m *mockDataplane) setRoute(cidr ip.V4CIDR) { m.mutex.Lock() defer m.mutex.Unlock() @@ -168,7 +179,6 @@ var _ = Describe("BPF Endpoint Manager", func() { fibLookupEnabled bool endpointToHostAction string dataIfacePattern string - l3IfacePattern string workloadIfaceRegex string ipSetIDAllocator *idalloc.IDAllocator vxlanMTU int @@ -183,8 +193,7 @@ var _ = Describe("BPF Endpoint Manager", func() { BeforeEach(func() { fibLookupEnabled = true endpointToHostAction = "DROP" - dataIfacePattern = "^((en|wl|ww|sl|ib)[opsx].*|(eth|wlan|wwan).*|tunl0$|wireguard.cali$)" - l3IfacePattern = "^(wireguard.cali$)" + dataIfacePattern = "^eth0" workloadIfaceRegex = "cali" ipSetIDAllocator = idalloc.New() vxlanMTU = 0 @@ -225,7 +234,6 @@ var _ = Describe("BPF Endpoint Manager", func() { Hostname: "uthost", BPFLogLevel: "info", BPFDataIfacePattern: regexp.MustCompile(dataIfacePattern), - BPFL3IfacePattern: 
regexp.MustCompile(l3IfacePattern), VXLANMTU: vxlanMTU, VXLANPort: rrConfigNormal.VXLANPort, BPFNodePortDSREnabled: nodePortDSR, @@ -255,8 +263,8 @@ var _ = Describe("BPF Endpoint Manager", func() { bpfEpMgr.OnUpdate(&ifaceUpdate{Name: name, State: state, Index: index}) err := bpfEpMgr.CompleteDeferredWork() Expect(err).NotTo(HaveOccurred()) - if state == ifacemonitor.StateUp { - Expect(ifStateMap.ContainsKey(ifstate.NewKey(uint32(index)).AsBytes())).To(BeTrue()) + if state == ifacemonitor.StateUp && (bpfEpMgr.isDataIface(name) || bpfEpMgr.isWorkloadIface(name)) { + ExpectWithOffset(1, ifStateMap.ContainsKey(ifstate.NewKey(uint32(index)).AsBytes())).To(BeTrue()) } } } @@ -415,6 +423,75 @@ var _ = Describe("BPF Endpoint Manager", func() { genIfaceUpdate("eth0", ifacemonitor.StateUp, 10)() }) + It("should attach to eth0", func() { + Expect(dp.programAttached("eth0:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth0:egress")).To(BeTrue()) + }) + + Context("with dataIfacePattern changed to eth1", func() { + JustBeforeEach(func() { + dataIfacePattern = "^eth1" + newBpfEpMgr() + + dp.ensureStartedFn = func() { + bpfEpMgr.initAttaches = map[string]bpf.EPAttachInfo{ + "eth0": {TCId: 12345}, + } + } + + }) + + It("should detach from eth0 when eth0 up before first CompleteDeferredWork()", func() { + Expect(dp.programAttached("eth0:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth0:egress")).To(BeTrue()) + + genIfaceUpdate("eth0", ifacemonitor.StateUp, 10)() + genIfaceUpdate("eth1", ifacemonitor.StateUp, 11)() + + err := bpfEpMgr.CompleteDeferredWork() + Expect(err).NotTo(HaveOccurred()) + + // We inherited dp from the previous bpfEpMgr and it has eth0 + // attached. This should clean it up. 
+ Expect(dp.programAttached("eth0:ingress")).To(BeFalse()) + Expect(dp.programAttached("eth0:egress")).To(BeFalse()) + + Expect(dp.programAttached("eth1:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth1:egress")).To(BeTrue()) + }) + + It("should detach from eth0 when eth0 up after first CompleteDeferredWork()", func() { + Expect(dp.programAttached("eth0:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth0:egress")).To(BeTrue()) + + genIfaceUpdate("eth1", ifacemonitor.StateUp, 11)() + + err := bpfEpMgr.CompleteDeferredWork() + Expect(err).NotTo(HaveOccurred()) + + // We inherited dp from the previous bpfEpMgr and it has eth0 + // attached. We should see it. + Expect(dp.programAttached("eth0:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth0:egress")).To(BeTrue()) + + Expect(dp.programAttached("eth1:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth1:egress")).To(BeTrue()) + + genIfaceUpdate("eth0", ifacemonitor.StateUp, 10)() + + err = bpfEpMgr.CompleteDeferredWork() + Expect(err).NotTo(HaveOccurred()) + + // We inherited dp from the previous bpfEpMgr and it has eth0 + // attached. This should clean it up. + Expect(dp.programAttached("eth0:ingress")).To(BeFalse()) + Expect(dp.programAttached("eth0:egress")).To(BeFalse()) + + Expect(dp.programAttached("eth1:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth1:egress")).To(BeTrue()) + }) + }) + Context("with eth0 host endpoint", func() { JustBeforeEach(genHEPUpdate("eth0", hostEp))