diff --git a/felix/bpf/attach.go b/felix/bpf/attach.go index 7f2cc9b90bf..262f250c7fc 100644 --- a/felix/bpf/attach.go +++ b/felix/bpf/attach.go @@ -201,3 +201,73 @@ func sha256OfFile(name string) (string, error) { } return hex.EncodeToString(hasher.Sum(nil)), nil } + +// EPAttachInfo tells what programs are attached to an endpoint. +type EPAttachInfo struct { + TCId int + XDPId int + XDPMode string +} + +// ListCalicoAttached list all programs that are attached to TC or XDP and are +// related to Calico. That is, they have jumpmap pinned in our dir hierarchy. +func ListCalicoAttached() (map[string]EPAttachInfo, error) { + aTC, aXDP, err := ListTcXDPAttachedProgs() + if err != nil { + return nil, err + } + + attachedProgIDs := set.New[int]() + + for _, p := range aTC { + attachedProgIDs.Add(p.ID) + } + + for _, p := range aXDP { + attachedProgIDs.Add(p.ID) + } + + maps, err := ListPerEPMaps() + if err != nil { + return nil, err + } + + allProgs, err := GetAllProgs() + if err != nil { + return nil, err + } + + caliProgs := set.New[int]() + + for _, p := range allProgs { + if !attachedProgIDs.Contains(p.Id) { + continue + } + + for _, m := range p.MapIds { + if _, ok := maps[m]; ok { + caliProgs.Add(p.Id) + break + } + } + } + + ai := make(map[string]EPAttachInfo) + + for _, p := range aTC { + if caliProgs.Contains(p.ID) { + ai[p.DevName] = EPAttachInfo{TCId: p.ID} + } + } + + for _, p := range aXDP { + if caliProgs.Contains(p.ID) { + info := ai[p.DevName] + info.XDPId = p.ID + info.XDPMode = p.Mode + ai[p.DevName] = info + } + } + + return ai, nil +} diff --git a/felix/bpf/bpf.go b/felix/bpf/bpf.go index dae57a07c62..8dd2f2cb7f0 100644 --- a/felix/bpf/bpf.go +++ b/felix/bpf/bpf.go @@ -22,6 +22,7 @@ package bpf import ( "bufio" + "bytes" "encoding/binary" "encoding/json" "errors" @@ -45,6 +46,7 @@ import ( "github.com/projectcalico/calico/felix/environment" "github.com/projectcalico/calico/felix/labelindex" "github.com/projectcalico/calico/felix/proto" + "github.com/projectcalico/calico/libcalico-go/lib/set" ) // Hook is the hook to which a BPF program should be attached. 
This is relative to @@ -528,7 +530,7 @@ type perCpuMapEntry []struct { } `json:"values"` } -type progInfo struct { +type ProgInfo struct { Id int `json:"id"` Type string `json:"type"` Tag string `json:"tag"` @@ -1096,7 +1098,7 @@ func (b *BPFLib) GetXDPTag(ifName string) (string, error) { return "", fmt.Errorf("failed to show XDP program (%s): %s\n%s", progPath, err, output) } - p := progInfo{} + p := ProgInfo{} err = json.Unmarshal(output, &p) if err != nil { return "", fmt.Errorf("cannot parse json output: %v\n%s", err, output) @@ -1186,7 +1188,7 @@ func (b *BPFLib) GetMapsFromXDP(ifName string) ([]int, error) { if err != nil { return nil, fmt.Errorf("failed to show XDP program (%s): %s\n%s", progPath, err, output) } - p := progInfo{} + p := ProgInfo{} err = json.Unmarshal(output, &p) if err != nil { return nil, fmt.Errorf("cannot parse json output: %v\n%s", err, output) @@ -1602,7 +1604,11 @@ func (b *BPFLib) getSkMsgID() (int, error) { return -1, nil } -func getAllProgs() ([]progInfo, error) { +func GetAllProgs() ([]ProgInfo, error) { + return getAllProgs() +} + +func getAllProgs() ([]ProgInfo, error) { prog := "bpftool" args := []string{ "--json", @@ -1617,7 +1623,7 @@ func getAllProgs() ([]progInfo, error) { return nil, fmt.Errorf("failed to get progs: %s\n%s", err, output) } - var progs []progInfo + var progs []ProgInfo err = json.Unmarshal(output, &progs) if err != nil { return nil, fmt.Errorf("cannot parse json output: %v\n%s", err, output) @@ -1672,7 +1678,7 @@ func (b *BPFLib) getSockMapID(progID int) (int, error) { return -1, fmt.Errorf("failed to get sockmap ID for prog %d: %s\n%s", progID, err, output) } - p := progInfo{} + p := ProgInfo{} err = json.Unmarshal(output, &p) if err != nil { return -1, fmt.Errorf("cannot parse json output: %v\n%s", err, output) @@ -2312,3 +2318,174 @@ func MapPinPath(typ int, name, iface string, hook Hook) string { } return path.Join(PinBaseDir, subDir, name) } + +type TcList []struct { + DevName string `json:"devname"` + ID int `json:"id"` +} + +type XDPList []struct { + DevName string `json:"devname"` + IfIndex int `json:"ifindex"` + Mode string `json:"mode"` + ID int `json:"id"` +} + +// ListTcXDPAttachedProgs returns all programs attached to TC or XDP hooks. +func ListTcXDPAttachedProgs() (TcList, XDPList, error) { + // Find all the programs that are attached to interfaces. 
+ out, err := exec.Command("bpftool", "net", "-j").Output() + if err != nil { + return nil, nil, fmt.Errorf("failed to list attached bpf programs: %w", err) + } + + var attached []struct { + TC TcList `json:"tc"` + XDP XDPList `json:"xdp"` + } + + err = json.Unmarshal(out, &attached) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse list of attached BPF programs: %w\n%s", err, out) + } + + return attached[0].TC, attached[0].XDP, nil +} + +func ListPerEPMaps() (map[int]string, error) { + mapIDToPath := make(map[int]string) + err := filepath.Walk("/sys/fs/bpf/tc", func(p string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if strings.HasPrefix(info.Name(), JumpMapName()) || + strings.HasPrefix(info.Name(), CountersMapName()) { + log.WithField("path", p).Debug("Examining map") + + out, err := exec.Command("bpftool", "map", "show", "pinned", p).Output() + if err != nil { + log.WithError(err).Panic("Failed to show map") + } + log.WithField("dump", string(out)).Debug("Map show before deletion") + idStr := string(bytes.Split(out, []byte(":"))[0]) + id, err := strconv.Atoi(idStr) + if err != nil { + log.WithError(err).WithField("dump", string(out)).Error("Failed to parse bpftool output.") + return err + } + mapIDToPath[id] = p + } + return nil + }) + + return mapIDToPath, err +} + +// pinDirRegex matches tc's and xdp's auto-created directory names, directories created when using libbpf +// so we can clean them up when removing maps without accidentally removing other user-created dirs.. +var pinDirRegex = regexp.MustCompile(`([0-9a-f]{40})|(.*_(igr|egr|xdp))`) + +// CleanUpMaps scans for cali_jump maps that are still pinned to the filesystem but no longer referenced by +// our BPF programs. +func CleanUpMaps() { + // Find the maps we care about by walking the BPF filesystem. + mapIDToPath, err := ListPerEPMaps() + if os.IsNotExist(err) { + log.WithError(err).Warn("tc directory missing from BPF file system?") + return + } + if err != nil { + log.WithError(err).Error("Error while looking for maps.") + return + } + + aTc, aXdp, err := ListTcXDPAttachedProgs() + if err != nil { + log.WithError(err).Warn("Failed to list attached programs.") + return + } + log.WithFields(log.Fields{"tc": aTc, "xdp": aXdp}).Debug("Attached BPF programs") + + attachedProgs := set.New[int]() + for _, prog := range aTc { + log.WithField("prog", prog).Debug("Adding TC prog to attached set") + attachedProgs.Add(prog.ID) + } + for _, prog := range aXdp { + log.WithField("prog", prog).Debug("Adding XDP prog to attached set") + attachedProgs.Add(prog.ID) + } + + // Find all the maps that the attached programs refer to and remove them from consideration. + progsJSON, err := exec.Command("bpftool", "prog", "list", "--json").Output() + if err != nil { + log.WithError(err).Info("Failed to list BPF programs, assuming there's nothing to clean up.") + return + } + var progs []struct { + ID int `json:"id"` + Name string `json:"name"` + Maps []int `json:"map_ids"` + } + err = json.Unmarshal(progsJSON, &progs) + if err != nil { + log.WithError(err).Info("Failed to parse bpftool output. Assuming nothing to clean up.") + return + } + for _, p := range progs { + if !attachedProgs.Contains(p.ID) { + log.WithField("prog", p).Debug("Prog is not in the attached set, skipping") + continue + } + for _, id := range p.Maps { + log.WithField("mapID", id).WithField("prog", p).Debugf("Map is still in use: %v", mapIDToPath[id]) + delete(mapIDToPath, id) + } + } + + // Remove the pins. 
+ for id, p := range mapIDToPath { + log.WithFields(log.Fields{"id": id, "path": p}).Debug("Removing stale BPF map pin.") + err := os.Remove(p) + if err != nil { + log.WithError(err).Warn("Removed stale BPF map pin.") + } + log.WithFields(log.Fields{"id": id, "path": p}).Info("Removed stale BPF map pin.") + } + + // Look for empty dirs. + emptyAutoDirs := set.New[string]() + err = filepath.Walk("/sys/fs/bpf/tc", func(p string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() && pinDirRegex.MatchString(info.Name()) { + p := path.Clean(p) + log.WithField("path", p).Debug("Found tc auto-created dir.") + emptyAutoDirs.Add(p) + } else { + dirPath := path.Clean(path.Dir(p)) + if emptyAutoDirs.Contains(dirPath) { + log.WithField("path", dirPath).Debug("tc dir is not empty.") + emptyAutoDirs.Discard(dirPath) + } + } + return nil + }) + if os.IsNotExist(err) { + log.WithError(err).Warn("tc directory missing from BPF file system?") + return + } + if err != nil { + log.WithError(err).Error("Error while looking for maps.") + } + + emptyAutoDirs.Iter(func(p string) error { + log.WithField("path", p).Debug("Removing empty dir.") + err := os.Remove(p) + if err != nil { + log.WithError(err).Error("Error while removing empty dir.") + } + return nil + }) +} diff --git a/felix/bpf/tc/attach.go b/felix/bpf/tc/attach.go index 6d975463df7..806a949f0e6 100644 --- a/felix/bpf/tc/attach.go +++ b/felix/bpf/tc/attach.go @@ -15,9 +15,7 @@ package tc import ( - "bytes" "encoding/binary" - "encoding/json" "errors" "fmt" "io/ioutil" @@ -25,16 +23,12 @@ import ( "os" "os/exec" "path" - "path/filepath" "regexp" "strconv" "strings" - "sync" log "github.com/sirupsen/logrus" - "github.com/projectcalico/calico/libcalico-go/lib/set" - "github.com/projectcalico/calico/felix/bpf" "github.com/projectcalico/calico/felix/bpf/bpfutils" "github.com/projectcalico/calico/felix/bpf/libbpf" @@ -66,7 +60,6 @@ type AttachPoint struct { NATout uint32 } -var tcLock sync.RWMutex var ErrDeviceNotFound = errors.New("device not found") var ErrInterrupted = errors.New("dump interrupted") var prefHandleRe = regexp.MustCompile(`pref ([^ ]+) .* handle ([^ ]+)`) @@ -137,13 +130,6 @@ func (ap *AttachPoint) AttachProgram() (int, error) { binaryToLoad = tempBinary } - // Using the RLock allows multiple attach calls to proceed in parallel unless - // CleanUpMaps() (which takes the writer lock) is running. - logCxt.Debug("AttachProgram waiting for lock...") - tcLock.RLock() - defer tcLock.RUnlock() - logCxt.Debug("AttachProgram got lock.") - progsToClean, err := ap.listAttachedPrograms() if err != nil { return -1, err @@ -201,30 +187,8 @@ func (ap *AttachPoint) AttachProgram() (int, error) { } logCxt.Info("Program attached to TC.") - var progErrs []error - for _, p := range progsToClean { - log.WithField("prog", p).Debug("Cleaning up old calico program") - attemptCleanup := func() error { - _, err := ExecTC("filter", "del", "dev", ap.Iface, string(ap.Hook), "pref", p.pref, "handle", p.handle, "bpf") - return err - } - err = attemptCleanup() - if errors.Is(err, ErrInterrupted) { - // This happens if the interface is deleted in the middle of calling tc. 
- log.Debug("First cleanup hit 'Dump was interrupted', retrying (once).") - err = attemptCleanup() - } - if errors.Is(err, ErrDeviceNotFound) { - continue - } - if err != nil { - log.WithError(err).WithField("prog", p).Warn("Failed to clean up old calico program.") - progErrs = append(progErrs, err) - } - } - - if len(progErrs) != 0 { - return -1, fmt.Errorf("failed to clean up one or more old calico programs: %v", progErrs) + if err := ap.detachPrograms(progsToClean); err != nil { + return -1, err } // Store information of object in a json file so in future we can skip reattaching it. @@ -254,8 +218,41 @@ func (ap *AttachPoint) patchLogPrefix(logCtx *log.Entry, ifile, ofile string) er } func (ap *AttachPoint) DetachProgram() error { - // We never detach TC programs, so this should not be called. - ap.Log().Panic("DetachProgram is not implemented for TC") + progsToClean, err := ap.listAttachedPrograms() + if err != nil { + return err + } + + return ap.detachPrograms(progsToClean) +} + +func (ap *AttachPoint) detachPrograms(progsToClean []attachedProg) error { + var progErrs []error + for _, p := range progsToClean { + log.WithField("prog", p).Debug("Cleaning up old calico program") + attemptCleanup := func() error { + _, err := ExecTC("filter", "del", "dev", ap.Iface, string(ap.Hook), "pref", p.pref, "handle", p.handle, "bpf") + return err + } + err := attemptCleanup() + if errors.Is(err, ErrInterrupted) { + // This happens if the interface is deleted in the middle of calling tc. + log.Debug("First cleanup hit 'Dump was interrupted', retrying (once).") + err = attemptCleanup() + } + if errors.Is(err, ErrDeviceNotFound) { + continue + } + if err != nil { + log.WithError(err).WithField("prog", p).Warn("Failed to clean up old calico program.") + progErrs = append(progErrs, err) + } + } + + if len(progErrs) != 0 { + return fmt.Errorf("failed to clean up one or more old calico programs: %v", progErrs) + } + return nil } @@ -388,159 +385,6 @@ func (ap *AttachPoint) IsAttached() (bool, error) { return len(progs) > 0, nil } -// tcDirRegex matches tc's and xdp's auto-created directory names, directories created when using libbpf -// so we can clean them up when removing maps without accidentally removing other user-created dirs.. -var tcDirRegex = regexp.MustCompile(`([0-9a-f]{40})|(.*_(igr|egr|xdp))`) - -// CleanUpMaps scans for cali_jump maps that are still pinned to the filesystem but no longer referenced by -// our BPF programs. -func CleanUpMaps() { - // So that we serialise with AttachProgram() - log.Debug("CleanUpMaps waiting for lock...") - tcLock.Lock() - defer tcLock.Unlock() - log.Debug("CleanUpMaps got lock, cleaning up...") - - // Find the maps we care about by walking the BPF filesystem. 
- mapIDToPath := make(map[int]string) - err := filepath.Walk("/sys/fs/bpf/tc", func(p string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if strings.HasPrefix(info.Name(), bpf.JumpMapName()) || - strings.HasPrefix(info.Name(), bpf.CountersMapName()) { - log.WithField("path", p).Debug("Examining map") - - out, err := exec.Command("bpftool", "map", "show", "pinned", p).Output() - if err != nil { - log.WithError(err).Panic("Failed to show map") - } - log.WithField("dump", string(out)).Debug("Map show before deletion") - idStr := string(bytes.Split(out, []byte(":"))[0]) - id, err := strconv.Atoi(idStr) - if err != nil { - log.WithError(err).WithField("dump", string(out)).Error("Failed to parse bpftool output.") - return err - } - mapIDToPath[id] = p - } - return nil - }) - if os.IsNotExist(err) { - log.WithError(err).Warn("tc directory missing from BPF file system?") - return - } - if err != nil { - log.WithError(err).Error("Error while looking for maps.") - } - - // Find all the programs that are attached to interfaces. - out, err := exec.Command("bpftool", "net", "-j").Output() - if err != nil { - log.WithError(err).Panic("Failed to list attached bpf programs") - } - log.WithField("dump", string(out)).Debug("Attached BPF programs") - - var attached []struct { - TC []struct { - DevName string `json:"devname"` - ID int `json:"id"` - } `json:"tc"` - XDP []struct { - DevName string `json:"devname"` - IfIndex int `json:"ifindex"` - Mode string `json:"mode"` - ID int `json:"id"` - } `json:"xdp"` - } - err = json.Unmarshal(out, &attached) - if err != nil { - log.WithError(err).WithField("dump", string(out)).Error("Failed to parse list of attached BPF programs") - } - attachedProgs := set.New[int]() - for _, prog := range attached[0].TC { - log.WithField("prog", prog).Debug("Adding TC prog to attached set") - attachedProgs.Add(prog.ID) - } - for _, prog := range attached[0].XDP { - log.WithField("prog", prog).Debug("Adding XDP prog to attached set") - attachedProgs.Add(prog.ID) - } - - // Find all the maps that the attached programs refer to and remove them from consideration. - progsJSON, err := exec.Command("bpftool", "prog", "list", "--json").Output() - if err != nil { - log.WithError(err).Info("Failed to list BPF programs, assuming there's nothing to clean up.") - return - } - var progs []struct { - ID int `json:"id"` - Name string `json:"name"` - Maps []int `json:"map_ids"` - } - err = json.Unmarshal(progsJSON, &progs) - if err != nil { - log.WithError(err).Info("Failed to parse bpftool output. Assuming nothing to clean up.") - return - } - for _, p := range progs { - if !attachedProgs.Contains(p.ID) { - log.WithField("prog", p).Debug("Prog is not in the attached set, skipping") - continue - } - for _, id := range p.Maps { - log.WithField("mapID", id).WithField("prog", p).Debugf("Map is still in use: %v", mapIDToPath[id]) - delete(mapIDToPath, id) - } - } - - // Remove the pins. - for id, p := range mapIDToPath { - log.WithFields(log.Fields{"id": id, "path": p}).Debug("Removing stale BPF map pin.") - err := os.Remove(p) - if err != nil { - log.WithError(err).Warn("Removed stale BPF map pin.") - } - log.WithFields(log.Fields{"id": id, "path": p}).Info("Removed stale BPF map pin.") - } - - // Look for empty dirs. 
- emptyAutoDirs := set.New[string]() - err = filepath.Walk("/sys/fs/bpf/tc", func(p string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if info.IsDir() && tcDirRegex.MatchString(info.Name()) { - p := path.Clean(p) - log.WithField("path", p).Debug("Found tc auto-created dir.") - emptyAutoDirs.Add(p) - } else { - dirPath := path.Clean(path.Dir(p)) - if emptyAutoDirs.Contains(dirPath) { - log.WithField("path", dirPath).Debug("tc dir is not empty.") - emptyAutoDirs.Discard(dirPath) - } - } - return nil - }) - if os.IsNotExist(err) { - log.WithError(err).Warn("tc directory missing from BPF file system?") - return - } - if err != nil { - log.WithError(err).Error("Error while looking for maps.") - } - - emptyAutoDirs.Iter(func(p string) error { - log.WithField("path", p).Debug("Removing empty dir.") - err := os.Remove(p) - if err != nil { - log.WithError(err).Error("Error while removing empty dir.") - } - return nil - }) -} - // EnsureQdisc makes sure that qdisc is attached to the given interface func EnsureQdisc(ifaceName string) error { hasQdisc, err := HasQdisc(ifaceName) diff --git a/felix/bpf/ut/attach_test.go b/felix/bpf/ut/attach_test.go index 7c71eab3d30..e9c37d41789 100644 --- a/felix/bpf/ut/attach_test.go +++ b/felix/bpf/ut/attach_test.go @@ -74,7 +74,7 @@ func TestReattachPrograms(t *testing.T) { // Start with a clean base state in case another test left something behind. t.Log("Doing initial clean up") - tc.CleanUpMaps() + bpf.CleanUpMaps() bpf.CleanAttachedProgDir() startingJumpMaps := countJumpMaps() @@ -139,9 +139,22 @@ func TestReattachPrograms(t *testing.T) { Expect(bpf.RuntimeJSONFilename(ap2.IfaceName(), "egress")).To(BeARegularFile()) Expect(bpf.RuntimeJSONFilename(ap3.IfaceName(), "xdp")).To(BeARegularFile()) + list, err := bpf.ListCalicoAttached() + Expect(err).NotTo(HaveOccurred()) + Expect(list).To(HaveLen(3)) + Expect(list).To(HaveKey(vethName1)) + Expect(list[vethName1].TCId).NotTo(Equal(0)) + Expect(list[vethName1].XDPId).To(Equal(0)) + Expect(list).To(HaveKey(vethName2)) + Expect(list[vethName2].TCId).NotTo(Equal(0)) + Expect(list[vethName2].XDPId).To(Equal(0)) + Expect(list).To(HaveKey(vethName3)) + Expect(list[vethName3].TCId).To(Equal(0)) + Expect(list[vethName3].XDPId).NotTo(Equal(0)) + // Clean up maps, but nothing should change t.Log("Cleaning up, should remove the first map") - tc.CleanUpMaps() + bpf.CleanUpMaps() Expect(countJumpMaps()).To(BeNumerically("==", startingJumpMaps+3), "unexpected number of jump maps") Expect(countTCDirs()).To(BeNumerically("==", startingTCDirs+3), "unexpected number of TC dirs") Expect(countHashFiles()).To(BeNumerically("==", startingHashFiles+3), "unexpected number of hash files") @@ -155,7 +168,7 @@ func TestReattachPrograms(t *testing.T) { Expect(err).NotTo(HaveOccurred()) err = tc.RemoveQdisc(vethName2) Expect(err).NotTo(HaveOccurred()) - tc.CleanUpMaps() + bpf.CleanUpMaps() Expect(countJumpMaps()).To(BeNumerically("==", startingJumpMaps+1), "unexpected number of jump maps") Expect(countTCDirs()).To(BeNumerically("==", startingTCDirs+1), "unexpected number of TC dirs") Expect(countHashFiles()).To(BeNumerically("==", startingHashFiles+1), "unexpected number of hash files") @@ -180,7 +193,7 @@ func TestReattachPrograms(t *testing.T) { t.Log("Removing the XDP program and cleaning up its jump map, should return to base state") err = ap3.DetachProgram() Expect(err).NotTo(HaveOccurred()) - tc.CleanUpMaps() + bpf.CleanUpMaps() Expect(countJumpMaps()).To(BeNumerically("==", startingJumpMaps), "unexpected 
number of jump maps") Expect(countTCDirs()).To(BeNumerically("==", startingTCDirs), "unexpected number of TC dirs") Expect(countHashFiles()).To(BeNumerically("==", startingHashFiles), "unexpected number of hash files") diff --git a/felix/dataplane/linux/bpf_ep_mgr.go b/felix/dataplane/linux/bpf_ep_mgr.go index e07888cb9f3..1f29b2bf23b 100644 --- a/felix/dataplane/linux/bpf_ep_mgr.go +++ b/felix/dataplane/linux/bpf_ep_mgr.go @@ -166,10 +166,17 @@ const ( ) type bpfEndpointManager struct { + initAttaches map[string]bpf.EPAttachInfo + initUnknownIfaces set.Set[string] + // Main store of information about interfaces; indexed on interface name. ifacesLock sync.Mutex nameToIface map[string]bpfInterface + // Using the RLock allows multiple attach calls to proceed in parallel unless + // CleanUpMaps() (which takes the writer lock) is running. + cleanupLock sync.RWMutex + allWEPs map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint happyWEPs map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint happyWEPsDirty bool @@ -280,6 +287,7 @@ func newBPFEndpointManager( livenessCallback = func() {} } m := &bpfEndpointManager{ + initUnknownIfaces: set.New[string](), dp: dp, allWEPs: map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint{}, happyWEPs: map[proto.WorkloadEndpointID]*proto.WorkloadEndpoint{}, @@ -311,14 +319,10 @@ func newBPFEndpointManager( )), ruleRenderer: iptablesRuleRenderer, iptablesFilterTable: iptablesFilterTable, - mapCleanupRunner: ratelimited.NewRunner(mapCleanupInterval, func(ctx context.Context) { - log.Debug("TC maps cleanup triggered.") - tc.CleanUpMaps() - }), - onStillAlive: livenessCallback, - hostIfaceToEpMap: map[string]proto.HostEndpoint{}, - ifaceToIpMap: map[string]net.IP{}, - opReporter: opReporter, + onStillAlive: livenessCallback, + hostIfaceToEpMap: map[string]proto.HostEndpoint{}, + ifaceToIpMap: map[string]net.IP{}, + opReporter: opReporter, // ipv6Enabled Should be set to config.Ipv6Enabled, but for now it is better // to set it to BPFIpv6Enabled which is a dedicated flag for development of IPv6. 
// TODO: set ipv6Enabled to config.Ipv6Enabled when IPv6 support is complete @@ -351,6 +355,17 @@ func newBPFEndpointManager( m.dp = m } + m.mapCleanupRunner = ratelimited.NewRunner(mapCleanupInterval, func(ctx context.Context) { + log.Debug("TC maps cleanup triggered.") + // So that we serialise with AttachProgram() + log.Debug("CleanUpMaps waiting for lock...") + m.cleanupLock.Lock() + defer m.cleanupLock.Unlock() + log.Debug("CleanUpMaps got lock, cleaning up...") + + bpf.CleanUpMaps() + }) + if config.FeatureGates != nil { switch config.FeatureGates["BPFConnectTimeLoadBalancingWorkaround"] { case "enabled": @@ -544,6 +559,38 @@ func (m *bpfEndpointManager) updateIfaceStateMap(name string, iface *bpfInterfac } } +func (m *bpfEndpointManager) cleanupOldAttach(iface string, ai bpf.EPAttachInfo) error { + if ai.XDPId != 0 { + ap := xdp.AttachPoint{ + Iface: iface, + // Try all modes in this order + Modes: []bpf.XDPMode{bpf.XDPGeneric, bpf.XDPDriver, bpf.XDPOffload}, + } + + if err := m.dp.ensureNoProgram(&ap); err != nil { + return fmt.Errorf("xdp: %w", err) + } + } + if ai.TCId != 0 { + ap := tc.AttachPoint{ + Iface: iface, + Hook: bpf.HookEgress, + } + + if err := m.dp.ensureNoProgram(&ap); err != nil { + return fmt.Errorf("tc egress: %w", err) + } + + ap.Hook = bpf.HookIngress + + if err := m.dp.ensureNoProgram(&ap); err != nil { + return fmt.Errorf("tc ingress: %w", err) + } + } + + return nil +} + func (m *bpfEndpointManager) onInterfaceUpdate(update *ifaceUpdate) { log.Debugf("Interface update for %v, state %v", update.Name, update.State) // Should be safe without the lock since there shouldn't be any active background threads @@ -558,6 +605,18 @@ func (m *bpfEndpointManager) onInterfaceUpdate(update *ifaceUpdate) { } if !m.isDataIface(update.Name) && !m.isWorkloadIface(update.Name) && !m.isL3Iface(update.Name) { + if update.State == ifacemonitor.StateUp { + if ai, ok := m.initAttaches[update.Name]; ok { + if err := m.cleanupOldAttach(update.Name, ai); err != nil { + log.WithError(err).Warnf("Failed to detach old programs from now unused device '%s'", update.Name) + } else { + delete(m.initAttaches, update.Name) + } + } + } + if m.initUnknownIfaces != nil { + m.initUnknownIfaces.Add(update.Name) + } log.WithField("update", update).Debug("Ignoring interface that's neither data nor workload nor L3.") return } @@ -568,6 +627,7 @@ func (m *bpfEndpointManager) onInterfaceUpdate(update *ifaceUpdate) { // For specific host endpoints OnHEPUpdate doesn't depend on iface state, and has // already stored and mapped as needed. if ifaceIsUp { + delete(m.initAttaches, update.Name) // We require host interfaces to be in non-strict RPF mode so that // packets can return straight to host for services bypassing CTLB. switch update.Name { @@ -741,7 +801,23 @@ func (m *bpfEndpointManager) markExistingWEPDirty(wlID proto.WorkloadEndpointID, func (m *bpfEndpointManager) CompleteDeferredWork() error { // Do one-off initialisation. 
- m.dp.ensureStarted() + m.startupOnce.Do(func() { + m.dp.ensureStarted() + + m.initUnknownIfaces.Iter(func(iface string) error { + if ai, ok := m.initAttaches[iface]; ok { + if err := m.cleanupOldAttach(iface, ai); err != nil { + log.WithError(err).Warnf("Failed to detach old programs from now unused device '%s'", iface) + } else { + delete(m.initAttaches, iface) + return set.RemoveItem + } + } + return nil + }) + + m.initUnknownIfaces = nil + }) m.applyProgramsToDirtyDataInterfaces() m.updateWEPsInDataplane() @@ -1668,10 +1744,15 @@ func (m *bpfEndpointManager) setRPFilter(iface string, val int) error { } func (m *bpfEndpointManager) ensureStarted() { - m.startupOnce.Do(func() { - log.Info("Starting map cleanup runner.") - m.mapCleanupRunner.Start(context.Background()) - }) + log.Info("Starting map cleanup runner.") + m.mapCleanupRunner.Start(context.Background()) + + var err error + + m.initAttaches, err = bpf.ListCalicoAttached() + if err != nil { + log.WithError(err).Warn("Failed to list previously attached programs. We may not clean up some.") + } } func (m *bpfEndpointManager) ensureBPFDevices() error { @@ -1793,7 +1874,14 @@ func (m *bpfEndpointManager) ensureProgramAttached(ap attachPoint) (bpf.MapFD, e if jumpMapFD == 0 { ap.Log().Info("Need to attach program") // We don't have a program attached to this interface yet, attach one now. + + ap.Log().Debug("AttachProgram waiting for lock...") + m.cleanupLock.RLock() + ap.Log().Debug("AttachProgram got lock.") + progID, err := ap.AttachProgram() + m.cleanupLock.RUnlock() + if err != nil { return 0, err } @@ -1808,8 +1896,7 @@ func (m *bpfEndpointManager) ensureProgramAttached(ap attachPoint) (bpf.MapFD, e return jumpMapFD, nil } -// Ensure that the specified interface does not have our XDP program, in any mode, but avoid -// touching anyone else's XDP program(s). +// Ensure that the specified attach point does not have our program. func (m *bpfEndpointManager) ensureNoProgram(ap attachPoint) error { // Clean up jump map FD if there is one. @@ -1824,11 +1911,11 @@ func (m *bpfEndpointManager) ensureNoProgram(ap attachPoint) error { } } - // Ensure interface does not have our XDP program attached in any mode. + // Ensure interface does not have our program attached. 
err := ap.DetachProgram() // Forget the policy debug info - m.removePolicyDebugInfo(ap.IfaceName(), 4, bpf.HookXDP) + m.removePolicyDebugInfo(ap.IfaceName(), 4, ap.HookName()) return err } diff --git a/felix/dataplane/linux/bpf_ep_mgr_test.go b/felix/dataplane/linux/bpf_ep_mgr_test.go index 0afc490022e..4bd6fae724e 100644 --- a/felix/dataplane/linux/bpf_ep_mgr_test.go +++ b/felix/dataplane/linux/bpf_ep_mgr_test.go @@ -53,6 +53,8 @@ type mockDataplane struct { fds map[string]uint32 state map[uint32]polprog.Rules routes map[ip.V4CIDR]struct{} + + ensureStartedFn func() } func newMockDataplane() *mockDataplane { @@ -65,6 +67,9 @@ func newMockDataplane() *mockDataplane { } func (m *mockDataplane) ensureStarted() { + if m.ensureStartedFn != nil { + m.ensureStartedFn() + } } func (m *mockDataplane) ensureBPFDevices() error { @@ -140,6 +145,12 @@ func (m *mockDataplane) setAndReturn(vari **polprog.Rules, key string) func() *p } } +func (m *mockDataplane) programAttached(key string) bool { + m.mutex.Lock() + defer m.mutex.Unlock() + return m.fds[key] != 0 +} + func (m *mockDataplane) setRoute(cidr ip.V4CIDR) { m.mutex.Lock() defer m.mutex.Unlock() @@ -168,7 +179,6 @@ var _ = Describe("BPF Endpoint Manager", func() { fibLookupEnabled bool endpointToHostAction string dataIfacePattern string - l3IfacePattern string workloadIfaceRegex string ipSetIDAllocator *idalloc.IDAllocator vxlanMTU int @@ -183,8 +193,7 @@ var _ = Describe("BPF Endpoint Manager", func() { BeforeEach(func() { fibLookupEnabled = true endpointToHostAction = "DROP" - dataIfacePattern = "^((en|wl|ww|sl|ib)[opsx].*|(eth|wlan|wwan).*|tunl0$|wireguard.cali$)" - l3IfacePattern = "^(wireguard.cali$)" + dataIfacePattern = "^eth0" workloadIfaceRegex = "cali" ipSetIDAllocator = idalloc.New() vxlanMTU = 0 @@ -225,7 +234,6 @@ var _ = Describe("BPF Endpoint Manager", func() { Hostname: "uthost", BPFLogLevel: "info", BPFDataIfacePattern: regexp.MustCompile(dataIfacePattern), - BPFL3IfacePattern: regexp.MustCompile(l3IfacePattern), VXLANMTU: vxlanMTU, VXLANPort: rrConfigNormal.VXLANPort, BPFNodePortDSREnabled: nodePortDSR, @@ -255,8 +263,8 @@ var _ = Describe("BPF Endpoint Manager", func() { bpfEpMgr.OnUpdate(&ifaceUpdate{Name: name, State: state, Index: index}) err := bpfEpMgr.CompleteDeferredWork() Expect(err).NotTo(HaveOccurred()) - if state == ifacemonitor.StateUp { - Expect(ifStateMap.ContainsKey(ifstate.NewKey(uint32(index)).AsBytes())).To(BeTrue()) + if state == ifacemonitor.StateUp && (bpfEpMgr.isDataIface(name) || bpfEpMgr.isWorkloadIface(name)) { + ExpectWithOffset(1, ifStateMap.ContainsKey(ifstate.NewKey(uint32(index)).AsBytes())).To(BeTrue()) } } } @@ -415,6 +423,75 @@ var _ = Describe("BPF Endpoint Manager", func() { genIfaceUpdate("eth0", ifacemonitor.StateUp, 10)() }) + It("should attach to eth0", func() { + Expect(dp.programAttached("eth0:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth0:egress")).To(BeTrue()) + }) + + Context("with dataIfacePattern changed to eth1", func() { + JustBeforeEach(func() { + dataIfacePattern = "^eth1" + newBpfEpMgr() + + dp.ensureStartedFn = func() { + bpfEpMgr.initAttaches = map[string]bpf.EPAttachInfo{ + "eth0": {TCId: 12345}, + } + } + + }) + + It("should detach from eth0 when eth0 up before first CompleteDeferredWork()", func() { + Expect(dp.programAttached("eth0:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth0:egress")).To(BeTrue()) + + genIfaceUpdate("eth0", ifacemonitor.StateUp, 10)() + genIfaceUpdate("eth1", ifacemonitor.StateUp, 11)() + + err := bpfEpMgr.CompleteDeferredWork() 
+ Expect(err).NotTo(HaveOccurred()) + + // We inherited dp from the previous bpfEpMgr and it has eth0 + // attached. This should clean it up. + Expect(dp.programAttached("eth0:ingress")).To(BeFalse()) + Expect(dp.programAttached("eth0:egress")).To(BeFalse()) + + Expect(dp.programAttached("eth1:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth1:egress")).To(BeTrue()) + }) + + It("should detach from eth0 when eth0 up after first CompleteDeferredWork()", func() { + Expect(dp.programAttached("eth0:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth0:egress")).To(BeTrue()) + + genIfaceUpdate("eth1", ifacemonitor.StateUp, 11)() + + err := bpfEpMgr.CompleteDeferredWork() + Expect(err).NotTo(HaveOccurred()) + + // We inherited dp from the previous bpfEpMgr and it has eth0 + // attached. We should see it. + Expect(dp.programAttached("eth0:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth0:egress")).To(BeTrue()) + + Expect(dp.programAttached("eth1:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth1:egress")).To(BeTrue()) + + genIfaceUpdate("eth0", ifacemonitor.StateUp, 10)() + + err = bpfEpMgr.CompleteDeferredWork() + Expect(err).NotTo(HaveOccurred()) + + // We inherited dp from the previous bpfEpMgr and it has eth0 + // attached. This should clean it up. + Expect(dp.programAttached("eth0:ingress")).To(BeFalse()) + Expect(dp.programAttached("eth0:egress")).To(BeFalse()) + + Expect(dp.programAttached("eth1:ingress")).To(BeTrue()) + Expect(dp.programAttached("eth1:egress")).To(BeTrue()) + }) + }) + Context("with eth0 host endpoint", func() { JustBeforeEach(genHEPUpdate("eth0", hostEp)) diff --git a/felix/docker-image/calico-felix-wrapper b/felix/docker-image/calico-felix-wrapper index 3c678fe08aa..fb94af42a9e 100755 --- a/felix/docker-image/calico-felix-wrapper +++ b/felix/docker-image/calico-felix-wrapper @@ -42,6 +42,8 @@ while true; do done fi + source /extra-env.sh + echo "calico-felix-wrapper: Starting calico-felix" calico-felix & pid=$! 
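
The endpoint-manager changes above (initAttaches, initUnknownIfaces, the one-off block in CompleteDeferredWork) are spread across several hunks; the following is a condensed, illustrative sketch — not code from the patch — with locking, logging and real types elided, showing the intended startup flow: snapshot what is already attached, then detach stale programs from interfaces the manager no longer owns on the first deferred-work pass.

```go
// Illustrative sketch only; names mirror the patch but signatures are simplified.
package sketch

type epAttachInfo struct{ tcID, xdpID int }

type manager struct {
	initAttaches      map[string]epAttachInfo // programs found attached at startup
	initUnknownIfaces map[string]bool         // ifaces seen before the first pass
	started           bool
}

// ensureStarted snapshots currently attached Calico programs so that stale
// ones can be detached later (e.g. after BPFDataIfacePattern changes).
func (m *manager) ensureStarted(list func() (map[string]epAttachInfo, error)) {
	if attaches, err := list(); err == nil {
		m.initAttaches = attaches
	}
}

// completeDeferredWork performs the one-off cleanup: any interface that came
// up before the first pass and is not one we manage gets its old programs removed.
func (m *manager) completeDeferredWork(cleanup func(iface string, ai epAttachInfo) error) {
	if m.started {
		return
	}
	m.started = true
	for iface := range m.initUnknownIfaces {
		if ai, ok := m.initAttaches[iface]; ok {
			if cleanup(iface, ai) == nil {
				delete(m.initAttaches, iface)
			}
		}
	}
	m.initUnknownIfaces = nil
}
```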
diff --git a/felix/fv/bpf_attach_test.go b/felix/fv/bpf_attach_test.go index 09ff9fecd68..43b6c394562 100644 --- a/felix/fv/bpf_attach_test.go +++ b/felix/fv/bpf_attach_test.go @@ -35,8 +35,8 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ Felix bpf reattach object", } var ( - infra infrastructure.DatastoreInfra - felixes []*infrastructure.Felix + infra infrastructure.DatastoreInfra + felix *infrastructure.Felix ) BeforeEach(func() { @@ -51,7 +51,8 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ Felix bpf reattach object", }, } - felixes, _ = infrastructure.StartNNodeTopology(1, opts, infra) + felixes, _ := infrastructure.StartNNodeTopology(1, opts, infra) + felix = felixes[0] err := infra.AddAllowToDatastore("host-endpoint=='true'") Expect(err).NotTo(HaveOccurred()) @@ -62,15 +63,11 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ Felix bpf reattach object", infra.DumpErrorData() } - for _, felix := range felixes { - felix.Stop() - } - + felix.Stop() infra.Stop() }) It("should not reattach bpf programs", func() { - felix := felixes[0] // This should not happen at initial execution of felix, since there is no program attached firstRunBase := felix.WatchStdoutFor(regexp.MustCompile("Program already attached, skip reattaching")) @@ -94,4 +91,28 @@ var _ = infrastructure.DatastoreDescribe("_BPF-SAFE_ Felix bpf reattach object", Eventually(secondRunProg2, "10s", "100ms").Should(BeClosed()) Expect(secondRunBase).NotTo(BeClosed()) }) + + It("should clean up programs when BPFDataIfacePattern changes", func() { + By("Starting Felix") + felix.TriggerDelayedStart() + + By("Checking that eth0 has a program") + + Eventually(func() string { + out, _ := felix.ExecOutput("bpftool", "-jp", "net") + return out + }, "15s", "1s").Should(ContainSubstring("eth0")) + + By("Changing env and restarting felix") + + felix.SetEvn(map[string]string{"FELIX_BPFDataIfacePattern": "eth1"}) + felix.Restart() + + By("Checking that eth0 does not have a program anymore") + + Eventually(func() string { + out, _ := felix.ExecOutput("bpftool", "-jp", "net") + return out + }, "15s", "1s").ShouldNot(ContainSubstring("eth0")) + }) }) diff --git a/felix/fv/infrastructure/felix.go b/felix/fv/infrastructure/felix.go index 40574629224..703b8d1f91a 100644 --- a/felix/fv/infrastructure/felix.go +++ b/felix/fv/infrastructure/felix.go @@ -15,6 +15,7 @@ package infrastructure import ( + "bufio" "fmt" "os" "path" @@ -229,6 +230,25 @@ func (f *Felix) Restart() { Eventually(f.GetFelixPID, "10s", "100ms").ShouldNot(Equal(oldPID)) } +func (f *Felix) SetEvn(env map[string]string) { + fn := "extra-env.sh" + + file, err := os.OpenFile("./"+fn, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644) + Expect(err).NotTo(HaveOccurred()) + + fw := bufio.NewWriter(file) + + for k, v := range env { + fmt.Fprintf(fw, "export %s=%v\n", k, v) + } + + fw.Flush() + file.Close() + + err = f.CopyFileIntoContainer("./"+fn, "/"+fn) + Expect(err).NotTo(HaveOccurred()) +} + // AttachTCPDump returns tcpdump attached to the container func (f *Felix) AttachTCPDump(iface string) *tcpdump.TCPDump { return tcpdump.Attach(f.Container.Name, "", iface)
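
For reference, a minimal diagnostic sketch (not part of the patch) showing how the newly exported helpers in felix/bpf might be driven together: list the Calico programs still attached to TC/XDP hooks, then prune per-endpoint map pins that nothing references. It assumes it runs as root on a node with bpffs mounted at /sys/fs/bpf and bpftool on the PATH, since both helpers shell out to bpftool.

```go
package main

import (
	"fmt"

	"github.com/projectcalico/calico/felix/bpf"
)

func main() {
	// Programs attached to TC or XDP that reference Calico's pinned maps,
	// keyed by interface name.
	attached, err := bpf.ListCalicoAttached()
	if err != nil {
		fmt.Println("failed to list attached programs:", err)
		return
	}
	for iface, info := range attached {
		fmt.Printf("%s: tc prog %d, xdp prog %d (mode %q)\n",
			iface, info.TCId, info.XDPId, info.XDPMode)
	}

	// Remove cali_jump/counters map pins no longer referenced by any
	// attached program; safe to call repeatedly.
	bpf.CleanUpMaps()
}
```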